diff options
| author | Chris Lu <chris.lu@uber.com> | 2019-03-25 09:16:12 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@uber.com> | 2019-03-25 09:16:12 -0700 |
| commit | 70815e91249f481b71ca1fbca14ff41430e42681 (patch) | |
| tree | 3a476da560702cc9249e15a09eff0cc777ebca63 /weed/server | |
| parent | eaa42c3865f65153d12fc8e9b63bdf45b13ea9c3 (diff) | |
| download | seaweedfs-70815e91249f481b71ca1fbca14ff41430e42681.tar.xz seaweedfs-70815e91249f481b71ca1fbca14ff41430e42681.zip | |
WIP
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_follow.go | 53 |
1 files changed, 53 insertions, 0 deletions
diff --git a/weed/server/volume_grpc_follow.go b/weed/server/volume_grpc_follow.go new file mode 100644 index 000000000..bdd0ef6f5 --- /dev/null +++ b/weed/server/volume_grpc_follow.go @@ -0,0 +1,53 @@ +package weed_server + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +func (vs *VolumeServer) VolumeFollow(req *volume_server_pb.VolumeFollowRequest, stream volume_server_pb.VolumeServer_VolumeFollowServer) error { + + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("not found volume id %d", req.VolumeId) + } + + stopOffset := v.Size() + foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.Since) + if err != nil { + return fmt.Errorf("fail to locate by appendAtNs: %s", err) + } + + if isLastOne { + return nil + } + + startOffset := int64(foundOffset) * int64(types.NeedleEntrySize) + + buf := make([]byte, 1024*1024*2) + return sendFileContent(v.DataFile(), buf, startOffset, stopOffset, stream) + +} + +func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeFollowServer) error { + var blockSizeLimit = int64(len(buf)) + for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit { + n, readErr := datFile.ReadAt(buf, startOffset+i) + if readErr == nil || readErr == io.EOF { + resp := &volume_server_pb.VolumeFollowResponse{} + resp.FileContent = buf[i : i+int64(n)] + sendErr := stream.Send(resp) + if sendErr != nil { + return sendErr + } + } else { + return readErr + } + } + return nil +} |
