Diffstat (limited to 'weed/filer/reader_at.go')
-rw-r--r--	weed/filer/reader_at.go	145
1 files changed, 27 insertions, 118 deletions
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index b1c15152f..7d9997761 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -12,21 +12,16 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
 	"github.com/chrislusf/seaweedfs/weed/wdclient"
-	"github.com/golang/groupcache/singleflight"
 )
 
 type ChunkReadAt struct {
-	masterClient *wdclient.MasterClient
-	chunkViews   []*ChunkView
-	lookupFileId wdclient.LookupFileIdFunctionType
-	readerLock   sync.Mutex
-	fileSize     int64
-
-	fetchGroup      singleflight.Group
-	chunkCache      chunk_cache.ChunkCache
-	lastChunkFileId string
-	lastChunkData   []byte
-	readerPattern   *ReaderPattern
+	masterClient  *wdclient.MasterClient
+	chunkViews    []*ChunkView
+	readerLock    sync.Mutex
+	fileSize      int64
+	readerCache   *ReaderCache
+	readerPattern *ReaderPattern
+	lastChunkFid  string
 }
 
 var _ = io.ReaderAt(&ChunkReadAt{})
@@ -90,16 +85,14 @@ func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chun
 
 	return &ChunkReadAt{
 		chunkViews:    chunkViews,
-		lookupFileId:  lookupFn,
-		chunkCache:    chunkCache,
 		fileSize:      fileSize,
+		readerCache:   newReaderCache(32, chunkCache, lookupFn),
 		readerPattern: NewReaderPattern(),
 	}
 }
 
 func (c *ChunkReadAt) Close() error {
-	c.lastChunkData = nil
-	c.lastChunkFileId = ""
+	c.readerCache.destroy()
 	return nil
 }
 
@@ -117,15 +110,13 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
 func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 
 	startOffset, remaining := offset, int64(len(p))
-	var nextChunk *ChunkView
+	var nextChunks []*ChunkView
 	for i, chunk := range c.chunkViews {
 		if remaining <= 0 {
 			break
 		}
 		if i+1 < len(c.chunkViews) {
-			nextChunk = c.chunkViews[i+1]
-		} else {
-			nextChunk = nil
+			nextChunks = c.chunkViews[i+1:]
 		}
 		if startOffset < chunk.LogicOffset {
 			gap := int(chunk.LogicOffset - startOffset)
@@ -142,16 +133,13 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 			continue
 		}
 		// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
-		var buffer []byte
 		bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
-		bufferLength := chunkStop - chunkStart
-		buffer, err = c.readChunkSlice(chunk, nextChunk, uint64(bufferOffset), uint64(bufferLength))
+		copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
 		if err != nil {
 			glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
-			return
+			return copied, err
 		}
-		copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer)
 
 		n += copied
 		startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
 	}
@@ -173,104 +161,25 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
 
 }
 
-func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) {
+func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) {
 
-	var chunkSlice []byte
-	if chunkView.LogicOffset == 0 {
-		chunkSlice = c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
-	}
-	if len(chunkSlice) > 0 {
-		return chunkSlice, nil
-	}
-	if c.lookupFileId == nil {
-		return nil, nil
-	}
 	if c.readerPattern.IsRandomMode() {
-		return c.doFetchRangeChunkData(chunkView, offset, length)
-	}
-	chunkData, err := c.readFromWholeChunkData(chunkView, nextChunkViews)
-	if err != nil {
-		return nil, err
-	}
-	wanted := min(int64(length), int64(len(chunkData))-int64(offset))
-	return chunkData[offset : int64(offset)+wanted], nil
-}
-
-func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) {
-
-	if c.lastChunkFileId == chunkView.FileId {
-		return c.lastChunkData, nil
-	}
-
-	v, doErr := c.readOneWholeChunk(chunkView)
-
-	if doErr != nil {
-		return nil, doErr
+		return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
 	}
 
-	chunkData = v.([]byte)
-
-	c.lastChunkData = chunkData
-	c.lastChunkFileId = chunkView.FileId
-
-	for _, nextChunkView := range nextChunkViews {
-		if c.chunkCache != nil && nextChunkView != nil {
-			go c.readOneWholeChunk(nextChunkView)
+	n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
+	if c.lastChunkFid != chunkView.FileId {
+		if chunkView.Offset == 0 { // start of a new chunk
+			if c.lastChunkFid != "" {
+				c.readerCache.UnCache(c.lastChunkFid)
+				c.readerCache.MaybeCache(nextChunkViews)
+			} else {
+				if len(nextChunkViews) >= 1 {
+					c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning
				}
+			}
 		}
 	}
-
+	c.lastChunkFid = chunkView.FileId
 	return
 }
-
-func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {
-
-	var err error
-
-	return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {
-
-		glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
-
-		var data []byte
-		if chunkView.LogicOffset == 0 {
-			data = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
-		}
-		if data != nil {
-			glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
-		} else {
-			var err error
-			data, err = c.doFetchFullChunkData(chunkView)
-			if err != nil {
-				return data, err
-			}
-			if chunkView.LogicOffset == 0 {
-				// only cache the first chunk
-				c.chunkCache.SetChunk(chunkView.FileId, data)
-			}
-		}
-		return data, err
-	})
-}
-
-func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) {
-
-	glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
-
-	data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
-
-	glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
-
-	return data, err
-
-}
-
-func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) {
-
-	glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
-
-	data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length))
-
-	glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
-
-	return data, err
-
-}
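Note: the change above drops the singleflight-based whole-chunk fetch (readFromWholeChunkData/readOneWholeChunk) and routes reads through a ReaderCache that copies straight into the caller's buffer (ReadChunkAt) and prefetches upcoming chunks (MaybeCache) while evicting the previous one (UnCache); random-access reads still bypass it via fetchChunkRange. The assertion var _ = io.ReaderAt(&ChunkReadAt{}) is unchanged, so callers keep consuming the type as a plain io.ReaderAt. The following is a minimal, self-contained sketch of that consumption pattern using only the Go standard library; bytes.Reader stands in for a *ChunkReadAt, and the helper name copyFromReaderAt is hypothetical, not part of this commit.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// copyFromReaderAt drains any io.ReaderAt sequentially by wrapping it in an
// io.SectionReader spanning [0, size). A *filer.ChunkReadAt could be passed
// here the same way, since it satisfies io.ReaderAt.
func copyFromReaderAt(r io.ReaderAt, size int64) ([]byte, error) {
	var out bytes.Buffer
	if _, err := io.Copy(&out, io.NewSectionReader(r, 0, size)); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}

func main() {
	// bytes.Reader is used as a stand-in io.ReaderAt for this sketch.
	src := []byte("chunked file content")
	data, err := copyFromReaderAt(bytes.NewReader(src), int64(len(src)))
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes: %q\n", len(data), data)
}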
