aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_sync.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2021-12-30 00:23:57 -0800
committerchrislu <chris.lu@gmail.com>2021-12-30 00:23:57 -0800
commit5c87fcc6d28b230154db35cbe7735a5f1b84024f (patch)
treeaccf7f2221083bb3aec27c5ba60b24570349b85d /weed/command/filer_sync.go
parentfb434318e36ac8e78ab304bfd5421f110c10bdf1 (diff)
downloadseaweedfs-5c87fcc6d28b230154db35cbe7735a5f1b84024f.tar.xz
seaweedfs-5c87fcc6d28b230154db35cbe7735a5f1b84024f.zip
add client id for all metadata listening clients
Diffstat (limited to 'weed/command/filer_sync.go')
-rw-r--r--weed/command/filer_sync.go10
1 files changed, 6 insertions, 4 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 230b24a52..326bd1fbe 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -37,6 +37,7 @@ type SyncOptions struct {
bDebug *bool
aProxyByFiler *bool
bProxyByFiler *bool
+ clientId int32
}
var (
@@ -66,6 +67,7 @@ func init() {
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
+ syncOptions.clientId = util.RandomInt32()
}
var cmdFilerSynchronize = &Command{
@@ -97,7 +99,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
filerB := pb.ServerAddress(*syncOptions.filerB)
go func() {
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
@@ -109,7 +111,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive {
go func() {
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
@@ -124,7 +126,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
-func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
+func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
// read source filer signature
@@ -172,7 +174,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb.
return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler),
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId,
sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
}