aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go4
-rw-r--r--weed/replication/sink/b2sink/b2_sink.go5
-rw-r--r--weed/replication/sink/gcssink/gcs_sink.go4
-rw-r--r--weed/replication/sink/localsink/local_sink.go4
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go13
-rw-r--r--weed/replication/sink/s3sink/s3_write.go5
6 files changed, 34 insertions, 1 deletions
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index 363f3e7a9..87540383d 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -119,6 +119,10 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
return writeErr
}
+ if len(entry.Content) > 0 {
+ return writeFunc(entry.Content)
+ }
+
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
}
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index fe65a9447..52883644b 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -101,13 +101,16 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
targetObject := bucket.Object(key)
writer := targetObject.NewWriter(context.Background())
+ defer writer.Close()
writeFunc := func(data []byte) error {
_, writeErr := writer.Write(data)
return writeErr
}
- defer writer.Close()
+ if len(entry.Content) > 0 {
+ return writeFunc(entry.Content)
+ }
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index 2d48f3682..8c9fd5b15 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -107,6 +107,10 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in
return writeErr
}
+ if len(entry.Content) > 0 {
+ return writeFunc(entry.Content)
+ }
+
if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
}
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
index 56e102ebc..5a953353b 100644
--- a/weed/replication/sink/localsink/local_sink.go
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -101,6 +101,10 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
return writeErr
}
+ if len(entry.Content) > 0 {
+ return writeFunc(entry.Content)
+ }
+
if err := repl_util.CopyFromChunkViews(chunkViews, localsink.filerSource, writeFunc); err != nil {
return err
}
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 4fb59fb37..55a56c6f5 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -1,6 +1,7 @@
package S3Sink
import (
+ "bytes"
"context"
"fmt"
"strings"
@@ -121,6 +122,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
}
totalSize := filer.FileSize(entry)
+
chunkViews := filer.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
parts := make([]*s3.CompletedPart, len(chunkViews))
@@ -141,6 +143,17 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
}
wg.Wait()
+ // for small files
+ if len(entry.Content) > 0 {
+ parts = make([]*s3.CompletedPart, 1)
+ if part, uploadErr := s3sink.doUploadPart(key, uploadId, 1, bytes.NewReader(entry.Content)); uploadErr != nil {
+ err = uploadErr
+ glog.Errorf("uploadPart: %v", uploadErr)
+ } else {
+ parts[0] = part
+ }
+ }
+
if err != nil {
s3sink.abortMultipartUpload(key, uploadId)
return fmt.Errorf("uploadPart: %v", err)
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index 3f0f482fa..480859db8 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -116,6 +116,11 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.
return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
}
+ return s3sink.doUploadPart(key, uploadId, partId, readSeeker)
+}
+
+func (s3sink *S3Sink) doUploadPart(key, uploadId string, partId int, readSeeker io.ReadSeeker) (*s3.CompletedPart, error) {
+
input := &s3.UploadPartInput{
Body: readSeeker,
Bucket: aws.String(s3sink.bucket),