diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 849 |
1 files changed, 835 insertions, 14 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 70d36cd7e..75c9a9e91 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -2,11 +2,13 @@ package s3api import ( "bytes" + "encoding/base64" "errors" "fmt" "io" "net/http" "net/url" + "sort" "strconv" "strings" "time" @@ -244,6 +246,20 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) return // Directory object request was handled } + // Check conditional headers for read operations + result := s3a.checkConditionalHeadersForReads(r, bucket, object) + if result.ErrorCode != s3err.ErrNone { + glog.V(3).Infof("GetObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode) + + // For 304 Not Modified responses, include the ETag header + if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" { + w.Header().Set("ETag", result.ETag) + } + + s3err.WriteErrorResponse(w, r, result.ErrorCode) + return + } + // Check for specific version ID in query parameters versionId := r.URL.Query().Get("versionId") @@ -328,7 +344,42 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) destUrl = s3a.toFilerUrl(bucket, object) } - s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse) + // Check if this is a range request to an SSE object and modify the approach + originalRangeHeader := r.Header.Get("Range") + var sseObject = false + + // Pre-check if this object is SSE encrypted to avoid filer range conflicts + if originalRangeHeader != "" { + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + if objectEntry, err := s3a.getEntry("", objectPath); err == nil { + primarySSEType := s3a.detectPrimarySSEType(objectEntry) + if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS { + sseObject = true + // Temporarily remove Range header to get full encrypted data from filer + r.Header.Del("Range") + + } + } + } + + s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { + // Restore the original Range header for SSE processing + if sseObject && originalRangeHeader != "" { + r.Header.Set("Range", originalRangeHeader) + + } + + // Add SSE metadata headers based on object metadata before SSE processing + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + if objectEntry, err := s3a.getEntry("", objectPath); err == nil { + s3a.addSSEHeadersToResponse(proxyResponse, objectEntry) + } + + // Handle SSE decryption (both SSE-C and SSE-KMS) if needed + return s3a.handleSSEResponse(r, proxyResponse, w) + }) } func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) { @@ -341,6 +392,20 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request return // Directory object request was handled } + // Check conditional headers for read operations + result := s3a.checkConditionalHeadersForReads(r, bucket, object) + if result.ErrorCode != s3err.ErrNone { + glog.V(3).Infof("HeadObjectHandler: Conditional header check failed for %s/%s with error %v", bucket, object, result.ErrorCode) + + // For 304 Not Modified responses, include the ETag header + if result.ErrorCode == s3err.ErrNotModified && result.ETag != "" { + w.Header().Set("ETag", result.ETag) + } + + s3err.WriteErrorResponse(w, r, result.ErrorCode) + return + } + // Check for specific version ID in query parameters versionId := r.URL.Query().Get("versionId") @@ -423,7 +488,10 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request destUrl = s3a.toFilerUrl(bucket, object) } - s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse) + s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { + // Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests + return s3a.handleSSEResponse(r, proxyResponse, w) + }) } func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64)) { @@ -555,34 +623,357 @@ func restoreCORSHeaders(w http.ResponseWriter, capturedCORSHeaders map[string]st } } -func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { - // Capture existing CORS headers that may have been set by middleware - capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) - - // Copy headers from proxy response - for k, v := range proxyResponse.Header { - w.Header()[k] = v - } - +// writeFinalResponse handles the common response writing logic shared between +// passThroughResponse and handleSSECResponse +func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bodyReader io.Reader, capturedCORSHeaders map[string]string) (statusCode int, bytesTransferred int64) { // Restore CORS headers that were set by middleware restoreCORSHeaders(w, capturedCORSHeaders) if proxyResponse.Header.Get("Content-Range") != "" && proxyResponse.StatusCode == 200 { - w.WriteHeader(http.StatusPartialContent) statusCode = http.StatusPartialContent } else { statusCode = proxyResponse.StatusCode } w.WriteHeader(statusCode) + + // Stream response data buf := mem.Allocate(128 * 1024) defer mem.Free(buf) - bytesTransferred, err := io.CopyBuffer(w, proxyResponse.Body, buf) + bytesTransferred, err := io.CopyBuffer(w, bodyReader, buf) if err != nil { - glog.V(1).Infof("passthrough response read %d bytes: %v", bytesTransferred, err) + glog.V(1).Infof("response read %d bytes: %v", bytesTransferred, err) } return statusCode, bytesTransferred } +func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { + // Capture existing CORS headers that may have been set by middleware + capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) + + // Copy headers from proxy response + for k, v := range proxyResponse.Header { + w.Header()[k] = v + } + + return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders) +} + +// handleSSECResponse handles SSE-C decryption and response processing +func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { + // Check if the object has SSE-C metadata + sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm) + sseKeyMD5 := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerKeyMD5) + isObjectEncrypted := sseAlgorithm != "" && sseKeyMD5 != "" + + // Parse SSE-C headers from request once (avoid duplication) + customerKey, err := ParseSSECHeaders(r) + if err != nil { + errCode := MapSSECErrorToS3Error(err) + s3err.WriteErrorResponse(w, r, errCode) + return http.StatusBadRequest, 0 + } + + if isObjectEncrypted { + // This object was encrypted with SSE-C, validate customer key + if customerKey == nil { + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return http.StatusBadRequest, 0 + } + + // SSE-C MD5 is base64 and case-sensitive + if customerKey.KeyMD5 != sseKeyMD5 { + // For GET/HEAD requests, AWS S3 returns 403 Forbidden for a key mismatch. + s3err.WriteErrorResponse(w, r, s3err.ErrAccessDenied) + return http.StatusForbidden, 0 + } + + // SSE-C encrypted objects support HTTP Range requests + // The IV is stored in metadata and CTR mode allows seeking to any offset + // Range requests will be handled by the filer layer with proper offset-based decryption + + // Check if this is a chunked or small content SSE-C object + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + if entry, err := s3a.getEntry("", objectPath); err == nil { + // Check for SSE-C chunks + sseCChunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_C { + sseCChunks++ + } + } + + if sseCChunks >= 1 { + + // Handle chunked SSE-C objects - each chunk needs independent decryption + multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse) + if decErr != nil { + glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + // Capture existing CORS headers + capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) + + // Copy headers from proxy response + for k, v := range proxyResponse.Header { + w.Header()[k] = v + } + + // Set proper headers for range requests + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + + // Parse range header (e.g., "bytes=0-99") + if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" { + rangeSpec := rangeHeader[6:] + parts := strings.Split(rangeSpec, "-") + if len(parts) == 2 { + startOffset, endOffset := int64(0), int64(-1) + if parts[0] != "" { + startOffset, _ = strconv.ParseInt(parts[0], 10, 64) + } + if parts[1] != "" { + endOffset, _ = strconv.ParseInt(parts[1], 10, 64) + } + + if endOffset >= startOffset { + // Specific range - set proper Content-Length and Content-Range headers + rangeLength := endOffset - startOffset + 1 + totalSize := proxyResponse.Header.Get("Content-Length") + + w.Header().Set("Content-Length", strconv.FormatInt(rangeLength, 10)) + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", startOffset, endOffset, totalSize)) + // writeFinalResponse will set status to 206 if Content-Range is present + } + } + } + } + + return writeFinalResponse(w, proxyResponse, multipartReader, capturedCORSHeaders) + } else if len(entry.GetChunks()) == 0 && len(entry.Content) > 0 { + // Small content SSE-C object stored directly in entry.Content + + // Fall through to traditional single-object SSE-C handling below + } + } + + // Single-part SSE-C object: Get IV from proxy response headers (stored during upload) + ivBase64 := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEIVHeader) + if ivBase64 == "" { + glog.Errorf("SSE-C encrypted single-part object missing IV in metadata") + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + iv, err := base64.StdEncoding.DecodeString(ivBase64) + if err != nil { + glog.Errorf("Failed to decode IV from metadata: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + // Create decrypted reader with IV from metadata + decryptedReader, decErr := CreateSSECDecryptedReader(proxyResponse.Body, customerKey, iv) + if decErr != nil { + glog.Errorf("Failed to create SSE-C decrypted reader: %v", decErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + // Capture existing CORS headers that may have been set by middleware + capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) + + // Copy headers from proxy response (excluding body-related headers that might change) + for k, v := range proxyResponse.Header { + if k != "Content-Length" && k != "Content-Encoding" { + w.Header()[k] = v + } + } + + // Set correct Content-Length for SSE-C (only for full object requests) + // With IV stored in metadata, the encrypted length equals the original length + if proxyResponse.Header.Get("Content-Range") == "" { + // Full object request: encrypted length equals original length (IV not in stream) + if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" { + // Content-Length is already correct since IV is stored in metadata, not in data stream + w.Header().Set("Content-Length", contentLengthStr) + } + } + // For range requests, let the actual bytes transferred determine the response length + + // Add SSE-C response headers + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, sseAlgorithm) + w.Header().Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, sseKeyMD5) + + return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders) + } else { + // Object is not encrypted, but check if customer provided SSE-C headers unnecessarily + if customerKey != nil { + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyNotNeeded) + return http.StatusBadRequest, 0 + } + + // Normal pass-through response + return passThroughResponse(proxyResponse, w) + } +} + +// handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing +func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { + // Check what the client is expecting based on request headers + clientExpectsSSEC := IsSSECRequest(r) + + // Check what the stored object has in headers (may be conflicting after copy) + kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader) + sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm) + + // Get actual object state by examining chunks (most reliable for cross-encryption) + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + actualObjectType := "Unknown" + if objectEntry, err := s3a.getEntry("", objectPath); err == nil { + actualObjectType = s3a.detectPrimarySSEType(objectEntry) + } + + // Route based on ACTUAL object type (from chunks) rather than conflicting headers + if actualObjectType == s3_constants.SSETypeC && clientExpectsSSEC { + // Object is SSE-C and client expects SSE-C → SSE-C handler + return s3a.handleSSECResponse(r, proxyResponse, w) + } else if actualObjectType == s3_constants.SSETypeKMS && !clientExpectsSSEC { + // Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler + return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader) + } else if actualObjectType == "None" && !clientExpectsSSEC { + // Object is unencrypted and client doesn't expect SSE-C → pass through + return passThroughResponse(proxyResponse, w) + } else if actualObjectType == s3_constants.SSETypeC && !clientExpectsSSEC { + // Object is SSE-C but client doesn't provide SSE-C headers → Error + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return http.StatusBadRequest, 0 + } else if actualObjectType == s3_constants.SSETypeKMS && clientExpectsSSEC { + // Object is SSE-KMS but client provides SSE-C headers → Error + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return http.StatusBadRequest, 0 + } else if actualObjectType == "None" && clientExpectsSSEC { + // Object is unencrypted but client provides SSE-C headers → Error + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return http.StatusBadRequest, 0 + } + + // Fallback for edge cases - use original logic with header-based detection + if clientExpectsSSEC && sseAlgorithm != "" { + return s3a.handleSSECResponse(r, proxyResponse, w) + } else if !clientExpectsSSEC && kmsMetadataHeader != "" { + return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader) + } else { + return passThroughResponse(proxyResponse, w) + } +} + +// handleSSEKMSResponse handles SSE-KMS decryption and response processing +func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) { + // Deserialize SSE-KMS metadata + kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader) + if err != nil { + glog.Errorf("Failed to decode SSE-KMS metadata: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if err != nil { + glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + // For HEAD requests, we don't need to decrypt the body, just add response headers + if r.Method == "HEAD" { + // Capture existing CORS headers that may have been set by middleware + capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) + + // Copy headers from proxy response + for k, v := range proxyResponse.Header { + w.Header()[k] = v + } + + // Add SSE-KMS response headers + AddSSEKMSResponseHeaders(w, sseKMSKey) + + return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders) + } + + // For GET requests, check if this is a multipart SSE-KMS object + // We need to check the object structure to determine if it's multipart encrypted + isMultipartSSEKMS := false + + if sseKMSKey != nil { + // Get the object entry to check chunk structure + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + if entry, err := s3a.getEntry("", objectPath); err == nil { + // Check for multipart SSE-KMS + sseKMSChunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 { + sseKMSChunks++ + } + } + isMultipartSSEKMS = sseKMSChunks > 1 + + glog.Infof("SSE-KMS object detection: chunks=%d, sseKMSChunks=%d, isMultipartSSEKMS=%t", + len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS) + } + } + + var decryptedReader io.Reader + if isMultipartSSEKMS { + // Handle multipart SSE-KMS objects - each chunk needs independent decryption + multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse) + if decErr != nil { + glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + decryptedReader = multipartReader + glog.V(3).Infof("Using multipart SSE-KMS decryption for object") + } else { + // Handle single-part SSE-KMS objects + singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey) + if decErr != nil { + glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + decryptedReader = singlePartReader + glog.V(3).Infof("Using single-part SSE-KMS decryption for object") + } + + // Capture existing CORS headers that may have been set by middleware + capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) + + // Copy headers from proxy response (excluding body-related headers that might change) + for k, v := range proxyResponse.Header { + if k != "Content-Length" && k != "Content-Encoding" { + w.Header()[k] = v + } + } + + // Set correct Content-Length for SSE-KMS + if proxyResponse.Header.Get("Content-Range") == "" { + // For full object requests, encrypted length equals original length + if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" { + w.Header().Set("Content-Length", contentLengthStr) + } + } + + // Add SSE-KMS response headers + AddSSEKMSResponseHeaders(w, sseKMSKey) + + return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders) +} + // addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes // and adds the appropriate S3 headers to the response func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) { @@ -623,3 +1014,433 @@ func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, en w.Header().Set(s3_constants.AmzObjectLockLegalHold, s3_constants.LegalHoldOff) } } + +// addSSEHeadersToResponse converts stored SSE metadata from entry.Extended to HTTP response headers +// Uses intelligent prioritization: only set headers for the PRIMARY encryption type to avoid conflicts +func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, entry *filer_pb.Entry) { + if entry == nil || entry.Extended == nil { + return + } + + // Determine the primary encryption type by examining chunks (most reliable) + primarySSEType := s3a.detectPrimarySSEType(entry) + + // Only set headers for the PRIMARY encryption type + switch primarySSEType { + case s3_constants.SSETypeC: + // Add only SSE-C headers + if algorithmBytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists && len(algorithmBytes) > 0 { + proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algorithmBytes)) + } + + if keyMD5Bytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists && len(keyMD5Bytes) > 0 { + proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5Bytes)) + } + + if ivBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists && len(ivBytes) > 0 { + ivBase64 := base64.StdEncoding.EncodeToString(ivBytes) + proxyResponse.Header.Set(s3_constants.SeaweedFSSSEIVHeader, ivBase64) + } + + case s3_constants.SSETypeKMS: + // Add only SSE-KMS headers + if sseAlgorithm, exists := entry.Extended[s3_constants.AmzServerSideEncryption]; exists && len(sseAlgorithm) > 0 { + proxyResponse.Header.Set(s3_constants.AmzServerSideEncryption, string(sseAlgorithm)) + } + + if kmsKeyID, exists := entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId]; exists && len(kmsKeyID) > 0 { + proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, string(kmsKeyID)) + } + + default: + // Unencrypted or unknown - don't set any SSE headers + } + + glog.V(3).Infof("addSSEHeadersToResponse: processed %d extended metadata entries", len(entry.Extended)) +} + +// detectPrimarySSEType determines the primary SSE type by examining chunk metadata +func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string { + if len(entry.GetChunks()) == 0 { + // No chunks - check object-level metadata only (single objects or smallContent) + hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil + hasSSEKMS := entry.Extended[s3_constants.AmzServerSideEncryption] != nil + + if hasSSEC && !hasSSEKMS { + return s3_constants.SSETypeC + } else if hasSSEKMS && !hasSSEC { + return s3_constants.SSETypeKMS + } else if hasSSEC && hasSSEKMS { + // Both present - this should only happen during cross-encryption copies + // Use content to determine actual encryption state + if len(entry.Content) > 0 { + // smallContent - check if it's encrypted (heuristic: random-looking data) + return s3_constants.SSETypeC // Default to SSE-C for mixed case + } else { + // No content, both headers - default to SSE-C + return s3_constants.SSETypeC + } + } + return "None" + } + + // Count chunk types to determine primary (multipart objects) + ssecChunks := 0 + ssekmsChunks := 0 + + for _, chunk := range entry.GetChunks() { + switch chunk.GetSseType() { + case filer_pb.SSEType_SSE_C: + ssecChunks++ + case filer_pb.SSEType_SSE_KMS: + ssekmsChunks++ + } + } + + // Primary type is the one with more chunks + if ssecChunks > ssekmsChunks { + return s3_constants.SSETypeC + } else if ssekmsChunks > ssecChunks { + return s3_constants.SSETypeKMS + } else if ssecChunks > 0 { + // Equal number, prefer SSE-C (shouldn't happen in practice) + return s3_constants.SSETypeC + } + + return "None" +} + +// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects +func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) { + // Get the object path from the request + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + + // Get the object entry from filer to access chunk information + entry, err := s3a.getEntry("", objectPath) + if err != nil { + return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err) + } + + // Sort chunks by offset to ensure correct order + chunks := entry.GetChunks() + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].GetOffset() < chunks[j].GetOffset() + }) + + // Create readers for each chunk, decrypting them independently + var readers []io.Reader + + for i, chunk := range chunks { + glog.Infof("Processing chunk %d/%d: fileId=%s, offset=%d, size=%d, sse_type=%d", + i+1, len(entry.GetChunks()), chunk.GetFileIdString(), chunk.GetOffset(), chunk.GetSize(), chunk.GetSseType()) + + // Get this chunk's encrypted data + chunkReader, err := s3a.createEncryptedChunkReader(chunk) + if err != nil { + return nil, fmt.Errorf("failed to create chunk reader: %v", err) + } + + // Get SSE-KMS metadata for this chunk + var chunkSSEKMSKey *SSEKMSKey + + // Check if this chunk has per-chunk SSE-KMS metadata (new architecture) + if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseMetadata()) > 0 { + // Use the per-chunk SSE-KMS metadata + kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseMetadata()) + if err != nil { + glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err) + } else { + // ChunkOffset is already set from the stored metadata (PartOffset) + chunkSSEKMSKey = kmsKey + glog.Infof("Using per-chunk SSE-KMS metadata for chunk %s: keyID=%s, IV=%x, partOffset=%d", + chunk.GetFileIdString(), kmsKey.KeyID, kmsKey.IV[:8], kmsKey.ChunkOffset) + } + } + + // Fallback to object-level metadata (legacy support) + if chunkSSEKMSKey == nil { + objectMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader) + if objectMetadataHeader != "" { + kmsMetadataBytes, decodeErr := base64.StdEncoding.DecodeString(objectMetadataHeader) + if decodeErr == nil { + kmsKey, _ := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if kmsKey != nil { + // For object-level metadata (legacy), use absolute file offset as fallback + kmsKey.ChunkOffset = chunk.GetOffset() + chunkSSEKMSKey = kmsKey + } + glog.Infof("Using fallback object-level SSE-KMS metadata for chunk %s with offset %d", chunk.GetFileIdString(), chunk.GetOffset()) + } + } + } + + if chunkSSEKMSKey == nil { + return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString()) + } + + // Create decrypted reader for this chunk + decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey) + if decErr != nil { + chunkReader.Close() // Close the chunk reader if decryption fails + return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) + } + + // Use the streaming decrypted reader directly instead of reading into memory + readers = append(readers, decryptedChunkReader) + glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString()) + } + + // Combine all decrypted chunk readers into a single stream with proper resource management + multiReader := NewMultipartSSEReader(readers) + glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers)) + + return multiReader, nil +} + +// createEncryptedChunkReader creates a reader for a single encrypted chunk +func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) { + // Get chunk URL + srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString()) + if err != nil { + return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err) + } + + // Create HTTP request for chunk data + req, err := http.NewRequest("GET", srcUrl, nil) + if err != nil { + return nil, fmt.Errorf("create HTTP request for chunk: %v", err) + } + + // Execute request + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("execute HTTP request for chunk: %v", err) + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("HTTP request for chunk failed: %d", resp.StatusCode) + } + + return resp.Body, nil +} + +// MultipartSSEReader wraps multiple readers and ensures all underlying readers are properly closed +type MultipartSSEReader struct { + multiReader io.Reader + readers []io.Reader +} + +// SSERangeReader applies range logic to an underlying reader +type SSERangeReader struct { + reader io.Reader + offset int64 // bytes to skip from the beginning + remaining int64 // bytes remaining to read (-1 for unlimited) + skipped int64 // bytes already skipped +} + +// NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers +func NewMultipartSSEReader(readers []io.Reader) *MultipartSSEReader { + return &MultipartSSEReader{ + multiReader: io.MultiReader(readers...), + readers: readers, + } +} + +// Read implements the io.Reader interface +func (m *MultipartSSEReader) Read(p []byte) (n int, err error) { + return m.multiReader.Read(p) +} + +// Close implements the io.Closer interface and closes all underlying readers that support closing +func (m *MultipartSSEReader) Close() error { + var lastErr error + for i, reader := range m.readers { + if closer, ok := reader.(io.Closer); ok { + if err := closer.Close(); err != nil { + glog.V(2).Infof("Error closing reader %d: %v", i, err) + lastErr = err // Keep track of the last error, but continue closing others + } + } + } + return lastErr +} + +// Read implements the io.Reader interface for SSERangeReader +func (r *SSERangeReader) Read(p []byte) (n int, err error) { + + // If we need to skip bytes and haven't skipped enough yet + if r.skipped < r.offset { + skipNeeded := r.offset - r.skipped + skipBuf := make([]byte, min(int64(len(p)), skipNeeded)) + skipRead, skipErr := r.reader.Read(skipBuf) + r.skipped += int64(skipRead) + + if skipErr != nil { + return 0, skipErr + } + + // If we still need to skip more, recurse + if r.skipped < r.offset { + return r.Read(p) + } + } + + // If we have a remaining limit and it's reached + if r.remaining == 0 { + return 0, io.EOF + } + + // Calculate how much to read + readSize := len(p) + if r.remaining > 0 && int64(readSize) > r.remaining { + readSize = int(r.remaining) + } + + // Read the data + n, err = r.reader.Read(p[:readSize]) + if r.remaining > 0 { + r.remaining -= int64(n) + } + + return n, err +} + +// createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects +// Each chunk has its own IV and encryption key from the original multipart parts +func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) { + // Parse SSE-C headers from the request for decryption key + customerKey, err := ParseSSECHeaders(r) + if err != nil { + return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err) + } + + // Get the object path from the request + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + + // Get the object entry from filer to access chunk information + entry, err := s3a.getEntry("", objectPath) + if err != nil { + return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err) + } + + // Sort chunks by offset to ensure correct order + chunks := entry.GetChunks() + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].GetOffset() < chunks[j].GetOffset() + }) + + // Check for Range header to optimize chunk processing + var startOffset, endOffset int64 = 0, -1 + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + // Parse range header (e.g., "bytes=0-99") + if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" { + rangeSpec := rangeHeader[6:] + parts := strings.Split(rangeSpec, "-") + if len(parts) == 2 { + if parts[0] != "" { + startOffset, _ = strconv.ParseInt(parts[0], 10, 64) + } + if parts[1] != "" { + endOffset, _ = strconv.ParseInt(parts[1], 10, 64) + } + } + } + } + + // Filter chunks to only those needed for the range request + var neededChunks []*filer_pb.FileChunk + for _, chunk := range chunks { + chunkStart := chunk.GetOffset() + chunkEnd := chunkStart + int64(chunk.GetSize()) - 1 + + // Check if this chunk overlaps with the requested range + if endOffset == -1 { + // No end specified, take all chunks from startOffset + if chunkEnd >= startOffset { + neededChunks = append(neededChunks, chunk) + } + } else { + // Specific range: check for overlap + if chunkStart <= endOffset && chunkEnd >= startOffset { + neededChunks = append(neededChunks, chunk) + } + } + } + + // Create readers for only the needed chunks + var readers []io.Reader + + for _, chunk := range neededChunks { + + // Get this chunk's encrypted data + chunkReader, err := s3a.createEncryptedChunkReader(chunk) + if err != nil { + return nil, fmt.Errorf("failed to create chunk reader: %v", err) + } + + if chunk.GetSseType() == filer_pb.SSEType_SSE_C { + // For SSE-C chunks, extract the IV from the stored per-chunk metadata (unified approach) + if len(chunk.GetSseMetadata()) > 0 { + // Deserialize the SSE-C metadata stored in the unified metadata field + ssecMetadata, decErr := DeserializeSSECMetadata(chunk.GetSseMetadata()) + if decErr != nil { + return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), decErr) + } + + // Decode the IV from the metadata + iv, ivErr := base64.StdEncoding.DecodeString(ssecMetadata.IV) + if ivErr != nil { + return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr) + } + + // Calculate the correct IV for this chunk using within-part offset + var chunkIV []byte + if ssecMetadata.PartOffset > 0 { + chunkIV = calculateIVWithOffset(iv, ssecMetadata.PartOffset) + } else { + chunkIV = iv + } + + decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV) + if decErr != nil { + return nil, fmt.Errorf("failed to create SSE-C decrypted reader for chunk %s: %v", chunk.GetFileIdString(), decErr) + } + readers = append(readers, decryptedReader) + glog.Infof("Created SSE-C decrypted reader for chunk %s using stored metadata", chunk.GetFileIdString()) + } else { + return nil, fmt.Errorf("SSE-C chunk %s missing required metadata", chunk.GetFileIdString()) + } + } else { + // Non-SSE-C chunk, use as-is + readers = append(readers, chunkReader) + } + } + + multiReader := NewMultipartSSEReader(readers) + + // Apply range logic if a range was requested + if rangeHeader != "" && startOffset >= 0 { + if endOffset == -1 { + // Open-ended range (e.g., "bytes=100-") + return &SSERangeReader{ + reader: multiReader, + offset: startOffset, + remaining: -1, // Read until EOF + }, nil + } else { + // Specific range (e.g., "bytes=0-99") + rangeLength := endOffset - startOffset + 1 + return &SSERangeReader{ + reader: multiReader, + offset: startOffset, + remaining: rangeLength, + }, nil + } + } + + return multiReader, nil +} |
