diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-04-18 00:18:29 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-04-18 00:18:29 -0700 |
| commit | b09e8dbb377562e6d61bf40b5dbadc3a3edd1360 (patch) | |
| tree | 79d086d0bd92a693de8f1afa958d254d53d097ea /weed/server | |
| parent | 13ad5c196656c586e908defce9b6d8e717663625 (diff) | |
| download | seaweedfs-b09e8dbb377562e6d61bf40b5dbadc3a3edd1360.tar.xz seaweedfs-b09e8dbb377562e6d61bf40b5dbadc3a3edd1360.zip | |
add VolumeStreamFollow, but not used yet
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_copy_incremental.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_grpc_stream_follow.go | 77 |
2 files changed, 79 insertions, 2 deletions
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go index 41b7a798c..06e7017e8 100644 --- a/weed/server/volume_grpc_copy_incremental.go +++ b/weed/server/volume_grpc_copy_incremental.go @@ -18,9 +18,9 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem } stopOffset := v.Size() - foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.Since) + foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.SinceNs) if err != nil { - return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.Since, err) + return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.SinceNs, err) } if isLastOne { diff --git a/weed/server/volume_grpc_stream_follow.go b/weed/server/volume_grpc_stream_follow.go new file mode 100644 index 000000000..7c01e4b9c --- /dev/null +++ b/weed/server/volume_grpc_stream_follow.go @@ -0,0 +1,77 @@ +package weed_server + +import ( + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +func (vs *VolumeServer) VolumeStreamFollow(req *volume_server_pb.VolumeStreamFollowRequest, stream volume_server_pb.VolumeServer_VolumeStreamFollowServer) error { + + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("not found volume id %d", req.VolumeId) + } + + lastTimestampNs := req.SinceNs + drainingSeconds := req.DrainingSeconds + + ticker := time.NewTicker(time.Second) + + for { + select { + case <-ticker.C: + lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs) + if err != nil { + return fmt.Errorf("streamFollow: %v", err) + } + if req.DrainingSeconds == 0 { + continue + } + if lastProcessedTimestampNs == lastTimestampNs { + drainingSeconds-- + if drainingSeconds <= 0 { + return nil + } + glog.V(0).Infof("volume %d drains requests with %d seconds remaining ...", v.Id, drainingSeconds) + } else { + drainingSeconds = req.DrainingSeconds + glog.V(0).Infof("volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds) + } + } + } + +} + +func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeStreamFollowServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) { + + foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs) + if err != nil { + return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err) + } + + if isLastOne { + return lastTimestampNs, nil + } + + err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { + + sendErr := stream.Send(&volume_server_pb.VolumeStreamFollowResponse{ + NeedleHeader: needleHeader, + NeedleBody: needleBody, + }) + if sendErr != nil { + return sendErr + } + + lastProcessedTimestampNs = needleAppendAtNs + return nil + + }) + + return + +} |
