aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_tail.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-06-16 22:46:13 -0700
committerchrislu <chris.lu@gmail.com>2025-06-16 22:46:13 -0700
commitc602f53a6e7eca007319a8aab95f96e6fe0f73ae (patch)
tree261ac63ba73fe07c607a36bca2f26120c06db8d6 /weed/server/volume_grpc_tail.go
parentd2be5822a106cc871c10581e96462a438556586e (diff)
downloadseaweedfs-c602f53a6e7eca007319a8aab95f96e6fe0f73ae.tar.xz
seaweedfs-c602f53a6e7eca007319a8aab95f96e6fe0f73ae.zip
tail-volume-uses-the-source-volume-version
Diffstat (limited to 'weed/server/volume_grpc_tail.go')
-rw-r--r--weed/server/volume_grpc_tail.go10
1 files changed, 7 insertions, 3 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index b44d7d248..935635a83 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -3,9 +3,10 @@ package weed_server
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@@ -65,12 +66,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
if isLastOne {
// need to heart beat to the client to ensure the connection health
- sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
+ sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true, Version: uint32(v.Version())})
return lastTimestampNs, sendErr
}
scanner := &VolumeFileScanner4Tailing{
- stream: stream,
+ stream: stream,
+ version: uint32(v.Version()),
}
err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner)
@@ -101,6 +103,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
type VolumeFileScanner4Tailing struct {
stream volume_server_pb.VolumeServer_VolumeTailSenderServer
lastProcessedTimestampNs uint64
+ version uint32
}
func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
@@ -126,6 +129,7 @@ func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset i
NeedleHeader: needleHeader,
NeedleBody: needleBody[i:stopOffset],
IsLastChunk: isLastChunk,
+ Version: scanner.version,
})
if sendErr != nil {
return sendErr