diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers_copy.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_copy.go | 155 |
1 files changed, 149 insertions, 6 deletions
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go index 888b38e94..18159ab17 100644 --- a/weed/s3api/s3api_object_handlers_copy.go +++ b/weed/s3api/s3api_object_handlers_copy.go @@ -1,8 +1,10 @@ package s3api import ( + "bytes" "context" "fmt" + "io" "net/http" "net/url" "strconv" @@ -160,11 +162,17 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request // Just copy the entry structure without chunks for zero-size files dstEntry.Chunks = nil } else { - // Replicate chunks for files with content - dstChunks, err := s3a.copyChunks(entry, r.URL.Path) + // Handle SSE-C copy with smart fast/slow path selection + dstChunks, err := s3a.copyChunksWithSSEC(entry, r) if err != nil { - glog.Errorf("CopyObjectHandler copy chunks error: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + glog.Errorf("CopyObjectHandler copy chunks with SSE-C error: %v", err) + // Use shared error mapping helper + errCode := MapSSECErrorToS3Error(err) + // For copy operations, if the error is not recognized, use InternalError + if errCode == s3err.ErrInvalidRequest { + errCode = s3err.ErrInternalError + } + s3err.WriteErrorResponse(w, r, errCode) return } dstEntry.Chunks = dstChunks @@ -591,7 +599,8 @@ func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, rep // copyChunks replicates chunks from source entry to destination entry func (s3a *S3ApiServer) copyChunks(entry *filer_pb.Entry, dstPath string) ([]*filer_pb.FileChunk, error) { dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks())) - executor := util.NewLimitedConcurrentExecutor(4) // Limit to 4 concurrent operations + const defaultChunkCopyConcurrency = 4 + executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations errChan := make(chan error, len(entry.GetChunks())) for i, chunk := range entry.GetChunks() { @@ -777,7 +786,8 @@ func (s3a *S3ApiServer) copyChunksForRange(entry *filer_pb.Entry, startOffset, e // Copy the relevant chunks using a specialized method for range copies dstChunks := make([]*filer_pb.FileChunk, len(relevantChunks)) - executor := util.NewLimitedConcurrentExecutor(4) + const defaultChunkCopyConcurrency = 4 + executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) errChan := make(chan error, len(relevantChunks)) // Create a map to track original chunks for each relevant chunk @@ -997,3 +1007,136 @@ func (s3a *S3ApiServer) downloadChunkData(srcUrl string, offset, size int64) ([] } return chunkData, nil } + +// copyChunksWithSSEC handles SSE-C aware copying with smart fast/slow path selection +func (s3a *S3ApiServer) copyChunksWithSSEC(entry *filer_pb.Entry, r *http.Request) ([]*filer_pb.FileChunk, error) { + // Parse SSE-C headers + copySourceKey, err := ParseSSECCopySourceHeaders(r) + if err != nil { + return nil, err + } + + destKey, err := ParseSSECHeaders(r) + if err != nil { + return nil, err + } + + // Determine copy strategy + strategy, err := DetermineSSECCopyStrategy(entry.Extended, copySourceKey, destKey) + if err != nil { + return nil, err + } + + glog.V(2).Infof("SSE-C copy strategy for %s: %v", r.URL.Path, strategy) + + switch strategy { + case SSECCopyDirect: + // FAST PATH: Direct chunk copy + glog.V(2).Infof("Using fast path: direct chunk copy for %s", r.URL.Path) + return s3a.copyChunks(entry, r.URL.Path) + + case SSECCopyReencrypt: + // SLOW PATH: Decrypt and re-encrypt + glog.V(2).Infof("Using slow path: decrypt/re-encrypt for %s", r.URL.Path) + return s3a.copyChunksWithReencryption(entry, copySourceKey, destKey, r.URL.Path) + + default: + return nil, fmt.Errorf("unknown SSE-C copy strategy: %v", strategy) + } +} + +// copyChunksWithReencryption handles the slow path: decrypt source and re-encrypt for destination +func (s3a *S3ApiServer) copyChunksWithReencryption(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, error) { + dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks())) + const defaultChunkCopyConcurrency = 4 + executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations + errChan := make(chan error, len(entry.GetChunks())) + + for i, chunk := range entry.GetChunks() { + chunkIndex := i + executor.Execute(func() { + dstChunk, err := s3a.copyChunkWithReencryption(chunk, copySourceKey, destKey, dstPath) + if err != nil { + errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err) + return + } + dstChunks[chunkIndex] = dstChunk + errChan <- nil + }) + } + + // Wait for all operations to complete and check for errors + for i := 0; i < len(entry.GetChunks()); i++ { + if err := <-errChan; err != nil { + return nil, err + } + } + + return dstChunks, nil +} + +// copyChunkWithReencryption copies a single chunk with decrypt/re-encrypt +func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) (*filer_pb.FileChunk, error) { + // Create destination chunk + dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size) + + // Prepare chunk copy (assign new volume and get source URL) + assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath) + if err != nil { + return nil, err + } + + // Set file ID on destination chunk + if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil { + return nil, err + } + + // Download encrypted chunk data + encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size)) + if err != nil { + return nil, fmt.Errorf("download encrypted chunk data: %w", err) + } + + var finalData []byte + + // Decrypt if source is encrypted + if copySourceKey != nil { + decryptedReader, decErr := CreateSSECDecryptedReader(bytes.NewReader(encryptedData), copySourceKey) + if decErr != nil { + return nil, fmt.Errorf("create decrypted reader: %w", decErr) + } + + decryptedData, readErr := io.ReadAll(decryptedReader) + if readErr != nil { + return nil, fmt.Errorf("decrypt chunk data: %w", readErr) + } + finalData = decryptedData + } else { + // Source is unencrypted + finalData = encryptedData + } + + // Re-encrypt if destination should be encrypted + if destKey != nil { + encryptedReader, encErr := CreateSSECEncryptedReader(bytes.NewReader(finalData), destKey) + if encErr != nil { + return nil, fmt.Errorf("create encrypted reader: %w", encErr) + } + + reencryptedData, readErr := io.ReadAll(encryptedReader) + if readErr != nil { + return nil, fmt.Errorf("re-encrypt chunk data: %w", readErr) + } + finalData = reencryptedData + + // Update chunk size to include IV + dstChunk.Size = uint64(len(finalData)) + } + + // Upload the processed data + if err := s3a.uploadChunkData(finalData, assignResult); err != nil { + return nil, fmt.Errorf("upload processed chunk data: %w", err) + } + + return dstChunk, nil +} |
