aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-23 10:50:28 -0700
committerchrislu <chris.lu@gmail.com>2022-07-23 10:50:28 -0700
commit64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4 (patch)
tree30903fa25af0bcf797f93e354f68fb41216dc602 /weed/command
parent2c8818351f418e3584a6c5410c396f747aebd725 (diff)
downloadseaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.tar.xz
seaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.zip
metadata subscription uses client epoch
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer_backup.go8
-rw-r--r--weed/command/filer_meta_backup.go8
-rw-r--r--weed/command/filer_meta_tail.go2
-rw-r--r--weed/command/filer_remote_gateway.go1
-rw-r--r--weed/command/filer_remote_gateway_buckets.go3
-rw-r--r--weed/command/filer_remote_sync.go1
-rw-r--r--weed/command/filer_remote_sync_dir.go3
-rw-r--r--weed/command/filer_sync.go11
8 files changed, 24 insertions, 13 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index d191c693b..90bc8c5c3 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -55,9 +55,11 @@ func runFilerBackup(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
clientId := util.RandomInt32()
+ var clientEpoch int32
for {
- err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId)
+ clientEpoch++
+ err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId, clientEpoch)
if err != nil {
glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
time.Sleep(1747 * time.Millisecond)
@@ -71,7 +73,7 @@ const (
BackupKeyPrefix = "backup."
)
-func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32) error {
+func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error {
// find data sink
config := util.GetViper()
@@ -114,6 +116,6 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, clientEpoch, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index cf679885d..54cfc31b7 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -27,8 +27,9 @@ type FilerMetaBackupOptions struct {
restart *bool
backupFilerConfig *string
- store filer.FilerStore
- clientId int32
+ store filer.FilerStore
+ clientId int32
+ clientEpoch int32
}
func init() {
@@ -194,7 +195,8 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return metaBackup.setOffset(lastTime)
})
- return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId,
+ metaBackup.clientEpoch++
+ return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, metaBackup.clientEpoch,
*metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
}
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 66a87c3d9..e319e93e6 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -110,7 +110,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
untilTsNs = time.Now().Add(-*tailStop).UnixNano()
}
- tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, *tailTarget, nil,
+ tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, 0, *tailTarget, nil,
time.Now().Add(-*tailStart).UnixNano(), untilTsNs, 0, func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
return nil
diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go
index 33454f378..9389ff79b 100644
--- a/weed/command/filer_remote_gateway.go
+++ b/weed/command/filer_remote_gateway.go
@@ -29,6 +29,7 @@ type RemoteGatewayOptions struct {
remoteConfs map[string]*remote_pb.RemoteConf
bucketsDir string
clientId int32
+ clientEpoch int32
}
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go
index 9fe0e29df..121f46114 100644
--- a/weed/command/filer_remote_gateway_buckets.go
+++ b/weed/command/filer_remote_gateway_buckets.go
@@ -39,7 +39,8 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
- return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
+ option.clientEpoch++
+ return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch,
option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
}
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index d6ccf7b79..49334f13d 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -19,6 +19,7 @@ type RemoteSyncOptions struct {
timeAgo *time.Duration
dir *string
clientId int32
+ clientEpoch int32
}
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index 5fc20be9a..b54bfcf71 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -40,7 +40,8 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
- return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
+ option.clientEpoch++
+ return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch,
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
}
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 1550d155a..130138a3d 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -43,6 +43,7 @@ type SyncOptions struct {
bProxyByFiler *bool
metricsHttpPort *int
clientId int32
+ clientEpoch int32
}
var (
@@ -131,7 +132,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
os.Exit(2)
}
for {
- err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
+ syncOptions.clientEpoch++
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
*syncOptions.bDebug, aFilerSignature, bFilerSignature)
if err != nil {
@@ -151,7 +153,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}
go func() {
for {
- err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
+ syncOptions.clientEpoch++
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
*syncOptions.aDebug, bFilerSignature, aFilerSignature)
if err != nil {
@@ -183,7 +186,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
return nil
}
-func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
+func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
@@ -226,7 +229,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId,
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch,
sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
}