diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers_copy_unified.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_copy_unified.go | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/weed/s3api/s3api_object_handlers_copy_unified.go b/weed/s3api/s3api_object_handlers_copy_unified.go new file mode 100644 index 000000000..d11594420 --- /dev/null +++ b/weed/s3api/s3api_object_handlers_copy_unified.go @@ -0,0 +1,249 @@ +package s3api + +import ( + "context" + "fmt" + "net/http" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +) + +// executeUnifiedCopyStrategy executes the appropriate copy strategy based on encryption state +// Returns chunks and destination metadata that should be applied to the destination entry +func (s3a *S3ApiServer) executeUnifiedCopyStrategy(entry *filer_pb.Entry, r *http.Request, dstBucket, srcObject, dstObject string) ([]*filer_pb.FileChunk, map[string][]byte, error) { + // Detect encryption state (using entry-aware detection for multipart objects) + srcPath := fmt.Sprintf("/%s/%s", r.Header.Get("X-Amz-Copy-Source-Bucket"), srcObject) + dstPath := fmt.Sprintf("/%s/%s", dstBucket, dstObject) + state := DetectEncryptionStateWithEntry(entry, r, srcPath, dstPath) + + // Debug logging for encryption state + + // Apply bucket default encryption if no explicit encryption specified + if !state.IsTargetEncrypted() { + bucketMetadata, err := s3a.getBucketMetadata(dstBucket) + if err == nil && bucketMetadata != nil && bucketMetadata.Encryption != nil { + switch bucketMetadata.Encryption.SseAlgorithm { + case "aws:kms": + state.DstSSEKMS = true + case "AES256": + state.DstSSES3 = true + } + } + } + + // Determine copy strategy + strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r) + if err != nil { + return nil, nil, err + } + + glog.V(2).Infof("Unified copy strategy for %s → %s: %v", srcPath, dstPath, strategy) + + // Calculate optimized sizes for the strategy + sizeCalc := CalculateOptimizedSizes(entry, r, strategy) + glog.V(2).Infof("Size calculation: src=%d, target=%d, actual=%d, overhead=%d, preallocate=%v", + sizeCalc.SourceSize, sizeCalc.TargetSize, sizeCalc.ActualContentSize, + sizeCalc.EncryptionOverhead, sizeCalc.CanPreallocate) + + // Execute strategy + switch strategy { + case CopyStrategyDirect: + chunks, err := s3a.copyChunks(entry, dstPath) + return chunks, nil, err + + case CopyStrategyKeyRotation: + return s3a.executeKeyRotation(entry, r, state) + + case CopyStrategyEncrypt: + return s3a.executeEncryptCopy(entry, r, state, dstBucket, dstPath) + + case CopyStrategyDecrypt: + return s3a.executeDecryptCopy(entry, r, state, dstPath) + + case CopyStrategyReencrypt: + return s3a.executeReencryptCopy(entry, r, state, dstBucket, dstPath) + + default: + return nil, nil, fmt.Errorf("unknown unified copy strategy: %v", strategy) + } +} + +// mapCopyErrorToS3Error maps various copy errors to appropriate S3 error codes +func (s3a *S3ApiServer) mapCopyErrorToS3Error(err error) s3err.ErrorCode { + if err == nil { + return s3err.ErrNone + } + + // Check for KMS errors first + if kmsErr := MapKMSErrorToS3Error(err); kmsErr != s3err.ErrInvalidRequest { + return kmsErr + } + + // Check for SSE-C errors + if ssecErr := MapSSECErrorToS3Error(err); ssecErr != s3err.ErrInvalidRequest { + return ssecErr + } + + // Default to internal error for unknown errors + return s3err.ErrInternalError +} + +// executeKeyRotation handles key rotation for same-object copies +func (s3a *S3ApiServer) executeKeyRotation(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) ([]*filer_pb.FileChunk, map[string][]byte, error) { + // For key rotation, we only need to update metadata, not re-copy chunks + // This is a significant optimization for same-object key changes + + if state.SrcSSEC && state.DstSSEC { + // SSE-C key rotation - need to handle new key/IV, use reencrypt logic + return s3a.executeReencryptCopy(entry, r, state, "", "") + } + + if state.SrcSSEKMS && state.DstSSEKMS { + // SSE-KMS key rotation - return existing chunks, metadata will be updated by caller + return entry.GetChunks(), nil, nil + } + + // Fallback to reencrypt if we can't do metadata-only rotation + return s3a.executeReencryptCopy(entry, r, state, "", "") +} + +// executeEncryptCopy handles plain → encrypted copies +func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { + if state.DstSSEC { + // Use existing SSE-C copy logic + return s3a.copyChunksWithSSEC(entry, r) + } + + if state.DstSSEKMS { + // Use existing SSE-KMS copy logic - metadata is now generated internally + chunks, dstMetadata, err := s3a.copyChunksWithSSEKMS(entry, r, dstBucket) + return chunks, dstMetadata, err + } + + if state.DstSSES3 { + // Use streaming copy for SSE-S3 encryption + chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath) + return chunks, nil, err + } + + return nil, nil, fmt.Errorf("unknown target encryption type") +} + +// executeDecryptCopy handles encrypted → plain copies +func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { + // Use unified multipart-aware decrypt copy for all encryption types + if state.SrcSSEC || state.SrcSSEKMS { + glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy") + return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath) + } + + if state.SrcSSES3 { + // Use streaming copy for SSE-S3 decryption + chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath) + return chunks, nil, err + } + + return nil, nil, fmt.Errorf("unknown source encryption type") +} + +// executeReencryptCopy handles encrypted → encrypted copies with different keys/methods +func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) { + // Check if we should use streaming copy for better performance + if s3a.shouldUseStreamingCopy(entry, state) { + chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath) + return chunks, nil, err + } + + // Fallback to chunk-by-chunk approach for compatibility + if state.SrcSSEC && state.DstSSEC { + return s3a.copyChunksWithSSEC(entry, r) + } + + if state.SrcSSEKMS && state.DstSSEKMS { + // Use existing SSE-KMS copy logic - metadata is now generated internally + chunks, dstMetadata, err := s3a.copyChunksWithSSEKMS(entry, r, dstBucket) + return chunks, dstMetadata, err + } + + if state.SrcSSEC && state.DstSSEKMS { + // SSE-C → SSE-KMS: use unified multipart-aware cross-encryption copy + glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy") + return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath) + } + + if state.SrcSSEKMS && state.DstSSEC { + // SSE-KMS → SSE-C: use unified multipart-aware cross-encryption copy + glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy") + return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath) + } + + // Handle SSE-S3 cross-encryption scenarios + if state.SrcSSES3 || state.DstSSES3 { + // Any scenario involving SSE-S3 uses streaming copy + chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath) + return chunks, nil, err + } + + return nil, nil, fmt.Errorf("unsupported cross-encryption scenario") +} + +// shouldUseStreamingCopy determines if streaming copy should be used +func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool { + // Use streaming copy for large files or when beneficial + fileSize := entry.Attributes.FileSize + + // Use streaming for files larger than 10MB + if fileSize > 10*1024*1024 { + return true + } + + // Check if this is a multipart encrypted object + isMultipartEncrypted := false + if state.IsSourceEncrypted() { + encryptedChunks := 0 + for _, chunk := range entry.GetChunks() { + if chunk.GetSseType() != filer_pb.SSEType_NONE { + encryptedChunks++ + } + } + isMultipartEncrypted = encryptedChunks > 1 + } + + // For multipart encrypted objects, avoid streaming copy to use per-chunk metadata approach + if isMultipartEncrypted { + glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach") + return false + } + + // Use streaming for cross-encryption scenarios (for single-part objects only) + if state.IsSourceEncrypted() && state.IsTargetEncrypted() { + srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3) + dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3) + if srcType != dstType { + return true + } + } + + // Use streaming for compressed files + if isCompressedEntry(entry) { + return true + } + + // Use streaming for SSE-S3 scenarios (always) + if state.SrcSSES3 || state.DstSSES3 { + return true + } + + return false +} + +// executeStreamingReencryptCopy performs streaming re-encryption copy +func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, error) { + // Create streaming copy manager + streamingManager := NewStreamingCopyManager(s3a) + + // Execute streaming copy + return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state) +} |
