aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-02-26 23:20:45 -0800
committerchrislu <chris.lu@gmail.com>2022-02-26 23:20:45 -0800
commit551d00d51a14438abc3e182166a4850ec1104a44 (patch)
treea07d5996ea447aeecfe277e90c9b3e5ff128c5b5
parent7b1a713d2a6913f7f2ecb2211f50afd7468ef224 (diff)
downloadseaweedfs-551d00d51a14438abc3e182166a4850ec1104a44.tar.xz
seaweedfs-551d00d51a14438abc3e182166a4850ec1104a44.zip
prefetch other chunks when stream reading
-rw-r--r--weed/filer/reader_at.go13
-rw-r--r--weed/filer/reader_cache.go56
2 files changed, 37 insertions, 32 deletions
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index a38a0bfd5..8ee627a21 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -21,6 +21,7 @@ type ChunkReadAt struct {
fileSize int64
readerCache *ReaderCache
readerPattern *ReaderPattern
+ lastChunkFid string
}
var _ = io.ReaderAt(&ChunkReadAt{})
@@ -85,7 +86,7 @@ func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chun
return &ChunkReadAt{
chunkViews: chunkViews,
fileSize: fileSize,
- readerCache: newReaderCache(5, chunkCache, lookupFn),
+ readerCache: newReaderCache(32, chunkCache, lookupFn),
readerPattern: NewReaderPattern(),
}
}
@@ -167,12 +168,12 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next
}
n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
- for i, nextChunk := range nextChunkViews {
- if i < 2 {
- c.readerCache.MaybeCache(nextChunk.FileId, nextChunk.CipherKey, nextChunk.IsGzipped, int(nextChunk.ChunkSize))
- } else {
- break
+ if c.lastChunkFid != "" && c.lastChunkFid != chunkView.FileId {
+ if chunkView.Offset == 0 { // start of a new chunk
+ c.readerCache.UnCache(c.lastChunkFid)
+ c.readerCache.MaybeCache(nextChunkViews)
}
}
+ c.lastChunkFid = chunkView.FileId
return
}
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 1a0dc6a31..4f2c52303 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -40,41 +40,33 @@ func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
}
}
-func (rc *ReaderCache) MaybeCache(fileId string, cipherKey []byte, isGzipped bool, chunkSize int) {
- rc.Lock()
- defer rc.Unlock()
- if _, found := rc.downloaders[fileId]; found {
- return
- }
+func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
if rc.lookupFileIdFn == nil {
return
}
- // if too many, delete one of them?
- if len(rc.downloaders) >= rc.limit {
- oldestFid, oldestTime := "", time.Now()
- for fid, downloader := range rc.downloaders {
- if !downloader.completedTime.IsZero() {
- if downloader.completedTime.Before(oldestTime) {
- oldestFid, oldestTime = fid, downloader.completedTime
- }
- }
+ rc.Lock()
+ defer rc.Unlock()
+
+ for _, chunkView := range chunkViews {
+ if _, found := rc.downloaders[chunkView.FileId]; found {
+ continue
}
- if oldestFid != "" {
- oldDownloader := rc.downloaders[oldestFid]
- delete(rc.downloaders, oldestFid)
- oldDownloader.destroy()
- } else {
+
+ if len(rc.downloaders) >= rc.limit {
// if still no slots, return
return
}
- }
- cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, false)
- cacher.wg.Add(1)
- go cacher.startCaching()
- cacher.wg.Wait()
- rc.downloaders[fileId] = cacher
+ // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
+ // cache this chunk if not yet
+ cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
+ cacher.wg.Add(1)
+ go cacher.startCaching()
+ cacher.wg.Wait()
+ rc.downloaders[chunkView.FileId] = cacher
+
+ }
return
}
@@ -108,6 +100,8 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
}
}
+ // glog.V(4).Infof("cache1 %s", fileId)
+
cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
cacher.wg.Add(1)
go cacher.startCaching()
@@ -117,6 +111,16 @@ func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byt
return cacher.readChunkAt(buffer, offset)
}
+func (rc *ReaderCache) UnCache(fileId string) {
+ rc.Lock()
+ defer rc.Unlock()
+ // glog.V(4).Infof("uncache %s", fileId)
+ if downloader, found := rc.downloaders[fileId]; found {
+ downloader.destroy()
+ delete(rc.downloaders, fileId)
+ }
+}
+
func (rc *ReaderCache) destroy() {
rc.Lock()
defer rc.Unlock()