aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api')
-rw-r--r--weed/s3api/chunked_reader_v4.go77
-rw-r--r--weed/s3api/s3api_bucket_config.go4
-rw-r--r--weed/s3api/s3api_object_handlers_copy.go119
-rw-r--r--weed/s3api/s3api_object_handlers_copy_unified.go108
-rw-r--r--weed/s3api/s3api_object_handlers_delete.go57
-rw-r--r--weed/s3api/s3api_object_handlers_put.go27
-rw-r--r--weed/s3api/s3api_streaming_copy.go60
7 files changed, 243 insertions, 209 deletions
diff --git a/weed/s3api/chunked_reader_v4.go b/weed/s3api/chunked_reader_v4.go
index c21b57009..f841c3e1e 100644
--- a/weed/s3api/chunked_reader_v4.go
+++ b/weed/s3api/chunked_reader_v4.go
@@ -116,6 +116,7 @@ func (iam *IdentityAccessManagement) newChunkedReader(req *http.Request) (io.Rea
}
checkSumWriter := getCheckSumWriter(checksumAlgorithm)
+ hasTrailer := amzTrailerHeader != ""
return &s3ChunkedReader{
cred: credential,
@@ -129,6 +130,7 @@ func (iam *IdentityAccessManagement) newChunkedReader(req *http.Request) (io.Rea
checkSumWriter: checkSumWriter,
state: readChunkHeader,
iam: iam,
+ hasTrailer: hasTrailer,
}, s3err.ErrNone
}
@@ -170,6 +172,7 @@ type s3ChunkedReader struct {
n uint64 // Unread bytes in chunk
err error
iam *IdentityAccessManagement
+ hasTrailer bool
}
// Read chunk reads the chunk token signature portion.
@@ -281,10 +284,10 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
}
// If we're using unsigned streaming upload, there is no signature to verify at each chunk.
- if cr.chunkSignature != "" {
- cr.state = verifyChunk
- } else if cr.lastChunk {
+ if cr.lastChunk && cr.hasTrailer {
cr.state = readTrailerChunk
+ } else if cr.chunkSignature != "" {
+ cr.state = verifyChunk
} else {
cr.state = readChunkHeader
}
@@ -304,7 +307,11 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
// This implementation currently only supports the first case.
// TODO: Implement the second case (signed upload with additional checksum computation for each chunk)
- extractedCheckSumAlgorithm, extractedChecksum := parseChunkChecksum(cr.reader)
+ extractedCheckSumAlgorithm, extractedChecksum, err := parseChunkChecksum(cr.reader)
+ if err != nil {
+ cr.err = err
+ return 0, err
+ }
if extractedCheckSumAlgorithm.String() != cr.checkSumAlgorithm {
errorMessage := fmt.Sprintf("checksum algorithm in trailer '%s' does not match the one advertised in the header '%s'", extractedCheckSumAlgorithm.String(), cr.checkSumAlgorithm)
@@ -313,6 +320,7 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
return 0, cr.err
}
+ // Validate checksum for data integrity (required for both signed and unsigned streaming with trailers)
computedChecksum := cr.checkSumWriter.Sum(nil)
base64Checksum := base64.StdEncoding.EncodeToString(computedChecksum)
if string(extractedChecksum) != base64Checksum {
@@ -324,11 +332,6 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
// TODO: Extract signature from trailer chunk and verify it.
// For now, we just read the trailer chunk and discard it.
- // Reading remaining CRLF.
- for i := 0; i < 2; i++ {
- cr.err = readCRLF(cr.reader)
- }
-
cr.state = eofChunk
case readChunk:
@@ -506,41 +509,37 @@ func parseS3ChunkExtension(buf []byte) ([]byte, []byte) {
return buf[:semi], parseChunkSignature(buf[semi:])
}
-func parseChunkChecksum(b *bufio.Reader) (ChecksumAlgorithm, []byte) {
- // When using unsigned upload, this would be the raw contents of the trailer chunk:
- //
- // x-amz-checksum-crc32:YABb/g==\n\r\n\r\n // Trailer chunk (note optional \n character)
- // \r\n // CRLF
- //
- // When using signed upload with an additional checksum algorithm, this would be the raw contents of the trailer chunk:
- //
- // x-amz-checksum-crc32:YABb/g==\n\r\n // Trailer chunk (note optional \n character)
- // trailer-signature\r\n
- // \r\n // CRLF
- //
-
- // x-amz-checksum-crc32:YABb/g==\n
- bytesRead, err := readChunkLine(b)
- if err != nil {
- return ChecksumAlgorithmNone, nil
- }
+func parseChunkChecksum(b *bufio.Reader) (ChecksumAlgorithm, []byte, error) {
+ // Read trailer lines until empty line
+ var checksumAlgorithm ChecksumAlgorithm
+ var checksum []byte
- // Split on ':'
- parts := bytes.SplitN(bytesRead, []byte(":"), 2)
- checksumKey := string(parts[0])
- checksumValue := parts[1]
+ for {
+ bytesRead, err := readChunkLine(b)
+ if err != nil {
+ return ChecksumAlgorithmNone, nil, err
+ }
- // Discard all trailing whitespace characters
- checksumValue = trimTrailingWhitespace(checksumValue)
+ if len(bytesRead) == 0 {
+ break
+ }
- // If the checksum key is not a supported checksum algorithm, return an error.
- // TODO: Bubble that error up to the caller
- extractedAlgorithm, err := extractChecksumAlgorithm(checksumKey)
- if err != nil {
- return ChecksumAlgorithmNone, nil
+ parts := bytes.SplitN(bytesRead, []byte(":"), 2)
+ if len(parts) == 2 {
+ key := string(bytes.TrimSpace(parts[0]))
+ value := bytes.TrimSpace(parts[1])
+ if alg, err := extractChecksumAlgorithm(key); err == nil {
+ if checksumAlgorithm != ChecksumAlgorithmNone {
+ glog.V(3).Infof("multiple checksum headers found in trailer, using last: %s", key)
+ }
+ checksumAlgorithm = alg
+ checksum = value
+ }
+ // Ignore other trailer headers like x-amz-trailer-signature
+ }
}
- return extractedAlgorithm, checksumValue
+ return checksumAlgorithm, checksum, nil
}
func parseChunkSignature(chunk []byte) []byte {
diff --git a/weed/s3api/s3api_bucket_config.go b/weed/s3api/s3api_bucket_config.go
index 00449d80a..a10374339 100644
--- a/weed/s3api/s3api_bucket_config.go
+++ b/weed/s3api/s3api_bucket_config.go
@@ -519,7 +519,9 @@ func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
config, errCode := s3a.getBucketConfig(bucket)
if errCode != s3err.ErrNone {
if errCode == s3err.ErrNoSuchBucket {
- return "", nil
+ // Signal to callers that the bucket does not exist so they can
+ // decide whether to auto-create it (e.g., in PUT handlers).
+ return "", filer_pb.ErrNotFound
}
glog.Errorf("getVersioningState: failed to get bucket config for %s: %v", bucket, errCode)
return "", fmt.Errorf("failed to get bucket config: %v", errCode)
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go
index 4e465919c..66d4ded80 100644
--- a/weed/s3api/s3api_object_handlers_copy.go
+++ b/weed/s3api/s3api_object_handlers_copy.go
@@ -199,7 +199,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
}
// Process metadata and tags and apply to destination
- processedMetadata, tagErr := processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
+ // Use dstEntry.Extended (already filtered) as the source, not entry.Extended,
+ // to preserve the encryption header filtering. Fixes GitHub #7562.
+ processedMetadata, tagErr := processMetadataBytes(r.Header, dstEntry.Extended, replaceMeta, replaceTagging)
if tagErr != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
return
@@ -1543,7 +1545,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo
}
// copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios
-// This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain
+// This unified function supports: SSE-C↔SSE-KMS↔SSE-S3, and any→Plain
func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
var dstChunks []*filer_pb.FileChunk
@@ -1552,6 +1554,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
var destKMSKeyID string
var destKMSEncryptionContext map[string]string
var destKMSBucketKeyEnabled bool
+ var destSSES3Key *SSES3Key
if state.DstSSEC {
var err error
@@ -1565,7 +1568,13 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
if err != nil {
return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err)
}
- } else {
+ } else if state.DstSSES3 {
+ // Generate SSE-S3 key for destination
+ var err error
+ destSSES3Key, err = GenerateSSES3Key()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to generate SSE-S3 key: %w", err)
+ }
}
// Parse source encryption parameters
@@ -1584,12 +1593,18 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
var err error
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
- copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
+ copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
} else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
- copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
+ copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
+ } else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
+ copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
} else {
- // Unencrypted chunk, copy directly
- copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
+ // Unencrypted chunk - may need encryption if destination requires it
+ if state.DstSSEC || state.DstSSEKMS || state.DstSSES3 {
+ copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
+ } else {
+ copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
+ }
}
if err != nil {
@@ -1640,6 +1655,40 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
} else {
glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr)
}
+ } else if state.DstSSES3 && destSSES3Key != nil {
+ // For SSE-S3 destination, create object-level metadata
+ var sses3Metadata *SSES3Key
+ if len(dstChunks) == 0 {
+ // Handle 0-byte files - generate IV for metadata even though there's no content to encrypt
+ if entry.Attributes.FileSize != 0 {
+ return nil, nil, fmt.Errorf("internal error: no chunks created for non-empty SSE-S3 destination object")
+ }
+ // Generate IV for 0-byte object metadata
+ iv := make([]byte, s3_constants.AESBlockSize)
+ if _, err := io.ReadFull(rand.Reader, iv); err != nil {
+ return nil, nil, fmt.Errorf("generate IV for 0-byte object: %w", err)
+ }
+ destSSES3Key.IV = iv
+ sses3Metadata = destSSES3Key
+ } else {
+ // For non-empty objects, use the first chunk's metadata
+ if dstChunks[0].GetSseType() != filer_pb.SSEType_SSE_S3 || len(dstChunks[0].GetSseMetadata()) == 0 {
+ return nil, nil, fmt.Errorf("internal error: first chunk is missing expected SSE-S3 metadata for destination object")
+ }
+ keyManager := GetSSES3KeyManager()
+ var err error
+ sses3Metadata, err = DeserializeSSES3Metadata(dstChunks[0].GetSseMetadata(), keyManager)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to deserialize SSE-S3 metadata from first chunk: %w", err)
+ }
+ }
+ // Use the derived key with its IV for object-level metadata
+ keyData, serErr := SerializeSSES3Metadata(sses3Metadata)
+ if serErr != nil {
+ return nil, nil, fmt.Errorf("failed to serialize SSE-S3 metadata: %w", serErr)
+ }
+ dstMetadata[s3_constants.SeaweedFSSSES3Key] = keyData
+ dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("AES256")
}
// For unencrypted destination, no metadata needed (dstMetadata remains empty)
@@ -1647,7 +1696,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
}
// copyCrossEncryptionChunk handles copying a single chunk with cross-encryption support
-func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
+func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, destSSES3Key *SSES3Key, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
// Create destination chunk
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
@@ -1747,6 +1796,30 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
previewLen = len(finalData)
}
+ } else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
+ // Decrypt SSE-S3 source
+ if len(chunk.GetSseMetadata()) == 0 {
+ return nil, fmt.Errorf("SSE-S3 chunk missing per-chunk metadata")
+ }
+
+ keyManager := GetSSES3KeyManager()
+ sourceSSEKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
+ if err != nil {
+ return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
+ }
+
+ decryptedReader, decErr := CreateSSES3DecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey, sourceSSEKey.IV)
+ if decErr != nil {
+ return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", decErr)
+ }
+
+ decryptedData, readErr := io.ReadAll(decryptedReader)
+ if readErr != nil {
+ return nil, fmt.Errorf("decrypt SSE-S3 chunk data: %w", readErr)
+ }
+ finalData = decryptedData
+ glog.V(4).Infof("Decrypted SSE-S3 chunk, size: %d", len(finalData))
+
} else {
// Source is unencrypted
finalData = encryptedData
@@ -1808,6 +1881,36 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
dstChunk.SseMetadata = kmsMetadata
glog.V(4).Infof("Re-encrypted chunk with SSE-KMS")
+
+ } else if state.DstSSES3 && destSSES3Key != nil {
+ // Encrypt with SSE-S3
+ encryptedReader, iv, encErr := CreateSSES3EncryptedReader(bytes.NewReader(finalData), destSSES3Key)
+ if encErr != nil {
+ return nil, fmt.Errorf("create SSE-S3 encrypted reader: %w", encErr)
+ }
+
+ reencryptedData, readErr := io.ReadAll(encryptedReader)
+ if readErr != nil {
+ return nil, fmt.Errorf("re-encrypt with SSE-S3: %w", readErr)
+ }
+ finalData = reencryptedData
+
+ // Create per-chunk SSE-S3 metadata with chunk-specific IV
+ chunkSSEKey := &SSES3Key{
+ Key: destSSES3Key.Key,
+ KeyID: destSSES3Key.KeyID,
+ Algorithm: destSSES3Key.Algorithm,
+ IV: iv,
+ }
+ sses3Metadata, err := SerializeSSES3Metadata(chunkSSEKey)
+ if err != nil {
+ return nil, fmt.Errorf("serialize SSE-S3 metadata: %w", err)
+ }
+
+ dstChunk.SseType = filer_pb.SSEType_SSE_S3
+ dstChunk.SseMetadata = sses3Metadata
+
+ glog.V(4).Infof("Re-encrypted chunk with SSE-S3")
}
// For unencrypted destination, finalData remains as decrypted plaintext
diff --git a/weed/s3api/s3api_object_handlers_copy_unified.go b/weed/s3api/s3api_object_handlers_copy_unified.go
index 255c3eb2d..f1b4ff280 100644
--- a/weed/s3api/s3api_object_handlers_copy_unified.go
+++ b/weed/s3api/s3api_object_handlers_copy_unified.go
@@ -1,7 +1,6 @@
package s3api
import (
- "context"
"errors"
"fmt"
"net/http"
@@ -133,9 +132,9 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
}
if state.DstSSES3 {
- // Use streaming copy for SSE-S3 encryption
- chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
- return chunks, nil, err
+ // Use chunk-by-chunk copy for SSE-S3 encryption (consistent with SSE-C and SSE-KMS)
+ glog.V(2).Infof("Plain→SSE-S3 copy: using unified multipart encrypt copy")
+ return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
}
return nil, nil, fmt.Errorf("unknown target encryption type")
@@ -143,30 +142,18 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
// executeDecryptCopy handles encrypted → plain copies
func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
- // Use unified multipart-aware decrypt copy for all encryption types
- if state.SrcSSEC || state.SrcSSEKMS {
+ // Use unified multipart-aware decrypt copy for all encryption types (consistent chunk-by-chunk)
+ if state.SrcSSEC || state.SrcSSEKMS || state.SrcSSES3 {
glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy")
return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath)
}
- if state.SrcSSES3 {
- // Use streaming copy for SSE-S3 decryption
- chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
- return chunks, nil, err
- }
-
return nil, nil, fmt.Errorf("unknown source encryption type")
}
// executeReencryptCopy handles encrypted → encrypted copies with different keys/methods
func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
- // Check if we should use streaming copy for better performance
- if s3a.shouldUseStreamingCopy(entry, state) {
- chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
- return chunks, nil, err
- }
-
- // Fallback to chunk-by-chunk approach for compatibility
+ // Use chunk-by-chunk approach for all cross-encryption scenarios (consistent behavior)
if state.SrcSSEC && state.DstSSEC {
return s3a.copyChunksWithSSEC(entry, r)
}
@@ -177,83 +164,8 @@ func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Requ
return chunks, dstMetadata, err
}
- if state.SrcSSEC && state.DstSSEKMS {
- // SSE-C → SSE-KMS: use unified multipart-aware cross-encryption copy
- glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy")
- return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
- }
-
- if state.SrcSSEKMS && state.DstSSEC {
- // SSE-KMS → SSE-C: use unified multipart-aware cross-encryption copy
- glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy")
- return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
- }
-
- // Handle SSE-S3 cross-encryption scenarios
- if state.SrcSSES3 || state.DstSSES3 {
- // Any scenario involving SSE-S3 uses streaming copy
- chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
- return chunks, nil, err
- }
-
- return nil, nil, fmt.Errorf("unsupported cross-encryption scenario")
-}
-
-// shouldUseStreamingCopy determines if streaming copy should be used
-func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool {
- // Use streaming copy for large files or when beneficial
- fileSize := entry.Attributes.FileSize
-
- // Use streaming for files larger than 10MB
- if fileSize > 10*1024*1024 {
- return true
- }
-
- // Check if this is a multipart encrypted object
- isMultipartEncrypted := false
- if state.IsSourceEncrypted() {
- encryptedChunks := 0
- for _, chunk := range entry.GetChunks() {
- if chunk.GetSseType() != filer_pb.SSEType_NONE {
- encryptedChunks++
- }
- }
- isMultipartEncrypted = encryptedChunks > 1
- }
-
- // For multipart encrypted objects, avoid streaming copy to use per-chunk metadata approach
- if isMultipartEncrypted {
- glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach")
- return false
- }
-
- // Use streaming for cross-encryption scenarios (for single-part objects only)
- if state.IsSourceEncrypted() && state.IsTargetEncrypted() {
- srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3)
- dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3)
- if srcType != dstType {
- return true
- }
- }
-
- // Use streaming for compressed files
- if isCompressedEntry(entry) {
- return true
- }
-
- // Use streaming for SSE-S3 scenarios (always)
- if state.SrcSSES3 || state.DstSSES3 {
- return true
- }
-
- return false
-}
-
-// executeStreamingReencryptCopy performs streaming re-encryption copy
-func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, error) {
- // Create streaming copy manager
- streamingManager := NewStreamingCopyManager(s3a)
-
- // Execute streaming copy
- return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state)
+ // All other cross-encryption scenarios use unified multipart copy
+ // This includes: SSE-C↔SSE-KMS, SSE-C↔SSE-S3, SSE-KMS↔SSE-S3, SSE-S3↔SSE-S3
+ glog.V(2).Infof("Cross-encryption copy: using unified multipart copy")
+ return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
}
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go
index f779a6edc..6e373bb4e 100644
--- a/weed/s3api/s3api_object_handlers_delete.go
+++ b/weed/s3api/s3api_object_handlers_delete.go
@@ -1,12 +1,10 @@
package s3api
import (
- "context"
"encoding/xml"
"fmt"
"io"
"net/http"
- "slices"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -127,22 +125,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
dir, name := target.DirAndName()
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- // Use operation context that won't be cancelled if request terminates
- // This ensures deletion completes atomically to avoid inconsistent state
- opCtx := context.WithoutCancel(r.Context())
-
- if err := doDeleteEntry(client, dir, name, true, false); err != nil {
- return err
- }
-
- // Cleanup empty directories
- if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
- bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
- // Recursively delete empty parent directories, stop at bucket path
- filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil)
- }
-
- return nil
+ return doDeleteEntry(client, dir, name, true, false)
+ // Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
+ // which listens to metadata events and uses consistent hashing for coordination
})
if err != nil {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -222,8 +207,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
var deleteErrors []DeleteError
var auditLog *s3err.AccessLog
- directoriesWithDeletion := make(map[string]bool)
-
if s3err.Logger != nil {
auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
}
@@ -245,10 +228,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
versioningConfigured := (versioningState != "")
s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- // Use operation context that won't be cancelled if request terminates
- // This ensures batch deletion completes atomically to avoid inconsistent state
- opCtx := context.WithoutCancel(r.Context())
-
// delete file entries
for _, object := range deleteObjects.Objects {
if object.Key == "" {
@@ -357,10 +336,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
if err == nil {
- // Track directory for empty directory cleanup
- if !s3a.option.AllowEmptyFolder {
- directoriesWithDeletion[parentDirectoryPath] = true
- }
deletedObjects = append(deletedObjects, object)
} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
deletedObjects = append(deletedObjects, object)
@@ -380,30 +355,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
}
- // Cleanup empty directories - optimize by processing deepest first
- if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 {
- bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
-
- // Collect and sort directories by depth (deepest first) to avoid redundant checks
- var allDirs []string
- for dirPath := range directoriesWithDeletion {
- allDirs = append(allDirs, dirPath)
- }
- // Sort by depth (deeper directories first)
- slices.SortFunc(allDirs, func(a, b string) int {
- return strings.Count(b, "/") - strings.Count(a, "/")
- })
-
- // Track already-checked directories to avoid redundant work
- checked := make(map[string]bool)
- for _, dirPath := range allDirs {
- if !checked[dirPath] {
- // Recursively delete empty parent directories, stop at bucket path
- // Mark this directory and all its parents as checked during recursion
- filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked)
- }
- }
- }
+ // Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
+ // which listens to metadata events and uses consistent hashing for coordination
return nil
})
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 100796b2e..f848790de 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
+ "net/url"
"path/filepath"
"strconv"
"strings"
@@ -548,6 +549,28 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
}
}
+ // Parse and store object tags from X-Amz-Tagging header
+ // Fix for GitHub issue #7589: Tags sent during object upload were not being stored
+ if tagging := r.Header.Get(s3_constants.AmzObjectTagging); tagging != "" {
+ parsedTags, err := url.ParseQuery(tagging)
+ if err != nil {
+ glog.Warningf("putToFiler: Invalid S3 tag format in header '%s': %v", tagging, err)
+ return "", s3err.ErrInvalidTag, SSEResponseMetadata{}
+ }
+ for key, values := range parsedTags {
+ if len(values) > 1 {
+ glog.Warningf("putToFiler: Duplicate tag key '%s' in header", key)
+ return "", s3err.ErrInvalidTag, SSEResponseMetadata{}
+ }
+ value := ""
+ if len(values) > 0 {
+ value = values[0]
+ }
+ entry.Extended[s3_constants.AmzObjectTagging+"-"+key] = []byte(value)
+ }
+ glog.V(3).Infof("putToFiler: stored %d tags from X-Amz-Tagging header", len(parsedTags))
+ }
+
// Set SSE-C metadata
if customerKey != nil && len(sseIV) > 0 {
// Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
@@ -680,9 +703,9 @@ func filerErrorToS3Error(err error) s3err.ErrorCode {
if err == nil {
return s3err.ErrNone
}
-
+
errString := err.Error()
-
+
switch {
case errString == constants.ErrMsgBadDigest:
return s3err.ErrBadDigest
diff --git a/weed/s3api/s3api_streaming_copy.go b/weed/s3api/s3api_streaming_copy.go
index 457986858..94729c003 100644
--- a/weed/s3api/s3api_streaming_copy.go
+++ b/weed/s3api/s3api_streaming_copy.go
@@ -59,18 +59,19 @@ func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
}
}
-// ExecuteStreamingCopy performs a streaming copy operation
-func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
+// ExecuteStreamingCopy performs a streaming copy operation and returns the encryption spec
+// The encryption spec is needed for SSE-S3 to properly set destination metadata (fixes GitHub #7562)
+func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, *EncryptionSpec, error) {
// Create streaming copy specification
spec, err := scm.createStreamingSpec(entry, r, state)
if err != nil {
- return nil, fmt.Errorf("create streaming spec: %w", err)
+ return nil, nil, fmt.Errorf("create streaming spec: %w", err)
}
// Create source reader from entry
sourceReader, err := scm.createSourceReader(entry)
if err != nil {
- return nil, fmt.Errorf("create source reader: %w", err)
+ return nil, nil, fmt.Errorf("create source reader: %w", err)
}
defer sourceReader.Close()
@@ -79,11 +80,16 @@ func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry
// Create processing pipeline
processedReader, err := scm.createProcessingPipeline(spec)
if err != nil {
- return nil, fmt.Errorf("create processing pipeline: %w", err)
+ return nil, nil, fmt.Errorf("create processing pipeline: %w", err)
}
// Stream to destination
- return scm.streamToDestination(ctx, processedReader, spec, dstPath)
+ chunks, err := scm.streamToDestination(ctx, processedReader, spec, dstPath)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return chunks, spec.EncryptionSpec, nil
}
// createStreamingSpec creates a streaming specification based on copy parameters
@@ -453,8 +459,8 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
for {
n, err := reader.Read(buffer)
if n > 0 {
- // Create chunk for this data
- chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
+ // Create chunk for this data, setting SSE type and per-chunk metadata (including chunk-specific IVs for SSE-S3)
+ chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath, spec.EncryptionSpec)
if chunkErr != nil {
return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
}
@@ -474,7 +480,7 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
}
// createChunkFromData creates a chunk from streaming data
-func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
+func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string, encSpec *EncryptionSpec) (*filer_pb.FileChunk, error) {
// Assign new volume
assignResult, err := scm.s3a.assignNewVolume(dstPath)
if err != nil {
@@ -487,6 +493,42 @@ func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64,
Size: uint64(len(data)),
}
+ // Set SSE type and metadata on chunk if destination is encrypted
+ // This is critical for GetObject to know to decrypt the data - fixes GitHub #7562
+ if encSpec != nil && encSpec.NeedsEncryption {
+ switch encSpec.DestinationType {
+ case EncryptionTypeSSEC:
+ chunk.SseType = filer_pb.SSEType_SSE_C
+ // SSE-C metadata is handled at object level, not per-chunk for streaming copy
+ case EncryptionTypeSSEKMS:
+ chunk.SseType = filer_pb.SSEType_SSE_KMS
+ // SSE-KMS metadata is handled at object level, not per-chunk for streaming copy
+ case EncryptionTypeSSES3:
+ chunk.SseType = filer_pb.SSEType_SSE_S3
+ // Create per-chunk SSE-S3 metadata with chunk-specific IV
+ if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
+ // Calculate chunk-specific IV using base IV and chunk offset
+ baseIV := encSpec.DestinationIV
+ if len(baseIV) == 0 {
+ return nil, fmt.Errorf("SSE-S3 encryption requires DestinationIV to be set for chunk at offset %d", offset)
+ }
+ chunkIV, _ := calculateIVWithOffset(baseIV, offset)
+ // Create chunk key with the chunk-specific IV
+ chunkSSEKey := &SSES3Key{
+ Key: sseKey.Key,
+ KeyID: sseKey.KeyID,
+ Algorithm: sseKey.Algorithm,
+ IV: chunkIV,
+ }
+ chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey)
+ if serErr != nil {
+ return nil, fmt.Errorf("failed to serialize chunk SSE-S3 metadata: %w", serErr)
+ }
+ chunk.SseMetadata = chunkMetadata
+ }
+ }
+ }
+
// Set file ID
if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
return nil, err