aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/replication/sink/filersink/fetch_write.go20
-rw-r--r--weed/replication/sink/filersink/filer_sink.go2
2 files changed, 13 insertions, 9 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index c0321039b..00a48d41a 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -24,15 +24,17 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
var wg sync.WaitGroup
for chunkIndex, sourceChunk := range sourceChunks {
wg.Add(1)
- go func(chunk *filer_pb.FileChunk, index int) {
- defer wg.Done()
- replicatedChunk, e := fs.replicateOneChunk(chunk, path)
- if e != nil {
- err = e
- return
- }
- replicatedChunks[index] = replicatedChunk
- }(sourceChunk, chunkIndex)
+ fs.executor.Execute(func() {
+ func(chunk *filer_pb.FileChunk, index int) {
+ defer wg.Done()
+ replicatedChunk, e := fs.replicateOneChunk(chunk, path)
+ if e != nil {
+ err = e
+ return
+ }
+ replicatedChunks[index] = replicatedChunk
+ }(sourceChunk, chunkIndex)
+ })
}
wg.Wait()
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 816d18a02..3af5a4a80 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -32,6 +32,7 @@ type FilerSink struct {
address string
writeChunkByFiler bool
isIncremental bool
+ executor *util.LimitedConcurrentExecutor
}
func init() {
@@ -53,6 +54,7 @@ func (fs *FilerSink) IsIncremental() bool {
func (fs *FilerSink) Initialize(configuration util.Configuration, prefix string) error {
fs.isIncremental = configuration.GetBool(prefix + "is_incremental")
fs.dataCenter = configuration.GetString(prefix + "dataCenter")
+ fs.executor = util.NewLimitedConcurrentExecutor(32)
return fs.DoInitialize(
"",
configuration.GetString(prefix+"grpcAddress"),