aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-18 00:18:29 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-18 00:18:29 -0700
commitb09e8dbb377562e6d61bf40b5dbadc3a3edd1360 (patch)
tree79d086d0bd92a693de8f1afa958d254d53d097ea /weed/server
parent13ad5c196656c586e908defce9b6d8e717663625 (diff)
downloadseaweedfs-b09e8dbb377562e6d61bf40b5dbadc3a3edd1360.tar.xz
seaweedfs-b09e8dbb377562e6d61bf40b5dbadc3a3edd1360.zip
add VolumeStreamFollow, but not used yet
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/volume_grpc_copy_incremental.go4
-rw-r--r--weed/server/volume_grpc_stream_follow.go77
2 files changed, 79 insertions, 2 deletions
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
index 41b7a798c..06e7017e8 100644
--- a/weed/server/volume_grpc_copy_incremental.go
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -18,9 +18,9 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem
}
stopOffset := v.Size()
- foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.Since)
+ foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.SinceNs)
if err != nil {
- return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.Since, err)
+ return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.SinceNs, err)
}
if isLastOne {
diff --git a/weed/server/volume_grpc_stream_follow.go b/weed/server/volume_grpc_stream_follow.go
new file mode 100644
index 000000000..7c01e4b9c
--- /dev/null
+++ b/weed/server/volume_grpc_stream_follow.go
@@ -0,0 +1,77 @@
+package weed_server
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+func (vs *VolumeServer) VolumeStreamFollow(req *volume_server_pb.VolumeStreamFollowRequest, stream volume_server_pb.VolumeServer_VolumeStreamFollowServer) error {
+
+ v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ lastTimestampNs := req.SinceNs
+ drainingSeconds := req.DrainingSeconds
+
+ ticker := time.NewTicker(time.Second)
+
+ for {
+ select {
+ case <-ticker.C:
+ lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
+ if err != nil {
+ return fmt.Errorf("streamFollow: %v", err)
+ }
+ if req.DrainingSeconds == 0 {
+ continue
+ }
+ if lastProcessedTimestampNs == lastTimestampNs {
+ drainingSeconds--
+ if drainingSeconds <= 0 {
+ return nil
+ }
+ glog.V(0).Infof("volume %d drains requests with %d seconds remaining ...", v.Id, drainingSeconds)
+ } else {
+ drainingSeconds = req.DrainingSeconds
+ glog.V(0).Infof("volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
+ }
+ }
+ }
+
+}
+
+func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeStreamFollowServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
+
+ foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
+ if err != nil {
+ return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
+ }
+
+ if isLastOne {
+ return lastTimestampNs, nil
+ }
+
+ err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
+
+ sendErr := stream.Send(&volume_server_pb.VolumeStreamFollowResponse{
+ NeedleHeader: needleHeader,
+ NeedleBody: needleBody,
+ })
+ if sendErr != nil {
+ return sendErr
+ }
+
+ lastProcessedTimestampNs = needleAppendAtNs
+ return nil
+
+ })
+
+ return
+
+}