aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhihao.qu <zhihao.qu@ly.com>2022-06-09 10:53:19 +0800
committerzhihao.qu <zhihao.qu@ly.com>2022-06-09 10:53:19 +0800
commitcd5cca36a4d9bd389fe83ae8906ca6f44e24f3b6 (patch)
treede509cb71c7882b1182a2328d4b490ba858a39af
parent4a5135961f71fa3b9a3b9b9267cf20e92372a8ec (diff)
downloadseaweedfs-cd5cca36a4d9bd389fe83ae8906ca6f44e24f3b6.tar.xz
seaweedfs-cd5cca36a4d9bd389fe83ae8906ca6f44e24f3b6.zip
feat(filer.sync): add fromTsMs. Extract signature from doSubscribeFilerMetaChanges
-rw-r--r--weed/command/filer_sync.go66
1 files changed, 53 insertions, 13 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index deb458525..7aa9c1e8d 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -15,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
+ "os"
"strings"
"time"
)
@@ -35,6 +36,8 @@ type SyncOptions struct {
bDiskType *string
aDebug *bool
bDebug *bool
+ aFromTsMs *int64
+ bFromTsMs *int64
aProxyByFiler *bool
bProxyByFiler *bool
clientId int32
@@ -65,6 +68,8 @@ func init() {
syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
+ syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
+ syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
syncOptions.clientId = util.RandomInt32()
@@ -97,10 +102,32 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
filerA := pb.ServerAddress(*syncOptions.filerA)
filerB := pb.ServerAddress(*syncOptions.filerB)
+
+ // read a filer signature
+ aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
+ if aFilerErr != nil {
+ glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
+ return true
+ }
+ // read b filer signature
+ bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
+ if bFilerErr != nil {
+ glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
+ return true
+ }
+
go func() {
+ // a->b
+ // set synchronization start timestamp to offset
+ initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs)
+ if initOffsetError != nil {
+ glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
+ os.Exit(2)
+ }
for {
err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
- *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
+ *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType,
+ *syncOptions.bDebug, aFilerSignature, bFilerSignature)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@@ -109,10 +136,18 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}()
if !*syncOptions.isActivePassive {
+ // b->a
+ // set synchronization start timestamp to offset
+ initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs)
+ if initOffsetError != nil {
+ glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
+ os.Exit(2)
+ }
go func() {
for {
err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
- *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
+ *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType,
+ *syncOptions.aDebug, bFilerSignature, aFilerSignature)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@@ -126,19 +161,24 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
-func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
- replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
-
- // read source filer signature
- sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
- if sourceErr != nil {
- return sourceErr
+// initOffsetFromTsMs Initialize offset
+func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64) error {
+ if fromTsMs <= 0 {
+ return nil
}
- // read target filer signature
- targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
- if targetErr != nil {
- return targetErr
+ // convert to nanosecond
+ fromTsNs := fromTsMs * 1000_000
+ // If not successful, exit the program.
+ setOffsetErr := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, fromTsNs)
+ if setOffsetErr != nil {
+ return setOffsetErr
}
+ glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
+ return nil
+}
+
+func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
+ replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
// if has previously synced, resume from that point of time