aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2020-03-11 00:02:10 -0700
committerGitHub <noreply@github.com>2020-03-11 00:02:10 -0700
commitf59fcba265ff6f24ab525e37e32eaebf88286ad7 (patch)
tree2d09d259e6cf5a4edb78e6f8c0488e67f35efdf6
parent60f5f05c78a2918d5219c925cea5847759281a2c (diff)
parent81610ed0067958537b67f23b0d086ce3c47ebc54 (diff)
downloadseaweedfs-f59fcba265ff6f24ab525e37e32eaebf88286ad7.tar.xz
seaweedfs-f59fcba265ff6f24ab525e37e32eaebf88286ad7.zip
Merge pull request #1228 from HongyanShen/master
fix #1226
-rw-r--r--weed/replication/sink/filersink/fetch_write.go11
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go10
2 files changed, 12 insertions, 9 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 07218b9b3..360a34620 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -19,17 +19,20 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, dir str
if len(sourceChunks) == 0 {
return
}
+
+ replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks))
+
var wg sync.WaitGroup
- for _, sourceChunk := range sourceChunks {
+ for chunkIndex, sourceChunk := range sourceChunks {
wg.Add(1)
- go func(chunk *filer_pb.FileChunk) {
+ go func(chunk *filer_pb.FileChunk, index int) {
defer wg.Done()
replicatedChunk, e := fs.replicateOneChunk(chunk, dir)
if e != nil {
err = e
}
- replicatedChunks = append(replicatedChunks, replicatedChunk)
- }(sourceChunk)
+ replicatedChunks[index] = replicatedChunk
+ }(sourceChunk, chunkIndex)
}
wg.Wait()
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 5f548559b..e0aee5ada 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -91,7 +91,6 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b
}
func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
-
key = cleanKey(key)
if entry.IsDirectory {
@@ -106,19 +105,20 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
totalSize := filer2.TotalSize(entry.Chunks)
chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
- var parts []*s3.CompletedPart
+ parts := make([]*s3.CompletedPart, len(chunkViews))
+
var wg sync.WaitGroup
for chunkIndex, chunk := range chunkViews {
partId := chunkIndex + 1
wg.Add(1)
- go func(chunk *filer2.ChunkView) {
+ go func(chunk *filer2.ChunkView, index int) {
defer wg.Done()
if part, uploadErr := s3sink.uploadPart(key, uploadId, partId, chunk); uploadErr != nil {
err = uploadErr
} else {
- parts = append(parts, part)
+ parts[index] = part
}
- }(chunk)
+ }(chunk, chunkIndex)
}
wg.Wait()