aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sink/b2sink/b2_sink.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication/sink/b2sink/b2_sink.go')
-rw-r--r--weed/replication/sink/b2sink/b2_sink.go30
1 files changed, 9 insertions, 21 deletions
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index cf212f129..24f0ecbbc 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -2,6 +2,7 @@ package B2Sink
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -95,31 +96,18 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
targetObject := bucket.Object(key)
writer := targetObject.NewWriter(context.Background())
- for _, chunk := range chunkViews {
-
- fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
- if err != nil {
- return err
- }
-
- var writeErr error
- readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
- _, err := writer.Write(data)
- if err != nil {
- writeErr = err
- }
- })
+ writeFunc := func(data []byte) error {
+ _, writeErr := writer.Write(data)
+ return writeErr
+ }
- if readErr != nil {
- return readErr
- }
- if writeErr != nil {
- return writeErr
- }
+ defer writer.Close()
+ if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
+ return err
}
- return writer.Close()
+ return nil
}