diff options
Diffstat (limited to 'weed/filer/reader_cache.go')
| -rw-r--r-- | weed/filer/reader_cache.go | 192 |
1 files changed, 192 insertions, 0 deletions
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go new file mode 100644 index 000000000..bce97cc49 --- /dev/null +++ b/weed/filer/reader_cache.go @@ -0,0 +1,192 @@ +package filer + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" + "github.com/chrislusf/seaweedfs/weed/util/mem" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "sync" + "time" +) + +type ReaderCache struct { + chunkCache chunk_cache.ChunkCache + lookupFileIdFn wdclient.LookupFileIdFunctionType + sync.Mutex + downloaders map[string]*SingleChunkCacher + limit int +} + +type SingleChunkCacher struct { + sync.RWMutex + parent *ReaderCache + chunkFileId string + data []byte + err error + cipherKey []byte + isGzipped bool + chunkSize int + shouldCache bool + wg sync.WaitGroup + completedTime time.Time +} + +func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache { + return &ReaderCache{ + limit: limit, + chunkCache: chunkCache, + lookupFileIdFn: lookupFileIdFn, + downloaders: make(map[string]*SingleChunkCacher), + } +} + +func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) { + if rc.lookupFileIdFn == nil { + return + } + + rc.Lock() + defer rc.Unlock() + + for _, chunkView := range chunkViews { + if _, found := rc.downloaders[chunkView.FileId]; found { + continue + } + + if len(rc.downloaders) >= rc.limit { + // if still no slots, return + return + } + + // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset) + // cache this chunk if not yet + cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false) + cacher.wg.Add(1) + go cacher.startCaching() + cacher.wg.Wait() + rc.downloaders[chunkView.FileId] = cacher + + } + + return +} + +func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) { + rc.Lock() + defer rc.Unlock() + if cacher, found := rc.downloaders[fileId]; found { + return cacher.readChunkAt(buffer, offset) + } + if shouldCache || rc.lookupFileIdFn == nil { + n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset)) + if n > 0 { + return n, err + } + } + + if len(rc.downloaders) >= rc.limit { + oldestFid, oldestTime := "", time.Now() + for fid, downloader := range rc.downloaders { + if !downloader.completedTime.IsZero() { + if downloader.completedTime.Before(oldestTime) { + oldestFid, oldestTime = fid, downloader.completedTime + } + } + } + if oldestFid != "" { + oldDownloader := rc.downloaders[oldestFid] + delete(rc.downloaders, oldestFid) + oldDownloader.destroy() + } + } + + // glog.V(4).Infof("cache1 %s", fileId) + + cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache) + cacher.wg.Add(1) + go cacher.startCaching() + cacher.wg.Wait() + rc.downloaders[fileId] = cacher + + return cacher.readChunkAt(buffer, offset) +} + +func (rc *ReaderCache) UnCache(fileId string) { + rc.Lock() + defer rc.Unlock() + // glog.V(4).Infof("uncache %s", fileId) + if downloader, found := rc.downloaders[fileId]; found { + downloader.destroy() + delete(rc.downloaders, fileId) + } +} + +func (rc *ReaderCache) destroy() { + rc.Lock() + defer rc.Unlock() + + for _, downloader := range rc.downloaders { + downloader.destroy() + } + +} + +func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher { + t := &SingleChunkCacher{ + parent: parent, + chunkFileId: fileId, + cipherKey: cipherKey, + isGzipped: isGzipped, + chunkSize: chunkSize, + shouldCache: shouldCache, + } + return t +} + +func (s *SingleChunkCacher) startCaching() { + s.Lock() + defer s.Unlock() + + s.wg.Done() // means this has been started + + urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId) + if err != nil { + s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err) + return + } + + s.data = mem.Allocate(s.chunkSize) + + _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) + if s.err != nil { + mem.Free(s.data) + s.data = nil + return + } + + s.completedTime = time.Now() + if s.shouldCache { + s.parent.chunkCache.SetChunk(s.chunkFileId, s.data) + } + + return +} + +func (s *SingleChunkCacher) destroy() { + if s.data != nil { + mem.Free(s.data) + s.data = nil + } +} + +func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) { + s.RLock() + defer s.RUnlock() + + if s.err != nil { + return 0, s.err + } + + return copy(buf, s.data[offset:]), nil + +} |
