diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 70 |
1 files changed, 44 insertions, 26 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 34c55a599..1e0c91274 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -67,34 +67,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe return lastTimestampNs, sendErr } - err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { - - isLastChunk := false - - // need to send body by chunks - for i := 0; i < len(needleBody); i += BufferSizeLimit { - stopOffset := i + BufferSizeLimit - if stopOffset >= len(needleBody) { - isLastChunk = true - stopOffset = len(needleBody) - } - - sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{ - NeedleHeader: needleHeader, - NeedleBody: needleBody[i:stopOffset], - IsLastChunk: isLastChunk, - }) - if sendErr != nil { - return sendErr - } - } - - lastProcessedTimestampNs = needleAppendAtNs - return nil + scanner := &VolumeFileScanner4Tailing{ + stream:stream, + } - }) + err = storage.ScanVolumeFileFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), scanner) - return + return scanner.lastProcessedTimestampNs, err } @@ -115,3 +94,42 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv }) } + +// generate the volume idx +type VolumeFileScanner4Tailing struct { + stream volume_server_pb.VolumeServer_VolumeTailSenderServer + lastProcessedTimestampNs uint64 +} + +func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error { + return nil + +} +func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool { + return true +} + +func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { + isLastChunk := false + + // need to send body by chunks + for i := 0; i < len(needleBody); i += BufferSizeLimit { + stopOffset := i + BufferSizeLimit + if stopOffset >= len(needleBody) { + isLastChunk = true + stopOffset = len(needleBody) + } + + sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{ + NeedleHeader: needleHeader, + NeedleBody: needleBody[i:stopOffset], + IsLastChunk: isLastChunk, + }) + if sendErr != nil { + return sendErr + } + } + + scanner.lastProcessedTimestampNs = n.AppendAtNs + return nil +} |
