diff options
Diffstat (limited to 'weed/filer/stream.go')
| -rw-r--r-- | weed/filer/stream.go | 43 |
1 files changed, 38 insertions, 5 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 573ab65e8..7e041e213 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -3,6 +3,7 @@ package filer import ( "bytes" "fmt" + "golang.org/x/sync/errgroup" "io" "math" "strings" @@ -33,16 +34,32 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c fileId2Url[chunkView.FileId] = urlStrings } - for _, chunkView := range chunkViews { + for idx, chunkView := range chunkViews { urlStrings := fileId2Url[chunkView.FileId] - data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + // Pre-check all chunkViews urls + gErr := new(errgroup.Group) + if len(chunkViews) > 1 && idx == 0 { + CheckAllChunkViews(chunkViews[1:], &fileId2Url, gErr) + } + data, err := retriedFetchChunkData( + urlStrings, + chunkView.CipherKey, + chunkView.IsGzipped, + chunkView.IsFullChunk(), + false, + chunkView.Offset, + int(chunkView.Size), + ) if err != nil { glog.Errorf("read chunk: %v", err) return fmt.Errorf("read chunk: %v", err) } - + if err := gErr.Wait(); err != nil { + glog.Errorf("check all chunks: %v", err) + return fmt.Errorf("check all chunks: %v", err) + } _, err = w.Write(data) if err != nil { glog.Errorf("write chunk: %v", err) @@ -54,6 +71,22 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, c } +func CheckAllChunkViews(chunkViews []*ChunkView, fileId2Url *map[string][]string, gErr *errgroup.Group) { + for _, chunkView := range chunkViews { + gErr.Go(func() error { + _, err := retriedFetchChunkData( + (*fileId2Url)[chunkView.FileId], + chunkView.CipherKey, + chunkView.IsGzipped, + chunkView.IsFullChunk(), + true, + chunkView.Offset, + int(chunkView.Size)) + return err + }) + } +} + // ---------------- ReadAllReader ---------------------------------- func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { @@ -73,7 +106,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) return nil, err } - data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), false, chunkView.Offset, int(chunkView.Size)) if err != nil { return nil, err } @@ -185,7 +218,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), false, chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if !shouldRetry { |
