path: root/weed/filer/stream.go
author     hilimd <68371223+hilimd@users.noreply.github.com>    2020-10-10 13:07:57 +0800
committer  GitHub <noreply@github.com>                          2020-10-10 13:07:57 +0800
commit     411e49f96494629fa3da334e8dc7cc15bd4d19cd (patch)
tree       1756f0a2f86c871a6db67df7ecbd509c9df32233 /weed/filer/stream.go
parent     ac162fc85769cb1b2a1f8694f9644eae7d0ce6c8 (diff)
parent     4a15e9c830de1b654515308e5be8380ffa34aefa (diff)
Merge pull request #23 from chrislusf/master
sync
Diffstat (limited to 'weed/filer/stream.go')
-rw-r--r--    weed/filer/stream.go    48
1 file changed, 27 insertions, 21 deletions
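
The diff below threads replicated lookups through the filer's streaming paths: the lookup callback now returns every replica URL for a file id ([]string instead of string), and chunk reads fall back to the next replica when one read fails. A minimal standalone sketch of that pattern follows; the type name lookupFileIdFn, the helper fetchWithFallback, and the URLs and file id are illustrative only and are not taken from the repository.

package main

import (
	"errors"
	"fmt"
)

// lookupFileIdFn mirrors the new callback shape in this diff: it returns every
// replica URL for a file id instead of a single URL. (Type name chosen here for
// illustration; the real code uses plain function literals.)
type lookupFileIdFn func(fileId string) (targetUrls []string, err error)

// fetchWithFallback tries each replica URL in order and keeps the first
// successful result, which is the fallback behavior the diff introduces.
// (Hypothetical helper, not part of the repository.)
func fetchWithFallback(urls []string, read func(url string) ([]byte, error)) ([]byte, error) {
	var lastErr error
	for _, u := range urls {
		data, err := read(u)
		if err == nil {
			return data, nil
		}
		lastErr = err
	}
	return nil, lastErr
}

func main() {
	lookup := lookupFileIdFn(func(fileId string) ([]string, error) {
		// Hypothetical replica locations, for illustration only.
		return []string{"http://volume-a:8080/" + fileId, "http://volume-b:8080/" + fileId}, nil
	})

	urls, _ := lookup("3,01637037d6")
	data, err := fetchWithFallback(urls, func(url string) ([]byte, error) {
		if url == "http://volume-a:8080/3,01637037d6" {
			return nil, errors.New("replica unreachable") // simulate one failed replica
		}
		return []byte("chunk bytes"), nil
	})
	fmt.Println(string(data), err) // prints: chunk bytes <nil>
}
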
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index dc6e414ca..f6e2a7643 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -17,28 +17,28 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
 	// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
 	chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
-	fileId2Url := make(map[string]string)
+	fileId2Url := make(map[string][]string)
 	for _, chunkView := range chunkViews {
-		urlString, err := masterClient.LookupFileId(chunkView.FileId)
+		urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
 		if err != nil {
 			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 			return err
 		}
-		fileId2Url[chunkView.FileId] = urlString
+		fileId2Url[chunkView.FileId] = urlStrings
 	}
 	for _, chunkView := range chunkViews {
-		urlString := fileId2Url[chunkView.FileId]
-		err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
-			w.Write(data)
-		})
-		if err != nil {
-			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+		urlStrings := fileId2Url[chunkView.FileId]
+
+		data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+		if err != nil {
 			return err
 		}
+		w.Write(data)
+
 	}
 	return nil
@@ -51,25 +51,24 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
 	buffer := bytes.Buffer{}
-	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+	lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
 		return masterClient.LookupFileId(fileId)
 	}
 	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
 	for _, chunkView := range chunkViews {
-		urlString, err := lookupFileIdFn(chunkView.FileId)
+		urlStrings, err := lookupFileIdFn(chunkView.FileId)
 		if err != nil {
 			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 			return nil, err
 		}
-		err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
-			buffer.Write(data)
-		})
+
+		data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
 		if err != nil {
-			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
			return nil, err
 		}
+		buffer.Write(data)
 	}
 	return buffer.Bytes(), nil
 }
@@ -89,7 +88,7 @@ var _ = io.ReadSeeker(&ChunkStreamReader{})
 func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
-	lookupFileIdFn := func(fileId string) (targetUrl string, err error) {
+	lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
 		return masterClient.LookupFileId(fileId)
 	}
@@ -169,17 +168,24 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
 }
 func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
-	urlString, err := c.lookupFileId(chunkView.FileId)
+	urlStrings, err := c.lookupFileId(chunkView.FileId)
 	if err != nil {
 		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
 		return err
 	}
 	var buffer bytes.Buffer
-	err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
-		buffer.Write(data)
-	})
+	for _, urlString := range urlStrings {
+		err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+			buffer.Write(data)
+		})
+		if err != nil {
+			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+			buffer.Reset()
+		} else {
+			break
+		}
+	}
 	if err != nil {
-		glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
 		return err
 	}
 	c.buffer = buffer.Bytes()
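
retriedFetchChunkData is called from StreamContent and ReadAll above but is not defined in this file; its definition presumably lives elsewhere in the filer package. Judging from the fallback loop added to fetchChunkToBuffer, a helper with that name could plausibly look like the sketch below. The signature matches the call sites in this diff; the body and the import paths are an illustration based on that loop, not the upstream implementation.

package filer

import (
	"bytes"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// retriedFetchChunkData (sketch): try each replica URL in turn and return the
// bytes from the first successful read. Illustrative only.
func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
	var buffer bytes.Buffer
	var err error
	for _, urlString := range urlStrings {
		// Stream this replica's data into the buffer.
		err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
			buffer.Write(data)
		})
		if err != nil {
			// This replica failed: log, drop any partial data, try the next one.
			glog.V(1).Infof("read %s failed, err: %v", urlString, err)
			buffer.Reset()
			continue
		}
		return buffer.Bytes(), nil
	}
	return nil, err
}
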