aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbernardx <xiaoyq@gmail.com>2022-09-03 14:03:23 +0800
committerGitHub <noreply@github.com>2022-09-02 23:03:23 -0700
commit228b133afa570d18cf14cd5081427f1ed3a50efe (patch)
tree3d47438810e89c4ed0029507f395b0ceb6e26291
parent853880bd837bd394994a2a3580eb236d76093514 (diff)
downloadseaweedfs-228b133afa570d18cf14cd5081427f1ed3a50efe.tar.xz
seaweedfs-228b133afa570d18cf14cd5081427f1ed3a50efe.zip
new 'concurrency' parameter for filer.sync (#3579)
Co-authored-by: XIAOYQ <xiaoyq@eudic.net>
-rw-r--r--weed/command/filer_sync.go22
1 files changed, 16 insertions, 6 deletions
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index d6f1d63d8..7133b0aef 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -44,10 +44,16 @@ type SyncOptions struct {
aProxyByFiler *bool
bProxyByFiler *bool
metricsHttpPort *int
+ concurrency *int
clientId int32
clientEpoch int32
}
+const (
+ SyncKeyPrefix = "sync."
+ DefaultConcurrcyLimit = 32
+)
+
var (
syncOptions SyncOptions
syncCpuProfile *string
@@ -77,6 +83,7 @@ func init() {
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
+ syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrcyLimit, "The maximum number of files that will be synced concurrently.")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
@@ -153,6 +160,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.bProxyByFiler,
*syncOptions.bDiskType,
*syncOptions.bDebug,
+ *syncOptions.concurrency,
aFilerSignature,
bFilerSignature)
if err != nil {
@@ -189,6 +197,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
*syncOptions.aProxyByFiler,
*syncOptions.aDiskType,
*syncOptions.aDebug,
+ *syncOptions.concurrency,
bFilerSignature,
aFilerSignature)
if err != nil {
@@ -221,7 +230,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
}
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
- replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error {
+ replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, sourceFilerSignature int32, targetFilerSignature int32) error {
// if first time, start from now
// if has previously synced, resume from that point of time
@@ -251,7 +260,12 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
}
return persistEventFn(resp)
}
- processor := NewMetadataProcessor(processEventFn, 128)
+
+ if concurrency < 0 || concurrency > 1024 {
+ glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrcyLimit)
+ concurrency = DefaultConcurrcyLimit
+ }
+ processor := NewMetadataProcessor(processEventFn, concurrency)
var lastLogTsNs = time.Now().UnixNano()
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
@@ -276,10 +290,6 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
}
-const (
- SyncKeyPrefix = "sync."
-)
-
// When each business is distinguished according to path, and offsets need to be maintained separately.
func getSignaturePrefixByPath(path string) string {
// compatible historical version