aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_sync.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command/filer_sync.go')
-rw-r--r--weed/command/filer_sync.go52
1 files changed, 49 insertions, 3 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 9b489297c..94c9bc1be 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -60,10 +60,22 @@ const (
DefaultConcurrencyLimit = 32
)
+// syncState tracks the current sync state for graceful shutdown checkpoint saving
+type syncState struct {
+ processor *MetadataProcessor
+ grpcDialOption grpc.DialOption
+ targetFiler pb.ServerAddress
+ sourcePath string
+ sourceFilerSignature int32
+}
+
var (
syncOptions SyncOptions
syncCpuProfile *string
syncMemProfile *string
+ // atomic pointers to current sync states for graceful shutdown
+ syncStateA2B atomic.Pointer[syncState]
+ syncStateB2A atomic.Pointer[syncState]
)
func init() {
@@ -143,6 +155,27 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
+ // register graceful shutdown hook to save checkpoints
+ grace.OnInterrupt(func() {
+ saveCheckpoint := func(name string, state *syncState) {
+ if state == nil || state.processor == nil {
+ return
+ }
+ offsetTsNs := state.processor.processedTsWatermark.Load()
+ if offsetTsNs == 0 {
+ return
+ }
+ if err := setOffset(state.grpcDialOption, state.targetFiler, getSignaturePrefixByPath(state.sourcePath), state.sourceFilerSignature, offsetTsNs); err != nil {
+ glog.Errorf("failed to save checkpoint for %s on shutdown: %v", name, err)
+ } else {
+ glog.V(0).Infof("saved checkpoint for %s on shutdown: %v", name, time.Unix(0, offsetTsNs))
+ }
+ }
+
+ saveCheckpoint("A->B", syncStateA2B.Load())
+ saveCheckpoint("B->A", syncStateB2A.Load())
+ })
+
go func() {
// a->b
// set synchronization start timestamp to offset
@@ -172,7 +205,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.concurrency,
*syncOptions.bDoDeleteFiles,
aFilerSignature,
- bFilerSignature)
+ bFilerSignature,
+ &syncStateA2B)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
time.Sleep(1747 * time.Millisecond)
@@ -210,7 +244,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.concurrency,
*syncOptions.aDoDeleteFiles,
bFilerSignature,
- aFilerSignature)
+ aFilerSignature,
+ &syncStateB2A)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
time.Sleep(2147 * time.Millisecond)
@@ -241,7 +276,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
}
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
- replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error {
+ replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error {
// if first time, start from now
// if has previously synced, resume from that point of time
@@ -278,6 +313,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
}
processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)
+ // update sync state for graceful shutdown checkpoint saving
+ if statePtr != nil {
+ statePtr.Store(&syncState{
+ processor: processor,
+ grpcDialOption: grpcDialOption,
+ targetFiler: targetFiler,
+ sourcePath: sourcePath,
+ sourceFilerSignature: sourceFilerSignature,
+ })
+ }
+
var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {