diff options
| author | chrislu <chris.lu@gmail.com> | 2023-03-21 23:01:49 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-03-21 23:01:49 -0700 |
| commit | 5db9fcccd4194660a8503696ac44c3539c29d41e (patch) | |
| tree | 9c2afed1dcfb5b958208fbc095919152eb32fc7d /weed/command | |
| parent | de4545c28b8283fb80ae03dc95910a0ab3a67142 (diff) | |
| download | seaweedfs-5db9fcccd4194660a8503696ac44c3539c29d41e.tar.xz seaweedfs-5db9fcccd4194660a8503696ac44c3539c29d41e.zip | |
refactoring
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer_backup.go | 15 | ||||
| -rw-r--r-- | weed/command/filer_meta_backup.go | 17 | ||||
| -rw-r--r-- | weed/command/filer_meta_tail.go | 30 | ||||
| -rw-r--r-- | weed/command/filer_remote_gateway_buckets.go | 17 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync_dir.go | 17 | ||||
| -rw-r--r-- | weed/command/filer_sync.go | 16 |
6 files changed, 94 insertions, 18 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 6f379a6d6..b51dd65b6 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -138,6 +138,19 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti }() } - return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, clientEpoch, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "backup_" + dataSink.GetName(), + ClientId: clientId, + ClientEpoch: clientEpoch, + SelfSignature: 0, + PathPrefix: sourcePath, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: startFrom.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.TrivialOnError, + } + + return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset) } diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index f2cba9382..ff4a61e41 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -196,8 +196,21 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { }) metaBackup.clientEpoch++ - return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, metaBackup.clientEpoch, - *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "meta_backup", + ClientId: metaBackup.clientId, + ClientEpoch: metaBackup.clientEpoch, + SelfSignature: 0, + PathPrefix: *metaBackup.filerDirectory, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: startTime.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.TrivialOnError, + } + + return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, metadataFollowOption, processEventFnWithOffset) } diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 58a496ff4..32855072b 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -107,16 +107,28 @@ func runFilerMetaTail(cmd *Command, args []string) bool { untilTsNs = time.Now().Add(-*tailStop).UnixNano() } - tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, 0, *tailTarget, nil, - time.Now().Add(-*tailStart).UnixNano(), untilTsNs, 0, func(resp *filer_pb.SubscribeMetadataResponse) error { - if !shouldPrint(resp) { - return nil - } - if err := eachEntryFunc(resp); err != nil { - return err - } + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "tail", + ClientId: clientId, + ClientEpoch: 0, + SelfSignature: 0, + PathPrefix: *tailTarget, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: time.Now().Add(-*tailStart).UnixNano(), + StopTsNs: untilTsNs, + EventErrorType: pb.TrivialOnError, + } + + tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, metadataFollowOption, func(resp *filer_pb.SubscribeMetadataResponse) error { + if !shouldPrint(resp) { return nil - }, pb.TrivialOnError) + } + if err := eachEntryFunc(resp); err != nil { + return err + } + return nil + }) if tailErr != nil { fmt.Printf("tail %s: %v\n", *tailFiler, tailErr) diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index c5fce44f8..e884deaad 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -39,8 +39,21 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) option.clientEpoch++ - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch, - option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "filer.remote.sync", + ClientId: option.clientId, + ClientEpoch: option.clientEpoch, + SelfSignature: 0, + PathPrefix: option.bucketsDir, + AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote}, + DirectoriesToWatch: nil, + StartTsNs: lastOffsetTs.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.TrivialOnError, + } + + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset) } func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 5bc6ae300..1dc91b5d5 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -63,8 +63,21 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) option.clientEpoch++ - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch, - mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "filer.remote.sync", + ClientId: option.clientId, + ClientEpoch: option.clientEpoch, + SelfSignature: 0, + PathPrefix: mountedDir, + AdditionalPathPrefixes: []string{filer.DirectoryEtcRemote}, + DirectoriesToWatch: nil, + StartTsNs: lastOffsetTs.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.TrivialOnError, + } + + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, metadataFollowOption, processEventFnWithOffset) } func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index efef6250e..fcf60ae87 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -287,8 +287,20 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch, - sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError) + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: clientName, + ClientId: clientId, + ClientEpoch: clientEpoch, + SelfSignature: targetFilerSignature, + PathPrefix: sourcePath, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: sourceFilerOffsetTsNs, + StopTsNs: 0, + EventErrorType: pb.RetryForeverOnError, + } + + return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset) } |
