aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_sync.go')
-rw-r--r--weed/command/filer_sync.go129
1 files changed, 99 insertions, 30 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 5440811dd..b7da1baf9 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -12,9 +12,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
+ statsCollect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
+ "os"
"strings"
"time"
)
@@ -35,8 +37,12 @@ type SyncOptions struct {
bDiskType *string
aDebug *bool
bDebug *bool
+ aFromTsMs *int64
+ bFromTsMs *int64
aProxyByFiler *bool
bProxyByFiler *bool
+ metricsHttpPort *int
+ clientId int32
}
var (
@@ -64,8 +70,12 @@ func init() {
syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
+ syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
+ syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
+ syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
+ syncOptions.clientId = util.RandomInt32()
}
var cmdFilerSynchronize = &Command{
@@ -93,10 +103,37 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
+ filerA := pb.ServerAddress(*syncOptions.filerA)
+ filerB := pb.ServerAddress(*syncOptions.filerB)
+
+ // start filer.sync metrics server
+ go statsCollect.StartMetricsServer(*syncOptions.metricsHttpPort)
+
+ // read a filer signature
+ aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
+ if aFilerErr != nil {
+ glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
+ return true
+ }
+ // read b filer signature
+ bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
+ if bFilerErr != nil {
+ glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
+ return true
+ }
+
go func() {
+ // a->b
+ // set synchronization start timestamp to offset
+ initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs)
+ if initOffsetError != nil {
+ glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
+ os.Exit(2)
+ }
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, *syncOptions.filerB,
- *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
+ *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
+ *syncOptions.bDebug, aFilerSignature, bFilerSignature)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@@ -105,10 +142,18 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}()
if !*syncOptions.isActivePassive {
+ // b->a
+ // set synchronization start timestamp to offset
+ initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs)
+ if initOffsetError != nil {
+ glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
+ os.Exit(2)
+ }
go func() {
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, *syncOptions.filerA,
- *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
+ *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
+ *syncOptions.aDebug, bFilerSignature, aFilerSignature)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@@ -122,23 +167,28 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
-func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string,
- replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
-
- // read source filer signature
- sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
- if sourceErr != nil {
- return sourceErr
+// initOffsetFromTsMs Initialize offset
+func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error {
+ if fromTsMs <= 0 {
+ return nil
}
- // read target filer signature
- targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
- if targetErr != nil {
- return targetErr
+ // convert to nanosecond
+ fromTsNs := fromTsMs * 1000_000
+ // If not successful, exit the program.
+ setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs)
+ if setOffsetErr != nil {
+ return setOffsetErr
}
+ glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
+ return nil
+}
+
+func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
+ replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
// if has previously synced, resume from that point of time
- sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
+ sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
if err != nil {
return err
}
@@ -147,9 +197,9 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
// create filer sink
filerSource := &source.FilerSource{}
- filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler)
+ filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
filerSink := &filersink.FilerSink{}
- filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
+ filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
@@ -165,13 +215,19 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return persistEventFn(resp)
}
+ var lastLogTsNs = time.Now().Nanosecond()
+ var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
- glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
- return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
+ now := time.Now().Nanosecond()
+ glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
+ lastLogTsNs = now
+ // collect synchronous offset
+ statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(lastTsNs))
+ return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler,
- sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId,
+ sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
}
@@ -179,9 +235,19 @@ const (
SyncKeyPrefix = "sync."
)
-func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
+// When each business is distinguished according to path, and offsets need to be maintained separately.
+func getSignaturePrefixByPath(path string) string {
+ // compatible historical version
+ if path == "/" {
+ return SyncKeyPrefix
+ } else {
+ return SyncKeyPrefix + path
+ }
+}
+
+func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
- readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
@@ -206,8 +272,8 @@ func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix str
}
-func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error {
- return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
+ return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
@@ -255,16 +321,19 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
}
// handle deletions
- if message.OldEntry != nil && message.NewEntry == nil {
+ if filer_pb.IsDelete(resp) {
if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
return nil
}
key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
- return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ if !dataSink.IsIncremental() {
+ return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
+ }
+ return nil
}
// handle new entries
- if message.OldEntry == nil && message.NewEntry != nil {
+ if filer_pb.IsCreate(resp) {
if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
return nil
}
@@ -273,7 +342,7 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl
}
// this is something special?
- if message.OldEntry == nil && message.NewEntry == nil {
+ if filer_pb.IsEmpty(resp) {
return nil
}