aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-18 11:05:02 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-18 11:05:02 -0700
commit3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6 (patch)
treee91672383dc2bc10a75079c4d9ca6ca98eef4994 /weed/server
parentb142f9f1d57cebd1445baed7ce7fc88bb0f450f0 (diff)
downloadseaweedfs-3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6.tar.xz
seaweedfs-3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6.zip
add volume tailer
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/volume_grpc_stream_follow.go54
1 files changed, 30 insertions, 24 deletions
diff --git a/weed/server/volume_grpc_stream_follow.go b/weed/server/volume_grpc_stream_follow.go
index 7c01e4b9c..52a9c1473 100644
--- a/weed/server/volume_grpc_stream_follow.go
+++ b/weed/server/volume_grpc_stream_follow.go
@@ -9,57 +9,63 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage"
)
-func (vs *VolumeServer) VolumeStreamFollow(req *volume_server_pb.VolumeStreamFollowRequest, stream volume_server_pb.VolumeServer_VolumeStreamFollowServer) error {
+func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error {
v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
+ defer glog.V(1).Infof("tailing volume %d finished", v.Id)
+
lastTimestampNs := req.SinceNs
drainingSeconds := req.DrainingSeconds
- ticker := time.NewTicker(time.Second)
-
for {
- select {
- case <-ticker.C:
- lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
- if err != nil {
- return fmt.Errorf("streamFollow: %v", err)
- }
- if req.DrainingSeconds == 0 {
- continue
- }
- if lastProcessedTimestampNs == lastTimestampNs {
- drainingSeconds--
- if drainingSeconds <= 0 {
- return nil
- }
- glog.V(0).Infof("volume %d drains requests with %d seconds remaining ...", v.Id, drainingSeconds)
- } else {
- drainingSeconds = req.DrainingSeconds
- glog.V(0).Infof("volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
+ lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
+ if err != nil {
+ glog.Infof("sendNeedlesSince: %v", err)
+ return fmt.Errorf("streamFollow: %v", err)
+ }
+ time.Sleep(2 * time.Second)
+
+ if req.DrainingSeconds == 0 {
+ continue
+ }
+ if lastProcessedTimestampNs == lastTimestampNs {
+ drainingSeconds--
+ if drainingSeconds <= 0 {
+ return nil
}
+ glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
+ } else {
+ lastTimestampNs = lastProcessedTimestampNs
+ drainingSeconds = req.DrainingSeconds
+ glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
}
+
}
}
-func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeStreamFollowServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
+func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
if err != nil {
return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
}
+ // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
+
if isLastOne {
- return lastTimestampNs, nil
+ // need to heart beat to the client to ensure the connection health
+ sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{})
+ return lastTimestampNs, sendErr
}
err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
- sendErr := stream.Send(&volume_server_pb.VolumeStreamFollowResponse{
+ sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
NeedleHeader: needleHeader,
NeedleBody: needleBody,
})