aboutsummaryrefslogtreecommitdiff
path: root/weed/util/http/http_global_client_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/http/http_global_client_util.go')
-rw-r--r--weed/util/http/http_global_client_util.go20
1 files changed, 11 insertions, 9 deletions
diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go
index 33d978d9e..3cc819a47 100644
--- a/weed/util/http/http_global_client_util.go
+++ b/weed/util/http/http_global_client_util.go
@@ -2,6 +2,7 @@ package http
import (
"compress/gzip"
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -214,11 +215,11 @@ func NormalizeUrl(url string) (string, error) {
return GetGlobalHttpClient().NormalizeHttpScheme(url)
}
-func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
+func ReadUrl(ctx context.Context, fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
if cipherKey != nil {
var n int
- _, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
+ _, err := readEncryptedUrl(ctx, fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
n = copy(buf, data)
})
return int64(n), err
@@ -286,13 +287,13 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
return n, err
}
-func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
- return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+func ReadUrlAsStream(ctx context.Context, fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
+ return ReadUrlAsStreamAuthenticated(ctx, fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
}
-func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
+func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil {
- return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+ return readEncryptedUrl(ctx, fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
}
req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
@@ -306,6 +307,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
} else {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
}
+ util.ReqWithRequestId(req, ctx)
r, err := GetGlobalHttpClient().Do(req)
if err != nil {
@@ -351,7 +353,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
}
-func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
+func readEncryptedUrl(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt)
if err != nil {
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
@@ -447,7 +449,7 @@ func (r *CountingReader) Read(p []byte) (n int, err error) {
return n, err
}
-func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
+func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
var shouldRetry bool
@@ -457,7 +459,7 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte,
if strings.Contains(urlString, "%") {
urlString = url.PathEscape(urlString)
}
- shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
+ shouldRetry, err = ReadUrlAsStream(ctx, urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
if n < len(buffer) {
x := copy(buffer[n:], data)
n += x