diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/filer_sync.go | 52 | ||||
| -rw-r--r-- | weed/pb/filer_pb_tail.go | 2 |
2 files changed, 50 insertions, 4 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 9b489297c..94c9bc1be 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -60,10 +60,22 @@ const ( DefaultConcurrencyLimit = 32 ) +// syncState tracks the current sync state for graceful shutdown checkpoint saving +type syncState struct { + processor *MetadataProcessor + grpcDialOption grpc.DialOption + targetFiler pb.ServerAddress + sourcePath string + sourceFilerSignature int32 +} + var ( syncOptions SyncOptions syncCpuProfile *string syncMemProfile *string + // atomic pointers to current sync states for graceful shutdown + syncStateA2B atomic.Pointer[syncState] + syncStateB2A atomic.Pointer[syncState] ) func init() { @@ -143,6 +155,27 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } + // register graceful shutdown hook to save checkpoints + grace.OnInterrupt(func() { + saveCheckpoint := func(name string, state *syncState) { + if state == nil || state.processor == nil { + return + } + offsetTsNs := state.processor.processedTsWatermark.Load() + if offsetTsNs == 0 { + return + } + if err := setOffset(state.grpcDialOption, state.targetFiler, getSignaturePrefixByPath(state.sourcePath), state.sourceFilerSignature, offsetTsNs); err != nil { + glog.Errorf("failed to save checkpoint for %s on shutdown: %v", name, err) + } else { + glog.V(0).Infof("saved checkpoint for %s on shutdown: %v", name, time.Unix(0, offsetTsNs)) + } + } + + saveCheckpoint("A->B", syncStateA2B.Load()) + saveCheckpoint("B->A", syncStateB2A.Load()) + }) + go func() { // a->b // set synchronization start timestamp to offset @@ -172,7 +205,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.concurrency, *syncOptions.bDoDeleteFiles, aFilerSignature, - bFilerSignature) + bFilerSignature, + &syncStateA2B) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -210,7 +244,8 @@ func runFilerSynchronize(cmd *Command, args []string) bool { *syncOptions.concurrency, *syncOptions.aDoDeleteFiles, bFilerSignature, - aFilerSignature) + aFilerSignature, + &syncStateB2A) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -241,7 +276,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd } func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, - replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error { + replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error { // if first time, start from now // if has previously synced, resume from that point of time @@ -278,6 +313,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti } processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs) + // update sync state for graceful shutdown checkpoint saving + if statePtr != nil { + statePtr.Store(&syncState{ + processor: processor, + grpcDialOption: grpcDialOption, + targetFiler: targetFiler, + sourcePath: sourcePath, + sourceFilerSignature: sourceFilerSignature, + }) + } + var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go index f5ffac129..d4e1e137e 100644 --- a/weed/pb/filer_pb_tail.go +++ b/weed/pb/filer_pb_tail.go @@ -110,7 +110,7 @@ func makeSubscribeMetadataFunc(option *MetadataFollowOption, processEventFn Proc func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc { var counter int64 - var lastWriteTime time.Time + var lastWriteTime = time.Now() return func(resp *filer_pb.SubscribeMetadataResponse) error { if err := processEventFn(resp); err != nil { return err |
