Diffstat (limited to 'weed/s3api')
-rw-r--r--  weed/s3api/chunked_reader_v4.go                  |  77
-rw-r--r--  weed/s3api/s3api_bucket_config.go                |   4
-rw-r--r--  weed/s3api/s3api_object_handlers_copy.go         | 119
-rw-r--r--  weed/s3api/s3api_object_handlers_copy_unified.go | 108
-rw-r--r--  weed/s3api/s3api_object_handlers_delete.go       |  57
-rw-r--r--  weed/s3api/s3api_object_handlers_put.go          |  27
-rw-r--r--  weed/s3api/s3api_streaming_copy.go               |  60
7 files changed, 243 insertions(+), 209 deletions(-)
diff --git a/weed/s3api/chunked_reader_v4.go b/weed/s3api/chunked_reader_v4.go
index c21b57009..f841c3e1e 100644
--- a/weed/s3api/chunked_reader_v4.go
+++ b/weed/s3api/chunked_reader_v4.go
@@ -116,6 +116,7 @@ func (iam *IdentityAccessManagement) newChunkedReader(req *http.Request) (io.Rea
 	}
 
 	checkSumWriter := getCheckSumWriter(checksumAlgorithm)
+	hasTrailer := amzTrailerHeader != ""
 
 	return &s3ChunkedReader{
 		cred:           credential,
@@ -129,6 +130,7 @@ func (iam *IdentityAccessManagement) newChunkedReader(req *http.Request) (io.Rea
 		checkSumWriter: checkSumWriter,
 		state:          readChunkHeader,
 		iam:            iam,
+		hasTrailer:     hasTrailer,
 	}, s3err.ErrNone
 }
 
@@ -170,6 +172,7 @@ type s3ChunkedReader struct {
 	n              uint64 // Unread bytes in chunk
 	err            error
 	iam            *IdentityAccessManagement
+	hasTrailer     bool
 }
 
 // Read chunk reads the chunk token signature portion.
@@ -281,10 +284,10 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
 		}
 
 		// If we're using unsigned streaming upload, there is no signature to verify at each chunk.
-		if cr.chunkSignature != "" {
-			cr.state = verifyChunk
-		} else if cr.lastChunk {
+		if cr.lastChunk && cr.hasTrailer {
 			cr.state = readTrailerChunk
+		} else if cr.chunkSignature != "" {
+			cr.state = verifyChunk
 		} else {
 			cr.state = readChunkHeader
 		}
@@ -304,7 +307,11 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
 		// This implementation currently only supports the first case.
 		// TODO: Implement the second case (signed upload with additional checksum computation for each chunk)
 
-		extractedCheckSumAlgorithm, extractedChecksum := parseChunkChecksum(cr.reader)
+		extractedCheckSumAlgorithm, extractedChecksum, err := parseChunkChecksum(cr.reader)
+		if err != nil {
+			cr.err = err
+			return 0, err
+		}
 
 		if extractedCheckSumAlgorithm.String() != cr.checkSumAlgorithm {
 			errorMessage := fmt.Sprintf("checksum algorithm in trailer '%s' does not match the one advertised in the header '%s'", extractedCheckSumAlgorithm.String(), cr.checkSumAlgorithm)
@@ -313,6 +320,7 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
 			return 0, cr.err
 		}
 
+		// Validate checksum for data integrity (required for both signed and unsigned streaming with trailers)
 		computedChecksum := cr.checkSumWriter.Sum(nil)
 		base64Checksum := base64.StdEncoding.EncodeToString(computedChecksum)
 		if string(extractedChecksum) != base64Checksum {
@@ -324,11 +332,6 @@ func (cr *s3ChunkedReader) Read(buf []byte) (n int, err error) {
 		// TODO: Extract signature from trailer chunk and verify it.
 		// For now, we just read the trailer chunk and discard it.
 
-		// Reading remaining CRLF.
-		for i := 0; i < 2; i++ {
-			cr.err = readCRLF(cr.reader)
-		}
-
 		cr.state = eofChunk
 
 	case readChunk:
@@ -506,41 +509,37 @@ func parseS3ChunkExtension(buf []byte) ([]byte, []byte) {
 	return buf[:semi], parseChunkSignature(buf[semi:])
 }
 
-func parseChunkChecksum(b *bufio.Reader) (ChecksumAlgorithm, []byte) {
-	// When using unsigned upload, this would be the raw contents of the trailer chunk:
-	//
-	// x-amz-checksum-crc32:YABb/g==\n\r\n\r\n  // Trailer chunk (note optional \n character)
-	// \r\n                                     // CRLF
-	//
-	// When using signed upload with an additional checksum algorithm, this would be the raw contents of the trailer chunk:
-	//
-	// x-amz-checksum-crc32:YABb/g==\n\r\n  // Trailer chunk (note optional \n character)
-	// trailer-signature\r\n
-	// \r\n  // CRLF
-	//
-
-	// x-amz-checksum-crc32:YABb/g==\n
-	bytesRead, err := readChunkLine(b)
-	if err != nil {
-		return ChecksumAlgorithmNone, nil
-	}
+func parseChunkChecksum(b *bufio.Reader) (ChecksumAlgorithm, []byte, error) {
+	// Read trailer lines until empty line
+	var checksumAlgorithm ChecksumAlgorithm
+	var checksum []byte
 
-	// Split on ':'
-	parts := bytes.SplitN(bytesRead, []byte(":"), 2)
-	checksumKey := string(parts[0])
-	checksumValue := parts[1]
+	for {
+		bytesRead, err := readChunkLine(b)
+		if err != nil {
+			return ChecksumAlgorithmNone, nil, err
+		}
 
-	// Discard all trailing whitespace characters
-	checksumValue = trimTrailingWhitespace(checksumValue)
+		if len(bytesRead) == 0 {
+			break
+		}
 
-	// If the checksum key is not a supported checksum algorithm, return an error.
-	// TODO: Bubble that error up to the caller
-	extractedAlgorithm, err := extractChecksumAlgorithm(checksumKey)
-	if err != nil {
-		return ChecksumAlgorithmNone, nil
+		parts := bytes.SplitN(bytesRead, []byte(":"), 2)
+		if len(parts) == 2 {
+			key := string(bytes.TrimSpace(parts[0]))
+			value := bytes.TrimSpace(parts[1])
+			if alg, err := extractChecksumAlgorithm(key); err == nil {
+				if checksumAlgorithm != ChecksumAlgorithmNone {
+					glog.V(3).Infof("multiple checksum headers found in trailer, using last: %s", key)
+				}
+				checksumAlgorithm = alg
+				checksum = value
+			}
+			// Ignore other trailer headers like x-amz-trailer-signature
+		}
	}
 
-	return extractedAlgorithm, checksumValue
+	return checksumAlgorithm, checksum, nil
 }
 
 func parseChunkSignature(chunk []byte) []byte {
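The rewritten parseChunkChecksum loops over trailer lines until it hits the empty line, instead of assuming exactly one checksum header. A minimal standalone sketch of that loop shape over a plain bufio.Reader (parseTrailer and the sample input are illustrative, not the SeaweedFS helpers, which also handle chunk-line framing):

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"strings"
)

// parseTrailer reads "key:value" lines until a blank line, collecting every
// trailer header; callers can then pick out x-amz-checksum-* entries.
func parseTrailer(r *bufio.Reader) (map[string]string, error) {
	trailer := make(map[string]string)
	for {
		line, err := r.ReadBytes('\n')
		if err != nil {
			return nil, err
		}
		line = bytes.TrimRight(line, "\r\n")
		if len(line) == 0 {
			return trailer, nil // empty line terminates the trailer block
		}
		key, value, ok := bytes.Cut(line, []byte(":"))
		if !ok {
			continue // skip malformed lines
		}
		trailer[string(bytes.TrimSpace(key))] = string(bytes.TrimSpace(value))
	}
}

func main() {
	raw := "x-amz-checksum-crc32:YABb/g==\r\n\r\n"
	t, _ := parseTrailer(bufio.NewReader(strings.NewReader(raw)))
	fmt.Println(t["x-amz-checksum-crc32"]) // YABb/g==
}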
+ return "", filer_pb.ErrNotFound } glog.Errorf("getVersioningState: failed to get bucket config for %s: %v", bucket, errCode) return "", fmt.Errorf("failed to get bucket config: %v", errCode) diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 4e465919c..66d4ded80 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -199,7 +199,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request } // Process metadata and tags and apply to destination - processedMetadata, tagErr := processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging) + // Use dstEntry.Extended (already filtered) as the source, not entry.Extended, + // to preserve the encryption header filtering. Fixes GitHub #7562. + processedMetadata, tagErr := processMetadataBytes(r.Header, dstEntry.Extended, replaceMeta, replaceTagging) if tagErr != nil { s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource) return @@ -1543,7 +1545,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo } // copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios -// This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain +// This unified function supports: SSE-C↔SSE-KMS↔SSE-S3, and any→Plain func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { var dstChunks []*filer_pb.FileChunk @@ -1552,6 +1554,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h var destKMSKeyID string var destKMSEncryptionContext map[string]string var destKMSBucketKeyEnabled bool + var destSSES3Key *SSES3Key if state.DstSSEC { var err error @@ -1565,7 +1568,13 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h if err != nil { return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err) } - } else { + } else if state.DstSSES3 { + // Generate SSE-S3 key for destination + var err error + destSSES3Key, err = GenerateSSES3Key() + if err != nil { + return nil, nil, fmt.Errorf("failed to generate SSE-S3 key: %w", err) + } } // Parse source encryption parameters @@ -1584,12 +1593,18 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h var err error if chunk.GetSseType() == filer_pb.SSEType_SSE_C { - copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state) + copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state) } else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS { - copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state) + copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state) + } else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 { + copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state) } else { - // 
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go
index 4e465919c..66d4ded80 100644
--- a/weed/s3api/s3api_object_handlers_copy.go
+++ b/weed/s3api/s3api_object_handlers_copy.go
@@ -199,7 +199,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
 	}
 
 	// Process metadata and tags and apply to destination
-	processedMetadata, tagErr := processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
+	// Use dstEntry.Extended (already filtered) as the source, not entry.Extended,
+	// to preserve the encryption header filtering. Fixes GitHub #7562.
+	processedMetadata, tagErr := processMetadataBytes(r.Header, dstEntry.Extended, replaceMeta, replaceTagging)
 	if tagErr != nil {
 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
 		return
@@ -1543,7 +1545,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo
 }
 
 // copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios
-// This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain
+// This unified function supports: SSE-C↔SSE-KMS↔SSE-S3, and any→Plain
 func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
 	var dstChunks []*filer_pb.FileChunk
 
@@ -1552,6 +1554,7 @@
 	var destKMSKeyID string
 	var destKMSEncryptionContext map[string]string
 	var destKMSBucketKeyEnabled bool
+	var destSSES3Key *SSES3Key
 
 	if state.DstSSEC {
 		var err error
@@ -1565,7 +1568,13 @@
 		if err != nil {
 			return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err)
 		}
-	} else {
+	} else if state.DstSSES3 {
+		// Generate SSE-S3 key for destination
+		var err error
+		destSSES3Key, err = GenerateSSES3Key()
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to generate SSE-S3 key: %w", err)
+		}
 	}
 
 	// Parse source encryption parameters
@@ -1584,12 +1593,18 @@
 		var err error
 		if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
-			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
+			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
 		} else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
-			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
+			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
+		} else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
+			copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
 		} else {
-			// Unencrypted chunk, copy directly
-			copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
+			// Unencrypted chunk - may need encryption if destination requires it
+			if state.DstSSEC || state.DstSSEKMS || state.DstSSES3 {
+				copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
+			} else {
+				copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
+			}
 		}
 
 		if err != nil {
@@ -1640,6 +1655,40 @@
 		} else {
 			glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr)
 		}
+	} else if state.DstSSES3 && destSSES3Key != nil {
+		// For SSE-S3 destination, create object-level metadata
+		var sses3Metadata *SSES3Key
+		if len(dstChunks) == 0 {
+			// Handle 0-byte files - generate IV for metadata even though there's no content to encrypt
+			if entry.Attributes.FileSize != 0 {
+				return nil, nil, fmt.Errorf("internal error: no chunks created for non-empty SSE-S3 destination object")
+			}
+			// Generate IV for 0-byte object metadata
+			iv := make([]byte, s3_constants.AESBlockSize)
+			if _, err := io.ReadFull(rand.Reader, iv); err != nil {
+				return nil, nil, fmt.Errorf("generate IV for 0-byte object: %w", err)
+			}
+			destSSES3Key.IV = iv
+			sses3Metadata = destSSES3Key
+		} else {
+			// For non-empty objects, use the first chunk's metadata
+			if dstChunks[0].GetSseType() != filer_pb.SSEType_SSE_S3 || len(dstChunks[0].GetSseMetadata()) == 0 {
+				return nil, nil, fmt.Errorf("internal error: first chunk is missing expected SSE-S3 metadata for destination object")
+			}
+			keyManager := GetSSES3KeyManager()
+			var err error
+			sses3Metadata, err = DeserializeSSES3Metadata(dstChunks[0].GetSseMetadata(), keyManager)
+			if err != nil {
+				return nil, nil, fmt.Errorf("failed to deserialize SSE-S3 metadata from first chunk: %w", err)
+			}
+		}
+		// Use the derived key with its IV for object-level metadata
+		keyData, serErr := SerializeSSES3Metadata(sses3Metadata)
+		if serErr != nil {
+			return nil, nil, fmt.Errorf("failed to serialize SSE-S3 metadata: %w", serErr)
+		}
+		dstMetadata[s3_constants.SeaweedFSSSES3Key] = keyData
+		dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("AES256")
 	}
 	// For unencrypted destination, no metadata needed (dstMetadata remains empty)
 
@@ -1647,7 +1696,7 @@
 }
 
 // copyCrossEncryptionChunk handles copying a single chunk with cross-encryption support
-func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
+func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, destSSES3Key *SSES3Key, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
 	// Create destination chunk
 	dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
@@ -1747,6 +1796,30 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
 			previewLen = len(finalData)
 		}
 
+	} else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
+		// Decrypt SSE-S3 source
+		if len(chunk.GetSseMetadata()) == 0 {
+			return nil, fmt.Errorf("SSE-S3 chunk missing per-chunk metadata")
+		}
+
+		keyManager := GetSSES3KeyManager()
+		sourceSSEKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
+		if err != nil {
+			return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
+		}
+
+		decryptedReader, decErr := CreateSSES3DecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey, sourceSSEKey.IV)
+		if decErr != nil {
+			return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", decErr)
+		}
+
+		decryptedData, readErr := io.ReadAll(decryptedReader)
+		if readErr != nil {
+			return nil, fmt.Errorf("decrypt SSE-S3 chunk data: %w", readErr)
+		}
+		finalData = decryptedData
+		glog.V(4).Infof("Decrypted SSE-S3 chunk, size: %d", len(finalData))
+
 	} else {
 		// Source is unencrypted
 		finalData = encryptedData
@@ -1808,6 +1881,36 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
 		dstChunk.SseMetadata = kmsMetadata
 
 		glog.V(4).Infof("Re-encrypted chunk with SSE-KMS")
+
+	} else if state.DstSSES3 && destSSES3Key != nil {
+		// Encrypt with SSE-S3
+		encryptedReader, iv, encErr := CreateSSES3EncryptedReader(bytes.NewReader(finalData), destSSES3Key)
+		if encErr != nil {
+			return nil, fmt.Errorf("create SSE-S3 encrypted reader: %w", encErr)
+		}
+
+		reencryptedData, readErr := io.ReadAll(encryptedReader)
+		if readErr != nil {
+			return nil, fmt.Errorf("re-encrypt with SSE-S3: %w", readErr)
+		}
+		finalData = reencryptedData
+
+		// Create per-chunk SSE-S3 metadata with chunk-specific IV
+		chunkSSEKey := &SSES3Key{
+			Key:       destSSES3Key.Key,
+			KeyID:     destSSES3Key.KeyID,
+			Algorithm: destSSES3Key.Algorithm,
+			IV:        iv,
+		}
+		sses3Metadata, err := SerializeSSES3Metadata(chunkSSEKey)
+		if err != nil {
+			return nil, fmt.Errorf("serialize SSE-S3 metadata: %w", err)
+		}
+
+		dstChunk.SseType = filer_pb.SSEType_SSE_S3
+		dstChunk.SseMetadata = sses3Metadata
+
+		glog.V(4).Infof("Re-encrypted chunk with SSE-S3")
 	}
 
 	// For unencrypted destination, finalData remains as decrypted plaintext
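The SSE-S3 branches above reuse one object-level key but serialize a fresh IV into each chunk's metadata. A self-contained sketch of that per-chunk pattern, assuming AES-256 in CTR mode for illustration (the actual cipher setup lives in CreateSSES3EncryptedReader / CreateSSES3DecryptedReader):

package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
	"io"
)

// encryptChunk encrypts one chunk with the shared object key and a fresh IV;
// the IV is returned so it can be stored in the chunk's metadata.
func encryptChunk(key, plaintext []byte) (iv, ciphertext []byte, err error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, nil, err
	}
	iv = make([]byte, aes.BlockSize)
	if _, err := io.ReadFull(rand.Reader, iv); err != nil {
		return nil, nil, err
	}
	ciphertext = make([]byte, len(plaintext))
	cipher.NewCTR(block, iv).XORKeyStream(ciphertext, plaintext)
	return iv, ciphertext, nil
}

func main() {
	key := make([]byte, 32) // object-level key; random in practice
	iv, ct, _ := encryptChunk(key, []byte("chunk data"))

	// Decrypting needs the same key plus the chunk's own IV.
	block, _ := aes.NewCipher(key)
	pt := make([]byte, len(ct))
	cipher.NewCTR(block, iv).XORKeyStream(pt, ct)
	fmt.Printf("%s\n", pt) // chunk data
}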
diff --git a/weed/s3api/s3api_object_handlers_copy_unified.go b/weed/s3api/s3api_object_handlers_copy_unified.go
index 255c3eb2d..f1b4ff280 100644
--- a/weed/s3api/s3api_object_handlers_copy_unified.go
+++ b/weed/s3api/s3api_object_handlers_copy_unified.go
@@ -1,7 +1,6 @@
 package s3api
 
 import (
-	"context"
 	"errors"
 	"fmt"
 	"net/http"
@@ -133,9 +132,9 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
 	}
 
 	if state.DstSSES3 {
-		// Use streaming copy for SSE-S3 encryption
-		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
-		return chunks, nil, err
+		// Use chunk-by-chunk copy for SSE-S3 encryption (consistent with SSE-C and SSE-KMS)
+		glog.V(2).Infof("Plain→SSE-S3 copy: using unified multipart encrypt copy")
+		return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
 	}
 
 	return nil, nil, fmt.Errorf("unknown target encryption type")
@@ -143,30 +142,18 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
 }
 
 // executeDecryptCopy handles encrypted → plain copies
 func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
-	// Use unified multipart-aware decrypt copy for all encryption types
-	if state.SrcSSEC || state.SrcSSEKMS {
+	// Use unified multipart-aware decrypt copy for all encryption types (consistent chunk-by-chunk)
+	if state.SrcSSEC || state.SrcSSEKMS || state.SrcSSES3 {
 		glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy")
 		return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath)
 	}
 
-	if state.SrcSSES3 {
-		// Use streaming copy for SSE-S3 decryption
-		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
-		return chunks, nil, err
-	}
-
 	return nil, nil, fmt.Errorf("unknown source encryption type")
 }
 
 // executeReencryptCopy handles encrypted → encrypted copies with different keys/methods
 func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
-	// Check if we should use streaming copy for better performance
-	if s3a.shouldUseStreamingCopy(entry, state) {
-		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
-		return chunks, nil, err
-	}
-
-	// Fallback to chunk-by-chunk approach for compatibility
+	// Use chunk-by-chunk approach for all cross-encryption scenarios (consistent behavior)
 	if state.SrcSSEC && state.DstSSEC {
 		return s3a.copyChunksWithSSEC(entry, r)
 	}
@@ -177,83 +164,8 @@ func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Requ
 		return chunks, dstMetadata, err
 	}
 
-	if state.SrcSSEC && state.DstSSEKMS {
-		// SSE-C → SSE-KMS: use unified multipart-aware cross-encryption copy
-		glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy")
-		return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
-	}
-
-	if state.SrcSSEKMS && state.DstSSEC {
-		// SSE-KMS → SSE-C: use unified multipart-aware cross-encryption copy
-		glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy")
-		return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
-	}
-
-	// Handle SSE-S3 cross-encryption scenarios
-	if state.SrcSSES3 || state.DstSSES3 {
-		// Any scenario involving SSE-S3 uses streaming copy
-		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
-		return chunks, nil, err
-	}
-
-	return nil, nil, fmt.Errorf("unsupported cross-encryption scenario")
-}
-
-// shouldUseStreamingCopy determines if streaming copy should be used
-func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool {
-	// Use streaming copy for large files or when beneficial
-	fileSize := entry.Attributes.FileSize
-
-	// Use streaming for files larger than 10MB
-	if fileSize > 10*1024*1024 {
-		return true
-	}
-
-	// Check if this is a multipart encrypted object
-	isMultipartEncrypted := false
-	if state.IsSourceEncrypted() {
-		encryptedChunks := 0
-		for _, chunk := range entry.GetChunks() {
-			if chunk.GetSseType() != filer_pb.SSEType_NONE {
-				encryptedChunks++
-			}
-		}
-		isMultipartEncrypted = encryptedChunks > 1
-	}
-
-	// For multipart encrypted objects, avoid streaming copy to use per-chunk metadata approach
-	if isMultipartEncrypted {
-		glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach")
-		return false
-	}
-
-	// Use streaming for cross-encryption scenarios (for single-part objects only)
-	if state.IsSourceEncrypted() && state.IsTargetEncrypted() {
-		srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3)
-		dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3)
-		if srcType != dstType {
-			return true
-		}
-	}
-
-	// Use streaming for compressed files
-	if isCompressedEntry(entry) {
-		return true
-	}
-
-	// Use streaming for SSE-S3 scenarios (always)
-	if state.SrcSSES3 || state.DstSSES3 {
-		return true
-	}
-
-	return false
-}
-
-// executeStreamingReencryptCopy performs streaming re-encryption copy
-func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, error) {
-	// Create streaming copy manager
-	streamingManager := NewStreamingCopyManager(s3a)
-
-	// Execute streaming copy
-	return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state)
+	// All other cross-encryption scenarios use unified multipart copy
+	// This includes: SSE-C↔SSE-KMS, SSE-C↔SSE-S3, SSE-KMS↔SSE-S3, SSE-S3↔SSE-S3
+	glog.V(2).Infof("Cross-encryption copy: using unified multipart copy")
+	return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
 }
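With shouldUseStreamingCopy and executeStreamingReencryptCopy removed, the re-encryption dispatch reduces to two same-method fast paths plus one unified fallback. An illustrative reduction of that routing, assuming at least one side is encrypted (the enum and route function are hypothetical; only the target function names come from the diff):

package main

import "fmt"

type sse int

const (
	plain sse = iota
	sseC
	sseKMS
	sseS3
)

// route mirrors the simplified dispatch: same-method SSE-C and SSE-KMS keep
// their fast paths; every other pairing funnels into the unified copy.
func route(src, dst sse) string {
	switch {
	case src == sseC && dst == sseC:
		return "copyChunksWithSSEC"
	case src == sseKMS && dst == sseKMS:
		return "copyChunksWithSSEKMS"
	default:
		return "copyMultipartCrossEncryption"
	}
}

func main() {
	fmt.Println(route(sseC, sseS3))  // copyMultipartCrossEncryption
	fmt.Println(route(sseS3, plain)) // copyMultipartCrossEncryption
}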
diff --git a/weed/s3api/s3api_object_handlers_delete.go b/weed/s3api/s3api_object_handlers_delete.go
index f779a6edc..6e373bb4e 100644
--- a/weed/s3api/s3api_object_handlers_delete.go
+++ b/weed/s3api/s3api_object_handlers_delete.go
@@ -1,12 +1,10 @@
 package s3api
 
 import (
-	"context"
 	"encoding/xml"
 	"fmt"
 	"io"
 	"net/http"
-	"slices"
 	"strings"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -127,22 +125,9 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
 	dir, name := target.DirAndName()
 
 	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		// Use operation context that won't be cancelled if request terminates
-		// This ensures deletion completes atomically to avoid inconsistent state
-		opCtx := context.WithoutCancel(r.Context())
-
-		if err := doDeleteEntry(client, dir, name, true, false); err != nil {
-			return err
-		}
-
-		// Cleanup empty directories
-		if !s3a.option.AllowEmptyFolder && strings.LastIndex(object, "/") > 0 {
-			bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
-			// Recursively delete empty parent directories, stop at bucket path
-			filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dir), util.FullPath(bucketPath), nil)
-		}
-
-		return nil
+		return doDeleteEntry(client, dir, name, true, false)
+		// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
+		// which listens to metadata events and uses consistent hashing for coordination
 	})
 	if err != nil {
 		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -222,8 +207,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 	var deleteErrors []DeleteError
 	var auditLog *s3err.AccessLog
 
-	directoriesWithDeletion := make(map[string]bool)
-
 	if s3err.Logger != nil {
 		auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
 	}
@@ -245,10 +228,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 	versioningConfigured := (versioningState != "")
 
 	s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		// Use operation context that won't be cancelled if request terminates
-		// This ensures batch deletion completes atomically to avoid inconsistent state
-		opCtx := context.WithoutCancel(r.Context())
-
 		// delete file entries
 		for _, object := range deleteObjects.Objects {
 			if object.Key == "" {
@@ -357,10 +336,6 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 
 			err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
 			if err == nil {
-				// Track directory for empty directory cleanup
-				if !s3a.option.AllowEmptyFolder {
-					directoriesWithDeletion[parentDirectoryPath] = true
-				}
 				deletedObjects = append(deletedObjects, object)
 			} else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
 				deletedObjects = append(deletedObjects, object)
@@ -380,30 +355,8 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
 			}
 		}
 
-		// Cleanup empty directories - optimize by processing deepest first
-		if !s3a.option.AllowEmptyFolder && len(directoriesWithDeletion) > 0 {
-			bucketPath := fmt.Sprintf("%s/%s", s3a.option.BucketsPath, bucket)
-
-			// Collect and sort directories by depth (deepest first) to avoid redundant checks
-			var allDirs []string
-			for dirPath := range directoriesWithDeletion {
-				allDirs = append(allDirs, dirPath)
-			}
-			// Sort by depth (deeper directories first)
-			slices.SortFunc(allDirs, func(a, b string) int {
-				return strings.Count(b, "/") - strings.Count(a, "/")
-			})
-
-			// Track already-checked directories to avoid redundant work
-			checked := make(map[string]bool)
-			for _, dirPath := range allDirs {
-				if !checked[dirPath] {
-					// Recursively delete empty parent directories, stop at bucket path
-					// Mark this directory and all its parents as checked during recursion
-					filer_pb.DoDeleteEmptyParentDirectories(opCtx, client, util.FullPath(dirPath), util.FullPath(bucketPath), checked)
-				}
-			}
-		}
+		// Note: Empty folder cleanup is now handled asynchronously by EmptyFolderCleaner
+		// which listens to metadata events and uses consistent hashing for coordination
 
 		return nil
 	})
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 100796b2e..f848790de 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"net/url"
 	"path/filepath"
 	"strconv"
 	"strings"
@@ -548,6 +549,28 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader
 		}
 	}
 
+	// Parse and store object tags from X-Amz-Tagging header
+	// Fix for GitHub issue #7589: Tags sent during object upload were not being stored
+	if tagging := r.Header.Get(s3_constants.AmzObjectTagging); tagging != "" {
+		parsedTags, err := url.ParseQuery(tagging)
+		if err != nil {
+			glog.Warningf("putToFiler: Invalid S3 tag format in header '%s': %v", tagging, err)
+			return "", s3err.ErrInvalidTag, SSEResponseMetadata{}
+		}
+		for key, values := range parsedTags {
+			if len(values) > 1 {
+				glog.Warningf("putToFiler: Duplicate tag key '%s' in header", key)
+				return "", s3err.ErrInvalidTag, SSEResponseMetadata{}
+			}
+			value := ""
+			if len(values) > 0 {
+				value = values[0]
+			}
+			entry.Extended[s3_constants.AmzObjectTagging+"-"+key] = []byte(value)
+		}
+		glog.V(3).Infof("putToFiler: stored %d tags from X-Amz-Tagging header", len(parsedTags))
+	}
+
 	// Set SSE-C metadata
 	if customerKey != nil && len(sseIV) > 0 {
 		// Store IV as RAW bytes (matches filer behavior - filer decodes base64 headers and stores raw bytes)
@@ -680,9 +703,9 @@ func filerErrorToS3Error(err error) s3err.ErrorCode {
 	if err == nil {
 		return s3err.ErrNone
 	}
-
+
 	errString := err.Error()
-
+
 	switch {
 	case errString == constants.ErrMsgBadDigest:
 		return s3err.ErrBadDigest
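The X-Amz-Tagging header carries tags URL-encoded, which is why the fix above leans on url.ParseQuery. A standalone sketch of the same validation (parseTags is illustrative; putToFiler additionally prefixes each key before storing it in entry.Extended):

package main

import (
	"fmt"
	"net/url"
)

// parseTags decodes an X-Amz-Tagging value, rejecting duplicate keys and
// allowing empty values, mirroring the checks added in putToFiler.
func parseTags(header string) (map[string]string, error) {
	values, err := url.ParseQuery(header)
	if err != nil {
		return nil, fmt.Errorf("invalid tag format: %w", err)
	}
	tags := make(map[string]string, len(values))
	for key, vals := range values {
		if len(vals) > 1 {
			return nil, fmt.Errorf("duplicate tag key %q", key)
		}
		value := ""
		if len(vals) > 0 {
			value = vals[0]
		}
		tags[key] = value
	}
	return tags, nil
}

func main() {
	tags, err := parseTags("project=seaweedfs&env=prod")
	fmt.Println(tags, err) // map[env:prod project:seaweedfs] <nil>
}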
diff --git a/weed/s3api/s3api_streaming_copy.go b/weed/s3api/s3api_streaming_copy.go
index 457986858..94729c003 100644
--- a/weed/s3api/s3api_streaming_copy.go
+++ b/weed/s3api/s3api_streaming_copy.go
@@ -59,18 +59,19 @@ func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
 	}
 }
 
-// ExecuteStreamingCopy performs a streaming copy operation
-func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
+// ExecuteStreamingCopy performs a streaming copy operation and returns the encryption spec
+// The encryption spec is needed for SSE-S3 to properly set destination metadata (fixes GitHub #7562)
+func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, *EncryptionSpec, error) {
 	// Create streaming copy specification
 	spec, err := scm.createStreamingSpec(entry, r, state)
 	if err != nil {
-		return nil, fmt.Errorf("create streaming spec: %w", err)
+		return nil, nil, fmt.Errorf("create streaming spec: %w", err)
 	}
 
 	// Create source reader from entry
 	sourceReader, err := scm.createSourceReader(entry)
 	if err != nil {
-		return nil, fmt.Errorf("create source reader: %w", err)
+		return nil, nil, fmt.Errorf("create source reader: %w", err)
 	}
 	defer sourceReader.Close()
 
@@ -79,11 +80,16 @@ func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry
 	// Create processing pipeline
 	processedReader, err := scm.createProcessingPipeline(spec)
 	if err != nil {
-		return nil, fmt.Errorf("create processing pipeline: %w", err)
+		return nil, nil, fmt.Errorf("create processing pipeline: %w", err)
 	}
 
 	// Stream to destination
-	return scm.streamToDestination(ctx, processedReader, spec, dstPath)
+	chunks, err := scm.streamToDestination(ctx, processedReader, spec, dstPath)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return chunks, spec.EncryptionSpec, nil
 }
 
 // createStreamingSpec creates a streaming specification based on copy parameters
@@ -453,8 +459,8 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
 	for {
 		n, err := reader.Read(buffer)
 		if n > 0 {
-			// Create chunk for this data
-			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
+			// Create chunk for this data, setting SSE type and per-chunk metadata (including chunk-specific IVs for SSE-S3)
+			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath, spec.EncryptionSpec)
 			if chunkErr != nil {
 				return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
 			}
@@ -474,7 +480,7 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
 }
 
 // createChunkFromData creates a chunk from streaming data
-func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
+func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string, encSpec *EncryptionSpec) (*filer_pb.FileChunk, error) {
 	// Assign new volume
 	assignResult, err := scm.s3a.assignNewVolume(dstPath)
 	if err != nil {
@@ -487,6 +493,42 @@ func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64,
 		Size:   uint64(len(data)),
 	}
 
+	// Set SSE type and metadata on chunk if destination is encrypted
+	// This is critical for GetObject to know to decrypt the data - fixes GitHub #7562
+	if encSpec != nil && encSpec.NeedsEncryption {
+		switch encSpec.DestinationType {
+		case EncryptionTypeSSEC:
+			chunk.SseType = filer_pb.SSEType_SSE_C
+			// SSE-C metadata is handled at object level, not per-chunk for streaming copy
+		case EncryptionTypeSSEKMS:
+			chunk.SseType = filer_pb.SSEType_SSE_KMS
+			// SSE-KMS metadata is handled at object level, not per-chunk for streaming copy
+		case EncryptionTypeSSES3:
+			chunk.SseType = filer_pb.SSEType_SSE_S3
+			// Create per-chunk SSE-S3 metadata with chunk-specific IV
+			if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
+				// Calculate chunk-specific IV using base IV and chunk offset
+				baseIV := encSpec.DestinationIV
+				if len(baseIV) == 0 {
+					return nil, fmt.Errorf("SSE-S3 encryption requires DestinationIV to be set for chunk at offset %d", offset)
+				}
+				chunkIV, _ := calculateIVWithOffset(baseIV, offset)
+				// Create chunk key with the chunk-specific IV
+				chunkSSEKey := &SSES3Key{
+					Key:       sseKey.Key,
+					KeyID:     sseKey.KeyID,
+					Algorithm: sseKey.Algorithm,
+					IV:        chunkIV,
+				}
+				chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey)
+				if serErr != nil {
+					return nil, fmt.Errorf("failed to serialize chunk SSE-S3 metadata: %w", serErr)
+				}
+				chunk.SseMetadata = chunkMetadata
+			}
+		}
+	}
+
 	// Set file ID
 	if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
 		return nil, err
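calculateIVWithOffset is referenced above but its implementation is not part of this diff. One plausible reading, for intuition only, is a big-endian counter add of the offset's block count onto the base IV, so each chunk's CTR keystream lines up with its absolute position in the object (ivWithOffset below is an assumption, not the SeaweedFS implementation):

package main

import (
	"encoding/hex"
	"fmt"
)

// ivWithOffset advances a 16-byte base IV by offset/16 AES blocks, treating
// the IV as a big-endian counter with carry propagation.
func ivWithOffset(baseIV []byte, offset int64) []byte {
	iv := make([]byte, len(baseIV))
	copy(iv, baseIV)
	blocks := uint64(offset) / 16 // AES block size
	for i := len(iv) - 1; i >= 0 && blocks > 0; i-- {
		sum := uint64(iv[i]) + (blocks & 0xff)
		iv[i] = byte(sum)
		blocks = (blocks >> 8) + (sum >> 8)
	}
	return iv
}

func main() {
	base := make([]byte, 16)
	// Offset 4096 bytes = 256 blocks, so the counter advances by 0x100.
	fmt.Println(hex.EncodeToString(ivWithOffset(base, 4096)))
}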
