From 84b8a8e010269b84a1d6f2b7c730cc49b85bf280 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 11 Dec 2025 10:25:02 -0800 Subject: filer.sync: fix checkpoint not being saved properly (#7719) * filer.sync: fix race condition on first checkpoint save Initialize lastWriteTime to time.Now() instead of zero time to prevent the first checkpoint save from being triggered immediately when the first event arrives. This gives async jobs time to complete and update the watermark before the checkpoint is saved. Previously, the zero time caused lastWriteTime.Add(3s).Before(now) to be true on the first event, triggering an immediate checkpoint save attempt. But since jobs are processed asynchronously, the watermark was still 0 (initial value), causing the save to be skipped due to the 'if offsetTsNs == 0 { return nil }' check. Fixes #7717 * filer.sync: save checkpoint on graceful shutdown Add graceful shutdown handling to save the final checkpoint when filer.sync is terminated. Previously, any sync progress within the last 3-second checkpoint interval would be lost on shutdown. Changes: - Add syncState struct to track current processor and offset save info - Add atomic pointers syncStateA2B and syncStateB2A for both directions - Register grace.OnInterrupt hook to save checkpoints on shutdown - Modify doSubscribeFilerMetaChanges to update sync state atomically This ensures that when filer.sync is restarted, it resumes from the correct position instead of potentially replaying old events. Fixes #7717 --- weed/command/filer_sync.go | 52 +++++++++++++++++++++++++++++++++++++++++++--- weed/pb/filer_pb_tail.go | 2 +- 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 9b489297c..94c9bc1be 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -60,10 +60,22 @@ const ( DefaultConcurrencyLimit = 32 ) +// syncState tracks the current sync state for graceful shutdown checkpoint saving +type syncState struct { + processor *MetadataProcessor + grpcDialOption grpc.DialOption + targetFiler pb.ServerAddress + sourcePath string + sourceFilerSignature int32 +} + var ( syncOptions SyncOptions syncCpuProfile *string syncMemProfile *string + // atomic pointers to current sync states for graceful shutdown + syncStateA2B atomic.Pointer[syncState] + syncStateB2A atomic.Pointer[syncState] ) func init() { @@ -143,6 +155,27 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } + // register graceful shutdown hook to save checkpoints + grace.OnInterrupt(func() { + saveCheckpoint := func(name string, state *syncState) { + if state == nil || state.processor == nil { + return + } + offsetTsNs := state.processor.processedTsWatermark.Load() + if offsetTsNs == 0 { + return + } + if err := setOffset(state.grpcDialOption, state.targetFiler, getSignaturePrefixByPath(state.sourcePath), state.sourceFilerSignature, offsetTsNs); err != nil { + glog.Errorf("failed to save checkpoint for %s on shutdown: %v", name, err) + } else { + glog.V(0).Infof("saved checkpoint for %s on shutdown: %v", name, time.Unix(0, offsetTsNs)) + } + } + + saveCheckpoint("A->B", syncStateA2B.Load()) + saveCheckpoint("B->A", syncStateB2A.Load()) + }) + go func() { // a->b // set synchronization start timestamp to offset @@ -172,7 +205,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.concurrency, *syncOptions.bDoDeleteFiles, aFilerSignature, - bFilerSignature) + bFilerSignature, + &syncStateA2B) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -210,7 +244,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.concurrency, *syncOptions.aDoDeleteFiles, bFilerSignature, - aFilerSignature) + aFilerSignature, + &syncStateB2A) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -241,7 +276,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd } func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, - replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error { + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error { // if first time, start from now // if has previously synced, resume from that point of time @@ -278,6 +313,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti } processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs) + // update sync state for graceful shutdown checkpoint saving + if statePtr != nil { + statePtr.Store(&syncState{ + processor: processor, + grpcDialOption: grpcDialOption, + targetFiler: targetFiler, + sourcePath: sourcePath, + sourceFilerSignature: sourceFilerSignature, + }) + } + var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index f5ffac129..d4e1e137e 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -110,7 +110,7 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc { var counter int64 - var lastWriteTime time.Time + var lastWriteTime = time.Now() return func(resp *filer_pb.SubscribeMetadataResponse) error { if err := processEventFn(resp); err != nil { return err -- cgit v1.2.3