aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-10-07 22:49:04 -0700
committerChris Lu <chris.lu@gmail.com>2020-10-07 22:49:04 -0700
commita8624c2e4f94dce96448f6f3b33fb0f71e1f07df (patch)
tree27083da070f50b49d6b5f4ae96db88159d7d573d
parentda4edf3651d0dd17570057388e5e012eeda4ff80 (diff)
downloadseaweedfs-a8624c2e4f94dce96448f6f3b33fb0f71e1f07df.tar.xz
seaweedfs-a8624c2e4f94dce96448f6f3b33fb0f71e1f07df.zip
read from alternative replica
related to https://github.com/chrislusf/seaweedfs/issues/1512
-rw-r--r--weed/command/benchmark.go9
-rw-r--r--weed/filer/filechunk_manifest.go21
-rw-r--r--weed/filer/reader_at.go24
-rw-r--r--weed/filer/stream.go64
-rw-r--r--weed/replication/repl_util/replication_utli.go40
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go25
-rw-r--r--weed/replication/sink/b2sink/b2_sink.go30
-rw-r--r--weed/replication/sink/gcssink/gcs_sink.go22
-rw-r--r--weed/replication/sink/s3sink/s3_write.go11
-rw-r--r--weed/replication/source/filer_source.go21
-rw-r--r--weed/server/filer_grpc_server.go11
-rw-r--r--weed/wdclient/vid_map.go51
12 files changed, 182 insertions, 147 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 4a9a9619a..8bb585d91 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -282,14 +282,19 @@ func readFiles(fileIdLineChan chan string, s *stat) {
start := time.Now()
var bytesRead int
var err error
- url, err := b.masterClient.LookupFileId(fid)
+ urls, err := b.masterClient.LookupFileId(fid)
if err != nil {
s.failed++
println("!!!! ", fid, " location not found!!!!!")
continue
}
var bytes []byte
- bytes, err = util.Get(url)
+ for _, url := range urls {
+ bytes, err = util.Get(url)
+ if err == nil {
+ break
+ }
+ }
bytesRead = len(bytes)
if err == nil {
s.completed++
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 37b172357..6fcd7abc2 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -84,21 +84,26 @@ func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *fil
// TODO fetch from cache for weed mount?
func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
- urlString, err := lookupFileIdFn(fileId)
+ urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return nil, err
}
var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
- buffer.Write(data)
- })
- if err != nil {
- glog.V(0).Infof("read %s failed, err: %v", fileId, err)
- return nil, err
+
+ for _, urlString := range urlStrings {
+ err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(0).Infof("read %s failed, err: %v", fileId, err)
+ buffer.Reset()
+ } else {
+ break
+ }
}
- return buffer.Bytes(), nil
+ return buffer.Bytes(), err
}
func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 3aeb9a94d..74f037557 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -5,7 +5,6 @@ import (
"fmt"
"github.com/golang/groupcache/singleflight"
"io"
- "math/rand"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -17,24 +16,24 @@ import (
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews []*ChunkView
- lookupFileId func(fileId string) (targetUrl string, err error)
+ lookupFileId LookupFileIdFunctionType
readerLock sync.Mutex
fileSize int64
- fetchGroup singleflight.Group
- lastChunkFileId string
- lastChunkData []byte
- chunkCache chunk_cache.ChunkCache
+ fetchGroup singleflight.Group
+ lastChunkFileId string
+ lastChunkData []byte
+ chunkCache chunk_cache.ChunkCache
}
// var _ = io.ReaderAt(&ChunkReadAt{})
-type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
+type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
vidCache := make(map[string]*filer_pb.Locations)
- return func(fileId string) (targetUrl string, err error) {
+ return func(fileId string) (targetUrls []string, err error) {
vid := VolumeId(fileId)
locations, found := vidCache[vid]
@@ -59,8 +58,11 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
})
}
- volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[rand.Intn(len(locations.Locations))].Url)
- targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+ for _, loc := range locations.Locations {
+ volumeServerAddress := filerClient.AdjustedUrl(loc.Url)
+ targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+ targetUrls = append(targetUrls, targetUrl)
+ }
return
}
@@ -142,7 +144,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
-func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews... *ChunkView) (chunkData []byte, err error) {
+func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) {
if c.lastChunkFileId == chunkView.FileId {
return c.lastChunkData, nil
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index dc6e414ca..d98229379 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -17,27 +17,32 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
- fileId2Url := make(map[string]string)
+ fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews {
- urlString, err := masterClient.LookupFileId(chunkView.FileId)
+ urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
}
- fileId2Url[chunkView.FileId] = urlString
+ fileId2Url[chunkView.FileId] = urlStrings
}
for _, chunkView := range chunkViews {
- urlString := fileId2Url[chunkView.FileId]
- err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
- w.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
- return err
+ urlStrings := fileId2Url[chunkView.FileId]
+ for _, urlString := range urlStrings {
+ err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ w.Write(data)
+ })
+ if err != nil {
+ // data already written to w would be wrong
+ // but usually there are nothing written if fails to read
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ } else {
+ break
+ }
}
}
@@ -51,24 +56,28 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
buffer := bytes.Buffer{}
- lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+ lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
return masterClient.LookupFileId(fileId)
}
chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
for _, chunkView := range chunkViews {
- urlString, err := lookupFileIdFn(chunkView.FileId)
+ urlStrings, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return nil, err
}
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
- buffer.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
- return nil, err
+ for _, urlString := range urlStrings {
+ err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ buffer.Reset()
+ } else {
+ break
+ }
}
}
return buffer.Bytes(), nil
@@ -89,7 +98,7 @@ var _ = io.ReadSeeker(&ChunkStreamReader{})
func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
- lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+ lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
return masterClient.LookupFileId(fileId)
}
@@ -169,17 +178,24 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
}
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
- urlString, err := c.lookupFileId(chunkView.FileId)
+ urlStrings, err := c.lookupFileId(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
}
var buffer bytes.Buffer
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
- buffer.Write(data)
- })
+ for _, urlString := range urlStrings {
+ err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ buffer.Reset()
+ } else {
+ break
+ }
+ }
if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
return err
}
c.buffer = buffer.Bytes()
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
new file mode 100644
index 000000000..9b18275b5
--- /dev/null
+++ b/weed/replication/repl_util/replication_utli.go
@@ -0,0 +1,40 @@
+package repl_util
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
+
+ for _, chunk := range chunkViews {
+
+ fileUrls, err := filerSource.LookupFileId(chunk.FileId)
+ if err != nil {
+ return err
+ }
+
+ var writeErr error
+
+ for _, fileUrl := range fileUrls {
+ err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ writeErr = writeFunc(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read from %s: %v", fileUrl, err)
+ } else if writeErr != nil {
+ glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
+ } else {
+ break
+ }
+ }
+
+ if err != nil {
+ return err
+ }
+
+ }
+ return nil
+}
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index dab5cf4f4..df70be64b 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"net/url"
"strings"
@@ -107,25 +108,13 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
return err
}
- for _, chunk := range chunkViews {
-
- fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
- if err != nil {
- return err
- }
-
- var writeErr error
- readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
- _, writeErr = appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
- })
-
- if readErr != nil {
- return readErr
- }
- if writeErr != nil {
- return writeErr
- }
+ writeFunc := func(data []byte) error {
+ _, writeErr := appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil)
+ return writeErr
+ }
+ if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
+ return err
}
return nil
diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go
index cf212f129..24f0ecbbc 100644
--- a/weed/replication/sink/b2sink/b2_sink.go
+++ b/weed/replication/sink/b2sink/b2_sink.go
@@ -2,6 +2,7 @@ package B2Sink
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -95,31 +96,18 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int
targetObject := bucket.Object(key)
writer := targetObject.NewWriter(context.Background())
- for _, chunk := range chunkViews {
-
- fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
- if err != nil {
- return err
- }
-
- var writeErr error
- readErr := util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
- _, err := writer.Write(data)
- if err != nil {
- writeErr = err
- }
- })
+ writeFunc := func(data []byte) error {
+ _, writeErr := writer.Write(data)
+ return writeErr
+ }
- if readErr != nil {
- return readErr
- }
- if writeErr != nil {
- return writeErr
- }
+ defer writer.Close()
+ if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
+ return err
}
- return writer.Close()
+ return nil
}
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index c6bfa212a..badabc32c 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -3,6 +3,7 @@ package gcssink
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/replication/repl_util"
"os"
"cloud.google.com/go/storage"
@@ -93,25 +94,14 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []in
chunkViews := filer.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())
+ defer wc.Close()
- for _, chunk := range chunkViews {
-
- fileUrl, err := g.filerSource.LookupFileId(chunk.FileId)
- if err != nil {
- return err
- }
-
- err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
- wc.Write(data)
- })
-
- if err != nil {
- return err
- }
-
+ writeFunc := func(data []byte) error {
+ _, writeErr := wc.Write(data)
+ return writeErr
}
- if err := wc.Close(); err != nil {
+ if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc); err != nil {
return err
}
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index 8a8e7a92b..45265d1ba 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -157,11 +157,18 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou
}
func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, error) {
- fileUrl, err := s3sink.filerSource.LookupFileId(chunk.FileId)
+ fileUrls, err := s3sink.filerSource.LookupFileId(chunk.FileId)
if err != nil {
return nil, err
}
buf := make([]byte, chunk.Size)
- util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
+ for _, fileUrl := range fileUrls {
+ _, err = util.ReadUrl(fileUrl, nil, false, false, chunk.Offset, int(chunk.Size), buf)
+ if err != nil {
+ glog.V(1).Infof("read from %s: %v", fileUrl, err)
+ } else {
+ break
+ }
+ }
return bytes.NewReader(buf), nil
}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 9106ee98b..c3ef8835c 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -41,7 +41,7 @@ func (fs *FilerSource) DoInitialize(grpcAddress string, dir string) (err error)
return nil
}
-func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
+func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) {
vid2Locations := make(map[string]*filer_pb.Locations)
@@ -64,29 +64,38 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrl string, err error) {
if err != nil {
glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
- return "", fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
+ return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
}
locations := vid2Locations[vid]
if locations == nil || len(locations.Locations) == 0 {
glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
- return "", fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
+ return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
- fileUrl = fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, part)
+ for _, loc := range locations.Locations {
+ fileUrls = append(fileUrls, fmt.Sprintf("http://%s/%s", loc.Url, part))
+ }
return
}
func (fs *FilerSource) ReadPart(part string) (filename string, header http.Header, resp *http.Response, err error) {
- fileUrl, err := fs.LookupFileId(part)
+ fileUrls, err := fs.LookupFileId(part)
if err != nil {
return "", nil, nil, err
}
- filename, header, resp, err = util.DownloadFile(fileUrl)
+ for _, fileUrl := range fileUrls {
+ filename, header, resp, err = util.DownloadFile(fileUrl)
+ if err != nil {
+ glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
+ } else {
+ break
+ }
+ }
return filename, header, resp, err
}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index ecd23413f..5166f0896 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -135,16 +135,19 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return resp, nil
}
-func (fs *FilerServer) lookupFileId(fileId string) (targetUrl string, err error) {
+func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) {
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {
- return "", err
+ return nil, err
}
locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
if !found || len(locations) == 0 {
- return "", fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
+ return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
+ }
+ for _, loc := range locations {
+ targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
}
- return fmt.Sprintf("http://%s/%s", locations[0].Url, fileId), nil
+ return
}
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 97df49cb6..cee2da6e1 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -44,38 +44,36 @@ func (vc *vidMap) getLocationIndex(length int) (int, error) {
return int(atomic.AddInt32(&vc.cursor, 1)) % length, nil
}
-func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrl string, err error) {
+func (vc *vidMap) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
id, err := strconv.Atoi(vid)
if err != nil {
glog.V(1).Infof("Unknown volume id %s", vid)
- return "", err
+ return nil, err
}
- return vc.GetRandomLocation(uint32(id))
-}
-
-func (vc *vidMap) LookupFileId(fileId string) (fullUrl string, err error) {
- parts := strings.Split(fileId, ",")
- if len(parts) != 2 {
- return "", errors.New("Invalid fileId " + fileId)
+ locations, found := vc.GetLocations(uint32(id))
+ if !found {
+ return nil, fmt.Errorf("volume %d not found", id)
}
- serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0])
- if lookupError != nil {
- return "", lookupError
+ for _, loc := range locations {
+ serverUrls = append(serverUrls, loc.Url)
}
- return "http://" + serverUrl + "/" + fileId, nil
+ return
}
-func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err error) {
+func (vc *vidMap) LookupFileId(fileId string) (fullUrls []string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
- return "", errors.New("Invalid fileId " + fileId)
+ return nil, errors.New("Invalid fileId " + fileId)
}
- serverUrl, lookupError := vc.LookupVolumeServerUrl(parts[0])
+ serverUrls, lookupError := vc.LookupVolumeServerUrl(parts[0])
if lookupError != nil {
- return "", lookupError
+ return nil, lookupError
}
- return serverUrl, nil
+ for _, serverUrl := range serverUrls {
+ fullUrls = append(fullUrls, "http://"+serverUrl+"/"+fileId)
+ }
+ return
}
func (vc *vidMap) GetVidLocations(vid string) (locations []Location, err error) {
@@ -99,23 +97,6 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
return
}
-func (vc *vidMap) GetRandomLocation(vid uint32) (serverUrl string, err error) {
- vc.RLock()
- defer vc.RUnlock()
-
- locations := vc.vid2Locations[vid]
- if len(locations) == 0 {
- return "", fmt.Errorf("volume %d not found", vid)
- }
-
- index, err := vc.getLocationIndex(len(locations))
- if err != nil {
- return "", fmt.Errorf("volume %d: %v", vid, err)
- }
-
- return locations[index].Url, nil
-}
-
func (vc *vidMap) addLocation(vid uint32, location Location) {
vc.Lock()
defer vc.Unlock()