aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filechunk_manifest.go10
-rw-r--r--weed/filer/reader_at.go2
-rw-r--r--weed/filer/stream.go17
3 files changed, 21 insertions, 8 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 2df8a4bbf..f45448a6e 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -97,12 +97,16 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
var err error
var buffer bytes.Buffer
+ var shouldRetry bool
for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data)
})
+ if !shouldRetry {
+ break
+ }
if err != nil {
glog.V(0).Infof("read %s failed, err: %v", urlString, err)
buffer.Reset()
@@ -110,8 +114,8 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
break
}
}
- if err != nil {
- glog.V(0).Infof("sleep for %v before retrying reading", waitTime)
+ if err != nil && shouldRetry {
+ glog.V(0).Infof("retry reading in %v", waitTime)
time.Sleep(waitTime)
} else {
break
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 835d6cfc2..da7eae621 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -75,7 +75,7 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
}
for _, loc := range locations.Locations {
- volumeServerAddress := filerClient.AdjustedUrl(loc.Url)
+ volumeServerAddress := filerClient.AdjustedUrl(loc)
targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
targetUrls = append(targetUrls, targetUrl)
}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index a41aebe22..363b07f14 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -2,6 +2,7 @@ package filer
import (
"bytes"
+ "fmt"
"io"
"math"
"strings"
@@ -35,10 +36,14 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
if err != nil {
- return err
+ glog.Errorf("read chunk: %v", err)
+ return fmt.Errorf("read chunk: %v", err)
+ }
+ _, err = w.Write(data)
+ if err != nil {
+ glog.Errorf("write chunk: %v", err)
+ return fmt.Errorf("write chunk: %v", err)
}
- w.Write(data)
-
}
return nil
@@ -174,10 +179,14 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
var buffer bytes.Buffer
+ var shouldRetry bool
for _, urlString := range urlStrings {
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
+ if !shouldRetry {
+ break
+ }
if err != nil {
glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
buffer.Reset()