Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 2605 |
1 file changed, 2098 insertions, 507 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 98d0ffede..ce2772981 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -2,12 +2,17 @@ package s3api import ( "bytes" + "context" "encoding/base64" + "encoding/json" "errors" "fmt" "io" + "math" + "mime" "net/http" "net/url" + "path/filepath" "sort" "strconv" "strings" @@ -15,13 +20,15 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/glog" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) // corsHeaders defines the CORS headers that need to be preserved @@ -35,6 +42,113 @@ var corsHeaders = []string{ "Access-Control-Allow-Credentials", } +// zeroBuf is a reusable buffer of zero bytes for padding operations +// Package-level to avoid per-call allocations in writeZeroBytes +var zeroBuf = make([]byte, 32*1024) + +// adjustRangeForPart adjusts a client's Range header to absolute offsets within a part. +// Parameters: +// - partStartOffset: the absolute start offset of the part in the object +// - partEndOffset: the absolute end offset of the part in the object +// - clientRangeHeader: the Range header value from the client (e.g., "bytes=0-99") +// +// Returns: +// - adjustedStart: the adjusted absolute start offset +// - adjustedEnd: the adjusted absolute end offset +// - error: nil on success, error if the range is invalid +func adjustRangeForPart(partStartOffset, partEndOffset int64, clientRangeHeader string) (adjustedStart, adjustedEnd int64, err error) { + // If no range header, return the full part + if clientRangeHeader == "" || !strings.HasPrefix(clientRangeHeader, "bytes=") { + return partStartOffset, partEndOffset, nil + } + + // Parse client's range request (relative to the part) + rangeSpec := clientRangeHeader[6:] // Remove "bytes=" prefix + parts := strings.Split(rangeSpec, "-") + + if len(parts) != 2 { + return 0, 0, fmt.Errorf("invalid range format") + } + + partSize := partEndOffset - partStartOffset + 1 + var clientStart, clientEnd int64 + + // Parse start offset + if parts[0] != "" { + clientStart, err = strconv.ParseInt(parts[0], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid range start: %w", err) + } + } + + // Parse end offset + if parts[1] != "" { + clientEnd, err = strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid range end: %w", err) + } + } else { + // No end specified, read to end of part + clientEnd = partSize - 1 + } + + // Handle suffix-range (e.g., "bytes=-100" means last 100 bytes) + if parts[0] == "" { + // suffix-range: clientEnd is actually the suffix length + suffixLength := clientEnd + if suffixLength > partSize { + suffixLength = partSize + } + clientStart = partSize - suffixLength + clientEnd = partSize - 1 + } + + // Validate range is within part boundaries + if clientStart < 0 || clientStart >= partSize { + return 0, 0, fmt.Errorf("range start %d out of bounds for part size %d", clientStart, partSize) + } + if clientEnd >= partSize { + clientEnd = partSize - 1 + } + if clientStart > clientEnd { + return 0, 0, fmt.Errorf("range start %d > end %d", clientStart, 
clientEnd) + } + + // Adjust to absolute offsets in the object + adjustedStart = partStartOffset + clientStart + adjustedEnd = partStartOffset + clientEnd + + return adjustedStart, adjustedEnd, nil +} + +// StreamError is returned when streaming functions encounter errors. +// It tracks whether an HTTP response has already been written to prevent +// double WriteHeader calls that would create malformed S3 error responses. +type StreamError struct { + // Err is the underlying error + Err error + // ResponseWritten indicates if HTTP headers/status have been written to ResponseWriter + ResponseWritten bool +} + +func (e *StreamError) Error() string { + return e.Err.Error() +} + +func (e *StreamError) Unwrap() error { + return e.Err +} + +// newStreamError creates a StreamError for cases where response hasn't been written yet +func newStreamError(err error) *StreamError { + return &StreamError{Err: err, ResponseWritten: false} +} + +// newStreamErrorWithResponse creates a StreamError for cases where response was already written +func newStreamErrorWithResponse(err error) *StreamError { + return &StreamError{Err: err, ResponseWritten: true} +} + func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { mimeBuffer := make([]byte, 512) size, _ := dataReader.Read(mimeBuffer) @@ -88,6 +202,62 @@ func removeDuplicateSlashes(object string) string { return result.String() } +// hasChildren checks if a path has any child objects (is a directory with contents) +// +// This helper function is used to distinguish implicit directories from regular files or empty directories. +// An implicit directory is one that exists only because it has children, not because it was explicitly created. +// +// Implementation: +// - Lists the directory with Limit=1 to check for at least one child +// - Returns true if any child exists, false otherwise +// - Efficient: only fetches one entry to minimize overhead +// +// Used by HeadObjectHandler to implement AWS S3-compatible implicit directory behavior: +// - If a 0-byte object or directory has children → it's an implicit directory → HEAD returns 404 +// - If a 0-byte object or directory has no children → it's empty → HEAD returns 200 +// +// Examples: +// +// hasChildren("bucket", "dataset") where "dataset/file.txt" exists → true +// hasChildren("bucket", "empty-dir") where no children exist → false +// +// Performance: ~1-5ms per call (one gRPC LIST request with Limit=1) +func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool { + // Clean up prefix: remove leading slashes + cleanPrefix := strings.TrimPrefix(prefix, "/") + + // The directory to list is bucketDir + cleanPrefix + bucketDir := s3a.option.BucketsPath + "/" + bucket + fullPath := bucketDir + "/" + cleanPrefix + + // Try to list one child object in the directory + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + request := &filer_pb.ListEntriesRequest{ + Directory: fullPath, + Limit: 1, + InclusiveStartFrom: true, + } + + stream, err := client.ListEntries(context.Background(), request) + if err != nil { + return err + } + + // Check if we got at least one entry + _, err = stream.Recv() + if err == io.EOF { + return io.EOF // No children + } + if err != nil { + return err + } + return nil + }) + + // If we got an entry (not EOF), then it has children + return err == nil +} + // checkDirectoryObject checks if the object is a directory object (ends with "/") and if it exists // Returns: (entry, isDirectoryObject, error) // - entry: the directory entry if 
found and is a directory @@ -123,6 +293,13 @@ func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.E // serveDirectoryContent serves the content of a directory object directly func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry) { + // Defensive nil checks - entry and attributes should never be nil, but guard against it + if entry == nil || entry.Attributes == nil { + glog.Errorf("serveDirectoryContent: entry or attributes is nil") + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + // Set content type - use stored MIME type or default contentType := entry.Attributes.Mime if contentType == "" { @@ -272,13 +449,29 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) bucket, object := s3_constants.GetBucketAndObject(r) glog.V(3).Infof("GetObjectHandler %s %s", bucket, object) + // TTFB Profiling: Track all stages until first byte + tStart := time.Now() + var ( + conditionalHeadersTime time.Duration + versioningCheckTime time.Duration + entryFetchTime time.Duration + streamTime time.Duration + ) + defer func() { + totalTime := time.Since(tStart) + glog.V(2).Infof("GET TTFB PROFILE %s/%s: total=%v | conditional=%v, versioning=%v, entryFetch=%v, stream=%v", + bucket, object, totalTime, conditionalHeadersTime, versioningCheckTime, entryFetchTime, streamTime) + }() + // Handle directory objects with shared logic if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") { return // Directory object request was handled } // Check conditional headers and handle early return if conditions fail + tConditional := time.Now() result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "GetObjectHandler") + conditionalHeadersTime = time.Since(tConditional) if handled { return } @@ -287,13 +480,13 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) versionId := r.URL.Query().Get("versionId") var ( - destUrl string entry *filer_pb.Entry // Declare entry at function scope for SSE processing versioningConfigured bool err error ) // Check if versioning is configured for the bucket (Enabled or Suspended) + tVersioning := time.Now() // Note: We need to check this even if versionId is empty, because versioned buckets // handle even "get latest version" requests differently (through .versions directory) versioningConfigured, err = s3a.isVersioningConfigured(bucket) @@ -306,15 +499,15 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) + glog.V(3).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) if versioningConfigured { - // Handle versioned GET - all versions are stored in .versions directory + // Handle versioned GET - check if specific version requested var targetVersionId string if versionId != "" { - // Request for specific version - glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object) + // Request for specific version - must look in .versions directory + glog.V(3).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object) entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) if err != nil { glog.Errorf("Failed to get 
specific version %s: %v", versionId, err) @@ -323,22 +516,61 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } targetVersionId = versionId } else { - // Request for latest version - glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object) - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - if entry.Extended != nil { - if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { - targetVersionId = string(versionIdBytes) + // Request for latest version - OPTIMIZATION: + // Check if .versions/ directory exists quickly (no retries) to decide path + // - If .versions/ exists: real versions available, use getLatestObjectVersion + // - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly + // - If transient error: fall back to getLatestObjectVersion which has retry logic + bucketDir := s3a.option.BucketsPath + "/" + bucket + normalizedObject := removeDuplicateSlashes(object) + versionsDir := normalizedObject + s3_constants.VersionsFolder + + // Quick check (no retries) for .versions/ directory + versionsEntry, versionsErr := s3a.getEntry(bucketDir, versionsDir) + + if versionsErr == nil && versionsEntry != nil { + // .versions/ exists, meaning real versions are stored there + // Use getLatestObjectVersion which will properly find the newest version + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } else if errors.Is(versionsErr, filer_pb.ErrNotFound) { + // .versions/ doesn't exist (confirmed not found), check regular path for null version + regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) + if regularErr == nil && regularEntry != nil { + // Found object at regular path - this is the null version + entry = regularEntry + targetVersionId = "null" + } else { + // No object at regular path either - object doesn't exist + glog.Errorf("GetObject: object not found at regular path or .versions for %s%s", bucket, object) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } else { + // Transient error checking .versions/, fall back to getLatestObjectVersion with retries + glog.V(2).Infof("GetObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return } } - // If no version ID found in entry, this is a pre-versioning object + // Extract version ID if not already set if targetVersionId == "" { - targetVersionId = "null" + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + targetVersionId = string(versionIdBytes) + } + } + // If no version ID found in entry, this is a pre-versioning object + if targetVersionId == "" { + targetVersionId = "null" + } } } @@ -350,16 +582,11 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } } - // Determine the actual file path based on whether this is a versioned or 
pre-versioning object + // For versioned objects, log the target version if targetVersionId == "null" { - // Pre-versioning object - stored as regular file - destUrl = s3a.toFilerUrl(bucket, object) - glog.V(2).Infof("GetObject: pre-versioning object URL: %s", destUrl) + glog.V(2).Infof("GetObject: pre-versioning object %s/%s", bucket, object) } else { - // Versioned object - stored in .versions directory - versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId) - destUrl = s3a.toFilerUrl(bucket, versionObjectPath) - glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl) + glog.V(2).Infof("GetObject: version %s for %s/%s", targetVersionId, bucket, object) } // Set version ID in response header @@ -367,16 +594,14 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // Add object lock metadata to response headers if present s3a.addObjectLockHeadersToResponse(w, entry) - } else { - // Handle regular GET (non-versioned) - destUrl = s3a.toFilerUrl(bucket, object) } + versioningCheckTime = time.Since(tVersioning) + // Fetch the correct entry for SSE processing (respects versionId) // This consolidates entry lookups to avoid multiple filer calls + tEntryFetch := time.Now() var objectEntryForSSE *filer_pb.Entry - originalRangeHeader := r.Header.Get("Range") - var sseObject = false // Optimization: Reuse already-fetched entry to avoid redundant metadata fetches if versioningConfigured { @@ -397,7 +622,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) var fetchErr error objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) if fetchErr != nil { - glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr) + glog.Warningf("GetObjectHandler: failed to get entry for %s/%s: %v", bucket, object, fetchErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } @@ -408,34 +633,1415 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } } } + entryFetchTime = time.Since(tEntryFetch) + + // Check if PartNumber query parameter is present (for multipart GET requests) + partNumberStr := r.URL.Query().Get("partNumber") + if partNumberStr == "" { + partNumberStr = r.URL.Query().Get("PartNumber") + } + + // If PartNumber is specified, set headers and modify Range to read only that part + // This replicates the filer handler logic + if partNumberStr != "" { + if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 { + // Get actual parts count from metadata (not chunk count) + partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber) + + // Validate part number + if partNumber > partsCount { + glog.Warningf("GetObject: Invalid part number %d, object has %d parts", partNumber, partsCount) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) + return + } + + // Set parts count header + w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount)) + glog.V(3).Infof("GetObject: Set PartsCount=%d for multipart GET with PartNumber=%d", partsCount, partNumber) + + // Calculate the byte range for this part + var startOffset, endOffset int64 + if partInfo != nil { + // Use part boundaries from metadata (accurate for multi-chunk parts) + startOffset = objectEntryForSSE.Chunks[partInfo.StartChunk].Offset + lastChunk := objectEntryForSSE.Chunks[partInfo.EndChunk-1] + endOffset = lastChunk.Offset + int64(lastChunk.Size) - 1 + + // Override ETag with the part's ETag from metadata + 
w.Header().Set("ETag", "\""+partInfo.ETag+"\"") + glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag) + } else { + // Fallback: assume 1:1 part-to-chunk mapping (backward compatibility) + chunkIndex := partNumber - 1 + if chunkIndex >= len(objectEntryForSSE.Chunks) { + glog.Warningf("GetObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) + return + } + partChunk := objectEntryForSSE.Chunks[chunkIndex] + startOffset = partChunk.Offset + endOffset = partChunk.Offset + int64(partChunk.Size) - 1 + + // Override ETag with chunk's ETag (fallback) + if partChunk.ETag != "" { + if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil { + partETag := fmt.Sprintf("%x", md5Bytes) + w.Header().Set("ETag", "\""+partETag+"\"") + glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag) + } + } + } + + // Check if client supplied a Range header - if so, apply it within the part's boundaries + // S3 allows both partNumber and Range together, where Range applies within the selected part + clientRangeHeader := r.Header.Get("Range") + if clientRangeHeader != "" { + adjustedStart, adjustedEnd, rangeErr := adjustRangeForPart(startOffset, endOffset, clientRangeHeader) + if rangeErr != nil { + glog.Warningf("GetObject: Invalid Range for part %d: %v", partNumber, rangeErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return + } + startOffset = adjustedStart + endOffset = adjustedEnd + glog.V(3).Infof("GetObject: Client Range %s applied to part %d, adjusted to bytes=%d-%d", clientRangeHeader, partNumber, startOffset, endOffset) + } + + // Set Range header to read the requested bytes (full part or client-specified range within part) + rangeHeader := fmt.Sprintf("bytes=%d-%d", startOffset, endOffset) + r.Header.Set("Range", rangeHeader) + glog.V(3).Infof("GetObject: Set Range header for part %d: %s", partNumber, rangeHeader) + } + } + + // NEW OPTIMIZATION: Stream directly from volume servers, bypassing filer proxy + // This eliminates the 19ms filer proxy overhead + // SSE decryption is handled inline during streaming + + // Safety check: entry must be valid before streaming + if objectEntryForSSE == nil { + glog.Errorf("GetObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + + // Detect SSE encryption type + primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) + + // Stream directly from volume servers with SSE support + tStream := time.Now() + err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType) + streamTime = time.Since(tStream) + if err != nil { + glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err) + // Check if the streaming function already wrote an HTTP response + var streamErr *StreamError + if errors.As(err, &streamErr) && streamErr.ResponseWritten { + // Response already written (headers + status code), don't write again + // to avoid "superfluous response.WriteHeader call" and malformed S3 error bodies + return + } + // Response not yet written - safe to write S3 error response + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } +} + +// streamFromVolumeServers streams object data directly from volume servers, bypassing filer proxy +// This 
eliminates the ~19ms filer proxy overhead by reading chunks directly +func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error { + // Profiling: Track overall and stage timings + t0 := time.Now() + var ( + rangeParseTime time.Duration + headerSetTime time.Duration + chunkResolveTime time.Duration + streamPrepTime time.Duration + streamExecTime time.Duration + ) + defer func() { + totalTime := time.Since(t0) + glog.V(2).Infof(" └─ streamFromVolumeServers: total=%v, rangeParse=%v, headerSet=%v, chunkResolve=%v, streamPrep=%v, streamExec=%v", + totalTime, rangeParseTime, headerSetTime, chunkResolveTime, streamPrepTime, streamExecTime) + }() + + if entry == nil { + // Early validation error: write S3-compliant XML error response + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("entry is nil")) + } + + // Get file size + totalSize := int64(filer.FileSize(entry)) + + // Parse Range header if present + tRangeParse := time.Now() + var offset int64 = 0 + var size int64 = totalSize + rangeHeader := r.Header.Get("Range") + isRangeRequest := false + + if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") { + rangeSpec := rangeHeader[6:] + parts := strings.Split(rangeSpec, "-") + if len(parts) == 2 { + var startOffset, endOffset int64 + + // Handle different Range formats: + // 1. "bytes=0-499" - first 500 bytes (parts[0]="0", parts[1]="499") + // 2. "bytes=500-" - from byte 500 to end (parts[0]="500", parts[1]="") + // 3. "bytes=-500" - last 500 bytes (parts[0]="", parts[1]="500") + + if parts[0] == "" && parts[1] != "" { + // Suffix range: bytes=-N (last N bytes) + if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + // RFC 7233: suffix range on empty object or zero-length suffix is unsatisfiable + if totalSize == 0 || suffixLen <= 0 { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range for empty object")) + } + if suffixLen > totalSize { + suffixLen = totalSize + } + startOffset = totalSize - suffixLen + endOffset = totalSize - 1 + } else { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range")) + } + } else { + // Regular range or open-ended range + startOffset = 0 + endOffset = totalSize - 1 + + if parts[0] != "" { + if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil { + startOffset = parsed + } + } + if parts[1] != "" { + if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + endOffset = parsed + } + } - // Check if this is an SSE object for Range request handling - // This applies to both versioned and non-versioned objects - if originalRangeHeader != "" && objectEntryForSSE != nil { - primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) - if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS { - sseObject = true - // Temporarily remove Range header to get full encrypted data from filer - r.Header.Del("Range") + // Validate range + if startOffset < 0 || startOffset >= totalSize { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return 
newStreamErrorWithResponse(fmt.Errorf("invalid range start")) + } + + if endOffset >= totalSize { + endOffset = totalSize - 1 + } + + if endOffset < startOffset { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("invalid range: end before start")) + } + } + + offset = startOffset + size = endOffset - startOffset + 1 + isRangeRequest = true } } + rangeParseTime = time.Since(tRangeParse) - s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { - // Restore the original Range header for SSE processing - if sseObject && originalRangeHeader != "" { - r.Header.Set("Range", originalRangeHeader) + // For small files stored inline in entry.Content - validate BEFORE setting headers + if len(entry.Content) > 0 && totalSize == int64(len(entry.Content)) { + if isRangeRequest { + // Safely convert int64 to int for slice indexing - validate BEFORE WriteHeader + // Use MaxInt32 for portability across 32-bit and 64-bit platforms + if offset < 0 || offset > int64(math.MaxInt32) || size < 0 || size > int64(math.MaxInt32) { + // Early validation error: write S3-compliant error response + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("range too large for platform: offset=%d, size=%d", offset, size)) + } + start := int(offset) + end := start + int(size) + // Bounds check (should already be validated, but double-check) - BEFORE WriteHeader + if start < 0 || start > len(entry.Content) || end > len(entry.Content) || end < start { + // Early validation error: write S3-compliant error response + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content))) + } + // Validation passed - now set headers and write + s3a.setResponseHeaders(w, entry, totalSize) + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) + w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) + w.WriteHeader(http.StatusPartialContent) + _, err := w.Write(entry.Content[start:end]) + return err + } + // Non-range request for inline content + s3a.setResponseHeaders(w, entry, totalSize) + w.WriteHeader(http.StatusOK) + _, err := w.Write(entry.Content) + return err + } + + // Get chunks and validate BEFORE setting headers + chunks := entry.GetChunks() + glog.V(4).Infof("streamFromVolumeServers: entry has %d chunks, totalSize=%d, isRange=%v, offset=%d, size=%d", + len(chunks), totalSize, isRangeRequest, offset, size) + + if len(chunks) == 0 { + // BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue + if totalSize > 0 && len(entry.Content) == 0 { + glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize) + // Write S3-compliant XML error response + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("data integrity error: size %d reported but no content available", totalSize)) } + // Empty object - set headers and write status + s3a.setResponseHeaders(w, entry, totalSize) + 
w.WriteHeader(http.StatusOK) + return nil + } - // Add SSE metadata headers based on object metadata before SSE processing - if objectEntryForSSE != nil { - s3a.addSSEHeadersToResponse(proxyResponse, objectEntryForSSE) + // Log chunk details (verbose only - high frequency) + if glog.V(4) { + for i, chunk := range chunks { + glog.Infof(" GET Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size) } + } - // Handle SSE decryption (both SSE-C and SSE-KMS) if needed - return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE) - }) + // CRITICAL: Resolve chunks and prepare stream BEFORE WriteHeader + // This ensures we can write proper error responses if these operations fail + ctx := r.Context() + lookupFileIdFn := s3a.createLookupFileIdFunction() + + // Resolve chunk manifests with the requested range + tChunkResolve := time.Now() + resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, offset, offset+size) + chunkResolveTime = time.Since(tChunkResolve) + if err != nil { + glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err) + // Write S3-compliant XML error response + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("failed to resolve chunks: %v", err)) + } + + // Prepare streaming function with simple master client wrapper + tStreamPrep := time.Now() + masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn} + streamFn, err := filer.PrepareStreamContentWithThrottler( + ctx, + masterClient, + func(fileId string) string { + // Use volume server JWT (not filer JWT) for direct volume reads + return string(security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId)) + }, + resolvedChunks, + offset, + size, + 0, // no throttling + ) + streamPrepTime = time.Since(tStreamPrep) + if err != nil { + glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err) + // Write S3-compliant XML error response + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("failed to prepare stream: %v", err)) + } + + // All validation and preparation successful - NOW set headers and write status + tHeaderSet := time.Now() + s3a.setResponseHeaders(w, entry, totalSize) + + // Override/add range-specific headers if this is a range request + if isRangeRequest { + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) + w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) + } + headerSetTime = time.Since(tHeaderSet) + + // Now write status code (headers are all set, stream is ready) + if isRangeRequest { + w.WriteHeader(http.StatusPartialContent) + } else { + w.WriteHeader(http.StatusOK) + } + + // Stream directly to response + tStreamExec := time.Now() + glog.V(4).Infof("streamFromVolumeServers: starting streamFn, offset=%d, size=%d", offset, size) + err = streamFn(w) + streamExecTime = time.Since(tStreamExec) + if err != nil { + glog.Errorf("streamFromVolumeServers: streamFn failed: %v", err) + // Streaming error after WriteHeader was called - response already partially written + return newStreamErrorWithResponse(err) + } + glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully") + return nil +} + +// Shared HTTP client for volume server requests (connection pooling) +var volumeServerHTTPClient = &http.Client{ + Timeout: 5 * time.Minute, + Transport: &http.Transport{ + MaxIdleConns: 100, + 
MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + }, +} + +// createLookupFileIdFunction creates a reusable lookup function for resolving volume URLs +func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, string) ([]string, error) { + return func(ctx context.Context, fileId string) ([]string, error) { + var urls []string + err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + vid := filer.VolumeId(fileId) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + if locs, found := resp.LocationsMap[vid]; found { + for _, loc := range locs.Locations { + // Build complete URL with volume server address and fileId + // The fileId parameter contains the full "volumeId,fileKey" identifier (e.g., "3,01637037d6") + // This constructs URLs like: http://127.0.0.1:8080/3,01637037d6 (or https:// if configured) + // NormalizeUrl ensures the proper scheme (http:// or https://) is used based on configuration + normalizedUrl, err := util_http.NormalizeUrl(loc.Url) + if err != nil { + glog.Warningf("Failed to normalize URL for %s: %v", loc.Url, err) + continue + } + urls = append(urls, normalizedUrl+"/"+fileId) + } + } + return nil + }) + glog.V(3).Infof("createLookupFileIdFunction: fileId=%s, resolved urls=%v", fileId, urls) + return urls, err + } +} + +// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption +func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error { + // If not encrypted, use fast path without decryption + if sseType == "" || sseType == "None" { + return s3a.streamFromVolumeServers(w, r, entry, sseType) + } + + // Profiling: Track SSE decryption stages + t0 := time.Now() + var ( + rangeParseTime time.Duration + keyValidateTime time.Duration + headerSetTime time.Duration + streamFetchTime time.Duration + decryptSetupTime time.Duration + copyTime time.Duration + ) + defer func() { + totalTime := time.Since(t0) + glog.V(2).Infof(" └─ streamFromVolumeServersWithSSE (%s): total=%v, rangeParse=%v, keyValidate=%v, headerSet=%v, streamFetch=%v, decryptSetup=%v, copy=%v", + sseType, totalTime, rangeParseTime, keyValidateTime, headerSetTime, streamFetchTime, decryptSetupTime, copyTime) + }() + + glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object with inline decryption", sseType) + + // Parse Range header BEFORE key validation + totalSize := int64(filer.FileSize(entry)) + tRangeParse := time.Now() + var offset int64 = 0 + var size int64 = totalSize + rangeHeader := r.Header.Get("Range") + isRangeRequest := false + + if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") { + rangeSpec := rangeHeader[6:] + parts := strings.Split(rangeSpec, "-") + if len(parts) == 2 { + var startOffset, endOffset int64 + + if parts[0] == "" && parts[1] != "" { + // Suffix range: bytes=-N (last N bytes) + if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + // RFC 7233: suffix range on empty object or zero-length suffix is unsatisfiable + if totalSize == 0 || suffixLen <= 0 { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range for empty object")) + } + if suffixLen > totalSize { + suffixLen = totalSize + } + startOffset = totalSize - suffixLen + endOffset = totalSize - 1 
+ } else { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range")) + } + } else { + // Regular range or open-ended range + startOffset = 0 + endOffset = totalSize - 1 + + if parts[0] != "" { + if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil { + startOffset = parsed + } + } + if parts[1] != "" { + if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil { + endOffset = parsed + } + } + + // Validate range + if startOffset < 0 || startOffset >= totalSize { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("invalid range start")) + } + + if endOffset >= totalSize { + endOffset = totalSize - 1 + } + + if endOffset < startOffset { + // Set header BEFORE WriteHeader + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) + return newStreamErrorWithResponse(fmt.Errorf("invalid range: end before start")) + } + } + + offset = startOffset + size = endOffset - startOffset + 1 + isRangeRequest = true + glog.V(2).Infof("streamFromVolumeServersWithSSE: Range request bytes %d-%d/%d (size=%d)", startOffset, endOffset, totalSize, size) + } + } + rangeParseTime = time.Since(tRangeParse) + + // Validate SSE keys BEFORE streaming + tKeyValidate := time.Now() + var decryptionKey interface{} + switch sseType { + case s3_constants.SSETypeC: + customerKey, err := ParseSSECHeaders(r) + if err != nil { + s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err)) + return newStreamErrorWithResponse(err) + } + if customerKey == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return newStreamErrorWithResponse(fmt.Errorf("SSE-C key required")) + } + // Validate key MD5 + if entry.Extended != nil { + storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) + if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { + s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) + return newStreamErrorWithResponse(fmt.Errorf("SSE-C key mismatch")) + } + } + decryptionKey = customerKey + case s3_constants.SSETypeKMS: + // Extract KMS key from metadata (stored as raw bytes, matching filer behavior) + if entry.Extended == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("no SSE-KMS metadata")) + } + kmsMetadataBytes := entry.Extended[s3_constants.SeaweedFSSSEKMSKey] + sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(err) + } + decryptionKey = sseKMSKey + case s3_constants.SSETypeS3: + // Extract S3 key from metadata (stored as raw bytes, matching filer behavior) + if entry.Extended == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("no SSE-S3 metadata")) + } + keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] + keyManager := GetSSES3KeyManager() + sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) + if err != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(err) + } + decryptionKey = sseS3Key + } + keyValidateTime = 
time.Since(tKeyValidate) + + // Set response headers + // IMPORTANT: Set ALL headers BEFORE calling WriteHeader (headers are ignored after WriteHeader) + tHeaderSet := time.Now() + s3a.setResponseHeaders(w, entry, totalSize) + s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType) + + // Override/add range-specific headers if this is a range request + if isRangeRequest { + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize)) + w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) + } + headerSetTime = time.Since(tHeaderSet) + + // Now write status code (headers are all set) + if isRangeRequest { + w.WriteHeader(http.StatusPartialContent) + } + + // Full Range Optimization: Use ViewFromChunks to only fetch/decrypt needed chunks + tDecryptSetup := time.Now() + + // Use range-aware chunk resolution (like filer does) + if isRangeRequest { + glog.V(2).Infof("Using range-aware SSE decryption for offset=%d size=%d", offset, size) + streamFetchTime = 0 // No full stream fetch in range-aware path + err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey) + decryptSetupTime = time.Since(tDecryptSetup) + copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path + if err != nil { + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(err) + } + return nil + } + + // Full object path: Optimize multipart vs single-part + var decryptedReader io.Reader + var err error + + switch sseType { + case s3_constants.SSETypeC: + customerKey := decryptionKey.(*SSECustomerKey) + + // Check if this is a multipart object (multiple chunks with SSE-C metadata) + isMultipartSSEC := false + ssecChunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_C && len(chunk.GetSseMetadata()) > 0 { + ssecChunks++ + } + } + isMultipartSSEC = ssecChunks > 1 + glog.V(3).Infof("SSE-C decryption: KeyMD5=%s, entry has %d chunks, isMultipart=%v, ssecChunks=%d", + customerKey.KeyMD5, len(entry.GetChunks()), isMultipartSSEC, ssecChunks) + + if isMultipartSSEC { + // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly + // This saves one filer lookup/pipe creation + decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(r.Context(), nil, customerKey, entry) + glog.V(2).Infof("Using multipart SSE-C decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) + } else { + // For single-part, get encrypted stream and decrypt + tStreamFetch := time.Now() + encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) + streamFetchTime = time.Since(tStreamFetch) + if streamErr != nil { + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(streamErr) + } + defer encryptedReader.Close() + + iv := entry.Extended[s3_constants.SeaweedFSSSEIV] + if len(iv) == 0 { + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(fmt.Errorf("SSE-C IV not found in entry metadata")) + } + glog.V(2).Infof("SSE-C decryption: IV length=%d, KeyMD5=%s", len(iv), customerKey.KeyMD5) + decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv) + } + + case s3_constants.SSETypeKMS: + sseKMSKey := decryptionKey.(*SSEKMSKey) + + // Check if this is a multipart object (multiple chunks with SSE-KMS metadata) + isMultipartSSEKMS := false + ssekmsChunks := 0 + for _, chunk := range 
entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 { + ssekmsChunks++ + } + } + isMultipartSSEKMS = ssekmsChunks > 1 + glog.V(3).Infof("SSE-KMS decryption: isMultipart=%v, ssekmsChunks=%d", isMultipartSSEKMS, ssekmsChunks) + + if isMultipartSSEKMS { + // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly + decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(r.Context(), nil, entry) + glog.V(2).Infof("Using multipart SSE-KMS decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) + } else { + // For single-part, get encrypted stream and decrypt + tStreamFetch := time.Now() + encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) + streamFetchTime = time.Since(tStreamFetch) + if streamErr != nil { + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(streamErr) + } + defer encryptedReader.Close() + + glog.V(2).Infof("SSE-KMS decryption: KeyID=%s, IV length=%d", sseKMSKey.KeyID, len(sseKMSKey.IV)) + decryptedReader, err = CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey) + } + + case s3_constants.SSETypeS3: + sseS3Key := decryptionKey.(*SSES3Key) + + // Check if this is a multipart object (multiple chunks with SSE-S3 metadata) + isMultipartSSES3 := false + sses3Chunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(chunk.GetSseMetadata()) > 0 { + sses3Chunks++ + } + } + isMultipartSSES3 = sses3Chunks > 1 + glog.V(3).Infof("SSE-S3 decryption: isMultipart=%v, sses3Chunks=%d", isMultipartSSES3, sses3Chunks) + + if isMultipartSSES3 { + // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly + decryptedReader, err = s3a.createMultipartSSES3DecryptedReaderDirect(r.Context(), nil, entry) + glog.V(2).Infof("Using multipart SSE-S3 decryption for object with %d chunks (no prefetch)", len(entry.GetChunks())) + } else { + // For single-part, get encrypted stream and decrypt + tStreamFetch := time.Now() + encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry) + streamFetchTime = time.Since(tStreamFetch) + if streamErr != nil { + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(streamErr) + } + defer encryptedReader.Close() + + keyManager := GetSSES3KeyManager() + iv, ivErr := GetSSES3IV(entry, sseS3Key, keyManager) + if ivErr != nil { + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr)) + } + glog.V(2).Infof("SSE-S3 decryption: KeyID=%s, IV length=%d", sseS3Key.KeyID, len(iv)) + decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv) + } + } + decryptSetupTime = time.Since(tDecryptSetup) + + if err != nil { + glog.Errorf("SSE decryption error (%s): %v", sseType, err) + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(fmt.Errorf("failed to create decrypted reader: %w", err)) + } + + // Close the decrypted reader to avoid leaking HTTP bodies + if closer, ok := decryptedReader.(io.Closer); ok { + defer func() { + if closeErr := closer.Close(); closeErr != nil { + glog.V(3).Infof("Error closing decrypted reader: %v", closeErr) + } + }() + } + + // Stream full decrypted object to client + tCopy := time.Now() + buf := make([]byte, 128*1024) + copied, copyErr := io.CopyBuffer(w, decryptedReader, buf) + copyTime = 
time.Since(tCopy) + if copyErr != nil { + glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr) + // Error after WriteHeader - response already written + return newStreamErrorWithResponse(copyErr) + } + glog.V(3).Infof("Full object request: copied %d bytes", copied) + return nil +} + +// streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks +// This implements the filer's ViewFromChunks approach for optimal range performance +func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) error { + // Use filer's ViewFromChunks to resolve only needed chunks for the range + lookupFileIdFn := s3a.createLookupFileIdFunction() + chunkViews := filer.ViewFromChunks(ctx, lookupFileIdFn, entry.GetChunks(), offset, size) + + totalWritten := int64(0) + targetOffset := offset + + // Stream each chunk view + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value + + // Handle gaps between chunks (write zeros) + if targetOffset < chunkView.ViewOffset { + gap := chunkView.ViewOffset - targetOffset + glog.V(4).Infof("Writing %d zero bytes for gap [%d,%d)", gap, targetOffset, chunkView.ViewOffset) + if err := writeZeroBytes(w, gap); err != nil { + return fmt.Errorf("failed to write zero padding: %w", err) + } + totalWritten += gap + targetOffset = chunkView.ViewOffset + } + + // Find the corresponding FileChunk for this chunkView + var fileChunk *filer_pb.FileChunk + for _, chunk := range entry.GetChunks() { + if chunk.GetFileIdString() == chunkView.FileId { + fileChunk = chunk + break + } + } + if fileChunk == nil { + return fmt.Errorf("chunk %s not found in entry", chunkView.FileId) + } + + // Fetch and decrypt this chunk view + var decryptedChunkReader io.Reader + var err error + + switch sseType { + case s3_constants.SSETypeC: + decryptedChunkReader, err = s3a.decryptSSECChunkView(ctx, fileChunk, chunkView, decryptionKey.(*SSECustomerKey)) + case s3_constants.SSETypeKMS: + decryptedChunkReader, err = s3a.decryptSSEKMSChunkView(ctx, fileChunk, chunkView) + case s3_constants.SSETypeS3: + decryptedChunkReader, err = s3a.decryptSSES3ChunkView(ctx, fileChunk, chunkView, entry) + default: + // Non-encrypted chunk + decryptedChunkReader, err = s3a.fetchChunkViewData(ctx, chunkView) + } + + if err != nil { + return fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err) + } + + // Copy the decrypted chunk data + written, copyErr := io.Copy(w, decryptedChunkReader) + if closer, ok := decryptedChunkReader.(io.Closer); ok { + closeErr := closer.Close() + if closeErr != nil { + glog.Warningf("streamDecryptedRangeFromChunks: failed to close decrypted chunk reader: %v", closeErr) + } + } + if copyErr != nil { + glog.Errorf("streamDecryptedRangeFromChunks: copy error after writing %d bytes (expected %d): %v", written, chunkView.ViewSize, copyErr) + return fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr) + } + + if written != int64(chunkView.ViewSize) { + glog.Errorf("streamDecryptedRangeFromChunks: size mismatch - wrote %d bytes but expected %d", written, chunkView.ViewSize) + return fmt.Errorf("size mismatch: wrote %d bytes but expected %d for chunk %s", written, chunkView.ViewSize, chunkView.FileId) + } + + totalWritten += written + targetOffset += written + glog.V(2).Infof("streamDecryptedRangeFromChunks: Wrote %d bytes from chunk %s [%d,%d), totalWritten=%d, targetSize=%d", 
written, chunkView.FileId, chunkView.ViewOffset, chunkView.ViewOffset+int64(chunkView.ViewSize), totalWritten, size) + } + + // Handle trailing zeros if needed + remaining := size - totalWritten + if remaining > 0 { + glog.V(4).Infof("Writing %d trailing zero bytes", remaining) + if err := writeZeroBytes(w, remaining); err != nil { + return fmt.Errorf("failed to write trailing zeros: %w", err) + } + } + + glog.V(3).Infof("Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)", totalWritten, offset, offset+size) + return nil +} + +// writeZeroBytes writes n zero bytes to writer using the package-level zero buffer +func writeZeroBytes(w io.Writer, n int64) error { + for n > 0 { + toWrite := min(n, int64(len(zeroBuf))) + written, err := w.Write(zeroBuf[:toWrite]) + if err != nil { + return err + } + n -= int64(written) + } + return nil +} + +// decryptSSECChunkView decrypts a specific chunk view with SSE-C +// +// IV Handling for SSE-C: +// ---------------------- +// SSE-C multipart encryption (see lines 2772-2781) differs fundamentally from SSE-KMS/SSE-S3: +// +// 1. Encryption: CreateSSECEncryptedReader generates a RANDOM IV per part/chunk +// - Each part starts with a fresh random IV +// - CTR counter starts from 0 for each part: counter₀, counter₁, counter₂, ... +// - PartOffset is stored in metadata but NOT applied during encryption +// +// 2. Decryption: Use the stored IV directly WITHOUT offset adjustment +// - The stored IV already represents the start of this part's encryption +// - Applying calculateIVWithOffset would shift to counterₙ, misaligning the keystream +// - Result: XOR with wrong keystream = corrupted plaintext +// +// This contrasts with SSE-KMS/SSE-S3 which use: base IV + calculateIVWithOffset(ChunkOffset) +func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, customerKey *SSECustomerKey) (io.Reader, error) { + // For multipart SSE-C, each chunk has its own IV in chunk.SseMetadata + if fileChunk.GetSseType() == filer_pb.SSEType_SSE_C && len(fileChunk.GetSseMetadata()) > 0 { + ssecMetadata, err := DeserializeSSECMetadata(fileChunk.GetSseMetadata()) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err) + } + chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) + if err != nil { + return nil, fmt.Errorf("failed to decode IV: %w", err) + } + + // Fetch FULL encrypted chunk + // Note: Fetching full chunk is necessary for proper CTR decryption stream + fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) + if err != nil { + return nil, fmt.Errorf("failed to fetch full chunk: %w", err) + } + + // CRITICAL: Use stored IV directly WITHOUT offset adjustment + // The stored IV is the random IV used at encryption time for this specific part + // SSE-C does NOT apply calculateIVWithOffset during encryption, so we must not apply it during decryption + // (See documentation above and at lines 2772-2781 for detailed explanation) + decryptedReader, decryptErr := CreateSSECDecryptedReader(fullChunkReader, customerKey, chunkIV) + if decryptErr != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to create decrypted reader: %w", decryptErr) + } + + // Skip to the position we need in the decrypted stream + if chunkView.OffsetInChunk > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, 
fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err) + } + } + + // Return a reader that only reads ViewSize bytes with proper cleanup + limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) + return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil + } + + // Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it) + encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView) + if err != nil { + return nil, err + } + // For single-part, the IV is stored at object level, already handled in non-range path + return encryptedReader, nil +} + +// decryptSSEKMSChunkView decrypts a specific chunk view with SSE-KMS +// +// IV Handling for SSE-KMS: +// ------------------------ +// SSE-KMS (and SSE-S3) use a fundamentally different IV scheme than SSE-C: +// +// 1. Encryption: Uses a BASE IV + offset calculation +// - Base IV is generated once for the entire object +// - For each chunk at position N: adjustedIV = calculateIVWithOffset(baseIV, N) +// - This shifts the CTR counter to counterₙ where n = N/16 +// - ChunkOffset is stored in metadata and IS applied during encryption +// +// 2. Decryption: Apply the same offset calculation +// - Use calculateIVWithOffset(baseIV, ChunkOffset) to reconstruct the encryption IV +// - Also handle ivSkip for non-block-aligned offsets (intra-block positioning) +// - This ensures decryption uses the same CTR counter sequence as encryption +// +// This contrasts with SSE-C which uses random IVs without offset calculation. +func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView) (io.Reader, error) { + if fileChunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(fileChunk.GetSseMetadata()) > 0 { + sseKMSKey, err := DeserializeSSEKMSMetadata(fileChunk.GetSseMetadata()) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err) + } + + // Fetch FULL encrypted chunk + fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) + if err != nil { + return nil, fmt.Errorf("failed to fetch full chunk: %w", err) + } + + // IMPORTANT: Calculate adjusted IV using ChunkOffset + // SSE-KMS uses base IV + offset calculation (unlike SSE-C which uses random IVs) + // This reconstructs the same IV that was used during encryption + var adjustedIV []byte + var ivSkip int + if sseKMSKey.ChunkOffset > 0 { + adjustedIV, ivSkip = calculateIVWithOffset(sseKMSKey.IV, sseKMSKey.ChunkOffset) + } else { + adjustedIV = sseKMSKey.IV + ivSkip = 0 + } + + adjustedKey := &SSEKMSKey{ + KeyID: sseKMSKey.KeyID, + EncryptedDataKey: sseKMSKey.EncryptedDataKey, + EncryptionContext: sseKMSKey.EncryptionContext, + BucketKeyEnabled: sseKMSKey.BucketKeyEnabled, + IV: adjustedIV, + ChunkOffset: sseKMSKey.ChunkOffset, + } + + decryptedReader, decryptErr := CreateSSEKMSDecryptedReader(fullChunkReader, adjustedKey) + if decryptErr != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to create KMS decrypted reader: %w", decryptErr) + } + + // CRITICAL: Skip intra-block bytes from CTR decryption (non-block-aligned offset handling) + if ivSkip > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, int64(ivSkip)) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip intra-block bytes (%d): %w", ivSkip, err) + } + } + + // Skip to position and limit to ViewSize + if chunkView.OffsetInChunk > 0 { + _, err = io.CopyN(io.Discard, 
decryptedReader, chunkView.OffsetInChunk) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip to offset: %w", err) + } + } + + limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) + return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil + } + + // Non-KMS encrypted chunk + return s3a.fetchChunkViewData(ctx, chunkView) +} + +// decryptSSES3ChunkView decrypts a specific chunk view with SSE-S3 +// +// IV Handling for SSE-S3: +// ----------------------- +// SSE-S3 uses the same BASE IV + offset scheme as SSE-KMS, but with a subtle difference: +// +// 1. Encryption: Uses BASE IV + offset, but stores the ADJUSTED IV +// - Base IV is generated once for the entire object +// - For each chunk at position N: adjustedIV, skip = calculateIVWithOffset(baseIV, N) +// - The ADJUSTED IV (not base IV) is stored in chunk metadata +// - ChunkOffset calculation is performed during encryption +// +// 2. Decryption: Use the stored adjusted IV directly +// - The stored IV is already block-aligned and ready to use +// - No need to call calculateIVWithOffset again (unlike SSE-KMS) +// - Decrypt full chunk from start, then skip to OffsetInChunk in plaintext +// +// This differs from: +// - SSE-C: Uses random IV per chunk, no offset calculation +// - SSE-KMS: Stores base IV, requires calculateIVWithOffset during decryption +func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, entry *filer_pb.Entry) (io.Reader, error) { + // For multipart SSE-S3, each chunk has its own IV in chunk.SseMetadata + if fileChunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(fileChunk.GetSseMetadata()) > 0 { + keyManager := GetSSES3KeyManager() + + // Deserialize per-chunk SSE-S3 metadata to get chunk-specific IV + chunkSSES3Metadata, err := DeserializeSSES3Metadata(fileChunk.GetSseMetadata(), keyManager) + if err != nil { + return nil, fmt.Errorf("failed to deserialize chunk SSE-S3 metadata: %w", err) + } + + // Fetch FULL encrypted chunk (necessary for proper CTR decryption stream) + fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) + if err != nil { + return nil, fmt.Errorf("failed to fetch full chunk: %w", err) + } + + // IMPORTANT: Use the stored IV directly - it's already block-aligned + // During encryption, CreateSSES3EncryptedReaderWithBaseIV called: + // adjustedIV, skip := calculateIVWithOffset(baseIV, partOffset) + // and stored the adjustedIV in metadata. We use it as-is for decryption. + // No need to call calculateIVWithOffset again (unlike SSE-KMS which stores base IV). 
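+	// Illustration (not part of the decryption path): the base-IV + offset scheme
+	// referenced above can be thought of as treating the IV as a big-endian CTR
+	// counter, advancing it by whole AES blocks, and discarding any intra-block
+	// remainder from the plaintext. A rough sketch under that assumption (the
+	// package's calculateIVWithOffset is the authoritative implementation; offset
+	// and baseIV here are placeholder names):
+	//
+	//	blocks := uint64(offset / aes.BlockSize) // whole-block counter advance
+	//	skip := int(offset % aes.BlockSize)      // plaintext bytes to discard in the first block
+	//	adjusted := make([]byte, aes.BlockSize)
+	//	copy(adjusted, baseIV)
+	//	carry := blocks
+	//	for i := aes.BlockSize - 1; i >= 0 && carry > 0; i-- {
+	//		carry += uint64(adjusted[i])
+	//		adjusted[i] = byte(carry)
+	//		carry >>= 8
+	//	}
+	//
+	// For SSE-S3 that adjustment already happened at encryption time, so the stored
+	// IV below is used as-is.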
+ iv := chunkSSES3Metadata.IV + + glog.V(4).Infof("Decrypting multipart SSE-S3 chunk %s with chunk-specific IV length=%d", + chunkView.FileId, len(iv)) + + // Decrypt the full chunk starting from offset 0 + decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, chunkSSES3Metadata, iv) + if decryptErr != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to create SSE-S3 decrypted reader: %w", decryptErr) + } + + // Skip to position within the decrypted chunk (plaintext offset, not ciphertext offset) + if chunkView.OffsetInChunk > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err) + } + } + + limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) + return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil + } + + // Single-part SSE-S3: use object-level IV and key (fallback path) + keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key] + keyManager := GetSSES3KeyManager() + sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) + if err != nil { + return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err) + } + + // Fetch FULL encrypted chunk + fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId) + if err != nil { + return nil, fmt.Errorf("failed to fetch full chunk: %w", err) + } + + // Get base IV for single-part object + iv, err := GetSSES3IV(entry, sseS3Key, keyManager) + if err != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err) + } + + glog.V(4).Infof("Decrypting single-part SSE-S3 chunk %s with entry-level IV length=%d", + chunkView.FileId, len(iv)) + + decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, sseS3Key, iv) + if decryptErr != nil { + fullChunkReader.Close() + return nil, fmt.Errorf("failed to create S3 decrypted reader: %w", decryptErr) + } + + // Skip to position and limit to ViewSize + if chunkView.OffsetInChunk > 0 { + _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk) + if err != nil { + if closer, ok := decryptedReader.(io.Closer); ok { + closer.Close() + } + return nil, fmt.Errorf("failed to skip to offset: %w", err) + } + } + + limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize)) + return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil +} + +// fetchFullChunk fetches the complete encrypted chunk from volume server +func (s3a *S3ApiServer) fetchFullChunk(ctx context.Context, fileId string) (io.ReadCloser, error) { + // Lookup the volume server URLs for this chunk + lookupFileIdFn := s3a.createLookupFileIdFunction() + urlStrings, err := lookupFileIdFn(ctx, fileId) + if err != nil || len(urlStrings) == 0 { + return nil, fmt.Errorf("failed to lookup chunk %s: %w", fileId, err) + } + + // Use the first URL + chunkUrl := urlStrings[0] + + // Generate JWT for volume server authentication + jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId) + + // Create request WITHOUT Range header to get full chunk + req, err := http.NewRequestWithContext(ctx, "GET", chunkUrl, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + // Set JWT for authentication + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + + // Use shared HTTP client + 
resp, err := volumeServerHTTPClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to fetch chunk: %w", err) + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, fileId) + } + + return resp.Body, nil +} + +// fetchChunkViewData fetches encrypted data for a chunk view (with range) +func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer.ChunkView) (io.ReadCloser, error) { + // Lookup the volume server URLs for this chunk + lookupFileIdFn := s3a.createLookupFileIdFunction() + urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId) + if err != nil || len(urlStrings) == 0 { + return nil, fmt.Errorf("failed to lookup chunk %s: %w", chunkView.FileId, err) + } + + // Use the first URL (already contains complete URL with fileId) + chunkUrl := urlStrings[0] + + // Generate JWT for volume server authentication + jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunkView.FileId) + + // Create request with Range header for the chunk view + // chunkUrl already contains the complete URL including fileId + req, err := http.NewRequestWithContext(ctx, "GET", chunkUrl, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + // Set Range header to fetch only the needed portion of the chunk + if !chunkView.IsFullChunk() { + rangeEnd := chunkView.OffsetInChunk + int64(chunkView.ViewSize) - 1 + req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", chunkView.OffsetInChunk, rangeEnd)) + } + + // Set JWT for authentication + if jwt != "" { + req.Header.Set("Authorization", "BEARER "+string(jwt)) + } + + // Use shared HTTP client with connection pooling + resp, err := volumeServerHTTPClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to fetch chunk: %w", err) + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + resp.Body.Close() + return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, chunkView.FileId) + } + + return resp.Body, nil +} + +// getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers +func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry *filer_pb.Entry) (io.ReadCloser, error) { + // Handle inline content + if len(entry.Content) > 0 { + return io.NopCloser(bytes.NewReader(entry.Content)), nil + } + + // Handle empty files + chunks := entry.GetChunks() + if len(chunks) == 0 { + return io.NopCloser(bytes.NewReader([]byte{})), nil + } + + // Reuse shared lookup function to keep volume lookup logic in one place + lookupFileIdFn := s3a.createLookupFileIdFunction() + + // Resolve chunks + totalSize := int64(filer.FileSize(entry)) + resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize) + if err != nil { + return nil, err + } + + // Create streaming reader + masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn} + streamFn, err := filer.PrepareStreamContentWithThrottler( + ctx, + masterClient, + func(fileId string) string { + // Use volume server JWT (not filer JWT) for direct volume reads + return string(security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId)) + }, + resolvedChunks, + 0, + totalSize, + 0, + ) + if err != nil { + return nil, err + } + + // Create a pipe to get io.ReadCloser + pipeReader, pipeWriter := io.Pipe() + go func() { + defer 
pipeWriter.Close() + if err := streamFn(pipeWriter); err != nil { + glog.Errorf("getEncryptedStreamFromVolumes: streaming error: %v", err) + pipeWriter.CloseWithError(err) + } + }() + + return pipeReader, nil +} + +// addSSEResponseHeadersFromEntry adds appropriate SSE response headers based on entry metadata +func (s3a *S3ApiServer) addSSEResponseHeadersFromEntry(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) { + if entry == nil || entry.Extended == nil { + return + } + + switch sseType { + case s3_constants.SSETypeC: + // SSE-C: Echo back algorithm and key MD5 + if algo, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists { + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algo)) + } + if keyMD5, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists { + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5)) + } + + case s3_constants.SSETypeKMS: + // SSE-KMS: Return algorithm and key ID + w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms") + if kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists { + sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if err == nil { + AddSSEKMSResponseHeaders(w, sseKMSKey) + } + } + + case s3_constants.SSETypeS3: + // SSE-S3: Return algorithm + w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256) + } +} + +// setResponseHeaders sets all standard HTTP response headers from entry metadata +func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) { + // Safety check: entry must be valid + if entry == nil { + glog.Errorf("setResponseHeaders: entry is nil") + return + } + + // Set content length and accept ranges + w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + w.Header().Set("Accept-Ranges", "bytes") + + // Set ETag (but don't overwrite if already set, e.g., for part-specific GET requests) + if w.Header().Get("ETag") == "" { + etag := filer.ETag(entry) + if etag != "" { + w.Header().Set("ETag", "\""+etag+"\"") + } + } + + // Set Last-Modified in RFC1123 format + if entry.Attributes != nil { + modTime := time.Unix(entry.Attributes.Mtime, 0).UTC() + w.Header().Set("Last-Modified", modTime.Format(http.TimeFormat)) + } + + // Set Content-Type + mimeType := "" + if entry.Attributes != nil && entry.Attributes.Mime != "" { + mimeType = entry.Attributes.Mime + } + if mimeType == "" { + // Try to detect from entry name + if entry.Name != "" { + ext := filepath.Ext(entry.Name) + if ext != "" { + mimeType = mime.TypeByExtension(ext) + } + } + } + if mimeType != "" { + w.Header().Set("Content-Type", mimeType) + } else { + w.Header().Set("Content-Type", "application/octet-stream") + } + + // Set custom headers from entry.Extended (user metadata) + // Use direct map assignment to preserve original header casing (matches proxy behavior) + if entry.Extended != nil { + for k, v := range entry.Extended { + // Skip internal SeaweedFS headers + if !strings.HasPrefix(k, "xattr-") && !s3_constants.IsSeaweedFSInternalHeader(k) { + // Support backward compatibility: migrate old non-canonical format to canonical format + // OLD: "x-amz-meta-foo" → NEW: "X-Amz-Meta-foo" (preserving suffix case) + headerKey := k + if len(k) >= 11 && strings.EqualFold(k[:11], "x-amz-meta-") { + // Normalize to AWS S3 format: "X-Amz-Meta-" prefix with lowercase suffix + // AWS S3 returns user 
metadata with the suffix in lowercase + suffix := k[len("x-amz-meta-"):] + headerKey = s3_constants.AmzUserMetaPrefix + strings.ToLower(suffix) + if glog.V(4) && k != headerKey { + glog.Infof("Normalizing user metadata header %q to %q in response", k, headerKey) + } + } + w.Header()[headerKey] = []string{string(v)} + } + } + } + + // Set tag count header (matches filer logic) + if entry.Extended != nil { + tagCount := 0 + for k := range entry.Extended { + if strings.HasPrefix(k, s3_constants.AmzObjectTagging+"-") { + tagCount++ + } + } + if tagCount > 0 { + w.Header().Set(s3_constants.AmzTagCount, strconv.Itoa(tagCount)) + } + } +} + +// simpleMasterClient implements the minimal interface for streaming +type simpleMasterClient struct { + lookupFn func(ctx context.Context, fileId string) ([]string, error) +} + +func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { + return s.lookupFn } +// HeadObjectHandler handles S3 HEAD object requests +// +// Special behavior for implicit directories: +// When a HEAD request is made on a path without a trailing slash, and that path represents +// a directory with children (either a 0-byte file marker or an actual directory), this handler +// returns 404 Not Found instead of 200 OK. This behavior improves compatibility with s3fs and +// matches AWS S3's handling of implicit directories. +// +// Rationale: +// - AWS S3 typically doesn't create directory markers when files are uploaded (e.g., uploading +// "dataset/file.txt" doesn't create a marker at "dataset") +// - Some S3 clients (like PyArrow with s3fs) create directory markers, which can confuse s3fs +// - s3fs's info() method calls HEAD first; if it succeeds with size=0, s3fs incorrectly reports +// the object as a file instead of checking for children +// - By returning 404 for implicit directories, we force s3fs to fall back to LIST-based discovery, +// which correctly identifies directories by checking for children +// +// Examples: +// +// HEAD /bucket/dataset (no trailing slash, has children) → 404 Not Found (implicit directory) +// HEAD /bucket/dataset/ (trailing slash) → 200 OK (explicit directory request) +// HEAD /bucket/empty.txt (0-byte file, no children) → 200 OK (legitimate empty file) +// HEAD /bucket/file.txt (regular file) → 200 OK (normal operation) +// +// This behavior only applies to: +// - Non-versioned buckets (versioned buckets use different semantics) +// - Paths without trailing slashes (trailing slash indicates explicit directory request) +// - Objects that are either 0-byte files or actual directories +// - Objects that have at least one child (checked via hasChildren) func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { bucket, object := s3_constants.GetBucketAndObject(r) @@ -456,7 +2062,6 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request versionId := r.URL.Query().Get("versionId") var ( - destUrl string entry *filer_pb.Entry // Declare entry at function scope for SSE processing versioningConfigured bool err error @@ -491,22 +2096,61 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } targetVersionId = versionId } else { - // Request for latest version - glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object) - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("Failed to get latest version: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - if 
entry.Extended != nil { - if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { - targetVersionId = string(versionIdBytes) + // Request for latest version - OPTIMIZATION: + // Check if .versions/ directory exists quickly (no retries) to decide path + // - If .versions/ exists: real versions available, use getLatestObjectVersion + // - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly + // - If transient error: fall back to getLatestObjectVersion which has retry logic + bucketDir := s3a.option.BucketsPath + "/" + bucket + normalizedObject := removeDuplicateSlashes(object) + versionsDir := normalizedObject + s3_constants.VersionsFolder + + // Quick check (no retries) for .versions/ directory + versionsEntry, versionsErr := s3a.getEntry(bucketDir, versionsDir) + + if versionsErr == nil && versionsEntry != nil { + // .versions/ exists, meaning real versions are stored there + // Use getLatestObjectVersion which will properly find the newest version + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } else if errors.Is(versionsErr, filer_pb.ErrNotFound) { + // .versions/ doesn't exist (confirmed not found), check regular path for null version + regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) + if regularErr == nil && regularEntry != nil { + // Found object at regular path - this is the null version + entry = regularEntry + targetVersionId = "null" + } else { + // No object at regular path either - object doesn't exist + glog.Errorf("HeadObject: object not found at regular path or .versions for %s%s", bucket, object) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } else { + // Transient error checking .versions/, fall back to getLatestObjectVersion with retries + glog.V(2).Infof("HeadObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return } } - // If no version ID found in entry, this is a pre-versioning object + // Extract version ID if not already set if targetVersionId == "" { - targetVersionId = "null" + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + targetVersionId = string(versionIdBytes) + } + } + // If no version ID found in entry, this is a pre-versioning object + if targetVersionId == "" { + targetVersionId = "null" + } } } @@ -518,16 +2162,11 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } } - // Determine the actual file path based on whether this is a versioned or pre-versioning object + // For versioned objects, log the target version if targetVersionId == "null" { - // Pre-versioning object - stored as regular file - destUrl = s3a.toFilerUrl(bucket, object) - glog.V(2).Infof("HeadObject: pre-versioning object URL: %s", destUrl) + glog.V(2).Infof("HeadObject: pre-versioning object %s/%s", bucket, object) } else { - // Versioned object - stored in .versions directory - versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId) - destUrl = s3a.toFilerUrl(bucket, 
versionObjectPath) - glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl) + glog.V(2).Infof("HeadObject: version %s for %s/%s", targetVersionId, bucket, object) } // Set version ID in response header @@ -535,9 +2174,6 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request // Add object lock metadata to response headers if present s3a.addObjectLockHeadersToResponse(w, entry) - } else { - // Handle regular HEAD (non-versioned) - destUrl = s3a.toFilerUrl(bucket, object) } // Fetch the correct entry for SSE processing (respects versionId) @@ -559,7 +2195,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request var fetchErr error objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) if fetchErr != nil { - glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr) + glog.Warningf("HeadObjectHandler: failed to get entry for %s/%s: %v", bucket, object, fetchErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } @@ -571,122 +2207,150 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } } - s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { - // Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests - return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE) - }) -} - -func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64)) { - - glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl) - start := time.Now() - - proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body) - - if err != nil { - glog.Errorf("NewRequest %s: %v", destUrl, err) + // Safety check: entry must be valid + if objectEntryForSSE == nil { + glog.Errorf("HeadObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) - proxyReq.Header.Set("Accept-Encoding", "identity") - for k, v := range r.URL.Query() { - if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok { - proxyReq.Header[k] = v - } - if k == "partNumber" { - proxyReq.Header[s3_constants.SeaweedFSPartNumber] = v + // Implicit Directory Handling for s3fs Compatibility + // ==================================================== + // + // Background: + // Some S3 clients (like PyArrow with s3fs) create directory markers when writing datasets. + // These can be either: + // 1. 0-byte files with directory MIME type (e.g., "application/octet-stream") + // 2. Actual directories in the filer (created by PyArrow's write_dataset) + // + // Problem: + // s3fs's info() method calls HEAD on the path. If HEAD returns 200 with size=0, + // s3fs incorrectly reports it as a file (type='file', size=0) instead of checking + // for children. This causes PyArrow to fail with "Parquet file size is 0 bytes". + // + // Solution: + // For non-versioned objects without trailing slash, if the object is a 0-byte file + // or directory AND has children, return 404 instead of 200. This forces s3fs to + // fall back to LIST-based discovery, which correctly identifies it as a directory. 
+ // + // AWS S3 Compatibility: + // AWS S3 typically doesn't create directory markers for implicit directories, so + // HEAD on "dataset" (when only "dataset/file.txt" exists) returns 404. Our behavior + // matches this by returning 404 for implicit directories with children. + // + // Edge Cases Handled: + // - Empty files (0-byte, no children) → 200 OK (legitimate empty file) + // - Empty directories (no children) → 200 OK (legitimate empty directory) + // - Explicit directory requests (trailing slash) → 200 OK (handled earlier) + // - Versioned objects → Skip this check (different semantics) + // + // Performance: + // Only adds overhead for 0-byte files or directories without trailing slash. + // Cost: One LIST operation with Limit=1 (~1-5ms). + // + if !versioningConfigured && !strings.HasSuffix(object, "/") { + // Check if this is an implicit directory (either a 0-byte file or actual directory with children) + // PyArrow may create 0-byte files when writing datasets, or the filer may have actual directories + if objectEntryForSSE.Attributes != nil { + isZeroByteFile := objectEntryForSSE.Attributes.FileSize == 0 && !objectEntryForSSE.IsDirectory + isActualDirectory := objectEntryForSSE.IsDirectory + + if isZeroByteFile || isActualDirectory { + // Check if it has children (making it an implicit directory) + if s3a.hasChildren(bucket, object) { + // This is an implicit directory with children + // Return 404 to force clients (like s3fs) to use LIST-based discovery + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } } } - for header, values := range r.Header { - proxyReq.Header[header] = values - } - if proxyReq.ContentLength == 0 && r.ContentLength != 0 { - proxyReq.ContentLength = r.ContentLength - } - // ensure that the Authorization header is overriding any previous - // Authorization header which might be already present in proxyReq - s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite) - resp, postErr := s3a.client.Do(proxyReq) + // For HEAD requests, we already have all metadata - just set headers directly + totalSize := int64(filer.FileSize(objectEntryForSSE)) + s3a.setResponseHeaders(w, objectEntryForSSE, totalSize) - if postErr != nil { - glog.Errorf("post to filer: %v", postErr) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return + // Check if PartNumber query parameter is present (for multipart objects) + // This logic matches the filer handler for consistency + partNumberStr := r.URL.Query().Get("partNumber") + if partNumberStr == "" { + partNumberStr = r.URL.Query().Get("PartNumber") } - defer util_http.CloseResponse(resp) - if resp.StatusCode == http.StatusPreconditionFailed { - s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed) - return - } + // If PartNumber is specified, set headers (matching filer logic) + if partNumberStr != "" { + if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 { + // Get actual parts count from metadata (not chunk count) + partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber) - if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable { - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) - return - } - - if r.Method == http.MethodDelete { - if resp.StatusCode == http.StatusNotFound { - // this is normal - responseStatusCode, _ := responseFn(resp, w) - s3err.PostLog(r, responseStatusCode, s3err.ErrNone) - return - } - } - if resp.StatusCode == http.StatusNotFound { - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - - 
TimeToFirstByte(r.Method, start, r) - if resp.Header.Get(s3_constants.SeaweedFSIsDirectoryKey) == "true" { - responseStatusCode, _ := responseFn(resp, w) - s3err.PostLog(r, responseStatusCode, s3err.ErrNone) - return - } + // Validate part number + if partNumber > partsCount { + glog.Warningf("HeadObject: Invalid part number %d, object has %d parts", partNumber, partsCount) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) + return + } - if resp.StatusCode == http.StatusInternalServerError { - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return - } + // Set parts count header + w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount)) + glog.V(3).Infof("HeadObject: Set PartsCount=%d for part %d", partsCount, partNumber) - // when HEAD a directory, it should be reported as no such key - // https://github.com/seaweedfs/seaweedfs/issues/3457 - if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified { - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return - } - - if resp.StatusCode == http.StatusBadRequest { - resp_body, _ := io.ReadAll(resp.Body) - switch string(resp_body) { - case "InvalidPart": - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) - default: - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest) + // Override ETag with the part's ETag + if partInfo != nil { + // Use part ETag from metadata (accurate for multi-chunk parts) + w.Header().Set("ETag", "\""+partInfo.ETag+"\"") + glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag) + } else { + // Fallback: use chunk's ETag (backward compatibility) + chunkIndex := partNumber - 1 + if chunkIndex >= len(objectEntryForSSE.Chunks) { + glog.Warningf("HeadObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks)) + s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart) + return + } + partChunk := objectEntryForSSE.Chunks[chunkIndex] + if partChunk.ETag != "" { + if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil { + partETag := fmt.Sprintf("%x", md5Bytes) + w.Header().Set("ETag", "\""+partETag+"\"") + glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag) + } + } + } } - resp.Body.Close() - return } - setUserMetadataKeyToLowercase(resp) - - responseStatusCode, bytesTransferred := responseFn(resp, w) - BucketTrafficSent(bytesTransferred, r) - s3err.PostLog(r, responseStatusCode, s3err.ErrNone) -} - -func setUserMetadataKeyToLowercase(resp *http.Response) { - for key, value := range resp.Header { - if strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) { - resp.Header[strings.ToLower(key)] = value - delete(resp.Header, key) + // Detect and handle SSE + glog.V(3).Infof("HeadObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks)) + sseType := s3a.detectPrimarySSEType(objectEntryForSSE) + glog.V(2).Infof("HeadObjectHandler: Detected SSE type: %s", sseType) + if sseType != "" && sseType != "None" { + // Validate SSE headers for encrypted objects + switch sseType { + case s3_constants.SSETypeC: + customerKey, err := ParseSSECHeaders(r) + if err != nil { + s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err)) + return + } + if customerKey == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return + } + // Validate key MD5 + if objectEntryForSSE.Extended != nil { + storedKeyMD5 := 
string(objectEntryForSSE.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]) + if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 { + s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) + return + } + } } + // Add SSE response headers + s3a.addSSEResponseHeadersFromEntry(w, r, objectEntryForSSE, sseType) } + + w.WriteHeader(http.StatusOK) } func captureCORSHeaders(w http.ResponseWriter, headersToCapture []string) map[string]string { @@ -934,247 +2598,6 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http. } } -// handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing -// The objectEntry parameter should be the correct entry for the requested version (if versioned) -func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, objectEntry *filer_pb.Entry) (statusCode int, bytesTransferred int64) { - // Check what the client is expecting based on request headers - clientExpectsSSEC := IsSSECRequest(r) - - // Check what the stored object has in headers (may be conflicting after copy) - kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader) - - // Detect actual object SSE type from the provided entry (respects versionId) - actualObjectType := "Unknown" - if objectEntry != nil { - actualObjectType = s3a.detectPrimarySSEType(objectEntry) - } - - // If objectEntry is nil, we cannot determine SSE type from chunks - // This should only happen for 404s which will be handled by the proxy - if objectEntry == nil { - glog.V(4).Infof("Object entry not available for SSE routing, passing through") - return passThroughResponse(proxyResponse, w) - } - - // Route based on ACTUAL object type (from chunks) rather than conflicting headers - if actualObjectType == s3_constants.SSETypeC && clientExpectsSSEC { - // Object is SSE-C and client expects SSE-C → SSE-C handler - return s3a.handleSSECResponse(r, proxyResponse, w, objectEntry) - } else if actualObjectType == s3_constants.SSETypeKMS && !clientExpectsSSEC { - // Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler - return s3a.handleSSEKMSResponse(r, proxyResponse, w, objectEntry, kmsMetadataHeader) - } else if actualObjectType == s3_constants.SSETypeS3 && !clientExpectsSSEC { - // Object is SSE-S3 and client doesn't expect SSE-C → SSE-S3 handler - return s3a.handleSSES3Response(r, proxyResponse, w, objectEntry) - } else if actualObjectType == "None" && !clientExpectsSSEC { - // Object is unencrypted and client doesn't expect SSE-C → pass through - return passThroughResponse(proxyResponse, w) - } else if actualObjectType == s3_constants.SSETypeC && !clientExpectsSSEC { - // Object is SSE-C but client doesn't provide SSE-C headers → Error - s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) - return http.StatusBadRequest, 0 - } else if actualObjectType == s3_constants.SSETypeKMS && clientExpectsSSEC { - // Object is SSE-KMS but client provides SSE-C headers → Error - s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) - return http.StatusBadRequest, 0 - } else if actualObjectType == s3_constants.SSETypeS3 && clientExpectsSSEC { - // Object is SSE-S3 but client provides SSE-C headers → Error (mismatched encryption) - s3err.WriteErrorResponse(w, r, s3err.ErrSSEEncryptionTypeMismatch) - return http.StatusBadRequest, 0 - } else if actualObjectType == "None" && clientExpectsSSEC { - // Object is unencrypted but client provides SSE-C headers → Error - 
s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) - return http.StatusBadRequest, 0 - } - - // Unknown state - pass through and let proxy handle it - glog.V(4).Infof("Unknown SSE state: objectType=%s, clientExpectsSSEC=%v", actualObjectType, clientExpectsSSEC) - return passThroughResponse(proxyResponse, w) -} - -// handleSSEKMSResponse handles SSE-KMS decryption and response processing -func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, entry *filer_pb.Entry, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) { - // Deserialize SSE-KMS metadata - kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader) - if err != nil { - glog.Errorf("Failed to decode SSE-KMS metadata: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - - sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) - if err != nil { - glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - - // For HEAD requests, we don't need to decrypt the body, just add response headers - if r.Method == "HEAD" { - // Capture existing CORS headers that may have been set by middleware - capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) - - // Copy headers from proxy response (excluding internal SeaweedFS headers) - copyResponseHeaders(w, proxyResponse, false) - - // Add SSE-KMS response headers - AddSSEKMSResponseHeaders(w, sseKMSKey) - - return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders) - } - - // For GET requests, check if this is a multipart SSE-KMS object - // We need to check the object structure to determine if it's multipart encrypted - isMultipartSSEKMS := false - - if sseKMSKey != nil && entry != nil { - // Use the entry parameter passed from the caller (avoids redundant lookup) - // Check for multipart SSE-KMS - sseKMSChunks := 0 - for _, chunk := range entry.GetChunks() { - if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 { - sseKMSChunks++ - } - } - isMultipartSSEKMS = sseKMSChunks > 1 - } - - var decryptedReader io.Reader - if isMultipartSSEKMS { - // Handle multipart SSE-KMS objects - each chunk needs independent decryption - multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse, entry) - if decErr != nil { - glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - decryptedReader = multipartReader - glog.V(3).Infof("Using multipart SSE-KMS decryption for object") - } else { - // Handle single-part SSE-KMS objects - singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey) - if decErr != nil { - glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - decryptedReader = singlePartReader - glog.V(3).Infof("Using single-part SSE-KMS decryption for object") - } - - // Capture existing CORS headers that may have been set by middleware - capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) - - // Copy headers from proxy response (excluding body-related headers that might change and internal SeaweedFS headers) - copyResponseHeaders(w, proxyResponse, true) - - // 
Set correct Content-Length for SSE-KMS - if proxyResponse.Header.Get("Content-Range") == "" { - // For full object requests, encrypted length equals original length - if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" { - w.Header().Set("Content-Length", contentLengthStr) - } - } - - // Add SSE-KMS response headers - AddSSEKMSResponseHeaders(w, sseKMSKey) - - return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders) -} - -// handleSSES3Response handles SSE-S3 decryption and response processing -func (s3a *S3ApiServer) handleSSES3Response(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, entry *filer_pb.Entry) (statusCode int, bytesTransferred int64) { - - // For HEAD requests, we don't need to decrypt the body, just add response headers - if r.Method == "HEAD" { - // Capture existing CORS headers that may have been set by middleware - capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) - - // Copy headers from proxy response (excluding internal SeaweedFS headers) - copyResponseHeaders(w, proxyResponse, false) - - // Add SSE-S3 response headers - w.Header().Set(s3_constants.AmzServerSideEncryption, SSES3Algorithm) - - return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders) - } - - // For GET requests, check if this is a multipart SSE-S3 object - isMultipartSSES3 := false - sses3Chunks := 0 - for _, chunk := range entry.GetChunks() { - if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(chunk.GetSseMetadata()) > 0 { - sses3Chunks++ - } - } - isMultipartSSES3 = sses3Chunks > 1 - - var decryptedReader io.Reader - if isMultipartSSES3 { - // Handle multipart SSE-S3 objects - each chunk needs independent decryption - multipartReader, decErr := s3a.createMultipartSSES3DecryptedReader(r, entry) - if decErr != nil { - glog.Errorf("Failed to create multipart SSE-S3 decrypted reader: %v", decErr) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - decryptedReader = multipartReader - glog.V(3).Infof("Using multipart SSE-S3 decryption for object") - } else { - // Handle single-part SSE-S3 objects - // Extract SSE-S3 key from metadata - keyManager := GetSSES3KeyManager() - if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; !exists { - glog.Errorf("SSE-S3 key metadata not found in object entry") - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } else { - sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager) - if err != nil { - glog.Errorf("Failed to deserialize SSE-S3 metadata: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - - // Extract IV from metadata using helper function - iv, err := GetSSES3IV(entry, sseS3Key, keyManager) - if err != nil { - glog.Errorf("Failed to get SSE-S3 IV: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - - singlePartReader, decErr := CreateSSES3DecryptedReader(proxyResponse.Body, sseS3Key, iv) - if decErr != nil { - glog.Errorf("Failed to create SSE-S3 decrypted reader: %v", decErr) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - decryptedReader = singlePartReader - glog.V(3).Infof("Using single-part SSE-S3 decryption for object") - } - } - - // Capture existing CORS headers that may have been set by middleware - capturedCORSHeaders := 
captureCORSHeaders(w, corsHeaders) - - // Copy headers from proxy response (excluding body-related headers that might change and internal SeaweedFS headers) - copyResponseHeaders(w, proxyResponse, true) - - // Set correct Content-Length for SSE-S3 - if proxyResponse.Header.Get("Content-Range") == "" { - // For full object requests, encrypted length equals original length - if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" { - w.Header().Set("Content-Length", contentLengthStr) - } - } - - // Add SSE-S3 response headers - w.Header().Set(s3_constants.AmzServerSideEncryption, SSES3Algorithm) - - return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders) -} - // addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes // and adds the appropriate S3 headers to the response func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) { @@ -1266,6 +2689,11 @@ func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, en // detectPrimarySSEType determines the primary SSE type by examining chunk metadata func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string { + // Safety check: handle nil entry + if entry == nil { + return "None" + } + if len(entry.GetChunks()) == 0 { // No chunks - check object-level metadata only (single objects or smallContent) hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil @@ -1346,10 +2774,95 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string { return "None" } -// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects -func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) { - // Entry is passed from caller to avoid redundant filer lookup +// createMultipartSSECDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-C objects (direct volume path) +// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O. +// It's kept in the signature for API consistency with non-Direct versions. 
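+//
+// A caller-side sketch (hypothetical wiring; w, r and objectEntry are placeholder
+// names, and the GET handler performs its own error mapping):
+//
+//	reader, err := s3a.createMultipartSSECDecryptedReaderDirect(ctx, nil, customerKey, objectEntry)
+//	if err != nil {
+//		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+//		return
+//	}
+//	if closer, ok := reader.(io.Closer); ok {
+//		defer closer.Close() // MultipartSSEReader closes every per-chunk reader
+//	}
+//	_, err = io.Copy(w, reader)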
+func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) { + // Sort chunks by offset to ensure correct order + chunks := entry.GetChunks() + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].GetOffset() < chunks[j].GetOffset() + }) + + // Create readers for each chunk, decrypting them independently + var readers []io.Reader + + for _, chunk := range chunks { + // Get this chunk's encrypted data + chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) + if err != nil { + return nil, fmt.Errorf("failed to create chunk reader: %v", err) + } + + // Handle based on chunk's encryption type + if chunk.GetSseType() == filer_pb.SSEType_SSE_C { + // Check if this chunk has per-chunk SSE-C metadata + if len(chunk.GetSseMetadata()) == 0 { + chunkReader.Close() + return nil, fmt.Errorf("SSE-C chunk %s missing per-chunk metadata", chunk.GetFileIdString()) + } + + // Deserialize the SSE-C metadata + ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata()) + if err != nil { + chunkReader.Close() + return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), err) + } + + // Decode the IV from the metadata + chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV) + if err != nil { + chunkReader.Close() + return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err) + } + + glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d", + chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset) + + // Note: SSE-C multipart behavior (differs from SSE-KMS/SSE-S3): + // - Upload: CreateSSECEncryptedReader generates RANDOM IV per part (no base IV + offset) + // - Metadata: PartOffset is stored but not used during encryption + // - Decryption: Use stored random IV directly (no offset adjustment needed) + // + // This differs from: + // - SSE-KMS/SSE-S3: Use base IV + calculateIVWithOffset(partOffset) during encryption + // - CopyObject: Applies calculateIVWithOffset to SSE-C (which may be incorrect) + // + // TODO: Investigate CopyObject SSE-C PartOffset handling for consistency + decryptedChunkReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV) + if decErr != nil { + chunkReader.Close() + return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) + } + // Use the streaming decrypted reader directly + readers = append(readers, struct { + io.Reader + io.Closer + }{ + Reader: decryptedChunkReader, + Closer: chunkReader, + }) + glog.V(4).Infof("Added streaming decrypted reader for SSE-C chunk %s", chunk.GetFileIdString()) + } else { + // Non-SSE-C chunk, use as-is + readers = append(readers, chunkReader) + glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString()) + } + } + + // Close the original encrypted stream since we're reading chunks individually + if encryptedStream != nil { + encryptedStream.Close() + } + + return NewMultipartSSEReader(readers), nil +} + +// createMultipartSSEKMSDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-KMS objects (direct volume path) +// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O. +// It's kept in the signature for API consistency with non-Direct versions. 
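+//
+// For reference, the per-chunk metadata deserialized below is expected to look
+// roughly like the following (field names match the SSEKMSKey struct used in this
+// file; values are illustrative and the wire format is whatever
+// DeserializeSSEKMSMetadata defines):
+//
+//	&SSEKMSKey{
+//		KeyID:             "...",       // KMS key that wraps the per-part data key
+//		EncryptedDataKey:  []byte{...}, // unwrapped inside CreateSSEKMSDecryptedReader
+//		EncryptionContext: nil,         // KMS encryption context, if any
+//		BucketKeyEnabled:  false,
+//		IV:                []byte{...}, // IV stored for this part
+//		ChunkOffset:       0,           // within-part offset, set from PartOffset
+//	}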
+func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) { // Sort chunks by offset to ensure correct order chunks := entry.GetChunks() sort.Slice(chunks, func(i, j int) bool { @@ -1361,55 +2874,64 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr for _, chunk := range chunks { // Get this chunk's encrypted data - chunkReader, err := s3a.createEncryptedChunkReader(chunk) + chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk) if err != nil { return nil, fmt.Errorf("failed to create chunk reader: %v", err) } - // Get SSE-KMS metadata for this chunk - var chunkSSEKMSKey *SSEKMSKey + // Handle based on chunk's encryption type + if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS { + // Check if this chunk has per-chunk SSE-KMS metadata + if len(chunk.GetSseMetadata()) == 0 { + chunkReader.Close() + return nil, fmt.Errorf("SSE-KMS chunk %s missing per-chunk metadata", chunk.GetFileIdString()) + } - // Check if this chunk has per-chunk SSE-KMS metadata (new architecture) - if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 { // Use the per-chunk SSE-KMS metadata kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata()) if err != nil { - glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err) - } else { - // ChunkOffset is already set from the stored metadata (PartOffset) - chunkSSEKMSKey = kmsKey + chunkReader.Close() + return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err) } - } - // Note: No fallback to object-level metadata for multipart objects - // Each chunk in a multipart SSE-KMS object must have its own unique IV - // Falling back to object-level metadata could lead to IV reuse or incorrect decryption + glog.V(4).Infof("Decrypting SSE-KMS chunk %s with KeyID=%s", + chunk.GetFileIdString(), kmsKey.KeyID) - if chunkSSEKMSKey == nil { - return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString()) - } + // Create decrypted reader for this chunk + decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, kmsKey) + if decErr != nil { + chunkReader.Close() + return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) + } - // Create decrypted reader for this chunk - decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey) - if decErr != nil { - chunkReader.Close() // Close the chunk reader if decryption fails - return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) + // Use the streaming decrypted reader directly + readers = append(readers, struct { + io.Reader + io.Closer + }{ + Reader: decryptedChunkReader, + Closer: chunkReader, + }) + glog.V(4).Infof("Added streaming decrypted reader for SSE-KMS chunk %s", chunk.GetFileIdString()) + } else { + // Non-SSE-KMS chunk, use as-is + readers = append(readers, chunkReader) + glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString()) } - - // Use the streaming decrypted reader directly instead of reading into memory - readers = append(readers, decryptedChunkReader) - glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString()) } - // Combine all decrypted chunk readers into a single stream with proper resource management - multiReader := NewMultipartSSEReader(readers) - 
 	glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers))
+	// Close the original encrypted stream since we're reading chunks individually
+	if encryptedStream != nil {
+		encryptedStream.Close()
+	}
 
-	return multiReader, nil
+	return NewMultipartSSEReader(readers), nil
 }
 
-// createMultipartSSES3DecryptedReader creates a reader for multipart SSE-S3 objects
-func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, entry *filer_pb.Entry) (io.Reader, error) {
+// createMultipartSSES3DecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-S3 objects (direct volume path)
+// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
+// It's kept in the signature for API consistency with non-Direct versions.
+func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
 	// Sort chunks by offset to ensure correct order
 	chunks := entry.GetChunks()
 	sort.Slice(chunks, func(i, j int) bool {
@@ -1418,54 +2940,50 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
 	// Create readers for each chunk, decrypting them independently
 	var readers []io.Reader
+
+	// Get key manager and SSE-S3 key from entry metadata
 	keyManager := GetSSES3KeyManager()
+	keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
+	sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
+	if err != nil {
+		return nil, fmt.Errorf("failed to deserialize SSE-S3 key from entry metadata: %v", err)
+	}
 
 	for _, chunk := range chunks {
 		// Get this chunk's encrypted data
-		chunkReader, err := s3a.createEncryptedChunkReader(chunk)
+		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
 		}
 
 		// Handle based on chunk's encryption type
 		if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
-			var chunkSSES3Key *SSES3Key
-
-			// Check if this chunk has per-chunk SSE-S3 metadata
-			if len(chunk.GetSseMetadata()) > 0 {
-				// Use the per-chunk SSE-S3 metadata
-				sseKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
-				if err != nil {
-					glog.Errorf("Failed to deserialize per-chunk SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err)
-					chunkReader.Close()
-					return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %v", err)
-				}
-				chunkSSES3Key = sseKey
-			}
-
-			// Note: No fallback to object-level metadata for multipart objects
-			// Each chunk in a multipart SSE-S3 object must have its own unique IV
-			// Falling back to object-level metadata could lead to IV reuse or incorrect decryption
-
-			if chunkSSES3Key == nil {
+			if len(chunk.GetSseMetadata()) == 0 {
 				chunkReader.Close()
-				return nil, fmt.Errorf("no SSE-S3 metadata found for chunk %s in multipart object", chunk.GetFileIdString())
+				return nil, fmt.Errorf("SSE-S3 chunk %s missing per-chunk metadata", chunk.GetFileIdString())
 			}
 
-			// Extract IV from chunk metadata
-			if len(chunkSSES3Key.IV) == 0 {
+			// Deserialize the per-chunk SSE-S3 metadata to get the IV
+			chunkSSES3Metadata, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
+			if err != nil {
 				chunkReader.Close()
-				return nil, fmt.Errorf("no IV found in SSE-S3 metadata for chunk %s", chunk.GetFileIdString())
+				return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err)
 			}
 
+			// Use the IV from the chunk metadata
+			iv := chunkSSES3Metadata.IV
+			glog.V(4).Infof("Decrypting SSE-S3 chunk %s with KeyID=%s, IV length=%d",
+				chunk.GetFileIdString(), sseS3Key.KeyID, len(iv))
+
 			// Create decrypted reader for this chunk
-			decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, chunkSSES3Key, chunkSSES3Key.IV)
+			decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, sseS3Key, iv)
 			if decErr != nil {
 				chunkReader.Close()
-				return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
+				return nil, fmt.Errorf("failed to decrypt SSE-S3 chunk: %v", decErr)
 			}
 
-			// Use the streaming decrypted reader directly, ensuring the underlying chunkReader can be closed
+			// Use the streaming decrypted reader directly
 			readers = append(readers, struct {
 				io.Reader
 				io.Closer
@@ -1473,37 +2991,45 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
 				Reader: decryptedChunkReader,
 				Closer: chunkReader,
 			})
-			glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-S3 object", chunk.GetFileIdString())
+			glog.V(4).Infof("Added streaming decrypted reader for SSE-S3 chunk %s", chunk.GetFileIdString())
 		} else {
-			// Non-SSE-S3 chunk (unencrypted or other encryption type), use as-is
+			// Non-SSE-S3 chunk, use as-is
 			readers = append(readers, chunkReader)
-			glog.V(4).Infof("Added passthrough reader for non-SSE-S3 chunk %s (type: %v)", chunk.GetFileIdString(), chunk.GetSseType())
+			glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString())
 		}
 	}
 
-	// Combine all decrypted chunk readers into a single stream
-	multiReader := NewMultipartSSEReader(readers)
-	glog.V(3).Infof("Created multipart SSE-S3 decrypted reader with %d chunks", len(readers))
+	// Close the original encrypted stream since we're reading chunks individually
+	if encryptedStream != nil {
+		encryptedStream.Close()
+	}
 
-	return multiReader, nil
+	return NewMultipartSSEReader(readers), nil
 }
 
 // createEncryptedChunkReader creates a reader for a single encrypted chunk
-func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
+// Context propagation ensures cancellation if the S3 client disconnects
+func (s3a *S3ApiServer) createEncryptedChunkReader(ctx context.Context, chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
 	// Get chunk URL
 	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
 	if err != nil {
 		return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err)
 	}
 
-	// Create HTTP request for chunk data
-	req, err := http.NewRequest("GET", srcUrl, nil)
+	// Create HTTP request with context for cancellation propagation
+	req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
 	if err != nil {
 		return nil, fmt.Errorf("create HTTP request for chunk: %v", err)
 	}
 
-	// Execute request
-	resp, err := http.DefaultClient.Do(req)
+	// Attach volume server JWT for authentication (matches filer behavior)
+	jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunk.GetFileIdString())
+	if jwt != "" {
+		req.Header.Set("Authorization", "BEARER "+string(jwt))
+	}
+
+	// Use shared HTTP client with connection pooling
+	resp, err := volumeServerHTTPClient.Do(req)
 	if err != nil {
 		return nil, fmt.Errorf("execute HTTP request for chunk: %v", err)
 	}
@@ -1525,9 +3051,10 @@ type MultipartSSEReader struct {
 // SSERangeReader applies range logic to an underlying reader
 type SSERangeReader struct {
 	reader    io.Reader
-	offset    int64 // bytes to skip from the beginning
-	remaining int64 // bytes remaining to read (-1 for unlimited)
-	skipped   int64 // bytes already skipped
+	offset    int64  // bytes to skip from the beginning
+	remaining int64  // bytes remaining to read (-1 for unlimited)
+	skipped   int64  // bytes already skipped
+	skipBuf   []byte // reusable buffer for skipping bytes (avoids per-call allocation)
 }
 
 // NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers
@@ -1559,21 +3086,34 @@ func (m *MultipartSSEReader) Close() error {
 // Read implements the io.Reader interface for SSERangeReader
 func (r *SSERangeReader) Read(p []byte) (n int, err error) {
-
-	// If we need to skip bytes and haven't skipped enough yet
-	if r.skipped < r.offset {
+	// Skip bytes iteratively (no recursion) until we reach the offset
+	for r.skipped < r.offset {
 		skipNeeded := r.offset - r.skipped
-		skipBuf := make([]byte, min(int64(len(p)), skipNeeded))
-		skipRead, skipErr := r.reader.Read(skipBuf)
+
+		// Lazily allocate skip buffer on first use, reuse thereafter
+		if r.skipBuf == nil {
+			// Use a fixed 32KB buffer for skipping (avoids per-call allocation)
+			r.skipBuf = make([]byte, 32*1024)
+		}
+
+		// Determine how much to skip in this iteration
+		bufSize := int64(len(r.skipBuf))
+		if skipNeeded < bufSize {
+			bufSize = skipNeeded
+		}
+
+		skipRead, skipErr := r.reader.Read(r.skipBuf[:bufSize])
 		r.skipped += int64(skipRead)
 		if skipErr != nil {
 			return 0, skipErr
 		}
 
-		// If we still need to skip more, recurse
-		if r.skipped < r.offset {
-			return r.Read(p)
+		// Guard against infinite loop: io.Reader may return (0, nil)
+		// which is permitted by the interface contract for non-empty buffers.
+		// If we get zero bytes without an error, treat it as an unexpected EOF.
+		if skipRead == 0 {
+			return 0, io.ErrUnexpectedEOF
 		}
 	}
 
@@ -1600,6 +3140,8 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) {
 // createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
 // Each chunk has its own IV and encryption key from the original multipart parts
 func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
+	ctx := r.Context()
+
 	// Parse SSE-C headers from the request for decryption key
 	customerKey, err := ParseSSECHeaders(r)
 	if err != nil {
@@ -1659,7 +3201,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
 
 	for _, chunk := range neededChunks {
 		// Get this chunk's encrypted data
-		chunkReader, err := s3a.createEncryptedChunkReader(chunk)
+		chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
 		if err != nil {
 			return nil, fmt.Errorf("failed to create chunk reader: %v", err)
 		}
@@ -1679,13 +3221,10 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
 				return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr)
 			}
 
-			// Calculate the correct IV for this chunk using within-part offset
-			var chunkIV []byte
-			if ssecMetadata.PartOffset > 0 {
-				chunkIV = calculateIVWithOffset(iv, ssecMetadata.PartOffset)
-			} else {
-				chunkIV = iv
-			}
+			// Note: For multipart SSE-C, each part was encrypted with offset=0
+			// So we use the stored IV directly without offset adjustment
+			// PartOffset is stored for informational purposes, but encryption uses offset=0
+			chunkIV := iv
 
 			decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
 			if decErr != nil {
@@ -1725,3 +3264,55 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
 
 	return multiReader, nil
 }
+
+// PartBoundaryInfo holds information about a part's chunk boundaries
+type PartBoundaryInfo struct {
+	PartNumber int    `json:"part"`
+	StartChunk int    `json:"start"`
+	EndChunk   int    `json:"end"` // exclusive
+	ETag       string `json:"etag"`
+}
+
+// rc is a helper type that wraps a Reader and Closer for proper resource cleanup
+type rc struct {
+	io.Reader
+	io.Closer
+}
+
+// getMultipartInfo retrieves multipart metadata for a given part number
+// Returns: (partsCount, partInfo)
+// - partsCount: total number of parts in the multipart object
+// - partInfo: boundary information for the requested part (nil if not found or not a multipart object)
+func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int) (int, *PartBoundaryInfo) {
+	if entry == nil {
+		return 0, nil
+	}
+	if entry.Extended == nil {
+		// Not a multipart object or no metadata
+		return len(entry.GetChunks()), nil
+	}
+
+	// Try to get parts count from metadata
+	partsCount := len(entry.GetChunks()) // default fallback
+	if partsCountBytes, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartsCount]; exists {
+		if count, err := strconv.Atoi(string(partsCountBytes)); err == nil && count > 0 {
+			partsCount = count
+		}
+	}
+
+	// Try to get part boundaries from metadata
+	if boundariesJSON, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries]; exists {
+		var boundaries []PartBoundaryInfo
+		if err := json.Unmarshal(boundariesJSON, &boundaries); err == nil {
+			// Find the requested part
+			for i := range boundaries {
+				if boundaries[i].PartNumber == partNumber {
+					return partsCount, &boundaries[i]
+				}
+			}
+		}
+	}
+
+	// No part boundaries metadata or part not found
+	return partsCount, nil
+}
