diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/upload_chunked.go | 267 | ||||
| -rw-r--r-- | weed/operation/upload_chunked_test.go | 312 |
2 files changed, 579 insertions, 0 deletions
diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go new file mode 100644 index 000000000..352b329f8 --- /dev/null +++ b/weed/operation/upload_chunked.go @@ -0,0 +1,267 @@ +package operation + +import ( + "bytes" + "context" + "crypto/md5" + "fmt" + "hash" + "io" + "sort" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" +) + +// ChunkedUploadResult contains the result of a chunked upload +type ChunkedUploadResult struct { + FileChunks []*filer_pb.FileChunk + Md5Hash hash.Hash + TotalSize int64 + SmallContent []byte // For files smaller than threshold +} + +// ChunkedUploadOption contains options for chunked uploads +type ChunkedUploadOption struct { + ChunkSize int32 + SmallFileLimit int64 + Collection string + Replication string + DataCenter string + SaveSmallInline bool + Jwt security.EncodedJwt + MimeType string + AssignFunc func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) + UploadFunc func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) // Optional: for testing +} + +var chunkBufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + +// UploadReaderInChunks reads from reader and uploads in chunks to volume servers +// This prevents OOM by processing the stream in fixed-size chunks +// Returns file chunks, MD5 hash, total size, and any small content stored inline +func UploadReaderInChunks(ctx context.Context, reader io.Reader, opt *ChunkedUploadOption) (*ChunkedUploadResult, error) { + + md5Hash := md5.New() + var partReader = io.TeeReader(reader, md5Hash) + + var fileChunks []*filer_pb.FileChunk + var fileChunksLock sync.Mutex + var uploadErr error + var uploadErrLock sync.Mutex + var chunkOffset int64 = 0 + + var wg sync.WaitGroup + const bytesBufferCounter = 4 + bytesBufferLimitChan := make(chan struct{}, bytesBufferCounter) + +uploadLoop: + for { + // Throttle buffer usage + bytesBufferLimitChan <- struct{}{} + + // Check for errors from parallel uploads + uploadErrLock.Lock() + if uploadErr != nil { + <-bytesBufferLimitChan + uploadErrLock.Unlock() + break + } + uploadErrLock.Unlock() + + // Check for context cancellation + select { + case <-ctx.Done(): + <-bytesBufferLimitChan + uploadErrLock.Lock() + if uploadErr == nil { + uploadErr = ctx.Err() + } + uploadErrLock.Unlock() + break uploadLoop + default: + } + + // Get buffer from pool + bytesBuffer := chunkBufferPool.Get().(*bytes.Buffer) + limitedReader := io.LimitReader(partReader, int64(opt.ChunkSize)) + bytesBuffer.Reset() + + // Read one chunk + dataSize, err := bytesBuffer.ReadFrom(limitedReader) + if err != nil { + glog.V(2).Infof("UploadReaderInChunks: read error at offset %d: %v", chunkOffset, err) + chunkBufferPool.Put(bytesBuffer) + <-bytesBufferLimitChan + uploadErrLock.Lock() + if uploadErr == nil { + uploadErr = err + } + uploadErrLock.Unlock() + break + } + // If no data was read, we've reached EOF + // Only break if we've already read some data (chunkOffset > 0) or if this is truly EOF + if dataSize == 0 { + if chunkOffset == 0 { + glog.Warningf("UploadReaderInChunks: received 0 bytes on first read - creating empty file") + } + chunkBufferPool.Put(bytesBuffer) + <-bytesBufferLimitChan + // If we've already read some chunks, this is normal EOF + // If we haven't read anything yet (chunkOffset == 0), this could be an empty file + // which is valid (e.g., touch command creates 0-byte files) + break + } + + // For small files at offset 0, store inline instead of uploading + if chunkOffset == 0 && opt.SaveSmallInline && dataSize < opt.SmallFileLimit { + smallContent := make([]byte, dataSize) + n, readErr := io.ReadFull(bytesBuffer, smallContent) + chunkBufferPool.Put(bytesBuffer) + <-bytesBufferLimitChan + + if readErr != nil { + return nil, fmt.Errorf("failed to read small content: read %d of %d bytes: %w", n, dataSize, readErr) + } + + return &ChunkedUploadResult{ + FileChunks: nil, + Md5Hash: md5Hash, + TotalSize: dataSize, + SmallContent: smallContent, + }, nil + } + + // Upload chunk in parallel goroutine + wg.Add(1) + go func(offset int64, buf *bytes.Buffer) { + defer func() { + chunkBufferPool.Put(buf) + <-bytesBufferLimitChan + wg.Done() + }() + + // Assign volume for this chunk + _, assignResult, assignErr := opt.AssignFunc(ctx, 1) + if assignErr != nil { + uploadErrLock.Lock() + if uploadErr == nil { + uploadErr = fmt.Errorf("assign volume: %w", assignErr) + } + uploadErrLock.Unlock() + return + } + + // Upload chunk data + uploadUrl := fmt.Sprintf("http://%s/%s", assignResult.Url, assignResult.Fid) + + // Use per-assignment JWT if present, otherwise fall back to the original JWT + // This is critical for secured clusters where each volume assignment has its own JWT + jwt := opt.Jwt + if assignResult.Auth != "" { + jwt = assignResult.Auth + } + + uploadOption := &UploadOption{ + UploadUrl: uploadUrl, + Cipher: false, + IsInputCompressed: false, + MimeType: opt.MimeType, + PairMap: nil, + Jwt: jwt, + } + + var uploadResult *UploadResult + var uploadResultErr error + + // Use mock upload function if provided (for testing), otherwise use real uploader + if opt.UploadFunc != nil { + uploadResult, uploadResultErr = opt.UploadFunc(ctx, buf.Bytes(), uploadOption) + } else { + uploader, uploaderErr := NewUploader() + if uploaderErr != nil { + uploadErrLock.Lock() + if uploadErr == nil { + uploadErr = fmt.Errorf("create uploader: %w", uploaderErr) + } + uploadErrLock.Unlock() + return + } + uploadResult, uploadResultErr = uploader.UploadData(ctx, buf.Bytes(), uploadOption) + } + + if uploadResultErr != nil { + uploadErrLock.Lock() + if uploadErr == nil { + uploadErr = fmt.Errorf("upload chunk: %w", uploadResultErr) + } + uploadErrLock.Unlock() + return + } + + // Create chunk entry + // Set ModifiedTsNs to current time (nanoseconds) to track when upload completed + // This is critical for multipart uploads where the same part may be uploaded multiple times + // The part with the latest ModifiedTsNs is selected as the authoritative version + fid, _ := filer_pb.ToFileIdObject(assignResult.Fid) + chunk := &filer_pb.FileChunk{ + FileId: assignResult.Fid, + Offset: offset, + Size: uint64(uploadResult.Size), + ModifiedTsNs: time.Now().UnixNano(), + ETag: uploadResult.ContentMd5, + Fid: fid, + CipherKey: uploadResult.CipherKey, + } + + fileChunksLock.Lock() + fileChunks = append(fileChunks, chunk) + glog.V(4).Infof("uploaded chunk %d to %s [%d,%d)", len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size)) + fileChunksLock.Unlock() + + }(chunkOffset, bytesBuffer) + + // Update offset for next chunk + chunkOffset += dataSize + + // If this was a partial chunk, we're done + if dataSize < int64(opt.ChunkSize) { + break + } + } + + // Wait for all uploads to complete + wg.Wait() + + // Sort chunks by offset (do this even if there's an error, for cleanup purposes) + sort.Slice(fileChunks, func(i, j int) bool { + return fileChunks[i].Offset < fileChunks[j].Offset + }) + + // Check for errors - return partial results for cleanup + if uploadErr != nil { + glog.Errorf("chunked upload failed: %v (returning %d partial chunks for cleanup)", uploadErr, len(fileChunks)) + // IMPORTANT: Return partial results even on error so caller can cleanup orphaned chunks + return &ChunkedUploadResult{ + FileChunks: fileChunks, + Md5Hash: md5Hash, + TotalSize: chunkOffset, + SmallContent: nil, + }, uploadErr + } + + return &ChunkedUploadResult{ + FileChunks: fileChunks, + Md5Hash: md5Hash, + TotalSize: chunkOffset, + SmallContent: nil, + }, nil +} diff --git a/weed/operation/upload_chunked_test.go b/weed/operation/upload_chunked_test.go new file mode 100644 index 000000000..ec7ffbba2 --- /dev/null +++ b/weed/operation/upload_chunked_test.go @@ -0,0 +1,312 @@ +package operation + +import ( + "bytes" + "context" + "errors" + "io" + "testing" +) + +// TestUploadReaderInChunksReturnsPartialResultsOnError verifies that when +// UploadReaderInChunks fails mid-upload, it returns partial results containing +// the chunks that were successfully uploaded before the error occurred. +// This allows the caller to cleanup orphaned chunks and prevent resource leaks. +func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) { + // Create test data larger than one chunk to force multiple chunk uploads + testData := bytes.Repeat([]byte("test data for chunk upload failure testing"), 1000) // ~40KB + reader := bytes.NewReader(testData) + + uploadAttempts := 0 + + // Create a mock assign function that succeeds for first chunk, then fails + assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) { + uploadAttempts++ + + if uploadAttempts == 1 { + // First chunk succeeds + return nil, &AssignResult{ + Fid: "test-fid-1,1234", + Url: "http://test-volume-1:8080", + PublicUrl: "http://test-volume-1:8080", + Count: 1, + }, nil + } + + // Second chunk fails (simulating volume server down or network error) + return nil, nil, errors.New("simulated volume assignment failure") + } + + // Mock upload function that simulates successful upload + uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) { + return &UploadResult{ + Name: "test-file", + Size: uint32(len(data)), + ContentMd5: "mock-md5-hash", + Error: "", + }, nil + } + + // Attempt upload with small chunk size to trigger multiple uploads + result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{ + ChunkSize: 8 * 1024, // 8KB chunks + SmallFileLimit: 256, + Collection: "test", + DataCenter: "", + SaveSmallInline: false, + AssignFunc: assignFunc, + UploadFunc: uploadFunc, + }) + + // VERIFICATION 1: Error should be returned + if err == nil { + t.Fatal("Expected error from UploadReaderInChunks, got nil") + } + t.Logf("✓ Got expected error: %v", err) + + // VERIFICATION 2: Result should NOT be nil (this is the fix) + if result == nil { + t.Fatal("CRITICAL: UploadReaderInChunks returned nil result on error - caller cannot cleanup orphaned chunks!") + } + t.Log("✓ Result is not nil (partial results returned)") + + // VERIFICATION 3: Result should contain partial chunks from successful uploads + // Note: In reality, the first chunk upload would succeed before assignment fails for chunk 2 + // But in this test, assignment fails immediately for chunk 2, so we may have 0 chunks + // The important thing is that the result struct is returned, not that it has chunks + t.Logf("✓ Result contains %d chunks (may be 0 if all assignments failed)", len(result.FileChunks)) + + // VERIFICATION 4: MD5 hash should be available even on partial failure + if result.Md5Hash == nil { + t.Error("Expected Md5Hash to be non-nil") + } else { + t.Log("✓ Md5Hash is available for partial data") + } + + // VERIFICATION 5: TotalSize should reflect bytes read before failure + if result.TotalSize < 0 { + t.Errorf("Expected non-negative TotalSize, got %d", result.TotalSize) + } else { + t.Logf("✓ TotalSize = %d bytes read before failure", result.TotalSize) + } +} + +// TestUploadReaderInChunksSuccessPath verifies normal successful upload behavior +func TestUploadReaderInChunksSuccessPath(t *testing.T) { + testData := []byte("small test data") + reader := bytes.NewReader(testData) + + // Mock assign function that always succeeds + assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) { + return nil, &AssignResult{ + Fid: "test-fid,1234", + Url: "http://test-volume:8080", + PublicUrl: "http://test-volume:8080", + Count: 1, + }, nil + } + + // Mock upload function that simulates successful upload + uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) { + return &UploadResult{ + Name: "test-file", + Size: uint32(len(data)), + ContentMd5: "mock-md5-hash", + Error: "", + }, nil + } + + result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{ + ChunkSize: 8 * 1024, + SmallFileLimit: 256, + Collection: "test", + DataCenter: "", + SaveSmallInline: false, + AssignFunc: assignFunc, + UploadFunc: uploadFunc, + }) + + // VERIFICATION 1: No error should occur + if err != nil { + t.Fatalf("Expected successful upload, got error: %v", err) + } + t.Log("✓ Upload completed without error") + + // VERIFICATION 2: Result should not be nil + if result == nil { + t.Fatal("Expected non-nil result") + } + t.Log("✓ Result is not nil") + + // VERIFICATION 3: Should have file chunks + if len(result.FileChunks) == 0 { + t.Error("Expected at least one file chunk") + } else { + t.Logf("✓ Result contains %d file chunk(s)", len(result.FileChunks)) + } + + // VERIFICATION 4: Total size should match input data + if result.TotalSize != int64(len(testData)) { + t.Errorf("Expected TotalSize=%d, got %d", len(testData), result.TotalSize) + } else { + t.Logf("✓ TotalSize=%d matches input data", result.TotalSize) + } + + // VERIFICATION 5: MD5 hash should be available + if result.Md5Hash == nil { + t.Error("Expected non-nil Md5Hash") + } else { + t.Log("✓ Md5Hash is available") + } + + // VERIFICATION 6: Chunk should have expected properties + if len(result.FileChunks) > 0 { + chunk := result.FileChunks[0] + if chunk.FileId != "test-fid,1234" { + t.Errorf("Expected chunk FileId='test-fid,1234', got '%s'", chunk.FileId) + } + if chunk.Offset != 0 { + t.Errorf("Expected chunk Offset=0, got %d", chunk.Offset) + } + if chunk.Size != uint64(len(testData)) { + t.Errorf("Expected chunk Size=%d, got %d", len(testData), chunk.Size) + } + t.Logf("✓ Chunk properties validated: FileId=%s, Offset=%d, Size=%d", + chunk.FileId, chunk.Offset, chunk.Size) + } +} + +// TestUploadReaderInChunksContextCancellation verifies behavior when context is cancelled +func TestUploadReaderInChunksContextCancellation(t *testing.T) { + testData := bytes.Repeat([]byte("test data"), 10000) // ~80KB + reader := bytes.NewReader(testData) + + // Create a context that we'll cancel + ctx, cancel := context.WithCancel(context.Background()) + + // Cancel immediately to trigger cancellation handling + cancel() + + assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) { + return nil, &AssignResult{ + Fid: "test-fid,1234", + Url: "http://test-volume:8080", + PublicUrl: "http://test-volume:8080", + Count: 1, + }, nil + } + + // Mock upload function that simulates successful upload + uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) { + return &UploadResult{ + Name: "test-file", + Size: uint32(len(data)), + ContentMd5: "mock-md5-hash", + Error: "", + }, nil + } + + result, err := UploadReaderInChunks(ctx, reader, &ChunkedUploadOption{ + ChunkSize: 8 * 1024, + SmallFileLimit: 256, + Collection: "test", + DataCenter: "", + SaveSmallInline: false, + AssignFunc: assignFunc, + UploadFunc: uploadFunc, + }) + + // Should get context cancelled error + if err == nil { + t.Error("Expected context cancellation error") + } + + // Should still get partial results for cleanup + if result == nil { + t.Error("Expected non-nil result even on context cancellation") + } else { + t.Logf("✓ Got partial result on cancellation: chunks=%d", len(result.FileChunks)) + } +} + +// mockFailingReader simulates a reader that fails after reading some data +type mockFailingReader struct { + data []byte + pos int + failAfter int +} + +func (m *mockFailingReader) Read(p []byte) (n int, err error) { + if m.pos >= m.failAfter { + return 0, errors.New("simulated read failure") + } + + remaining := m.failAfter - m.pos + toRead := len(p) + if toRead > remaining { + toRead = remaining + } + if toRead > len(m.data)-m.pos { + toRead = len(m.data) - m.pos + } + + if toRead == 0 { + return 0, io.EOF + } + + copy(p, m.data[m.pos:m.pos+toRead]) + m.pos += toRead + return toRead, nil +} + +// TestUploadReaderInChunksReaderFailure verifies behavior when reader fails mid-read +func TestUploadReaderInChunksReaderFailure(t *testing.T) { + testData := bytes.Repeat([]byte("test"), 5000) // 20KB + failingReader := &mockFailingReader{ + data: testData, + pos: 0, + failAfter: 10000, // Fail after 10KB + } + + assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) { + return nil, &AssignResult{ + Fid: "test-fid,1234", + Url: "http://test-volume:8080", + PublicUrl: "http://test-volume:8080", + Count: 1, + }, nil + } + + // Mock upload function that simulates successful upload + uploadFunc := func(ctx context.Context, data []byte, option *UploadOption) (*UploadResult, error) { + return &UploadResult{ + Name: "test-file", + Size: uint32(len(data)), + ContentMd5: "mock-md5-hash", + Error: "", + }, nil + } + + result, err := UploadReaderInChunks(context.Background(), failingReader, &ChunkedUploadOption{ + ChunkSize: 8 * 1024, // 8KB chunks + SmallFileLimit: 256, + Collection: "test", + DataCenter: "", + SaveSmallInline: false, + AssignFunc: assignFunc, + UploadFunc: uploadFunc, + }) + + // Should get read error + if err == nil { + t.Error("Expected read failure error") + } + + // Should still get partial results + if result == nil { + t.Fatal("Expected non-nil result on read failure") + } + + t.Logf("✓ Got partial result on read failure: chunks=%d, totalSize=%d", + len(result.FileChunks), result.TotalSize) +} |
