diff options
Diffstat (limited to 'weed/util/http/http_global_client_util.go')
| -rw-r--r-- | weed/util/http/http_global_client_util.go | 20 |
1 files changed, 11 insertions, 9 deletions
diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index 33d978d9e..3cc819a47 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -2,6 +2,7 @@ package http import ( "compress/gzip" + "context" "encoding/json" "errors" "fmt" @@ -214,11 +215,11 @@ func NormalizeUrl(url string) (string, error) { return GetGlobalHttpClient().NormalizeHttpScheme(url) } -func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { +func ReadUrl(ctx context.Context, fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { if cipherKey != nil { var n int - _, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { + _, err := readEncryptedUrl(ctx, fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { n = copy(buf, data) }) return int64(n), err @@ -286,13 +287,13 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC return n, err } -func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { - return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn) +func ReadUrlAsStream(ctx context.Context, fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { + return ReadUrlAsStreamAuthenticated(ctx, fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn) } -func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { +func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { if cipherKey != nil { - return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) + return readEncryptedUrl(ctx, fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) } req, err := http.NewRequest(http.MethodGet, fileUrl, nil) @@ -306,6 +307,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte } else { req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } + util.ReqWithRequestId(req, ctx) r, err := GetGlobalHttpClient().Do(req) if err != nil { @@ -351,7 +353,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte } -func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { +func readEncryptedUrl(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt) if err != nil { return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) @@ -447,7 +449,7 @@ func (r *CountingReader) Read(p []byte) (n int, err error) { return n, err } -func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { +func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { var shouldRetry bool @@ -457,7 +459,7 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, if strings.Contains(urlString, "%") { urlString = url.PathEscape(urlString) } - shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + shouldRetry, err = ReadUrlAsStream(ctx, urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { if n < len(buffer) { x := copy(buffer[n:], data) n += x |
