path: root/weed/s3api/s3api_streaming_copy.go
package s3api

import (
	"context"
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"net/http"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

// StreamingCopySpec defines the specification for streaming copy operations
type StreamingCopySpec struct {
	SourceReader    io.Reader
	TargetSize      int64
	EncryptionSpec  *EncryptionSpec
	CompressionSpec *CompressionSpec
	HashCalculation bool
	BufferSize      int
}

// EncryptionSpec defines encryption parameters for streaming
type EncryptionSpec struct {
	NeedsDecryption bool
	NeedsEncryption bool
	SourceKey       interface{} // SSECustomerKey, SSEKMSKey, or SSES3Key
	DestinationKey  interface{} // SSECustomerKey, SSEKMSKey, or SSES3Key
	SourceType      EncryptionType
	DestinationType EncryptionType
	SourceMetadata  map[string][]byte // Source metadata for IV extraction
	DestinationIV   []byte            // Generated IV for destination
}

// CompressionSpec defines compression parameters for streaming
type CompressionSpec struct {
	IsCompressed       bool
	CompressionType    string
	NeedsDecompression bool
	NeedsCompression   bool
}

// StreamingCopyManager handles streaming copy operations
type StreamingCopyManager struct {
	s3a        *S3ApiServer
	bufferSize int
}

// NewStreamingCopyManager creates a new streaming copy manager
func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
	return &StreamingCopyManager{
		s3a:        s3a,
		bufferSize: 64 * 1024, // 64KB default buffer
	}
}

// ExecuteStreamingCopy performs a streaming copy operation
func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
	// Create streaming copy specification
	spec, err := scm.createStreamingSpec(entry, r, state)
	if err != nil {
		return nil, fmt.Errorf("create streaming spec: %w", err)
	}

	// Create source reader from entry
	sourceReader, err := scm.createSourceReader(entry)
	if err != nil {
		return nil, fmt.Errorf("create source reader: %w", err)
	}
	defer sourceReader.Close()

	spec.SourceReader = sourceReader

	// Create processing pipeline
	processedReader, err := scm.createProcessingPipeline(spec)
	if err != nil {
		return nil, fmt.Errorf("create processing pipeline: %w", err)
	}

	// Stream to destination
	return scm.streamToDestination(ctx, processedReader, spec, dstPath)
}
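
// Illustrative call site for ExecuteStreamingCopy (a sketch: srcEntry,
// dstPath, and encState are placeholders supplied by the object-copy
// handler, which lives outside this file):
//
//	scm := NewStreamingCopyManager(s3a)
//	chunks, err := scm.ExecuteStreamingCopy(r.Context(), srcEntry, r, dstPath, encState)
//	if err != nil {
//		// map to an S3 error response
//	}
//	// persist chunks (and any generated DestinationIV) on the new entry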

// createStreamingSpec creates a streaming specification based on copy parameters
func (scm *StreamingCopyManager) createStreamingSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*StreamingCopySpec, error) {
	spec := &StreamingCopySpec{
		BufferSize:      scm.bufferSize,
		HashCalculation: true,
	}

	// Calculate target size
	sizeCalc := NewCopySizeCalculator(entry, r)
	spec.TargetSize = sizeCalc.CalculateTargetSize()

	// Create encryption specification
	encSpec, err := scm.createEncryptionSpec(entry, r, state)
	if err != nil {
		return nil, err
	}
	spec.EncryptionSpec = encSpec

	// Create compression specification
	spec.CompressionSpec = scm.createCompressionSpec(entry, r)

	return spec, nil
}

// createEncryptionSpec creates encryption specification for streaming
func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*EncryptionSpec, error) {
	spec := &EncryptionSpec{
		NeedsDecryption: state.IsSourceEncrypted(),
		NeedsEncryption: state.IsTargetEncrypted(),
		SourceMetadata:  entry.Extended, // Pass source metadata for IV extraction
	}

	// Set source encryption details
	if state.SrcSSEC {
		spec.SourceType = EncryptionTypeSSEC
		sourceKey, err := ParseSSECCopySourceHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C copy source headers: %w", err)
		}
		spec.SourceKey = sourceKey
	} else if state.SrcSSEKMS {
		spec.SourceType = EncryptionTypeSSEKMS
		// Extract SSE-KMS key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
			sseKey, err := DeserializeSSEKMSMetadata(keyData)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-KMS metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	} else if state.SrcSSES3 {
		spec.SourceType = EncryptionTypeSSES3
		// Extract SSE-S3 key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists {
			// TODO: This should use a proper SSE-S3 key manager from S3ApiServer
			// For now, create a temporary key manager to handle deserialization
			tempKeyManager := NewSSES3KeyManager()
			sseKey, err := DeserializeSSES3Metadata(keyData, tempKeyManager)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	}

	// Set destination encryption details
	if state.DstSSEC {
		spec.DestinationType = EncryptionTypeSSEC
		destKey, err := ParseSSECHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C headers: %w", err)
		}
		spec.DestinationKey = destKey
	} else if state.DstSSEKMS {
		spec.DestinationType = EncryptionTypeSSEKMS
		// Parse KMS parameters
		keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-KMS copy headers: %w", err)
		}

		// Create SSE-KMS key for destination
		sseKey := &SSEKMSKey{
			KeyID:             keyID,
			EncryptionContext: encryptionContext,
			BucketKeyEnabled:  bucketKeyEnabled,
		}
		spec.DestinationKey = sseKey
	} else if state.DstSSES3 {
		spec.DestinationType = EncryptionTypeSSES3
		// Generate or retrieve SSE-S3 key
		keyManager := GetSSES3KeyManager()
		sseKey, err := keyManager.GetOrCreateKey("")
		if err != nil {
			return nil, fmt.Errorf("get SSE-S3 key: %w", err)
		}
		spec.DestinationKey = sseKey
	}

	return spec, nil
}
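
// For reference, the request headers that typically drive the destination
// branches above (an abbreviated sketch; the authoritative parsing lives in
// the Parse* helpers, and the source flags come from the source object's
// metadata and copy-source headers):
//
//	x-amz-copy-source-server-side-encryption-customer-* -> SrcSSEC
//	x-amz-server-side-encryption-customer-*             -> DstSSEC
//	x-amz-server-side-encryption: aws:kms               -> DstSSEKMS
//	x-amz-server-side-encryption: AES256                -> DstSSES3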

// createCompressionSpec creates compression specification for streaming
func (scm *StreamingCopyManager) createCompressionSpec(entry *filer_pb.Entry, r *http.Request) *CompressionSpec {
	return &CompressionSpec{
		IsCompressed: isCompressedEntry(entry),
		// For now, we don't change compression during copy
		NeedsDecompression: false,
		NeedsCompression:   false,
	}
}

// createSourceReader creates a reader for the source entry
func (scm *StreamingCopyManager) createSourceReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-chunk reader that streams from all chunks
	return scm.s3a.createMultiChunkReader(entry)
}

// createProcessingPipeline creates a processing pipeline for the copy operation
func (scm *StreamingCopyManager) createProcessingPipeline(spec *StreamingCopySpec) (io.Reader, error) {
	reader := spec.SourceReader

	// Add decryption if needed
	if spec.EncryptionSpec.NeedsDecryption {
		decryptedReader, err := scm.createDecryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decryption reader: %w", err)
		}
		reader = decryptedReader
	}

	// Add decompression if needed
	if spec.CompressionSpec.NeedsDecompression {
		decompressedReader, err := scm.createDecompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decompression reader: %w", err)
		}
		reader = decompressedReader
	}

	// Add compression if needed
	if spec.CompressionSpec.NeedsCompression {
		compressedReader, err := scm.createCompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create compression reader: %w", err)
		}
		reader = compressedReader
	}

	// Add encryption if needed
	if spec.EncryptionSpec.NeedsEncryption {
		encryptedReader, err := scm.createEncryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create encryption reader: %w", err)
		}
		reader = encryptedReader
	}

	// Add hash calculation if needed
	if spec.HashCalculation {
		reader = scm.createHashReader(reader)
	}

	return reader, nil
}
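
// Illustrative data flow for a copy that re-encrypts (a sketch; which stages
// actually run depends on the spec flags):
//
//	source chunks -> decrypt (source key) -> [decompress] -> [compress]
//	              -> encrypt (destination key) -> HashReader -> destination
//
// Ordering matters: decryption must see the stored ciphertext first, and the
// hash is computed over exactly the bytes sent to the destination.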

// createDecryptionReader creates a decryption reader based on encryption type
func (scm *StreamingCopyManager) createDecryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.SourceType {
	case EncryptionTypeSSEC:
		if sourceKey, ok := encSpec.SourceKey.(*SSECustomerKey); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSECDecryptedReader(reader, sourceKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-C source key type")

	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.SourceKey.(*SSEKMSKey); ok {
			return CreateSSEKMSDecryptedReader(reader, sseKey)
		}
		return nil, fmt.Errorf("invalid SSE-KMS source key type")

	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.SourceKey.(*SSES3Key); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSES3DecryptedReader(reader, sseKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-S3 source key type")

	default:
		return reader, nil
	}
}

// createEncryptionReader creates an encryption reader based on encryption type
func (scm *StreamingCopyManager) createEncryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.DestinationType {
	case EncryptionTypeSSEC:
		if destKey, ok := encSpec.DestinationKey.(*SSECustomerKey); ok {
			encryptedReader, iv, err := CreateSSECEncryptedReader(reader, destKey)
			if err != nil {
				return nil, err
			}
			// Expose the IV on the spec; the caller is responsible for
			// persisting it in the destination metadata
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-C destination key type")

	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.DestinationKey.(*SSEKMSKey); ok {
			encryptedReader, updatedKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(reader, sseKey.KeyID, sseKey.EncryptionContext, sseKey.BucketKeyEnabled)
			if err != nil {
				return nil, err
			}
			// Store IV from the updated key
			encSpec.DestinationIV = updatedKey.IV
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-KMS destination key type")

	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
			encryptedReader, iv, err := CreateSSES3EncryptedReader(reader, sseKey)
			if err != nil {
				return nil, err
			}
			// Store IV for metadata
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-S3 destination key type")

	default:
		return reader, nil
	}
}

// createDecompressionReader creates a decompression reader
func (scm *StreamingCopyManager) createDecompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsDecompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip decompression
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			_, err := util.GunzipStream(pw, reader)
			if err != nil {
				pw.CloseWithError(fmt.Errorf("gzip decompression failed: %v", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}

// createCompressionReader creates a compression reader
func (scm *StreamingCopyManager) createCompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsCompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip compression
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			_, err := util.GzipStream(pw, reader)
			if err != nil {
				pw.CloseWithError(fmt.Errorf("gzip compression failed: %v", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}
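
// Both helpers above use the same io.Pipe pattern to adapt SeaweedFS's
// writer-style GzipStream/GunzipStream into pull-style readers. A minimal
// sketch of the pattern, assuming transform(dst io.Writer, src io.Reader)
// streams src into dst:
//
//	pr, pw := io.Pipe()
//	go func() {
//		defer pw.Close()
//		if _, err := transform(pw, src); err != nil {
//			pw.CloseWithError(err) // surfaces the error on the reader side
//		}
//	}()
//	// pr now yields the transformed bytes as they are produced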

// HashReader wraps an io.Reader to calculate MD5 and SHA256 hashes
type HashReader struct {
	reader     io.Reader
	md5Hash    hash.Hash
	sha256Hash hash.Hash
}

// NewHashReader creates a new hash calculating reader
func NewHashReader(reader io.Reader) *HashReader {
	return &HashReader{
		reader:     reader,
		md5Hash:    md5.New(),
		sha256Hash: sha256.New(),
	}
}

// Read implements io.Reader and calculates hashes as data flows through
func (hr *HashReader) Read(p []byte) (n int, err error) {
	n, err = hr.reader.Read(p)
	if n > 0 {
		// Update both hashes with the data read
		hr.md5Hash.Write(p[:n])
		hr.sha256Hash.Write(p[:n])
	}
	return n, err
}

// MD5Sum returns the MD5 digest of the bytes read so far
func (hr *HashReader) MD5Sum() []byte {
	return hr.md5Hash.Sum(nil)
}

// SHA256Sum returns the SHA256 digest of the bytes read so far
func (hr *HashReader) SHA256Sum() []byte {
	return hr.sha256Hash.Sum(nil)
}

// MD5Hex returns the MD5 hash as a hex string
func (hr *HashReader) MD5Hex() string {
	return hex.EncodeToString(hr.MD5Sum())
}

// SHA256Hex returns the SHA256 hash as a hex string
func (hr *HashReader) SHA256Hex() string {
	return hex.EncodeToString(hr.SHA256Sum())
}
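
// hashReaderExample is an illustrative sketch (not wired into any call path)
// showing how HashReader composes with any io.Reader: the digests accumulate
// as the consumer drains the stream, so they are final only after io.Copy
// returns.
func hashReaderExample(src io.Reader) (md5Hex, sha256Hex string, err error) {
	hr := NewHashReader(src)
	// io.Discard stands in for the real destination (e.g. a chunk uploader).
	if _, err = io.Copy(io.Discard, hr); err != nil {
		return "", "", err
	}
	return hr.MD5Hex(), hr.SHA256Hex(), nil
}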

// createHashReader creates a hash calculation reader
func (scm *StreamingCopyManager) createHashReader(reader io.Reader) io.Reader {
	return NewHashReader(reader)
}

// streamToDestination streams the processed data to the destination
func (scm *StreamingCopyManager) streamToDestination(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// Placeholder: for now this falls back to the chunk-based approach. A
	// full streaming implementation would write directly to the destination
	// without materializing intermediate chunks.
	return scm.streamToChunks(ctx, reader, spec, dstPath)
}

// streamToChunks converts streaming data back to chunks (temporary implementation)
func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// Simplified implementation: read the stream and create one chunk per
	// read (see the granularity note after this function).

	var chunks []*filer_pb.FileChunk
	buffer := make([]byte, spec.BufferSize)
	offset := int64(0)

	for {
		n, err := reader.Read(buffer)
		if n > 0 {
			// Create chunk for this data
			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
			if chunkErr != nil {
				return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
			}
			chunks = append(chunks, chunk)
			offset += int64(n)
		}

		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read stream: %w", err)
		}
	}

	return chunks, nil
}
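
// Note on granularity: each Read above becomes its own chunk, so chunk size
// is capped by spec.BufferSize (64KB by default) and a large object yields
// many small chunks. A production path would fill a chunk-sized buffer (e.g.
// with io.ReadFull) before each upload.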

// createChunkFromData creates a chunk from streaming data
func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
	// Assign new volume
	assignResult, err := scm.s3a.assignNewVolume(dstPath)
	if err != nil {
		return nil, fmt.Errorf("assign volume: %w", err)
	}

	// Create chunk
	chunk := &filer_pb.FileChunk{
		Offset: offset,
		Size:   uint64(len(data)),
	}

	// Set file ID
	if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
		return nil, err
	}

	// Upload data
	if err := scm.s3a.uploadChunkData(data, assignResult); err != nil {
		return nil, fmt.Errorf("upload chunk data: %w", err)
	}

	return chunk, nil
}

// createMultiChunkReader creates a reader that streams from multiple chunks
func (s3a *S3ApiServer) createMultiChunkReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-reader that combines all chunks, tracking the underlying
	// closers so the chunk HTTP bodies can be released on Close
	var readers []io.Reader
	var closers []io.Closer

	for _, chunk := range entry.GetChunks() {
		chunkReader, err := s3a.createChunkReader(chunk)
		if err != nil {
			// Close any readers opened so far to avoid leaking their bodies
			for _, c := range closers {
				c.Close()
			}
			return nil, fmt.Errorf("create chunk reader: %w", err)
		}
		readers = append(readers, chunkReader)
		if closer, ok := chunkReader.(io.Closer); ok {
			closers = append(closers, closer)
		}
	}

	multiReader := io.MultiReader(readers...)
	return &multiReadCloser{reader: multiReader, closers: closers}, nil
}

// createChunkReader creates a reader for a single chunk. The returned reader
// is an HTTP response body, so the caller must close it (multiReadCloser
// takes care of this for multi-chunk reads).
func (s3a *S3ApiServer) createChunkReader(chunk *filer_pb.FileChunk) (io.Reader, error) {
	// Get chunk URL
	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
	if err != nil {
		return nil, fmt.Errorf("lookup volume URL: %w", err)
	}

	// Create HTTP request for chunk data
	req, err := http.NewRequest("GET", srcUrl, nil)
	if err != nil {
		return nil, fmt.Errorf("create HTTP request: %w", err)
	}

	// Execute request
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("execute HTTP request: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("HTTP request failed: %d", resp.StatusCode)
	}

	return resp.Body, nil
}

// multiReadCloser wraps a multi-reader and closes every underlying reader
type multiReadCloser struct {
	reader  io.Reader
	closers []io.Closer
}

func (mrc *multiReadCloser) Read(p []byte) (int, error) {
	return mrc.reader.Read(p)
}

// Close releases all underlying readers (the chunk HTTP response bodies),
// returning the first close error encountered.
func (mrc *multiReadCloser) Close() error {
	var firstErr error
	for _, c := range mrc.closers {
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
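
// Illustrative sketch of draining a multi-chunk source (entry is assumed to
// have been fetched by the caller):
//
//	rc, err := s3a.createMultiChunkReader(entry)
//	if err != nil {
//		return err
//	}
//	defer rc.Close() // releases every underlying chunk HTTP body
//	_, err = io.Copy(io.Discard, rc)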