diff options
Diffstat (limited to 'weed/filer/reader_at.go')
| -rw-r--r-- | weed/filer/reader_at.go | 65 |
1 files changed, 38 insertions, 27 deletions
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 9d1fab20a..27e8f79a6 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -16,8 +16,7 @@ import ( type ChunkReadAt struct { masterClient *wdclient.MasterClient - chunkViews []*ChunkView - readerLock sync.Mutex + chunkViews *IntervalList[*ChunkView] fileSize int64 readerCache *ReaderCache readerPattern *ReaderPattern @@ -89,7 +88,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp } } -func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { +func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews *IntervalList[*ChunkView], chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, @@ -108,44 +107,58 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { c.readerPattern.MonitorReadAt(offset, len(p)) - c.readerLock.Lock() - defer c.readerLock.Unlock() + c.chunkViews.Lock.Lock() + defer c.chunkViews.Lock.Unlock() + + // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) + n, _, err = c.doReadAt(p, offset) + return +} + +func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, err error) { + + c.readerPattern.MonitorReadAt(offset, len(p)) + + c.chunkViews.Lock.Lock() + defer c.chunkViews.Lock.Unlock() // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) return c.doReadAt(p, offset) } -func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { +func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err error) { startOffset, remaining := offset, int64(len(p)) - var nextChunks []*ChunkView - for i, chunk := range c.chunkViews { + var nextChunks *Interval[*ChunkView] + for x := c.chunkViews.Front(); x != nil; x = x.Next { + chunk := x.Value if remaining <= 0 { break } - if i+1 < len(c.chunkViews) { - nextChunks = c.chunkViews[i+1:] + if x.Next != nil { + nextChunks = x.Next } - if startOffset < chunk.LogicOffset { - gap := chunk.LogicOffset - startOffset - glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.LogicOffset) + if startOffset < chunk.ViewOffset { + gap := chunk.ViewOffset - startOffset + glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset) n += zero(p, startOffset-offset, gap) - startOffset, remaining = chunk.LogicOffset, remaining-gap + startOffset, remaining = chunk.ViewOffset, remaining-gap if remaining <= 0 { break } } - // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size)) - chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining) + // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize)) + chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining) if chunkStart >= chunkStop { continue } - // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size)) - bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset + // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize)) + bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk + ts = chunk.ModifiedTsNs copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset)) if err != nil { glog.Errorf("fetching chunk %+v: %v\n", chunk, err) - return copied, err + return copied, ts, err } n += copied @@ -177,7 +190,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { } -func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) { +func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) { if c.readerPattern.IsRandomMode() { n, err := c.readerCache.chunkCache.ReadChunkAt(buffer, chunkView.FileId, offset) @@ -187,16 +200,14 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset)) } - n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0) + n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.ViewOffset == 0) if c.lastChunkFid != chunkView.FileId { - if chunkView.Offset == 0 { // start of a new chunk + if chunkView.OffsetInChunk == 0 { // start of a new chunk if c.lastChunkFid != "" { c.readerCache.UnCache(c.lastChunkFid) - c.readerCache.MaybeCache(nextChunkViews) - } else { - if len(nextChunkViews) >= 1 { - c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning - } + } + if nextChunkViews != nil { + c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning } } } |
