diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer_backup.go | 8 | ||||
| -rw-r--r-- | weed/command/filer_meta_backup.go | 6 | ||||
| -rw-r--r-- | weed/command/filer_meta_tail.go | 5 | ||||
| -rw-r--r-- | weed/command/filer_remote_gateway.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_remote_gateway_buckets.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync_dir.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_sync.go | 10 |
8 files changed, 24 insertions, 13 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 9e5041531..0b902f96f 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -54,8 +54,10 @@ func runFilerBackup(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + clientId := util.RandomInt32() + for { - err := doFilerBackup(grpcDialOption, &filerBackupOptions) + err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId) if err != nil { glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err) time.Sleep(1747 * time.Millisecond) @@ -69,7 +71,7 @@ const ( BackupKeyPrefix = "backup." ) -func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error { +func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32) error { // find data sink config := util.GetViper() @@ -112,7 +114,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), + return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, sourcePath, nil, startFrom.UnixNano(), 0, processEventFnWithOffset, false) } diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index d52ed3349..56c7f7a8c 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -27,7 +27,8 @@ type FilerMetaBackupOptions struct { restart *bool backupFilerConfig *string - store filer.FilerStore + store filer.FilerStore + clientId int32 } func init() { @@ -36,6 +37,7 @@ func init() { metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer") metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup") metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store") + metaBackup.clientId = util.RandomInt32() } var cmdFilerMetaBackup = &Command{ @@ -195,7 +197,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { return metaBackup.setOffset(lastTime) }) - return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", + return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false) } diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 26b155440..1158ef1e0 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -6,7 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/golang/protobuf/jsonpb" jsoniter "github.com/json-iterator/go" - "github.com/olivere/elastic/v7" + elastic "github.com/olivere/elastic/v7" "os" "path/filepath" "strings" @@ -48,6 +48,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + clientId := util.RandomInt32() var filterFunc func(dir, fname string) bool if *tailPattern != "" { @@ -105,7 +106,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { } } - tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", + tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, *tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0, func(resp *filer_pb.SubscribeMetadataResponse) error { if !shouldPrint(resp) { diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index fa0239558..33454f378 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -28,6 +28,7 @@ type RemoteGatewayOptions struct { mappings *remote_pb.RemoteStorageMapping remoteConfs map[string]*remote_pb.RemoteConf bucketsDir string + clientId int32 } var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) @@ -54,6 +55,7 @@ func init() { remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") remoteGatewayOptions.include = cmdFilerRemoteGateway.Flag.String("include", "", "pattens of new bucket names, e.g., s3*") remoteGatewayOptions.exclude = cmdFilerRemoteGateway.Flag.String("exclude", "", "pattens of new bucket names, e.g., local*") + remoteGatewayOptions.clientId = util.RandomInt32() } var cmdFilerRemoteGateway = &Command{ diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index fc11cdbc5..4f65b5842 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -38,7 +38,7 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) } diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 681ea35e9..d6ccf7b79 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -18,6 +18,7 @@ type RemoteSyncOptions struct { readChunkFromFiler *bool timeAgo *time.Duration dir *string + clientId int32 } var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) @@ -41,6 +42,7 @@ func init() { remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer") remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now, skipping previous metadata changes. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + remoteSyncOptions.clientId = util.RandomInt32() } var cmdFilerRemoteSynchronize = &Command{ diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 947f526bb..ccedc9d80 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -40,7 +40,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) } diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 230b24a52..326bd1fbe 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -37,6 +37,7 @@ type SyncOptions struct { bDebug *bool aProxyByFiler *bool bProxyByFiler *bool + clientId int32 } var ( @@ -66,6 +67,7 @@ func init() { syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files") syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") + syncOptions.clientId = util.RandomInt32() } var cmdFilerSynchronize = &Command{ @@ -97,7 +99,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { filerB := pb.ServerAddress(*syncOptions.filerB) go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, + err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) @@ -109,7 +111,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { if !*syncOptions.isActivePassive { go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, + err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) @@ -124,7 +126,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } -func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, +func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error { // read source filer signature @@ -172,7 +174,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb. return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), + return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId, sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) } |
