Diffstat (limited to 'weed/filer/reader_cache.go')
-rw-r--r--   weed/filer/reader_cache.go   86
1 file changed, 63 insertions(+), 23 deletions(-)
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 605be5e73..66cbac1e3 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -35,6 +35,7 @@ type SingleChunkCacher struct {
 	shouldCache    bool
 	wg             sync.WaitGroup
 	cacheStartedCh chan struct{}
+	done           chan struct{} // signals when download is complete
 }
 
 func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
@@ -93,14 +94,18 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
 	return
 }
 
-func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
+func (rc *ReaderCache) ReadChunkAt(ctx context.Context, buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
 	rc.Lock()
 	if cacher, found := rc.downloaders[fileId]; found {
-		if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
-			rc.Unlock()
+		rc.Unlock()
+		n, err := cacher.readChunkAt(ctx, buffer, offset)
+		if n > 0 || err != nil {
 			return n, err
 		}
+		// If n=0 and err=nil, the cacher couldn't provide data for this offset.
+		// Fall through to try chunkCache.
+		rc.Lock()
 	}
 	if shouldCache || rc.lookupFileIdFn == nil {
 		n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
@@ -134,7 +139,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
 	rc.downloaders[fileId] = cacher
 	rc.Unlock()
 
-	return cacher.readChunkAt(buffer, offset)
+	return cacher.readChunkAt(ctx, buffer, offset)
 }
 
 func (rc *ReaderCache) UnCache(fileId string) {
@@ -166,38 +171,53 @@ func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte,
 		chunkSize:      chunkSize,
 		shouldCache:    shouldCache,
 		cacheStartedCh: make(chan struct{}),
+		done:           make(chan struct{}),
 	}
 }
 
+// startCaching downloads the chunk data in the background.
+// It does NOT hold the lock during the HTTP download to allow concurrent readers
+// to wait efficiently using the done channel.
 func (s *SingleChunkCacher) startCaching() {
 	s.wg.Add(1)
 	defer s.wg.Done()
-	s.Lock()
-	defer s.Unlock()
+	defer close(s.done) // guarantee completion signal even on panic
 
-	s.cacheStartedCh <- struct{}{} // means this has been started
+	s.cacheStartedCh <- struct{}{} // signal that we've started
 
+	// Note: We intentionally use context.Background() here, NOT a request-specific context.
+	// The downloaded chunk is a shared resource - multiple concurrent readers may be waiting
+	// for this same download to complete. If we used a request context and that request was
+	// cancelled, it would abort the download and cause errors for all other waiting readers.
+	// The download should always complete once started to serve all potential consumers.
+
+	// Lookup file ID without holding the lock
 	urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId)
 	if err != nil {
+		s.Lock()
 		s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
+		s.Unlock()
 		return
 	}
 
-	s.data = mem.Allocate(s.chunkSize)
-
-	_, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
-	if s.err != nil {
-		mem.Free(s.data)
-		s.data = nil
-		return
-	}
+	// Allocate buffer and download without holding the lock
+	// This allows multiple downloads to proceed in parallel
+	data := mem.Allocate(s.chunkSize)
+	_, fetchErr := util_http.RetriedFetchChunkData(context.Background(), data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
 
-	if s.shouldCache {
-		s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+	// Now acquire lock to update state
+	s.Lock()
+	if fetchErr != nil {
+		mem.Free(data)
+		s.err = fetchErr
+	} else {
+		s.data = data
+		if s.shouldCache {
+			s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+		}
+		atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
 	}
-	atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
-
-	return
+	s.Unlock()
 }
 
 func (s *SingleChunkCacher) destroy() {
@@ -209,13 +229,34 @@ func (s *SingleChunkCacher) destroy() {
 	if s.data != nil {
 		mem.Free(s.data)
 		s.data = nil
-		close(s.cacheStartedCh)
 	}
 }
 
-func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+// readChunkAt reads data from the cached chunk.
+// It waits for the download to complete if it's still in progress.
+// The ctx parameter allows the reader to cancel its wait (but the download continues
+// for other readers - see comment in startCaching about shared resource semantics).
+func (s *SingleChunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) {
 	s.wg.Add(1)
 	defer s.wg.Done()
+
+	// Wait for download to complete, but allow reader cancellation.
+	// Prioritize checking done first - if data is already available,
+	// return it even if context is also cancelled.
+	select {
+	case <-s.done:
+		// Download already completed, proceed immediately
+	default:
+		// Download not complete, wait for it or context cancellation
+		select {
+		case <-s.done:
+			// Download completed
+		case <-ctx.Done():
+			// Reader cancelled while waiting - download continues for other readers
			return 0, ctx.Err()
+		}
+	}
+
 	s.Lock()
 	defer s.Unlock()
 
@@ -228,5 +269,4 @@ func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
 	}
 	return copy(buf, s.data[offset:]), nil
-
 }
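
Note: the core concurrency pattern in this change is the two-stage select in readChunkAt: check the done channel first so data that has already arrived is returned even when the caller's context is also cancelled, then wait on done and ctx.Done() together. Below is a minimal, self-contained sketch of that wait pattern; the waitDone helper and the timings are hypothetical illustrations, not code from this commit.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitDone mirrors the two-stage select from readChunkAt: already-completed
// data wins even if ctx is cancelled; otherwise the wait is cancellable.
func waitDone(ctx context.Context, done <-chan struct{}) error {
	select {
	case <-done:
		return nil // download already finished; ignore ctx
	default:
	}
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err() // this reader gives up; the download keeps running
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // simulated chunk download
		close(done)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	if err := waitDone(ctx, done); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("reader timed out; download continues in the background")
	}

	// A later reader with no deadline still observes completion.
	if err := waitDone(context.Background(), done); err == nil {
		fmt.Println("second reader got the completed download")
	}
}

Closing done in a defer, as startCaching now does, releases every waiter even if the download path returns early or panics.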

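The long comment added to startCaching justifies downloading under context.Background(): the chunk is a shared resource, so one reader's cancellation must not abort the fetch that other readers are waiting on. The sketch below illustrates those semantics with a hypothetical sharedDownload type built from sync.Once and a done channel; it is an analogy for SingleChunkCacher, not its actual implementation.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type sharedDownload struct {
	once sync.Once
	done chan struct{}
	data []byte
}

func newSharedDownload() *sharedDownload {
	return &sharedDownload{done: make(chan struct{})}
}

// start runs the fetch exactly once, detached from any request context.
func (d *sharedDownload) start(fetch func(context.Context) []byte) {
	d.once.Do(func() {
		go func() {
			defer close(d.done)
			// Deliberately context.Background(): a cancelled reader must not
			// abort the download that other readers are waiting on.
			d.data = fetch(context.Background())
		}()
	})
}

// wait blocks until the data is ready or this reader's ctx is cancelled.
func (d *sharedDownload) wait(ctx context.Context) ([]byte, error) {
	select {
	case <-d.done:
		return d.data, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	dl := newSharedDownload()
	dl.start(func(ctx context.Context) []byte {
		time.Sleep(30 * time.Millisecond) // simulated fetch
		return []byte("chunk")
	})

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // first reader cancels immediately
	if _, err := dl.wait(ctx); err != nil {
		fmt.Println("reader 1:", err)
	}

	data, _ := dl.wait(context.Background()) // second reader still succeeds
	fmt.Println("reader 2:", string(data))
}

Here the first reader's cancelled context only ends its own wait; the second reader still receives the data because the fetch was never tied to any request context, which is exactly the behavior the patch preserves for concurrent chunk readers.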