aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-10-04 11:35:07 -0700
committerchrislu <chris.lu@gmail.com>2022-10-04 11:35:07 -0700
commit0452ae6a6c4a2c9543c3e614672017983ca3c179 (patch)
treef1237752efc9e1f4304caf7956bd4eb433e4de7c
parentec46a34f338ae40d2840c8569b26b02f8c8fbb22 (diff)
downloadseaweedfs-0452ae6a6c4a2c9543c3e614672017983ca3c179.tar.xz
seaweedfs-0452ae6a6c4a2c9543c3e614672017983ca3c179.zip
filer.sync: limit concurrency when fetching file chunks
fix https://github.com/seaweedfs/seaweedfs/issues/3787
-rw-r--r--weed/replication/sink/filersink/fetch_write.go20
-rw-r--r--weed/replication/sink/filersink/filer_sink.go2
2 files changed, 13 insertions, 9 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index c0321039b..00a48d41a 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -24,15 +24,17 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
var wg sync.WaitGroup
for chunkIndex, sourceChunk := range sourceChunks {
wg.Add(1)
- go func(chunk *filer_pb.FileChunk, index int) {
- defer wg.Done()
- replicatedChunk, e := fs.replicateOneChunk(chunk, path)
- if e != nil {
- err = e
- return
- }
- replicatedChunks[index] = replicatedChunk
- }(sourceChunk, chunkIndex)
+ fs.executor.Execute(func() {
+ func(chunk *filer_pb.FileChunk, index int) {
+ defer wg.Done()
+ replicatedChunk, e := fs.replicateOneChunk(chunk, path)
+ if e != nil {
+ err = e
+ return
+ }
+ replicatedChunks[index] = replicatedChunk
+ }(sourceChunk, chunkIndex)
+ })
}
wg.Wait()
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 816d18a02..3af5a4a80 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -32,6 +32,7 @@ type FilerSink struct {
address string
writeChunkByFiler bool
isIncremental bool
+ executor *util.LimitedConcurrentExecutor
}
func init() {
@@ -53,6 +54,7 @@ func (fs *FilerSink) IsIncremental() bool {
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
fs.dataCenter = configuration.GetString(prefix + "dataCenter")
+ fs.executor = util.NewLimitedConcurrentExecutor(32)
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),