diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-13 17:52:18 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-13 17:52:18 -0800 |
| commit | 49f7de9daa46a9cc1b793ba56e39b84b288e3dc4 (patch) | |
| tree | 79de189b8903c7db01f7cf8bb274672a291b5fff /weed/command/filer_sync_jobs.go | |
| parent | d6ba97219be6f9b94f56008863423004fac46358 (diff) | |
| parent | 8d23e36c4512679e1c866bf166bd57b6b52bcc90 (diff) | |
| download | seaweedfs-49f7de9daa46a9cc1b793ba56e39b84b288e3dc4.tar.xz seaweedfs-49f7de9daa46a9cc1b793ba56e39b84b288e3dc4.zip | |
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed/command/filer_sync_jobs.go')
| -rw-r--r-- | weed/command/filer_sync_jobs.go | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go index 9d2ba75d5..d49031b98 100644 --- a/weed/command/filer_sync_jobs.go +++ b/weed/command/filer_sync_jobs.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "sync" + "sync/atomic" ) type MetadataProcessor struct { @@ -14,15 +15,16 @@ type MetadataProcessor struct { activeJobsCond *sync.Cond concurrencyLimit int fn pb.ProcessMetadataFunc - processedTsWatermark int64 + processedTsWatermark atomic.Int64 } -func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor { +func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor { t := &MetadataProcessor{ fn: fn, activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), concurrencyLimit: concurrency, } + t.processedTsWatermark.Store(offsetTsNs) t.activeJobsCond = sync.NewCond(&t.activeJobsLock) return t } @@ -61,7 +63,7 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) } } if isOldest { - t.processedTsWatermark = resp.TsNs + t.processedTsWatermark.Store(resp.TsNs) } t.activeJobsCond.Signal() }() |
