diff options
Diffstat (limited to 'weed/command/filer_sync_jobs.go')
| -rw-r--r-- | weed/command/filer_sync_jobs.go | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go index 9d2ba75d5..d49031b98 100644 --- a/weed/command/filer_sync_jobs.go +++ b/weed/command/filer_sync_jobs.go @@ -6,6 +6,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "sync" + "sync/atomic" ) type MetadataProcessor struct { @@ -14,15 +15,16 @@ type MetadataProcessor struct { activeJobsCond *sync.Cond concurrencyLimit int fn pb.ProcessMetadataFunc - processedTsWatermark int64 + processedTsWatermark atomic.Int64 } -func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor { +func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor { t := &MetadataProcessor{ fn: fn, activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse), concurrencyLimit: concurrency, } + t.processedTsWatermark.Store(offsetTsNs) t.activeJobsCond = sync.NewCond(&t.activeJobsLock) return t } @@ -61,7 +63,7 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) } } if isOldest { - t.processedTsWatermark = resp.TsNs + t.processedTsWatermark.Store(resp.TsNs) } t.activeJobsCond.Signal() }() |
