From 1169f943103684ded4d67edac686fd94e8e78ccc Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Fri, 12 Jan 2024 23:57:18 +0500 Subject: Fix filer sync set offset (#5197) * fix: compose 2mount with sync * fix: DATA RACE https://github.com/seaweedfs/seaweedfs/issues/5194 https://github.com/seaweedfs/seaweedfs/issues/5195 --- weed/command/filer_sync.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) (limited to 'weed/command/filer_sync.go') diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index b1e32b65e..006f6794a 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -19,6 +19,7 @@ import ( "os" "regexp" "strings" + "sync/atomic" "time" ) @@ -50,7 +51,7 @@ type SyncOptions struct { aDoDeleteFiles *bool bDoDeleteFiles *bool clientId int32 - clientEpoch int32 + clientEpoch atomic.Int32 } const ( @@ -150,10 +151,10 @@ func runFilerSynchronize(cmd *Command, args []string) bool { os.Exit(2) } for { - syncOptions.clientEpoch++ + syncOptions.clientEpoch.Add(1) err := doSubscribeFilerMetaChanges( syncOptions.clientId, - syncOptions.clientEpoch, + syncOptions.clientEpoch.Load(), grpcDialOption, filerA, *syncOptions.aPath, @@ -188,10 +189,10 @@ func runFilerSynchronize(cmd *Command, args []string) bool { } go func() { for { - syncOptions.clientEpoch++ + syncOptions.clientEpoch.Add(1) err := doSubscribeFilerMetaChanges( syncOptions.clientId, - syncOptions.clientEpoch, + syncOptions.clientEpoch.Load(), grpcDialOption, filerB, *syncOptions.bPath, @@ -274,7 +275,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit) concurrency = DefaultConcurrencyLimit } - processor := NewMetadataProcessor(processEventFn, concurrency) + processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs) var lastLogTsNs = time.Now().UnixNano() var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) @@ -282,16 +283,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti processor.AddSyncJob(resp) return nil }, 3*time.Second, func(counter int64, lastTsNs int64) error { - if processor.processedTsWatermark == 0 { + offsetTsNs := processor.processedTsWatermark.Load() + if offsetTsNs == 0 { return nil } // use processor.processedTsWatermark instead of the lastTsNs from the most recent job now := time.Now().UnixNano() - glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9)) + glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now // collect synchronous offset - statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(processor.processedTsWatermark)) - return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark) + statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs)) + return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) }) metadataFollowOption := &pb.MetadataFollowOption{ -- cgit v1.2.3