-rw-r--r--  weed/filer/reader_at.go                     | 138
-rw-r--r--  weed/filer/reader_cache.go                  |  86
-rw-r--r--  weed/filer/reader_cache_test.go             | 505
-rw-r--r--  weed/util/http/http_global_client_util.go   | 108
4 files changed, 788 insertions(+), 49 deletions(-)
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 93fa76a2e..5e8fd6154 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -7,6 +7,8 @@ import (
"math/rand"
"sync"
+ "golang.org/x/sync/errgroup"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -19,6 +21,11 @@ import (
// the prefetch count is derived from the -concurrentReaders option.
const DefaultPrefetchCount = 4
+// minReadConcurrency is the minimum number of parallel chunk fetches.
+// This ensures at least some parallelism even when prefetchCount is low,
+// improving throughput for reads spanning multiple chunks.
+const minReadConcurrency = 4
+
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews *IntervalList[*ChunkView]
@@ -175,67 +182,139 @@ func (c *ChunkReadAt) ReadAtWithTime(ctx context.Context, p []byte, offset int64
return c.doReadAt(ctx, p, offset)
}
+// chunkReadTask represents a single chunk read operation for parallel processing
+type chunkReadTask struct {
+ chunk *ChunkView
+ bufferStart int64 // start position in the output buffer
+ bufferEnd int64 // end position in the output buffer
+ chunkOffset uint64 // offset within the chunk to read from
+ bytesRead int
+ modifiedTsNs int64
+}
+
func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) {
+ // Collect all chunk read tasks
+ var tasks []*chunkReadTask
+ var gaps []struct{ start, length int64 } // gaps that need zero-filling
+
startOffset, remaining := offset, int64(len(p))
- var nextChunks *Interval[*ChunkView]
+ var lastChunk *Interval[*ChunkView]
+
for x := c.chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value
if remaining <= 0 {
break
}
- if x.Next != nil {
- nextChunks = x.Next
- }
+ lastChunk = x
+
+ // Handle gap before this chunk
if startOffset < chunk.ViewOffset {
gap := chunk.ViewOffset - startOffset
- glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset)
- n += zero(p, startOffset-offset, gap)
+ gaps = append(gaps, struct{ start, length int64 }{startOffset - offset, gap})
startOffset, remaining = chunk.ViewOffset, remaining-gap
if remaining <= 0 {
break
}
}
- // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
+
chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining)
if chunkStart >= chunkStop {
continue
}
- // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
+
bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
- ts = chunk.ModifiedTsNs
- copied, err := c.readChunkSliceAt(ctx, p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
- if err != nil {
- glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
- return copied, ts, err
+ tasks = append(tasks, &chunkReadTask{
+ chunk: chunk,
+ bufferStart: startOffset - offset,
+ bufferEnd: chunkStop - chunkStart + startOffset - offset,
+ chunkOffset: uint64(bufferOffset),
+ })
+
+ startOffset, remaining = chunkStop, remaining-(chunkStop-chunkStart)
+ }
+
+ // Zero-fill gaps
+ for _, gap := range gaps {
+ glog.V(4).Infof("zero [%d,%d)", offset+gap.start, offset+gap.start+gap.length)
+ n += zero(p, gap.start, gap.length)
+ }
+
+ // If only one chunk or random access mode, use sequential reading
+ if len(tasks) <= 1 || c.readerPattern.IsRandomMode() {
+ for _, task := range tasks {
+ copied, readErr := c.readChunkSliceAt(ctx, p[task.bufferStart:task.bufferEnd], task.chunk, nil, task.chunkOffset)
+ ts = max(ts, task.chunk.ModifiedTsNs)
+ if readErr != nil {
+ glog.Errorf("fetching chunk %+v: %v\n", task.chunk, readErr)
+ return n + copied, ts, readErr
+ }
+ n += copied
+ }
+ } else {
+ // Parallel chunk fetching for multiple chunks
+ // This significantly improves throughput when chunks are on different volume servers
+ g, gCtx := errgroup.WithContext(ctx)
+
+ // Limit concurrency to avoid overwhelming the system
+ concurrency := c.prefetchCount
+ if concurrency < minReadConcurrency {
+ concurrency = minReadConcurrency
+ }
+ if concurrency > len(tasks) {
+ concurrency = len(tasks)
+ }
+ g.SetLimit(concurrency)
+
+ for _, task := range tasks {
+ g.Go(func() error {
+ // Read directly into the correct position in the output buffer
+ copied, readErr := c.readChunkSliceAtForParallel(gCtx, p[task.bufferStart:task.bufferEnd], task.chunk, task.chunkOffset)
+ task.bytesRead = copied
+ task.modifiedTsNs = task.chunk.ModifiedTsNs
+ return readErr
+ })
}
- n += copied
- startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
+ // Wait for all chunk reads to complete
+ if waitErr := g.Wait(); waitErr != nil {
+ err = waitErr
+ }
+
+ // Aggregate results (order is preserved since we read directly into buffer positions)
+ for _, task := range tasks {
+ n += task.bytesRead
+ ts = max(ts, task.modifiedTsNs)
+ }
+
+ if err != nil {
+ return n, ts, err
+ }
}
- // glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
+ // Trigger prefetch for sequential reads
+ if lastChunk != nil && lastChunk.Next != nil && c.prefetchCount > 0 && !c.readerPattern.IsRandomMode() {
+ c.readerCache.MaybeCache(lastChunk.Next, c.prefetchCount)
+ }
- // zero the remaining bytes if a gap exists at the end of the last chunk (or a fully sparse file)
- if err == nil && remaining > 0 {
+ // Zero the remaining bytes if a gap exists at the end
+ if remaining > 0 {
var delta int64
if c.fileSize >= startOffset {
delta = min(remaining, c.fileSize-startOffset)
- startOffset -= offset
- }
- if delta > 0 {
- glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
- n += zero(p, startOffset, delta)
+ bufStart := startOffset - offset
+ if delta > 0 {
+ glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
+ n += zero(p, bufStart, delta)
+ }
}
}
if err == nil && offset+int64(len(p)) >= c.fileSize {
err = io.EOF
}
- // fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
return
-
}
func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
@@ -249,7 +328,7 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
}
shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
- n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
+ n, err = c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
if c.lastChunkFid != chunkView.FileId {
if chunkView.OffsetInChunk == 0 { // start of a new chunk
if c.lastChunkFid != "" {
@@ -266,6 +345,13 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
return
}
+// readChunkSliceAtForParallel is a simplified version for parallel chunk fetching
+// It doesn't update lastChunkFid or trigger prefetch (handled by the caller)
+func (c *ChunkReadAt) readChunkSliceAtForParallel(ctx context.Context, buffer []byte, chunkView *ChunkView, offset uint64) (n int, err error) {
+ shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
+ return c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
+}
+
func zero(buffer []byte, start, length int64) int {
if length <= 0 {
return 0
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 605be5e73..66cbac1e3 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -35,6 +35,7 @@ type SingleChunkCacher struct {
shouldCache bool
wg sync.WaitGroup
cacheStartedCh chan struct{}
+ done chan struct{} // signals when download is complete
}
func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@@ -93,14 +94,18 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
return
}
-func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
+func (rc *ReaderCache) ReadChunkAt(ctx context.Context, buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
rc.Lock()
if cacher, found := rc.downloaders[fileId]; found {
- if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
- rc.Unlock()
+ rc.Unlock()
+ n, err := cacher.readChunkAt(ctx, buffer, offset)
+ if n > 0 || err != nil {
return n, err
}
+ // If n=0 and err=nil, the cacher couldn't provide data for this offset.
+ // Fall through to try chunkCache.
+ rc.Lock()
}
if shouldCache || rc.lookupFileIdFn == nil {
n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
@@ -134,7 +139,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
rc.downloaders[fileId] = cacher
rc.Unlock()
- return cacher.readChunkAt(buffer, offset)
+ return cacher.readChunkAt(ctx, buffer, offset)
}
func (rc *ReaderCache) UnCache(fileId string) {
@@ -166,38 +171,53 @@ func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte,
chunkSize: chunkSize,
shouldCache: shouldCache,
cacheStartedCh: make(chan struct{}),
+ done: make(chan struct{}),
}
}
+// startCaching downloads the chunk data in the background.
+// It does NOT hold the lock during the HTTP download to allow concurrent readers
+// to wait efficiently using the done channel.
func (s *SingleChunkCacher) startCaching() {
s.wg.Add(1)
defer s.wg.Done()
- s.Lock()
- defer s.Unlock()
+ defer close(s.done) // guarantee completion signal even on panic
- s.cacheStartedCh <- struct{}{} // means this has been started
+ s.cacheStartedCh <- struct{}{} // signal that we've started
+ // Note: We intentionally use context.Background() here, NOT a request-specific context.
+ // The downloaded chunk is a shared resource - multiple concurrent readers may be waiting
+ // for this same download to complete. If we used a request context and that request was
+ // cancelled, it would abort the download and cause errors for all other waiting readers.
+ // The download should always complete once started to serve all potential consumers.
+
+ // Lookup file ID without holding the lock
urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId)
if err != nil {
+ s.Lock()
s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
+ s.Unlock()
return
}
- s.data = mem.Allocate(s.chunkSize)
-
- _, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
- if s.err != nil {
- mem.Free(s.data)
- s.data = nil
- return
- }
+ // Allocate buffer and download without holding the lock
+ // This allows multiple downloads to proceed in parallel
+ data := mem.Allocate(s.chunkSize)
+ _, fetchErr := util_http.RetriedFetchChunkData(context.Background(), data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
- if s.shouldCache {
- s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+ // Now acquire lock to update state
+ s.Lock()
+ if fetchErr != nil {
+ mem.Free(data)
+ s.err = fetchErr
+ } else {
+ s.data = data
+ if s.shouldCache {
+ s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+ }
+ atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
}
- atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
-
- return
+ s.Unlock()
}
func (s *SingleChunkCacher) destroy() {
@@ -209,13 +229,34 @@ func (s *SingleChunkCacher) destroy() {
if s.data != nil {
mem.Free(s.data)
s.data = nil
- close(s.cacheStartedCh)
}
}
-func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+// readChunkAt reads data from the cached chunk.
+// It waits for the download to complete if it's still in progress.
+// The ctx parameter allows the reader to cancel its wait (but the download continues
+// for other readers - see comment in startCaching about shared resource semantics).
+func (s *SingleChunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) {
s.wg.Add(1)
defer s.wg.Done()
+
+ // Wait for download to complete, but allow reader cancellation.
+ // Prioritize checking done first - if data is already available,
+ // return it even if context is also cancelled.
+ select {
+ case <-s.done:
+ // Download already completed, proceed immediately
+ default:
+ // Download not complete, wait for it or context cancellation
+ select {
+ case <-s.done:
+ // Download completed
+ case <-ctx.Done():
+ // Reader cancelled while waiting - download continues for other readers
+ return 0, ctx.Err()
+ }
+ }
+
s.Lock()
defer s.Unlock()
@@ -228,5 +269,4 @@ func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
}
return copy(buf, s.data[offset:]), nil
-
}
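To make the wait/cancel semantics in readChunkAt concrete, here is a compact sketch of the same done-channel handoff: a single background download closes done exactly once (even on failure), and each reader waits on done or on its own context, so a cancelled reader abandons only its wait, never the shared download. The sharedChunk type and its methods are illustrative, not the SeaweedFS types.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type sharedChunk struct {
	sync.Mutex
	data []byte
	err  error
	done chan struct{}
}

func newSharedChunk() *sharedChunk {
	c := &sharedChunk{done: make(chan struct{})}
	go c.download()
	return c
}

func (c *sharedChunk) download() {
	defer close(c.done)               // always signal completion, even if the fetch fails
	time.Sleep(50 * time.Millisecond) // stand-in for the HTTP fetch
	c.Lock()
	c.data = []byte("chunk payload")
	c.Unlock()
}

func (c *sharedChunk) waitForData(ctx context.Context) ([]byte, error) {
	select {
	case <-c.done: // fast path: download already finished
	default:
		select {
		case <-c.done:
		case <-ctx.Done(): // this reader gives up; the download keeps going
			return nil, ctx.Err()
		}
	}
	c.Lock()
	defer c.Unlock()
	return c.data, c.err
}

func main() {
	c := newSharedChunk()

	// Reader 1 cancels almost immediately; only its wait is abandoned.
	ctx1, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	_, err := c.waitForData(ctx1)
	fmt.Println("reader 1:", err) // context deadline exceeded

	// Reader 2 waits it out and still gets the shared download's data.
	data, err := c.waitForData(context.Background())
	fmt.Println("reader 2:", string(data), err) // chunk payload <nil>
}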
diff --git a/weed/filer/reader_cache_test.go b/weed/filer/reader_cache_test.go
new file mode 100644
index 000000000..0480de8a7
--- /dev/null
+++ b/weed/filer/reader_cache_test.go
@@ -0,0 +1,505 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+// mockChunkCacheForReaderCache implements chunk cache for testing
+type mockChunkCacheForReaderCache struct {
+ data map[string][]byte
+ hitCount int32
+ mu sync.Mutex
+}
+
+func newMockChunkCacheForReaderCache() *mockChunkCacheForReaderCache {
+ return &mockChunkCacheForReaderCache{
+ data: make(map[string][]byte),
+ }
+}
+
+func (m *mockChunkCacheForReaderCache) GetChunk(fileId string, minSize uint64) []byte {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if d, ok := m.data[fileId]; ok {
+ atomic.AddInt32(&m.hitCount, 1)
+ return d
+ }
+ return nil
+}
+
+func (m *mockChunkCacheForReaderCache) ReadChunkAt(data []byte, fileId string, offset uint64) (int, error) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if d, ok := m.data[fileId]; ok && int(offset) < len(d) {
+ atomic.AddInt32(&m.hitCount, 1)
+ n := copy(data, d[offset:])
+ return n, nil
+ }
+ return 0, nil
+}
+
+func (m *mockChunkCacheForReaderCache) SetChunk(fileId string, data []byte) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.data[fileId] = data
+}
+
+func (m *mockChunkCacheForReaderCache) GetMaxFilePartSizeInCache() uint64 {
+ return 1024 * 1024 // 1MB
+}
+
+func (m *mockChunkCacheForReaderCache) IsInCache(fileId string, lockNeeded bool) bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ _, ok := m.data[fileId]
+ return ok
+}
+
+// TestReaderCacheContextCancellation tests that a reader can cancel its wait
+// while the download continues for other readers
+func TestReaderCacheContextCancellation(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+
+ // Create a ReaderCache - we can't easily test the full flow without mocking HTTP,
+ // but we can test the context cancellation in readChunkAt
+ rc := NewReaderCache(10, cache, nil)
+ defer rc.destroy()
+
+ // Pre-populate cache to avoid HTTP calls
+ testData := []byte("test data for context cancellation")
+ cache.SetChunk("test-file-1", testData)
+
+ // Test that context cancellation works
+ ctx, cancel := context.WithCancel(context.Background())
+
+ buffer := make([]byte, len(testData))
+ n, err := rc.ReadChunkAt(ctx, buffer, "test-file-1", nil, false, 0, len(testData), true)
+ if err != nil {
+ t.Errorf("Expected no error, got: %v", err)
+ }
+ if n != len(testData) {
+ t.Errorf("Expected %d bytes, got %d", len(testData), n)
+ }
+
+ // Cancel context and verify it doesn't affect already completed reads
+ cancel()
+
+ // Subsequent read with cancelled context should still work from cache
+ buffer2 := make([]byte, len(testData))
+ n2, err2 := rc.ReadChunkAt(ctx, buffer2, "test-file-1", nil, false, 0, len(testData), true)
+ // Note: This may or may not error depending on whether it hits cache
+ _ = n2
+ _ = err2
+}
+
+// TestReaderCacheFallbackToChunkCache tests that when a cacher returns n=0, err=nil,
+// we fall back to the chunkCache
+func TestReaderCacheFallbackToChunkCache(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+
+ // Pre-populate the chunk cache with data
+ testData := []byte("fallback test data that should be found in chunk cache")
+ cache.SetChunk("fallback-file", testData)
+
+ rc := NewReaderCache(10, cache, nil)
+ defer rc.destroy()
+
+ // Read should hit the chunk cache
+ buffer := make([]byte, len(testData))
+ n, err := rc.ReadChunkAt(context.Background(), buffer, "fallback-file", nil, false, 0, len(testData), true)
+
+ if err != nil {
+ t.Errorf("Expected no error, got: %v", err)
+ }
+ if n != len(testData) {
+ t.Errorf("Expected %d bytes, got %d", len(testData), n)
+ }
+
+ // Verify cache was hit
+ if cache.hitCount == 0 {
+ t.Error("Expected chunk cache to be hit")
+ }
+}
+
+// TestReaderCacheMultipleReadersWaitForSameChunk tests that multiple readers
+// can wait for the same chunk download to complete
+func TestReaderCacheMultipleReadersWaitForSameChunk(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+
+ // Pre-populate cache so we don't need HTTP
+ testData := make([]byte, 1024)
+ for i := range testData {
+ testData[i] = byte(i % 256)
+ }
+ cache.SetChunk("shared-chunk", testData)
+
+ rc := NewReaderCache(10, cache, nil)
+ defer rc.destroy()
+
+ // Launch multiple concurrent readers for the same chunk
+ numReaders := 10
+ var wg sync.WaitGroup
+ errors := make(chan error, numReaders)
+ bytesRead := make(chan int, numReaders)
+
+ for i := 0; i < numReaders; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ buffer := make([]byte, len(testData))
+ n, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk", nil, false, 0, len(testData), true)
+ if err != nil {
+ errors <- err
+ }
+ bytesRead <- n
+ }()
+ }
+
+ wg.Wait()
+ close(errors)
+ close(bytesRead)
+
+ // Check for errors
+ for err := range errors {
+ t.Errorf("Reader got error: %v", err)
+ }
+
+ // Verify all readers got the expected data
+ for n := range bytesRead {
+ if n != len(testData) {
+ t.Errorf("Expected %d bytes, got %d", len(testData), n)
+ }
+ }
+}
+
+// TestReaderCachePartialRead tests reading at different offsets
+func TestReaderCachePartialRead(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+
+ testData := []byte("0123456789ABCDEFGHIJ")
+ cache.SetChunk("partial-read-file", testData)
+
+ rc := NewReaderCache(10, cache, nil)
+ defer rc.destroy()
+
+ tests := []struct {
+ name string
+ offset int64
+ size int
+ expected []byte
+ }{
+ {"read from start", 0, 5, []byte("01234")},
+ {"read from middle", 5, 5, []byte("56789")},
+ {"read to end", 15, 5, []byte("FGHIJ")},
+ {"read single byte", 10, 1, []byte("A")},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ buffer := make([]byte, tt.size)
+ n, err := rc.ReadChunkAt(context.Background(), buffer, "partial-read-file", nil, false, tt.offset, len(testData), true)
+
+ if err != nil {
+ t.Errorf("Expected no error, got: %v", err)
+ }
+ if n != tt.size {
+ t.Errorf("Expected %d bytes, got %d", tt.size, n)
+ }
+ if string(buffer[:n]) != string(tt.expected) {
+ t.Errorf("Expected %q, got %q", tt.expected, buffer[:n])
+ }
+ })
+ }
+}
+
+// TestReaderCacheCleanup tests that old downloaders are cleaned up
+func TestReaderCacheCleanup(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+
+ // Create cache with limit of 3
+ rc := NewReaderCache(3, cache, nil)
+ defer rc.destroy()
+
+ // Add data for multiple files
+ for i := 0; i < 5; i++ {
+ fileId := string(rune('A' + i))
+ data := []byte("data for file " + fileId)
+ cache.SetChunk(fileId, data)
+ }
+
+ // Read from multiple files - should trigger cleanup when exceeding limit
+ for i := 0; i < 5; i++ {
+ fileId := string(rune('A' + i))
+ buffer := make([]byte, 20)
+ _, err := rc.ReadChunkAt(context.Background(), buffer, fileId, nil, false, 0, 20, true)
+ if err != nil {
+ t.Errorf("Read error for file %s: %v", fileId, err)
+ }
+ }
+
+ // Cache should still work - reads should succeed
+ for i := 0; i < 5; i++ {
+ fileId := string(rune('A' + i))
+ buffer := make([]byte, 20)
+ n, err := rc.ReadChunkAt(context.Background(), buffer, fileId, nil, false, 0, 20, true)
+ if err != nil {
+ t.Errorf("Second read error for file %s: %v", fileId, err)
+ }
+ if n == 0 {
+ t.Errorf("Expected data for file %s, got 0 bytes", fileId)
+ }
+ }
+}
+
+// TestSingleChunkCacherDoneSignal tests that done channel is always closed
+func TestSingleChunkCacherDoneSignal(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+ rc := NewReaderCache(10, cache, nil)
+ defer rc.destroy()
+
+ // Test that we can read even when data is in cache (done channel should work)
+ testData := []byte("done signal test")
+ cache.SetChunk("done-signal-test", testData)
+
+ // Multiple goroutines reading same chunk
+ var wg sync.WaitGroup
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ buffer := make([]byte, len(testData))
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ n, err := rc.ReadChunkAt(ctx, buffer, "done-signal-test", nil, false, 0, len(testData), true)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Errorf("Unexpected error: %v", err)
+ }
+ if n == 0 && err == nil {
+ t.Error("Got 0 bytes with no error")
+ }
+ }()
+ }
+
+ // Should complete without hanging
+ done := make(chan struct{})
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ // Success
+ case <-time.After(10 * time.Second):
+ t.Fatal("Test timed out - done channel may not be signaled correctly")
+ }
+}
+
+// ============================================================================
+// Tests that exercise SingleChunkCacher concurrency logic
+// ============================================================================
+//
+// These tests use blocking lookupFileIdFn to exercise the wait/cancellation
+// logic in SingleChunkCacher without requiring HTTP calls.
+
+// TestSingleChunkCacherLookupError tests handling of lookup errors
+func TestSingleChunkCacherLookupError(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+
+ // Lookup function that returns an error
+ lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
+ return nil, fmt.Errorf("lookup failed for %s", fileId)
+ }
+
+ rc := NewReaderCache(10, cache, lookupFn)
+ defer rc.destroy()
+
+ buffer := make([]byte, 100)
+ _, err := rc.ReadChunkAt(context.Background(), buffer, "error-test", nil, false, 0, 100, true)
+
+ if err == nil {
+ t.Error("Expected an error, got nil")
+ }
+}
+
+// TestSingleChunkCacherContextCancellationDuringLookup tests that a reader can
+// cancel its wait while the lookup is in progress. This exercises the actual
+// SingleChunkCacher wait/cancel logic.
+func TestSingleChunkCacherContextCancellationDuringLookup(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+ lookupStarted := make(chan struct{})
+ lookupCanFinish := make(chan struct{})
+
+ // Lookup function that blocks to simulate slow operation
+ lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
+ close(lookupStarted)
+ <-lookupCanFinish // Block until test allows completion
+ return nil, fmt.Errorf("lookup completed but reader should have cancelled")
+ }
+
+ rc := NewReaderCache(10, cache, lookupFn)
+ defer rc.destroy()
+ defer close(lookupCanFinish) // Ensure cleanup
+
+ ctx, cancel := context.WithCancel(context.Background())
+ readResult := make(chan error, 1)
+
+ go func() {
+ buffer := make([]byte, 100)
+ _, err := rc.ReadChunkAt(ctx, buffer, "cancel-during-lookup", nil, false, 0, 100, true)
+ readResult <- err
+ }()
+
+ // Wait for lookup to start, then cancel the reader's context
+ select {
+ case <-lookupStarted:
+ cancel() // Cancel the reader while lookup is blocked
+ case <-time.After(5 * time.Second):
+ t.Fatal("Lookup never started")
+ }
+
+ // Read should return with context.Canceled
+ select {
+ case err := <-readResult:
+ if err != context.Canceled {
+ t.Errorf("Expected context.Canceled, got: %v", err)
+ }
+ case <-time.After(5 * time.Second):
+ t.Fatal("Read did not complete after context cancellation")
+ }
+}
+
+// TestSingleChunkCacherMultipleReadersWaitForDownload tests that multiple readers
+// can wait for the same SingleChunkCacher download to complete. When lookup fails,
+// all readers should receive the same error.
+func TestSingleChunkCacherMultipleReadersWaitForDownload(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+ lookupStarted := make(chan struct{})
+ lookupCanFinish := make(chan struct{})
+ var lookupStartedOnce sync.Once
+
+ // Lookup function that blocks to simulate slow operation
+ lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
+ lookupStartedOnce.Do(func() { close(lookupStarted) })
+ <-lookupCanFinish
+ return nil, fmt.Errorf("simulated lookup error")
+ }
+
+ rc := NewReaderCache(10, cache, lookupFn)
+ defer rc.destroy()
+
+ numReaders := 5
+ var wg sync.WaitGroup
+ errors := make(chan error, numReaders)
+
+ // Start multiple readers for the same chunk
+ for i := 0; i < numReaders; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ buffer := make([]byte, 100)
+ _, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk", nil, false, 0, 100, true)
+ errors <- err
+ }()
+ }
+
+ // Wait for lookup to start, then allow completion
+ select {
+ case <-lookupStarted:
+ close(lookupCanFinish)
+ case <-time.After(5 * time.Second):
+ close(lookupCanFinish)
+ t.Fatal("Lookup never started")
+ }
+
+ wg.Wait()
+ close(errors)
+
+ // All readers should receive an error
+ errorCount := 0
+ for err := range errors {
+ if err != nil {
+ errorCount++
+ }
+ }
+ if errorCount != numReaders {
+ t.Errorf("Expected %d errors, got %d", numReaders, errorCount)
+ }
+}
+
+// TestSingleChunkCacherOneReaderCancelsOthersContinue tests that when one reader
+// cancels, other readers waiting on the same chunk continue to wait.
+func TestSingleChunkCacherOneReaderCancelsOthersContinue(t *testing.T) {
+ cache := newMockChunkCacheForReaderCache()
+ lookupStarted := make(chan struct{})
+ lookupCanFinish := make(chan struct{})
+ var lookupStartedOnce sync.Once
+
+ lookupFn := func(ctx context.Context, fileId string) ([]string, error) {
+ lookupStartedOnce.Do(func() { close(lookupStarted) })
+ <-lookupCanFinish
+ return nil, fmt.Errorf("simulated error after delay")
+ }
+
+ rc := NewReaderCache(10, cache, lookupFn)
+ defer rc.destroy()
+
+ cancelledReaderDone := make(chan error, 1)
+ otherReaderDone := make(chan error, 1)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // Start reader that will be cancelled
+ go func() {
+ buffer := make([]byte, 100)
+ _, err := rc.ReadChunkAt(ctx, buffer, "shared-chunk-2", nil, false, 0, 100, true)
+ cancelledReaderDone <- err
+ }()
+
+ // Start reader that will NOT be cancelled
+ go func() {
+ buffer := make([]byte, 100)
+ _, err := rc.ReadChunkAt(context.Background(), buffer, "shared-chunk-2", nil, false, 0, 100, true)
+ otherReaderDone <- err
+ }()
+
+ // Wait for lookup to start
+ select {
+ case <-lookupStarted:
+ case <-time.After(5 * time.Second):
+ t.Fatal("Lookup never started")
+ }
+
+ // Cancel the first reader
+ cancel()
+
+ // First reader should complete with context.Canceled quickly
+ select {
+ case err := <-cancelledReaderDone:
+ if err != context.Canceled {
+ t.Errorf("Cancelled reader: expected context.Canceled, got: %v", err)
+ }
+ case <-time.After(2 * time.Second):
+ t.Error("Cancelled reader did not complete quickly")
+ }
+
+ // Allow the download to complete
+ close(lookupCanFinish)
+
+ // Other reader should eventually complete (with error since lookup returns error)
+ select {
+ case err := <-otherReaderDone:
+ if err == nil || err == context.Canceled {
+ t.Errorf("Other reader: expected non-nil non-cancelled error, got: %v", err)
+ }
+ // Expected: "simulated error after delay"
+ case <-time.After(5 * time.Second):
+ t.Error("Other reader did not complete")
+ }
+}
diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go
index 3a969fdc8..a374c8a2b 100644
--- a/weed/util/http/http_global_client_util.go
+++ b/weed/util/http/http_global_client_util.go
@@ -487,6 +487,12 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
)
}
+ // For unencrypted, non-gzipped full chunks, use direct buffer read
+ // This avoids the 64KB intermediate buffer and callback overhead
+ if cipherKey == nil && !isGzipped && isFullChunk {
+ return retriedFetchChunkDataDirect(ctx, buffer, urlStrings, string(jwt))
+ }
+
var shouldRetry bool
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
@@ -551,3 +557,105 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
return n, err
}
+
+// retriedFetchChunkDataDirect reads chunk data directly into the buffer without
+// intermediate buffering. This reduces memory copies and improves throughput
+// for large chunk reads.
+func retriedFetchChunkDataDirect(ctx context.Context, buffer []byte, urlStrings []string, jwt string) (n int, err error) {
+ var shouldRetry bool
+
+ for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ default:
+ }
+
+ for _, urlString := range urlStrings {
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ default:
+ }
+
+ n, shouldRetry, err = readUrlDirectToBuffer(ctx, urlString+"?readDeleted=true", jwt, buffer)
+ if err == nil {
+ return n, nil
+ }
+ if !shouldRetry {
+ break
+ }
+ glog.V(0).InfofCtx(ctx, "read %s failed, err: %v", urlString, err)
+ }
+
+ if err != nil && shouldRetry {
+ glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
+ timer := time.NewTimer(waitTime)
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ return 0, ctx.Err()
+ case <-timer.C:
+ }
+ } else {
+ break
+ }
+ }
+
+ return n, err
+}
+
+// readUrlDirectToBuffer reads HTTP response directly into the provided buffer,
+// avoiding intermediate buffer allocations and copies.
+func readUrlDirectToBuffer(ctx context.Context, fileUrl, jwt string, buffer []byte) (n int, retryable bool, err error) {
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, fileUrl, nil)
+ if err != nil {
+ return 0, false, err
+ }
+ maybeAddAuth(req, jwt)
+ request_id.InjectToRequest(ctx, req)
+
+ r, err := GetGlobalHttpClient().Do(req)
+ if err != nil {
+ return 0, true, err
+ }
+ defer CloseResponse(r)
+
+ if r.StatusCode >= 400 {
+ if r.StatusCode == http.StatusNotFound {
+ return 0, true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
+ }
+ if r.StatusCode == http.StatusTooManyRequests {
+ return 0, false, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrTooManyRequests)
+ }
+ retryable = r.StatusCode >= 499
+ return 0, retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ }
+
+ // Read directly into the buffer without intermediate copying
+ // This is significantly faster for large chunks (16MB+)
+ var totalRead int
+ for totalRead < len(buffer) {
+ select {
+ case <-ctx.Done():
+ return totalRead, false, ctx.Err()
+ default:
+ }
+
+ m, readErr := r.Body.Read(buffer[totalRead:])
+ totalRead += m
+ if readErr != nil {
+ if readErr == io.EOF {
+ // Return io.ErrUnexpectedEOF if we haven't filled the buffer
+ // This prevents silent data corruption from truncated responses
+ if totalRead < len(buffer) {
+ return totalRead, true, io.ErrUnexpectedEOF
+ }
+ return totalRead, false, nil
+ }
+ return totalRead, true, readErr
+ }
+ }
+
+ return totalRead, false, nil
+}
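The fill loop at the end of readUrlDirectToBuffer has the same success and short-read semantics as io.ReadFull, with two additions: a per-iteration context check, and mapping a short read to a retryable io.ErrUnexpectedEOF so a truncated response is retried rather than silently accepted. As a point of comparison only, a minimal variant leaning on io.ReadFull might look like the sketch below (fillBuffer is a hypothetical helper; the real code keeps the explicit loop so it can observe ctx between reads):

package main

import (
	"fmt"
	"io"
	"strings"
)

// fillBuffer reads exactly len(buffer) bytes from r. io.ReadFull already
// reports a short read as io.ErrUnexpectedEOF, which the caller can treat
// as retryable, mirroring the explicit loop above.
func fillBuffer(r io.Reader, buffer []byte) (n int, retryable bool, err error) {
	n, err = io.ReadFull(r, buffer)
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return n, true, io.ErrUnexpectedEOF // truncated response
	}
	return n, err != nil, err
}

func main() {
	buf := make([]byte, 8)
	n, retry, err := fillBuffer(strings.NewReader("only5"), buf)
	fmt.Println(n, retry, err) // 5 true unexpected EOF
}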