aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2024-01-12 23:57:18 +0500
committerGitHub <noreply@github.com>2024-01-12 10:57:18 -0800
commit1169f943103684ded4d67edac686fd94e8e78ccc (patch)
tree107c58b62c76202d32ca7c8cd34cf1af58ce2668
parent0e8a54f6f67e534d3af01d70ce45bd9cbfe87d42 (diff)
downloadseaweedfs-1169f943103684ded4d67edac686fd94e8e78ccc.tar.xz
seaweedfs-1169f943103684ded4d67edac686fd94e8e78ccc.zip
Fix filer sync set offset (#5197)
* fix: compose 2mount with sync * fix: DATA RACE https://github.com/seaweedfs/seaweedfs/issues/5194 https://github.com/seaweedfs/seaweedfs/issues/5195
-rw-r--r--docker/compose/local-sync-mount-compose.yml35
-rw-r--r--weed/command/filer_remote_gateway_buckets.go12
-rw-r--r--weed/command/filer_remote_sync_dir.go12
-rw-r--r--weed/command/filer_sync.go22
-rw-r--r--weed/command/filer_sync_jobs.go8
5 files changed, 64 insertions, 25 deletions
diff --git a/docker/compose/local-sync-mount-compose.yml b/docker/compose/local-sync-mount-compose.yml
index fec866698..0ce1fdeda 100644
--- a/docker/compose/local-sync-mount-compose.yml
+++ b/docker/compose/local-sync-mount-compose.yml
@@ -3,19 +3,54 @@ services:
node1:
image: chrislusf/seaweedfs:local
command: "server -master -volume -filer"
+ ports:
+ - 8888:8888
+ - 18888:18888
+ healthcheck:
+ test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
+ interval: 1s
+ start_period: 10s
+ timeout: 30s
mount1:
image: chrislusf/seaweedfs:local
privileged: true
command: "mount -filer=node1:8888 -dir=/mnt -dirAutoCreate"
+ healthcheck:
+ test: [ "CMD", "curl", "--fail", "-I", "http://node1:8888/" ]
+ interval: 1s
+ start_period: 10s
+ timeout: 30s
+ depends_on:
+ node1:
+ condition: service_healthy
node2:
image: chrislusf/seaweedfs:local
ports:
- 7888:8888
+ - 17888:18888
command: "server -master -volume -filer"
+ healthcheck:
+ test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
+ interval: 1s
+ start_period: 10s
+ timeout: 30s
mount2:
image: chrislusf/seaweedfs:local
privileged: true
command: "mount -filer=node2:8888 -dir=/mnt -dirAutoCreate"
+ healthcheck:
+ test: [ "CMD", "curl", "--fail", "-I", "http://node2:8888/" ]
+ interval: 1s
+ start_period: 10s
+ timeout: 30s
+ depends_on:
+ node2:
+ condition: service_healthy
sync:
image: chrislusf/seaweedfs:local
command: "-v=4 filer.sync -a=node1:8888 -b=node2:8888 -a.debug -b.debug"
+ depends_on:
+ mount1:
+ condition: service_healthy
+ mount2:
+ condition: service_healthy
diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go
index 912607847..53abee8b7 100644
--- a/weed/command/filer_remote_gateway_buckets.go
+++ b/weed/command/filer_remote_gateway_buckets.go
@@ -30,24 +30,24 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo
return err
}
- processor := NewMetadataProcessor(eachEntryFunc, 128)
+ lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
+ processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano())
var lastLogTsNs = time.Now().UnixNano()
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
processor.AddSyncJob(resp)
return nil
}, 3*time.Second, func(counter int64, lastTsNs int64) error {
- if processor.processedTsWatermark == 0 {
+ offsetTsNs := processor.processedTsWatermark.Load()
+ if offsetTsNs == 0 {
return nil
}
now := time.Now().UnixNano()
- glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9))
+ glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now
- return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, processor.processedTsWatermark)
+ return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, offsetTsNs)
})
- lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
-
option.clientEpoch++
metadataFollowOption := &pb.MetadataFollowOption{
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index d4305b666..00f6d7493 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -33,7 +33,8 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
return err
}
- processor := NewMetadataProcessor(eachEntryFunc, 128)
+ lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
+ processor := NewMetadataProcessor(eachEntryFunc, 128, lastOffsetTs.UnixNano())
var lastLogTsNs = time.Now().UnixNano()
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
@@ -50,18 +51,17 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
processor.AddSyncJob(resp)
return nil
}, 3*time.Second, func(counter int64, lastTsNs int64) error {
- if processor.processedTsWatermark == 0 {
+ offsetTsNs := processor.processedTsWatermark.Load()
+ if offsetTsNs == 0 {
return nil
}
// use processor.processedTsWatermark instead of the lastTsNs from the most recent job
now := time.Now().UnixNano()
- glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9))
+ glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now
- return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, processor.processedTsWatermark)
+ return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, offsetTsNs)
})
- lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
-
option.clientEpoch++
metadataFollowOption := &pb.MetadataFollowOption{
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index b1e32b65e..006f6794a 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -19,6 +19,7 @@ import (
"os"
"regexp"
"strings"
+ "sync/atomic"
"time"
)
@@ -50,7 +51,7 @@ type SyncOptions struct {
aDoDeleteFiles *bool
bDoDeleteFiles *bool
clientId int32
- clientEpoch int32
+ clientEpoch atomic.Int32
}
const (
@@ -150,10 +151,10 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
os.Exit(2)
}
for {
- syncOptions.clientEpoch++
+ syncOptions.clientEpoch.Add(1)
err := doSubscribeFilerMetaChanges(
syncOptions.clientId,
- syncOptions.clientEpoch,
+ syncOptions.clientEpoch.Load(),
grpcDialOption,
filerA,
*syncOptions.aPath,
@@ -188,10 +189,10 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
}
go func() {
for {
- syncOptions.clientEpoch++
+ syncOptions.clientEpoch.Add(1)
err := doSubscribeFilerMetaChanges(
syncOptions.clientId,
- syncOptions.clientEpoch,
+ syncOptions.clientEpoch.Load(),
grpcDialOption,
filerB,
*syncOptions.bPath,
@@ -274,7 +275,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit)
concurrency = DefaultConcurrencyLimit
}
- processor := NewMetadataProcessor(processEventFn, concurrency)
+ processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)
var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
@@ -282,16 +283,17 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
processor.AddSyncJob(resp)
return nil
}, 3*time.Second, func(counter int64, lastTsNs int64) error {
- if processor.processedTsWatermark == 0 {
+ offsetTsNs := processor.processedTsWatermark.Load()
+ if offsetTsNs == 0 {
return nil
}
// use processor.processedTsWatermark instead of the lastTsNs from the most recent job
now := time.Now().UnixNano()
- glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, processor.processedTsWatermark), float64(counter)/(float64(now-lastLogTsNs)/1e9))
+ glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now
// collect synchronous offset
- statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(processor.processedTsWatermark))
- return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, processor.processedTsWatermark)
+ statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs))
+ return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
})
metadataFollowOption := &pb.MetadataFollowOption{
diff --git a/weed/command/filer_sync_jobs.go b/weed/command/filer_sync_jobs.go
index 9d2ba75d5..d49031b98 100644
--- a/weed/command/filer_sync_jobs.go
+++ b/weed/command/filer_sync_jobs.go
@@ -6,6 +6,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"sync"
+ "sync/atomic"
)
type MetadataProcessor struct {
@@ -14,15 +15,16 @@ type MetadataProcessor struct {
activeJobsCond *sync.Cond
concurrencyLimit int
fn pb.ProcessMetadataFunc
- processedTsWatermark int64
+ processedTsWatermark atomic.Int64
}
-func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor {
+func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
t := &MetadataProcessor{
fn: fn,
activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
concurrencyLimit: concurrency,
}
+ t.processedTsWatermark.Store(offsetTsNs)
t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
return t
}
@@ -61,7 +63,7 @@ func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse)
}
}
if isOldest {
- t.processedTsWatermark = resp.TsNs
+ t.processedTsWatermark.Store(resp.TsNs)
}
t.activeJobsCond.Signal()
}()