| author | Aleksey Kosov <rusyak777@list.ru> | 2025-05-28 21:34:02 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-05-28 11:34:02 -0700 |
| commit | 283d9e0079d5deb57aefe9a7b30e8b9869ba8685 (patch) | |
| tree | 87b09bebed2ee4afc9c2a4f711ac8598fe2949b7 /weed/filer/stream.go | |
| parent | 62aaaa18f3ea8b7600d28934580dc220ca95164a (diff) | |
| download | seaweedfs-283d9e0079d5deb57aefe9a7b30e8b9869ba8685.tar.xz seaweedfs-283d9e0079d5deb57aefe9a7b30e8b9869ba8685.zip | |
Add context with request (#6824)
Diffstat (limited to 'weed/filer/stream.go')
| -rw-r--r-- | weed/filer/stream.go | 41 |
1 file changed, 21 insertions(+), 20 deletions(-)
```diff
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 2f55e3e44..bca9c7f6e 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -2,6 +2,7 @@ package filer
 
 import (
     "bytes"
+    "context"
     "fmt"
     "io"
     "math"
@@ -71,7 +72,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
 type DoStreamContent func(writer io.Writer) error
 
 func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
-    return PrepareStreamContentWithThrottler(masterClient, jwtFunc, chunks, offset, size, 0)
+    return PrepareStreamContentWithThrottler(context.Background(), masterClient, jwtFunc, chunks, offset, size, 0)
 }
 
 type VolumeServerJwtFunction func(fileId string) string
@@ -80,9 +81,9 @@ func noJwtFunc(string) string {
     return ""
 }
 
-func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
+func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
     glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
-    chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
+    chunkViews := ViewFromChunks(ctx, masterClient.GetLookupFileIdFunction(), chunks, offset, size)
 
     fileId2Url := make(map[string][]string)
 
@@ -91,7 +92,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
         var urlStrings []string
         var err error
         for _, backoff := range getLookupFileIdBackoffSchedule {
-            urlStrings, err = masterClient.GetLookupFileIdFunction()(chunkView.FileId)
+            urlStrings, err = masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId)
             if err == nil && len(urlStrings) > 0 {
                 break
             }
@@ -127,7 +128,7 @@ func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunc
             urlStrings := fileId2Url[chunkView.FileId]
             start := time.Now()
             jwt := jwtFunc(chunkView.FileId)
-            err := retriedStreamFetchChunkData(writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+            err := retriedStreamFetchChunkData(ctx, writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
             offset += int64(chunkView.ViewSize)
             remaining -= int64(chunkView.ViewSize)
             stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
@@ -177,25 +178,25 @@ func writeZero(w io.Writer, size int64) (err error) {
     return
 }
 
-func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
+func ReadAll(ctx context.Context, buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
 
-    lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
-        return masterClient.LookupFileId(fileId)
+    lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrls []string, err error) {
+        return masterClient.LookupFileId(ctx, fileId)
     }
 
-    chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
+    chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, int64(len(buffer)))
 
     idx := 0
 
     for x := chunkViews.Front(); x != nil; x = x.Next {
         chunkView := x.Value
-        urlStrings, err := lookupFileIdFn(chunkView.FileId)
+        urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId)
         if err != nil {
             glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
             return err
         }
 
-        n, err := util_http.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
+        n, err := util_http.RetriedFetchChunkData(ctx, buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
         if err != nil {
             return err
         }
@@ -220,9 +221,9 @@ type ChunkStreamReader struct {
 var _ = io.ReadSeeker(&ChunkStreamReader{})
 var _ = io.ReaderAt(&ChunkStreamReader{})
 
-func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+func doNewChunkStreamReader(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
-    chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+    chunkViews := ViewFromChunks(ctx, lookupFileIdFn, chunks, 0, math.MaxInt64)
 
     var totalSize int64
     for x := chunkViews.Front(); x != nil; x = x.Next {
@@ -238,20 +239,20 @@ func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, ch
     }
 }
 
-func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+func NewChunkStreamReaderFromFiler(ctx context.Context, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
-    lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
-        return masterClient.LookupFileId(fileId)
+    lookupFileIdFn := func(ctx context.Context, fileId string) (targetUrl []string, err error) {
+        return masterClient.LookupFileId(ctx, fileId)
     }
 
-    return doNewChunkStreamReader(lookupFileIdFn, chunks)
+    return doNewChunkStreamReader(ctx, lookupFileIdFn, chunks)
 }
 
 func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
 
     lookupFileIdFn := LookupFn(filerClient)
 
-    return doNewChunkStreamReader(lookupFileIdFn, chunks)
+    return doNewChunkStreamReader(context.Background(), lookupFileIdFn, chunks)
 }
 
 func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
@@ -343,7 +344,7 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
 }
 
 func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
-    urlStrings, err := c.lookupFileId(chunkView.FileId)
+    urlStrings, err := c.lookupFileId(context.Background(), chunkView.FileId)
     if err != nil {
         glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
         return err
@@ -351,7 +352,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
     var buffer bytes.Buffer
     var shouldRetry bool
     for _, urlString := range urlStrings {
-        shouldRetry, err = util_http.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
+        shouldRetry, err = util_http.ReadUrlAsStream(context.Background(), urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize), func(data []byte) {
             buffer.Write(data)
         })
         if !shouldRetry {
```
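The change above is mechanical context plumbing: the request's `context.Context` now rides along from the filer's streaming entry points down to the volume-server lookups (`ViewFromChunks`, `GetLookupFileIdFunction`) and chunk fetches (`retriedStreamFetchChunkData`, `RetriedFetchChunkData`). Below is a minimal, self-contained sketch of that pattern using only the Go standard library, not SeaweedFS code; `streamContent`, `fetchChunk`, the `/stream` handler, and the URLs are hypothetical stand-ins. It shows why threading `ctx` matters: cancelling the incoming HTTP request cancels the downstream lookup and fetch.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
)

// lookupFileIdFn mirrors the shape of the lookup function after this change:
// it receives the caller's context as its first argument.
type lookupFileIdFn func(ctx context.Context, fileId string) (urls []string, err error)

// fetchChunk is a hypothetical stand-in for retriedStreamFetchChunkData.
// Building the request with the caller's context means a cancelled request
// aborts the in-flight HTTP fetch.
func fetchChunk(ctx context.Context, w io.Writer, url string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err // includes ctx cancellation, surfaced as a *url.Error
	}
	defer resp.Body.Close()
	_, err = io.Copy(w, resp.Body)
	return err
}

// streamContent threads ctx through every lookup and fetch, the same way the
// patched PrepareStreamContentWithThrottler threads it through each chunkView.
func streamContent(ctx context.Context, w io.Writer, lookup lookupFileIdFn, fileIds []string) error {
	for _, id := range fileIds {
		// Bail out between chunks if the client has already gone away.
		if err := ctx.Err(); err != nil {
			return err
		}
		urls, err := lookup(ctx, id)
		if err != nil {
			return fmt.Errorf("lookup %s: %w", id, err)
		}
		if len(urls) == 0 {
			return fmt.Errorf("lookup %s: no locations", id)
		}
		if err := fetchChunk(ctx, w, urls[0]); err != nil {
			return fmt.Errorf("fetch %s: %w", id, err)
		}
	}
	return nil
}

func main() {
	lookup := func(ctx context.Context, fileId string) ([]string, error) {
		return []string{"http://localhost:8080/" + fileId}, nil // hypothetical volume server URL
	}
	http.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) {
		// r.Context() is cancelled when the client disconnects; passing it
		// down is what this commit does for the filer's stream path.
		if err := streamContent(r.Context(), w, lookup, []string{"3,01637037d6"}); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	})
	http.ListenAndServe(":8081", nil)
}
```

Note that the diff stops short of full propagation: `NewChunkStreamReader`, `fetchChunkToBuffer`, and the `ReadUrlAsStream` call still pass `context.Background()`, so request cancellation does not yet reach those paths.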
