aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/upload_chunked.go267
-rw-r--r--weed/operation/upload_chunked_test.go312
2 files changed, 579 insertions, 0 deletions
diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go
new file mode 100644
index 000000000..352b329f8
--- /dev/null
+++ b/weed/operation/upload_chunked.go
@@ -0,0 +1,267 @@
+package operation
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "hash"
+ "io"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/security"
+)
+
+// ChunkedUploadResult contains the result of a chunked upload
+type ChunkedUploadResult struct {
+ FileChunks []*filer_pb.FileChunk
+ Md5Hash hash.Hash
+ TotalSize int64
+ SmallContent []byte // For files smaller than threshold
+}
+
+// ChunkedUploadOption contains options for chunked uploads
+type ChunkedUploadOption struct {
+ ChunkSize int32
+ SmallFileLimit int64
+ Collection string
+ Replication string
+ DataCenter string
+ SaveSmallInline bool
+ Jwt security.EncodedJwt
+ MimeType string
+ AssignFunc func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error)
+ UploadFunc func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing
+}
+
+var chunkBufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
+// UploadReaderInChunks reads from reader and uploads in chunks to volume servers
+// This prevents OOM by processing the stream in fixed-size chunks
+// Returns file chunks, MD5 hash, total size, and any small content stored inline
+func UploadReaderInChunks(ctx context.Context, reader io.Reader, opt *ChunkedUploadOption) (*ChunkedUploadResult, error) {
+
+ md5Hash := md5.New()
+ var partReader = io.TeeReader(reader, md5Hash)
+
+ var fileChunks []*filer_pb.FileChunk
+ var fileChunksLock sync.Mutex
+ var uploadErr error
+ var uploadErrLock sync.Mutex
+ var chunkOffset int64 = 0
+
+ var wg sync.WaitGroup
+ const bytesBufferCounter = 4
+ bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter)
+
+uploadLoop:
+ for {
+ // Throttle buffer usage
+ bytesBufferLimitChan <- struct{}{}
+
+ // Check for errors from parallel uploads
+ uploadErrLock.Lock()
+ if uploadErr != nil {
+ <-bytesBufferLimitChan
+ uploadErrLock.Unlock()
+ break
+ }
+ uploadErrLock.Unlock()
+
+ // Check for context cancellation
+ select {
+ case <-ctx.Done():
+ <-bytesBufferLimitChan
+ uploadErrLock.Lock()
+ if uploadErr == nil {
+ uploadErr = ctx.Err()
+ }
+ uploadErrLock.Unlock()
+ break uploadLoop
+ default:
+ }
+
+ // Get buffer from pool
+ bytesBuffer := chunkBufferPool.Get().(*bytes.Buffer)
+ limitedReader := io.LimitReader(partReader, int64(opt.ChunkSize))
+ bytesBuffer.Reset()
+
+ // Read one chunk
+ dataSize, err := bytesBuffer.ReadFrom(limitedReader)
+ if err != nil {
+ glog.V(2).Infof("UploadReaderInChunks: read error at offset %d: %v", chunkOffset, err)
+ chunkBufferPool.Put(bytesBuffer)
+ <-bytesBufferLimitChan
+ uploadErrLock.Lock()
+ if uploadErr == nil {
+ uploadErr = err
+ }
+ uploadErrLock.Unlock()
+ break
+ }
+ // If no data was read, we've reached EOF
+ // Only break if we've already read some data (chunkOffset > 0) or if this is truly EOF
+ if dataSize == 0 {
+ if chunkOffset == 0 {
+ glog.Warningf("UploadReaderInChunks: received 0 bytes on first read - creating empty file")
+ }
+ chunkBufferPool.Put(bytesBuffer)
+ <-bytesBufferLimitChan
+ // If we've already read some chunks, this is normal EOF
+ // If we haven't read anything yet (chunkOffset == 0), this could be an empty file
+ // which is valid (e.g., touch command creates 0-byte files)
+ break
+ }
+
+ // For small files at offset 0, store inline instead of uploading
+ if chunkOffset == 0 && opt.SaveSmallInline && dataSize < opt.SmallFileLimit {
+ smallContent := make([]byte, dataSize)
+ n, readErr := io.ReadFull(bytesBuffer, smallContent)
+ chunkBufferPool.Put(bytesBuffer)
+ <-bytesBufferLimitChan
+
+ if readErr != nil {
+ return nil, fmt.Errorf("failed to read small content: read %d of %d bytes: %w", n, dataSize, readErr)
+ }
+
+ return &ChunkedUploadResult{
+ FileChunks: nil,
+ Md5Hash: md5Hash,
+ TotalSize: dataSize,
+ SmallContent: smallContent,
+ }, nil
+ }
+
+ // Upload chunk in parallel goroutine
+ wg.Add(1)
+ go func(offset int64, buf *bytes.Buffer) {
+ defer func() {
+ chunkBufferPool.Put(buf)
+ <-bytesBufferLimitChan
+ wg.Done()
+ }()
+
+ // Assign volume for this chunk
+ _, assignResult, assignErr := opt.AssignFunc(ctx, 1)
+ if assignErr != nil {
+ uploadErrLock.Lock()
+ if uploadErr == nil {
+ uploadErr = fmt.Errorf("assign volume: %w", assignErr)
+ }
+ uploadErrLock.Unlock()
+ return
+ }
+
+ // Upload chunk data
+ uploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid)
+
+ // Use per-assignment JWT if present, otherwise fall back to the original JWT
+ // This is critical for secured clusters where each volume assignment has its own JWT
+ jwt := opt.Jwt
+ if assignResult.Auth != "" {
+ jwt = assignResult.Auth
+ }
+
+ uploadOption := &UploadOption{
+ UploadUrl: uploadUrl,
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: opt.MimeType,
+ PairMap: nil,
+ Jwt: jwt,
+ }
+
+ var uploadResult *UploadResult
+ var uploadResultErr error
+
+ // Use mock upload function if provided (for testing), otherwise use real uploader
+ if opt.UploadFunc != nil {
+ uploadResult, uploadResultErr = opt.UploadFunc(ctx, buf.Bytes(), uploadOption)
+ } else {
+ uploader, uploaderErr := NewUploader()
+ if uploaderErr != nil {
+ uploadErrLock.Lock()
+ if uploadErr == nil {
+ uploadErr = fmt.Errorf("create uploader: %w", uploaderErr)
+ }
+ uploadErrLock.Unlock()
+ return
+ }
+ uploadResult, uploadResultErr = uploader.UploadData(ctx, buf.Bytes(), uploadOption)
+ }
+
+ if uploadResultErr != nil {
+ uploadErrLock.Lock()
+ if uploadErr == nil {
+ uploadErr = fmt.Errorf("upload chunk: %w", uploadResultErr)
+ }
+ uploadErrLock.Unlock()
+ return
+ }
+
+ // Create chunk entry
+ // Set ModifiedTsNs to current time (nanoseconds) to track when upload completed
+ // This is critical for multipart uploads where the same part may be uploaded multiple times
+ // The part with the latest ModifiedTsNs is selected as the authoritative version
+ fid, _ := filer_pb.ToFileIdObject(assignResult.Fid)
+ chunk := &filer_pb.FileChunk{
+ FileId: assignResult.Fid,
+ Offset: offset,
+ Size: uint64(uploadResult.Size),
+ ModifiedTsNs: time.Now().UnixNano(),
+ ETag: uploadResult.ContentMd5,
+ Fid: fid,
+ CipherKey: uploadResult.CipherKey,
+ }
+
+ fileChunksLock.Lock()
+ fileChunks = append(fileChunks, chunk)
+ glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
+ fileChunksLock.Unlock()
+
+ }(chunkOffset, bytesBuffer)
+
+ // Update offset for next chunk
+ chunkOffset += dataSize
+
+ // If this was a partial chunk, we're done
+ if dataSize < int64(opt.ChunkSize) {
+ break
+ }
+ }
+
+ // Wait for all uploads to complete
+ wg.Wait()
+
+ // Sort chunks by offset (do this even if there's an error, for cleanup purposes)
+ sort.Slice(fileChunks, func(i, j int) bool {
+ return fileChunks[i].Offset < fileChunks[j].Offset
+ })
+
+ // Check for errors - return partial results for cleanup
+ if uploadErr != nil {
+ glog.Errorf("chunked upload failed: %v (returning %d partial chunks for cleanup)", uploadErr, len(fileChunks))
+ // IMPORTANT: Return partial results even on error so caller can cleanup orphaned chunks
+ return &ChunkedUploadResult{
+ FileChunks: fileChunks,
+ Md5Hash: md5Hash,
+ TotalSize: chunkOffset,
+ SmallContent: nil,
+ }, uploadErr
+ }
+
+ return &ChunkedUploadResult{
+ FileChunks: fileChunks,
+ Md5Hash: md5Hash,
+ TotalSize: chunkOffset,
+ SmallContent: nil,
+ }, nil
+}
diff --git a/weed/operation/upload_chunked_test.go b/weed/operation/upload_chunked_test.go
new file mode 100644
index 000000000..ec7ffbba2
--- /dev/null
+++ b/weed/operation/upload_chunked_test.go
@@ -0,0 +1,312 @@
+package operation
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "io"
+ "testing"
+)
+
+// TestUploadReaderInChunksReturnsPartialResultsOnError verifies that when
+// UploadReaderInChunks fails mid-upload, it returns partial results containing
+// the chunks that were successfully uploaded before the error occurred.
+// This allows the caller to cleanup orphaned chunks and prevent resource leaks.
+func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) {
+ // Create test data larger than one chunk to force multiple chunk uploads
+ testData := bytes.Repeat([]byte("test data for chunk upload failure testing"), 1000) // ~40KB
+ reader := bytes.NewReader(testData)
+
+ uploadAttempts := 0
+
+ // Create a mock assign function that succeeds for first chunk, then fails
+ assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+ uploadAttempts++
+
+ if uploadAttempts == 1 {
+ // First chunk succeeds
+ return nil, &AssignResult{
+ Fid: "test-fid-1,1234",
+ Url: "http://test-volume-1:8080",
+ PublicUrl: "http://test-volume-1:8080",
+ Count: 1,
+ }, nil
+ }
+
+ // Second chunk fails (simulating volume server down or network error)
+ return nil, nil, errors.New("simulated volume assignment failure")
+ }
+
+ // Mock upload function that simulates successful upload
+ uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) {
+ return &UploadResult{
+ Name: "test-file",
+ Size: uint32(len(data)),
+ ContentMd5: "mock-md5-hash",
+ Error: "",
+ }, nil
+ }
+
+ // Attempt upload with small chunk size to trigger multiple uploads
+ result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{
+ ChunkSize: 8 * 1024, // 8KB chunks
+ SmallFileLimit: 256,
+ Collection: "test",
+ DataCenter: "",
+ SaveSmallInline: false,
+ AssignFunc: assignFunc,
+ UploadFunc: uploadFunc,
+ })
+
+ // VERIFICATION 1: Error should be returned
+ if err == nil {
+ t.Fatal("Expected error from UploadReaderInChunks, got nil")
+ }
+ t.Logf("✓ Got expected error: %v", err)
+
+ // VERIFICATION 2: Result should NOT be nil (this is the fix)
+ if result == nil {
+ t.Fatal("CRITICAL: UploadReaderInChunks returned nil result on error - caller cannot cleanup orphaned chunks!")
+ }
+ t.Log("✓ Result is not nil (partial results returned)")
+
+ // VERIFICATION 3: Result should contain partial chunks from successful uploads
+ // Note: In reality, the first chunk upload would succeed before assignment fails for chunk 2
+ // But in this test, assignment fails immediately for chunk 2, so we may have 0 chunks
+ // The important thing is that the result struct is returned, not that it has chunks
+ t.Logf("✓ Result contains %d chunks (may be 0 if all assignments failed)", len(result.FileChunks))
+
+ // VERIFICATION 4: MD5 hash should be available even on partial failure
+ if result.Md5Hash == nil {
+ t.Error("Expected Md5Hash to be non-nil")
+ } else {
+ t.Log("✓ Md5Hash is available for partial data")
+ }
+
+ // VERIFICATION 5: TotalSize should reflect bytes read before failure
+ if result.TotalSize < 0 {
+ t.Errorf("Expected non-negative TotalSize, got %d", result.TotalSize)
+ } else {
+ t.Logf("✓ TotalSize = %d bytes read before failure", result.TotalSize)
+ }
+}
+
+// TestUploadReaderInChunksSuccessPath verifies normal successful upload behavior
+func TestUploadReaderInChunksSuccessPath(t *testing.T) {
+ testData := []byte("small test data")
+ reader := bytes.NewReader(testData)
+
+ // Mock assign function that always succeeds
+ assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+ return nil, &AssignResult{
+ Fid: "test-fid,1234",
+ Url: "http://test-volume:8080",
+ PublicUrl: "http://test-volume:8080",
+ Count: 1,
+ }, nil
+ }
+
+ // Mock upload function that simulates successful upload
+ uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) {
+ return &UploadResult{
+ Name: "test-file",
+ Size: uint32(len(data)),
+ ContentMd5: "mock-md5-hash",
+ Error: "",
+ }, nil
+ }
+
+ result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{
+ ChunkSize: 8 * 1024,
+ SmallFileLimit: 256,
+ Collection: "test",
+ DataCenter: "",
+ SaveSmallInline: false,
+ AssignFunc: assignFunc,
+ UploadFunc: uploadFunc,
+ })
+
+ // VERIFICATION 1: No error should occur
+ if err != nil {
+ t.Fatalf("Expected successful upload, got error: %v", err)
+ }
+ t.Log("✓ Upload completed without error")
+
+ // VERIFICATION 2: Result should not be nil
+ if result == nil {
+ t.Fatal("Expected non-nil result")
+ }
+ t.Log("✓ Result is not nil")
+
+ // VERIFICATION 3: Should have file chunks
+ if len(result.FileChunks) == 0 {
+ t.Error("Expected at least one file chunk")
+ } else {
+ t.Logf("✓ Result contains %d file chunk(s)", len(result.FileChunks))
+ }
+
+ // VERIFICATION 4: Total size should match input data
+ if result.TotalSize != int64(len(testData)) {
+ t.Errorf("Expected TotalSize=%d, got %d", len(testData), result.TotalSize)
+ } else {
+ t.Logf("✓ TotalSize=%d matches input data", result.TotalSize)
+ }
+
+ // VERIFICATION 5: MD5 hash should be available
+ if result.Md5Hash == nil {
+ t.Error("Expected non-nil Md5Hash")
+ } else {
+ t.Log("✓ Md5Hash is available")
+ }
+
+ // VERIFICATION 6: Chunk should have expected properties
+ if len(result.FileChunks) > 0 {
+ chunk := result.FileChunks[0]
+ if chunk.FileId != "test-fid,1234" {
+ t.Errorf("Expected chunk FileId='test-fid,1234', got '%s'", chunk.FileId)
+ }
+ if chunk.Offset != 0 {
+ t.Errorf("Expected chunk Offset=0, got %d", chunk.Offset)
+ }
+ if chunk.Size != uint64(len(testData)) {
+ t.Errorf("Expected chunk Size=%d, got %d", len(testData), chunk.Size)
+ }
+ t.Logf("✓ Chunk properties validated: FileId=%s, Offset=%d, Size=%d",
+ chunk.FileId, chunk.Offset, chunk.Size)
+ }
+}
+
+// TestUploadReaderInChunksContextCancellation verifies behavior when context is cancelled
+func TestUploadReaderInChunksContextCancellation(t *testing.T) {
+ testData := bytes.Repeat([]byte("test data"), 10000) // ~80KB
+ reader := bytes.NewReader(testData)
+
+ // Create a context that we'll cancel
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // Cancel immediately to trigger cancellation handling
+ cancel()
+
+ assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+ return nil, &AssignResult{
+ Fid: "test-fid,1234",
+ Url: "http://test-volume:8080",
+ PublicUrl: "http://test-volume:8080",
+ Count: 1,
+ }, nil
+ }
+
+ // Mock upload function that simulates successful upload
+ uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) {
+ return &UploadResult{
+ Name: "test-file",
+ Size: uint32(len(data)),
+ ContentMd5: "mock-md5-hash",
+ Error: "",
+ }, nil
+ }
+
+ result, err := UploadReaderInChunks(ctx, reader, &ChunkedUploadOption{
+ ChunkSize: 8 * 1024,
+ SmallFileLimit: 256,
+ Collection: "test",
+ DataCenter: "",
+ SaveSmallInline: false,
+ AssignFunc: assignFunc,
+ UploadFunc: uploadFunc,
+ })
+
+ // Should get context cancelled error
+ if err == nil {
+ t.Error("Expected context cancellation error")
+ }
+
+ // Should still get partial results for cleanup
+ if result == nil {
+ t.Error("Expected non-nil result even on context cancellation")
+ } else {
+ t.Logf("✓ Got partial result on cancellation: chunks=%d", len(result.FileChunks))
+ }
+}
+
+// mockFailingReader simulates a reader that fails after reading some data
+type mockFailingReader struct {
+ data []byte
+ pos int
+ failAfter int
+}
+
+func (m *mockFailingReader) Read(p []byte) (n int, err error) {
+ if m.pos >= m.failAfter {
+ return 0, errors.New("simulated read failure")
+ }
+
+ remaining := m.failAfter - m.pos
+ toRead := len(p)
+ if toRead > remaining {
+ toRead = remaining
+ }
+ if toRead > len(m.data)-m.pos {
+ toRead = len(m.data) - m.pos
+ }
+
+ if toRead == 0 {
+ return 0, io.EOF
+ }
+
+ copy(p, m.data[m.pos:m.pos+toRead])
+ m.pos += toRead
+ return toRead, nil
+}
+
+// TestUploadReaderInChunksReaderFailure verifies behavior when reader fails mid-read
+func TestUploadReaderInChunksReaderFailure(t *testing.T) {
+ testData := bytes.Repeat([]byte("test"), 5000) // 20KB
+ failingReader := &mockFailingReader{
+ data: testData,
+ pos: 0,
+ failAfter: 10000, // Fail after 10KB
+ }
+
+ assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+ return nil, &AssignResult{
+ Fid: "test-fid,1234",
+ Url: "http://test-volume:8080",
+ PublicUrl: "http://test-volume:8080",
+ Count: 1,
+ }, nil
+ }
+
+ // Mock upload function that simulates successful upload
+ uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) {
+ return &UploadResult{
+ Name: "test-file",
+ Size: uint32(len(data)),
+ ContentMd5: "mock-md5-hash",
+ Error: "",
+ }, nil
+ }
+
+ result, err := UploadReaderInChunks(context.Background(), failingReader, &ChunkedUploadOption{
+ ChunkSize: 8 * 1024, // 8KB chunks
+ SmallFileLimit: 256,
+ Collection: "test",
+ DataCenter: "",
+ SaveSmallInline: false,
+ AssignFunc: assignFunc,
+ UploadFunc: uploadFunc,
+ })
+
+ // Should get read error
+ if err == nil {
+ t.Error("Expected read failure error")
+ }
+
+ // Should still get partial results
+ if result == nil {
+ t.Fatal("Expected non-nil result on read failure")
+ }
+
+ t.Logf("✓ Got partial result on read failure: chunks=%d, totalSize=%d",
+ len(result.FileChunks), result.TotalSize)
+}