aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/reader_cache.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/reader_cache.go')
-rw-r--r--weed/filer/reader_cache.go56
1 files changed, 30 insertions, 26 deletions
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 1a0dc6a31..4f2c52303 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -40,41 +40,33 @@ func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
}
}
-func (rc *ReaderCache) MaybeCache(fileId string, cipherKey []byte, isGzipped bool, chunkSize int) {
- rc.Lock()
- defer rc.Unlock()
- if _, found := rc.downloaders[fileId]; found {
- return
- }
+func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
if rc.lookupFileIdFn == nil {
return
}
- // if too many, delete one of them?
- if len(rc.downloaders) >= rc.limit {
- oldestFid, oldestTime := "", time.Now()
- for fid, downloader := range rc.downloaders {
- if !downloader.completedTime.IsZero() {
- if downloader.completedTime.Before(oldestTime) {
- oldestFid, oldestTime = fid, downloader.completedTime
- }
- }
+ rc.Lock()
+ defer rc.Unlock()
+
+ for _, chunkView := range chunkViews {
+ if _, found := rc.downloaders[chunkView.FileId]; found {
+ continue
}
- if oldestFid != "" {
- oldDownloader := rc.downloaders[oldestFid]
- delete(rc.downloaders, oldestFid)
- oldDownloader.destroy()
- } else {
+
+ if len(rc.downloaders) >= rc.limit {
// if still no slots, return
return
}
- }
- cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, false)
- cacher.wg.Add(1)
- go cacher.startCaching()
- cacher.wg.Wait()
- rc.downloaders[fileId] = cacher
+ // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
+ // cache this chunk if not yet
+ cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
+ cacher.wg.Add(1)
+ go cacher.startCaching()
+ cacher.wg.Wait()
+ rc.downloaders[chunkView.FileId] = cacher
+
+ }
return
}
@@ -108,6 +100,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
}
}
+ // glog.V(4).Infof("cache1 %s", fileId)
+
cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
cacher.wg.Add(1)
go cacher.startCaching()
@@ -117,6 +111,16 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
return cacher.readChunkAt(buffer, offset)
}
+func (rc *ReaderCache) UnCache(fileId string) {
+ rc.Lock()
+ defer rc.Unlock()
+ // glog.V(4).Infof("uncache %s", fileId)
+ if downloader, found := rc.downloaders[fileId]; found {
+ downloader.destroy()
+ delete(rc.downloaders, fileId)
+ }
+}
+
func (rc *ReaderCache) destroy() {
rc.Lock()
defer rc.Unlock()