diff options
Diffstat (limited to 'weed/server/volume_grpc_tail.go')
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go new file mode 100644 index 000000000..da248498f --- /dev/null +++ b/weed/server/volume_grpc_tail.go @@ -0,0 +1,97 @@ +package weed_server + +import ( + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error { + + v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("not found volume id %d", req.VolumeId) + } + + defer glog.V(1).Infof("tailing volume %d finished", v.Id) + + lastTimestampNs := req.SinceNs + drainingSeconds := req.DrainingSeconds + + for { + lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs) + if err != nil { + glog.Infof("sendNeedlesSince: %v", err) + return fmt.Errorf("streamFollow: %v", err) + } + time.Sleep(2 * time.Second) + + if req.DrainingSeconds == 0 { + lastTimestampNs = lastProcessedTimestampNs + continue + } + if lastProcessedTimestampNs == lastTimestampNs { + drainingSeconds-- + if drainingSeconds <= 0 { + return nil + } + glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds) + } else { + lastTimestampNs = lastProcessedTimestampNs + drainingSeconds = req.DrainingSeconds + glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds) + } + + } + +} + +func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) { + + foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs) + if err != nil { + return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err) + } + + // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne) + + if isLastOne { + // need to heart beat to the client to ensure the connection health + sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{IsLastChunk: true}) + return lastTimestampNs, sendErr + } + + err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { + + blockSizeLimit := 1024 * 1024 * 2 + isLastChunk := false + + // need to send body by chunks + for i := 0; i < len(needleBody); i += blockSizeLimit { + stopOffset := i + blockSizeLimit + if stopOffset >= len(needleBody) { + isLastChunk = true + stopOffset = len(needleBody) + } + + sendErr := stream.Send(&volume_server_pb.VolumeTailResponse{ + NeedleHeader: needleHeader, + NeedleBody: needleBody[i:stopOffset], + IsLastChunk: isLastChunk, + }) + if sendErr != nil { + return sendErr + } + } + + lastProcessedTimestampNs = needleAppendAtNs + return nil + + }) + + return + +} |
