Diffstat (limited to 'weed/util/http/http_global_client_util.go')
-rw-r--r--  weed/util/http/http_global_client_util.go  108
1 file changed, 108 insertions(+), 0 deletions(-)
diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go
index 3a969fdc8..a374c8a2b 100644
--- a/weed/util/http/http_global_client_util.go
+++ b/weed/util/http/http_global_client_util.go
@@ -487,6 +487,12 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
 		)
 	}
 
+	// For unencrypted, non-gzipped full chunks, use a direct buffer read.
+	// This avoids the 64KB intermediate buffer and callback overhead.
+	if cipherKey == nil && !isGzipped && isFullChunk {
+		return retriedFetchChunkDataDirect(ctx, buffer, urlStrings, string(jwt))
+	}
+
 	var shouldRetry bool
 
 	for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
@@ -551,3 +557,105 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
 
 	return n, err
 }
+
+// retriedFetchChunkDataDirect reads chunk data directly into the buffer without
+// intermediate buffering. This reduces memory copies and improves throughput
+// for large chunk reads.
+func retriedFetchChunkDataDirect(ctx context.Context, buffer []byte, urlStrings []string, jwt string) (n int, err error) {
+	var shouldRetry bool
+
+	for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
+		select {
+		case <-ctx.Done():
+			return 0, ctx.Err()
+		default:
+		}
+
+		for _, urlString := range urlStrings {
+			select {
+			case <-ctx.Done():
+				return 0, ctx.Err()
+			default:
+			}
+
+			n, shouldRetry, err = readUrlDirectToBuffer(ctx, urlString+"?readDeleted=true", jwt, buffer)
+			if err == nil {
+				return n, nil
+			}
+			if !shouldRetry {
+				break
+			}
+			glog.V(0).InfofCtx(ctx, "read %s failed, err: %v", urlString, err)
+		}
+
+		if err != nil && shouldRetry {
+			glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
+			timer := time.NewTimer(waitTime)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return 0, ctx.Err()
+			case <-timer.C:
+			}
+		} else {
+			break
+		}
+	}
+
+	return n, err
+}
+
+// readUrlDirectToBuffer reads the HTTP response directly into the provided buffer,
+// avoiding intermediate buffer allocations and copies.
+func readUrlDirectToBuffer(ctx context.Context, fileUrl, jwt string, buffer []byte) (n int, retryable bool, err error) {
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, fileUrl, nil)
+	if err != nil {
+		return 0, false, err
+	}
+	maybeAddAuth(req, jwt)
+	request_id.InjectToRequest(ctx, req)
+
+	r, err := GetGlobalHttpClient().Do(req)
+	if err != nil {
+		return 0, true, err
+	}
+	defer CloseResponse(r)
+
+	if r.StatusCode >= 400 {
+		if r.StatusCode == http.StatusNotFound {
+			return 0, true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
+		}
+		if r.StatusCode == http.StatusTooManyRequests {
+			return 0, false, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrTooManyRequests)
+		}
+		retryable = r.StatusCode >= 499
+		return 0, retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
+	}
+
+	// Read directly into the buffer without intermediate copying.
+	// This is significantly faster for large chunks (16MB+).
+	var totalRead int
+	for totalRead < len(buffer) {
+		select {
+		case <-ctx.Done():
+			return totalRead, false, ctx.Err()
+		default:
+		}
+
+		m, readErr := r.Body.Read(buffer[totalRead:])
+		totalRead += m
+		if readErr != nil {
+			if readErr == io.EOF {
+				// Return io.ErrUnexpectedEOF if we haven't filled the buffer.
+				// This prevents silent data corruption from truncated responses.
+				if totalRead < len(buffer) {
+					return totalRead, true, io.ErrUnexpectedEOF
+				}
+				return totalRead, false, nil
+			}
+			return totalRead, true, readErr
+		}
+	}
+
+	return totalRead, false, nil
+}