diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-23 10:50:28 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-23 10:50:28 -0700 |
| commit | 64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4 (patch) | |
| tree | 30903fa25af0bcf797f93e354f68fb41216dc602 /weed/command | |
| parent | 2c8818351f418e3584a6c5410c396f747aebd725 (diff) | |
| download | seaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.tar.xz seaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.zip | |
metadata subscription uses client epoch
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer_backup.go | 8 | ||||
| -rw-r--r-- | weed/command/filer_meta_backup.go | 8 | ||||
| -rw-r--r-- | weed/command/filer_meta_tail.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_remote_gateway.go | 1 | ||||
| -rw-r--r-- | weed/command/filer_remote_gateway_buckets.go | 3 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync.go | 1 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync_dir.go | 3 | ||||
| -rw-r--r-- | weed/command/filer_sync.go | 11 |
8 files changed, 24 insertions, 13 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index d191c693b..90bc8c5c3 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -55,9 +55,11 @@ func runFilerBackup(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") clientId := util.RandomInt32() + var clientEpoch int32 for { - err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId) + clientEpoch++ + err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId, clientEpoch) if err != nil { glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err) time.Sleep(1747 * time.Millisecond) @@ -71,7 +73,7 @@ const ( BackupKeyPrefix = "backup." ) -func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32) error { +func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error { // find data sink config := util.GetViper() @@ -114,6 +116,6 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, clientEpoch, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index cf679885d..54cfc31b7 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -27,8 +27,9 @@ type FilerMetaBackupOptions struct { restart *bool backupFilerConfig *string - store filer.FilerStore - clientId int32 + store filer.FilerStore + clientId int32 + clientEpoch int32 } func init() { @@ -194,7 +195,8 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { return metaBackup.setOffset(lastTime) }) - return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, + metaBackup.clientEpoch++ + return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, metaBackup.clientEpoch, *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 66a87c3d9..e319e93e6 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -110,7 +110,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { untilTsNs = time.Now().Add(-*tailStop).UnixNano() } - tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, *tailTarget, nil, + tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, 0, *tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), untilTsNs, 0, func(resp *filer_pb.SubscribeMetadataResponse) error { if !shouldPrint(resp) { return nil diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 33454f378..9389ff79b 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -29,6 +29,7 @@ type RemoteGatewayOptions struct { remoteConfs map[string]*remote_pb.RemoteConf bucketsDir string clientId int32 + clientEpoch int32 } var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 9fe0e29df..121f46114 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -39,7 +39,8 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, + option.clientEpoch++ + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch, option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index d6ccf7b79..49334f13d 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -19,6 +19,7 @@ type RemoteSyncOptions struct { timeAgo *time.Duration dir *string clientId int32 + clientEpoch int32 } var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 5fc20be9a..b54bfcf71 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -40,7 +40,8 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, + option.clientEpoch++ + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch, mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 1550d155a..130138a3d 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -43,6 +43,7 @@ type SyncOptions struct { bProxyByFiler *bool metricsHttpPort *int clientId int32 + clientEpoch int32 } var ( @@ -131,7 +132,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { os.Exit(2) } for { - err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, + syncOptions.clientEpoch++ + err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug, aFilerSignature, bFilerSignature) if err != nil { @@ -151,7 +153,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { } go func() { for { - err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, + syncOptions.clientEpoch++ + err := doSubscribeFilerMetaChanges(syncOptions.clientId, syncOptions.clientEpoch, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug, bFilerSignature, aFilerSignature) if err != nil { @@ -183,7 +186,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd return nil } -func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, +func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error { // if first time, start from now @@ -226,7 +229,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, + return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch, sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError) } |
