aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_sync_jobs.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-13 17:52:18 -0800
committerchrislu <chris.lu@gmail.com>2024-01-13 17:52:18 -0800
commit49f7de9daa46a9cc1b793ba56e39b84b288e3dc4 (patch)
tree79de189b8903c7db01f7cf8bb274672a291b5fff /weed/command/filer_sync_jobs.go
parentd6ba97219be6f9b94f56008863423004fac46358 (diff)
parent8d23e36c4512679e1c866bf166bd57b6b52bcc90 (diff)
downloadseaweedfs-49f7de9daa46a9cc1b793ba56e39b84b288e3dc4.tar.xz
seaweedfs-49f7de9daa46a9cc1b793ba56e39b84b288e3dc4.zip
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed/command/filer_sync_jobs.go')
-rw-r--r--weed/command/filer_sync_jobs.go8
1 files changed, 5 insertions, 3 deletions
diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go
index 9d2ba75d5..d49031b98 100644
--- a/weed/command/filer_sync_jobs.go
+++ b/weed/command/filer_sync_jobs.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"sync"
+ "sync/atomic"
)
type MetadataProcessor struct {
@@ -14,15 +15,16 @@ type MetadataProcessor struct {
activeJobsCond *sync.Cond
concurrencyLimit int
fn pb.ProcessMetadataFunc
- processedTsWatermark int64
+ processedTsWatermark atomic.Int64
}
-func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor {
+func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
t := &MetadataProcessor{
fn: fn,
activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
concurrencyLimit: concurrency,
}
+ t.processedTsWatermark.Store(offsetTsNs)
t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
return t
}
@@ -61,7 +63,7 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
}
}
if isOldest {
- t.processedTsWatermark = resp.TsNs
+ t.processedTsWatermark.Store(resp.TsNs)
}
t.activeJobsCond.Signal()
}()