diff options
Diffstat (limited to 'weed/command/filer_sync.go')
| -rw-r--r-- | weed/command/filer_sync.go | 52 |
1 files changed, 49 insertions, 3 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 9b489297c..94c9bc1be 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -60,10 +60,22 @@ const ( DefaultConcurrencyLimit = 32 ) +// syncState tracks the current sync state for graceful shutdown checkpoint saving +type syncState struct { + processor *MetadataProcessor + grpcDialOption grpc.DialOption + targetFiler pb.ServerAddress + sourcePath string + sourceFilerSignature int32 +} + var ( syncOptions SyncOptions syncCpuProfile *string syncMemProfile *string + // atomic pointers to current sync states for graceful shutdown + syncStateA2B atomic.Pointer[syncState] + syncStateB2A atomic.Pointer[syncState] ) func init() { @@ -143,6 +155,27 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } + // register graceful shutdown hook to save checkpoints + grace.OnInterrupt(func() { + saveCheckpoint := func(name string, state *syncState) { + if state == nil || state.processor == nil { + return + } + offsetTsNs := state.processor.processedTsWatermark.Load() + if offsetTsNs == 0 { + return + } + if err := setOffset(state.grpcDialOption, state.targetFiler, getSignaturePrefixByPath(state.sourcePath), state.sourceFilerSignature, offsetTsNs); err != nil { + glog.Errorf("failed to save checkpoint for %s on shutdown: %v", name, err) + } else { + glog.V(0).Infof("saved checkpoint for %s on shutdown: %v", name, time.Unix(0, offsetTsNs)) + } + } + + saveCheckpoint("A->B", syncStateA2B.Load()) + saveCheckpoint("B->A", syncStateB2A.Load()) + }) + go func() { // a->b // set synchronization start timestamp to offset @@ -172,7 +205,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.concurrency, *syncOptions.bDoDeleteFiles, aFilerSignature, - bFilerSignature) + bFilerSignature, + &syncStateA2B) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -210,7 +244,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.concurrency, *syncOptions.aDoDeleteFiles, bFilerSignature, - aFilerSignature) + aFilerSignature, + &syncStateB2A) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -241,7 +276,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd } func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, - replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error { + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error { // if first time, start from now // if has previously synced, resume from that point of time @@ -278,6 +313,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti } processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs) + // update sync state for graceful shutdown checkpoint saving + if statePtr != nil { + statePtr.Store(&syncState{ + processor: processor, + grpcDialOption: grpcDialOption, + targetFiler: targetFiler, + sourcePath: sourcePath, + sourceFilerSignature: sourceFilerSignature, + }) + } + var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { |
