aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_tail.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-10-22 00:50:30 -0700
committerChris Lu <chris.lu@gmail.com>2019-10-22 00:50:30 -0700
commitfc412e428bd6816f1d53ed9017d1ac7ee02db817 (patch)
treea0ee11f646b6072ab83fb9bf8f96f56b8b1a343a /weed/server/volume_grpc_tail.go
parentc9a183eb69205e7f821615ced2bc1f11f47d6e40 (diff)
downloadseaweedfs-fc412e428bd6816f1d53ed9017d1ac7ee02db817.tar.xz
seaweedfs-fc412e428bd6816f1d53ed9017d1ac7ee02db817.zip
refactor ScanVolumeFileFrom()
Diffstat (limited to 'weed/server/volume_grpc_tail.go')
-rw-r--r--weed/server/volume_grpc_tail.go70
1 files changed, 44 insertions, 26 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 34c55a599..1e0c91274 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -67,34 +67,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
return lastTimestampNs, sendErr
}
- err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
-
- isLastChunk := false
-
- // need to send body by chunks
- for i := 0; i < len(needleBody); i += BufferSizeLimit {
- stopOffset := i + BufferSizeLimit
- if stopOffset >= len(needleBody) {
- isLastChunk = true
- stopOffset = len(needleBody)
- }
-
- sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{
- NeedleHeader: needleHeader,
- NeedleBody: needleBody[i:stopOffset],
- IsLastChunk: isLastChunk,
- })
- if sendErr != nil {
- return sendErr
- }
- }
-
- lastProcessedTimestampNs = needleAppendAtNs
- return nil
+ scanner := &VolumeFileScanner4Tailing{
+ stream:stream,
+ }
- })
+ err = storage.ScanVolumeFileFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), scanner)
- return
+ return scanner.lastProcessedTimestampNs, err
}
@@ -115,3 +94,42 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
})
}
+
+// generate the volume idx
+type VolumeFileScanner4Tailing struct {
+ stream volume_server_pb.VolumeServer_VolumeTailSenderServer
+ lastProcessedTimestampNs uint64
+}
+
+func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error {
+ return nil
+
+}
+func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool {
+ return true
+}
+
+func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+ isLastChunk := false
+
+ // need to send body by chunks
+ for i := 0; i < len(needleBody); i += BufferSizeLimit {
+ stopOffset := i + BufferSizeLimit
+ if stopOffset >= len(needleBody) {
+ isLastChunk = true
+ stopOffset = len(needleBody)
+ }
+
+ sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{
+ NeedleHeader: needleHeader,
+ NeedleBody: needleBody[i:stopOffset],
+ IsLastChunk: isLastChunk,
+ })
+ if sendErr != nil {
+ return sendErr
+ }
+ }
+
+ scanner.lastProcessedTimestampNs = n.AppendAtNs
+ return nil
+}