aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/stream.go')
-rw-r--r--weed/filer/stream.go41
1 files changed, 21 insertions, 20 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 2f55e3e44..bca9c7f6e 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -2,6 +2,7 @@ package filer
import (
"bytes"
+ "context"
"fmt"
"io"
"math"
@@ -71,7 +72,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
type DoStreamContent func(writer io.Writer) error
func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
- return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0)
+ return PrepareStreamContentWithThrottler(context.Background(), masterClient, jwtFunc, chunks, offset, size, 0)
}
type VolumeServerJwtFunction func(fileId string) string
@@ -80,9 +81,9 @@ func noJwtFunc(string) string {
return ""
}
-func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
+func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
- chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
+ chunkViews := ViewFromChunks(ctx, masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@@ -91,7 +92,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
var urlStrings []string
var err error
for _, backoff := range getLookupFileIdBackoffSchedule {
- urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
+ urlStrings, err = masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId)
if err == nil && len(urlStrings) > 0 {
break
}
@@ -127,7 +128,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
urlStrings := fileId2Url[chunkView.FileId]
start := time.Now()
jwt := jwtFunc(chunkView.FileId)
- err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+ err := retriedStreamFetchChunkData(ctx, writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
offset += int64(chunkView.ViewSize)
remaining -= int64(chunkView.ViewSize)
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
@@ -177,25 +178,25 @@ func writeZero(w io.Writer, size int64) (err error) {
return
}
-func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
+func ReadAll(ctx context.Context, buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
- lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
- return masterClient.LookupFileId(fileId)
+ lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrls []string, err error) {
+ return masterClient.LookupFileId(ctx, fileId)
}
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
+ chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, int64(len(buffer)))
idx := 0
for x := chunkViews.Front(); x != nil; x = x.Next {
chunkView := x.Value
- urlStrings, err := lookupFileIdFn(chunkView.FileId)
+ urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
}
- n, err := util_http.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
+ n, err := util_http.RetriedFetchChunkData(ctx, buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
if err != nil {
return err
}
@@ -220,9 +221,9 @@ type ChunkStreamReader struct {
var _ = io.ReadSeeker(&ChunkStreamReader{})
var _ = io.ReaderAt(&ChunkStreamReader{})
-func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+func doNewChunkStreamReader(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, math.MaxInt64)
var totalSize int64
for x := chunkViews.Front(); x != nil; x = x.Next {
@@ -238,20 +239,20 @@ func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, ch
}
}
-func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+func NewChunkStreamReaderFromFiler(ctx context.Context, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
- lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
- return masterClient.LookupFileId(fileId)
+ lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrl []string, err error) {
+ return masterClient.LookupFileId(ctx, fileId)
}
- return doNewChunkStreamReader(lookupFileIdFn, chunks)
+ return doNewChunkStreamReader(ctx, lookupFileIdFn, chunks)
}
func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
lookupFileIdFn := LookupFn(filerClient)
- return doNewChunkStreamReader(lookupFileIdFn, chunks)
+ return doNewChunkStreamReader(context.Background(), lookupFileIdFn, chunks)
}
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
@@ -343,7 +344,7 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
}
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
- urlStrings, err := c.lookupFileId(chunkView.FileId)
+ urlStrings, err := c.lookupFileId(context.Background(), chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
@@ -351,7 +352,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util_http.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
+ shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {