Diffstat (limited to 'weed/s3api/filer_multipart.go')
-rw-r--r--  weed/s3api/filer_multipart.go  267
1 file changed, 185 insertions, 82 deletions
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index c4c07f0c7..4b8fbaa62 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -5,7 +5,9 @@ import (
"crypto/rand"
"encoding/base64"
"encoding/hex"
+ "encoding/json"
"encoding/xml"
+ "errors"
"fmt"
"math"
"path/filepath"
@@ -71,7 +73,7 @@ func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateM
// Prepare and apply encryption configuration within directory creation
// This ensures encryption resources are only allocated if directory creation succeeds
- encryptionConfig, prepErr := s3a.prepareMultipartEncryptionConfig(r, uploadIdString)
+ encryptionConfig, prepErr := s3a.prepareMultipartEncryptionConfig(r, *input.Bucket, uploadIdString)
if prepErr != nil {
encryptionError = prepErr
return // Exit callback, letting mkdir handle the error
@@ -118,6 +120,36 @@ type CompleteMultipartUploadResult struct {
VersionId *string `xml:"-"`
}
+// copySSEHeadersFromFirstPart copies all SSE-related headers from the first part to the destination entry
+// This is critical for detectPrimarySSEType to work correctly and ensures encryption metadata is preserved
+func copySSEHeadersFromFirstPart(dst *filer_pb.Entry, firstPart *filer_pb.Entry, context string) {
+ if firstPart == nil || firstPart.Extended == nil {
+ return
+ }
+
+ // Copy ALL SSE-related headers (not just SeaweedFSSSEKMSKey)
+ sseKeys := []string{
+ // SSE-C headers
+ s3_constants.SeaweedFSSSEIV,
+ s3_constants.AmzServerSideEncryptionCustomerAlgorithm,
+ s3_constants.AmzServerSideEncryptionCustomerKeyMD5,
+ // SSE-KMS headers
+ s3_constants.SeaweedFSSSEKMSKey,
+ s3_constants.AmzServerSideEncryptionAwsKmsKeyId,
+ // SSE-S3 headers
+ s3_constants.SeaweedFSSSES3Key,
+ // Common SSE header (for SSE-KMS and SSE-S3)
+ s3_constants.AmzServerSideEncryption,
+ }
+
+ for _, key := range sseKeys {
+ if value, exists := firstPart.Extended[key]; exists {
+ dst.Extended[key] = value
+ glog.V(4).Infof("completeMultipartUpload: copied SSE header %s from first part (%s)", key, context)
+ }
+ }
+}
+
func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("completeMultipartUpload input %v", input)
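
As a review aid, a standalone model of why copying every SSE header matters: a detectPrimarySSEType-style probe (simplified here with placeholder keys; the real keys come from s3_constants) classifies the object from its Extended metadata, so carrying over only SeaweedFSSSEKMSKey would make SSE-C and SSE-S3 objects look unencrypted.

package main

import "fmt"

// Simplified stand-in for the object's Extended metadata and for a
// detectPrimarySSEType-style check; the key strings are placeholders.
func primarySSEType(ext map[string][]byte) string {
	switch {
	case len(ext["ssec-algorithm"]) > 0: // SSE-C customer algorithm
		return "SSE-C"
	case len(ext["kms-key"]) > 0: // serialized SSE-KMS key metadata
		return "SSE-KMS"
	case len(ext["sse-s3-key"]) > 0: // serialized SSE-S3 key metadata
		return "SSE-S3"
	default:
		return "None"
	}
}

func main() {
	firstPart := map[string][]byte{
		"ssec-algorithm": []byte("AES256"),
		"ssec-key-md5":   []byte("placeholder-md5"),
	}

	onlyKMSCopied := map[string][]byte{}      // old behavior: SSE-C headers were dropped
	fmt.Println(primarySSEType(onlyKMSCopied)) // None -> object appears unencrypted

	allCopied := map[string][]byte{} // new behavior via copySSEHeadersFromFirstPart
	for k, v := range firstPart {
		allCopied[k] = v
	}
	fmt.Println(primarySSEType(allCopied)) // SSE-C
}
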
@@ -231,6 +263,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
mime := pentry.Attributes.Mime
var finalParts []*filer_pb.FileChunk
var offset int64
+
+ // Track part boundaries for later retrieval with PartNumber parameter
+ type PartBoundary struct {
+ PartNumber int `json:"part"`
+ StartChunk int `json:"start"`
+ EndChunk int `json:"end"` // exclusive
+ ETag string `json:"etag"`
+ }
+ var partBoundaries []PartBoundary
+
for _, partNumber := range completedPartNumbers {
partEntriesByNumber, ok := partEntries[partNumber]
if !ok {
@@ -251,42 +293,18 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
continue
}
- // Track within-part offset for SSE-KMS IV calculation
- var withinPartOffset int64 = 0
+ // Record the start chunk index for this part
+ partStartChunk := len(finalParts)
+
+ // Calculate the part's ETag (for GetObject with PartNumber)
+ partETag := filer.ETag(entry)
for _, chunk := range entry.GetChunks() {
- // Update SSE metadata with correct within-part offset (unified approach for KMS and SSE-C)
- sseKmsMetadata := chunk.SseMetadata
-
- if chunk.SseType == filer_pb.SSEType_SSE_KMS && len(chunk.SseMetadata) > 0 {
- // Deserialize, update offset, and re-serialize SSE-KMS metadata
- if kmsKey, err := DeserializeSSEKMSMetadata(chunk.SseMetadata); err == nil {
- kmsKey.ChunkOffset = withinPartOffset
- if updatedMetadata, serErr := SerializeSSEKMSMetadata(kmsKey); serErr == nil {
- sseKmsMetadata = updatedMetadata
- glog.V(4).Infof("Updated SSE-KMS metadata for chunk in part %d: withinPartOffset=%d", partNumber, withinPartOffset)
- }
- }
- } else if chunk.SseType == filer_pb.SSEType_SSE_C {
- // For SSE-C chunks, create per-chunk metadata using the part's IV
- if ivData, exists := entry.Extended[s3_constants.SeaweedFSSSEIV]; exists {
- // Get keyMD5 from entry metadata if available
- var keyMD5 string
- if keyMD5Data, keyExists := entry.Extended[s3_constants.AmzServerSideEncryptionCustomerKeyMD5]; keyExists {
- keyMD5 = string(keyMD5Data)
- }
-
- // Create SSE-C metadata with the part's IV and this chunk's within-part offset
- if ssecMetadata, serErr := SerializeSSECMetadata(ivData, keyMD5, withinPartOffset); serErr == nil {
- sseKmsMetadata = ssecMetadata // Reuse the same field for unified handling
- glog.V(4).Infof("Created SSE-C metadata for chunk in part %d: withinPartOffset=%d", partNumber, withinPartOffset)
- } else {
- glog.Errorf("Failed to serialize SSE-C metadata for chunk in part %d: %v", partNumber, serErr)
- }
- } else {
- glog.Errorf("SSE-C chunk in part %d missing IV in entry metadata", partNumber)
- }
- }
+ // CRITICAL: Do NOT modify SSE metadata offsets during assembly!
+ // The encrypted data was created with the offset stored in chunk.SseMetadata.
+ // Changing the offset here would cause decryption to fail because CTR mode
+ // uses the offset to initialize the counter. We must decrypt with the same
+ // offset that was used during encryption.
p := &filer_pb.FileChunk{
FileId: chunk.GetFileIdString(),
@@ -296,14 +314,23 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
CipherKey: chunk.CipherKey,
ETag: chunk.ETag,
IsCompressed: chunk.IsCompressed,
- // Preserve SSE metadata with updated within-part offset
+ // Preserve SSE metadata UNCHANGED - do not modify the offset!
SseType: chunk.SseType,
- SseMetadata: sseKmsMetadata,
+ SseMetadata: chunk.SseMetadata,
}
finalParts = append(finalParts, p)
offset += int64(chunk.Size)
- withinPartOffset += int64(chunk.Size)
}
+
+ // Record the part boundary
+ partEndChunk := len(finalParts)
+ partBoundaries = append(partBoundaries, PartBoundary{
+ PartNumber: partNumber,
+ StartChunk: partStartChunk,
+ EndChunk: partEndChunk,
+ ETag: partETag,
+ })
+
found = true
}
}
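
The comment above carries the crux of the fix. A self-contained sketch using only the standard library (not the SeaweedFS SSE helpers) shows why the offset recorded at encryption time must be replayed unchanged at decryption time in CTR mode:

package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"fmt"
)

// ctrAtOffset positions a CTR stream byteOffset bytes into the keystream that
// starts at baseIV: advance the big-endian counter by offset/16 blocks, then
// discard offset%16 bytes. (Illustrative only; the real offset handling lives
// in the s3api SSE code.)
func ctrAtOffset(block cipher.Block, baseIV []byte, byteOffset int64) cipher.Stream {
	iv := make([]byte, len(baseIV))
	copy(iv, baseIV)
	carry := uint64(byteOffset) / aes.BlockSize
	for i := len(iv) - 1; i >= 0 && carry > 0; i-- {
		sum := uint64(iv[i]) + carry&0xff
		iv[i] = byte(sum)
		carry = carry>>8 + sum>>8
	}
	stream := cipher.NewCTR(block, iv)
	skip := make([]byte, byteOffset%int64(aes.BlockSize))
	stream.XORKeyStream(skip, skip) // discard the partial leading block
	return stream
}

func main() {
	key := bytes.Repeat([]byte{0x11}, 32) // example AES-256 key
	baseIV := bytes.Repeat([]byte{0x22}, aes.BlockSize)
	block, err := aes.NewCipher(key)
	if err != nil {
		panic(err)
	}

	plain := []byte("chunk data encrypted 5 MiB into its part")
	const encOffset = 5 << 20 // offset recorded in chunk.SseMetadata at upload time

	ciphertext := make([]byte, len(plain))
	ctrAtOffset(block, baseIV, encOffset).XORKeyStream(ciphertext, plain)

	good := make([]byte, len(ciphertext))
	ctrAtOffset(block, baseIV, encOffset).XORKeyStream(good, ciphertext) // same offset: round-trips

	bad := make([]byte, len(ciphertext))
	ctrAtOffset(block, baseIV, 0).XORKeyStream(bad, ciphertext) // "recomputed" offset: garbage

	fmt.Println(bytes.Equal(good, plain), bytes.Equal(bad, plain)) // true false
}
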
@@ -325,6 +352,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
}
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
+ // Store parts count for x-amz-mp-parts-count header
+ versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
+ // Store part boundaries for GetObject with PartNumber
+ if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil {
+ versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
+ }
// Set object owner for versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
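
On the read path (outside this diff), the stored JSON would presumably be decoded back into the same shape to serve GetObject with ?partNumber. A hypothetical, self-contained helper; the struct tags must match the PartBoundary type defined above:

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the PartBoundary type serialized above; field tags must match.
type partBoundary struct {
	PartNumber int    `json:"part"`
	StartChunk int    `json:"start"`
	EndChunk   int    `json:"end"` // exclusive
	ETag       string `json:"etag"`
}

// chunkRangeForPart is a hypothetical read-path helper: it returns the
// [start, end) chunk index range and ETag recorded for part n.
func chunkRangeForPart(boundariesJSON []byte, n int) (start, end int, etag string, ok bool) {
	var bounds []partBoundary
	if err := json.Unmarshal(boundariesJSON, &bounds); err != nil {
		return 0, 0, "", false
	}
	for _, b := range bounds {
		if b.PartNumber == n {
			return b.StartChunk, b.EndChunk, b.ETag, true
		}
	}
	return 0, 0, "", false
}

func main() {
	stored := []byte(`[{"part":1,"start":0,"end":2,"etag":"abc"},{"part":2,"start":2,"end":5,"etag":"def"}]`)
	s, e, etag, ok := chunkRangeForPart(stored, 2)
	fmt.Println(s, e, etag, ok) // 2 5 def true
}
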
@@ -338,17 +371,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
}
}
- // Preserve SSE-KMS metadata from the first part (if any)
- // SSE-KMS metadata is stored in individual parts, not the upload directory
+ // Preserve ALL SSE metadata from the first part (if any)
+ // SSE metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := partEntries[completedPartNumbers[0]][0]
- if firstPartEntry.Extended != nil {
- // Copy SSE-KMS metadata from the first part
- if kmsMetadata, exists := firstPartEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
- versionEntry.Extended[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
- glog.V(3).Infof("completeMultipartUpload: preserved SSE-KMS metadata from first part (versioned)")
- }
- }
+ copySSEHeadersFromFirstPart(versionEntry, firstPartEntry, "versioned")
}
if pentry.Attributes.Mime != "" {
versionEntry.Attributes.Mime = pentry.Attributes.Mime
@@ -387,6 +414,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
+ // Store parts count for x-amz-mp-parts-count header
+ entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
+ // Store part boundaries for GetObject with PartNumber
+ if partBoundariesJSON, jsonErr := json.Marshal(partBoundaries); jsonErr == nil {
+ entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
+ }
// Set object owner for suspended versioning multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
@@ -400,17 +433,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
}
}
- // Preserve SSE-KMS metadata from the first part (if any)
- // SSE-KMS metadata is stored in individual parts, not the upload directory
+ // Preserve ALL SSE metadata from the first part (if any)
+ // SSE metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := partEntries[completedPartNumbers[0]][0]
- if firstPartEntry.Extended != nil {
- // Copy SSE-KMS metadata from the first part
- if kmsMetadata, exists := firstPartEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
- entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
- glog.V(3).Infof("completeMultipartUpload: preserved SSE-KMS metadata from first part (suspended versioning)")
- }
- }
+ copySSEHeadersFromFirstPart(entry, firstPartEntry, "suspended versioning")
}
if pentry.Attributes.Mime != "" {
entry.Attributes.Mime = pentry.Attributes.Mime
@@ -440,6 +467,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
+ // Store parts count for x-amz-mp-parts-count header
+ entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
+ // Store part boundaries for GetObject with PartNumber
+ if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil {
+ entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
+ }
// Set object owner for non-versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
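
Likewise hypothetical: surfacing the stored count as the x-amz-mp-parts-count response header when a request carries ?partNumber. The metadata key is passed in as a parameter because its concrete value is defined by s3_constants.SeaweedFSMultipartPartsCount:

package main

import (
	"fmt"
	"net/http"
	"strconv"
)

// setPartsCountHeader is a read-path sketch: if the request selected a part,
// expose the parts count recorded at complete-multipart time.
func setPartsCountHeader(h http.Header, extended map[string][]byte, metaKey, partNumberQuery string) {
	if partNumberQuery == "" {
		return
	}
	if raw, ok := extended[metaKey]; ok {
		if n, err := strconv.Atoi(string(raw)); err == nil && n > 0 {
			h.Set("x-amz-mp-parts-count", strconv.Itoa(n))
		}
	}
}

func main() {
	h := http.Header{}
	extended := map[string][]byte{"parts-count-key": []byte("3")} // placeholder key
	setPartsCountHeader(h, extended, "parts-count-key", "2")
	fmt.Println(h.Get("x-amz-mp-parts-count")) // 3
}
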
@@ -453,17 +486,11 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
}
}
- // Preserve SSE-KMS metadata from the first part (if any)
- // SSE-KMS metadata is stored in individual parts, not the upload directory
+ // Preserve ALL SSE metadata from the first part (if any)
+ // SSE metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := partEntries[completedPartNumbers[0]][0]
- if firstPartEntry.Extended != nil {
- // Copy SSE-KMS metadata from the first part
- if kmsMetadata, exists := firstPartEntry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
- entry.Extended[s3_constants.SeaweedFSSSEKMSKey] = kmsMetadata
- glog.V(3).Infof("completeMultipartUpload: preserved SSE-KMS metadata from first part")
- }
- }
+ copySSEHeadersFromFirstPart(entry, firstPartEntry, "non-versioned")
}
if pentry.Attributes.Mime != "" {
entry.Attributes.Mime = pentry.Attributes.Mime
@@ -510,15 +537,11 @@ func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInpu
if dirName == "." {
dirName = ""
}
- if strings.HasPrefix(dirName, "/") {
- dirName = dirName[1:]
- }
+ dirName = strings.TrimPrefix(dirName, "/")
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
// remove suffix '/'
- if strings.HasSuffix(dirName, "/") {
- dirName = dirName[:len(dirName)-1]
- }
+ dirName = strings.TrimSuffix(dirName, "/")
return entryName, dirName
}
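
The TrimPrefix/TrimSuffix rewrite is behavior-preserving. A minimal standalone version of the same normalization, with assumed example inputs:

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// objectDir reproduces the simplified normalization: TrimPrefix/TrimSuffix
// replace the hand-rolled slicing and are no-ops when the affix is absent.
func objectDir(bucketsPath, bucket, object string) string {
	dirName := filepath.Dir(object)
	if dirName == "." {
		dirName = ""
	}
	dirName = strings.TrimPrefix(dirName, "/")
	dirName = fmt.Sprintf("%s/%s/%s", bucketsPath, bucket, dirName)
	return strings.TrimSuffix(dirName, "/")
}

func main() {
	fmt.Println(objectDir("/buckets", "b1", "/photos/2024/img.jpg")) // /buckets/b1/photos/2024
	fmt.Println(objectDir("/buckets", "b1", "file.txt"))             // /buckets/b1
}
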
@@ -664,18 +687,23 @@ func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListP
glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, err)
continue
}
- output.Part = append(output.Part, &s3.Part{
+ partETag := filer.ETag(entry)
+ part := &s3.Part{
PartNumber: aws.Int64(int64(partNumber)),
LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0).UTC()),
Size: aws.Int64(int64(filer.FileSize(entry))),
- ETag: aws.String("\"" + filer.ETag(entry) + "\""),
- })
+ ETag: aws.String("\"" + partETag + "\""),
+ }
+ output.Part = append(output.Part, part)
+ glog.V(3).Infof("listObjectParts: Added part %d, size=%d, etag=%s",
+ partNumber, filer.FileSize(entry), partETag)
if !isLast {
output.NextPartNumberMarker = aws.Int64(int64(partNumber))
}
}
}
+ glog.V(2).Infof("listObjectParts: Returning %d parts for uploadId=%s", len(output.Part), *input.UploadId)
return
}
@@ -704,11 +732,16 @@ type MultipartEncryptionConfig struct {
// prepareMultipartEncryptionConfig prepares encryption configuration with proper error handling
// This eliminates the need for criticalError variable in callback functions
-func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, uploadIdString string) (*MultipartEncryptionConfig, error) {
+// Updated to support bucket-default encryption (matches putToFiler behavior)
+func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, bucket string, uploadIdString string) (*MultipartEncryptionConfig, error) {
config := &MultipartEncryptionConfig{}
- // Prepare SSE-KMS configuration
- if IsSSEKMSRequest(r) {
+ // Check for explicit encryption headers first (priority over bucket defaults)
+ hasExplicitSSEKMS := IsSSEKMSRequest(r)
+ hasExplicitSSES3 := IsSSES3RequestInternal(r)
+
+ // Prepare SSE-KMS configuration (explicit request headers)
+ if hasExplicitSSEKMS {
config.IsSSEKMS = true
config.KMSKeyID = r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId)
config.BucketKeyEnabled = strings.ToLower(r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled)) == "true"
@@ -721,11 +754,11 @@ func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, upload
return nil, fmt.Errorf("failed to generate secure IV for SSE-KMS multipart upload: %v (read %d/%d bytes)", err, n, len(baseIV))
}
config.KMSBaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
- glog.V(4).Infof("Generated base IV %x for SSE-KMS multipart upload %s", baseIV[:8], uploadIdString)
+ glog.V(4).Infof("Generated base IV %x for explicit SSE-KMS multipart upload %s", baseIV[:8], uploadIdString)
}
- // Prepare SSE-S3 configuration
- if IsSSES3RequestInternal(r) {
+ // Prepare SSE-S3 configuration (explicit request headers)
+ if hasExplicitSSES3 {
config.IsSSES3 = true
// Generate and encode base IV with proper error handling
@@ -735,7 +768,7 @@ func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, upload
return nil, fmt.Errorf("failed to generate secure IV for SSE-S3 multipart upload: %v (read %d/%d bytes)", err, n, len(baseIV))
}
config.S3BaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
- glog.V(4).Infof("Generated base IV %x for SSE-S3 multipart upload %s", baseIV[:8], uploadIdString)
+ glog.V(4).Infof("Generated base IV %x for explicit SSE-S3 multipart upload %s", baseIV[:8], uploadIdString)
// Generate and serialize SSE-S3 key with proper error handling
keyManager := GetSSES3KeyManager()
@@ -753,7 +786,77 @@ func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, upload
// Store key in manager for later retrieval
keyManager.StoreKey(sseS3Key)
- glog.V(4).Infof("Stored SSE-S3 key %s for multipart upload %s", sseS3Key.KeyID, uploadIdString)
+ glog.V(4).Infof("Stored SSE-S3 key %s for explicit multipart upload %s", sseS3Key.KeyID, uploadIdString)
+ }
+
+ // If no explicit encryption headers, check bucket-default encryption
+ // This matches AWS S3 behavior and the putToFiler() implementation
+ if !hasExplicitSSEKMS && !hasExplicitSSES3 {
+ encryptionConfig, err := s3a.GetBucketEncryptionConfig(bucket)
+ if err != nil {
+ // Check if this is just "no encryption configured" vs a real error
+ if !errors.Is(err, ErrNoEncryptionConfig) {
+ // Real error - propagate to prevent silent encryption bypass
+ return nil, fmt.Errorf("failed to read bucket encryption config for multipart upload: %v", err)
+ }
+ // No default encryption configured, continue without encryption
+ } else if encryptionConfig != nil && encryptionConfig.SseAlgorithm != "" {
+ glog.V(3).Infof("prepareMultipartEncryptionConfig: applying bucket-default encryption %s for bucket %s, upload %s",
+ encryptionConfig.SseAlgorithm, bucket, uploadIdString)
+
+ switch encryptionConfig.SseAlgorithm {
+ case EncryptionTypeKMS:
+ // Apply SSE-KMS as bucket default
+ config.IsSSEKMS = true
+ config.KMSKeyID = encryptionConfig.KmsKeyId
+ config.BucketKeyEnabled = encryptionConfig.BucketKeyEnabled
+ // No encryption context for bucket defaults
+
+ // Generate and encode base IV
+ baseIV := make([]byte, s3_constants.AESBlockSize)
+ n, readErr := rand.Read(baseIV)
+ if readErr != nil || n != len(baseIV) {
+ return nil, fmt.Errorf("failed to generate secure IV for bucket-default SSE-KMS multipart upload: %v (read %d/%d bytes)", readErr, n, len(baseIV))
+ }
+ config.KMSBaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
+ glog.V(4).Infof("Generated base IV %x for bucket-default SSE-KMS multipart upload %s", baseIV[:8], uploadIdString)
+
+ case EncryptionTypeAES256:
+ // Apply SSE-S3 (AES256) as bucket default
+ config.IsSSES3 = true
+
+ // Generate and encode base IV
+ baseIV := make([]byte, s3_constants.AESBlockSize)
+ n, readErr := rand.Read(baseIV)
+ if readErr != nil || n != len(baseIV) {
+ return nil, fmt.Errorf("failed to generate secure IV for bucket-default SSE-S3 multipart upload: %v (read %d/%d bytes)", readErr, n, len(baseIV))
+ }
+ config.S3BaseIVEncoded = base64.StdEncoding.EncodeToString(baseIV)
+ glog.V(4).Infof("Generated base IV %x for bucket-default SSE-S3 multipart upload %s", baseIV[:8], uploadIdString)
+
+ // Generate and serialize SSE-S3 key
+ keyManager := GetSSES3KeyManager()
+ sseS3Key, keyErr := keyManager.GetOrCreateKey("")
+ if keyErr != nil {
+ return nil, fmt.Errorf("failed to generate SSE-S3 key for bucket-default multipart upload: %v", keyErr)
+ }
+
+ keyData, serErr := SerializeSSES3Metadata(sseS3Key)
+ if serErr != nil {
+ return nil, fmt.Errorf("failed to serialize SSE-S3 metadata for bucket-default multipart upload: %v", serErr)
+ }
+
+ config.S3KeyDataEncoded = base64.StdEncoding.EncodeToString(keyData)
+
+ // Store key in manager for later retrieval
+ keyManager.StoreKey(sseS3Key)
+ glog.V(4).Infof("Stored SSE-S3 key %s for bucket-default multipart upload %s", sseS3Key.KeyID, uploadIdString)
+
+ default:
+ glog.V(3).Infof("prepareMultipartEncryptionConfig: unsupported bucket-default encryption algorithm %s for bucket %s",
+ encryptionConfig.SseAlgorithm, bucket)
+ }
+ }
}
return config, nil
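
Taken together, the precedence introduced in this block condenses to: explicit request headers first, then the bucket-default algorithm, else no encryption. A compact model with stand-in types (not the actual config structs):

package main

import "fmt"

type sseChoice string

const (
	sseNone sseChoice = "NONE"
	sseKMS  sseChoice = "aws:kms"
	sseS3   sseChoice = "AES256"
)

// chooseSSE condenses the precedence implemented above: explicit request
// headers win; otherwise a bucket-default algorithm (if configured) applies.
// The enum values are stand-ins for EncryptionTypeKMS / EncryptionTypeAES256.
func chooseSSE(explicitKMS, explicitS3 bool, bucketDefault sseChoice) sseChoice {
	switch {
	case explicitKMS:
		return sseKMS
	case explicitS3:
		return sseS3
	default:
		return bucketDefault // sseNone when no bucket config exists
	}
}

func main() {
	fmt.Println(chooseSSE(false, false, sseKMS))  // aws:kms (bucket default applies)
	fmt.Println(chooseSSE(true, false, sseS3))    // aws:kms (explicit header wins)
	fmt.Println(chooseSSE(false, false, sseNone)) // NONE    (no default configured)
}
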