aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer_backup.go15
-rw-r--r--weed/command/filer_meta_backup.go17
-rw-r--r--weed/command/filer_meta_tail.go30
-rw-r--r--weed/command/filer_remote_gateway_buckets.go17
-rw-r--r--weed/command/filer_remote_sync_dir.go17
-rw-r--r--weed/command/filer_sync.go16
6 files changed, 94 insertions, 18 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 6f379a6d6..b51dd65b6 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -138,6 +138,19 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
}()
}
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, clientEpoch, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
+ metadataFollowOption := &pb.MetadataFollowOption{
+ ClientName: "backup_" + dataSink.GetName(),
+ ClientId: clientId,
+ ClientEpoch: clientEpoch,
+ SelfSignature: 0,
+ PathPrefix: sourcePath,
+ AdditionalPathPrefixes: nil,
+ DirectoriesToWatch: nil,
+ StartTsNs: startFrom.UnixNano(),
+ StopTsNs: 0,
+ EventErrorType: pb.TrivialOnError,
+ }
+
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index f2cba9382..ff4a61e41 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -196,8 +196,21 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
})
metaBackup.clientEpoch++
- return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, metaBackup.clientEpoch,
- *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
+
+ metadataFollowOption := &pb.MetadataFollowOption{
+ ClientName: "meta_backup",
+ ClientId: metaBackup.clientId,
+ ClientEpoch: metaBackup.clientEpoch,
+ SelfSignature: 0,
+ PathPrefix: *metaBackup.filerDirectory,
+ AdditionalPathPrefixes: nil,
+ DirectoriesToWatch: nil,
+ StartTsNs: startTime.UnixNano(),
+ StopTsNs: 0,
+ EventErrorType: pb.TrivialOnError,
+ }
+
+ return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 58a496ff4..32855072b 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -107,16 +107,28 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
untilTsNs = time.Now().Add(-*tailStop).UnixNano()
}
- tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, 0, *tailTarget, nil,
- time.Now().Add(-*tailStart).UnixNano(), untilTsNs, 0, func(resp *filer_pb.SubscribeMetadataResponse) error {
- if !shouldPrint(resp) {
- return nil
- }
- if err := eachEntryFunc(resp); err != nil {
- return err
- }
+ metadataFollowOption := &pb.MetadataFollowOption{
+ ClientName: "tail",
+ ClientId: clientId,
+ ClientEpoch: 0,
+ SelfSignature: 0,
+ PathPrefix: *tailTarget,
+ AdditionalPathPrefixes: nil,
+ DirectoriesToWatch: nil,
+ StartTsNs: time.Now().Add(-*tailStart).UnixNano(),
+ StopTsNs: untilTsNs,
+ EventErrorType: pb.TrivialOnError,
+ }
+
+ tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, metadataFollowOption, func(resp *filer_pb.SubscribeMetadataResponse) error {
+ if !shouldPrint(resp) {
return nil
- }, pb.TrivialOnError)
+ }
+ if err := eachEntryFunc(resp); err != nil {
+ return err
+ }
+ return nil
+ })
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go
index c5fce44f8..e884deaad 100644
--- a/weed/command/filer_remote_gateway_buckets.go
+++ b/weed/command/filer_remote_gateway_buckets.go
@@ -39,8 +39,21 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
option.clientEpoch++
- return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch,
- option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
+
+ metadataFollowOption := &pb.MetadataFollowOption{
+ ClientName: "filer.remote.sync",
+ ClientId: option.clientId,
+ ClientEpoch: option.clientEpoch,
+ SelfSignature: 0,
+ PathPrefix: option.bucketsDir,
+ AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
+ DirectoriesToWatch: nil,
+ StartTsNs: lastOffsetTs.UnixNano(),
+ StopTsNs: 0,
+ EventErrorType: pb.TrivialOnError,
+ }
+
+ return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}
func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index 5bc6ae300..1dc91b5d5 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -63,8 +63,21 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
option.clientEpoch++
- return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch,
- mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError)
+
+ metadataFollowOption := &pb.MetadataFollowOption{
+ ClientName: "filer.remote.sync",
+ ClientId: option.clientId,
+ ClientEpoch: option.clientEpoch,
+ SelfSignature: 0,
+ PathPrefix: mountedDir,
+ AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote},
+ DirectoriesToWatch: nil,
+ StartTsNs: lastOffsetTs.UnixNano(),
+ StopTsNs: 0,
+ EventErrorType: pb.TrivialOnError,
+ }
+
+ return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}
func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index efef6250e..fcf60ae87 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -287,8 +287,20 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch,
- sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
+ metadataFollowOption := &pb.MetadataFollowOption{
+ ClientName: clientName,
+ ClientId: clientId,
+ ClientEpoch: clientEpoch,
+ SelfSignature: targetFilerSignature,
+ PathPrefix: sourcePath,
+ AdditionalPathPrefixes: nil,
+ DirectoriesToWatch: nil,
+ StartTsNs: sourceFilerOffsetTsNs,
+ StopTsNs: 0,
+ EventErrorType: pb.RetryForeverOnError,
+ }
+
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
}