diff options
Diffstat (limited to 'weed/command/filer_sync.go')
| -rw-r--r-- | weed/command/filer_sync.go | 77 |
1 files changed, 22 insertions, 55 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 0f34e5701..5440811dd 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -15,7 +15,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/grace" "google.golang.org/grpc" - "io" "strings" "time" ) @@ -71,8 +70,8 @@ func init() { var cmdFilerSynchronize = &Command{ UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>", - Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters", - Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers + Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters", + Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content, and write to the other destination. Different from filer.replicate: @@ -89,6 +88,7 @@ var cmdFilerSynchronize = &Command{ func runFilerSynchronize(cmd *Command, args []string) bool { + util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") grace.SetupProfiling(*syncCpuProfile, *syncMemProfile) @@ -165,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so return persistEventFn(resp) } - return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "syncTo_" + targetFiler, - PathPrefix: sourcePath, - SinceNs: sourceFilerOffsetTsNs, - Signature: targetFilerSignature, - }) - if err != nil { - return fmt.Errorf("listen: %v", err) - } - - var counter int64 - var lastWriteTime time.Time - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } - - if err := processEventFn(resp); err != nil { - return err - } - - counter++ - if lastWriteTime.Add(3 * time.Second).Before(time.Now()) { - glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3)) - counter = 0 - lastWriteTime = time.Now() - if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil { - return err - } - } - - } - + processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { + glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) + return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs) }) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler, + sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) + } const ( @@ -359,16 +323,19 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl return processEventFn } -func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) string { +func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) { if !dataSink.IsIncremental() { - return util.Join(targetPath, string(sourceKey)[len(sourcePath):]) - } - var mTime int64 - if message.NewEntry != nil { - mTime = message.NewEntry.Attributes.Mtime - } else if message.OldEntry != nil { - mTime = message.OldEntry.Attributes.Mtime + key = util.Join(targetPath, string(sourceKey)[len(sourcePath):]) + } else { + var mTime int64 + if message.NewEntry != nil { + mTime = message.NewEntry.Attributes.Mtime + } else if message.OldEntry != nil { + mTime = message.OldEntry.Attributes.Mtime + } + dateKey := time.Unix(mTime, 0).Format("2006-01-02") + key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):]) } - dateKey := time.Unix(mTime, 0).Format("2006-01-02") - return util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):]) + + return escapeKey(key) } |
