diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 739 |
1 files changed, 713 insertions, 26 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index bde5764f6..140ee7a42 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -2,11 +2,13 @@ package s3api import ( "bytes" + "encoding/base64" "errors" "fmt" "io" "net/http" "net/url" + "sort" "strconv" "strings" "time" @@ -328,9 +330,41 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) destUrl = s3a.toFilerUrl(bucket, object) } + // Check if this is a range request to an SSE object and modify the approach + originalRangeHeader := r.Header.Get("Range") + var sseObject = false + + // Pre-check if this object is SSE encrypted to avoid filer range conflicts + if originalRangeHeader != "" { + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + if objectEntry, err := s3a.getEntry("", objectPath); err == nil { + primarySSEType := s3a.detectPrimarySSEType(objectEntry) + if primarySSEType == "SSE-C" || primarySSEType == "SSE-KMS" { + sseObject = true + // Temporarily remove Range header to get full encrypted data from filer + r.Header.Del("Range") + + } + } + } + s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { - // Handle SSE-C decryption if needed - return s3a.handleSSECResponse(r, proxyResponse, w) + // Restore the original Range header for SSE processing + if sseObject && originalRangeHeader != "" { + r.Header.Set("Range", originalRangeHeader) + + } + + // Add SSE metadata headers based on object metadata before SSE processing + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + if objectEntry, err := s3a.getEntry("", objectPath); err == nil { + s3a.addSSEHeadersToResponse(proxyResponse, objectEntry) + } + + // Handle SSE decryption (both SSE-C and SSE-KMS) if needed + return s3a.handleSSEResponse(r, proxyResponse, w) }) } @@ -427,8 +461,8 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { - // Handle SSE-C validation for HEAD requests - return s3a.handleSSECResponse(r, proxyResponse, w) + // Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests + return s3a.handleSSEResponse(r, proxyResponse, w) }) } @@ -625,15 +659,95 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http. return http.StatusForbidden, 0 } - // SSE-C encrypted objects do not support HTTP Range requests because the 16-byte IV - // is required at the beginning of the stream for proper decryption - if r.Header.Get("Range") != "" { - s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange) - return http.StatusRequestedRangeNotSatisfiable, 0 + // SSE-C encrypted objects support HTTP Range requests + // The IV is stored in metadata and CTR mode allows seeking to any offset + // Range requests will be handled by the filer layer with proper offset-based decryption + + // Check if this is a chunked or small content SSE-C object + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + if entry, err := s3a.getEntry("", objectPath); err == nil { + // Check for SSE-C chunks + sseCChunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_C { + sseCChunks++ + } + } + + if sseCChunks >= 1 { + + // Handle chunked SSE-C objects - each chunk needs independent decryption + multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse) + if decErr != nil { + glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + // Capture existing CORS headers + capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) + + // Copy headers from proxy response + for k, v := range proxyResponse.Header { + w.Header()[k] = v + } + + // Set proper headers for range requests + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + + // Parse range header (e.g., "bytes=0-99") + if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" { + rangeSpec := rangeHeader[6:] + parts := strings.Split(rangeSpec, "-") + if len(parts) == 2 { + startOffset, endOffset := int64(0), int64(-1) + if parts[0] != "" { + startOffset, _ = strconv.ParseInt(parts[0], 10, 64) + } + if parts[1] != "" { + endOffset, _ = strconv.ParseInt(parts[1], 10, 64) + } + + if endOffset >= startOffset { + // Specific range - set proper Content-Length and Content-Range headers + rangeLength := endOffset - startOffset + 1 + totalSize := proxyResponse.Header.Get("Content-Length") + + w.Header().Set("Content-Length", strconv.FormatInt(rangeLength, 10)) + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%s", startOffset, endOffset, totalSize)) + // writeFinalResponse will set status to 206 if Content-Range is present + } + } + } + } + + return writeFinalResponse(w, proxyResponse, multipartReader, capturedCORSHeaders) + } else if len(entry.GetChunks()) == 0 && len(entry.Content) > 0 { + // Small content SSE-C object stored directly in entry.Content + + // Fall through to traditional single-object SSE-C handling below + } + } + + // Single-part SSE-C object: Get IV from proxy response headers (stored during upload) + ivBase64 := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEIVHeader) + if ivBase64 == "" { + glog.Errorf("SSE-C encrypted single-part object missing IV in metadata") + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + iv, err := base64.StdEncoding.DecodeString(ivBase64) + if err != nil { + glog.Errorf("Failed to decode IV from metadata: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 } - // Create decrypted reader - decryptedReader, decErr := CreateSSECDecryptedReader(proxyResponse.Body, customerKey) + // Create decrypted reader with IV from metadata + decryptedReader, decErr := CreateSSECDecryptedReader(proxyResponse.Body, customerKey, iv) if decErr != nil { glog.Errorf("Failed to create SSE-C decrypted reader: %v", decErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -651,23 +765,12 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http. } // Set correct Content-Length for SSE-C (only for full object requests) - // Range requests are complex with SSE-C because the entire object needs decryption + // With IV stored in metadata, the encrypted length equals the original length if proxyResponse.Header.Get("Content-Range") == "" { - // Full object request: subtract 16-byte IV from encrypted length + // Full object request: encrypted length equals original length (IV not in stream) if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" { - encryptedLength, err := strconv.ParseInt(contentLengthStr, 10, 64) - if err != nil { - glog.Errorf("Invalid Content-Length header for SSE-C object: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - originalLength := encryptedLength - 16 - if originalLength < 0 { - glog.Errorf("Encrypted object length (%d) is less than IV size (16 bytes)", encryptedLength) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return http.StatusInternalServerError, 0 - } - w.Header().Set("Content-Length", strconv.FormatInt(originalLength, 10)) + // Content-Length is already correct since IV is stored in metadata, not in data stream + w.Header().Set("Content-Length", contentLengthStr) } } // For range requests, let the actual bytes transferred determine the response length @@ -689,6 +792,160 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http. } } +// handleSSEResponse handles both SSE-C and SSE-KMS decryption/validation and response processing +func (s3a *S3ApiServer) handleSSEResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { + // Check what the client is expecting based on request headers + clientExpectsSSEC := IsSSECRequest(r) + + // Check what the stored object has in headers (may be conflicting after copy) + kmsMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader) + sseAlgorithm := proxyResponse.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm) + + // Get actual object state by examining chunks (most reliable for cross-encryption) + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + actualObjectType := "Unknown" + if objectEntry, err := s3a.getEntry("", objectPath); err == nil { + actualObjectType = s3a.detectPrimarySSEType(objectEntry) + } + + // Route based on ACTUAL object type (from chunks) rather than conflicting headers + if actualObjectType == "SSE-C" && clientExpectsSSEC { + // Object is SSE-C and client expects SSE-C → SSE-C handler + return s3a.handleSSECResponse(r, proxyResponse, w) + } else if actualObjectType == "SSE-KMS" && !clientExpectsSSEC { + // Object is SSE-KMS and client doesn't expect SSE-C → SSE-KMS handler + return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader) + } else if actualObjectType == "None" && !clientExpectsSSEC { + // Object is unencrypted and client doesn't expect SSE-C → pass through + return passThroughResponse(proxyResponse, w) + } else if actualObjectType == "SSE-C" && !clientExpectsSSEC { + // Object is SSE-C but client doesn't provide SSE-C headers → Error + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return http.StatusBadRequest, 0 + } else if actualObjectType == "SSE-KMS" && clientExpectsSSEC { + // Object is SSE-KMS but client provides SSE-C headers → Error + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return http.StatusBadRequest, 0 + } else if actualObjectType == "None" && clientExpectsSSEC { + // Object is unencrypted but client provides SSE-C headers → Error + s3err.WriteErrorResponse(w, r, s3err.ErrSSECustomerKeyMissing) + return http.StatusBadRequest, 0 + } + + // Fallback for edge cases - use original logic with header-based detection + if clientExpectsSSEC && sseAlgorithm != "" { + return s3a.handleSSECResponse(r, proxyResponse, w) + } else if !clientExpectsSSEC && kmsMetadataHeader != "" { + return s3a.handleSSEKMSResponse(r, proxyResponse, w, kmsMetadataHeader) + } else { + return passThroughResponse(proxyResponse, w) + } +} + +// handleSSEKMSResponse handles SSE-KMS decryption and response processing +func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *http.Response, w http.ResponseWriter, kmsMetadataHeader string) (statusCode int, bytesTransferred int64) { + // Deserialize SSE-KMS metadata + kmsMetadataBytes, err := base64.StdEncoding.DecodeString(kmsMetadataHeader) + if err != nil { + glog.Errorf("Failed to decode SSE-KMS metadata: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + sseKMSKey, err := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if err != nil { + glog.Errorf("Failed to deserialize SSE-KMS metadata: %v", err) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + + // For HEAD requests, we don't need to decrypt the body, just add response headers + if r.Method == "HEAD" { + // Capture existing CORS headers that may have been set by middleware + capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) + + // Copy headers from proxy response + for k, v := range proxyResponse.Header { + w.Header()[k] = v + } + + // Add SSE-KMS response headers + AddSSEKMSResponseHeaders(w, sseKMSKey) + + return writeFinalResponse(w, proxyResponse, proxyResponse.Body, capturedCORSHeaders) + } + + // For GET requests, check if this is a multipart SSE-KMS object + // We need to check the object structure to determine if it's multipart encrypted + isMultipartSSEKMS := false + + if sseKMSKey != nil { + // Get the object entry to check chunk structure + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + if entry, err := s3a.getEntry("", objectPath); err == nil { + // Check for multipart SSE-KMS + sseKMSChunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseKmsMetadata()) > 0 { + sseKMSChunks++ + } + } + isMultipartSSEKMS = sseKMSChunks > 1 + + glog.Infof("SSE-KMS object detection: chunks=%d, sseKMSChunks=%d, isMultipartSSEKMS=%t", + len(entry.GetChunks()), sseKMSChunks, isMultipartSSEKMS) + } + } + + var decryptedReader io.Reader + if isMultipartSSEKMS { + // Handle multipart SSE-KMS objects - each chunk needs independent decryption + multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse) + if decErr != nil { + glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + decryptedReader = multipartReader + glog.V(3).Infof("Using multipart SSE-KMS decryption for object") + } else { + // Handle single-part SSE-KMS objects + singlePartReader, decErr := CreateSSEKMSDecryptedReader(proxyResponse.Body, sseKMSKey) + if decErr != nil { + glog.Errorf("Failed to create SSE-KMS decrypted reader: %v", decErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return http.StatusInternalServerError, 0 + } + decryptedReader = singlePartReader + glog.V(3).Infof("Using single-part SSE-KMS decryption for object") + } + + // Capture existing CORS headers that may have been set by middleware + capturedCORSHeaders := captureCORSHeaders(w, corsHeaders) + + // Copy headers from proxy response (excluding body-related headers that might change) + for k, v := range proxyResponse.Header { + if k != "Content-Length" && k != "Content-Encoding" { + w.Header()[k] = v + } + } + + // Set correct Content-Length for SSE-KMS + if proxyResponse.Header.Get("Content-Range") == "" { + // For full object requests, encrypted length equals original length + if contentLengthStr := proxyResponse.Header.Get("Content-Length"); contentLengthStr != "" { + w.Header().Set("Content-Length", contentLengthStr) + } + } + + // Add SSE-KMS response headers + AddSSEKMSResponseHeaders(w, sseKMSKey) + + return writeFinalResponse(w, proxyResponse, decryptedReader, capturedCORSHeaders) +} + // addObjectLockHeadersToResponse extracts object lock metadata from entry Extended attributes // and adds the appropriate S3 headers to the response func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, entry *filer_pb.Entry) { @@ -729,3 +986,433 @@ func (s3a *S3ApiServer) addObjectLockHeadersToResponse(w http.ResponseWriter, en w.Header().Set(s3_constants.AmzObjectLockLegalHold, s3_constants.LegalHoldOff) } } + +// addSSEHeadersToResponse converts stored SSE metadata from entry.Extended to HTTP response headers +// Uses intelligent prioritization: only set headers for the PRIMARY encryption type to avoid conflicts +func (s3a *S3ApiServer) addSSEHeadersToResponse(proxyResponse *http.Response, entry *filer_pb.Entry) { + if entry == nil || entry.Extended == nil { + return + } + + // Determine the primary encryption type by examining chunks (most reliable) + primarySSEType := s3a.detectPrimarySSEType(entry) + + // Only set headers for the PRIMARY encryption type + switch primarySSEType { + case "SSE-C": + // Add only SSE-C headers + if algorithmBytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm]; exists && len(algorithmBytes) > 0 { + proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerAlgorithm, string(algorithmBytes)) + } + + if keyMD5Bytes, exists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; exists && len(keyMD5Bytes) > 0 { + proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionCustomerKeyMD5, string(keyMD5Bytes)) + } + + if ivBytes, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists && len(ivBytes) > 0 { + ivBase64 := base64.StdEncoding.EncodeToString(ivBytes) + proxyResponse.Header.Set(s3_constants.SeaweedFSSSEIVHeader, ivBase64) + } + + case "SSE-KMS": + // Add only SSE-KMS headers + if sseAlgorithm, exists := entry.Extended[s3_constants.AmzServerSideEncryption]; exists && len(sseAlgorithm) > 0 { + proxyResponse.Header.Set(s3_constants.AmzServerSideEncryption, string(sseAlgorithm)) + } + + if kmsKeyID, exists := entry.Extended[s3_constants.AmzServerSideEncryptionAwsKmsKeyId]; exists && len(kmsKeyID) > 0 { + proxyResponse.Header.Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, string(kmsKeyID)) + } + + default: + // Unencrypted or unknown - don't set any SSE headers + } + + glog.V(3).Infof("addSSEHeadersToResponse: processed %d extended metadata entries", len(entry.Extended)) +} + +// detectPrimarySSEType determines the primary SSE type by examining chunk metadata +func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string { + if len(entry.GetChunks()) == 0 { + // No chunks - check object-level metadata only (single objects or smallContent) + hasSSEC := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] != nil + hasSSEKMS := entry.Extended[s3_constants.AmzServerSideEncryption] != nil + + if hasSSEC && !hasSSEKMS { + return "SSE-C" + } else if hasSSEKMS && !hasSSEC { + return "SSE-KMS" + } else if hasSSEC && hasSSEKMS { + // Both present - this should only happen during cross-encryption copies + // Use content to determine actual encryption state + if len(entry.Content) > 0 { + // smallContent - check if it's encrypted (heuristic: random-looking data) + return "SSE-C" // Default to SSE-C for mixed case + } else { + // No content, both headers - default to SSE-C + return "SSE-C" + } + } + return "None" + } + + // Count chunk types to determine primary (multipart objects) + ssecChunks := 0 + ssekmsChunks := 0 + + for _, chunk := range entry.GetChunks() { + switch chunk.GetSseType() { + case filer_pb.SSEType_SSE_C: + ssecChunks++ + case filer_pb.SSEType_SSE_KMS: + ssekmsChunks++ + } + } + + // Primary type is the one with more chunks + if ssecChunks > ssekmsChunks { + return "SSE-C" + } else if ssekmsChunks > ssecChunks { + return "SSE-KMS" + } else if ssecChunks > 0 { + // Equal number, prefer SSE-C (shouldn't happen in practice) + return "SSE-C" + } + + return "None" +} + +// createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects +func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) { + // Get the object path from the request + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + + // Get the object entry from filer to access chunk information + entry, err := s3a.getEntry("", objectPath) + if err != nil { + return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err) + } + + // Sort chunks by offset to ensure correct order + chunks := entry.GetChunks() + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].GetOffset() < chunks[j].GetOffset() + }) + + // Create readers for each chunk, decrypting them independently + var readers []io.Reader + + for i, chunk := range chunks { + glog.Infof("Processing chunk %d/%d: fileId=%s, offset=%d, size=%d, sse_type=%d", + i+1, len(entry.GetChunks()), chunk.GetFileIdString(), chunk.GetOffset(), chunk.GetSize(), chunk.GetSseType()) + + // Get this chunk's encrypted data + chunkReader, err := s3a.createEncryptedChunkReader(chunk) + if err != nil { + return nil, fmt.Errorf("failed to create chunk reader: %v", err) + } + + // Get SSE-KMS metadata for this chunk + var chunkSSEKMSKey *SSEKMSKey + + // Check if this chunk has per-chunk SSE-KMS metadata (new architecture) + if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS && len(chunk.GetSseKmsMetadata()) > 0 { + // Use the per-chunk SSE-KMS metadata + kmsKey, err := DeserializeSSEKMSMetadata(chunk.GetSseKmsMetadata()) + if err != nil { + glog.Errorf("Failed to deserialize per-chunk SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err) + } else { + // ChunkOffset is already set from the stored metadata (PartOffset) + chunkSSEKMSKey = kmsKey + glog.Infof("Using per-chunk SSE-KMS metadata for chunk %s: keyID=%s, IV=%x, partOffset=%d", + chunk.GetFileIdString(), kmsKey.KeyID, kmsKey.IV[:8], kmsKey.ChunkOffset) + } + } + + // Fallback to object-level metadata (legacy support) + if chunkSSEKMSKey == nil { + objectMetadataHeader := proxyResponse.Header.Get(s3_constants.SeaweedFSSSEKMSKeyHeader) + if objectMetadataHeader != "" { + kmsMetadataBytes, decodeErr := base64.StdEncoding.DecodeString(objectMetadataHeader) + if decodeErr == nil { + kmsKey, _ := DeserializeSSEKMSMetadata(kmsMetadataBytes) + if kmsKey != nil { + // For object-level metadata (legacy), use absolute file offset as fallback + kmsKey.ChunkOffset = chunk.GetOffset() + chunkSSEKMSKey = kmsKey + } + glog.Infof("Using fallback object-level SSE-KMS metadata for chunk %s with offset %d", chunk.GetFileIdString(), chunk.GetOffset()) + } + } + } + + if chunkSSEKMSKey == nil { + return nil, fmt.Errorf("no SSE-KMS metadata found for chunk %s in multipart object", chunk.GetFileIdString()) + } + + // Create decrypted reader for this chunk + decryptedChunkReader, decErr := CreateSSEKMSDecryptedReader(chunkReader, chunkSSEKMSKey) + if decErr != nil { + chunkReader.Close() // Close the chunk reader if decryption fails + return nil, fmt.Errorf("failed to decrypt chunk: %v", decErr) + } + + // Use the streaming decrypted reader directly instead of reading into memory + readers = append(readers, decryptedChunkReader) + glog.V(4).Infof("Added streaming decrypted reader for chunk %s in multipart SSE-KMS object", chunk.GetFileIdString()) + } + + // Combine all decrypted chunk readers into a single stream with proper resource management + multiReader := NewMultipartSSEReader(readers) + glog.V(3).Infof("Created multipart SSE-KMS decrypted reader with %d chunks", len(readers)) + + return multiReader, nil +} + +// createEncryptedChunkReader creates a reader for a single encrypted chunk +func (s3a *S3ApiServer) createEncryptedChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) { + // Get chunk URL + srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString()) + if err != nil { + return nil, fmt.Errorf("lookup volume URL for chunk %s: %v", chunk.GetFileIdString(), err) + } + + // Create HTTP request for chunk data + req, err := http.NewRequest("GET", srcUrl, nil) + if err != nil { + return nil, fmt.Errorf("create HTTP request for chunk: %v", err) + } + + // Execute request + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("execute HTTP request for chunk: %v", err) + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("HTTP request for chunk failed: %d", resp.StatusCode) + } + + return resp.Body, nil +} + +// MultipartSSEReader wraps multiple readers and ensures all underlying readers are properly closed +type MultipartSSEReader struct { + multiReader io.Reader + readers []io.Reader +} + +// SSERangeReader applies range logic to an underlying reader +type SSERangeReader struct { + reader io.Reader + offset int64 // bytes to skip from the beginning + remaining int64 // bytes remaining to read (-1 for unlimited) + skipped int64 // bytes already skipped +} + +// NewMultipartSSEReader creates a new multipart reader that can properly close all underlying readers +func NewMultipartSSEReader(readers []io.Reader) *MultipartSSEReader { + return &MultipartSSEReader{ + multiReader: io.MultiReader(readers...), + readers: readers, + } +} + +// Read implements the io.Reader interface +func (m *MultipartSSEReader) Read(p []byte) (n int, err error) { + return m.multiReader.Read(p) +} + +// Close implements the io.Closer interface and closes all underlying readers that support closing +func (m *MultipartSSEReader) Close() error { + var lastErr error + for i, reader := range m.readers { + if closer, ok := reader.(io.Closer); ok { + if err := closer.Close(); err != nil { + glog.V(2).Infof("Error closing reader %d: %v", i, err) + lastErr = err // Keep track of the last error, but continue closing others + } + } + } + return lastErr +} + +// Read implements the io.Reader interface for SSERangeReader +func (r *SSERangeReader) Read(p []byte) (n int, err error) { + + // If we need to skip bytes and haven't skipped enough yet + if r.skipped < r.offset { + skipNeeded := r.offset - r.skipped + skipBuf := make([]byte, min(int64(len(p)), skipNeeded)) + skipRead, skipErr := r.reader.Read(skipBuf) + r.skipped += int64(skipRead) + + if skipErr != nil { + return 0, skipErr + } + + // If we still need to skip more, recurse + if r.skipped < r.offset { + return r.Read(p) + } + } + + // If we have a remaining limit and it's reached + if r.remaining == 0 { + return 0, io.EOF + } + + // Calculate how much to read + readSize := len(p) + if r.remaining > 0 && int64(readSize) > r.remaining { + readSize = int(r.remaining) + } + + // Read the data + n, err = r.reader.Read(p[:readSize]) + if r.remaining > 0 { + r.remaining -= int64(n) + } + + return n, err +} + +// createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects +// Each chunk has its own IV and encryption key from the original multipart parts +func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) { + // Parse SSE-C headers from the request for decryption key + customerKey, err := ParseSSECHeaders(r) + if err != nil { + return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err) + } + + // Get the object path from the request + bucket, object := s3_constants.GetBucketAndObject(r) + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + + // Get the object entry from filer to access chunk information + entry, err := s3a.getEntry("", objectPath) + if err != nil { + return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err) + } + + // Sort chunks by offset to ensure correct order + chunks := entry.GetChunks() + sort.Slice(chunks, func(i, j int) bool { + return chunks[i].GetOffset() < chunks[j].GetOffset() + }) + + // Check for Range header to optimize chunk processing + var startOffset, endOffset int64 = 0, -1 + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + // Parse range header (e.g., "bytes=0-99") + if len(rangeHeader) > 6 && rangeHeader[:6] == "bytes=" { + rangeSpec := rangeHeader[6:] + parts := strings.Split(rangeSpec, "-") + if len(parts) == 2 { + if parts[0] != "" { + startOffset, _ = strconv.ParseInt(parts[0], 10, 64) + } + if parts[1] != "" { + endOffset, _ = strconv.ParseInt(parts[1], 10, 64) + } + } + } + } + + // Filter chunks to only those needed for the range request + var neededChunks []*filer_pb.FileChunk + for _, chunk := range chunks { + chunkStart := chunk.GetOffset() + chunkEnd := chunkStart + int64(chunk.GetSize()) - 1 + + // Check if this chunk overlaps with the requested range + if endOffset == -1 { + // No end specified, take all chunks from startOffset + if chunkEnd >= startOffset { + neededChunks = append(neededChunks, chunk) + } + } else { + // Specific range: check for overlap + if chunkStart <= endOffset && chunkEnd >= startOffset { + neededChunks = append(neededChunks, chunk) + } + } + } + + // Create readers for only the needed chunks + var readers []io.Reader + + for _, chunk := range neededChunks { + + // Get this chunk's encrypted data + chunkReader, err := s3a.createEncryptedChunkReader(chunk) + if err != nil { + return nil, fmt.Errorf("failed to create chunk reader: %v", err) + } + + if chunk.GetSseType() == filer_pb.SSEType_SSE_C { + // For SSE-C chunks, extract the IV from the stored per-chunk metadata (unified approach) + if len(chunk.GetSseKmsMetadata()) > 0 { + // Deserialize the SSE-C metadata stored in the unified metadata field + ssecMetadata, decErr := DeserializeSSECMetadata(chunk.GetSseKmsMetadata()) + if decErr != nil { + return nil, fmt.Errorf("failed to deserialize SSE-C metadata for chunk %s: %v", chunk.GetFileIdString(), decErr) + } + + // Decode the IV from the metadata + iv, ivErr := base64.StdEncoding.DecodeString(ssecMetadata.IV) + if ivErr != nil { + return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), ivErr) + } + + // Calculate the correct IV for this chunk using within-part offset + var chunkIV []byte + if ssecMetadata.PartOffset > 0 { + chunkIV = calculateIVWithOffset(iv, ssecMetadata.PartOffset) + } else { + chunkIV = iv + } + + decryptedReader, decErr := CreateSSECDecryptedReader(chunkReader, customerKey, chunkIV) + if decErr != nil { + return nil, fmt.Errorf("failed to create SSE-C decrypted reader for chunk %s: %v", chunk.GetFileIdString(), decErr) + } + readers = append(readers, decryptedReader) + glog.Infof("Created SSE-C decrypted reader for chunk %s using stored metadata", chunk.GetFileIdString()) + } else { + return nil, fmt.Errorf("SSE-C chunk %s missing required metadata", chunk.GetFileIdString()) + } + } else { + // Non-SSE-C chunk, use as-is + readers = append(readers, chunkReader) + } + } + + multiReader := NewMultipartSSEReader(readers) + + // Apply range logic if a range was requested + if rangeHeader != "" && startOffset >= 0 { + if endOffset == -1 { + // Open-ended range (e.g., "bytes=100-") + return &SSERangeReader{ + reader: multiReader, + offset: startOffset, + remaining: -1, // Read until EOF + }, nil + } else { + // Specific range (e.g., "bytes=0-99") + rangeLength := endOffset - startOffset + 1 + return &SSERangeReader{ + reader: multiReader, + offset: startOffset, + remaining: rangeLength, + }, nil + } + } + + return multiReader, nil +} |
