| author | Chris Lu <chris.lu@gmail.com> | 2020-10-07 22:49:04 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-10-07 22:49:04 -0700 |
| commit | a8624c2e4f94dce96448f6f3b33fb0f71e1f07df | |
| tree | 27083da070f50b49d6b5f4ae96db88159d7d573d /weed/replication | |
| parent | da4edf3651d0dd17570057388e5e012eeda4ff80 | |
read from alternative replica
related to https://github.com/chrislusf/seaweedfs/issues/1512
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/repl_util/replication_utli.go | 40 |
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 25 |
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 30 |
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 22 |
| -rw-r--r-- | weed/replication/sink/s3sink/s3_write.go | 11 |
| -rw-r--r-- | weed/replication/source/filer_source.go | 21 |
6 files changed, 86 insertions, 63 deletions
```diff
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
new file mode 100644
index 000000000..9b18275b5
--- /dev/null
+++ b/weed/replication/repl_util/replication_utli.go
@@ -0,0 +1,40 @@
+package repl_util
+
+import (
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/replication/source"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
+
+	for _, chunk := range chunkViews {
+
+		fileUrls, err := filerSource.LookupFileId(chunk.FileId)
+		if err != nil {
+			return err
+		}
+
+		var writeErr error
+
+		for _, fileUrl := range fileUrls {
+			err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+				writeErr = writeFunc(data)
+			})
+			if err != nil {
+				glog.V(1).Infof("read from %s: %v", fileUrl, err)
+			} else if writeErr != nil {
+				glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
+			} else {
+				break
+			}
+		}
+
+		if err != nil {
+			return err
+		}
+
+	}
+	return nil
+}
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index dab5cf4f4..df70be64b 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/replication/repl_util"
 	"net/url"
 	"strings"
 
@@ -107,25 +108,13 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
 		return err
 	}
 
-	for _, chunk := range chunkViews {
-
-		fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
-		if err != nil {
-			return err
-		}
-
-		var writeErr error
-		readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
-			_, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
-		})
-
-		if readErr != nil {
-			return readErr
-		}
-		if writeErr != nil {
-			return writeErr
-		}
+	writeFunc := func(data []byte) error {
+		_, writeErr := appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
+		return writeErr
+	}
 
+	if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
+		return err
 	}
 
 	return nil
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index cf212f129..24f0ecbbc 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -2,6 +2,7 @@ package B2Sink
 
 import (
 	"context"
+	"github.com/chrislusf/seaweedfs/weed/replication/repl_util"
 	"strings"
 
 	"github.com/chrislusf/seaweedfs/weed/filer"
@@ -95,31 +96,18 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
 	targetObject := bucket.Object(key)
 	writer := targetObject.NewWriter(context.Background())
 
-	for _, chunk := range chunkViews {
-
-		fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
-		if err != nil {
-			return err
-		}
-
-		var writeErr error
-		readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
-			_, err := writer.Write(data)
-			if err != nil {
-				writeErr = err
-			}
-		})
+	writeFunc := func(data []byte) error {
+		_, writeErr := writer.Write(data)
+		return writeErr
+	}
 
-		if readErr != nil {
-			return readErr
-		}
-		if writeErr != nil {
-			return writeErr
-		}
+	defer writer.Close()
 
+	if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
+		return err
 	}
 
-	return writer.Close()
+	return nil
 }
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index c6bfa212a..badabc32c 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -3,6 +3,7 @@ package gcssink
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/replication/repl_util"
 	"os"
 
 	"cloud.google.com/go/storage"
@@ -93,25 +94,14 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in
 	chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
+	defer wc.Close()
 
-	for _, chunk := range chunkViews {
-
-		fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
-		if err != nil {
-			return err
-		}
-
-		err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
-			wc.Write(data)
-		})
-
-		if err != nil {
-			return err
-		}
-
+	writeFunc := func(data []byte) error {
+		_, writeErr := wc.Write(data)
+		return writeErr
 	}
 
-	if err := wc.Close(); err != nil {
+	if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
 		return err
 	}
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index 8a8e7a92b..45265d1ba 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -157,11 +157,18 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou
 }
 
 func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, error) {
-	fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId)
+	fileUrls, err := s3sink.filerSource.LookupFileId(chunk.FileId)
 	if err != nil {
 		return nil, err
 	}
 	buf := make([]byte, chunk.Size)
-	util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
+	for _, fileUrl := range fileUrls {
+		_, err = util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
+		if err != nil {
+			glog.V(1).Infof("read from %s: %v", fileUrl, err)
+		} else {
+			break
+		}
+	}
 	return bytes.NewReader(buf), nil
 }
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 9106ee98b..c3ef8835c 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -41,7 +41,7 @@ func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error)
 	return nil
 }
 
-func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
+func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) {
 
 	vid2Locations := make(map[string]*filer_pb.Locations)
 
@@ -64,29 +64,38 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
 
 	if err != nil {
 		glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
-		return "", fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
+		return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
 	}
 
 	locations := vid2Locations[vid]
 
 	if locations == nil || len(locations.Locations) == 0 {
 		glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
-		return "", fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
+		return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
 	}
 
-	fileUrl = fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
+	for _, loc := range locations.Locations {
+		fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part))
+	}
 
 	return
 }
 
 func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) {
 
-	fileUrl, err := fs.LookupFileId(part)
+	fileUrls, err := fs.LookupFileId(part)
 	if err != nil {
 		return "", nil, nil, err
 	}
 
-	filename, header, resp, err = util.DownloadFile(fileUrl)
+	for _, fileUrl := range fileUrls {
+		filename, header, resp, err = util.DownloadFile(fileUrl)
+		if err != nil {
+			glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
+		} else {
+			break
+		}
+	}
 
 	return filename, header, resp, err
 }
```
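For reference, here is a minimal sketch of how another sink could reuse the new `repl_util.CopyFromChunkViews` helper, following the same `writeFunc` pattern the Azure, B2, and GCS sinks adopt above. The `LocalFileSink` type and its `*os.File` destination are hypothetical and only for illustration; `filer.ChunkView`, `source.FilerSource`, and `repl_util.CopyFromChunkViews` are taken from this commit.

```go
// Package localsink is a hypothetical sink used only to illustrate the
// calling convention of repl_util.CopyFromChunkViews introduced in this commit.
package localsink

import (
	"os"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/replication/repl_util"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
)

// LocalFileSink writes replicated chunk data to a local file (illustrative only).
type LocalFileSink struct {
	filerSource *source.FilerSource
}

// CopyChunks streams every chunk view to dst. CopyFromChunkViews handles the
// replica failover: for each chunk it tries every URL returned by
// LookupFileId and only returns an error once all replicas have failed.
func (s *LocalFileSink) CopyChunks(dst *os.File, chunkViews []*filer.ChunkView) error {
	writeFunc := func(data []byte) error {
		_, writeErr := dst.Write(data)
		return writeErr
	}
	return repl_util.CopyFromChunkViews(chunkViews, s.filerSource, writeFunc)
}
```

As in the sinks above, the helper logs read or write failures at verbosity 1 and moves on to the next replica URL, breaking out of the loop on the first successful copy.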
