aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/reader_at.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/reader_at.go')
-rw-r--r--weed/filer/reader_at.go43
1 files changed, 36 insertions, 7 deletions
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 458cf88be..5f58b870c 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -26,6 +26,7 @@ type ChunkReadAt struct {
chunkCache chunk_cache.ChunkCache
lastChunkFileId string
lastChunkData []byte
+ readerPattern *ReaderPattern
}
var _ = io.ReaderAt(&ChunkReadAt{})
@@ -88,10 +89,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
- chunkViews: chunkViews,
- lookupFileId: lookupFn,
- chunkCache: chunkCache,
- fileSize: fileSize,
+ chunkViews: chunkViews,
+ lookupFileId: lookupFn,
+ chunkCache: chunkCache,
+ fileSize: fileSize,
+ readerPattern: NewReaderPattern(),
}
}
@@ -106,6 +108,8 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock()
defer c.readerLock.Unlock()
+ c.readerPattern.MonitorReadAt(offset, len(p))
+
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
}
@@ -171,7 +175,14 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) {
- chunkSlice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
+ if c.readerPattern.IsRandomMode() {
+ return c.doFetchRangeChunkData(chunkView, offset, length)
+ }
+
+ var chunkSlice []byte
+ if chunkView.LogicOffset == 0 {
+ chunkSlice = c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
+ }
if len(chunkSlice) > 0 {
return chunkSlice, nil
}
@@ -217,7 +228,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro
glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
- data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ var data []byte
+ if chunkView.LogicOffset == 0 {
+ data = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ }
if data != nil {
glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
} else {
@@ -226,7 +240,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro
if err != nil {
return data, err
}
- c.chunkCache.SetChunk(chunkView.FileId, data)
+ if chunkView.LogicOffset == 0 {
+ // only cache the first chunk
+ c.chunkCache.SetChunk(chunkView.FileId, data)
+ }
}
return data, err
})
@@ -243,3 +260,15 @@ func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error)
return data, err
}
+
+func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) {
+
+ glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
+
+ data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length))
+
+ glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
+
+ return data, err
+
+}