diff options
| author | chrislu <chris.lu@gmail.com> | 2025-06-16 22:46:13 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-06-16 22:46:13 -0700 |
| commit | c602f53a6e7eca007319a8aab95f96e6fe0f73ae (patch) | |
| tree | 261ac63ba73fe07c607a36bca2f26120c06db8d6 /weed/server/volume_grpc_tail.go | |
| parent | d2be5822a106cc871c10581e96462a438556586e (diff) | |
| download | seaweedfs-c602f53a6e7eca007319a8aab95f96e6fe0f73ae.tar.xz seaweedfs-c602f53a6e7eca007319a8aab95f96e6fe0f73ae.zip | |
tail-volume-uses-the-source-volume-version
Diffstat (limited to 'weed/server/volume_grpc_tail.go')
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 10 |
1 files changed, 7 insertions, 3 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index b44d7d248..935635a83 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -3,9 +3,10 @@ package weed_server import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" "time" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" @@ -65,12 +66,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe if isLastOne { // need to heart beat to the client to ensure the connection health - sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true}) + sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true, Version: uint32(v.Version())}) return lastTimestampNs, sendErr } scanner := &VolumeFileScanner4Tailing{ - stream: stream, + stream: stream, + version: uint32(v.Version()), } err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner) @@ -101,6 +103,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv type VolumeFileScanner4Tailing struct { stream volume_server_pb.VolumeServer_VolumeTailSenderServer lastProcessedTimestampNs uint64 + version uint32 } func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error { @@ -126,6 +129,7 @@ func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset i NeedleHeader: needleHeader, NeedleBody: needleBody[i:stopOffset], IsLastChunk: isLastChunk, + Version: scanner.version, }) if sendErr != nil { return sendErr |
