aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go3
-rw-r--r--weed/command/filer_backup.go3
-rw-r--r--weed/command/filer_meta_backup.go3
-rw-r--r--weed/command/filer_meta_tail.go18
-rw-r--r--weed/command/filer_remote_sync.go3
-rw-r--r--weed/command/filer_sync.go3
-rw-r--r--weed/pb/filer_pb_tail.go23
7 files changed, 24 insertions, 32 deletions
diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
index c521ce33e..b89e8de51 100644
--- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
+++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
@@ -72,8 +72,7 @@ func startGenerateMetadata() {
func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) {
- tailErr := pb.FollowMetadata(*tailFiler, grpc.WithInsecure(), "tail",
- *dir, 0, 0, eachEntryFunc, false)
+ tailErr := pb.FollowMetadata(*tailFiler, grpc.WithInsecure(), "tail", *dir, nil, 0, 0, eachEntryFunc, false)
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 0c450181b..f0a4bd79b 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -112,7 +112,6 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(),
- sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), sourcePath, nil, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 6fe323fba..ebc39beb1 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -195,8 +195,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return metaBackup.setOffset(lastTime)
})
- return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
- *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false)
+ return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup", *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false)
}
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 28c0db99b..3b30e2b6a 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -103,17 +103,15 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
- tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
- *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0,
- func(resp *filer_pb.SubscribeMetadataResponse) error {
- if !shouldPrint(resp) {
- return nil
- }
- if err := eachEntryFunc(resp); err != nil {
- return err
- }
+ tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail", *tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0, func(resp *filer_pb.SubscribeMetadataResponse) error {
+ if !shouldPrint(resp) {
return nil
- }, false)
+ }
+ if err := eachEntryFunc(resp); err != nil {
+ return err
+ }
+ return nil
+ }, false)
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index 8d2719660..fd6ed23b9 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -210,8 +210,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
})
- return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
- "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
+ return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync", mountedDir, nil, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 5440811dd..4ebb87f71 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -170,8 +170,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler,
- sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler, sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
}
diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go
index 31fb62fb3..63900081c 100644
--- a/weed/pb/filer_pb_tail.go
+++ b/weed/pb/filer_pb_tail.go
@@ -12,12 +12,12 @@ import (
type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
-func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption,
- clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
+func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption, clientName string,
+ pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32,
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
- err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(
- clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
+ err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(clientName,
+ pathPrefix, additionalPathPrefixes, lastTsNs, selfSignature, processEventFn, fatalOnError))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
}
@@ -28,8 +28,7 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
processEventFn ProcessMetadataFunc, fatalOnError bool) error {
- err := filerClient.WithFilerClient(makeFunc(
- clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
+ err := filerClient.WithFilerClient(makeFunc(clientName, pathPrefix, nil, lastTsNs, selfSignature, processEventFn, fatalOnError))
if err != nil {
return fmt.Errorf("subscribing filer meta change: %v", err)
}
@@ -37,16 +36,16 @@ func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
return nil
}
-func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
- processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
+func makeFunc(clientName string, pathPrefix string, additionalPathPrefixes []string, lastTsNs int64, selfSignature int32, processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
return func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: clientName,
- PathPrefix: pathPrefix,
- SinceNs: lastTsNs,
- Signature: selfSignature,
+ ClientName: clientName,
+ PathPrefix: pathPrefix,
+ PathPrefixes: additionalPathPrefixes,
+ SinceNs: lastTsNs,
+ Signature: selfSignature,
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)