aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_sync_jobs.go
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2024-01-12 23:57:18 +0500
committerGitHub <noreply@github.com>2024-01-12 10:57:18 -0800
commit1169f943103684ded4d67edac686fd94e8e78ccc (patch)
tree107c58b62c76202d32ca7c8cd34cf1af58ce2668 /weed/command/filer_sync_jobs.go
parent0e8a54f6f67e534d3af01d70ce45bd9cbfe87d42 (diff)
downloadseaweedfs-1169f943103684ded4d67edac686fd94e8e78ccc.tar.xz
seaweedfs-1169f943103684ded4d67edac686fd94e8e78ccc.zip
Fix filer sync set offset (#5197)
* fix: compose 2mount with sync * fix: DATA RACE https://github.com/seaweedfs/seaweedfs/issues/5194 https://github.com/seaweedfs/seaweedfs/issues/5195
Diffstat (limited to 'weed/command/filer_sync_jobs.go')
-rw-r--r--weed/command/filer_sync_jobs.go8
1 files changed, 5 insertions, 3 deletions
diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go
index 9d2ba75d5..d49031b98 100644
--- a/weed/command/filer_sync_jobs.go
+++ b/weed/command/filer_sync_jobs.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"sync"
+ "sync/atomic"
)
type MetadataProcessor struct {
@@ -14,15 +15,16 @@ type MetadataProcessor struct {
activeJobsCond *sync.Cond
concurrencyLimit int
fn pb.ProcessMetadataFunc
- processedTsWatermark int64
+ processedTsWatermark atomic.Int64
}
-func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor {
+func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
t := &MetadataProcessor{
fn: fn,
activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
concurrencyLimit: concurrency,
}
+ t.processedTsWatermark.Store(offsetTsNs)
t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
return t
}
@@ -61,7 +63,7 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
}
}
if isOldest {
- t.processedTsWatermark = resp.TsNs
+ t.processedTsWatermark.Store(resp.TsNs)
}
t.activeJobsCond.Signal()
}()