Diffstat (limited to 'weed/s3api/s3api_object_handlers_copy.go')
-rw-r--r--  weed/s3api/s3api_object_handlers_copy.go | 155
1 file changed, 149 insertions(+), 6 deletions(-)
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go
index 888b38e94..18159ab17 100644
--- a/weed/s3api/s3api_object_handlers_copy.go
+++ b/weed/s3api/s3api_object_handlers_copy.go
@@ -1,8 +1,10 @@
package s3api
import (
+ "bytes"
"context"
"fmt"
+ "io"
"net/http"
"net/url"
"strconv"
@@ -160,11 +162,17 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
// Just copy the entry structure without chunks for zero-size files
dstEntry.Chunks = nil
} else {
- // Replicate chunks for files with content
- dstChunks, err := s3a.copyChunks(entry, r.URL.Path)
+ // Handle SSE-C copy with smart fast/slow path selection
+ dstChunks, err := s3a.copyChunksWithSSEC(entry, r)
if err != nil {
- glog.Errorf("CopyObjectHandler copy chunks error: %v", err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ glog.Errorf("CopyObjectHandler copy chunks with SSE-C error: %v", err)
+ // Use shared error mapping helper
+ errCode := MapSSECErrorToS3Error(err)
+ // For copy operations, if the error is not recognized, use InternalError
+ if errCode == s3err.ErrInvalidRequest {
+ errCode = s3err.ErrInternalError
+ }
+ s3err.WriteErrorResponse(w, r, errCode)
return
}
dstEntry.Chunks = dstChunks
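
MapSSECErrorToS3Error is defined elsewhere in the package, so the hunk above only shows the copy path's extra rule: whatever the shared helper cannot classify is downgraded from ErrInvalidRequest to ErrInternalError before responding. A hypothetical, self-contained sketch of that composition; the ErrorCode values and the errKeyMD5Mismatch sentinel below are invented for illustration, not the project's definitions:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the s3err codes referenced in the hunk above.
type ErrorCode int

const (
	ErrSSECustomerKeyMD5Mismatch ErrorCode = iota
	ErrInvalidRequest
	ErrInternalError
)

// Hypothetical sentinel; the real package defines its own SSE-C errors.
var errKeyMD5Mismatch = errors.New("sse-c customer key MD5 mismatch")

// mapSSECError mimics a shared helper whose catch-all is ErrInvalidRequest.
func mapSSECError(err error) ErrorCode {
	if errors.Is(err, errKeyMD5Mismatch) {
		return ErrSSECustomerKeyMD5Mismatch
	}
	return ErrInvalidRequest
}

func main() {
	errCode := mapSSECError(errors.New("volume unreachable"))
	if errCode == ErrInvalidRequest {
		errCode = ErrInternalError // copy-specific override: unknown errors become 500s
	}
	fmt.Println(errCode) // 2 (ErrInternalError)
}
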
@@ -591,7 +599,8 @@ func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, rep
// copyChunks replicates chunks from source entry to destination entry
func (s3a *S3ApiServer) copyChunks(entry *filer_pb.Entry, dstPath string) ([]*filer_pb.FileChunk, error) {
dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
- executor := util.NewLimitedConcurrentExecutor(4) // Limit to 4 concurrent operations
+ const defaultChunkCopyConcurrency = 4
+ executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit concurrent chunk copy operations
errChan := make(chan error, len(entry.GetChunks()))
for i, chunk := range entry.GetChunks() {
@@ -777,7 +786,8 @@ func (s3a *S3ApiServer) copyChunksForRange(entry *filer_pb.Entry, startOffset, e
// Copy the relevant chunks using a specialized method for range copies
dstChunks := make([]*filer_pb.FileChunk, len(relevantChunks))
- executor := util.NewLimitedConcurrentExecutor(4)
+ const defaultChunkCopyConcurrency = 4
+ executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency)
errChan := make(chan error, len(relevantChunks))
// Create a map to track original chunks for each relevant chunk
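
Both hunks replace the magic number 4 with a named constant feeding util.NewLimitedConcurrentExecutor and pair it with an error channel buffered to the chunk count. Below is a minimal, dependency-free sketch of that fan-out/fan-in shape, with a buffered semaphore channel standing in for the SeaweedFS executor:

package main

import "fmt"

const defaultChunkCopyConcurrency = 4

// copyAll fans out one goroutine per chunk, but the semaphore channel keeps
// at most defaultChunkCopyConcurrency of them doing work at once; the error
// channel is buffered so every goroutine can report exactly once.
func copyAll(chunks []string) error {
	sem := make(chan struct{}, defaultChunkCopyConcurrency)
	errChan := make(chan error, len(chunks))
	for range chunks {
		sem <- struct{}{} // blocks once the limit is reached
		go func() {
			defer func() { <-sem }()
			// ... download, transform, and upload one chunk here ...
			errChan <- nil
		}()
	}
	// Fan-in: one receive per chunk, returning on the first error.
	for range chunks {
		if err := <-errChan; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(copyAll(make([]string, 8))) // <nil>
}

Because the receive loop returns on the first failure, copies already in flight keep running for a moment; the fully buffered error channel lets their late sends complete without leaking goroutines.
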
@@ -997,3 +1007,136 @@ func (s3a *S3ApiServer) downloadChunkData(srcUrl string, offset, size int64) ([]
}
return chunkData, nil
}
+
+// copyChunksWithSSEC handles SSE-C-aware copying with smart fast/slow path selection
+func (s3a *S3ApiServer) copyChunksWithSSEC(entry *filer_pb.Entry, r *http.Request) ([]*filer_pb.FileChunk, error) {
+ // Parse SSE-C headers
+ copySourceKey, err := ParseSSECCopySourceHeaders(r)
+ if err != nil {
+ return nil, err
+ }
+
+ destKey, err := ParseSSECHeaders(r)
+ if err != nil {
+ return nil, err
+ }
+
+ // Determine copy strategy
+ strategy, err := DetermineSSECCopyStrategy(entry.Extended, copySourceKey, destKey)
+ if err != nil {
+ return nil, err
+ }
+
+ glog.V(2).Infof("SSE-C copy strategy for %s: %v", r.URL.Path, strategy)
+
+ switch strategy {
+ case SSECCopyDirect:
+ // FAST PATH: Direct chunk copy
+ glog.V(2).Infof("Using fast path: direct chunk copy for %s", r.URL.Path)
+ return s3a.copyChunks(entry, r.URL.Path)
+
+ case SSECCopyReencrypt:
+ // SLOW PATH: Decrypt and re-encrypt
+ glog.V(2).Infof("Using slow path: decrypt/re-encrypt for %s", r.URL.Path)
+ return s3a.copyChunksWithReencryption(entry, copySourceKey, destKey, r.URL.Path)
+
+ default:
+ return nil, fmt.Errorf("unknown SSE-C copy strategy: %v", strategy)
+ }
+}
+
+// copyChunksWithReencryption handles the slow path: decrypt source and re-encrypt for destination
+func (s3a *S3ApiServer) copyChunksWithReencryption(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, error) {
+ dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
+ const defaultChunkCopyConcurrency = 4
+ executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit concurrent chunk copy operations
+ errChan := make(chan error, len(entry.GetChunks()))
+
+ for i, chunk := range entry.GetChunks() {
+ chunkIndex := i
+ executor.Execute(func() {
+ dstChunk, err := s3a.copyChunkWithReencryption(chunk, copySourceKey, destKey, dstPath)
+ if err != nil {
+ errChan <- fmt.Errorf("chunk %d: %w", chunkIndex, err)
+ return
+ }
+ dstChunks[chunkIndex] = dstChunk
+ errChan <- nil
+ })
+ }
+
+ // Wait for all operations to complete and check for errors
+ for i := 0; i < len(entry.GetChunks()); i++ {
+ if err := <-errChan; err != nil {
+ return nil, err
+ }
+ }
+
+ return dstChunks, nil
+}
+
+// copyChunkWithReencryption copies a single chunk with decrypt/re-encrypt
+func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) (*filer_pb.FileChunk, error) {
+ // Create destination chunk
+ dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
+
+ // Prepare chunk copy (assign new volume and get source URL)
+ assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
+ if err != nil {
+ return nil, err
+ }
+
+ // Set file ID on destination chunk
+ if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
+ return nil, err
+ }
+
+ // Download encrypted chunk data
+ encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
+ if err != nil {
+ return nil, fmt.Errorf("download encrypted chunk data: %w", err)
+ }
+
+ var finalData []byte
+
+ // Decrypt if source is encrypted
+ if copySourceKey != nil {
+ decryptedReader, decErr := CreateSSECDecryptedReader(bytes.NewReader(encryptedData), copySourceKey)
+ if decErr != nil {
+ return nil, fmt.Errorf("create decrypted reader: %w", decErr)
+ }
+
+ decryptedData, readErr := io.ReadAll(decryptedReader)
+ if readErr != nil {
+ return nil, fmt.Errorf("decrypt chunk data: %w", readErr)
+ }
+ finalData = decryptedData
+ } else {
+ // Source is unencrypted
+ finalData = encryptedData
+ }
+
+ // Re-encrypt if destination should be encrypted
+ if destKey != nil {
+ encryptedReader, encErr := CreateSSECEncryptedReader(bytes.NewReader(finalData), destKey)
+ if encErr != nil {
+ return nil, fmt.Errorf("create encrypted reader: %w", encErr)
+ }
+
+ reencryptedData, readErr := io.ReadAll(encryptedReader)
+ if readErr != nil {
+ return nil, fmt.Errorf("re-encrypt chunk data: %w", readErr)
+ }
+ finalData = reencryptedData
+
+ // Update chunk size to include IV
+ dstChunk.Size = uint64(len(finalData))
+ }
+
+ // Upload the processed data
+ if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
+ return nil, fmt.Errorf("upload processed chunk data: %w", err)
+ }
+
+ return dstChunk, nil
+}
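
DetermineSSECCopyStrategy is not part of this diff, so the following is only an illustrative guess at the fast/slow decision, not the project's actual logic: a raw chunk copy is safe whenever the payload bytes need no transformation (plaintext to plaintext, or the same customer key on both sides), and every other combination must take the decrypt/re-encrypt path. The real function presumably also validates the presented key against the key MD5 stored in entry.Extended.

package main

import "fmt"

type CopyStrategy int

const (
	CopyDirect    CopyStrategy = iota // fast path: raw chunk copy
	CopyReencrypt                     // slow path: decrypt + re-encrypt
)

// key is a hypothetical stand-in for SSECustomerKey.
type key struct{ md5 string }

func decide(srcEncrypted bool, srcKey, dstKey *key) CopyStrategy {
	switch {
	case !srcEncrypted && dstKey == nil:
		return CopyDirect // plaintext to plaintext
	case srcEncrypted && srcKey != nil && dstKey != nil && srcKey.md5 == dstKey.md5:
		return CopyDirect // same key on both sides: ciphertext can be copied as-is
	default:
		return CopyReencrypt // key change, encrypt, or decrypt: payload must be transformed
	}
}

func main() {
	k := &key{md5: "abc"}
	fmt.Println(decide(true, k, k))           // 0: direct
	fmt.Println(decide(true, k, &key{"xyz"})) // 1: re-encrypt
}
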
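The slow path's size bookkeeping ("Update chunk size to include IV") suggests ciphertext is stored with the IV prefixed to the encrypted stream. Assuming IV-prefixed AES-256-CTR, which is a plausible reading rather than anything this diff confirms, the round trip and the 16-byte growth look like this:

package main

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
	"io"
)

// encrypt simulates an SSE-C style writer: random IV, AES-256-CTR stream,
// IV prepended to the output (so ciphertext = plaintext length + 16).
func encrypt(key, plain []byte) ([]byte, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	iv := make([]byte, aes.BlockSize)
	if _, err := io.ReadFull(rand.Reader, iv); err != nil {
		return nil, err
	}
	out := make([]byte, len(plain))
	cipher.NewCTR(block, iv).XORKeyStream(out, plain)
	return append(iv, out...), nil
}

// decrypt strips the IV prefix and reverses the CTR stream.
func decrypt(key, data []byte) ([]byte, error) {
	if len(data) < aes.BlockSize {
		return nil, fmt.Errorf("ciphertext shorter than IV")
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	iv, body := data[:aes.BlockSize], data[aes.BlockSize:]
	out := make([]byte, len(body))
	cipher.NewCTR(block, iv).XORKeyStream(out, body)
	return out, nil
}

func main() {
	srcKey := bytes.Repeat([]byte{1}, 32) // SSE-C keys are 32 bytes (AES-256)
	dstKey := bytes.Repeat([]byte{2}, 32)

	stored, _ := encrypt(srcKey, []byte("chunk data"))
	plain, _ := decrypt(srcKey, stored)    // slow path step 1: source key
	restored, _ := encrypt(dstKey, plain)  // slow path step 2: destination key
	fmt.Println(len(plain), len(restored)) // 10 26: dstChunk.Size must track the +16
}

This is also why the fast path matters: when source and destination keys match, the stored bytes are already the correct ciphertext and none of this work is needed.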