aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_tail.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/volume_grpc_tail.go')
-rw-r--r--weed/server/volume_grpc_tail.go97
1 files changed, 97 insertions, 0 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
new file mode 100644
index 000000000..da248498f
--- /dev/null
+++ b/weed/server/volume_grpc_tail.go
@@ -0,0 +1,97 @@
+package weed_server
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+)
+
+func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error {
+
+ v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ defer glog.V(1).Infof("tailing volume %d finished", v.Id)
+
+ lastTimestampNs := req.SinceNs
+ drainingSeconds := req.DrainingSeconds
+
+ for {
+ lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
+ if err != nil {
+ glog.Infof("sendNeedlesSince: %v", err)
+ return fmt.Errorf("streamFollow: %v", err)
+ }
+ time.Sleep(2 * time.Second)
+
+ if req.DrainingSeconds == 0 {
+ lastTimestampNs = lastProcessedTimestampNs
+ continue
+ }
+ if lastProcessedTimestampNs == lastTimestampNs {
+ drainingSeconds--
+ if drainingSeconds <= 0 {
+ return nil
+ }
+ glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
+ } else {
+ lastTimestampNs = lastProcessedTimestampNs
+ drainingSeconds = req.DrainingSeconds
+ glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
+ }
+
+ }
+
+}
+
+func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
+
+ foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
+ if err != nil {
+ return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
+ }
+
+ // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
+
+ if isLastOne {
+ // need to heart beat to the client to ensure the connection health
+ sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true})
+ return lastTimestampNs, sendErr
+ }
+
+ err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
+
+ blockSizeLimit := 1024 * 1024 * 2
+ isLastChunk := false
+
+ // need to send body by chunks
+ for i := 0; i < len(needleBody); i += blockSizeLimit {
+ stopOffset := i + blockSizeLimit
+ if stopOffset >= len(needleBody) {
+ isLastChunk = true
+ stopOffset = len(needleBody)
+ }
+
+ sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{
+ NeedleHeader: needleHeader,
+ NeedleBody: needleBody[i:stopOffset],
+ IsLastChunk: isLastChunk,
+ })
+ if sendErr != nil {
+ return sendErr
+ }
+ }
+
+ lastProcessedTimestampNs = needleAppendAtNs
+ return nil
+
+ })
+
+ return
+
+}