diff options
Diffstat (limited to 'weed/command/filer_sync.go')
| -rw-r--r-- | weed/command/filer_sync.go | 129 |
1 files changed, 99 insertions, 30 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 5440811dd..b7da1baf9 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -12,9 +12,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" + statsCollect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/grace" "google.golang.org/grpc" + "os" "strings" "time" ) @@ -35,8 +37,12 @@ type SyncOptions struct { bDiskType *string aDebug *bool bDebug *bool + aFromTsMs *int64 + bFromTsMs *int64 aProxyByFiler *bool bProxyByFiler *bool + metricsHttpPort *int + clientId int32 } var ( @@ -64,8 +70,12 @@ func init() { syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers") syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files") syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files") + syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond") + syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond") syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file") syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file") + syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port") + syncOptions.clientId = util.RandomInt32() } var cmdFilerSynchronize = &Command{ @@ -93,10 +103,37 @@ func runFilerSynchronize(cmd *Command, args []string) bool { grace.SetupProfiling(*syncCpuProfile, *syncMemProfile) + filerA := pb.ServerAddress(*syncOptions.filerA) + filerB := pb.ServerAddress(*syncOptions.filerB) + + // start filer.sync metrics server + go statsCollect.StartMetricsServer(*syncOptions.metricsHttpPort) + + // read a filer signature + aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA) + if aFilerErr != nil { + glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr) + return true + } + // read b filer signature + bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB) + if bFilerErr != nil { + glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr) + return true + } + go func() { + // a->b + // set synchronization start timestamp to offset + initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs) + if initOffsetError != nil { + glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError) + os.Exit(2) + } for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, *syncOptions.filerB, - *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug) + err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, + *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, + *syncOptions.bDebug, aFilerSignature, bFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -105,10 +142,18 @@ func runFilerSynchronize(cmd *Command, args []string) bool { }() if !*syncOptions.isActivePassive { + // b->a + // set synchronization start timestamp to offset + initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs) + if initOffsetError != nil { + glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError) + os.Exit(2) + } go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, *syncOptions.filerA, - *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug) + err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, + *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, + *syncOptions.aDebug, bFilerSignature, aFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -122,23 +167,28 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } -func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string, - replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error { - - // read source filer signature - sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler) - if sourceErr != nil { - return sourceErr +// initOffsetFromTsMs Initialize offset +func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error { + if fromTsMs <= 0 { + return nil } - // read target filer signature - targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler) - if targetErr != nil { - return targetErr + // convert to nanosecond + fromTsNs := fromTsMs * 1000_000 + // If not successful, exit the program. + setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs) + if setOffsetErr != nil { + return setOffsetErr } + glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB) + return nil +} + +func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error { // if first time, start from now // if has previously synced, resume from that point of time - sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature) + sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature) if err != nil { return err } @@ -147,9 +197,9 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so // create filer sink filerSource := &source.FilerSource{} - filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler) + filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler) filerSink := &filersink.FilerSink{} - filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) + filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) @@ -165,13 +215,19 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so return persistEventFn(resp) } + var lastLogTsNs = time.Now().Nanosecond() + var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { - glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) - return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs) + now := time.Now().Nanosecond() + glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) + lastLogTsNs = now + // collect synchronous offset + statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(lastTsNs)) + return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler, - sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) + return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, + sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError) } @@ -179,9 +235,19 @@ const ( SyncKeyPrefix = "sync." ) -func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { +// When each business is distinguished according to path, and offsets need to be maintained separately. +func getSignaturePrefixByPath(path string) string { + // compatible historical version + if path == "/" { + return SyncKeyPrefix + } else { + return SyncKeyPrefix + path + } +} + +func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { - readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -206,8 +272,8 @@ func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix str } -func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error { - return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { +func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { + return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature)) @@ -255,16 +321,19 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl } // handle deletions - if message.OldEntry != nil && message.NewEntry == nil { + if filer_pb.IsDelete(resp) { if !strings.HasPrefix(string(sourceOldKey), sourcePath) { return nil } key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath) - return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + if !dataSink.IsIncremental() { + return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) + } + return nil } // handle new entries - if message.OldEntry == nil && message.NewEntry != nil { + if filer_pb.IsCreate(resp) { if !strings.HasPrefix(string(sourceNewKey), sourcePath) { return nil } @@ -273,7 +342,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl } // this is something special? - if message.OldEntry == nil && message.NewEntry == nil { + if filer_pb.IsEmpty(resp) { return nil } |
