aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers_copy_unified.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers_copy_unified.go')
-rw-r--r--weed/s3api/s3api_object_handlers_copy_unified.go249
1 files changed, 249 insertions, 0 deletions
diff --git a/weed/s3api/s3api_object_handlers_copy_unified.go b/weed/s3api/s3api_object_handlers_copy_unified.go
new file mode 100644
index 000000000..d11594420
--- /dev/null
+++ b/weed/s3api/s3api_object_handlers_copy_unified.go
@@ -0,0 +1,249 @@
+package s3api
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+)
+
+// executeUnifiedCopyStrategy executes the appropriate copy strategy based on encryption state
+// Returns chunks and destination metadata that should be applied to the destination entry
+func (s3a *S3ApiServer) executeUnifiedCopyStrategy(entry *filer_pb.Entry, r *http.Request, dstBucket, srcObject, dstObject string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
+ // Detect encryption state (using entry-aware detection for multipart objects)
+ srcPath := fmt.Sprintf("/%s/%s", r.Header.Get("X-Amz-Copy-Source-Bucket"), srcObject)
+ dstPath := fmt.Sprintf("/%s/%s", dstBucket, dstObject)
+ state := DetectEncryptionStateWithEntry(entry, r, srcPath, dstPath)
+
+ // Debug logging for encryption state
+
+ // Apply bucket default encryption if no explicit encryption specified
+ if !state.IsTargetEncrypted() {
+ bucketMetadata, err := s3a.getBucketMetadata(dstBucket)
+ if err == nil && bucketMetadata != nil && bucketMetadata.Encryption != nil {
+ switch bucketMetadata.Encryption.SseAlgorithm {
+ case "aws:kms":
+ state.DstSSEKMS = true
+ case "AES256":
+ state.DstSSES3 = true
+ }
+ }
+ }
+
+ // Determine copy strategy
+ strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ glog.V(2).Infof("Unified copy strategy for %s → %s: %v", srcPath, dstPath, strategy)
+
+ // Calculate optimized sizes for the strategy
+ sizeCalc := CalculateOptimizedSizes(entry, r, strategy)
+ glog.V(2).Infof("Size calculation: src=%d, target=%d, actual=%d, overhead=%d, preallocate=%v",
+ sizeCalc.SourceSize, sizeCalc.TargetSize, sizeCalc.ActualContentSize,
+ sizeCalc.EncryptionOverhead, sizeCalc.CanPreallocate)
+
+ // Execute strategy
+ switch strategy {
+ case CopyStrategyDirect:
+ chunks, err := s3a.copyChunks(entry, dstPath)
+ return chunks, nil, err
+
+ case CopyStrategyKeyRotation:
+ return s3a.executeKeyRotation(entry, r, state)
+
+ case CopyStrategyEncrypt:
+ return s3a.executeEncryptCopy(entry, r, state, dstBucket, dstPath)
+
+ case CopyStrategyDecrypt:
+ return s3a.executeDecryptCopy(entry, r, state, dstPath)
+
+ case CopyStrategyReencrypt:
+ return s3a.executeReencryptCopy(entry, r, state, dstBucket, dstPath)
+
+ default:
+ return nil, nil, fmt.Errorf("unknown unified copy strategy: %v", strategy)
+ }
+}
+
+// mapCopyErrorToS3Error maps various copy errors to appropriate S3 error codes
+func (s3a *S3ApiServer) mapCopyErrorToS3Error(err error) s3err.ErrorCode {
+ if err == nil {
+ return s3err.ErrNone
+ }
+
+ // Check for KMS errors first
+ if kmsErr := MapKMSErrorToS3Error(err); kmsErr != s3err.ErrInvalidRequest {
+ return kmsErr
+ }
+
+ // Check for SSE-C errors
+ if ssecErr := MapSSECErrorToS3Error(err); ssecErr != s3err.ErrInvalidRequest {
+ return ssecErr
+ }
+
+ // Default to internal error for unknown errors
+ return s3err.ErrInternalError
+}
+
+// executeKeyRotation handles key rotation for same-object copies
+func (s3a *S3ApiServer) executeKeyRotation(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) ([]*filer_pb.FileChunk, map[string][]byte, error) {
+ // For key rotation, we only need to update metadata, not re-copy chunks
+ // This is a significant optimization for same-object key changes
+
+ if state.SrcSSEC && state.DstSSEC {
+ // SSE-C key rotation - need to handle new key/IV, use reencrypt logic
+ return s3a.executeReencryptCopy(entry, r, state, "", "")
+ }
+
+ if state.SrcSSEKMS && state.DstSSEKMS {
+ // SSE-KMS key rotation - return existing chunks, metadata will be updated by caller
+ return entry.GetChunks(), nil, nil
+ }
+
+ // Fallback to reencrypt if we can't do metadata-only rotation
+ return s3a.executeReencryptCopy(entry, r, state, "", "")
+}
+
+// executeEncryptCopy handles plain → encrypted copies
+func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
+ if state.DstSSEC {
+ // Use existing SSE-C copy logic
+ return s3a.copyChunksWithSSEC(entry, r)
+ }
+
+ if state.DstSSEKMS {
+ // Use existing SSE-KMS copy logic - metadata is now generated internally
+ chunks, dstMetadata, err := s3a.copyChunksWithSSEKMS(entry, r, dstBucket)
+ return chunks, dstMetadata, err
+ }
+
+ if state.DstSSES3 {
+ // Use streaming copy for SSE-S3 encryption
+ chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
+ return chunks, nil, err
+ }
+
+ return nil, nil, fmt.Errorf("unknown target encryption type")
+}
+
+// executeDecryptCopy handles encrypted → plain copies
+func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
+ // Use unified multipart-aware decrypt copy for all encryption types
+ if state.SrcSSEC || state.SrcSSEKMS {
+ glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy")
+ return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath)
+ }
+
+ if state.SrcSSES3 {
+ // Use streaming copy for SSE-S3 decryption
+ chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
+ return chunks, nil, err
+ }
+
+ return nil, nil, fmt.Errorf("unknown source encryption type")
+}
+
+// executeReencryptCopy handles encrypted → encrypted copies with different keys/methods
+func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
+ // Check if we should use streaming copy for better performance
+ if s3a.shouldUseStreamingCopy(entry, state) {
+ chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
+ return chunks, nil, err
+ }
+
+ // Fallback to chunk-by-chunk approach for compatibility
+ if state.SrcSSEC && state.DstSSEC {
+ return s3a.copyChunksWithSSEC(entry, r)
+ }
+
+ if state.SrcSSEKMS && state.DstSSEKMS {
+ // Use existing SSE-KMS copy logic - metadata is now generated internally
+ chunks, dstMetadata, err := s3a.copyChunksWithSSEKMS(entry, r, dstBucket)
+ return chunks, dstMetadata, err
+ }
+
+ if state.SrcSSEC && state.DstSSEKMS {
+ // SSE-C → SSE-KMS: use unified multipart-aware cross-encryption copy
+ glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy")
+ return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
+ }
+
+ if state.SrcSSEKMS && state.DstSSEC {
+ // SSE-KMS → SSE-C: use unified multipart-aware cross-encryption copy
+ glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy")
+ return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
+ }
+
+ // Handle SSE-S3 cross-encryption scenarios
+ if state.SrcSSES3 || state.DstSSES3 {
+ // Any scenario involving SSE-S3 uses streaming copy
+ chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
+ return chunks, nil, err
+ }
+
+ return nil, nil, fmt.Errorf("unsupported cross-encryption scenario")
+}
+
+// shouldUseStreamingCopy determines if streaming copy should be used
+func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool {
+ // Use streaming copy for large files or when beneficial
+ fileSize := entry.Attributes.FileSize
+
+ // Use streaming for files larger than 10MB
+ if fileSize > 10*1024*1024 {
+ return true
+ }
+
+ // Check if this is a multipart encrypted object
+ isMultipartEncrypted := false
+ if state.IsSourceEncrypted() {
+ encryptedChunks := 0
+ for _, chunk := range entry.GetChunks() {
+ if chunk.GetSseType() != filer_pb.SSEType_NONE {
+ encryptedChunks++
+ }
+ }
+ isMultipartEncrypted = encryptedChunks > 1
+ }
+
+ // For multipart encrypted objects, avoid streaming copy to use per-chunk metadata approach
+ if isMultipartEncrypted {
+ glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach")
+ return false
+ }
+
+ // Use streaming for cross-encryption scenarios (for single-part objects only)
+ if state.IsSourceEncrypted() && state.IsTargetEncrypted() {
+ srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3)
+ dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3)
+ if srcType != dstType {
+ return true
+ }
+ }
+
+ // Use streaming for compressed files
+ if isCompressedEntry(entry) {
+ return true
+ }
+
+ // Use streaming for SSE-S3 scenarios (always)
+ if state.SrcSSES3 || state.DstSSES3 {
+ return true
+ }
+
+ return false
+}
+
+// executeStreamingReencryptCopy performs streaming re-encryption copy
+func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, error) {
+ // Create streaming copy manager
+ streamingManager := NewStreamingCopyManager(s3a)
+
+ // Execute streaming copy
+ return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state)
+}