diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 10 | ||||
| -rw-r--r-- | weed/filer/reader_at.go | 2 | ||||
| -rw-r--r-- | weed/filer/stream.go | 17 |
3 files changed, 21 insertions, 8 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 2df8a4bbf..f45448a6e 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -97,12 +97,16 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool var err error var buffer bytes.Buffer + var shouldRetry bool for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { - err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { buffer.Write(data) }) + if !shouldRetry { + break + } if err != nil { glog.V(0).Infof("read %s failed, err: %v", urlString, err) buffer.Reset() @@ -110,8 +114,8 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool break } } - if err != nil { - glog.V(0).Infof("sleep for %v before retrying reading", waitTime) + if err != nil && shouldRetry { + glog.V(0).Infof("retry reading in %v", waitTime) time.Sleep(waitTime) } else { break diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 835d6cfc2..da7eae621 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -75,7 +75,7 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { } for _, loc := range locations.Locations { - volumeServerAddress := filerClient.AdjustedUrl(loc.Url) + volumeServerAddress := filerClient.AdjustedUrl(loc) targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) targetUrls = append(targetUrls, targetUrl) } diff --git a/weed/filer/stream.go b/weed/filer/stream.go index a41aebe22..363b07f14 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -2,6 +2,7 @@ package filer import ( "bytes" + "fmt" "io" "math" "strings" @@ -35,10 +36,14 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) if err != nil { - return err + glog.Errorf("read chunk: %v", err) + return fmt.Errorf("read chunk: %v", err) + } + _, err = w.Write(data) + if err != nil { + glog.Errorf("write chunk: %v", err) + return fmt.Errorf("write chunk: %v", err) } - w.Write(data) - } return nil @@ -174,10 +179,14 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { return err } var buffer bytes.Buffer + var shouldRetry bool for _, urlString := range urlStrings { - err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) + if !shouldRetry { + break + } if err != nil { glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) buffer.Reset() |
