diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/filer/reader_at.go | 76 |
1 files changed, 55 insertions, 21 deletions
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 5ffc3a024..1c11f718a 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/golang/groupcache/singleflight" "io" "math/rand" "sync" @@ -18,12 +19,12 @@ type ChunkReadAt struct { chunkViews []*ChunkView lookupFileId func(fileId string) (targetUrl string, err error) readerLock sync.Mutex - fetcherLock sync.Mutex fileSize int64 - lastChunkFileId string - lastChunkData []byte - chunkCache chunk_cache.ChunkCache + fetchGroup singleflight.Group + lastChunkFileId string + lastChunkData []byte + chunkCache chunk_cache.ChunkCache } // var _ = io.ReaderAt(&ChunkReadAt{}) @@ -88,10 +89,16 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { var buffer []byte startOffset, remaining := offset, int64(len(p)) + var nextChunk *ChunkView for i, chunk := range c.chunkViews { if remaining <= 0 { break } + if i+1 < len(c.chunkViews) { + nextChunk = c.chunkViews[i+1] + } else { + nextChunk = nil + } if startOffset < chunk.LogicOffset { gap := int(chunk.LogicOffset - startOffset) glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap)) @@ -107,7 +114,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { continue } glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) - buffer, err = c.readFromWholeChunkData(chunk) + buffer, err = c.readFromWholeChunkData(chunk, nextChunk) if err != nil { glog.Errorf("fetching chunk %+v: %v\n", chunk, err) return @@ -135,36 +142,63 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { } -func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData []byte, err error) { - - c.fetcherLock.Lock() - defer c.fetcherLock.Unlock() +func (c *ChunkReadAt) readFromWholeChunkData(chunkView, nextChunkView *ChunkView) (chunkData []byte, err error) { if c.lastChunkFileId == chunkView.FileId { return c.lastChunkData, nil } - glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) + v, doErr := c.readOneWholeChunk(chunkView) - chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) - if chunkData != nil { - glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData))) - } else { - chunkData, err = c.doFetchFullChunkData(chunkView) - if err != nil { - return - } - c.chunkCache.SetChunk(chunkView.FileId, chunkData) - c.lastChunkData = chunkData - c.lastChunkFileId = chunkView.FileId + if doErr != nil { + return } + chunkData = v.([]byte) + + c.lastChunkData = chunkData + c.lastChunkFileId = chunkView.FileId + + go func() { + if c.chunkCache != nil && nextChunkView != nil { + c.readOneWholeChunk(nextChunkView) + } + }() + return } +func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) { + + var err error + + return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) { + + glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) + + data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) + if data != nil { + glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data))) + } else { + var err error + data, err = c.doFetchFullChunkData(chunkView) + if err != nil { + return data, err + } + c.chunkCache.SetChunk(chunkView.FileId, data) + } + return data, err + }) +} + func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) { + + glog.V(2).Infof("+ doFetchFullChunkData %s", chunkView.FileId) + data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) + glog.V(2).Infof("- doFetchFullChunkData %s", chunkView.FileId) + return data, err } |
