diff options
Diffstat (limited to 'weed/filer/stream.go')
| -rw-r--r-- | weed/filer/stream.go | 48 |
1 files changed, 27 insertions, 21 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go index dc6e414ca..f6e2a7643 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -17,28 +17,28 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f // fmt.Printf("start to stream content for chunks: %+v\n", chunks) chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size) - fileId2Url := make(map[string]string) + fileId2Url := make(map[string][]string) for _, chunkView := range chunkViews { - urlString, err := masterClient.LookupFileId(chunkView.FileId) + urlStrings, err := masterClient.LookupFileId(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err } - fileId2Url[chunkView.FileId] = urlString + fileId2Url[chunkView.FileId] = urlStrings } for _, chunkView := range chunkViews { - urlString := fileId2Url[chunkView.FileId] - err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { - w.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + urlStrings := fileId2Url[chunkView.FileId] + + data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + if err == nil { return err } + w.Write(data) + } return nil @@ -51,25 +51,24 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) buffer := bytes.Buffer{} - lookupFileIdFn := func(fileId string) (targetUrl string, err error) { + lookupFileIdFn := func(fileId string) (targetUrls []string, err error) { return masterClient.LookupFileId(fileId) } chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) for _, chunkView := range chunkViews { - urlString, err := lookupFileIdFn(chunkView.FileId) + urlStrings, err := lookupFileIdFn(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return nil, err } - err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { - buffer.Write(data) - }) + + data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) return nil, err } + buffer.Write(data) } return buffer.Bytes(), nil } @@ -89,7 +88,7 @@ var _ = io.ReadSeeker(&ChunkStreamReader{}) func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { - lookupFileIdFn := func(fileId string) (targetUrl string, err error) { + lookupFileIdFn := func(fileId string) (targetUrl []string, err error) { return masterClient.LookupFileId(fileId) } @@ -169,17 +168,24 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { } func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { - urlString, err := c.lookupFileId(chunkView.FileId) + urlStrings, err := c.lookupFileId(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err } var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { - buffer.Write(data) - }) + for _, urlString := range urlStrings { + err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + buffer.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + buffer.Reset() + } else { + break + } + } if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) return err } c.buffer = buffer.Bytes() |
