diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-10-22 00:50:30 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-10-22 00:50:30 -0700 |
| commit | fc412e428bd6816f1d53ed9017d1ac7ee02db817 (patch) | |
| tree | a0ee11f646b6072ab83fb9bf8f96f56b8b1a343a /weed/server/volume_grpc_tail.go | |
| parent | c9a183eb69205e7f821615ced2bc1f11f47d6e40 (diff) | |
| download | seaweedfs-fc412e428bd6816f1d53ed9017d1ac7ee02db817.tar.xz seaweedfs-fc412e428bd6816f1d53ed9017d1ac7ee02db817.zip | |
refactor ScanVolumeFileFrom()
Diffstat (limited to 'weed/server/volume_grpc_tail.go')
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 70 |
1 files changed, 44 insertions, 26 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 34c55a599..1e0c91274 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -67,34 +67,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe return lastTimestampNs, sendErr } - err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { - - isLastChunk := false - - // need to send body by chunks - for i := 0; i < len(needleBody); i += BufferSizeLimit { - stopOffset := i + BufferSizeLimit - if stopOffset >= len(needleBody) { - isLastChunk = true - stopOffset = len(needleBody) - } - - sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{ - NeedleHeader: needleHeader, - NeedleBody: needleBody[i:stopOffset], - IsLastChunk: isLastChunk, - }) - if sendErr != nil { - return sendErr - } - } - - lastProcessedTimestampNs = needleAppendAtNs - return nil + scanner := &VolumeFileScanner4Tailing{ + stream:stream, + } - }) + err = storage.ScanVolumeFileFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), scanner) - return + return scanner.lastProcessedTimestampNs, err } @@ -115,3 +94,42 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv }) } + +// generate the volume idx +type VolumeFileScanner4Tailing struct { + stream volume_server_pb.VolumeServer_VolumeTailSenderServer + lastProcessedTimestampNs uint64 +} + +func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error { + return nil + +} +func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool { + return true +} + +func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { + isLastChunk := false + + // need to send body by chunks + for i := 0; i < len(needleBody); i += BufferSizeLimit { + stopOffset := i + BufferSizeLimit + if stopOffset >= len(needleBody) { + isLastChunk = true + stopOffset = len(needleBody) + } + + sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{ + NeedleHeader: needleHeader, + NeedleBody: needleBody[i:stopOffset], + IsLastChunk: isLastChunk, + }) + if sendErr != nil { + return sendErr + } + } + + scanner.lastProcessedTimestampNs = n.AppendAtNs + return nil +} |
