diff options
Diffstat (limited to 'weed/s3api/filer_multipart.go')
| -rw-r--r-- | weed/s3api/filer_multipart.go | 267 |
1 files changed, 185 insertions, 82 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index c4c07f0c7..4b8fbaa62 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -5,7 +5,9 @@ import ( "crypto/rand" "encoding/base64" "encoding/hex" + "encoding/json" "encoding/xml" + "errors" "fmt" "math" "path/filepath" @@ -71,7 +73,7 @@ func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateM // Prepare and apply encryption configuration within directory creation // This ensures encryption resources are only allocated if directory creation succeeds - encryptionConfig, prepErr := s3a.prepareMultipartEncryptionConfig(r, uploadIdString) + encryptionConfig, prepErr := s3a.prepareMultipartEncryptionConfig(r, *input.Bucket, uploadIdString) if prepErr != nil { encryptionError = prepErr return // Exit callback, letting mkdir handle the error @@ -118,6 +120,36 @@ type CompleteMultipartUploadResult struct { VersionId *string `xml:"-"` } +// copySSEHeadersFromFirstPart copies all SSE-related headers from the first part to the destination entry +// This is critical for detectPrimarySSEType to work correctly and ensures encryption metadata is preserved +func copySSEHeadersFromFirstPart(dst *filer_pb.Entry, firstPart *filer_pb.Entry, context string) { + if firstPart == nil || firstPart.Extended == nil { + return + } + + // Copy ALL SSE-related headers (not just SeaweedFSSSEKMSKey) + sseKeys := []string{ + // SSE-C headers + s3_constants.SeaweedFSSSEIV, + s3_constants.AmzServerSideEncryptionCustomerAlgorithm, + s3_constants.AmzServerSideEncryptionCustomerKeyMD5, + // SSE-KMS headers + s3_constants.SeaweedFSSSEKMSKey, + s3_constants.AmzServerSideEncryptionAwsKmsKeyId, + // SSE-S3 headers + s3_constants.SeaweedFSSSES3Key, + // Common SSE header (for SSE-KMS and SSE-S3) + s3_constants.AmzServerSideEncryption, + } + + for _, key := range sseKeys { + if value, exists := firstPart.Extended[key]; exists { + dst.Extended[key] = value + glog.V(4).Infof("completeMultipartUpload: copied SSE header %s from first part (%s)", key, context) + } + } +} + func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) { glog.V(2).Infof("completeMultipartUpload input %v", input) @@ -231,6 +263,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl mime := pentry.Attributes.Mime var finalParts []*filer_pb.FileChunk var offset int64 + + // Track part boundaries for later retrieval with PartNumber parameter + type PartBoundary struct { + PartNumber int `json:"part"` + StartChunk int `json:"start"` + EndChunk int `json:"end"` // exclusive + ETag string `json:"etag"` + } + var partBoundaries []PartBoundary + for _, partNumber := range completedPartNumbers { partEntriesByNumber, ok := partEntries[partNumber] if !ok { @@ -251,42 +293,18 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl continue } - // Track within-part offset for SSE-KMS IV calculation - var withinPartOffset int64 = 0 + // Record the start chunk index for this part + partStartChunk := len(finalParts) + + // Calculate the part's ETag (for GetObject with PartNumber) + partETag := filer.ETag(entry) for _, chunk := range entry.GetChunks() { - // Update SSE metadata with correct within-part offset (unified approach for KMS and SSE-C) - sseKmsMetadata := chunk.SseMetadata - - if chunk.SseType == filer_pb.SSEType_SSE_KMS && len(chunk.SseMetadata) > 0 { - // Deserialize, update offset, and re-serialize SSE-KMS metadata - if kmsKey, err := DeserializeSSEKMSMetadata(chunk.SseMetadata); err == nil { - kmsKey.ChunkOffset = withinPartOffset - if updatedMetadata, serErr := SerializeSSEKMSMetadata(kmsKey); serErr == nil { - sseKmsMetadata = updatedMetadata - glog.V(4).Infof("Updated SSE-KMS metadata for chunk in part %d: withinPartOffset=%d", partNumber, withinPartOffset) - } - } - } else if chunk.SseType == filer_pb.SSEType_SSE_C { - // For SSE-C chunks, create per-chunk metadata using the part's IV - if ivData, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists { - // Get keyMD5 from entry metadata if available - var keyMD5 string - if keyMD5Data, keyExists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; keyExists { - keyMD5 = string(keyMD5Data) - } - - // Create SSE-C metadata with the part's IV and this chunk's within-part offset - if ssecMetadata, serErr := SerializeSSECMetadata(ivData, keyMD5, withinPartOffset); serErr == nil { - sseKmsMetadata = ssecMetadata // Reuse the same field for unified handling - glog.V(4).Infof("Created SSE-C metadata for chunk in part %d: withinPartOffset=%d", partNumber, withinPartOffset) - } else { - glog.Errorf("Failed to serialize SSE-C metadata for chunk in part %d: %v", partNumber, serErr) - } - } else { - glog.Errorf("SSE-C chunk in part %d missing IV in entry metadata", partNumber) - } - } + // CRITICAL: Do NOT modify SSE metadata offsets during assembly! + // The encrypted data was created with the offset stored in chunk.SseMetadata. + // Changing the offset here would cause decryption to fail because CTR mode + // uses the offset to initialize the counter. We must decrypt with the same + // offset that was used during encryption. p := &filer_pb.FileChunk{ FileId: chunk.GetFileIdString(), @@ -296,14 +314,23 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl CipherKey: chunk.CipherKey, ETag: chunk.ETag, IsCompressed: chunk.IsCompressed, - // Preserve SSE metadata with updated within-part offset + // Preserve SSE metadata UNCHANGED - do not modify the offset! SseType: chunk.SseType, - SseMetadata: sseKmsMetadata, + SseMetadata: chunk.SseMetadata, } finalParts = append(finalParts, p) offset += int64(chunk.Size) - withinPartOffset += int64(chunk.Size) } + + // Record the part boundary + partEndChunk := len(finalParts) + partBoundaries = append(partBoundaries, PartBoundary{ + PartNumber: partNumber, + StartChunk: partStartChunk, + EndChunk: partEndChunk, + ETag: partETag, + }) + found = true } } @@ -325,6 +352,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId) versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) + // Store parts count for x-amz-mp-parts-count header + versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) + // Store part boundaries for GetObject with PartNumber + if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil { + versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON + } // Set object owner for versioned multipart objects amzAccountId := r.Header.Get(s3_constants.AmzAccountId) @@ -338,17 +371,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } } - // Preserve SSE-KMS metadata from the first part (if any) - // SSE-KMS metadata is stored in individual parts, not the upload directory + // Preserve ALL SSE metadata from the first part (if any) + // SSE metadata is stored in individual parts, not the upload directory if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 { firstPartEntry := partEntries[completedPartNumbers[0]][0] - if firstPartEntry.Extended != nil { - // Copy SSE-KMS metadata from the first part - if kmsMetadata, exists := firstPartEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists { - versionEntry.Extended[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata - glog.V(3).Infof("completeMultipartUpload: preserved SSE-KMS metadata from first part (versioned)") - } - } + copySSEHeadersFromFirstPart(versionEntry, firstPartEntry, "versioned") } if pentry.Attributes.Mime != "" { versionEntry.Attributes.Mime = pentry.Attributes.Mime @@ -387,6 +414,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl entry.Extended = make(map[string][]byte) } entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null") + // Store parts count for x-amz-mp-parts-count header + entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) + // Store part boundaries for GetObject with PartNumber + if partBoundariesJSON, jsonErr := json.Marshal(partBoundaries); jsonErr == nil { + entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON + } // Set object owner for suspended versioning multipart objects amzAccountId := r.Header.Get(s3_constants.AmzAccountId) @@ -400,17 +433,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } } - // Preserve SSE-KMS metadata from the first part (if any) - // SSE-KMS metadata is stored in individual parts, not the upload directory + // Preserve ALL SSE metadata from the first part (if any) + // SSE metadata is stored in individual parts, not the upload directory if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 { firstPartEntry := partEntries[completedPartNumbers[0]][0] - if firstPartEntry.Extended != nil { - // Copy SSE-KMS metadata from the first part - if kmsMetadata, exists := firstPartEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists { - entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata - glog.V(3).Infof("completeMultipartUpload: preserved SSE-KMS metadata from first part (suspended versioning)") - } - } + copySSEHeadersFromFirstPart(entry, firstPartEntry, "suspended versioning") } if pentry.Attributes.Mime != "" { entry.Attributes.Mime = pentry.Attributes.Mime @@ -440,6 +467,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl entry.Extended = make(map[string][]byte) } entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId) + // Store parts count for x-amz-mp-parts-count header + entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers))) + // Store part boundaries for GetObject with PartNumber + if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil { + entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON + } // Set object owner for non-versioned multipart objects amzAccountId := r.Header.Get(s3_constants.AmzAccountId) @@ -453,17 +486,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl } } - // Preserve SSE-KMS metadata from the first part (if any) - // SSE-KMS metadata is stored in individual parts, not the upload directory + // Preserve ALL SSE metadata from the first part (if any) + // SSE metadata is stored in individual parts, not the upload directory if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 { firstPartEntry := partEntries[completedPartNumbers[0]][0] - if firstPartEntry.Extended != nil { - // Copy SSE-KMS metadata from the first part - if kmsMetadata, exists := firstPartEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists { - entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata - glog.V(3).Infof("completeMultipartUpload: preserved SSE-KMS metadata from first part") - } - } + copySSEHeadersFromFirstPart(entry, firstPartEntry, "non-versioned") } if pentry.Attributes.Mime != "" { entry.Attributes.Mime = pentry.Attributes.Mime @@ -510,15 +537,11 @@ func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInpu if dirName == "." { dirName = "" } - if strings.HasPrefix(dirName, "/") { - dirName = dirName[1:] - } + dirName = strings.TrimPrefix(dirName, "/") dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName) // remove suffix '/' - if strings.HasSuffix(dirName, "/") { - dirName = dirName[:len(dirName)-1] - } + dirName = strings.TrimSuffix(dirName, "/") return entryName, dirName } @@ -664,18 +687,23 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err) continue } - output.Part = append(output.Part, &s3.Part{ + partETag := filer.ETag(entry) + part := &s3.Part{ PartNumber: aws.Int64(int64(partNumber)), LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0).UTC()), Size: aws.Int64(int64(filer.FileSize(entry))), - ETag: aws.String("\"" + filer.ETag(entry) + "\""), - }) + ETag: aws.String("\"" + partETag + "\""), + } + output.Part = append(output.Part, part) + glog.V(3).Infof("listObjectParts: Added part %d, size=%d, etag=%s", + partNumber, filer.FileSize(entry), partETag) if !isLast { output.NextPartNumberMarker = aws.Int64(int64(partNumber)) } } } + glog.V(2).Infof("listObjectParts: Returning %d parts for uploadId=%s", len(output.Part), *input.UploadId) return } @@ -704,11 +732,16 @@ type MultipartEncryptionConfig struct { // prepareMultipartEncryptionConfig prepares encryption configuration with proper error handling // This eliminates the need for criticalError variable in callback functions -func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, uploadIdString string) (*MultipartEncryptionConfig, error) { +// Updated to support bucket-default encryption (matches putToFiler behavior) +func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, bucket string, uploadIdString string) (*MultipartEncryptionConfig, error) { config := &MultipartEncryptionConfig{} - // Prepare SSE-KMS configuration - if IsSSEKMSRequest(r) { + // Check for explicit encryption headers first (priority over bucket defaults) + hasExplicitSSEKMS := IsSSEKMSRequest(r) + hasExplicitSSES3 := IsSSES3RequestInternal(r) + + // Prepare SSE-KMS configuration (explicit request headers) + if hasExplicitSSEKMS { config.IsSSEKMS = true config.KMSKeyID = r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId) config.BucketKeyEnabled = strings.ToLower(r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled)) == "true" @@ -721,11 +754,11 @@ func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, upload return nil, fmt.Errorf("failed to generate secure IV for SSE-KMS multipart upload: %v (read %d/%d bytes)", err, n, len(baseIV)) } config.KMSBaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV) - glog.V(4).Infof("Generated base IV %x for SSE-KMS multipart upload %s", baseIV[:8], uploadIdString) + glog.V(4).Infof("Generated base IV %x for explicit SSE-KMS multipart upload %s", baseIV[:8], uploadIdString) } - // Prepare SSE-S3 configuration - if IsSSES3RequestInternal(r) { + // Prepare SSE-S3 configuration (explicit request headers) + if hasExplicitSSES3 { config.IsSSES3 = true // Generate and encode base IV with proper error handling @@ -735,7 +768,7 @@ func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, upload return nil, fmt.Errorf("failed to generate secure IV for SSE-S3 multipart upload: %v (read %d/%d bytes)", err, n, len(baseIV)) } config.S3BaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV) - glog.V(4).Infof("Generated base IV %x for SSE-S3 multipart upload %s", baseIV[:8], uploadIdString) + glog.V(4).Infof("Generated base IV %x for explicit SSE-S3 multipart upload %s", baseIV[:8], uploadIdString) // Generate and serialize SSE-S3 key with proper error handling keyManager := GetSSES3KeyManager() @@ -753,7 +786,77 @@ func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, upload // Store key in manager for later retrieval keyManager.StoreKey(sseS3Key) - glog.V(4).Infof("Stored SSE-S3 key %s for multipart upload %s", sseS3Key.KeyID, uploadIdString) + glog.V(4).Infof("Stored SSE-S3 key %s for explicit multipart upload %s", sseS3Key.KeyID, uploadIdString) + } + + // If no explicit encryption headers, check bucket-default encryption + // This matches AWS S3 behavior and putToFiler() implementation + if !hasExplicitSSEKMS && !hasExplicitSSES3 { + encryptionConfig, err := s3a.GetBucketEncryptionConfig(bucket) + if err != nil { + // Check if this is just "no encryption configured" vs a real error + if !errors.Is(err, ErrNoEncryptionConfig) { + // Real error - propagate to prevent silent encryption bypass + return nil, fmt.Errorf("failed to read bucket encryption config for multipart upload: %v", err) + } + // No default encryption configured, continue without encryption + } else if encryptionConfig != nil && encryptionConfig.SseAlgorithm != "" { + glog.V(3).Infof("prepareMultipartEncryptionConfig: applying bucket-default encryption %s for bucket %s, upload %s", + encryptionConfig.SseAlgorithm, bucket, uploadIdString) + + switch encryptionConfig.SseAlgorithm { + case EncryptionTypeKMS: + // Apply SSE-KMS as bucket default + config.IsSSEKMS = true + config.KMSKeyID = encryptionConfig.KmsKeyId + config.BucketKeyEnabled = encryptionConfig.BucketKeyEnabled + // No encryption context for bucket defaults + + // Generate and encode base IV + baseIV := make([]byte, s3_constants.AESBlockSize) + n, readErr := rand.Read(baseIV) + if readErr != nil || n != len(baseIV) { + return nil, fmt.Errorf("failed to generate secure IV for bucket-default SSE-KMS multipart upload: %v (read %d/%d bytes)", readErr, n, len(baseIV)) + } + config.KMSBaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV) + glog.V(4).Infof("Generated base IV %x for bucket-default SSE-KMS multipart upload %s", baseIV[:8], uploadIdString) + + case EncryptionTypeAES256: + // Apply SSE-S3 (AES256) as bucket default + config.IsSSES3 = true + + // Generate and encode base IV + baseIV := make([]byte, s3_constants.AESBlockSize) + n, readErr := rand.Read(baseIV) + if readErr != nil || n != len(baseIV) { + return nil, fmt.Errorf("failed to generate secure IV for bucket-default SSE-S3 multipart upload: %v (read %d/%d bytes)", readErr, n, len(baseIV)) + } + config.S3BaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV) + glog.V(4).Infof("Generated base IV %x for bucket-default SSE-S3 multipart upload %s", baseIV[:8], uploadIdString) + + // Generate and serialize SSE-S3 key + keyManager := GetSSES3KeyManager() + sseS3Key, keyErr := keyManager.GetOrCreateKey("") + if keyErr != nil { + return nil, fmt.Errorf("failed to generate SSE-S3 key for bucket-default multipart upload: %v", keyErr) + } + + keyData, serErr := SerializeSSES3Metadata(sseS3Key) + if serErr != nil { + return nil, fmt.Errorf("failed to serialize SSE-S3 metadata for bucket-default multipart upload: %v", serErr) + } + + config.S3KeyDataEncoded = base64.StdEncoding.EncodeToString(keyData) + + // Store key in manager for later retrieval + keyManager.StoreKey(sseS3Key) + glog.V(4).Infof("Stored SSE-S3 key %s for bucket-default multipart upload %s", sseS3Key.KeyID, uploadIdString) + + default: + glog.V(3).Infof("prepareMultipartEncryptionConfig: unsupported bucket-default encryption algorithm %s for bucket %s", + encryptionConfig.SseAlgorithm, bucket) + } + } } return config, nil |
