aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/repl_util/replication_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication/repl_util/replication_util.go')
-rw-r--r--weed/replication/repl_util/replication_util.go42
1 files changed, 42 insertions, 0 deletions
diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go
new file mode 100644
index 000000000..519a9a201
--- /dev/null
+++ b/weed/replication/repl_util/replication_util.go
@@ -0,0 +1,42 @@
+package repl_util
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
+
+ for _, chunk := range chunkViews {
+
+ fileUrls, err := filerSource.LookupFileId(chunk.FileId)
+ if err != nil {
+ return err
+ }
+
+ var writeErr error
+ var shouldRetry bool
+
+ for _, fileUrl := range fileUrls {
+ shouldRetry, err = util.ReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ writeErr = writeFunc(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read from %s: %v", fileUrl, err)
+ } else if writeErr != nil {
+ glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
+ } else {
+ break
+ }
+ }
+ if shouldRetry && err != nil {
+ return err
+ }
+ if writeErr != nil {
+ return writeErr
+ }
+ }
+ return nil
+}