aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_sync.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-23 10:50:28 -0700
committerchrislu <chris.lu@gmail.com>2022-07-23 10:50:28 -0700
commit64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4 (patch)
tree30903fa25af0bcf797f93e354f68fb41216dc602 /weed/command/filer_sync.go
parent2c8818351f418e3584a6c5410c396f747aebd725 (diff)
downloadseaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.tar.xz
seaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.zip
metadata subscription uses client epoch
Diffstat (limited to 'weed/command/filer_sync.go')
-rw-r--r--weed/command/filer_sync.go11
1 files changed, 7 insertions, 4 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 1550d155a..130138a3d 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -43,6 +43,7 @@ type SyncOptions struct {
bProxyByFiler *bool
metricsHttpPort *int
clientId int32
+ clientEpoch int32
}
var (
@@ -131,7 +132,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
os.Exit(2)
}
for {
- err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
+ syncOptions.clientEpoch++
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
*syncOptions.bDebug, aFilerSignature, bFilerSignature)
if err != nil {
@@ -151,7 +153,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}
go func() {
for {
- err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
+ syncOptions.clientEpoch++
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
*syncOptions.aDebug, bFilerSignature, aFilerSignature)
if err != nil {
@@ -183,7 +186,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
return nil
}
-func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
+func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
@@ -226,7 +229,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId,
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch,
sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
}