diff options
Diffstat (limited to 'weed/command/filer_sync.go')
| -rw-r--r-- | weed/command/filer_sync.go | 54 |
1 files changed, 30 insertions, 24 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index af0a624b1..587174059 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -20,19 +20,21 @@ import ( ) type SyncOptions struct { - isActivePassive *bool - filerA *string - filerB *string - aPath *string - bPath *string - aReplication *string - bReplication *string - aCollection *string - bCollection *string - aTtlSec *int - bTtlSec *int - aDebug *bool - bDebug *bool + isActivePassive *bool + filerA *string + filerB *string + aPath *string + bPath *string + aReplication *string + bReplication *string + aCollection *string + bCollection *string + aTtlSec *int + bTtlSec *int + aDebug *bool + bDebug *bool + aProxyByFiler *bool + bProxyByFiler *bool } var ( @@ -43,7 +45,7 @@ var ( func init() { cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle - syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow if true") + syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true") syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster") syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster") syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A") @@ -54,6 +56,8 @@ func init() { syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B") syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A") syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B") + syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", true, "read and write file chunks by filer A instead of volume servers") + syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", true, "read and write file chunks by filer B instead of volume servers") syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files") syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files") syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") @@ -62,8 +66,8 @@ func init() { var cmdFilerSynchronize = &Command{ UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>", - Short: "continuously synchronize between two active-active or active-passive SeaweedFS clusters", - Long: `continuously synchronize file changes between two active-active or active-passive filers + Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters", + Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content, and write to the other destination. Different from filer.replicate: @@ -86,8 +90,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool { go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.filerB, - *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bDebug) + err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, + *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, + *syncOptions.bDebug) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -98,8 +103,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool { if !*syncOptions.isActivePassive { go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.filerA, - *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aDebug) + err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, + *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, + *syncOptions.aDebug) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -113,8 +119,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } -func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath, targetFiler, targetPath string, - replicationStr, collection string, ttlSec int, debug bool) error { +func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string, + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler, debug bool) error { // read source filer signature sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler) @@ -138,9 +144,9 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so // create filer sink filerSource := &source.FilerSource{} - filerSource.DoInitialize(pb.ServerToGrpcAddress(sourceFiler), sourcePath) + filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler) filerSink := &filersink.FilerSink{} - filerSink.DoInitialize(pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption) + filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { |
