diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-04-18 11:05:02 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-04-18 11:05:02 -0700 |
| commit | 3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6 (patch) | |
| tree | e91672383dc2bc10a75079c4d9ca6ca98eef4994 /weed/server | |
| parent | b142f9f1d57cebd1445baed7ce7fc88bb0f450f0 (diff) | |
| download | seaweedfs-3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6.tar.xz seaweedfs-3dce1016cb36a511e3ae0fa67c5eb5631f1ce4d6.zip | |
add volume tailer
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_stream_follow.go | 54 |
1 files changed, 30 insertions, 24 deletions
diff --git a/weed/server/volume_grpc_stream_follow.go b/weed/server/volume_grpc_stream_follow.go index 7c01e4b9c..52a9c1473 100644 --- a/weed/server/volume_grpc_stream_follow.go +++ b/weed/server/volume_grpc_stream_follow.go @@ -9,57 +9,63 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" ) -func (vs *VolumeServer) VolumeStreamFollow(req *volume_server_pb.VolumeStreamFollowRequest, stream volume_server_pb.VolumeServer_VolumeStreamFollowServer) error { +func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error { v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) if v == nil { return fmt.Errorf("not found volume id %d", req.VolumeId) } + defer glog.V(1).Infof("tailing volume %d finished", v.Id) + lastTimestampNs := req.SinceNs drainingSeconds := req.DrainingSeconds - ticker := time.NewTicker(time.Second) - for { - select { - case <-ticker.C: - lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs) - if err != nil { - return fmt.Errorf("streamFollow: %v", err) - } - if req.DrainingSeconds == 0 { - continue - } - if lastProcessedTimestampNs == lastTimestampNs { - drainingSeconds-- - if drainingSeconds <= 0 { - return nil - } - glog.V(0).Infof("volume %d drains requests with %d seconds remaining ...", v.Id, drainingSeconds) - } else { - drainingSeconds = req.DrainingSeconds - glog.V(0).Infof("volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds) + lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs) + if err != nil { + glog.Infof("sendNeedlesSince: %v", err) + return fmt.Errorf("streamFollow: %v", err) + } + time.Sleep(2 * time.Second) + + if req.DrainingSeconds == 0 { + continue + } + if lastProcessedTimestampNs == lastTimestampNs { + drainingSeconds-- + if drainingSeconds <= 0 { + return nil } + glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds) + } else { + lastTimestampNs = lastProcessedTimestampNs + drainingSeconds = req.DrainingSeconds + glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds) } + } } -func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeStreamFollowServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) { +func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) { foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs) if err != nil { return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err) } + // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne) + if isLastOne { - return lastTimestampNs, nil + // need to heart beat to the client to ensure the connection health + sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{}) + return lastTimestampNs, sendErr } err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { - sendErr := stream.Send(&volume_server_pb.VolumeStreamFollowResponse{ + sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{ NeedleHeader: needleHeader, NeedleBody: needleBody, }) |
