Diffstat (limited to 'weed/s3api/s3api_streaming_copy.go')
| -rw-r--r-- | weed/s3api/s3api_streaming_copy.go | 561 |
1 file changed, 561 insertions, 0 deletions
diff --git a/weed/s3api/s3api_streaming_copy.go b/weed/s3api/s3api_streaming_copy.go
new file mode 100644
index 000000000..c996e6188
--- /dev/null
+++ b/weed/s3api/s3api_streaming_copy.go
@@ -0,0 +1,561 @@
+package s3api
+
+import (
+	"context"
+	"crypto/md5"
+	"crypto/sha256"
+	"encoding/hex"
+	"fmt"
+	"hash"
+	"io"
+	"net/http"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// StreamingCopySpec defines the specification for streaming copy operations
+type StreamingCopySpec struct {
+	SourceReader    io.Reader
+	TargetSize      int64
+	EncryptionSpec  *EncryptionSpec
+	CompressionSpec *CompressionSpec
+	HashCalculation bool
+	BufferSize      int
+}
+
+// EncryptionSpec defines encryption parameters for streaming
+type EncryptionSpec struct {
+	NeedsDecryption bool
+	NeedsEncryption bool
+	SourceKey       interface{} // SSECustomerKey or SSEKMSKey
+	DestinationKey  interface{} // SSECustomerKey or SSEKMSKey
+	SourceType      EncryptionType
+	DestinationType EncryptionType
+	SourceMetadata  map[string][]byte // Source metadata for IV extraction
+	DestinationIV   []byte            // Generated IV for destination
+}
+
+// CompressionSpec defines compression parameters for streaming
+type CompressionSpec struct {
+	IsCompressed       bool
+	CompressionType    string
+	NeedsDecompression bool
+	NeedsCompression   bool
+}
+
+// StreamingCopyManager handles streaming copy operations
+type StreamingCopyManager struct {
+	s3a        *S3ApiServer
+	bufferSize int
+}
+
+// NewStreamingCopyManager creates a new streaming copy manager
+func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
+	return &StreamingCopyManager{
+		s3a:        s3a,
+		bufferSize: 64 * 1024, // 64KB default buffer
+	}
+}
+
+// ExecuteStreamingCopy performs a streaming copy operation
+func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
+	// Create streaming copy specification
+	spec, err := scm.createStreamingSpec(entry, r, state)
+	if err != nil {
+		return nil, fmt.Errorf("create streaming spec: %w", err)
+	}
+
+	// Create source reader from entry
+	sourceReader, err := scm.createSourceReader(entry)
+	if err != nil {
+		return nil, fmt.Errorf("create source reader: %w", err)
+	}
+	defer sourceReader.Close()
+
+	spec.SourceReader = sourceReader
+
+	// Create processing pipeline
+	processedReader, err := scm.createProcessingPipeline(spec)
+	if err != nil {
+		return nil, fmt.Errorf("create processing pipeline: %w", err)
+	}
+
+	// Stream to destination
+	return scm.streamToDestination(ctx, processedReader, spec, dstPath)
+}
+
+// createStreamingSpec creates a streaming specification based on copy parameters
+func (scm *StreamingCopyManager) createStreamingSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*StreamingCopySpec, error) {
+	spec := &StreamingCopySpec{
+		BufferSize:      scm.bufferSize,
+		HashCalculation: true,
+	}
+
+	// Calculate target size
+	sizeCalc := NewCopySizeCalculator(entry, r)
+	spec.TargetSize = sizeCalc.CalculateTargetSize()
+
+	// Create encryption specification
+	encSpec, err := scm.createEncryptionSpec(entry, r, state)
+	if err != nil {
+		return nil, err
+	}
+	spec.EncryptionSpec = encSpec
+
+	// Create compression specification
+	spec.CompressionSpec = scm.createCompressionSpec(entry, r)
+
+	return spec, nil
+}
+
+// createEncryptionSpec creates encryption specification for streaming
+func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*EncryptionSpec, error) {
+	spec := &EncryptionSpec{
+		NeedsDecryption: state.IsSourceEncrypted(),
+		NeedsEncryption: state.IsTargetEncrypted(),
+		SourceMetadata:  entry.Extended, // Pass source metadata for IV extraction
+	}
+
+	// Set source encryption details
+	if state.SrcSSEC {
+		spec.SourceType = EncryptionTypeSSEC
+		sourceKey, err := ParseSSECCopySourceHeaders(r)
+		if err != nil {
+			return nil, fmt.Errorf("parse SSE-C copy source headers: %w", err)
+		}
+		spec.SourceKey = sourceKey
+	} else if state.SrcSSEKMS {
+		spec.SourceType = EncryptionTypeSSEKMS
+		// Extract SSE-KMS key from metadata
+		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
+			sseKey, err := DeserializeSSEKMSMetadata(keyData)
+			if err != nil {
+				return nil, fmt.Errorf("deserialize SSE-KMS metadata: %w", err)
+			}
+			spec.SourceKey = sseKey
+		}
+	} else if state.SrcSSES3 {
+		spec.SourceType = EncryptionTypeSSES3
+		// Extract SSE-S3 key from metadata
+		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists {
+			// TODO: This should use a proper SSE-S3 key manager from S3ApiServer
+			// For now, create a temporary key manager to handle deserialization
+			tempKeyManager := NewSSES3KeyManager()
+			sseKey, err := DeserializeSSES3Metadata(keyData, tempKeyManager)
+			if err != nil {
+				return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
+			}
+			spec.SourceKey = sseKey
+		}
+	}
+
+	// Set destination encryption details
+	if state.DstSSEC {
+		spec.DestinationType = EncryptionTypeSSEC
+		destKey, err := ParseSSECHeaders(r)
+		if err != nil {
+			return nil, fmt.Errorf("parse SSE-C headers: %w", err)
+		}
+		spec.DestinationKey = destKey
+	} else if state.DstSSEKMS {
+		spec.DestinationType = EncryptionTypeSSEKMS
+		// Parse KMS parameters
+		keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
+		if err != nil {
+			return nil, fmt.Errorf("parse SSE-KMS copy headers: %w", err)
+		}
+
+		// Create SSE-KMS key for destination
+		sseKey := &SSEKMSKey{
+			KeyID:             keyID,
+			EncryptionContext: encryptionContext,
+			BucketKeyEnabled:  bucketKeyEnabled,
+		}
+		spec.DestinationKey = sseKey
+	} else if state.DstSSES3 {
+		spec.DestinationType = EncryptionTypeSSES3
+		// Generate or retrieve SSE-S3 key
+		keyManager := GetSSES3KeyManager()
+		sseKey, err := keyManager.GetOrCreateKey("")
+		if err != nil {
+			return nil, fmt.Errorf("get SSE-S3 key: %w", err)
+		}
+		spec.DestinationKey = sseKey
+	}
+
+	return spec, nil
+}
+
+// createCompressionSpec creates compression specification for streaming
+func (scm *StreamingCopyManager) createCompressionSpec(entry *filer_pb.Entry, r *http.Request) *CompressionSpec {
+	return &CompressionSpec{
+		IsCompressed: isCompressedEntry(entry),
+		// For now, we don't change compression during copy
+		NeedsDecompression: false,
+		NeedsCompression:   false,
+	}
+}
+
+// createSourceReader creates a reader for the source entry
+func (scm *StreamingCopyManager) createSourceReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
+	// Create a multi-chunk reader that streams from all chunks
+	return scm.s3a.createMultiChunkReader(entry)
+}
+
+// createProcessingPipeline creates a processing pipeline for the copy operation
+func (scm *StreamingCopyManager) createProcessingPipeline(spec *StreamingCopySpec) (io.Reader, error) {
+	reader := spec.SourceReader
+
+	// Add decryption if needed
+	if spec.EncryptionSpec.NeedsDecryption {
+		decryptedReader, err := scm.createDecryptionReader(reader, spec.EncryptionSpec)
+		if err != nil {
+			return nil, fmt.Errorf("create decryption reader: %w", err)
+		}
+		reader = decryptedReader
+	}
+
+	// Add decompression if needed
+	if spec.CompressionSpec.NeedsDecompression {
+		decompressedReader, err := scm.createDecompressionReader(reader, spec.CompressionSpec)
+		if err != nil {
+			return nil, fmt.Errorf("create decompression reader: %w", err)
+		}
+		reader = decompressedReader
+	}
+
+	// Add compression if needed
+	if spec.CompressionSpec.NeedsCompression {
+		compressedReader, err := scm.createCompressionReader(reader, spec.CompressionSpec)
+		if err != nil {
+			return nil, fmt.Errorf("create compression reader: %w", err)
+		}
+		reader = compressedReader
+	}
+
+	// Add encryption if needed
+	if spec.EncryptionSpec.NeedsEncryption {
+		encryptedReader, err := scm.createEncryptionReader(reader, spec.EncryptionSpec)
+		if err != nil {
+			return nil, fmt.Errorf("create encryption reader: %w", err)
+		}
+		reader = encryptedReader
+	}
+
+	// Add hash calculation if needed
+	if spec.HashCalculation {
+		reader = scm.createHashReader(reader)
+	}
+
+	return reader, nil
+}
+
+// createDecryptionReader creates a decryption reader based on encryption type
+func (scm *StreamingCopyManager) createDecryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
+	switch encSpec.SourceType {
+	case EncryptionTypeSSEC:
+		if sourceKey, ok := encSpec.SourceKey.(*SSECustomerKey); ok {
+			// Get IV from metadata
+			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
+			if err != nil {
+				return nil, fmt.Errorf("get IV from metadata: %w", err)
+			}
+			return CreateSSECDecryptedReader(reader, sourceKey, iv)
+		}
+		return nil, fmt.Errorf("invalid SSE-C source key type")
+
+	case EncryptionTypeSSEKMS:
+		if sseKey, ok := encSpec.SourceKey.(*SSEKMSKey); ok {
+			return CreateSSEKMSDecryptedReader(reader, sseKey)
+		}
+		return nil, fmt.Errorf("invalid SSE-KMS source key type")
+
+	case EncryptionTypeSSES3:
+		if sseKey, ok := encSpec.SourceKey.(*SSES3Key); ok {
+			// Get IV from metadata
+			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
+			if err != nil {
+				return nil, fmt.Errorf("get IV from metadata: %w", err)
+			}
+			return CreateSSES3DecryptedReader(reader, sseKey, iv)
+		}
+		return nil, fmt.Errorf("invalid SSE-S3 source key type")
+
+	default:
+		return reader, nil
+	}
+}
+
+// createEncryptionReader creates an encryption reader based on encryption type
+func (scm *StreamingCopyManager) createEncryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
+	switch encSpec.DestinationType {
+	case EncryptionTypeSSEC:
+		if destKey, ok := encSpec.DestinationKey.(*SSECustomerKey); ok {
+			encryptedReader, iv, err := CreateSSECEncryptedReader(reader, destKey)
+			if err != nil {
+				return nil, err
+			}
+			// Store IV in destination metadata (this would need to be handled by caller)
+			encSpec.DestinationIV = iv
+			return encryptedReader, nil
+		}
+		return nil, fmt.Errorf("invalid SSE-C destination key type")
+
+	case EncryptionTypeSSEKMS:
+		if sseKey, ok := encSpec.DestinationKey.(*SSEKMSKey); ok {
+			encryptedReader, updatedKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(reader, sseKey.KeyID, sseKey.EncryptionContext, sseKey.BucketKeyEnabled)
+			if err != nil {
+				return nil, err
+			}
+			// Store IV from the updated key
+			encSpec.DestinationIV = updatedKey.IV
+			return encryptedReader, nil
+		}
+		return nil, fmt.Errorf("invalid SSE-KMS destination key type")
+
+	case EncryptionTypeSSES3:
+		if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
+			encryptedReader, iv, err := CreateSSES3EncryptedReader(reader, sseKey)
+			if err != nil {
+				return nil, err
+			}
+			// Store IV for metadata
+			encSpec.DestinationIV = iv
+			return encryptedReader, nil
+		}
+		return nil, fmt.Errorf("invalid SSE-S3 destination key type")
+
+	default:
+		return reader, nil
+	}
+}
+
+// createDecompressionReader creates a decompression reader
+func (scm *StreamingCopyManager) createDecompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
+	if !compSpec.NeedsDecompression {
+		return reader, nil
+	}
+
+	switch compSpec.CompressionType {
+	case "gzip":
+		// Use SeaweedFS's streaming gzip decompression
+		pr, pw := io.Pipe()
+		go func() {
+			defer pw.Close()
+			_, err := util.GunzipStream(pw, reader)
+			if err != nil {
+				pw.CloseWithError(fmt.Errorf("gzip decompression failed: %v", err))
+			}
+		}()
+		return pr, nil
+	default:
+		// Unknown compression type, return as-is
+		return reader, nil
+	}
+}
+
+// createCompressionReader creates a compression reader
+func (scm *StreamingCopyManager) createCompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
+	if !compSpec.NeedsCompression {
+		return reader, nil
+	}
+
+	switch compSpec.CompressionType {
+	case "gzip":
+		// Use SeaweedFS's streaming gzip compression
+		pr, pw := io.Pipe()
+		go func() {
+			defer pw.Close()
+			_, err := util.GzipStream(pw, reader)
+			if err != nil {
+				pw.CloseWithError(fmt.Errorf("gzip compression failed: %v", err))
+			}
+		}()
+		return pr, nil
+	default:
+		// Unknown compression type, return as-is
+		return reader, nil
+	}
+}
+
+// HashReader wraps an io.Reader to calculate MD5 and SHA256 hashes
+type HashReader struct {
+	reader     io.Reader
+	md5Hash    hash.Hash
+	sha256Hash hash.Hash
+}
+
+// NewHashReader creates a new hash calculating reader
+func NewHashReader(reader io.Reader) *HashReader {
+	return &HashReader{
+		reader:     reader,
+		md5Hash:    md5.New(),
+		sha256Hash: sha256.New(),
+	}
+}
+
+// Read implements io.Reader and calculates hashes as data flows through
+func (hr *HashReader) Read(p []byte) (n int, err error) {
+	n, err = hr.reader.Read(p)
+	if n > 0 {
+		// Update both hashes with the data read
+		hr.md5Hash.Write(p[:n])
+		hr.sha256Hash.Write(p[:n])
+	}
+	return n, err
+}
+
+// MD5Sum returns the current MD5 hash
+func (hr *HashReader) MD5Sum() []byte {
+	return hr.md5Hash.Sum(nil)
+}
+
+// SHA256Sum returns the current SHA256 hash
+func (hr *HashReader) SHA256Sum() []byte {
+	return hr.sha256Hash.Sum(nil)
+}
+
+// MD5Hex returns the MD5 hash as a hex string
+func (hr *HashReader) MD5Hex() string {
+	return hex.EncodeToString(hr.MD5Sum())
+}
+
+// SHA256Hex returns the SHA256 hash as a hex string
+func (hr *HashReader) SHA256Hex() string {
+	return hex.EncodeToString(hr.SHA256Sum())
+}
+
+// createHashReader creates a hash calculation reader
+func (scm *StreamingCopyManager) createHashReader(reader io.Reader) io.Reader {
+	return NewHashReader(reader)
+}
+
+// streamToDestination streams the processed data to the destination
+func (scm *StreamingCopyManager) streamToDestination(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
+	// For now, we'll use the existing chunk-based approach
+	// In a full implementation, this would stream directly to the destination
+	// without creating intermediate chunks
+
+	// This is a placeholder that converts back to chunk-based approach
+	// A full streaming implementation would write directly to the destination
+	return scm.streamToChunks(ctx, reader, spec, dstPath)
+}
+
+// streamToChunks converts streaming data back to chunks (temporary implementation)
+func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
+	// This is a simplified implementation that reads the stream and creates chunks
+	// A full implementation would be more sophisticated
+
+	var chunks []*filer_pb.FileChunk
+	buffer := make([]byte, spec.BufferSize)
+	offset := int64(0)
+
+	for {
+		n, err := reader.Read(buffer)
+		if n > 0 {
+			// Create chunk for this data
+			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
+			if chunkErr != nil {
+				return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
+			}
+			chunks = append(chunks, chunk)
+			offset += int64(n)
+		}
+
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return nil, fmt.Errorf("read stream: %w", err)
+		}
+	}
+
+	return chunks, nil
+}
+
+// createChunkFromData creates a chunk from streaming data
+func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
+	// Assign new volume
+	assignResult, err := scm.s3a.assignNewVolume(dstPath)
+	if err != nil {
+		return nil, fmt.Errorf("assign volume: %w", err)
+	}
+
+	// Create chunk
+	chunk := &filer_pb.FileChunk{
+		Offset: offset,
+		Size:   uint64(len(data)),
+	}
+
+	// Set file ID
+	if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
+		return nil, err
+	}
+
+	// Upload data
+	if err := scm.s3a.uploadChunkData(data, assignResult); err != nil {
+		return nil, fmt.Errorf("upload chunk data: %w", err)
+	}
+
+	return chunk, nil
+}
+
+// createMultiChunkReader creates a reader that streams from multiple chunks
+func (s3a *S3ApiServer) createMultiChunkReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
+	// Create a multi-reader that combines all chunks
+	var readers []io.Reader
+
+	for _, chunk := range entry.GetChunks() {
+		chunkReader, err := s3a.createChunkReader(chunk)
+		if err != nil {
+			return nil, fmt.Errorf("create chunk reader: %w", err)
+		}
+		readers = append(readers, chunkReader)
+	}
+
+	multiReader := io.MultiReader(readers...)
+	return &multiReadCloser{reader: multiReader}, nil
+}
+
+// createChunkReader creates a reader for a single chunk
+func (s3a *S3ApiServer) createChunkReader(chunk *filer_pb.FileChunk) (io.Reader, error) {
+	// Get chunk URL
+	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
+	if err != nil {
+		return nil, fmt.Errorf("lookup volume URL: %w", err)
+	}
+
+	// Create HTTP request for chunk data
+	req, err := http.NewRequest("GET", srcUrl, nil)
+	if err != nil {
+		return nil, fmt.Errorf("create HTTP request: %w", err)
+	}
+
+	// Execute request
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("execute HTTP request: %w", err)
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return nil, fmt.Errorf("HTTP request failed: %d", resp.StatusCode)
+	}
+
+	return resp.Body, nil
+}
+
+// multiReadCloser wraps a multi-reader with a close method
+type multiReadCloser struct {
+	reader io.Reader
+}
+
+func (mrc *multiReadCloser) Read(p []byte) (int, error) {
+	return mrc.reader.Read(p)
+}
+
+func (mrc *multiReadCloser) Close() error {
+	return nil
+}
