diff options
Diffstat (limited to 'weed/filer/filechunk_manifest.go')
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 25 |
1 files changed, 13 insertions, 12 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 36096d2c1..00e9a73a1 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -2,6 +2,7 @@ package filer import ( "bytes" + "context" "fmt" "io" "math" @@ -48,7 +49,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa return } -func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { +func ResolveChunkManifest(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { // TODO maybe parallel this for _, chunk := range chunks { @@ -61,14 +62,14 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun continue } - resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk) + resolvedChunks, err := ResolveOneChunkManifest(ctx, lookupFileIdFn, chunk) if err != nil { return dataChunks, nil, err } manifestChunks = append(manifestChunks, chunk) // recursive - subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) + subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(ctx, lookupFileIdFn, resolvedChunks, startOffset, stopOffset) if subErr != nil { return dataChunks, nil, subErr } @@ -78,7 +79,7 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun return } -func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) { +func ResolveOneChunkManifest(ctx context.Context, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) { if !chunk.IsChunkManifest { return } @@ -87,7 +88,7 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer) bytesBuffer.Reset() defer bytesBufferPool.Put(bytesBuffer) - err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) + err := fetchWholeChunk(ctx, bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) if err != nil { return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) } @@ -102,13 +103,13 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c } // TODO fetch from cache for weed mount? -func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error { - urlStrings, err := lookupFileIdFn(fileId) +func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error { + urlStrings, err := lookupFileIdFn(ctx, fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return err } - err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0) + err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0) if err != nil { return err } @@ -116,15 +117,15 @@ func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFi } func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) { - urlStrings, err := lookupFileIdFn(fileId) + urlStrings, err := lookupFileIdFn(context.Background(), fileId) if err != nil { glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return 0, err } - return util_http.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) + return util_http.RetriedFetchChunkData(context.Background(), buffer, urlStrings, cipherKey, isGzipped, false, offset) } -func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { +func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { var shouldRetry bool var totalWritten int @@ -135,7 +136,7 @@ func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt stri retriedCnt++ var localProcessed int var writeErr error - shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + shouldRetry, err = util_http.ReadUrlAsStreamAuthenticated(ctx, urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { if totalWritten > localProcessed { toBeSkipped := totalWritten - localProcessed if len(data) <= toBeSkipped { |
