diff options
Diffstat (limited to 'weed/command/filer_sync.go')
| -rw-r--r-- | weed/command/filer_sync.go | 63 |
1 files changed, 52 insertions, 11 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 1550d155a..abf13a81d 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -26,7 +26,9 @@ type SyncOptions struct { filerA *string filerB *string aPath *string + aExcludePaths *string bPath *string + bExcludePaths *string aReplication *string bReplication *string aCollection *string @@ -43,6 +45,7 @@ type SyncOptions struct { bProxyByFiler *bool metricsHttpPort *int clientId int32 + clientEpoch int32 } var ( @@ -57,7 +60,9 @@ func init() { syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster") syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster") syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A") + syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A") syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B") + syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B") syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A") syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B") syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A") @@ -131,9 +136,25 @@ func runFilerSynchronize(cmd *Command, args []string) bool { os.Exit(2) } for { - err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, - *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, - *syncOptions.bDebug, aFilerSignature, bFilerSignature) + syncOptions.clientEpoch++ + err := doSubscribeFilerMetaChanges( + syncOptions.clientId, + syncOptions.clientEpoch, + grpcDialOption, + filerA, + *syncOptions.aPath, + strings.Split(*syncOptions.aExcludePaths, ","), + *syncOptions.aProxyByFiler, + filerB, + *syncOptions.bPath, + *syncOptions.bReplication, + *syncOptions.bCollection, + *syncOptions.bTtlSec, + *syncOptions.bProxyByFiler, + *syncOptions.bDiskType, + *syncOptions.bDebug, + aFilerSignature, + bFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -151,9 +172,25 @@ func runFilerSynchronize(cmd *Command, args []string) bool { } go func() { for { - err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, - *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, - *syncOptions.aDebug, bFilerSignature, aFilerSignature) + syncOptions.clientEpoch++ + err := doSubscribeFilerMetaChanges( + syncOptions.clientId, + syncOptions.clientEpoch, + grpcDialOption, + filerB, + *syncOptions.bPath, + strings.Split(*syncOptions.bExcludePaths, ","), + *syncOptions.bProxyByFiler, + filerA, + *syncOptions.aPath, + *syncOptions.aReplication, + *syncOptions.aCollection, + *syncOptions.aTtlSec, + *syncOptions.aProxyByFiler, + *syncOptions.aDiskType, + *syncOptions.aDebug, + bFilerSignature, + aFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -183,7 +220,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd return nil } -func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, +func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error { // if first time, start from now @@ -202,7 +239,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) - persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) + persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, filerSink, debug) processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification @@ -226,7 +263,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, + return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch, sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError) } @@ -299,7 +336,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature } -func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { +func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { // process function processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification @@ -319,7 +356,11 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl if !strings.HasPrefix(resp.Directory, sourcePath) { return nil } - + for _, excludePath := range excludePaths { + if strings.HasPrefix(resp.Directory, excludePath) { + return nil + } + } // handle deletions if filer_pb.IsDelete(resp) { if !strings.HasPrefix(string(sourceOldKey), sourcePath) { |
