aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@uber.com>2019-03-25 09:16:12 -0700
committerChris Lu <chris.lu@uber.com>2019-03-25 09:16:12 -0700
commit70815e91249f481b71ca1fbca14ff41430e42681 (patch)
tree3a476da560702cc9249e15a09eff0cc777ebca63 /weed/server
parenteaa42c3865f65153d12fc8e9b63bdf45b13ea9c3 (diff)
downloadseaweedfs-70815e91249f481b71ca1fbca14ff41430e42681.tar.xz
seaweedfs-70815e91249f481b71ca1fbca14ff41430e42681.zip
WIP
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/volume_grpc_follow.go53
1 files changed, 53 insertions, 0 deletions
diff --git a/weed/server/volume_grpc_follow.go b/weed/server/volume_grpc_follow.go
new file mode 100644
index 000000000..bdd0ef6f5
--- /dev/null
+++ b/weed/server/volume_grpc_follow.go
@@ -0,0 +1,53 @@
+package weed_server
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+func (vs *VolumeServer) VolumeFollow(req *volume_server_pb.VolumeFollowRequest, stream volume_server_pb.VolumeServer_VolumeFollowServer) error {
+
+ v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ stopOffset := v.Size()
+ foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.Since)
+ if err != nil {
+ return fmt.Errorf("fail to locate by appendAtNs: %s", err)
+ }
+
+ if isLastOne {
+ return nil
+ }
+
+ startOffset := int64(foundOffset) * int64(types.NeedleEntrySize)
+
+ buf := make([]byte, 1024*1024*2)
+ return sendFileContent(v.DataFile(), buf, startOffset, stopOffset, stream)
+
+}
+
+func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeFollowServer) error {
+ var blockSizeLimit = int64(len(buf))
+ for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
+ n, readErr := datFile.ReadAt(buf, startOffset+i)
+ if readErr == nil || readErr == io.EOF {
+ resp := &volume_server_pb.VolumeFollowResponse{}
+ resp.FileContent = buf[i : i+int64(n)]
+ sendErr := stream.Send(resp)
+ if sendErr != nil {
+ return sendErr
+ }
+ } else {
+ return readErr
+ }
+ }
+ return nil
+}