Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
-rw-r--r--  weed/s3api/s3api_object_handlers.go  2605
1 file changed, 2098 insertions(+), 507 deletions(-)
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 98d0ffede..ce2772981 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -2,12 +2,17 @@ package s3api
import (
"bytes"
+ "context"
"encoding/base64"
+ "encoding/json"
"errors"
"fmt"
"io"
+ "math"
+ "mime"
"net/http"
"net/url"
+ "path/filepath"
"sort"
"strconv"
"strings"
@@ -15,13 +20,15 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/glog"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
// corsHeaders defines the CORS headers that need to be preserved
@@ -35,6 +42,113 @@ var corsHeaders = []string{
"Access-Control-Allow-Credentials",
}
+// zeroBuf is a reusable buffer of zero bytes for padding operations
+// Package-level to avoid per-call allocations in writeZeroBytes
+var zeroBuf = make([]byte, 32*1024)
+
+// adjustRangeForPart adjusts a client's Range header to absolute offsets within a part.
+// Parameters:
+// - partStartOffset: the absolute start offset of the part in the object
+// - partEndOffset: the absolute end offset of the part in the object
+// - clientRangeHeader: the Range header value from the client (e.g., "bytes=0-99")
+//
+// Returns:
+// - adjustedStart: the adjusted absolute start offset
+// - adjustedEnd: the adjusted absolute end offset
+// - error: nil on success, error if the range is invalid
+func adjustRangeForPart(partStartOffset, partEndOffset int64, clientRangeHeader string) (adjustedStart, adjustedEnd int64, err error) {
+ // If no range header, return the full part
+ if clientRangeHeader == "" || !strings.HasPrefix(clientRangeHeader, "bytes=") {
+ return partStartOffset, partEndOffset, nil
+ }
+
+ // Parse client's range request (relative to the part)
+ rangeSpec := clientRangeHeader[6:] // Remove "bytes=" prefix
+ parts := strings.Split(rangeSpec, "-")
+
+ if len(parts) != 2 {
+ return 0, 0, fmt.Errorf("invalid range format")
+ }
+
+ partSize := partEndOffset - partStartOffset + 1
+ var clientStart, clientEnd int64
+
+ // Parse start offset
+ if parts[0] != "" {
+ clientStart, err = strconv.ParseInt(parts[0], 10, 64)
+ if err != nil {
+ return 0, 0, fmt.Errorf("invalid range start: %w", err)
+ }
+ }
+
+ // Parse end offset
+ if parts[1] != "" {
+ clientEnd, err = strconv.ParseInt(parts[1], 10, 64)
+ if err != nil {
+ return 0, 0, fmt.Errorf("invalid range end: %w", err)
+ }
+ } else {
+ // No end specified, read to end of part
+ clientEnd = partSize - 1
+ }
+
+ // Handle suffix-range (e.g., "bytes=-100" means last 100 bytes)
+ if parts[0] == "" {
+ // suffix-range: clientEnd is actually the suffix length
+ suffixLength := clientEnd
+ if suffixLength > partSize {
+ suffixLength = partSize
+ }
+ clientStart = partSize - suffixLength
+ clientEnd = partSize - 1
+ }
+
+ // Validate range is within part boundaries
+ if clientStart < 0 || clientStart >= partSize {
+ return 0, 0, fmt.Errorf("range start %d out of bounds for part size %d", clientStart, partSize)
+ }
+ if clientEnd >= partSize {
+ clientEnd = partSize - 1
+ }
+ if clientStart > clientEnd {
+ return 0, 0, fmt.Errorf("range start %d > end %d", clientStart, clientEnd)
+ }
+
+ // Adjust to absolute offsets in the object
+ adjustedStart = partStartOffset + clientStart
+ adjustedEnd = partStartOffset + clientEnd
+
+ return adjustedStart, adjustedEnd, nil
+}
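A quick test-style sketch of the mapping above (hedged: the function lives in this package, so a _test.go file with a "testing" import could exercise it directly). A part occupying absolute bytes 1000-1999 with a part-relative client range of "bytes=0-99" resolves to absolute 1000-1099, and the suffix form "bytes=-100" to the part's last 100 bytes.

	// Sketch of expected adjustRangeForPart behavior; not part of this diff.
	func TestAdjustRangeForPartSketch(t *testing.T) {
		// Part occupies absolute bytes [1000, 1999]; client ranges are part-relative.
		start, end, err := adjustRangeForPart(1000, 1999, "bytes=0-99")
		if err != nil || start != 1000 || end != 1099 {
			t.Fatalf("bytes=0-99: got %d-%d, err=%v", start, end, err)
		}
		// Suffix range: the last 100 bytes of the part.
		start, end, err = adjustRangeForPart(1000, 1999, "bytes=-100")
		if err != nil || start != 1900 || end != 1999 {
			t.Fatalf("bytes=-100: got %d-%d, err=%v", start, end, err)
		}
	}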
+
+// StreamError is returned when streaming functions encounter errors.
+// It tracks whether an HTTP response has already been written to prevent
+// double WriteHeader calls that would create malformed S3 error responses.
+type StreamError struct {
+ // Err is the underlying error
+ Err error
+ // ResponseWritten indicates if HTTP headers/status have been written to ResponseWriter
+ ResponseWritten bool
+}
+
+func (e *StreamError) Error() string {
+ return e.Err.Error()
+}
+
+func (e *StreamError) Unwrap() error {
+ return e.Err
+}
+
+// newStreamError creates a StreamError for cases where response hasn't been written yet
+func newStreamError(err error) *StreamError {
+ return &StreamError{Err: err, ResponseWritten: false}
+}
+
+// newStreamErrorWithResponse creates a StreamError for cases where response was already written
+func newStreamErrorWithResponse(err error) *StreamError {
+ return &StreamError{Err: err, ResponseWritten: true}
+}
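The intended caller-side contract, mirroring how GetObjectHandler consumes these errors later in this diff (writeStreamResult is a hypothetical wrapper name, not code from this change):

	// Sketch: only write an S3 error body when the streaming helper has not
	// already committed a response.
	func writeStreamResult(w http.ResponseWriter, r *http.Request, err error) {
		if err == nil {
			return
		}
		var se *StreamError
		if errors.As(err, &se) && se.ResponseWritten {
			return // headers/status already sent; a second WriteHeader would be superfluous
		}
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
	}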
+
func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
mimeBuffer := make([]byte, 512)
size, _ := dataReader.Read(mimeBuffer)
@@ -88,6 +202,62 @@ func removeDuplicateSlashes(object string) string {
return result.String()
}
+// hasChildren checks if a path has any child objects (is a directory with contents)
+//
+// This helper function is used to distinguish implicit directories from regular files or empty directories.
+// An implicit directory is one that exists only because it has children, not because it was explicitly created.
+//
+// Implementation:
+// - Lists the directory with Limit=1 to check for at least one child
+// - Returns true if any child exists, false otherwise
+// - Efficient: only fetches one entry to minimize overhead
+//
+// Used by HeadObjectHandler to implement AWS S3-compatible implicit directory behavior:
+// - If a 0-byte object or directory has children → it's an implicit directory → HEAD returns 404
+// - If a 0-byte object or directory has no children → it's empty → HEAD returns 200
+//
+// Examples:
+//
+// hasChildren("bucket", "dataset") where "dataset/file.txt" exists → true
+// hasChildren("bucket", "empty-dir") where no children exist → false
+//
+// Performance: ~1-5ms per call (one gRPC LIST request with Limit=1)
+func (s3a *S3ApiServer) hasChildren(bucket, prefix string) bool {
+ // Clean up prefix: remove leading slashes
+ cleanPrefix := strings.TrimPrefix(prefix, "/")
+
+ // The directory to list is bucketDir + cleanPrefix
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ fullPath := bucketDir + "/" + cleanPrefix
+
+ // Try to list one child object in the directory
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.ListEntriesRequest{
+ Directory: fullPath,
+ Limit: 1,
+ InclusiveStartFrom: true,
+ }
+
+ stream, err := client.ListEntries(context.Background(), request)
+ if err != nil {
+ return err
+ }
+
+ // Check if we got at least one entry
+ _, err = stream.Recv()
+ if err == io.EOF {
+ return io.EOF // No children
+ }
+ if err != nil {
+ return err
+ }
+ return nil
+ })
+
+ // If we got an entry (not EOF), then it has children
+ return err == nil
+}
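A minimal sketch of the HEAD-side rule the comment above describes (isImplicitDirectory is a hypothetical helper, not part of this diff):

	// Hypothetical helper: true when the entry only exists as a parent of
	// other objects, which per the rule above should make HEAD return 404.
	func (s3a *S3ApiServer) isImplicitDirectory(bucket, object string, entry *filer_pb.Entry) bool {
		if entry == nil || entry.Attributes == nil {
			return false
		}
		zeroByte := entry.IsDirectory || entry.Attributes.FileSize == 0
		return zeroByte && s3a.hasChildren(bucket, object)
	}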
+
// checkDirectoryObject checks if the object is a directory object (ends with "/") and if it exists
// Returns: (entry, isDirectoryObject, error)
// - entry: the directory entry if found and is a directory
@@ -123,6 +293,13 @@ func (s3a *S3ApiServer) checkDirectoryObject(bucket, object string) (*filer_pb.E
// serveDirectoryContent serves the content of a directory object directly
func (s3a *S3ApiServer) serveDirectoryContent(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry) {
+ // Defensive nil checks - entry and attributes should never be nil, but guard against it
+ if entry == nil || entry.Attributes == nil {
+ glog.Errorf("serveDirectoryContent: entry or attributes is nil")
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+
// Set content type - use stored MIME type or default
contentType := entry.Attributes.Mime
if contentType == "" {
@@ -272,13 +449,29 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
bucket, object := s3_constants.GetBucketAndObject(r)
glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)
+ // TTFB Profiling: Track all stages until first byte
+ tStart := time.Now()
+ var (
+ conditionalHeadersTime time.Duration
+ versioningCheckTime time.Duration
+ entryFetchTime time.Duration
+ streamTime time.Duration
+ )
+ defer func() {
+ totalTime := time.Since(tStart)
+ glog.V(2).Infof("GET TTFB PROFILE %s/%s: total=%v | conditional=%v, versioning=%v, entryFetch=%v, stream=%v",
+ bucket, object, totalTime, conditionalHeadersTime, versioningCheckTime, entryFetchTime, streamTime)
+ }()
+
// Handle directory objects with shared logic
if s3a.handleDirectoryObjectRequest(w, r, bucket, object, "GetObjectHandler") {
return // Directory object request was handled
}
// Check conditional headers and handle early return if conditions fail
+ tConditional := time.Now()
result, handled := s3a.processConditionalHeaders(w, r, bucket, object, "GetObjectHandler")
+ conditionalHeadersTime = time.Since(tConditional)
if handled {
return
}
@@ -287,13 +480,13 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
versionId := r.URL.Query().Get("versionId")
var (
- destUrl string
entry *filer_pb.Entry // Declare entry at function scope for SSE processing
versioningConfigured bool
err error
)
// Check if versioning is configured for the bucket (Enabled or Suspended)
+ tVersioning := time.Now()
// Note: We need to check this even if versionId is empty, because versioned buckets
// handle even "get latest version" requests differently (through .versions directory)
versioningConfigured, err = s3a.isVersioningConfigured(bucket)
@@ -306,15 +499,15 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
- glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
+ glog.V(3).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
if versioningConfigured {
- // Handle versioned GET - all versions are stored in .versions directory
+ // Handle versioned GET - check if specific version requested
var targetVersionId string
if versionId != "" {
- // Request for specific version
- glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object)
+ // Request for specific version - must look in .versions directory
+ glog.V(3).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
glog.Errorf("Failed to get specific version %s: %v", versionId, err)
@@ -323,22 +516,61 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}
targetVersionId = versionId
} else {
- // Request for latest version
- glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object)
- entry, err = s3a.getLatestObjectVersion(bucket, object)
- if err != nil {
- glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- if entry.Extended != nil {
- if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
- targetVersionId = string(versionIdBytes)
+ // Request for latest version - OPTIMIZATION:
+ // Check if .versions/ directory exists quickly (no retries) to decide path
+ // - If .versions/ exists: real versions available, use getLatestObjectVersion
+ // - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly
+ // - If transient error: fall back to getLatestObjectVersion which has retry logic
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ normalizedObject := removeDuplicateSlashes(object)
+ versionsDir := normalizedObject + s3_constants.VersionsFolder
+
+ // Quick check (no retries) for .versions/ directory
+ versionsEntry, versionsErr := s3a.getEntry(bucketDir, versionsDir)
+
+ if versionsErr == nil && versionsEntry != nil {
+ // .versions/ exists, meaning real versions are stored there
+ // Use getLatestObjectVersion which will properly find the newest version
+ entry, err = s3a.getLatestObjectVersion(bucket, object)
+ if err != nil {
+ glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ } else if errors.Is(versionsErr, filer_pb.ErrNotFound) {
+ // .versions/ doesn't exist (confirmed not found), check regular path for null version
+ regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
+ if regularErr == nil && regularEntry != nil {
+ // Found object at regular path - this is the null version
+ entry = regularEntry
+ targetVersionId = "null"
+ } else {
+ // No object at regular path either - object doesn't exist
+ glog.Errorf("GetObject: object not found at regular path or .versions for %s%s", bucket, object)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ } else {
+ // Transient error checking .versions/, fall back to getLatestObjectVersion with retries
+ glog.V(2).Infof("GetObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr)
+ entry, err = s3a.getLatestObjectVersion(bucket, object)
+ if err != nil {
+ glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
}
}
- // If no version ID found in entry, this is a pre-versioning object
+ // Extract version ID if not already set
if targetVersionId == "" {
- targetVersionId = "null"
+ if entry.Extended != nil {
+ if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
+ targetVersionId = string(versionIdBytes)
+ }
+ }
+ // If no version ID found in entry, this is a pre-versioning object
+ if targetVersionId == "" {
+ targetVersionId = "null"
+ }
}
}
@@ -350,16 +582,11 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}
}
- // Determine the actual file path based on whether this is a versioned or pre-versioning object
+ // For versioned objects, log the target version
if targetVersionId == "null" {
- // Pre-versioning object - stored as regular file
- destUrl = s3a.toFilerUrl(bucket, object)
- glog.V(2).Infof("GetObject: pre-versioning object URL: %s", destUrl)
+ glog.V(2).Infof("GetObject: pre-versioning object %s/%s", bucket, object)
} else {
- // Versioned object - stored in .versions directory
- versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
- destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
- glog.V(2).Infof("GetObject: version %s URL: %s", targetVersionId, destUrl)
+ glog.V(2).Infof("GetObject: version %s for %s/%s", targetVersionId, bucket, object)
}
// Set version ID in response header
@@ -367,16 +594,14 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// Add object lock metadata to response headers if present
s3a.addObjectLockHeadersToResponse(w, entry)
- } else {
- // Handle regular GET (non-versioned)
- destUrl = s3a.toFilerUrl(bucket, object)
}
+ versioningCheckTime = time.Since(tVersioning)
+
// Fetch the correct entry for SSE processing (respects versionId)
// This consolidates entry lookups to avoid multiple filer calls
+ tEntryFetch := time.Now()
var objectEntryForSSE *filer_pb.Entry
- originalRangeHeader := r.Header.Get("Range")
- var sseObject = false
// Optimization: Reuse already-fetched entry to avoid redundant metadata fetches
if versioningConfigured {
@@ -397,7 +622,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
var fetchErr error
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
if fetchErr != nil {
- glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr)
+ glog.Warningf("GetObjectHandler: failed to get entry for %s/%s: %v", bucket, object, fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
@@ -408,34 +633,1415 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}
}
}
+ entryFetchTime = time.Since(tEntryFetch)
+
+ // Check if PartNumber query parameter is present (for multipart GET requests)
+ partNumberStr := r.URL.Query().Get("partNumber")
+ if partNumberStr == "" {
+ partNumberStr = r.URL.Query().Get("PartNumber")
+ }
+
+ // If PartNumber is specified, set headers and modify Range to read only that part
+ // This replicates the filer handler logic
+ if partNumberStr != "" {
+ if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 {
+ // Get actual parts count from metadata (not chunk count)
+ partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber)
+
+ // Validate part number
+ if partNumber > partsCount {
+ glog.Warningf("GetObject: Invalid part number %d, object has %d parts", partNumber, partsCount)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
+ return
+ }
+
+ // Set parts count header
+ w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount))
+ glog.V(3).Infof("GetObject: Set PartsCount=%d for multipart GET with PartNumber=%d", partsCount, partNumber)
+
+ // Calculate the byte range for this part
+ var startOffset, endOffset int64
+ if partInfo != nil {
+ // Use part boundaries from metadata (accurate for multi-chunk parts)
+ startOffset = objectEntryForSSE.Chunks[partInfo.StartChunk].Offset
+ lastChunk := objectEntryForSSE.Chunks[partInfo.EndChunk-1]
+ endOffset = lastChunk.Offset + int64(lastChunk.Size) - 1
+
+ // Override ETag with the part's ETag from metadata
+ w.Header().Set("ETag", "\""+partInfo.ETag+"\"")
+ glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag)
+ } else {
+ // Fallback: assume 1:1 part-to-chunk mapping (backward compatibility)
+ chunkIndex := partNumber - 1
+ if chunkIndex >= len(objectEntryForSSE.Chunks) {
+ glog.Warningf("GetObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
+ return
+ }
+ partChunk := objectEntryForSSE.Chunks[chunkIndex]
+ startOffset = partChunk.Offset
+ endOffset = partChunk.Offset + int64(partChunk.Size) - 1
+
+ // Override ETag with chunk's ETag (fallback)
+ if partChunk.ETag != "" {
+ if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil {
+ partETag := fmt.Sprintf("%x", md5Bytes)
+ w.Header().Set("ETag", "\""+partETag+"\"")
+ glog.V(3).Infof("GetObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag)
+ }
+ }
+ }
+
+ // Check if client supplied a Range header - if so, apply it within the part's boundaries
+ // S3 allows both partNumber and Range together, where Range applies within the selected part
+ clientRangeHeader := r.Header.Get("Range")
+ if clientRangeHeader != "" {
+ adjustedStart, adjustedEnd, rangeErr := adjustRangeForPart(startOffset, endOffset, clientRangeHeader)
+ if rangeErr != nil {
+ glog.Warningf("GetObject: Invalid Range for part %d: %v", partNumber, rangeErr)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return
+ }
+ startOffset = adjustedStart
+ endOffset = adjustedEnd
+ glog.V(3).Infof("GetObject: Client Range %s applied to part %d, adjusted to bytes=%d-%d", clientRangeHeader, partNumber, startOffset, endOffset)
+ }
+
+ // Set Range header to read the requested bytes (full part or client-specified range within part)
+ rangeHeader := fmt.Sprintf("bytes=%d-%d", startOffset, endOffset)
+ r.Header.Set("Range", rangeHeader)
+ glog.V(3).Infof("GetObject: Set Range header for part %d: %s", partNumber, rangeHeader)
+ }
+ }
+
+ // NEW OPTIMIZATION: Stream directly from volume servers, bypassing filer proxy
+ // This eliminates the ~19ms filer proxy overhead
+ // SSE decryption is handled inline during streaming
+
+ // Safety check: entry must be valid before streaming
+ if objectEntryForSSE == nil {
+ glog.Errorf("GetObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+
+ // Detect SSE encryption type
+ primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE)
+
+ // Stream directly from volume servers with SSE support
+ tStream := time.Now()
+ err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType)
+ streamTime = time.Since(tStream)
+ if err != nil {
+ glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err)
+ // Check if the streaming function already wrote an HTTP response
+ var streamErr *StreamError
+ if errors.As(err, &streamErr) && streamErr.ResponseWritten {
+ // Response already written (headers + status code), don't write again
+ // to avoid "superfluous response.WriteHeader call" and malformed S3 error bodies
+ return
+ }
+ // Response not yet written - safe to write S3 error response
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+}
+
+// streamFromVolumeServers streams object data directly from volume servers, bypassing filer proxy
+// This eliminates the ~19ms filer proxy overhead by reading chunks directly
+func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error {
+ // Profiling: Track overall and stage timings
+ t0 := time.Now()
+ var (
+ rangeParseTime time.Duration
+ headerSetTime time.Duration
+ chunkResolveTime time.Duration
+ streamPrepTime time.Duration
+ streamExecTime time.Duration
+ )
+ defer func() {
+ totalTime := time.Since(t0)
+ glog.V(2).Infof(" └─ streamFromVolumeServers: total=%v, rangeParse=%v, headerSet=%v, chunkResolve=%v, streamPrep=%v, streamExec=%v",
+ totalTime, rangeParseTime, headerSetTime, chunkResolveTime, streamPrepTime, streamExecTime)
+ }()
+
+ if entry == nil {
+ // Early validation error: write S3-compliant XML error response
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return newStreamErrorWithResponse(fmt.Errorf("entry is nil"))
+ }
+
+ // Get file size
+ totalSize := int64(filer.FileSize(entry))
+
+ // Parse Range header if present
+ tRangeParse := time.Now()
+ var offset int64 = 0
+ var size int64 = totalSize
+ rangeHeader := r.Header.Get("Range")
+ isRangeRequest := false
+
+ if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") {
+ rangeSpec := rangeHeader[6:]
+ parts := strings.Split(rangeSpec, "-")
+ if len(parts) == 2 {
+ var startOffset, endOffset int64
+
+ // Handle different Range formats:
+ // 1. "bytes=0-499" - first 500 bytes (parts[0]="0", parts[1]="499")
+ // 2. "bytes=500-" - from byte 500 to end (parts[0]="500", parts[1]="")
+ // 3. "bytes=-500" - last 500 bytes (parts[0]="", parts[1]="500")
+
+ if parts[0] == "" && parts[1] != "" {
+ // Suffix range: bytes=-N (last N bytes)
+ if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
+ // RFC 7233: suffix range on empty object or zero-length suffix is unsatisfiable
+ if totalSize == 0 || suffixLen <= 0 {
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range for empty object"))
+ }
+ if suffixLen > totalSize {
+ suffixLen = totalSize
+ }
+ startOffset = totalSize - suffixLen
+ endOffset = totalSize - 1
+ } else {
+ // Set header BEFORE WriteHeader
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range"))
+ }
+ } else {
+ // Regular range or open-ended range
+ startOffset = 0
+ endOffset = totalSize - 1
+
+ if parts[0] != "" {
+ if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil {
+ startOffset = parsed
+ }
+ }
+ if parts[1] != "" {
+ if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
+ endOffset = parsed
+ }
+ }
- // Check if this is an SSE object for Range request handling
- // This applies to both versioned and non-versioned objects
- if originalRangeHeader != "" && objectEntryForSSE != nil {
- primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE)
- if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
- sseObject = true
- // Temporarily remove Range header to get full encrypted data from filer
- r.Header.Del("Range")
+ // Validate range
+ if startOffset < 0 || startOffset >= totalSize {
+ // Set header BEFORE WriteHeader
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("invalid range start"))
+ }
+
+ if endOffset >= totalSize {
+ endOffset = totalSize - 1
+ }
+
+ if endOffset < startOffset {
+ // Set header BEFORE WriteHeader
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("invalid range: end before start"))
+ }
+ }
+
+ offset = startOffset
+ size = endOffset - startOffset + 1
+ isRangeRequest = true
}
}
+ rangeParseTime = time.Since(tRangeParse)
- s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
- // Restore the original Range header for SSE processing
- if sseObject && originalRangeHeader != "" {
- r.Header.Set("Range", originalRangeHeader)
+ // For small files stored inline in entry.Content - validate BEFORE setting headers
+ if len(entry.Content) > 0 && totalSize == int64(len(entry.Content)) {
+ if isRangeRequest {
+ // Safely convert int64 to int for slice indexing - validate BEFORE WriteHeader
+ // Use MaxInt32 for portability across 32-bit and 64-bit platforms
+ if offset < 0 || offset > int64(math.MaxInt32) || size < 0 || size > int64(math.MaxInt32) {
+ // Early validation error: write S3-compliant error response
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("range too large for platform: offset=%d, size=%d", offset, size))
+ }
+ start := int(offset)
+ end := start + int(size)
+ // Bounds check (should already be validated, but double-check) - BEFORE WriteHeader
+ if start < 0 || start > len(entry.Content) || end > len(entry.Content) || end < start {
+ // Early validation error: write S3-compliant error response
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("invalid range for inline content: start=%d, end=%d, len=%d", start, end, len(entry.Content)))
+ }
+ // Validation passed - now set headers and write
+ s3a.setResponseHeaders(w, entry, totalSize)
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize))
+ w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
+ w.WriteHeader(http.StatusPartialContent)
+ if _, writeErr := w.Write(entry.Content[start:end]); writeErr != nil {
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(writeErr)
+ }
+ return nil
+ }
+ // Non-range request for inline content
+ s3a.setResponseHeaders(w, entry, totalSize)
+ w.WriteHeader(http.StatusOK)
+ if _, writeErr := w.Write(entry.Content); writeErr != nil {
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(writeErr)
+ }
+ return nil
+ }
+
+ // Get chunks and validate BEFORE setting headers
+ chunks := entry.GetChunks()
+ glog.V(4).Infof("streamFromVolumeServers: entry has %d chunks, totalSize=%d, isRange=%v, offset=%d, size=%d",
+ len(chunks), totalSize, isRangeRequest, offset, size)
+
+ if len(chunks) == 0 {
+ // BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue
+ if totalSize > 0 && len(entry.Content) == 0 {
+ glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize)
+ // Write S3-compliant XML error response
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return newStreamErrorWithResponse(fmt.Errorf("data integrity error: size %d reported but no content available", totalSize))
}
+ // Empty object - set headers and write status
+ s3a.setResponseHeaders(w, entry, totalSize)
+ w.WriteHeader(http.StatusOK)
+ return nil
+ }
- // Add SSE metadata headers based on object metadata before SSE processing
- if objectEntryForSSE != nil {
- s3a.addSSEHeadersToResponse(proxyResponse, objectEntryForSSE)
+ // Log chunk details (verbose only - high frequency)
+ if glog.V(4) {
+ for i, chunk := range chunks {
+ glog.Infof(" GET Chunk[%d]: fid=%s, offset=%d, size=%d", i, chunk.GetFileIdString(), chunk.Offset, chunk.Size)
}
+ }
- // Handle SSE decryption (both SSE-C and SSE-KMS) if needed
- return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE)
- })
+ // CRITICAL: Resolve chunks and prepare stream BEFORE WriteHeader
+ // This ensures we can write proper error responses if these operations fail
+ ctx := r.Context()
+ lookupFileIdFn := s3a.createLookupFileIdFunction()
+
+ // Resolve chunk manifests with the requested range
+ tChunkResolve := time.Now()
+ resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, offset, offset+size)
+ chunkResolveTime = time.Since(tChunkResolve)
+ if err != nil {
+ glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err)
+ // Write S3-compliant XML error response
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return newStreamErrorWithResponse(fmt.Errorf("failed to resolve chunks: %v", err))
+ }
+
+ // Prepare streaming function with simple master client wrapper
+ tStreamPrep := time.Now()
+ masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn}
+ streamFn, err := filer.PrepareStreamContentWithThrottler(
+ ctx,
+ masterClient,
+ func(fileId string) string {
+ // Use volume server JWT (not filer JWT) for direct volume reads
+ return string(security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId))
+ },
+ resolvedChunks,
+ offset,
+ size,
+ 0, // no throttling
+ )
+ streamPrepTime = time.Since(tStreamPrep)
+ if err != nil {
+ glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err)
+ // Write S3-compliant XML error response
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return newStreamErrorWithResponse(fmt.Errorf("failed to prepare stream: %v", err))
+ }
+
+ // All validation and preparation successful - NOW set headers and write status
+ tHeaderSet := time.Now()
+ s3a.setResponseHeaders(w, entry, totalSize)
+
+ // Override/add range-specific headers if this is a range request
+ if isRangeRequest {
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize))
+ w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
+ }
+ headerSetTime = time.Since(tHeaderSet)
+
+ // Now write status code (headers are all set, stream is ready)
+ if isRangeRequest {
+ w.WriteHeader(http.StatusPartialContent)
+ } else {
+ w.WriteHeader(http.StatusOK)
+ }
+
+ // Stream directly to response
+ tStreamExec := time.Now()
+ glog.V(4).Infof("streamFromVolumeServers: starting streamFn, offset=%d, size=%d", offset, size)
+ err = streamFn(w)
+ streamExecTime = time.Since(tStreamExec)
+ if err != nil {
+ glog.Errorf("streamFromVolumeServers: streamFn failed: %v", err)
+ // Streaming error after WriteHeader was called - response already partially written
+ return newStreamErrorWithResponse(err)
+ }
+ glog.V(4).Infof("streamFromVolumeServers: streamFn completed successfully")
+ return nil
+}
+
+// Shared HTTP client for volume server requests (connection pooling)
+var volumeServerHTTPClient = &http.Client{
+ Timeout: 5 * time.Minute,
+ Transport: &http.Transport{
+ MaxIdleConns: 100,
+ MaxIdleConnsPerHost: 10,
+ IdleConnTimeout: 90 * time.Second,
+ },
+}
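simpleMasterClient is referenced in streamFromVolumeServers above but defined outside this excerpt. Given the wdclient import added at the top of this diff, a minimal adapter satisfying filer.PrepareStreamContentWithThrottler's master-client parameter could look like this (a sketch, not necessarily the file's actual definition):

	// Sketch: adapt a bare lookup function to the interface the filer
	// streaming helpers expect (assumes wdclient.LookupFileIdFunctionType
	// matches func(context.Context, string) ([]string, error)).
	type simpleMasterClient struct {
		lookupFn wdclient.LookupFileIdFunctionType
	}

	func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
		return s.lookupFn
	}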
+
+// createLookupFileIdFunction creates a reusable lookup function for resolving volume URLs
+func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, string) ([]string, error) {
+ return func(ctx context.Context, fileId string) ([]string, error) {
+ var urls []string
+ err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ vid := filer.VolumeId(fileId)
+ resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return err
+ }
+ if locs, found := resp.LocationsMap[vid]; found {
+ for _, loc := range locs.Locations {
+ // Build complete URL with volume server address and fileId
+ // The fileId parameter contains the full "volumeId,fileKey" identifier (e.g., "3,01637037d6")
+ // This constructs URLs like: http://127.0.0.1:8080/3,01637037d6 (or https:// if configured)
+ // NormalizeUrl ensures the proper scheme (http:// or https://) is used based on configuration
+ normalizedUrl, err := util_http.NormalizeUrl(loc.Url)
+ if err != nil {
+ glog.Warningf("Failed to normalize URL for %s: %v", loc.Url, err)
+ continue
+ }
+ urls = append(urls, normalizedUrl+"/"+fileId)
+ }
+ }
+ return nil
+ })
+ glog.V(3).Infof("createLookupFileIdFunction: fileId=%s, resolved urls=%v", fileId, urls)
+ return urls, err
+ }
+}
+
+// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption
+func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error {
+ // If not encrypted, use fast path without decryption
+ if sseType == "" || sseType == "None" {
+ return s3a.streamFromVolumeServers(w, r, entry, sseType)
+ }
+
+ // Profiling: Track SSE decryption stages
+ t0 := time.Now()
+ var (
+ rangeParseTime time.Duration
+ keyValidateTime time.Duration
+ headerSetTime time.Duration
+ streamFetchTime time.Duration
+ decryptSetupTime time.Duration
+ copyTime time.Duration
+ )
+ defer func() {
+ totalTime := time.Since(t0)
+ glog.V(2).Infof(" └─ streamFromVolumeServersWithSSE (%s): total=%v, rangeParse=%v, keyValidate=%v, headerSet=%v, streamFetch=%v, decryptSetup=%v, copy=%v",
+ sseType, totalTime, rangeParseTime, keyValidateTime, headerSetTime, streamFetchTime, decryptSetupTime, copyTime)
+ }()
+
+ glog.V(2).Infof("streamFromVolumeServersWithSSE: Handling %s encrypted object with inline decryption", sseType)
+
+ // Parse Range header BEFORE key validation
+ totalSize := int64(filer.FileSize(entry))
+ tRangeParse := time.Now()
+ var offset int64 = 0
+ var size int64 = totalSize
+ rangeHeader := r.Header.Get("Range")
+ isRangeRequest := false
+
+ if rangeHeader != "" && strings.HasPrefix(rangeHeader, "bytes=") {
+ rangeSpec := rangeHeader[6:]
+ parts := strings.Split(rangeSpec, "-")
+ if len(parts) == 2 {
+ var startOffset, endOffset int64
+
+ if parts[0] == "" && parts[1] != "" {
+ // Suffix range: bytes=-N (last N bytes)
+ if suffixLen, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
+ // RFC 7233: suffix range on empty object or zero-length suffix is unsatisfiable
+ if totalSize == 0 || suffixLen <= 0 {
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range for empty object"))
+ }
+ if suffixLen > totalSize {
+ suffixLen = totalSize
+ }
+ startOffset = totalSize - suffixLen
+ endOffset = totalSize - 1
+ } else {
+ // Set header BEFORE WriteHeader
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("invalid suffix range"))
+ }
+ } else {
+ // Regular range or open-ended range
+ startOffset = 0
+ endOffset = totalSize - 1
+
+ if parts[0] != "" {
+ if parsed, err := strconv.ParseInt(parts[0], 10, 64); err == nil {
+ startOffset = parsed
+ }
+ }
+ if parts[1] != "" {
+ if parsed, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
+ endOffset = parsed
+ }
+ }
+
+ // Validate range
+ if startOffset < 0 || startOffset >= totalSize {
+ // Set header BEFORE WriteHeader
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("invalid range start"))
+ }
+
+ if endOffset >= totalSize {
+ endOffset = totalSize - 1
+ }
+
+ if endOffset < startOffset {
+ // Set header BEFORE WriteHeader
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", totalSize))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
+ return newStreamErrorWithResponse(fmt.Errorf("invalid range: end before start"))
+ }
+ }
+
+ offset = startOffset
+ size = endOffset - startOffset + 1
+ isRangeRequest = true
+ glog.V(2).Infof("streamFromVolumeServersWithSSE: Range request bytes %d-%d/%d (size=%d)", startOffset, endOffset, totalSize, size)
+ }
+ }
+ rangeParseTime = time.Since(tRangeParse)
+
+ // Validate SSE keys BEFORE streaming
+ tKeyValidate := time.Now()
+ var decryptionKey interface{}
+ switch sseType {
+ case s3_constants.SSETypeC:
+ customerKey, err := ParseSSECHeaders(r)
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err))
+ return newStreamErrorWithResponse(err)
+ }
+ if customerKey == nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
+ return newStreamErrorWithResponse(fmt.Errorf("SSE-C key required"))
+ }
+ // Validate key MD5
+ if entry.Extended != nil {
+ storedKeyMD5 := string(entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5])
+ if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 {
+ s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
+ return newStreamErrorWithResponse(fmt.Errorf("SSE-C key mismatch"))
+ }
+ }
+ decryptionKey = customerKey
+ case s3_constants.SSETypeKMS:
+ // Extract KMS key from metadata (stored as raw bytes, matching filer behavior)
+ if entry.Extended == nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return newStreamErrorWithResponse(fmt.Errorf("no SSE-KMS metadata"))
+ }
+ kmsMetadataBytes := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]
+ sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return newStreamErrorWithResponse(err)
+ }
+ decryptionKey = sseKMSKey
+ case s3_constants.SSETypeS3:
+ // Extract S3 key from metadata (stored as raw bytes, matching filer behavior)
+ if entry.Extended == nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return newStreamErrorWithResponse(fmt.Errorf("no SSE-S3 metadata"))
+ }
+ keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
+ keyManager := GetSSES3KeyManager()
+ sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return newStreamErrorWithResponse(err)
+ }
+ decryptionKey = sseS3Key
+ }
+ keyValidateTime = time.Since(tKeyValidate)
+
+ // Set response headers
+ // IMPORTANT: Set ALL headers BEFORE calling WriteHeader (headers are ignored after WriteHeader)
+ tHeaderSet := time.Now()
+ s3a.setResponseHeaders(w, entry, totalSize)
+ s3a.addSSEResponseHeadersFromEntry(w, r, entry, sseType)
+
+ // Override/add range-specific headers if this is a range request
+ if isRangeRequest {
+ w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", offset, offset+size-1, totalSize))
+ w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
+ }
+ headerSetTime = time.Since(tHeaderSet)
+
+ // Now write status code (headers are all set)
+ if isRangeRequest {
+ w.WriteHeader(http.StatusPartialContent)
+ }
+
+ // Full Range Optimization: Use ViewFromChunks to only fetch/decrypt needed chunks
+ tDecryptSetup := time.Now()
+
+ // Use range-aware chunk resolution (like filer does)
+ if isRangeRequest {
+ glog.V(2).Infof("Using range-aware SSE decryption for offset=%d size=%d", offset, size)
+ streamFetchTime = 0 // No full stream fetch in range-aware path
+ err := s3a.streamDecryptedRangeFromChunks(r.Context(), w, entry, offset, size, sseType, decryptionKey)
+ decryptSetupTime = time.Since(tDecryptSetup)
+ copyTime = decryptSetupTime // Streaming is included in decrypt setup for range-aware path
+ if err != nil {
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(err)
+ }
+ return nil
+ }
+
+ // Full object path: Optimize multipart vs single-part
+ var decryptedReader io.Reader
+ var err error
+
+ switch sseType {
+ case s3_constants.SSETypeC:
+ customerKey := decryptionKey.(*SSECustomerKey)
+
+ // Check if this is a multipart object (multiple chunks with SSE-C metadata)
+ isMultipartSSEC := false
+ ssecChunks := 0
+ for _, chunk := range entry.GetChunks() {
+ if chunk.GetSseType() == filer_pb.SSEType_SSE_C && len(chunk.GetSseMetadata()) > 0 {
+ ssecChunks++
+ }
+ }
+ isMultipartSSEC = ssecChunks > 1
+ glog.V(3).Infof("SSE-C decryption: KeyMD5=%s, entry has %d chunks, isMultipart=%v, ssecChunks=%d",
+ customerKey.KeyMD5, len(entry.GetChunks()), isMultipartSSEC, ssecChunks)
+
+ if isMultipartSSEC {
+ // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly
+ // This saves one filer lookup/pipe creation
+ decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(r.Context(), nil, customerKey, entry)
+ glog.V(2).Infof("Using multipart SSE-C decryption for object with %d chunks (no prefetch)", len(entry.GetChunks()))
+ } else {
+ // For single-part, get encrypted stream and decrypt
+ tStreamFetch := time.Now()
+ encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
+ streamFetchTime = time.Since(tStreamFetch)
+ if streamErr != nil {
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(streamErr)
+ }
+ defer encryptedReader.Close()
+
+ iv := entry.Extended[s3_constants.SeaweedFSSSEIV]
+ if len(iv) == 0 {
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(fmt.Errorf("SSE-C IV not found in entry metadata"))
+ }
+ glog.V(2).Infof("SSE-C decryption: IV length=%d, KeyMD5=%s", len(iv), customerKey.KeyMD5)
+ decryptedReader, err = CreateSSECDecryptedReader(encryptedReader, customerKey, iv)
+ }
+
+ case s3_constants.SSETypeKMS:
+ sseKMSKey := decryptionKey.(*SSEKMSKey)
+
+ // Check if this is a multipart object (multiple chunks with SSE-KMS metadata)
+ isMultipartSSEKMS := false
+ ssekmsChunks := 0
+ for _, chunk := range entry.GetChunks() {
+ if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
+ ssekmsChunks++
+ }
+ }
+ isMultipartSSEKMS = ssekmsChunks > 1
+ glog.V(3).Infof("SSE-KMS decryption: isMultipart=%v, ssekmsChunks=%d", isMultipartSSEKMS, ssekmsChunks)
+
+ if isMultipartSSEKMS {
+ // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly
+ decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(r.Context(), nil, entry)
+ glog.V(2).Infof("Using multipart SSE-KMS decryption for object with %d chunks (no prefetch)", len(entry.GetChunks()))
+ } else {
+ // For single-part, get encrypted stream and decrypt
+ tStreamFetch := time.Now()
+ encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
+ streamFetchTime = time.Since(tStreamFetch)
+ if streamErr != nil {
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(streamErr)
+ }
+ defer encryptedReader.Close()
+
+ glog.V(2).Infof("SSE-KMS decryption: KeyID=%s, IV length=%d", sseKMSKey.KeyID, len(sseKMSKey.IV))
+ decryptedReader, err = CreateSSEKMSDecryptedReader(encryptedReader, sseKMSKey)
+ }
+
+ case s3_constants.SSETypeS3:
+ sseS3Key := decryptionKey.(*SSES3Key)
+
+ // Check if this is a multipart object (multiple chunks with SSE-S3 metadata)
+ isMultipartSSES3 := false
+ sses3Chunks := 0
+ for _, chunk := range entry.GetChunks() {
+ if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(chunk.GetSseMetadata()) > 0 {
+ sses3Chunks++
+ }
+ }
+ isMultipartSSES3 = sses3Chunks > 1
+ glog.V(3).Infof("SSE-S3 decryption: isMultipart=%v, sses3Chunks=%d", isMultipartSSES3, sses3Chunks)
+
+ if isMultipartSSES3 {
+ // For multipart, skip getEncryptedStreamFromVolumes and fetch chunks directly
+ decryptedReader, err = s3a.createMultipartSSES3DecryptedReaderDirect(r.Context(), nil, entry)
+ glog.V(2).Infof("Using multipart SSE-S3 decryption for object with %d chunks (no prefetch)", len(entry.GetChunks()))
+ } else {
+ // For single-part, get encrypted stream and decrypt
+ tStreamFetch := time.Now()
+ encryptedReader, streamErr := s3a.getEncryptedStreamFromVolumes(r.Context(), entry)
+ streamFetchTime = time.Since(tStreamFetch)
+ if streamErr != nil {
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(streamErr)
+ }
+ defer encryptedReader.Close()
+
+ keyManager := GetSSES3KeyManager()
+ iv, ivErr := GetSSES3IV(entry, sseS3Key, keyManager)
+ if ivErr != nil {
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(fmt.Errorf("failed to get SSE-S3 IV: %w", ivErr))
+ }
+ glog.V(2).Infof("SSE-S3 decryption: KeyID=%s, IV length=%d", sseS3Key.KeyID, len(iv))
+ decryptedReader, err = CreateSSES3DecryptedReader(encryptedReader, sseS3Key, iv)
+ }
+ }
+ decryptSetupTime = time.Since(tDecryptSetup)
+
+ if err != nil {
+ glog.Errorf("SSE decryption error (%s): %v", sseType, err)
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(fmt.Errorf("failed to create decrypted reader: %w", err))
+ }
+
+ // Close the decrypted reader to avoid leaking HTTP bodies
+ if closer, ok := decryptedReader.(io.Closer); ok {
+ defer func() {
+ if closeErr := closer.Close(); closeErr != nil {
+ glog.V(3).Infof("Error closing decrypted reader: %v", closeErr)
+ }
+ }()
+ }
+
+ // Stream full decrypted object to client
+ tCopy := time.Now()
+ buf := make([]byte, 128*1024)
+ copied, copyErr := io.CopyBuffer(w, decryptedReader, buf)
+ copyTime = time.Since(tCopy)
+ if copyErr != nil {
+ glog.Errorf("Failed to copy full object: copied %d bytes: %v", copied, copyErr)
+ // Error after WriteHeader - response already written
+ return newStreamErrorWithResponse(copyErr)
+ }
+ glog.V(3).Infof("Full object request: copied %d bytes", copied)
+ return nil
+}
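The three switch arms above repeat the same multipart-detection loop; a small helper could factor it out (a refactoring sketch, not code from this diff):

	// Sketch: count chunks carrying per-chunk SSE metadata of the given type.
	// "Multipart" in the arms above means more than one such chunk.
	func countSSEChunks(entry *filer_pb.Entry, sseType filer_pb.SSEType) (n int) {
		for _, chunk := range entry.GetChunks() {
			if chunk.GetSseType() == sseType && len(chunk.GetSseMetadata()) > 0 {
				n++
			}
		}
		return n
	}

	// e.g. isMultipartSSEC := countSSEChunks(entry, filer_pb.SSEType_SSE_C) > 1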
+
+// streamDecryptedRangeFromChunks streams a range of decrypted data by only fetching needed chunks
+// This implements the filer's ViewFromChunks approach for optimal range performance
+func (s3a *S3ApiServer) streamDecryptedRangeFromChunks(ctx context.Context, w io.Writer, entry *filer_pb.Entry, offset int64, size int64, sseType string, decryptionKey interface{}) error {
+ // Use filer's ViewFromChunks to resolve only needed chunks for the range
+ lookupFileIdFn := s3a.createLookupFileIdFunction()
+ chunkViews := filer.ViewFromChunks(ctx, lookupFileIdFn, entry.GetChunks(), offset, size)
+
+ totalWritten := int64(0)
+ targetOffset := offset
+
+ // Stream each chunk view
+ for x := chunkViews.Front(); x != nil; x = x.Next {
+ chunkView := x.Value
+
+ // Handle gaps between chunks (write zeros)
+ if targetOffset < chunkView.ViewOffset {
+ gap := chunkView.ViewOffset - targetOffset
+ glog.V(4).Infof("Writing %d zero bytes for gap [%d,%d)", gap, targetOffset, chunkView.ViewOffset)
+ if err := writeZeroBytes(w, gap); err != nil {
+ return fmt.Errorf("failed to write zero padding: %w", err)
+ }
+ totalWritten += gap
+ targetOffset = chunkView.ViewOffset
+ }
+
+ // Find the corresponding FileChunk for this chunkView
+ var fileChunk *filer_pb.FileChunk
+ for _, chunk := range entry.GetChunks() {
+ if chunk.GetFileIdString() == chunkView.FileId {
+ fileChunk = chunk
+ break
+ }
+ }
+ if fileChunk == nil {
+ return fmt.Errorf("chunk %s not found in entry", chunkView.FileId)
+ }
+
+ // Fetch and decrypt this chunk view
+ var decryptedChunkReader io.Reader
+ var err error
+
+ switch sseType {
+ case s3_constants.SSETypeC:
+ decryptedChunkReader, err = s3a.decryptSSECChunkView(ctx, fileChunk, chunkView, decryptionKey.(*SSECustomerKey))
+ case s3_constants.SSETypeKMS:
+ decryptedChunkReader, err = s3a.decryptSSEKMSChunkView(ctx, fileChunk, chunkView)
+ case s3_constants.SSETypeS3:
+ decryptedChunkReader, err = s3a.decryptSSES3ChunkView(ctx, fileChunk, chunkView, entry)
+ default:
+ // Non-encrypted chunk
+ decryptedChunkReader, err = s3a.fetchChunkViewData(ctx, chunkView)
+ }
+
+ if err != nil {
+ return fmt.Errorf("failed to decrypt chunk view %s: %w", chunkView.FileId, err)
+ }
+
+ // Copy the decrypted chunk data
+ written, copyErr := io.Copy(w, decryptedChunkReader)
+ if closer, ok := decryptedChunkReader.(io.Closer); ok {
+ closeErr := closer.Close()
+ if closeErr != nil {
+ glog.Warningf("streamDecryptedRangeFromChunks: failed to close decrypted chunk reader: %v", closeErr)
+ }
+ }
+ if copyErr != nil {
+ glog.Errorf("streamDecryptedRangeFromChunks: copy error after writing %d bytes (expected %d): %v", written, chunkView.ViewSize, copyErr)
+ return fmt.Errorf("failed to copy decrypted chunk data: %w", copyErr)
+ }
+
+ if written != int64(chunkView.ViewSize) {
+ glog.Errorf("streamDecryptedRangeFromChunks: size mismatch - wrote %d bytes but expected %d", written, chunkView.ViewSize)
+ return fmt.Errorf("size mismatch: wrote %d bytes but expected %d for chunk %s", written, chunkView.ViewSize, chunkView.FileId)
+ }
+
+ totalWritten += written
+ targetOffset += written
+ glog.V(2).Infof("streamDecryptedRangeFromChunks: Wrote %d bytes from chunk %s [%d,%d), totalWritten=%d, targetSize=%d", written, chunkView.FileId, chunkView.ViewOffset, chunkView.ViewOffset+int64(chunkView.ViewSize), totalWritten, size)
+ }
+
+ // Handle trailing zeros if needed
+ remaining := size - totalWritten
+ if remaining > 0 {
+ glog.V(4).Infof("Writing %d trailing zero bytes", remaining)
+ if err := writeZeroBytes(w, remaining); err != nil {
+ return fmt.Errorf("failed to write trailing zeros: %w", err)
+ }
+ }
+
+ glog.V(3).Infof("Completed range-aware SSE decryption: wrote %d bytes for range [%d,%d)", totalWritten, offset, offset+size)
+ return nil
+}
+
+// writeZeroBytes writes n zero bytes to writer using the package-level zero buffer
+func writeZeroBytes(w io.Writer, n int64) error {
+ for n > 0 {
+ toWrite := min(n, int64(len(zeroBuf)))
+ written, err := w.Write(zeroBuf[:toWrite])
+ if err != nil {
+ return err
+ }
+ n -= int64(written)
+ }
+ return nil
+}
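A quick property check (test-style sketch; "bytes" and "testing" imports assumed): the helper writes exactly n zero bytes, looping over the shared 32 KiB buffer for larger counts.

	func TestWriteZeroBytesSketch(t *testing.T) {
		var sink bytes.Buffer
		// 100,000 bytes = three full 32 KiB passes plus a remainder.
		if err := writeZeroBytes(&sink, 100_000); err != nil || sink.Len() != 100_000 {
			t.Fatalf("wrote %d bytes, err=%v", sink.Len(), err)
		}
	}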
+
+// decryptSSECChunkView decrypts a specific chunk view with SSE-C
+//
+// IV Handling for SSE-C:
+// ----------------------
+// SSE-C multipart encryption (see lines 2772-2781) differs fundamentally from SSE-KMS/SSE-S3:
+//
+// 1. Encryption: CreateSSECEncryptedReader generates a RANDOM IV per part/chunk
+// - Each part starts with a fresh random IV
+// - CTR counter starts from 0 for each part: counter₀, counter₁, counter₂, ...
+// - PartOffset is stored in metadata but NOT applied during encryption
+//
+// 2. Decryption: Use the stored IV directly WITHOUT offset adjustment
+// - The stored IV already represents the start of this part's encryption
+// - Applying calculateIVWithOffset would shift to counterₙ, misaligning the keystream
+// - Result: XOR with wrong keystream = corrupted plaintext
+//
+// This contrasts with SSE-KMS/SSE-S3 which use: base IV + calculateIVWithOffset(ChunkOffset)
+func (s3a *S3ApiServer) decryptSSECChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, customerKey *SSECustomerKey) (io.Reader, error) {
+ // For multipart SSE-C, each chunk has its own IV in chunk.SseMetadata
+ if fileChunk.GetSseType() == filer_pb.SSEType_SSE_C && len(fileChunk.GetSseMetadata()) > 0 {
+ ssecMetadata, err := DeserializeSSECMetadata(fileChunk.GetSseMetadata())
+ if err != nil {
+ return nil, fmt.Errorf("failed to deserialize SSE-C metadata: %w", err)
+ }
+ chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decode IV: %w", err)
+ }
+
+ // Fetch FULL encrypted chunk
+ // Note: Fetching the full chunk is necessary to initialize the CTR decryption stream correctly
+ fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch full chunk: %w", err)
+ }
+
+ // CRITICAL: Use stored IV directly WITHOUT offset adjustment
+ // The stored IV is the random IV used at encryption time for this specific part
+ // SSE-C does NOT apply calculateIVWithOffset during encryption, so we must not apply it during decryption
+ // (See documentation above and at lines 2772-2781 for detailed explanation)
+ decryptedReader, decryptErr := CreateSSECDecryptedReader(fullChunkReader, customerKey, chunkIV)
+ if decryptErr != nil {
+ fullChunkReader.Close()
+ return nil, fmt.Errorf("failed to create decrypted reader: %w", decryptErr)
+ }
+
+ // Skip to the position we need in the decrypted stream
+ if chunkView.OffsetInChunk > 0 {
+ _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk)
+ if err != nil {
+ if closer, ok := decryptedReader.(io.Closer); ok {
+ closer.Close()
+ }
+ return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err)
+ }
+ }
+
+ // Return a reader that only reads ViewSize bytes with proper cleanup
+ limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize))
+ return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil
+ }
+
+ // Single-part SSE-C: use object-level IV (should not hit this in range path, but handle it)
+ encryptedReader, err := s3a.fetchChunkViewData(ctx, chunkView)
+ if err != nil {
+ return nil, err
+ }
+ // For single-part, the IV is stored at object level, already handled in non-range path
+ return encryptedReader, nil
+}
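A self-contained demonstration of the CTR property this relies on (illustrative only; none of these names come from this file): decrypting with the exact IV used at encryption recovers the plaintext, while any shift of that IV misaligns the keystream from byte zero, which is why the random per-part SSE-C IV must be used exactly as stored.

	package main

	import (
		"bytes"
		"crypto/aes"
		"crypto/cipher"
		"crypto/rand"
		"fmt"
	)

	// ctrXOR applies AES-CTR with the given key and IV; CTR is symmetric,
	// so the same call both encrypts and decrypts.
	func ctrXOR(key, iv, data []byte) []byte {
		block, _ := aes.NewCipher(key) // 32-byte key => AES-256, no error
		out := make([]byte, len(data))
		cipher.NewCTR(block, iv).XORKeyStream(out, data)
		return out
	}

	func main() {
		key := make([]byte, 32)
		iv := make([]byte, aes.BlockSize)
		rand.Read(key)
		rand.Read(iv) // random per-part IV, as SSE-C uses
		plain := []byte("part payload: counter starts at 0 for this part")

		enc := ctrXOR(key, iv, plain)
		fmt.Println(bytes.Equal(ctrXOR(key, iv, enc), plain)) // true: same IV, keystream aligned

		shifted := make([]byte, len(iv))
		copy(shifted, iv)
		shifted[len(shifted)-1]++                                  // emulate an offset-adjusted IV
		fmt.Println(bytes.Equal(ctrXOR(key, shifted, enc), plain)) // false: keystream misaligned
	}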
+
+// decryptSSEKMSChunkView decrypts a specific chunk view with SSE-KMS
+//
+// IV Handling for SSE-KMS:
+// ------------------------
+// SSE-KMS (and SSE-S3) use a fundamentally different IV scheme than SSE-C:
+//
+// 1. Encryption: Uses a BASE IV + offset calculation
+// - Base IV is generated once for the entire object
+// - For each chunk at position N: adjustedIV = calculateIVWithOffset(baseIV, N)
+// - This shifts the CTR counter to counterₙ where n = N/16
+// - ChunkOffset is stored in metadata and IS applied during encryption
+//
+// 2. Decryption: Apply the same offset calculation
+// - Use calculateIVWithOffset(baseIV, ChunkOffset) to reconstruct the encryption IV
+// - Also handle ivSkip for non-block-aligned offsets (intra-block positioning)
+// - This ensures decryption uses the same CTR counter sequence as encryption
+//
+// This contrasts with SSE-C which uses random IVs without offset calculation.
+func (s3a *S3ApiServer) decryptSSEKMSChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView) (io.Reader, error) {
+ if fileChunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(fileChunk.GetSseMetadata()) > 0 {
+ sseKMSKey, err := DeserializeSSEKMSMetadata(fileChunk.GetSseMetadata())
+ if err != nil {
+ return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata: %w", err)
+ }
+
+ // Fetch FULL encrypted chunk
+ fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch full chunk: %w", err)
+ }
+
+ // IMPORTANT: Calculate adjusted IV using ChunkOffset
+ // SSE-KMS uses base IV + offset calculation (unlike SSE-C which uses random IVs)
+ // This reconstructs the same IV that was used during encryption
+ var adjustedIV []byte
+ var ivSkip int
+ if sseKMSKey.ChunkOffset > 0 {
+ adjustedIV, ivSkip = calculateIVWithOffset(sseKMSKey.IV, sseKMSKey.ChunkOffset)
+ } else {
+ adjustedIV = sseKMSKey.IV
+ ivSkip = 0
+ }
+
+ adjustedKey := &SSEKMSKey{
+ KeyID: sseKMSKey.KeyID,
+ EncryptedDataKey: sseKMSKey.EncryptedDataKey,
+ EncryptionContext: sseKMSKey.EncryptionContext,
+ BucketKeyEnabled: sseKMSKey.BucketKeyEnabled,
+ IV: adjustedIV,
+ ChunkOffset: sseKMSKey.ChunkOffset,
+ }
+
+ decryptedReader, decryptErr := CreateSSEKMSDecryptedReader(fullChunkReader, adjustedKey)
+ if decryptErr != nil {
+ fullChunkReader.Close()
+ return nil, fmt.Errorf("failed to create KMS decrypted reader: %w", decryptErr)
+ }
+
+ // CRITICAL: Skip intra-block bytes from CTR decryption (non-block-aligned offset handling)
+ if ivSkip > 0 {
+ _, err = io.CopyN(io.Discard, decryptedReader, int64(ivSkip))
+ if err != nil {
+ if closer, ok := decryptedReader.(io.Closer); ok {
+ closer.Close()
+ }
+ return nil, fmt.Errorf("failed to skip intra-block bytes (%d): %w", ivSkip, err)
+ }
+ }
+
+ // Skip to position and limit to ViewSize
+ if chunkView.OffsetInChunk > 0 {
+ _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk)
+ if err != nil {
+ if closer, ok := decryptedReader.(io.Closer); ok {
+ closer.Close()
+ }
+			return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err)
+ }
+ }
+
+ limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize))
+ return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil
+ }
+
+ // Non-KMS encrypted chunk
+ return s3a.fetchChunkViewData(ctx, chunkView)
+}
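+
+// For reference, the base-IV-plus-offset scheme described above amounts to
+// treating the 16-byte IV as a big-endian counter: advance it by one for every
+// whole AES block covered by the offset, and report the intra-block remainder
+// as bytes to skip after decryption. A minimal sketch of what
+// calculateIVWithOffset is assumed to compute:
+//
+//	func calculateIVWithOffsetSketch(baseIV []byte, offset int64) ([]byte, int) {
+//		iv := make([]byte, len(baseIV))
+//		copy(iv, baseIV)
+//		carry := uint64(offset / 16) // whole AES blocks to advance the counter
+//		skip := int(offset % 16)     // intra-block bytes to discard after decryption
+//		// add `carry` to the IV as a big-endian integer, propagating carries
+//		for i := len(iv) - 1; i >= 0 && carry > 0; i-- {
+//			sum := uint64(iv[i]) + (carry & 0xff)
+//			iv[i] = byte(sum)
+//			carry = (carry >> 8) + (sum >> 8)
+//		}
+//		return iv, skip
+//	}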
+
+// decryptSSES3ChunkView decrypts a specific chunk view with SSE-S3
+//
+// IV Handling for SSE-S3:
+// -----------------------
+// SSE-S3 uses the same BASE IV + offset scheme as SSE-KMS, but with a subtle difference:
+//
+// 1. Encryption: Uses BASE IV + offset, but stores the ADJUSTED IV
+// - Base IV is generated once for the entire object
+// - For each chunk at position N: adjustedIV, skip = calculateIVWithOffset(baseIV, N)
+// - The ADJUSTED IV (not base IV) is stored in chunk metadata
+// - ChunkOffset calculation is performed during encryption
+//
+// 2. Decryption: Use the stored adjusted IV directly
+// - The stored IV is already block-aligned and ready to use
+// - No need to call calculateIVWithOffset again (unlike SSE-KMS)
+// - Decrypt full chunk from start, then skip to OffsetInChunk in plaintext
+//
+// This differs from:
+// - SSE-C: Uses random IV per chunk, no offset calculation
+// - SSE-KMS: Stores base IV, requires calculateIVWithOffset during decryption
+func (s3a *S3ApiServer) decryptSSES3ChunkView(ctx context.Context, fileChunk *filer_pb.FileChunk, chunkView *filer.ChunkView, entry *filer_pb.Entry) (io.Reader, error) {
+ // For multipart SSE-S3, each chunk has its own IV in chunk.SseMetadata
+ if fileChunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(fileChunk.GetSseMetadata()) > 0 {
+ keyManager := GetSSES3KeyManager()
+
+ // Deserialize per-chunk SSE-S3 metadata to get chunk-specific IV
+ chunkSSES3Metadata, err := DeserializeSSES3Metadata(fileChunk.GetSseMetadata(), keyManager)
+ if err != nil {
+ return nil, fmt.Errorf("failed to deserialize chunk SSE-S3 metadata: %w", err)
+ }
+
+ // Fetch FULL encrypted chunk (necessary for proper CTR decryption stream)
+ fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch full chunk: %w", err)
+ }
+
+ // IMPORTANT: Use the stored IV directly - it's already block-aligned
+ // During encryption, CreateSSES3EncryptedReaderWithBaseIV called:
+ // adjustedIV, skip := calculateIVWithOffset(baseIV, partOffset)
+ // and stored the adjustedIV in metadata. We use it as-is for decryption.
+ // No need to call calculateIVWithOffset again (unlike SSE-KMS which stores base IV).
+ iv := chunkSSES3Metadata.IV
+
+ glog.V(4).Infof("Decrypting multipart SSE-S3 chunk %s with chunk-specific IV length=%d",
+ chunkView.FileId, len(iv))
+
+ // Decrypt the full chunk starting from offset 0
+ decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, chunkSSES3Metadata, iv)
+ if decryptErr != nil {
+ fullChunkReader.Close()
+ return nil, fmt.Errorf("failed to create SSE-S3 decrypted reader: %w", decryptErr)
+ }
+
+ // Skip to position within the decrypted chunk (plaintext offset, not ciphertext offset)
+ if chunkView.OffsetInChunk > 0 {
+ _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk)
+ if err != nil {
+ if closer, ok := decryptedReader.(io.Closer); ok {
+ closer.Close()
+ }
+ return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err)
+ }
+ }
+
+ limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize))
+ return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil
+ }
+
+ // Single-part SSE-S3: use object-level IV and key (fallback path)
+ keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
+ keyManager := GetSSES3KeyManager()
+ sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
+ if err != nil {
+ return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
+ }
+
+ // Fetch FULL encrypted chunk
+ fullChunkReader, err := s3a.fetchFullChunk(ctx, chunkView.FileId)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch full chunk: %w", err)
+ }
+
+ // Get base IV for single-part object
+ iv, err := GetSSES3IV(entry, sseS3Key, keyManager)
+ if err != nil {
+ fullChunkReader.Close()
+ return nil, fmt.Errorf("failed to get SSE-S3 IV: %w", err)
+ }
+
+ glog.V(4).Infof("Decrypting single-part SSE-S3 chunk %s with entry-level IV length=%d",
+ chunkView.FileId, len(iv))
+
+ decryptedReader, decryptErr := CreateSSES3DecryptedReader(fullChunkReader, sseS3Key, iv)
+ if decryptErr != nil {
+ fullChunkReader.Close()
+		return nil, fmt.Errorf("failed to create SSE-S3 decrypted reader: %w", decryptErr)
+ }
+
+ // Skip to position and limit to ViewSize
+ if chunkView.OffsetInChunk > 0 {
+ _, err = io.CopyN(io.Discard, decryptedReader, chunkView.OffsetInChunk)
+ if err != nil {
+ if closer, ok := decryptedReader.(io.Closer); ok {
+ closer.Close()
+ }
+			return nil, fmt.Errorf("failed to skip to offset %d: %w", chunkView.OffsetInChunk, err)
+ }
+ }
+
+ limitedReader := io.LimitReader(decryptedReader, int64(chunkView.ViewSize))
+ return &rc{Reader: limitedReader, Closer: fullChunkReader}, nil
+}
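+
+// Summary of IV handling across the three decrypt paths above:
+//
+//	scheme   stored IV                  decrypt-side adjustment
+//	------   -------------------------  ------------------------------------------
+//	SSE-C    random IV per part         none: use the stored IV as-is
+//	SSE-KMS  base IV for whole object   calculateIVWithOffset(baseIV, ChunkOffset)
+//	SSE-S3   adjusted IV per chunk      none: offset was applied at encryption time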
+
+// fetchFullChunk fetches the complete encrypted chunk from volume server
+func (s3a *S3ApiServer) fetchFullChunk(ctx context.Context, fileId string) (io.ReadCloser, error) {
+ // Lookup the volume server URLs for this chunk
+ lookupFileIdFn := s3a.createLookupFileIdFunction()
+ urlStrings, err := lookupFileIdFn(ctx, fileId)
+	if err != nil {
+		return nil, fmt.Errorf("failed to lookup chunk %s: %w", fileId, err)
+	}
+	if len(urlStrings) == 0 {
+		return nil, fmt.Errorf("no volume locations found for chunk %s", fileId)
+	}
+
+ // Use the first URL
+ chunkUrl := urlStrings[0]
+
+ // Generate JWT for volume server authentication
+ jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId)
+
+ // Create request WITHOUT Range header to get full chunk
+ req, err := http.NewRequestWithContext(ctx, "GET", chunkUrl, nil)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create request: %w", err)
+ }
+
+ // Set JWT for authentication
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
+
+ // Use shared HTTP client
+ resp, err := volumeServerHTTPClient.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch chunk: %w", err)
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ resp.Body.Close()
+ return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, fileId)
+ }
+
+ return resp.Body, nil
+}
+
+// fetchChunkViewData fetches encrypted data for a chunk view (with range)
+func (s3a *S3ApiServer) fetchChunkViewData(ctx context.Context, chunkView *filer.ChunkView) (io.ReadCloser, error) {
+ // Lookup the volume server URLs for this chunk
+ lookupFileIdFn := s3a.createLookupFileIdFunction()
+ urlStrings, err := lookupFileIdFn(ctx, chunkView.FileId)
+	if err != nil {
+		return nil, fmt.Errorf("failed to lookup chunk %s: %w", chunkView.FileId, err)
+	}
+	if len(urlStrings) == 0 {
+		return nil, fmt.Errorf("no volume locations found for chunk %s", chunkView.FileId)
+	}
+
+ // Use the first URL (already contains complete URL with fileId)
+ chunkUrl := urlStrings[0]
+
+ // Generate JWT for volume server authentication
+ jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunkView.FileId)
+
+ // Create request with Range header for the chunk view
+ // chunkUrl already contains the complete URL including fileId
+ req, err := http.NewRequestWithContext(ctx, "GET", chunkUrl, nil)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create request: %w", err)
+ }
+
+ // Set Range header to fetch only the needed portion of the chunk
+ if !chunkView.IsFullChunk() {
+ rangeEnd := chunkView.OffsetInChunk + int64(chunkView.ViewSize) - 1
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", chunkView.OffsetInChunk, rangeEnd))
+ }
+
+ // Set JWT for authentication
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
+
+ // Use shared HTTP client with connection pooling
+ resp, err := volumeServerHTTPClient.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fetch chunk: %w", err)
+ }
+
+ if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
+ resp.Body.Close()
+ return nil, fmt.Errorf("unexpected status code %d for chunk %s", resp.StatusCode, chunkView.FileId)
+ }
+
+ return resp.Body, nil
+}
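+
+// For example, a view with OffsetInChunk=1024 and ViewSize=2048 requests
+// "Range: bytes=1024-3071" from the volume server (the end offset is inclusive).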
+
+// getEncryptedStreamFromVolumes gets raw encrypted data stream from volume servers
+func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry *filer_pb.Entry) (io.ReadCloser, error) {
+ // Handle inline content
+ if len(entry.Content) > 0 {
+ return io.NopCloser(bytes.NewReader(entry.Content)), nil
+ }
+
+ // Handle empty files
+ chunks := entry.GetChunks()
+ if len(chunks) == 0 {
+ return io.NopCloser(bytes.NewReader([]byte{})), nil
+ }
+
+ // Reuse shared lookup function to keep volume lookup logic in one place
+ lookupFileIdFn := s3a.createLookupFileIdFunction()
+
+ // Resolve chunks
+ totalSize := int64(filer.FileSize(entry))
+ resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, 0, totalSize)
+ if err != nil {
+ return nil, err
+ }
+
+ // Create streaming reader
+ masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn}
+ streamFn, err := filer.PrepareStreamContentWithThrottler(
+ ctx,
+ masterClient,
+ func(fileId string) string {
+ // Use volume server JWT (not filer JWT) for direct volume reads
+ return string(security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, fileId))
+ },
+ resolvedChunks,
+ 0,
+ totalSize,
+ 0,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ // Create a pipe to get io.ReadCloser
+ pipeReader, pipeWriter := io.Pipe()
+ go func() {
+ defer pipeWriter.Close()
+ if err := streamFn(pipeWriter); err != nil {
+ glog.Errorf("getEncryptedStreamFromVolumes: streaming error: %v", err)
+ pipeWriter.CloseWithError(err)
+ }
+ }()
+
+ return pipeReader, nil
+}
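+
+// The io.Pipe construction above converts a push-style stream function into a
+// pull-style io.ReadCloser. A minimal standalone sketch of the same pattern
+// (streamToReadCloser is a hypothetical name used only for illustration):
+//
+//	func streamToReadCloser(streamFn func(io.Writer) error) io.ReadCloser {
+//		pr, pw := io.Pipe()
+//		go func() {
+//			// CloseWithError(nil) behaves like Close(), so the reader sees
+//			// io.EOF on success and the stream error otherwise.
+//			pw.CloseWithError(streamFn(pw))
+//		}()
+//		return pr
+//	}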
+
+// addSSEResponseHeadersFromEntry adds appropriate SSE response headers based on entry metadata
+func (s3a *S3ApiServer) addSSEResponseHeadersFromEntry(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) {
+ if entry == nil || entry.Extended == nil {
+ return
+ }
+
+ switch sseType {
+ case s3_constants.SSETypeC:
+ // SSE-C: Echo back algorithm and key MD5
+ if algo, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algo))
+ }
+ if keyMD5, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists {
+ w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5))
+ }
+
+ case s3_constants.SSETypeKMS:
+ // SSE-KMS: Return algorithm and key ID
+ w.Header().Set(s3_constants.AmzServerSideEncryption, "aws:kms")
+ if kmsMetadataBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
+ sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
+ if err == nil {
+ AddSSEKMSResponseHeaders(w, sseKMSKey)
+ }
+ }
+
+ case s3_constants.SSETypeS3:
+ // SSE-S3: Return algorithm
+ w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmAES256)
+ }
+}
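+
+// Illustrative response headers per SSE type (the key ID header name is assumed
+// to match what AddSSEKMSResponseHeaders emits):
+//
+//	SSE-C:   x-amz-server-side-encryption-customer-algorithm: AES256
+//	         x-amz-server-side-encryption-customer-key-MD5: <stored key MD5>
+//	SSE-KMS: x-amz-server-side-encryption: aws:kms
+//	         x-amz-server-side-encryption-aws-kms-key-id: <key id>
+//	SSE-S3:  x-amz-server-side-encryption: AES256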
+
+// setResponseHeaders sets all standard HTTP response headers from entry metadata
+func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, entry *filer_pb.Entry, totalSize int64) {
+ // Safety check: entry must be valid
+ if entry == nil {
+ glog.Errorf("setResponseHeaders: entry is nil")
+ return
+ }
+
+ // Set content length and accept ranges
+ w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+ w.Header().Set("Accept-Ranges", "bytes")
+
+ // Set ETag (but don't overwrite if already set, e.g., for part-specific GET requests)
+ if w.Header().Get("ETag") == "" {
+ etag := filer.ETag(entry)
+ if etag != "" {
+ w.Header().Set("ETag", "\""+etag+"\"")
+ }
+ }
+
+ // Set Last-Modified in RFC1123 format
+ if entry.Attributes != nil {
+ modTime := time.Unix(entry.Attributes.Mtime, 0).UTC()
+ w.Header().Set("Last-Modified", modTime.Format(http.TimeFormat))
+ }
+
+ // Set Content-Type
+ mimeType := ""
+ if entry.Attributes != nil && entry.Attributes.Mime != "" {
+ mimeType = entry.Attributes.Mime
+ }
+ if mimeType == "" {
+ // Try to detect from entry name
+ if entry.Name != "" {
+ ext := filepath.Ext(entry.Name)
+ if ext != "" {
+ mimeType = mime.TypeByExtension(ext)
+ }
+ }
+ }
+ if mimeType != "" {
+ w.Header().Set("Content-Type", mimeType)
+ } else {
+ w.Header().Set("Content-Type", "application/octet-stream")
+ }
+
+ // Set custom headers from entry.Extended (user metadata)
+ // Use direct map assignment to preserve original header casing (matches proxy behavior)
+ if entry.Extended != nil {
+ for k, v := range entry.Extended {
+ // Skip internal SeaweedFS headers
+ if !strings.HasPrefix(k, "xattr-") && !s3_constants.IsSeaweedFSInternalHeader(k) {
+ // Support backward compatibility: migrate old non-canonical format to canonical format
+ // OLD: "x-amz-meta-foo" → NEW: "X-Amz-Meta-foo" (preserving suffix case)
+ headerKey := k
+ if len(k) >= 11 && strings.EqualFold(k[:11], "x-amz-meta-") {
+ // Normalize to AWS S3 format: "X-Amz-Meta-" prefix with lowercase suffix
+ // AWS S3 returns user metadata with the suffix in lowercase
+ suffix := k[len("x-amz-meta-"):]
+ headerKey = s3_constants.AmzUserMetaPrefix + strings.ToLower(suffix)
+ if glog.V(4) && k != headerKey {
+ glog.Infof("Normalizing user metadata header %q to %q in response", k, headerKey)
+ }
+ }
+ w.Header()[headerKey] = []string{string(v)}
+ }
+ }
+ }
+
+ // Set tag count header (matches filer logic)
+ if entry.Extended != nil {
+ tagCount := 0
+ for k := range entry.Extended {
+ if strings.HasPrefix(k, s3_constants.AmzObjectTagging+"-") {
+ tagCount++
+ }
+ }
+ if tagCount > 0 {
+ w.Header().Set(s3_constants.AmzTagCount, strconv.Itoa(tagCount))
+ }
+ }
+}
+
+// simpleMasterClient implements the minimal interface for streaming
+type simpleMasterClient struct {
+ lookupFn func(ctx context.Context, fileId string) ([]string, error)
+}
+
+func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
+ return s.lookupFn
}
+// HeadObjectHandler handles S3 HEAD object requests
+//
+// Special behavior for implicit directories:
+// When a HEAD request is made on a path without a trailing slash, and that path represents
+// a directory with children (either a 0-byte file marker or an actual directory), this handler
+// returns 404 Not Found instead of 200 OK. This behavior improves compatibility with s3fs and
+// matches AWS S3's handling of implicit directories.
+//
+// Rationale:
+// - AWS S3 typically doesn't create directory markers when files are uploaded (e.g., uploading
+// "dataset/file.txt" doesn't create a marker at "dataset")
+// - Some S3 clients (like PyArrow with s3fs) create directory markers, which can confuse s3fs
+// - s3fs's info() method calls HEAD first; if it succeeds with size=0, s3fs incorrectly reports
+// the object as a file instead of checking for children
+// - By returning 404 for implicit directories, we force s3fs to fall back to LIST-based discovery,
+// which correctly identifies directories by checking for children
+//
+// Examples:
+//
+// HEAD /bucket/dataset (no trailing slash, has children) → 404 Not Found (implicit directory)
+// HEAD /bucket/dataset/ (trailing slash) → 200 OK (explicit directory request)
+// HEAD /bucket/empty.txt (0-byte file, no children) → 200 OK (legitimate empty file)
+// HEAD /bucket/file.txt (regular file) → 200 OK (normal operation)
+//
+// This behavior only applies to:
+// - Non-versioned buckets (versioned buckets use different semantics)
+// - Paths without trailing slashes (trailing slash indicates explicit directory request)
+// - Objects that are either 0-byte files or actual directories
+// - Objects that have at least one child (checked via hasChildren)
func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := s3_constants.GetBucketAndObject(r)
@@ -456,7 +2062,6 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
versionId := r.URL.Query().Get("versionId")
var (
- destUrl string
entry *filer_pb.Entry // Declare entry at function scope for SSE processing
versioningConfigured bool
err error
@@ -491,22 +2096,61 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
}
targetVersionId = versionId
} else {
- // Request for latest version
- glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object)
- entry, err = s3a.getLatestObjectVersion(bucket, object)
- if err != nil {
- glog.Errorf("Failed to get latest version: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
- if entry.Extended != nil {
- if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
- targetVersionId = string(versionIdBytes)
+ // Request for latest version - OPTIMIZATION:
+ // Check if .versions/ directory exists quickly (no retries) to decide path
+ // - If .versions/ exists: real versions available, use getLatestObjectVersion
+ // - If .versions/ doesn't exist (ErrNotFound): only null version at regular path, use it directly
+ // - If transient error: fall back to getLatestObjectVersion which has retry logic
+ bucketDir := s3a.option.BucketsPath + "/" + bucket
+ normalizedObject := removeDuplicateSlashes(object)
+ versionsDir := normalizedObject + s3_constants.VersionsFolder
+
+ // Quick check (no retries) for .versions/ directory
+ versionsEntry, versionsErr := s3a.getEntry(bucketDir, versionsDir)
+
+ if versionsErr == nil && versionsEntry != nil {
+ // .versions/ exists, meaning real versions are stored there
+ // Use getLatestObjectVersion which will properly find the newest version
+ entry, err = s3a.getLatestObjectVersion(bucket, object)
+ if err != nil {
+ glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ } else if errors.Is(versionsErr, filer_pb.ErrNotFound) {
+ // .versions/ doesn't exist (confirmed not found), check regular path for null version
+ regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
+ if regularErr == nil && regularEntry != nil {
+ // Found object at regular path - this is the null version
+ entry = regularEntry
+ targetVersionId = "null"
+ } else {
+ // No object at regular path either - object doesn't exist
+ glog.Errorf("HeadObject: object not found at regular path or .versions for %s%s", bucket, object)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ } else {
+ // Transient error checking .versions/, fall back to getLatestObjectVersion with retries
+ glog.V(2).Infof("HeadObject: transient error checking .versions for %s%s: %v, falling back to getLatestObjectVersion", bucket, object, versionsErr)
+ entry, err = s3a.getLatestObjectVersion(bucket, object)
+ if err != nil {
+ glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err)
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
}
}
- // If no version ID found in entry, this is a pre-versioning object
+ // Extract version ID if not already set
if targetVersionId == "" {
- targetVersionId = "null"
+ if entry.Extended != nil {
+ if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
+ targetVersionId = string(versionIdBytes)
+ }
+ }
+ // If no version ID found in entry, this is a pre-versioning object
+ if targetVersionId == "" {
+ targetVersionId = "null"
+ }
}
}
@@ -518,16 +2162,11 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
}
}
- // Determine the actual file path based on whether this is a versioned or pre-versioning object
+ // For versioned objects, log the target version
if targetVersionId == "null" {
- // Pre-versioning object - stored as regular file
- destUrl = s3a.toFilerUrl(bucket, object)
- glog.V(2).Infof("HeadObject: pre-versioning object URL: %s", destUrl)
+ glog.V(2).Infof("HeadObject: pre-versioning object %s/%s", bucket, object)
} else {
- // Versioned object - stored in .versions directory
- versionObjectPath := object + ".versions/" + s3a.getVersionFileName(targetVersionId)
- destUrl = s3a.toFilerUrl(bucket, versionObjectPath)
- glog.V(2).Infof("HeadObject: version %s URL: %s", targetVersionId, destUrl)
+ glog.V(2).Infof("HeadObject: version %s for %s/%s", targetVersionId, bucket, object)
}
// Set version ID in response header
@@ -535,9 +2174,6 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
// Add object lock metadata to response headers if present
s3a.addObjectLockHeadersToResponse(w, entry)
- } else {
- // Handle regular HEAD (non-versioned)
- destUrl = s3a.toFilerUrl(bucket, object)
}
// Fetch the correct entry for SSE processing (respects versionId)
@@ -559,7 +2195,7 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
var fetchErr error
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
if fetchErr != nil {
- glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr)
+ glog.Warningf("HeadObjectHandler: failed to get entry for %s/%s: %v", bucket, object, fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
@@ -571,122 +2207,150 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
}
}
- s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
- // Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests
- return s3a.handleSSEResponse(r, proxyResponse, w, objectEntryForSSE)
- })
-}
-
-func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64)) {
-
- glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
- start := time.Now()
-
- proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
-
- if err != nil {
- glog.Errorf("NewRequest %s: %v", destUrl, err)
+ // Safety check: entry must be valid
+ if objectEntryForSSE == nil {
+ glog.Errorf("HeadObjectHandler: objectEntryForSSE is nil for %s/%s (should not happen)", bucket, object)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
- proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
- proxyReq.Header.Set("Accept-Encoding", "identity")
- for k, v := range r.URL.Query() {
- if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok {
- proxyReq.Header[k] = v
- }
- if k == "partNumber" {
- proxyReq.Header[s3_constants.SeaweedFSPartNumber] = v
+ // Implicit Directory Handling for s3fs Compatibility
+ // ====================================================
+ //
+ // Background:
+ // Some S3 clients (like PyArrow with s3fs) create directory markers when writing datasets.
+ // These can be either:
+ // 1. 0-byte files with directory MIME type (e.g., "application/octet-stream")
+ // 2. Actual directories in the filer (created by PyArrow's write_dataset)
+ //
+ // Problem:
+ // s3fs's info() method calls HEAD on the path. If HEAD returns 200 with size=0,
+ // s3fs incorrectly reports it as a file (type='file', size=0) instead of checking
+ // for children. This causes PyArrow to fail with "Parquet file size is 0 bytes".
+ //
+ // Solution:
+ // For non-versioned objects without trailing slash, if the object is a 0-byte file
+ // or directory AND has children, return 404 instead of 200. This forces s3fs to
+ // fall back to LIST-based discovery, which correctly identifies it as a directory.
+ //
+ // AWS S3 Compatibility:
+ // AWS S3 typically doesn't create directory markers for implicit directories, so
+ // HEAD on "dataset" (when only "dataset/file.txt" exists) returns 404. Our behavior
+ // matches this by returning 404 for implicit directories with children.
+ //
+ // Edge Cases Handled:
+ // - Empty files (0-byte, no children) → 200 OK (legitimate empty file)
+ // - Empty directories (no children) → 200 OK (legitimate empty directory)
+ // - Explicit directory requests (trailing slash) → 200 OK (handled earlier)
+ // - Versioned objects → Skip this check (different semantics)
+ //
+ // Performance:
+ // Only adds overhead for 0-byte files or directories without trailing slash.
+ // Cost: One LIST operation with Limit=1 (~1-5ms).
+ //
+ if !versioningConfigured && !strings.HasSuffix(object, "/") {
+ // Check if this is an implicit directory (either a 0-byte file or actual directory with children)
+ // PyArrow may create 0-byte files when writing datasets, or the filer may have actual directories
+ if objectEntryForSSE.Attributes != nil {
+ isZeroByteFile := objectEntryForSSE.Attributes.FileSize == 0 && !objectEntryForSSE.IsDirectory
+ isActualDirectory := objectEntryForSSE.IsDirectory
+
+ if isZeroByteFile || isActualDirectory {
+ // Check if it has children (making it an implicit directory)
+ if s3a.hasChildren(bucket, object) {
+ // This is an implicit directory with children
+ // Return 404 to force clients (like s3fs) to use LIST-based discovery
+ s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
+ return
+ }
+ }
}
}
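+
+	// hasChildren is expected to answer "does any entry exist under this
+	// prefix?" with a single bounded list call. A minimal sketch, assuming a
+	// list helper of the form list(dir, prefix, startFrom, inclusive, limit):
+	//
+	//	func (s3a *S3ApiServer) hasChildrenSketch(bucket, object string) bool {
+	//		dir := s3a.option.BucketsPath + "/" + bucket + "/" + strings.Trim(object, "/")
+	//		entries, _, err := s3a.list(dir, "", "", false, 1) // one child is enough
+	//		return err == nil && len(entries) > 0
+	//	}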
- for header, values := range r.Header {
- proxyReq.Header[header] = values
- }
- if proxyReq.ContentLength == 0 && r.ContentLength != 0 {
- proxyReq.ContentLength = r.ContentLength
- }
- // ensure that the Authorization header is overriding any previous
- // Authorization header which might be already present in proxyReq
- s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite)
- resp, postErr := s3a.client.Do(proxyReq)
+ // For HEAD requests, we already have all metadata - just set headers directly
+ totalSize := int64(filer.FileSize(objectEntryForSSE))
+ s3a.setResponseHeaders(w, objectEntryForSSE, totalSize)
- if postErr != nil {
- glog.Errorf("post to filer: %v", postErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
+ // Check if PartNumber query parameter is present (for multipart objects)
+ // This logic matches the filer handler for consistency
+ partNumberStr := r.URL.Query().Get("partNumber")
+ if partNumberStr == "" {
+ partNumberStr = r.URL.Query().Get("PartNumber")
}
- defer util_http.CloseResponse(resp)
- if resp.StatusCode == http.StatusPreconditionFailed {
- s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
- return
- }
+ // If PartNumber is specified, set headers (matching filer logic)
+ if partNumberStr != "" {
+ if partNumber, parseErr := strconv.Atoi(partNumberStr); parseErr == nil && partNumber > 0 {
+ // Get actual parts count from metadata (not chunk count)
+ partsCount, partInfo := s3a.getMultipartInfo(objectEntryForSSE, partNumber)
- if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
- return
- }
-
- if r.Method == http.MethodDelete {
- if resp.StatusCode == http.StatusNotFound {
- // this is normal
- responseStatusCode, _ := responseFn(resp, w)
- s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
- return
- }
- }
- if resp.StatusCode == http.StatusNotFound {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
-
- TimeToFirstByte(r.Method, start, r)
- if resp.Header.Get(s3_constants.SeaweedFSIsDirectoryKey) == "true" {
- responseStatusCode, _ := responseFn(resp, w)
- s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
- return
- }
+ // Validate part number
+ if partNumber > partsCount {
+ glog.Warningf("HeadObject: Invalid part number %d, object has %d parts", partNumber, partsCount)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
+ return
+ }
- if resp.StatusCode == http.StatusInternalServerError {
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return
- }
+ // Set parts count header
+ w.Header().Set(s3_constants.AmzMpPartsCount, strconv.Itoa(partsCount))
+ glog.V(3).Infof("HeadObject: Set PartsCount=%d for part %d", partsCount, partNumber)
- // when HEAD a directory, it should be reported as no such key
- // https://github.com/seaweedfs/seaweedfs/issues/3457
- if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified {
- s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
- return
- }
-
- if resp.StatusCode == http.StatusBadRequest {
- resp_body, _ := io.ReadAll(resp.Body)
- switch string(resp_body) {
- case "InvalidPart":
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
- default:
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
+ // Override ETag with the part's ETag
+ if partInfo != nil {
+ // Use part ETag from metadata (accurate for multi-chunk parts)
+ w.Header().Set("ETag", "\""+partInfo.ETag+"\"")
+ glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (from metadata)", partNumber, partInfo.ETag)
+ } else {
+ // Fallback: use chunk's ETag (backward compatibility)
+ chunkIndex := partNumber - 1
+ if chunkIndex >= len(objectEntryForSSE.Chunks) {
+ glog.Warningf("HeadObject: Part %d chunk index %d out of range (chunks: %d)", partNumber, chunkIndex, len(objectEntryForSSE.Chunks))
+ s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
+ return
+ }
+ partChunk := objectEntryForSSE.Chunks[chunkIndex]
+ if partChunk.ETag != "" {
+ if md5Bytes, decodeErr := base64.StdEncoding.DecodeString(partChunk.ETag); decodeErr == nil {
+ partETag := fmt.Sprintf("%x", md5Bytes)
+ w.Header().Set("ETag", "\""+partETag+"\"")
+ glog.V(3).Infof("HeadObject: Override ETag with part %d ETag: %s (fallback from chunk)", partNumber, partETag)
+ }
+ }
+ }
}
- resp.Body.Close()
- return
}
- setUserMetadataKeyToLowercase(resp)
-
- responseStatusCode, bytesTransferred := responseFn(resp, w)
- BucketTrafficSent(bytesTransferred, r)
- s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
-}
-
-func setUserMetadataKeyToLowercase(resp *http.Response) {
- for key, value := range resp.Header {
- if strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) {
- resp.Header[strings.ToLower(key)] = value
- delete(resp.Header, key)
+ // Detect and handle SSE
+ glog.V(3).Infof("HeadObjectHandler: Retrieved entry for %s%s - %d chunks", bucket, object, len(objectEntryForSSE.Chunks))
+ sseType := s3a.detectPrimarySSEType(objectEntryForSSE)
+ glog.V(2).Infof("HeadObjectHandler: Detected SSE type: %s", sseType)
+ if sseType != "" && sseType != "None" {
+ // Validate SSE headers for encrypted objects
+ switch sseType {
+ case s3_constants.SSETypeC:
+ customerKey, err := ParseSSECHeaders(r)
+ if err != nil {
+ s3err.WriteErrorResponse(w, r, MapSSECErrorToS3Error(err))
+ return
+ }
+ if customerKey == nil {
+ s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
+ return
+ }
+ // Validate key MD5
+ if objectEntryForSSE.Extended != nil {
+ storedKeyMD5 := string(objectEntryForSSE.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5])
+ if storedKeyMD5 != "" && customerKey.KeyMD5 != storedKeyMD5 {
+ s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied)
+ return
+ }
+ }
}
+ // Add SSE response headers
+ s3a.addSSEResponseHeadersFromEntry(w, r, objectEntryForSSE, sseType)
}
+
+ w.WriteHeader(http.StatusOK)
}
func captureCORSHeaders(w http.ResponseWriter, headersToCapture []string) map[string]string {
@@ -934,247 +2598,6 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.
}
}
-// handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing
-// The objectEntry parameter should be the correct entry for the requested version (if versioned)
-func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, objectEntry *filer_pb.Entry) (statusCode int, bytesTransferred int64) {
- // Check what the client is expecting based on request headers
- clientExpectsSSEC := IsSSECRequest(r)
-
- // Check what the stored object has in headers (may be conflicting after copy)
- kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader)
-
- // Detect actual object SSE type from the provided entry (respects versionId)
- actualObjectType := "Unknown"
- if objectEntry != nil {
- actualObjectType = s3a.detectPrimarySSEType(objectEntry)
- }
-
- // If objectEntry is nil, we cannot determine SSE type from chunks
- // This should only happen for 404s which will be handled by the proxy
- if objectEntry == nil {
- glog.V(4).Infof("Object entry not available for SSE routing, passing through")
- return passThroughResponse(proxyResponse, w)
- }
-
- // Route based on ACTUAL object type (from chunks) rather than conflicting headers
- if actualObjectType == s3_constants.SSETypeC && clientExpectsSSEC {
- // Object is SSE-C and client expects SSE-C → SSE-C handler
- return s3a.handleSSECResponse(r, proxyResponse, w, objectEntry)
- } else if actualObjectType == s3_constants.SSETypeKMS && !clientExpectsSSEC {
- // Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler
- return s3a.handleSSEKMSResponse(r, proxyResponse, w, objectEntry, kmsMetadataHeader)
- } else if actualObjectType == s3_constants.SSETypeS3 && !clientExpectsSSEC {
- // Object is SSE-S3 and client doesn't expect SSE-C → SSE-S3 handler
- return s3a.handleSSES3Response(r, proxyResponse, w, objectEntry)
- } else if actualObjectType == "None" && !clientExpectsSSEC {
- // Object is unencrypted and client doesn't expect SSE-C → pass through
- return passThroughResponse(proxyResponse, w)
- } else if actualObjectType == s3_constants.SSETypeC && !clientExpectsSSEC {
- // Object is SSE-C but client doesn't provide SSE-C headers → Error
- s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
- return http.StatusBadRequest, 0
- } else if actualObjectType == s3_constants.SSETypeKMS && clientExpectsSSEC {
- // Object is SSE-KMS but client provides SSE-C headers → Error
- s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
- return http.StatusBadRequest, 0
- } else if actualObjectType == s3_constants.SSETypeS3 && clientExpectsSSEC {
- // Object is SSE-S3 but client provides SSE-C headers → Error (mismatched encryption)
- s3err.WriteErrorResponse(w, r, s3err.ErrSSEEncryptionTypeMismatch)
- return http.StatusBadRequest, 0
- } else if actualObjectType == "None" && clientExpectsSSEC {
- // Object is unencrypted but client provides SSE-C headers → Error
- s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing)
- return http.StatusBadRequest, 0
- }
-
- // Unknown state - pass through and let proxy handle it
- glog.V(4).Infof("Unknown SSE state: objectType=%s, clientExpectsSSEC=%v", actualObjectType, clientExpectsSSEC)
- return passThroughResponse(proxyResponse, w)
-}
-
-// handleSSEKMSResponse handles SSE-KMS decryption and response processing
-func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, entry *filer_pb.Entry, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) {
- // Deserialize SSE-KMS metadata
- kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader)
- if err != nil {
- glog.Errorf("Failed to decode SSE-KMS metadata: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
-
- sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes)
- if err != nil {
- glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
-
- // For HEAD requests, we don't need to decrypt the body, just add response headers
- if r.Method == "HEAD" {
- // Capture existing CORS headers that may have been set by middleware
- capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
-
- // Copy headers from proxy response (excluding internal SeaweedFS headers)
- copyResponseHeaders(w, proxyResponse, false)
-
- // Add SSE-KMS response headers
- AddSSEKMSResponseHeaders(w, sseKMSKey)
-
- return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
- }
-
- // For GET requests, check if this is a multipart SSE-KMS object
- // We need to check the object structure to determine if it's multipart encrypted
- isMultipartSSEKMS := false
-
- if sseKMSKey != nil && entry != nil {
- // Use the entry parameter passed from the caller (avoids redundant lookup)
- // Check for multipart SSE-KMS
- sseKMSChunks := 0
- for _, chunk := range entry.GetChunks() {
- if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
- sseKMSChunks++
- }
- }
- isMultipartSSEKMS = sseKMSChunks > 1
- }
-
- var decryptedReader io.Reader
- if isMultipartSSEKMS {
- // Handle multipart SSE-KMS objects - each chunk needs independent decryption
- multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse, entry)
- if decErr != nil {
- glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- decryptedReader = multipartReader
- glog.V(3).Infof("Using multipart SSE-KMS decryption for object")
- } else {
- // Handle single-part SSE-KMS objects
- singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey)
- if decErr != nil {
- glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- decryptedReader = singlePartReader
- glog.V(3).Infof("Using single-part SSE-KMS decryption for object")
- }
-
- // Capture existing CORS headers that may have been set by middleware
- capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
-
- // Copy headers from proxy response (excluding body-related headers that might change and internal SeaweedFS headers)
- copyResponseHeaders(w, proxyResponse, true)
-
- // Set correct Content-Length for SSE-KMS
- if proxyResponse.Header.Get("Content-Range") == "" {
- // For full object requests, encrypted length equals original length
- if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
- w.Header().Set("Content-Length", contentLengthStr)
- }
- }
-
- // Add SSE-KMS response headers
- AddSSEKMSResponseHeaders(w, sseKMSKey)
-
- return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
-}
-
-// handleSSES3Response handles SSE-S3 decryption and response processing
-func (s3a *S3ApiServer) handleSSES3Response(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, entry *filer_pb.Entry) (statusCode int, bytesTransferred int64) {
-
- // For HEAD requests, we don't need to decrypt the body, just add response headers
- if r.Method == "HEAD" {
- // Capture existing CORS headers that may have been set by middleware
- capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
-
- // Copy headers from proxy response (excluding internal SeaweedFS headers)
- copyResponseHeaders(w, proxyResponse, false)
-
- // Add SSE-S3 response headers
- w.Header().Set(s3_constants.AmzServerSideEncryption, SSES3Algorithm)
-
- return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders)
- }
-
- // For GET requests, check if this is a multipart SSE-S3 object
- isMultipartSSES3 := false
- sses3Chunks := 0
- for _, chunk := range entry.GetChunks() {
- if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 && len(chunk.GetSseMetadata()) > 0 {
- sses3Chunks++
- }
- }
- isMultipartSSES3 = sses3Chunks > 1
-
- var decryptedReader io.Reader
- if isMultipartSSES3 {
- // Handle multipart SSE-S3 objects - each chunk needs independent decryption
- multipartReader, decErr := s3a.createMultipartSSES3DecryptedReader(r, entry)
- if decErr != nil {
- glog.Errorf("Failed to create multipart SSE-S3 decrypted reader: %v", decErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- decryptedReader = multipartReader
- glog.V(3).Infof("Using multipart SSE-S3 decryption for object")
- } else {
- // Handle single-part SSE-S3 objects
- // Extract SSE-S3 key from metadata
- keyManager := GetSSES3KeyManager()
- if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; !exists {
- glog.Errorf("SSE-S3 key metadata not found in object entry")
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- } else {
- sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
- if err != nil {
- glog.Errorf("Failed to deserialize SSE-S3 metadata: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
-
- // Extract IV from metadata using helper function
- iv, err := GetSSES3IV(entry, sseS3Key, keyManager)
- if err != nil {
- glog.Errorf("Failed to get SSE-S3 IV: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
-
- singlePartReader, decErr := CreateSSES3DecryptedReader(proxyResponse.Body, sseS3Key, iv)
- if decErr != nil {
- glog.Errorf("Failed to create SSE-S3 decrypted reader: %v", decErr)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
- return http.StatusInternalServerError, 0
- }
- decryptedReader = singlePartReader
- glog.V(3).Infof("Using single-part SSE-S3 decryption for object")
- }
- }
-
- // Capture existing CORS headers that may have been set by middleware
- capturedCORSHeaders := captureCORSHeaders(w, corsHeaders)
-
- // Copy headers from proxy response (excluding body-related headers that might change and internal SeaweedFS headers)
- copyResponseHeaders(w, proxyResponse, true)
-
- // Set correct Content-Length for SSE-S3
- if proxyResponse.Header.Get("Content-Range") == "" {
- // For full object requests, encrypted length equals original length
- if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" {
- w.Header().Set("Content-Length", contentLengthStr)
- }
- }
-
- // Add SSE-S3 response headers
- w.Header().Set(s3_constants.AmzServerSideEncryption, SSES3Algorithm)
-
- return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders)
-}
-
// addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes
// and adds the appropriate S3 headers to the response
func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) {
@@ -1266,6 +2689,11 @@ func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, en
// detectPrimarySSEType determines the primary SSE type by examining chunk metadata
func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
+ // Safety check: handle nil entry
+ if entry == nil {
+ return "None"
+ }
+
if len(entry.GetChunks()) == 0 {
// No chunks - check object-level metadata only (single objects or smallContent)
hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil
@@ -1346,10 +2774,95 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string {
return "None"
}
-// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects
-func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
- // Entry is passed from caller to avoid redundant filer lookup
+// createMultipartSSECDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-C objects (direct volume path)
+// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
+// It's kept in the signature for API consistency with non-Direct versions.
+func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, customerKey *SSECustomerKey, entry *filer_pb.Entry) (io.Reader, error) {
+ // Sort chunks by offset to ensure correct order
+ chunks := entry.GetChunks()
+ sort.Slice(chunks, func(i, j int) bool {
+ return chunks[i].GetOffset() < chunks[j].GetOffset()
+ })
+
+ // Create readers for each chunk, decrypting them independently
+ var readers []io.Reader
+
+ for _, chunk := range chunks {
+ // Get this chunk's encrypted data
+ chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create chunk reader: %v", err)
+ }
+
+ // Handle based on chunk's encryption type
+ if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
+ // Check if this chunk has per-chunk SSE-C metadata
+ if len(chunk.GetSseMetadata()) == 0 {
+ chunkReader.Close()
+ return nil, fmt.Errorf("SSE-C chunk %s missing per-chunk metadata", chunk.GetFileIdString())
+ }
+
+ // Deserialize the SSE-C metadata
+ ssecMetadata, err := DeserializeSSECMetadata(chunk.GetSseMetadata())
+ if err != nil {
+ chunkReader.Close()
+ return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), err)
+ }
+
+ // Decode the IV from the metadata
+ chunkIV, err := base64.StdEncoding.DecodeString(ssecMetadata.IV)
+ if err != nil {
+ chunkReader.Close()
+ return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err)
+ }
+
+ glog.V(4).Infof("Decrypting SSE-C chunk %s with IV=%x, PartOffset=%d",
+ chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset)
+
+ // Note: SSE-C multipart behavior (differs from SSE-KMS/SSE-S3):
+ // - Upload: CreateSSECEncryptedReader generates RANDOM IV per part (no base IV + offset)
+ // - Metadata: PartOffset is stored but not used during encryption
+ // - Decryption: Use stored random IV directly (no offset adjustment needed)
+ //
+ // This differs from:
+ // - SSE-KMS/SSE-S3: Use base IV + calculateIVWithOffset(partOffset) during encryption
+ // - CopyObject: Applies calculateIVWithOffset to SSE-C (which may be incorrect)
+ //
+ // TODO: Investigate CopyObject SSE-C PartOffset handling for consistency
+ decryptedChunkReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
+ if decErr != nil {
+ chunkReader.Close()
+ return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
+ }
+ // Use the streaming decrypted reader directly
+ readers = append(readers, struct {
+ io.Reader
+ io.Closer
+ }{
+ Reader: decryptedChunkReader,
+ Closer: chunkReader,
+ })
+ glog.V(4).Infof("Added streaming decrypted reader for SSE-C chunk %s", chunk.GetFileIdString())
+ } else {
+ // Non-SSE-C chunk, use as-is
+ readers = append(readers, chunkReader)
+ glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString())
+ }
+ }
+
+ // Close the original encrypted stream since we're reading chunks individually
+ if encryptedStream != nil {
+ encryptedStream.Close()
+ }
+
+ return NewMultipartSSEReader(readers), nil
+}
+
+// createMultipartSSEKMSDecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-KMS objects (direct volume path)
+// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
+// It's kept in the signature for API consistency with non-Direct versions.
+func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
@@ -1361,55 +2874,64 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, pr
for _, chunk := range chunks {
// Get this chunk's encrypted data
- chunkReader, err := s3a.createEncryptedChunkReader(chunk)
+ chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
- // Get SSE-KMS metadata for this chunk
- var chunkSSEKMSKey *SSEKMSKey
+ // Handle based on chunk's encryption type
+ if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
+ // Check if this chunk has per-chunk SSE-KMS metadata
+ if len(chunk.GetSseMetadata()) == 0 {
+ chunkReader.Close()
+ return nil, fmt.Errorf("SSE-KMS chunk %s missing per-chunk metadata", chunk.GetFileIdString())
+ }
- // Check if this chunk has per-chunk SSE-KMS metadata (new architecture)
- if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 {
// Use the per-chunk SSE-KMS metadata
kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata())
if err != nil {
- glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
- } else {
- // ChunkOffset is already set from the stored metadata (PartOffset)
- chunkSSEKMSKey = kmsKey
+ chunkReader.Close()
+ return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
}
- }
- // Note: No fallback to object-level metadata for multipart objects
- // Each chunk in a multipart SSE-KMS object must have its own unique IV
- // Falling back to object-level metadata could lead to IV reuse or incorrect decryption
+ glog.V(4).Infof("Decrypting SSE-KMS chunk %s with KeyID=%s",
+ chunk.GetFileIdString(), kmsKey.KeyID)
- if chunkSSEKMSKey == nil {
- return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString())
- }
+ // Create decrypted reader for this chunk
+ decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, kmsKey)
+ if decErr != nil {
+ chunkReader.Close()
+ return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
+ }
- // Create decrypted reader for this chunk
- decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey)
- if decErr != nil {
- chunkReader.Close() // Close the chunk reader if decryption fails
- return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
+ // Use the streaming decrypted reader directly
+ readers = append(readers, struct {
+ io.Reader
+ io.Closer
+ }{
+ Reader: decryptedChunkReader,
+ Closer: chunkReader,
+ })
+ glog.V(4).Infof("Added streaming decrypted reader for SSE-KMS chunk %s", chunk.GetFileIdString())
+ } else {
+ // Non-SSE-KMS chunk, use as-is
+ readers = append(readers, chunkReader)
+ glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString())
}
-
- // Use the streaming decrypted reader directly instead of reading into memory
- readers = append(readers, decryptedChunkReader)
- glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString())
}
- // Combine all decrypted chunk readers into a single stream with proper resource management
- multiReader := NewMultipartSSEReader(readers)
- glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers))
+ // Close the original encrypted stream since we're reading chunks individually
+ if encryptedStream != nil {
+ encryptedStream.Close()
+ }
- return multiReader, nil
+ return NewMultipartSSEReader(readers), nil
}
-// createMultipartSSES3DecryptedReader creates a reader for multipart SSE-S3 objects
-func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, entry *filer_pb.Entry) (io.Reader, error) {
+// createMultipartSSES3DecryptedReaderDirect creates a reader that decrypts each chunk independently for multipart SSE-S3 objects (direct volume path)
+// Note: encryptedStream parameter is unused (always nil) as this function fetches chunks directly to avoid double I/O.
+// It's kept in the signature for API consistency with non-Direct versions.
+func (s3a *S3ApiServer) createMultipartSSES3DecryptedReaderDirect(ctx context.Context, encryptedStream io.ReadCloser, entry *filer_pb.Entry) (io.Reader, error) {
// Sort chunks by offset to ensure correct order
chunks := entry.GetChunks()
sort.Slice(chunks, func(i, j int) bool {
@@ -1418,54 +2940,50 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
// Create readers for each chunk, decrypting them independently
var readers []io.Reader
+
+ // Get key manager and SSE-S3 key from entry metadata
keyManager := GetSSES3KeyManager()
+ keyData := entry.Extended[s3_constants.SeaweedFSSSES3Key]
+ sseS3Key, err := DeserializeSSES3Metadata(keyData, keyManager)
+ if err != nil {
+ return nil, fmt.Errorf("failed to deserialize SSE-S3 key from entry metadata: %v", err)
+ }
for _, chunk := range chunks {
// Get this chunk's encrypted data
- chunkReader, err := s3a.createEncryptedChunkReader(chunk)
+ chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
// Handle based on chunk's encryption type
if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
- var chunkSSES3Key *SSES3Key
-
// Check if this chunk has per-chunk SSE-S3 metadata
- if len(chunk.GetSseMetadata()) > 0 {
- // Use the per-chunk SSE-S3 metadata
- sseKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
- if err != nil {
- glog.Errorf("Failed to deserialize per-chunk SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err)
- chunkReader.Close()
- return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %v", err)
- }
- chunkSSES3Key = sseKey
- }
-
- // Note: No fallback to object-level metadata for multipart objects
- // Each chunk in a multipart SSE-S3 object must have its own unique IV
- // Falling back to object-level metadata could lead to IV reuse or incorrect decryption
-
- if chunkSSES3Key == nil {
+ if len(chunk.GetSseMetadata()) == 0 {
chunkReader.Close()
- return nil, fmt.Errorf("no SSE-S3 metadata found for chunk %s in multipart object", chunk.GetFileIdString())
+ return nil, fmt.Errorf("SSE-S3 chunk %s missing per-chunk metadata", chunk.GetFileIdString())
}
- // Extract IV from chunk metadata
- if len(chunkSSES3Key.IV) == 0 {
+ // Deserialize the per-chunk SSE-S3 metadata to get the IV
+ chunkSSES3Metadata, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
+ if err != nil {
chunkReader.Close()
- return nil, fmt.Errorf("no IV found in SSE-S3 metadata for chunk %s", chunk.GetFileIdString())
+ return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata for chunk %s: %v", chunk.GetFileIdString(), err)
}
+ // Use the IV from the chunk metadata
+ iv := chunkSSES3Metadata.IV
+ glog.V(4).Infof("Decrypting SSE-S3 chunk %s with KeyID=%s, IV length=%d",
+ chunk.GetFileIdString(), sseS3Key.KeyID, len(iv))
+
// Create decrypted reader for this chunk
- decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, chunkSSES3Key, chunkSSES3Key.IV)
+ decryptedChunkReader, decErr := CreateSSES3DecryptedReader(chunkReader, sseS3Key, iv)
if decErr != nil {
chunkReader.Close()
- return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr)
+ return nil, fmt.Errorf("failed to decrypt SSE-S3 chunk: %v", decErr)
}
- // Use the streaming decrypted reader directly, ensuring the underlying chunkReader can be closed
+ // Use the streaming decrypted reader directly
readers = append(readers, struct {
io.Reader
io.Closer
@@ -1473,37 +2991,45 @@ func (s3a *S3ApiServer) createMultipartSSES3DecryptedReader(r *http.Request, ent
Reader: decryptedChunkReader,
Closer: chunkReader,
})
- glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-S3 object", chunk.GetFileIdString())
+ glog.V(4).Infof("Added streaming decrypted reader for SSE-S3 chunk %s", chunk.GetFileIdString())
} else {
- // Non-SSE-S3 chunk (unencrypted or other encryption type), use as-is
+ // Non-SSE-S3 chunk, use as-is
readers = append(readers, chunkReader)
- glog.V(4).Infof("Added passthrough reader for non-SSE-S3 chunk %s (type: %v)", chunk.GetFileIdString(), chunk.GetSseType())
+ glog.V(4).Infof("Added non-encrypted reader for chunk %s", chunk.GetFileIdString())
}
}
- // Combine all decrypted chunk readers into a single stream
- multiReader := NewMultipartSSEReader(readers)
- glog.V(3).Infof("Created multipart SSE-S3 decrypted reader with %d chunks", len(readers))
+ // Close the original encrypted stream since we're reading chunks individually
+ if encryptedStream != nil {
+ encryptedStream.Close()
+ }
- return multiReader, nil
+ return NewMultipartSSEReader(readers), nil
}
// createEncryptedChunkReader creates a reader for a single encrypted chunk
-func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
+// Context propagation ensures cancellation if the S3 client disconnects
+func (s3a *S3ApiServer) createEncryptedChunkReader(ctx context.Context, chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
// Get chunk URL
srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
if err != nil {
return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err)
}
- // Create HTTP request for chunk data
- req, err := http.NewRequest("GET", srcUrl, nil)
+ // Create HTTP request with context for cancellation propagation
+ req, err := http.NewRequestWithContext(ctx, "GET", srcUrl, nil)
if err != nil {
return nil, fmt.Errorf("create HTTP request for chunk: %v", err)
}
- // Execute request
- resp, err := http.DefaultClient.Do(req)
+ // Attach volume server JWT for authentication (matches filer behavior)
+ jwt := security.GenJwtForVolumeServer(s3a.filerGuard.ReadSigningKey, s3a.filerGuard.ReadExpiresAfterSec, chunk.GetFileIdString())
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
+
+ // Use shared HTTP client with connection pooling
+ resp, err := volumeServerHTTPClient.Do(req)
if err != nil {
return nil, fmt.Errorf("execute HTTP request for chunk: %v", err)
}
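// volumeServerHTTPClient is defined elsewhere in the package; a minimal
// sketch of a pooled client consistent with the "connection pooling"
// comment above (the settings here are illustrative assumptions, not the
// project's actual values):
//
//	var volumeServerHTTPClient = &http.Client{
//		Transport: &http.Transport{
//			MaxIdleConns:        128,
//			MaxIdleConnsPerHost: 16,
//			IdleConnTimeout:     90 * time.Second,
//		},
//	}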
@@ -1525,9 +3051,10 @@ type MultipartSSEReader struct {
// SSERangeReader applies range logic to an underlying reader
type SSERangeReader struct {
reader io.Reader
- offset int64 // bytes to skip from the beginning
- remaining int64 // bytes remaining to read (-1 for unlimited)
- skipped int64 // bytes already skipped
+ offset int64 // bytes to skip from the beginning
+ remaining int64 // bytes remaining to read (-1 for unlimited)
+ skipped int64 // bytes already skipped
+ skipBuf []byte // reusable buffer for skipping bytes (avoids per-call allocation)
}
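// Usage sketch with illustrative values, following the field comments above:
// serving a client range "bytes=10-99" from an already-decrypted stream
// skips 10 bytes and then caps the read at 90 bytes; remaining == -1 would
// mean "read to EOF".
//
//	ranged := &SSERangeReader{reader: decrypted, offset: 10, remaining: 90}
//	_, err := io.Copy(w, ranged)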
// NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers
@@ -1559,21 +3086,34 @@ func (m *MultipartSSEReader) Close() error {
// Read implements the io.Reader interface for SSERangeReader
func (r *SSERangeReader) Read(p []byte) (n int, err error) {
-
- // If we need to skip bytes and haven't skipped enough yet
- if r.skipped < r.offset {
+ // Skip bytes iteratively (no recursion) until we reach the offset
+ for r.skipped < r.offset {
skipNeeded := r.offset - r.skipped
- skipBuf := make([]byte, min(int64(len(p)), skipNeeded))
- skipRead, skipErr := r.reader.Read(skipBuf)
+
+		// Lazily allocate a fixed 32KB skip buffer on first use and reuse it
+		// across calls to avoid a per-call allocation
+		if r.skipBuf == nil {
+			r.skipBuf = make([]byte, 32*1024)
+		}
+
+ // Determine how much to skip in this iteration
+ bufSize := int64(len(r.skipBuf))
+ if skipNeeded < bufSize {
+ bufSize = skipNeeded
+ }
+
+ skipRead, skipErr := r.reader.Read(r.skipBuf[:bufSize])
r.skipped += int64(skipRead)
if skipErr != nil {
return 0, skipErr
}
- // If we still need to skip more, recurse
- if r.skipped < r.offset {
- return r.Read(p)
+		// Guard against an infinite loop: the io.Reader contract discourages
+		// returning (0, nil) for a non-empty buffer but does not forbid it.
+		// If we make no progress and see no error, treat it as an unexpected EOF.
+ if skipRead == 0 {
+ return 0, io.ErrUnexpectedEOF
}
}
@@ -1600,6 +3140,8 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) {
// createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects
// Each chunk has its own IV and encryption key from the original multipart parts
func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) {
+ ctx := r.Context()
+
// Parse SSE-C headers from the request for decryption key
customerKey, err := ParseSSECHeaders(r)
if err != nil {
@@ -1659,7 +3201,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
for _, chunk := range neededChunks {
// Get this chunk's encrypted data
- chunkReader, err := s3a.createEncryptedChunkReader(chunk)
+ chunkReader, err := s3a.createEncryptedChunkReader(ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to create chunk reader: %v", err)
}
@@ -1679,13 +3221,10 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr)
}
- // Calculate the correct IV for this chunk using within-part offset
- var chunkIV []byte
- if ssecMetadata.PartOffset > 0 {
- chunkIV = calculateIVWithOffset(iv, ssecMetadata.PartOffset)
- } else {
- chunkIV = iv
- }
+	// For multipart SSE-C, each part is encrypted independently starting at
+	// offset 0, so the stored IV is used as-is with no offset adjustment.
+	// PartOffset is retained in the metadata for bookkeeping only.
+ chunkIV := iv
decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV)
if decErr != nil {
@@ -1725,3 +3264,55 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, prox
return multiReader, nil
}
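// The calculateIVWithOffset call removed above reflects the standard
// CTR-mode technique for starting decryption mid-stream: advance the
// 16-byte big-endian counter in the IV by offset/aes.BlockSize blocks.
// A minimal sketch of that technique, not necessarily the project's exact
// implementation:
//
//	func ivWithOffsetSketch(iv []byte, offset int64) []byte {
//		out := make([]byte, len(iv))
//		copy(out, iv)
//		carry := uint64(offset) / 16 // whole AES blocks to advance
//		for i := len(out) - 1; i >= 0 && carry > 0; i-- {
//			sum := uint64(out[i]) + (carry & 0xff)
//			out[i] = byte(sum)
//			carry = (carry >> 8) + (sum >> 8)
//		}
//		return out
//	}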
+
+// PartBoundaryInfo holds information about a part's chunk boundaries
+type PartBoundaryInfo struct {
+ PartNumber int `json:"part"`
+ StartChunk int `json:"start"`
+ EndChunk int `json:"end"` // exclusive
+ ETag string `json:"etag"`
+}
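// Serialized via the json tags above, the part-boundaries metadata stored
// in entry.Extended looks like the following (values illustrative):
//
//	[{"part":1,"start":0,"end":4,"etag":"a1b2c3"},
//	 {"part":2,"start":4,"end":8,"etag":"d4e5f6"}]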
+
+// rc is a helper type that wraps a Reader and Closer for proper resource cleanup
+type rc struct {
+ io.Reader
+ io.Closer
+}
+
+// getMultipartInfo retrieves multipart metadata for a given part number.
+// Returns: (partsCount, partInfo)
+//   - partsCount: total number of parts in the multipart object (falls back to the chunk count when no parts-count metadata is present)
+//   - partInfo: boundary information for the requested part (nil if not found or not a multipart object)
+func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int) (int, *PartBoundaryInfo) {
+ if entry == nil {
+ return 0, nil
+ }
+ if entry.Extended == nil {
+ // Not a multipart object or no metadata
+ return len(entry.GetChunks()), nil
+ }
+
+ // Try to get parts count from metadata
+ partsCount := len(entry.GetChunks()) // default fallback
+ if partsCountBytes, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartsCount]; exists {
+ if count, err := strconv.Atoi(string(partsCountBytes)); err == nil && count > 0 {
+ partsCount = count
+ }
+ }
+
+ // Try to get part boundaries from metadata
+ if boundariesJSON, exists := entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries]; exists {
+ var boundaries []PartBoundaryInfo
+ if err := json.Unmarshal(boundariesJSON, &boundaries); err == nil {
+ // Find the requested part
+ for i := range boundaries {
+ if boundaries[i].PartNumber == partNumber {
+ return partsCount, &boundaries[i]
+ }
+ }
+ }
+ }
+
+ // No part boundaries metadata or part not found
+ return partsCount, nil
+}
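// A hypothetical call-site sketch (names assumed, not shown in this diff):
// a GET with ?partNumber=2 would use the returned boundaries to select the
// part's chunk window; EndChunk is exclusive, per the struct comment.
//
//	partsCount, part := s3a.getMultipartInfo(entry, 2)
//	if part != nil {
//		partChunks := entry.GetChunks()[part.StartChunk:part.EndChunk]
//		_ = partChunks // stream only these chunks for the requested part
//	}
//	_ = partsCount // e.g. surfaced to clients as x-amz-mp-parts-count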