diff options
Diffstat (limited to 'weed/filer/reader_cache.go')
| -rw-r--r-- | weed/filer/reader_cache.go | 45 |
1 files changed, 26 insertions, 19 deletions
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index eb2308758..89db04eb0 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -3,6 +3,7 @@ package filer import ( "fmt" "sync" + "sync/atomic" "time" "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" @@ -20,17 +21,17 @@ type ReaderCache struct { type SingleChunkCacher struct { sync.Mutex - parent *ReaderCache - chunkFileId string - data []byte - err error - cipherKey []byte - isGzipped bool - chunkSize int - shouldCache bool - wg sync.WaitGroup - cacheStartedCh chan struct{} - completedTime time.Time + parent *ReaderCache + chunkFileId string + data []byte + err error + cipherKey []byte + isGzipped bool + chunkSize int + shouldCache bool + wg sync.WaitGroup + cacheStartedCh chan struct{} + completedTimeNew int64 } func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { @@ -50,13 +51,17 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { rc.Lock() defer rc.Unlock() + if len(rc.downloaders) >= rc.limit { + return + } + for _, chunkView := range chunkViews { if _, found := rc.downloaders[chunkView.FileId]; found { continue } if len(rc.downloaders) >= rc.limit { - // if still no slots, return + // abort when slots are filled return } @@ -74,27 +79,28 @@ func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) { rc.Lock() - defer rc.Unlock() + if cacher, found := rc.downloaders[fileId]; found { if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil { + rc.Unlock() return n, err } } if shouldCache || rc.lookupFileIdFn == nil { n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset)) if n > 0 { + rc.Unlock() return n, err } } // clean up old downloaders if len(rc.downloaders) >= rc.limit { - oldestFid, oldestTime := "", time.Now() + oldestFid, oldestTime := "", time.Now().Unix() for fid, downloader := range rc.downloaders { - if !downloader.completedTime.IsZero() { - if downloader.completedTime.Before(oldestTime) { - oldestFid, oldestTime = fid, downloader.completedTime - } + completedTime := atomic.LoadInt64(&downloader.completedTimeNew) + if completedTime > 0 && completedTime < oldestTime { + oldestFid, oldestTime = fid, completedTime } } if oldestFid != "" { @@ -110,6 +116,7 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt go cacher.startCaching() <-cacher.cacheStartedCh rc.downloaders[fileId] = cacher + rc.Unlock() return cacher.readChunkAt(buffer, offset) } @@ -172,7 +179,7 @@ func (s *SingleChunkCacher) startCaching() { if s.shouldCache { s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) } - s.completedTime = time.Now() + atomic.StoreInt64(&s.completedTimeNew, time.Now().Unix()) return } |
