diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-12-11 10:25:02 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-11 10:25:02 -0800 |
| commit | 84b8a8e010269b84a1d6f2b7c730cc49b85bf280 (patch) | |
| tree | ebca3a8efd3100dbb40139ecbffa47b0a7d2b1e0 | |
| parent | c6d6ee8297b7bb569d52b22fa677613514475ce2 (diff) | |
| download | seaweedfs-84b8a8e010269b84a1d6f2b7c730cc49b85bf280.tar.xz seaweedfs-84b8a8e010269b84a1d6f2b7c730cc49b85bf280.zip | |
filer.sync: fix checkpoint not being saved properly (#7719)
* filer.sync: fix race condition on first checkpoint save
Initialize lastWriteTime to time.Now() instead of the zero time to prevent
the first checkpoint save from being triggered immediately when the
first event arrives. This gives async jobs time to complete and update
the watermark before the checkpoint is saved.
Previously, the zero time caused lastWriteTime.Add(3s).Before(now) to
be true on the first event, triggering an immediate checkpoint save
attempt. But since jobs are processed asynchronously, the watermark
was still 0 (initial value), causing the save to be skipped due to
the 'if offsetTsNs == 0 { return nil }' check.
Fixes #7717
* filer.sync: save checkpoint on graceful shutdown
Add graceful shutdown handling to save the final checkpoint when
filer.sync is terminated. Previously, any sync progress within the
last 3-second checkpoint interval would be lost on shutdown.
Changes:
- Add a syncState struct to track the current processor and the parameters needed to save its offset
- Add atomic pointers syncStateA2B and syncStateB2A for both directions
- Register grace.OnInterrupt hook to save checkpoints on shutdown
- Modify doSubscribeFilerMetaChanges to update sync state atomically
This ensures that when filer.sync is restarted, it resumes from the
correct position instead of potentially replaying old events.
Fixes #7717
| -rw-r--r-- | weed/command/filer_sync.go | 52 | ||||
| -rw-r--r-- | weed/pb/filer_pb_tail.go | 2 |
2 files changed, 50 insertions, 4 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 9b489297c..94c9bc1be 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -60,10 +60,22 @@ const ( DefaultConcurrencyLimit = 32 ) +// syncState tracks the current sync state for graceful shutdown checkpoint saving +type syncState struct { + processor *MetadataProcessor + grpcDialOption grpc.DialOption + targetFiler pb.ServerAddress + sourcePath string + sourceFilerSignature int32 +} + var ( syncOptions SyncOptions syncCpuProfile *string syncMemProfile *string + // atomic pointers to current sync states for graceful shutdown + syncStateA2B atomic.Pointer[syncState] + syncStateB2A atomic.Pointer[syncState] ) func init() { @@ -143,6 +155,27 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } + // register graceful shutdown hook to save checkpoints + grace.OnInterrupt(func() { + saveCheckpoint := func(name string, state *syncState) { + if state == nil || state.processor == nil { + return + } + offsetTsNs := state.processor.processedTsWatermark.Load() + if offsetTsNs == 0 { + return + } + if err := setOffset(state.grpcDialOption, state.targetFiler, getSignaturePrefixByPath(state.sourcePath), state.sourceFilerSignature, offsetTsNs); err != nil { + glog.Errorf("failed to save checkpoint for %s on shutdown: %v", name, err) + } else { + glog.V(0).Infof("saved checkpoint for %s on shutdown: %v", name, time.Unix(0, offsetTsNs)) + } + } + + saveCheckpoint("A->B", syncStateA2B.Load()) + saveCheckpoint("B->A", syncStateB2A.Load()) + }) + go func() { // a->b // set synchronization start timestamp to offset @@ -172,7 +205,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.concurrency, *syncOptions.bDoDeleteFiles, aFilerSignature, - bFilerSignature) + bFilerSignature, + &syncStateA2B) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -210,7 
+244,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.concurrency, *syncOptions.aDoDeleteFiles, bFilerSignature, - aFilerSignature) + aFilerSignature, + &syncStateB2A) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -241,7 +276,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd } func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, - replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error { + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error { // if first time, start from now // if has previously synced, resume from that point of time @@ -278,6 +313,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti } processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs) + // update sync state for graceful shutdown checkpoint saving + if statePtr != nil { + statePtr.Store(&syncState{ + processor: processor, + grpcDialOption: grpcDialOption, + targetFiler: targetFiler, + sourcePath: sourcePath, + sourceFilerSignature: sourceFilerSignature, + }) + } + var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { diff --git a/weed/pb/filer_pb_tail.go 
b/weed/pb/filer_pb_tail.go index f5ffac129..d4e1e137e 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -110,7 +110,7 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc { var counter int64 - var lastWriteTime time.Time + var lastWriteTime = time.Now() return func(resp *filer_pb.SubscribeMetadataResponse) error { if err := processEventFn(resp); err != nil { return err |
