| author | Chris Lu <chrislusf@users.noreply.github.com> | 2020-03-11 00:02:10 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-03-11 00:02:10 -0700 |
| commit | f59fcba265ff6f24ab525e37e32eaebf88286ad7 (patch) | |
| tree | 2d09d259e6cf5a4edb78e6f8c0488e67f35efdf6 | |
| parent | 60f5f05c78a2918d5219c925cea5847759281a2c (diff) | |
| parent | 81610ed0067958537b67f23b0d086ce3c47ebc54 (diff) | |
Merge pull request #1228 from HongyanShen/master
fix #1226
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 11 |
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 10 |

2 files changed, 12 insertions, 9 deletions
```diff
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 07218b9b3..360a34620 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -19,17 +19,20 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir str
 	if len(sourceChunks) == 0 {
 		return
 	}
+
+	replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks))
+
 	var wg sync.WaitGroup
-	for _, sourceChunk := range sourceChunks {
+	for chunkIndex, sourceChunk := range sourceChunks {
 		wg.Add(1)
-		go func(chunk *filer_pb.FileChunk) {
+		go func(chunk *filer_pb.FileChunk, index int) {
 			defer wg.Done()
 			replicatedChunk, e := fs.replicateOneChunk(chunk, dir)
 			if e != nil {
 				err = e
 			}
-			replicatedChunks = append(replicatedChunks, replicatedChunk)
-		}(sourceChunk)
+			replicatedChunks[index] = replicatedChunk
+		}(sourceChunk, chunkIndex)
 	}
 	wg.Wait()
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 5f548559b..e0aee5ada 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -91,7 +91,6 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b
 }
 
 func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
-
 	key = cleanKey(key)
 
 	if entry.IsDirectory {
@@ -106,19 +105,20 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 	totalSize := filer2.TotalSize(entry.Chunks)
 	chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
 
-	var parts []*s3.CompletedPart
+	parts := make([]*s3.CompletedPart, len(chunkViews))
+
 	var wg sync.WaitGroup
 	for chunkIndex, chunk := range chunkViews {
 		partId := chunkIndex + 1
 		wg.Add(1)
-		go func(chunk *filer2.ChunkView) {
+		go func(chunk *filer2.ChunkView, index int) {
 			defer wg.Done()
 			if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil {
 				err = uploadErr
 			} else {
-				parts = append(parts, part)
+				parts[index] = part
 			}
-		}(chunk)
+		}(chunk, chunkIndex)
 	}
 	wg.Wait()
```
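The common thread in both hunks is how results are collected from the goroutines: appending to a shared slice from several goroutines at once is a data race and can drop or misplace elements, so the patch pre-allocates the result slice and has each goroutine write only to its own index, passing the loop variables in as arguments. Below is a minimal standalone sketch of that pattern, not SeaweedFS code; the names `processAll` and `processOne` are illustrative stand-ins for the per-chunk work (`replicateOneChunk` / `uploadPart`).

```go
// Sketch of the fix's pattern: pre-allocate the result slice and let each
// goroutine write to its own index instead of appending to a shared slice.
package main

import (
	"fmt"
	"sync"
)

// processAll fans work out to one goroutine per input and gathers the
// results in input order. processOne stands in for the per-chunk work.
func processAll(inputs []int, processOne func(int) int) []int {
	results := make([]int, len(inputs)) // one slot per input, no shared append

	var wg sync.WaitGroup
	for i, in := range inputs {
		wg.Add(1)
		go func(in, index int) { // loop variables passed as arguments
			defer wg.Done()
			results[index] = processOne(in) // each goroutine owns its slot
		}(in, i)
	}
	wg.Wait()
	return results
}

func main() {
	doubled := processAll([]int{1, 2, 3, 4}, func(n int) int { return n * 2 })
	fmt.Println(doubled) // [2 4 6 8], in input order
}
```

A side effect of the indexed writes is that results come back in input order, which the original append-based collection could not guarantee.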
