about summary refs log tree commit diff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-10-13 00:29:46 -0700
committerChris Lu <chris.lu@gmail.com>2020-10-13 00:29:46 -0700
commit3f7d1d1bf146eaeac3ddcd96e3d8de088bdf97ce (patch)
tree4c19cf7c8350a7be607f8fce612c46b109b0b5bc
parentb18f21cce178b60531086f164d24e832a7b6eb86 (diff)
downloadseaweedfs-3f7d1d1bf146eaeac3ddcd96e3d8de088bdf97ce.tar.xz
seaweedfs-3f7d1d1bf146eaeac3ddcd96e3d8de088bdf97ce.zip
Only wait on retryable requests
-rw-r--r--unmaintained/repeated_vacuum/repeated_vacuum.go2
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/filer/filechunk_manifest.go8
-rw-r--r--weed/filer/stream.go6
-rw-r--r--weed/replication/repl_util/replication_utli.go2
-rw-r--r--weed/util/http_util.go38
6 files changed, 34 insertions, 24 deletions
diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go
index 12ac42dbe..bff5becc1 100644
--- a/unmaintained/repeated_vacuum/repeated_vacuum.go
+++ b/unmaintained/repeated_vacuum/repeated_vacuum.go
@@ -32,7 +32,7 @@ func main() {
go func() {
for {
println("vacuum threshold", *garbageThreshold)
- _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold))
+ _, _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold))
if err != nil {
log.Fatalf("vacuum: %v", err)
}
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 8bb585d91..e241a904e 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -290,7 +290,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
}
var bytes []byte
for _, url := range urls {
- bytes, err = util.Get(url)
+ bytes, _, err = util.Get(url)
if err == nil {
break
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 2df8a4bbf..271d5e5ee 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -97,12 +97,16 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
var err error
var buffer bytes.Buffer
+ var shouldRetry bool
for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data)
})
+ if !shouldRetry {
+ break
+ }
if err != nil {
glog.V(0).Infof("read %s failed, err: %v", urlString, err)
buffer.Reset()
@@ -110,7 +114,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
break
}
}
- if err != nil {
+ if err != nil && shouldRetry{
glog.V(0).Infof("sleep for %v before retrying reading", waitTime)
time.Sleep(waitTime)
} else {
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index a41aebe22..1e1d5c7f3 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -174,10 +174,14 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
return err
}
var buffer bytes.Buffer
+ var shouldRetry bool
for _, urlString := range urlStrings {
- err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
+ if !shouldRetry {
+ break
+ }
if err != nil {
glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
buffer.Reset()
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
index 9b18275b5..42777f4ad 100644
--- a/weed/replication/repl_util/replication_utli.go
+++ b/weed/replication/repl_util/replication_utli.go
@@ -19,7 +19,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var writeErr error
for _, fileUrl := range fileUrls {
- err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ _, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index eef24b930..da0b3d849 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -67,14 +67,14 @@ func Post(url string, values url.Values) ([]byte, error) {
// github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
// may need increasing http.Client.Timeout
-func Get(url string) ([]byte, error) {
+func Get(url string) ([]byte, bool, error) {
request, err := http.NewRequest("GET", url, nil)
request.Header.Add("Accept-Encoding", "gzip")
response, err := client.Do(request)
if err != nil {
- return nil, err
+ return nil, true, err
}
defer response.Body.Close()
@@ -89,12 +89,13 @@ func Get(url string) ([]byte, error) {
b, err := ioutil.ReadAll(reader)
if response.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, response.Status)
+ retryable := response.StatusCode >= 500
+ return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
}
if err != nil {
- return nil, err
+ return nil, false, err
}
- return b, nil
+ return b, false, nil
}
func Head(url string) (http.Header, error) {
@@ -207,7 +208,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
if cipherKey != nil {
var n int
- err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
+ _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
n = copy(buf, data)
})
return int64(n), err
@@ -272,7 +273,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
return n, err
}
-func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil {
return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
@@ -280,7 +281,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
- return err
+ return false, err
}
if isFullChunk {
@@ -291,11 +292,12 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
r, err := client.Do(req)
if err != nil {
- return err
+ return true, err
}
defer CloseResponse(r)
if r.StatusCode >= 400 {
- return fmt.Errorf("%s: %s", fileUrl, r.Status)
+ retryable = r.StatusCode >= 500
+ return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
var reader io.ReadCloser
@@ -317,23 +319,23 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
m, err = reader.Read(buf)
fn(buf[:m])
if err == io.EOF {
- return nil
+ return false, nil
}
if err != nil {
- return err
+ return false, err
}
}
}
-func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
- encryptedData, err := Get(fileUrl)
+func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
+ encryptedData, retryable, err := Get(fileUrl)
if err != nil {
- return fmt.Errorf("fetch %s: %v", fileUrl, err)
+ return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
}
decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
if err != nil {
- return fmt.Errorf("decrypt %s: %v", fileUrl, err)
+ return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
}
if isContentCompressed {
decryptedData, err = DecompressData(decryptedData)
@@ -342,14 +344,14 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
}
}
if len(decryptedData) < int(offset)+size {
- return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+ return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
}
if isFullChunk {
fn(decryptedData)
} else {
fn(decryptedData[int(offset) : int(offset)+size])
}
- return nil
+ return false, nil
}
func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {