diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2024-01-12 23:57:18 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-12 10:57:18 -0800 |
| commit | 1169f943103684ded4d67edac686fd94e8e78ccc (patch) | |
| tree | 107c58b62c76202d32ca7c8cd34cf1af58ce2668 /weed/command/filer_sync_jobs.go | |
| parent | 0e8a54f6f67e534d3af01d70ce45bd9cbfe87d42 (diff) | |
| download | seaweedfs-1169f943103684ded4d67edac686fd94e8e78ccc.tar.xz seaweedfs-1169f943103684ded4d67edac686fd94e8e78ccc.zip | |
Fix filer sync set offset (#5197)
* fix: compose 2mount with sync
* fix: DATA RACE
https://github.com/seaweedfs/seaweedfs/issues/5194
https://github.com/seaweedfs/seaweedfs/issues/5195
Diffstat (limited to 'weed/command/filer_sync_jobs.go')
| -rw-r--r-- | weed/command/filer_sync_jobs.go | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go index 9d2ba75d5..d49031b98 100644 --- a/weed/command/filer_sync_jobs.go +++ b/weed/command/filer_sync_jobs.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "sync" + "sync/atomic" ) type MetadataProcessor struct { @@ -14,15 +15,16 @@ type MetadataProcessor struct { activeJobsCond *sync.Cond concurrencyLimit int fn pb.ProcessMetadataFunc - processedTsWatermark int64 + processedTsWatermark atomic.Int64 } -func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor { +func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor { t := &MetadataProcessor{ fn: fn, activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), concurrencyLimit: concurrency, } + t.processedTsWatermark.Store(offsetTsNs) t.activeJobsCond = sync.NewCond(&t.activeJobsLock) return t } @@ -61,7 +63,7 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) } } if isOldest { - t.processedTsWatermark = resp.TsNs + t.processedTsWatermark.Store(resp.TsNs) } t.activeJobsCond.Signal() }() |
