Diffstat (limited to 'weed/filer/reader_cache.go')
-rw-r--r--  weed/filer/reader_cache.go  86
1 file changed, 63 insertions(+), 23 deletions(-)
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 605be5e73..66cbac1e3 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -35,6 +35,7 @@ type SingleChunkCacher struct {
shouldCache bool
wg sync.WaitGroup
cacheStartedCh chan struct{}
+ done chan struct{} // signals when download is complete
}
func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
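The new done channel is the standard Go completion-broadcast idiom: it is closed exactly once when the background work finishes, and any number of goroutines can wait on it without further coordination. A minimal sketch of the idiom (worker and newWorker are illustrative names, not from this patch):

package main

import (
	"fmt"
	"time"
)

type worker struct {
	done   chan struct{} // closed exactly once when the work finishes
	result int
}

func newWorker() *worker {
	w := &worker{done: make(chan struct{})}
	go func() {
		defer close(w.done)               // close, never send: releases every waiter
		time.Sleep(10 * time.Millisecond) // stands in for the chunk download
		w.result = 42
	}()
	return w
}

func main() {
	w := newWorker()
	<-w.done // any number of goroutines can block here safely
	fmt.Println(w.result)
}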
@@ -93,14 +94,18 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
return
}
-func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
+func (rc *ReaderCache) ReadChunkAt(ctx context.Context, buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
rc.Lock()
if cacher, found := rc.downloaders[fileId]; found {
- if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
- rc.Unlock()
+ rc.Unlock()
+ n, err := cacher.readChunkAt(ctx, buffer, offset)
+ if n > 0 || err != nil {
return n, err
}
+ // If n=0 and err=nil, the cacher couldn't provide data for this offset.
+ // Fall through to try chunkCache.
+ rc.Lock()
}
if shouldCache || rc.lookupFileIdFn == nil {
n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
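The hunk above also reworks the locking: the ReaderCache lock is now released before calling into an existing cacher, because readChunkAt can block until the download completes, and holding the map-level lock across that wait would serialize every reader behind one slow chunk. A hedged sketch of the unlock-call-relock shape, using simplified stand-in types (cacheMap and entry are hypothetical):

package readersketch

import "sync"

// entry stands in for SingleChunkCacher; read may block until a download ends.
type entry struct{}

func (e *entry) read(buf []byte, off int64) (int, error) { return 0, nil }

type cacheMap struct {
	mu      sync.Mutex
	entries map[string]*entry
}

func (rc *cacheMap) read(id string, buf []byte, off int64) (int, error) {
	rc.mu.Lock()
	if e, ok := rc.entries[id]; ok {
		rc.mu.Unlock() // do not hold the map lock while one entry blocks
		if n, err := e.read(buf, off); n > 0 || err != nil {
			return n, err
		}
		rc.mu.Lock() // n==0 && err==nil: nothing at this offset, fall through
	}
	defer rc.mu.Unlock()
	// ... consult the shared chunk cache here, still under the lock ...
	return 0, nil
}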
@@ -134,7 +139,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
rc.downloaders[fileId] = cacher
rc.Unlock()
- return cacher.readChunkAt(buffer, offset)
+ return cacher.readChunkAt(ctx, buffer, offset)
}
func (rc *ReaderCache) UnCache(fileId string) {
@@ -166,38 +171,53 @@ func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte,
chunkSize: chunkSize,
shouldCache: shouldCache,
cacheStartedCh: make(chan struct{}),
+ done: make(chan struct{}),
}
}
+// startCaching downloads the chunk data in the background.
+// It does NOT hold the lock during the HTTP download to allow concurrent readers
+// to wait efficiently using the done channel.
func (s *SingleChunkCacher) startCaching() {
s.wg.Add(1)
defer s.wg.Done()
- s.Lock()
- defer s.Unlock()
+ defer close(s.done) // guarantee completion signal even on panic
- s.cacheStartedCh <- struct{}{} // means this has been started
+ s.cacheStartedCh <- struct{}{} // signal that we've started
+ // Note: We intentionally use context.Background() here, NOT a request-specific context.
+ // The downloaded chunk is a shared resource - multiple concurrent readers may be waiting
+ // for this same download to complete. If we used a request context and that request was
+ // cancelled, it would abort the download and cause errors for all other waiting readers.
+ // The download should always complete once started to serve all potential consumers.
+
+ // Lookup file ID without holding the lock
urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId)
if err != nil {
+ s.Lock()
s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
+ s.Unlock()
return
}
- s.data = mem.Allocate(s.chunkSize)
-
- _, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
- if s.err != nil {
- mem.Free(s.data)
- s.data = nil
- return
- }
+ // Allocate buffer and download without holding the lock
+ // This allows multiple downloads to proceed in parallel
+ data := mem.Allocate(s.chunkSize)
+ _, fetchErr := util_http.RetriedFetchChunkData(context.Background(), data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
- if s.shouldCache {
- s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+ // Now acquire lock to update state
+ s.Lock()
+ if fetchErr != nil {
+ mem.Free(data)
+ s.err = fetchErr
+ } else {
+ s.data = data
+ if s.shouldCache {
+ s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+ }
+ atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
}
- atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
-
- return
+ s.Unlock()
}
func (s *SingleChunkCacher) destroy() {
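The restructured startCaching follows a common shape: signal that work has begun, perform the slow I/O with no lock held and a background context (the result is shared, so one reader's cancellation must not abort the download for everyone else), then take the lock only long enough to publish the outcome. A self-contained sketch of that shape, assuming a hypothetical fetch helper in place of util_http.RetriedFetchChunkData:

package readersketch

import (
	"context"
	"sync"
)

// fetch is a hypothetical stand-in for the real HTTP chunk download.
func fetch(ctx context.Context, id string) ([]byte, error) { return []byte(id), nil }

type download struct {
	mu   sync.Mutex
	done chan struct{} // closed by run exactly once
	data []byte
	err  error
}

func (d *download) run(id string) {
	defer close(d.done) // release waiters even if fetch panics

	// Background context on purpose: a request-scoped context would let one
	// cancelled reader fail the download for all other waiting readers.
	data, err := fetch(context.Background(), id)

	d.mu.Lock() // held only to publish the result, never during I/O
	if err != nil {
		d.err = err
	} else {
		d.data = data
	}
	d.mu.Unlock()
}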
@@ -209,13 +229,34 @@ func (s *SingleChunkCacher) destroy() {
if s.data != nil {
mem.Free(s.data)
s.data = nil
- close(s.cacheStartedCh)
}
}
-func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+// readChunkAt reads data from the cached chunk.
+// It waits for the download to complete if it's still in progress.
+// The ctx parameter allows the reader to cancel its wait (but the download continues
+// for other readers - see comment in startCaching about shared resource semantics).
+func (s *SingleChunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) {
s.wg.Add(1)
defer s.wg.Done()
+
+ // Wait for download to complete, but allow reader cancellation.
+ // Prioritize checking done first - if data is already available,
+ // return it even if context is also cancelled.
+ select {
+ case <-s.done:
+ // Download already completed, proceed immediately
+ default:
+ // Download not complete, wait for it or context cancellation
+ select {
+ case <-s.done:
+ // Download completed
+ case <-ctx.Done():
+ // Reader cancelled while waiting - download continues for other readers
+ return 0, ctx.Err()
+ }
+ }
+
s.Lock()
defer s.Unlock()
@@ -228,5 +269,4 @@ func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
}
return copy(buf, s.data[offset:]), nil
-
}
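The two-level select in readChunkAt is the idiomatic way to give one channel priority: when several cases are ready, Go chooses among them at random, so a single select over done and ctx.Done() could report cancellation even though the data had already arrived. Checking done in a non-blocking select first removes that race. A standalone sketch of the pattern (waitDone is an illustrative name):

package readersketch

import "context"

// waitDone blocks until done is closed or ctx is cancelled, always giving
// done priority: if the download already finished, cancellation is ignored
// and the caller goes on to serve the cached data.
func waitDone(ctx context.Context, done <-chan struct{}) error {
	select {
	case <-done:
		return nil // fast path: work complete, skip ctx entirely
	default:
	}
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err() // this reader gives up; the download keeps running
	}
}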