aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-06-16 22:46:13 -0700
committerchrislu <chris.lu@gmail.com>2025-06-16 22:46:13 -0700
commitc602f53a6e7eca007319a8aab95f96e6fe0f73ae (patch)
tree261ac63ba73fe07c607a36bca2f26120c06db8d6
parentd2be5822a106cc871c10581e96462a438556586e (diff)
downloadseaweedfs-c602f53a6e7eca007319a8aab95f96e6fe0f73ae.tar.xz
seaweedfs-c602f53a6e7eca007319a8aab95f96e6fe0f73ae.zip
tail-volume-uses-the-source-volume-version
-rw-r--r--weed/operation/tail_volume.go3
-rw-r--r--weed/pb/volume_server.proto1
-rw-r--r--weed/pb/volume_server_pb/volume_server.pb.go13
-rw-r--r--weed/server/volume_grpc_tail.go10
4 files changed, 21 insertions, 6 deletions
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index 67ee3e825..3ab0c73cc 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -54,6 +54,7 @@ func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.Dia
needleHeader := resp.NeedleHeader
needleBody := resp.NeedleBody
+ version := needle.Version(resp.Version)
if len(needleHeader) == 0 {
continue
@@ -73,7 +74,7 @@ func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.Dia
n := new(needle.Needle)
n.ParseNeedleHeader(needleHeader)
- err = n.ReadNeedleBodyBytes(needleBody, needle.GetCurrentVersion())
+ err = n.ReadNeedleBodyBytes(needleBody, version)
if err != nil {
return err
}
diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto
index 616ed01be..79b1ba1d0 100644
--- a/weed/pb/volume_server.proto
+++ b/weed/pb/volume_server.proto
@@ -341,6 +341,7 @@ message VolumeTailSenderResponse {
bytes needle_header = 1;
bytes needle_body = 2;
bool is_last_chunk = 3;
+ uint32 version = 4;
}
message VolumeTailReceiverRequest {
diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go
index 2eb514824..b4c5ec809 100644
--- a/weed/pb/volume_server_pb/volume_server.pb.go
+++ b/weed/pb/volume_server_pb/volume_server.pb.go
@@ -2535,6 +2535,7 @@ type VolumeTailSenderResponse struct {
NeedleHeader []byte `protobuf:"bytes,1,opt,name=needle_header,json=needleHeader,proto3" json:"needle_header,omitempty"`
NeedleBody []byte `protobuf:"bytes,2,opt,name=needle_body,json=needleBody,proto3" json:"needle_body,omitempty"`
IsLastChunk bool `protobuf:"varint,3,opt,name=is_last_chunk,json=isLastChunk,proto3" json:"is_last_chunk,omitempty"`
+ Version uint32 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -2590,6 +2591,13 @@ func (x *VolumeTailSenderResponse) GetIsLastChunk() bool {
return false
}
+func (x *VolumeTailSenderResponse) GetVersion() uint32 {
+ if x != nil {
+ return x.Version
+ }
+ return 0
+}
+
type VolumeTailReceiverRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"`
@@ -5887,12 +5895,13 @@ const file_volume_server_proto_rawDesc = "" +
"\x17VolumeTailSenderRequest\x12\x1b\n" +
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" +
"\bsince_ns\x18\x02 \x01(\x04R\asinceNs\x120\n" +
- "\x14idle_timeout_seconds\x18\x03 \x01(\rR\x12idleTimeoutSeconds\"\x84\x01\n" +
+ "\x14idle_timeout_seconds\x18\x03 \x01(\rR\x12idleTimeoutSeconds\"\x9e\x01\n" +
"\x18VolumeTailSenderResponse\x12#\n" +
"\rneedle_header\x18\x01 \x01(\fR\fneedleHeader\x12\x1f\n" +
"\vneedle_body\x18\x02 \x01(\fR\n" +
"needleBody\x12\"\n" +
- "\ris_last_chunk\x18\x03 \x01(\bR\visLastChunk\"\xb7\x01\n" +
+ "\ris_last_chunk\x18\x03 \x01(\bR\visLastChunk\x12\x18\n" +
+ "\aversion\x18\x04 \x01(\rR\aversion\"\xb7\x01\n" +
"\x19VolumeTailReceiverRequest\x12\x1b\n" +
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x19\n" +
"\bsince_ns\x18\x02 \x01(\x04R\asinceNs\x120\n" +
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index b44d7d248..935635a83 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -3,9 +3,10 @@ package weed_server
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@@ -65,12 +66,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
if isLastOne {
// need to heart beat to the client to ensure the connection health
- sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
+ sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true, Version: uint32(v.Version())})
return lastTimestampNs, sendErr
}
scanner := &VolumeFileScanner4Tailing{
- stream: stream,
+ stream: stream,
+ version: uint32(v.Version()),
}
err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner)
@@ -101,6 +103,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
type VolumeFileScanner4Tailing struct {
stream volume_server_pb.VolumeServer_VolumeTailSenderServer
lastProcessedTimestampNs uint64
+ version uint32
}
func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
@@ -126,6 +129,7 @@ func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset i
NeedleHeader: needleHeader,
NeedleBody: needleBody[i:stopOffset],
IsLastChunk: isLastChunk,
+ Version: scanner.version,
})
if sendErr != nil {
return sendErr