aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/http/http_global_client_util.go20
-rw-r--r--weed/util/request_id.go9
2 files changed, 19 insertions, 10 deletions
diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go
index 33d978d9e..3cc819a47 100644
--- a/weed/util/http/http_global_client_util.go
+++ b/weed/util/http/http_global_client_util.go
@@ -2,6 +2,7 @@ package http
import (
"compress/gzip"
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -214,11 +215,11 @@ func NormalizeUrl(url string) (string, error) {
return GetGlobalHttpClient().NormalizeHttpScheme(url)
}
-func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
+func ReadUrl(ctx context.Context, fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
if cipherKey != nil {
var n int
- _, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
+ _, err := readEncryptedUrl(ctx, fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
n = copy(buf, data)
})
return int64(n), err
@@ -286,13 +287,13 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
return n, err
}
-func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
- return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+func ReadUrlAsStream(ctx context.Context, fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
+ return ReadUrlAsStreamAuthenticated(ctx, fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
}
-func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
+func ReadUrlAsStreamAuthenticated(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
if cipherKey != nil {
- return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+ return readEncryptedUrl(ctx, fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
}
req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
@@ -306,6 +307,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
} else {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
}
+ util.ReqWithRequestId(req, ctx)
r, err := GetGlobalHttpClient().Do(req)
if err != nil {
@@ -351,7 +353,7 @@ func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isConte
}
-func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
+func readEncryptedUrl(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt)
if err != nil {
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
@@ -447,7 +449,7 @@ func (r *CountingReader) Read(p []byte) (n int, err error) {
return n, err
}
-func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
+func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
var shouldRetry bool
@@ -457,7 +459,7 @@ func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte,
if strings.Contains(urlString, "%") {
urlString = url.PathEscape(urlString)
}
- shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
+ shouldRetry, err = ReadUrlAsStream(ctx, urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
if n < len(buffer) {
x := copy(buffer[n:], data)
n += x
diff --git a/weed/util/request_id.go b/weed/util/request_id.go
index 85ec254dc..7945e7cc4 100644
--- a/weed/util/request_id.go
+++ b/weed/util/request_id.go
@@ -1,6 +1,9 @@
package util
-import "context"
+import (
+ "context"
+ "net/http"
+)
const (
RequestIdHttpHeader = "X-Request-ID"
@@ -18,3 +21,7 @@ func GetRequestID(ctx context.Context) string {
func WithRequestID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, RequestIDKey, id)
}
+
+func ReqWithRequestId(req *http.Request, ctx context.Context) {
+ req.Header.Set(RequestIdHttpHeader, GetRequestID(ctx))
+}