aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/reader_at.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/reader_at.go')
-rw-r--r--weed/filer/reader_at.go65
1 files changed, 38 insertions, 27 deletions
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 9d1fab20a..27e8f79a6 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -16,8 +16,7 @@ import (
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
- chunkViews []*ChunkView
- readerLock sync.Mutex
+ chunkViews *IntervalList[*ChunkView]
fileSize int64
readerCache *ReaderCache
readerPattern *ReaderPattern
@@ -89,7 +88,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
}
}
-func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
+func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews *IntervalList[*ChunkView], chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
@@ -108,44 +107,58 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerPattern.MonitorReadAt(offset, len(p))
- c.readerLock.Lock()
- defer c.readerLock.Unlock()
+ c.chunkViews.Lock.Lock()
+ defer c.chunkViews.Lock.Unlock()
+
+ // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
+ n, _, err = c.doReadAt(p, offset)
+ return
+}
+
+func (c *ChunkReadAt) ReadAtWithTime(p []byte, offset int64) (n int, ts int64, err error) {
+
+ c.readerPattern.MonitorReadAt(offset, len(p))
+
+ c.chunkViews.Lock.Lock()
+ defer c.chunkViews.Lock.Unlock()
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
}
-func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
+func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, ts int64, err error) {
startOffset, remaining := offset, int64(len(p))
- var nextChunks []*ChunkView
- for i, chunk := range c.chunkViews {
+ var nextChunks *Interval[*ChunkView]
+ for x := c.chunkViews.Front(); x != nil; x = x.Next {
+ chunk := x.Value
if remaining <= 0 {
break
}
- if i+1 < len(c.chunkViews) {
- nextChunks = c.chunkViews[i+1:]
+ if x.Next != nil {
+ nextChunks = x.Next
}
- if startOffset < chunk.LogicOffset {
- gap := chunk.LogicOffset - startOffset
- glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.LogicOffset)
+ if startOffset < chunk.ViewOffset {
+ gap := chunk.ViewOffset - startOffset
+ glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset)
n += zero(p, startOffset-offset, gap)
- startOffset, remaining = chunk.LogicOffset, remaining-gap
+ startOffset, remaining = chunk.ViewOffset, remaining-gap
if remaining <= 0 {
break
}
}
- // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
- chunkStart, chunkStop := max(chunk.LogicOffset, startOffset), min(chunk.LogicOffset+int64(chunk.Size), startOffset+remaining)
+ // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
+ chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining)
if chunkStart >= chunkStop {
continue
}
- // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
- bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
+ // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
+ bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
+ ts = chunk.ModifiedTsNs
copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
- return copied, err
+ return copied, ts, err
}
n += copied
@@ -177,7 +190,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
-func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) {
+func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
if c.readerPattern.IsRandomMode() {
n, err := c.readerCache.chunkCache.ReadChunkAt(buffer, chunkView.FileId, offset)
@@ -187,16 +200,14 @@ func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, next
return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
}
- n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
+ n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.ViewOffset == 0)
if c.lastChunkFid != chunkView.FileId {
- if chunkView.Offset == 0 { // start of a new chunk
+ if chunkView.OffsetInChunk == 0 { // start of a new chunk
if c.lastChunkFid != "" {
c.readerCache.UnCache(c.lastChunkFid)
- c.readerCache.MaybeCache(nextChunkViews)
- } else {
- if len(nextChunkViews) >= 1 {
- c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning
- }
+ }
+ if nextChunkViews != nil {
+ c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning
}
}
}