Diffstat (limited to 'weed')
-rw-r--r--  weed/filer/filechunk_manifest.go                  6
-rw-r--r--  weed/filer/stream.go                             43
-rw-r--r--  weed/replication/repl_util/replication_util.go    2
-rw-r--r--  weed/util/fasthttp_util.go                        7
4 files changed, 46 insertions(+), 12 deletions(-)
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 99a62c90c..c4f989394 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -91,10 +91,10 @@ func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string,
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return nil, err
}
- return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
+ return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, false, 0, 0)
}
-func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
+func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, isCheck bool, offset int64, size int) ([]byte, error) {
var err error
var buffer bytes.Buffer
@@ -102,7 +102,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, isCheck, offset, size, func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
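
The new isCheck flag threads a lightweight probe mode through the same retry-over-replicas loop that serves full reads. A minimal, self-contained sketch of that shape (the fetchFromReplicas name, the URLs, and the use of net/http are illustrative stand-ins, not the SeaweedFS API):

package main

import (
	"fmt"
	"io"
	"net/http"
)

// fetchFromReplicas tries each replica URL in turn. With isCheck=true it only
// asks for the first two bytes to confirm the chunk is readable; with
// isCheck=false it downloads the whole body.
func fetchFromReplicas(urls []string, isCheck bool) ([]byte, error) {
	var lastErr error
	for _, u := range urls {
		req, err := http.NewRequest(http.MethodGet, u, nil)
		if err != nil {
			lastErr = err
			continue
		}
		if isCheck {
			// Probe mode: request only the first two bytes.
			req.Header.Set("Range", "bytes=0-1")
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			lastErr = err
			continue // try the next replica
		}
		data, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			lastErr = err
			continue
		}
		if isCheck {
			return nil, nil // reachable; the probe payload is discarded
		}
		return data, nil
	}
	return nil, fmt.Errorf("all replicas failed: %v", lastErr)
}

func main() {
	// Hypothetical replica URLs for the same chunk.
	urls := []string{"http://volume1:8080/3,0123456789", "http://volume2:8080/3,0123456789"}
	if _, err := fetchFromReplicas(urls, true); err != nil {
		fmt.Println("pre-check failed:", err)
	}
}
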
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 573ab65e8..7e041e213 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -3,6 +3,7 @@ package filer
import (
"bytes"
"fmt"
+ "golang.org/x/sync/errgroup"
"io"
"math"
"strings"
@@ -33,16 +34,32 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
fileId2Url[chunkView.FileId] = urlStrings
}
- for _, chunkView := range chunkViews {
+ for idx, chunkView := range chunkViews {
urlStrings := fileId2Url[chunkView.FileId]
- data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ // Pre-check all chunkViews urls
+ gErr := new(errgroup.Group)
+ if len(chunkViews) > 1 && idx == 0 {
+ CheckAllChunkViews(chunkViews[1:], &fileId2Url, gErr)
+ }
+ data, err := retriedFetchChunkData(
+ urlStrings,
+ chunkView.CipherKey,
+ chunkView.IsGzipped,
+ chunkView.IsFullChunk(),
+ false,
+ chunkView.Offset,
+ int(chunkView.Size),
+ )
if err != nil {
glog.Errorf("read chunk: %v", err)
return fmt.Errorf("read chunk: %v", err)
}
-
+ if err := gErr.Wait(); err != nil {
+ glog.Errorf("check all chunks: %v", err)
+ return fmt.Errorf("check all chunks: %v", err)
+ }
_, err = w.Write(data)
if err != nil {
glog.Errorf("write chunk: %v", err)
@@ -54,6 +71,22 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c
}
+func CheckAllChunkViews(chunkViews []*ChunkView, fileId2Url *map[string][]string, gErr *errgroup.Group) {
+ for _, chunkView := range chunkViews {
+ gErr.Go(func() error {
+ _, err := retriedFetchChunkData(
+ (*fileId2Url)[chunkView.FileId],
+ chunkView.CipherKey,
+ chunkView.IsGzipped,
+ chunkView.IsFullChunk(),
+ true,
+ chunkView.Offset,
+ int(chunkView.Size))
+ return err
+ })
+ }
+}
+
// ---------------- ReadAllReader ----------------------------------
func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
@@ -73,7 +106,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
return nil, err
}
- data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), false, chunkView.Offset, int(chunkView.Size))
if err != nil {
return nil, err
}
@@ -185,7 +218,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), false, chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
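
With this change, StreamContent starts a concurrent pre-check of the remaining chunk views via golang.org/x/sync/errgroup while the first chunk is fetched, and returns early if any chunk is unreachable. A minimal sketch of that errgroup pattern, assuming a hypothetical checkURL helper and volume URLs:

package main

import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

// checkURL issues a two-byte range probe to confirm the URL is readable.
func checkURL(url string) error {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Range", "bytes=0-1")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	resp.Body.Close()
	return nil
}

func main() {
	urls := []string{"http://volume1:8080/3,01", "http://volume2:8080/3,02"}

	g := new(errgroup.Group)
	for _, u := range urls {
		u := u // copy: before Go 1.22 each goroutine must capture its own value
		g.Go(func() error {
			return checkURL(u)
		})
	}
	// Wait blocks until all probes finish and returns the first non-nil error.
	if err := g.Wait(); err != nil {
		fmt.Println("check all chunks:", err)
		return
	}
	fmt.Println("all chunks reachable")
}
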
diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go
index f642bb801..23fbe3292 100644
--- a/weed/replication/repl_util/replication_util.go
+++ b/weed/replication/repl_util/replication_util.go
@@ -20,7 +20,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
var shouldRetry bool
for _, fileUrl := range fileUrls {
- shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
+ shouldRetry, err = util.FastReadUrlAsStream(fileUrl, nil, false, chunk.IsFullChunk(), false, chunk.Offset, int(chunk.Size), func(data []byte) {
writeErr = writeFunc(data)
})
if err != nil {
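
CopyFromChunkViews keeps the callback-style streaming read: data is handed to a function as it arrives, and a write error is captured in the enclosing scope so it can be reported alongside the read error. A compact sketch of that pattern using net/http (readURLAsStream and the URL are hypothetical stand-ins):

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

// readURLAsStream streams the response body to fn in fixed-size pieces.
func readURLAsStream(url string, fn func(data []byte)) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	buf := make([]byte, 64*1024)
	for {
		n, err := resp.Body.Read(buf)
		if n > 0 {
			fn(buf[:n])
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	var writeErr error
	err := readURLAsStream("http://volume1:8080/3,0123456789", func(data []byte) {
		_, writeErr = os.Stdout.Write(data)
	})
	if err != nil || writeErr != nil {
		fmt.Fprintln(os.Stderr, "copy failed:", err, writeErr)
	}
}
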
diff --git a/weed/util/fasthttp_util.go b/weed/util/fasthttp_util.go
index 6c31a40da..02c78e79d 100644
--- a/weed/util/fasthttp_util.go
+++ b/weed/util/fasthttp_util.go
@@ -72,12 +72,11 @@ func FastGet(url string) ([]byte, bool, error) {
return out, false, nil
}
-func FastReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
+func FastReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, isCheck bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil {
return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
}
-
req := fasthttp.AcquireRequest()
res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
@@ -85,7 +84,9 @@ func FastReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool
req.SetRequestURIBytes([]byte(fileUrl))
- if isFullChunk {
+ if isCheck {
+ req.Header.Add("Range", "bytes=0-1")
+ } else if isFullChunk {
req.Header.Add("Accept-Encoding", "gzip")
} else {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
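
In check mode the volume request asks only for the first two bytes, so a chunk's readability is verified without transferring its content. A small sketch of the same probe done directly with fasthttp (the probe function name and URL are illustrative):

package main

import (
	"fmt"

	"github.com/valyala/fasthttp"
)

// probe sends a two-byte range request and treats any 4xx/5xx status as failure.
func probe(fileUrl string) error {
	req := fasthttp.AcquireRequest()
	res := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(res)

	req.SetRequestURI(fileUrl)
	// Only ask for the first two bytes: enough to confirm the chunk is readable
	// without downloading it.
	req.Header.Add("Range", "bytes=0-1")

	if err := fasthttp.Do(req, res); err != nil {
		return err
	}
	if res.StatusCode() >= 400 {
		return fmt.Errorf("probe %s: status %d", fileUrl, res.StatusCode())
	}
	return nil
}

func main() {
	if err := probe("http://volume1:8080/3,0123456789"); err != nil {
		fmt.Println("probe failed:", err)
	}
}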