aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/stream.go')
-rw-r--r--weed/filer/stream.go32
1 files changed, 20 insertions, 12 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 3859f9a67..503e6b23f 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -91,6 +91,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk)
type ChunkStreamReader struct {
chunkViews []*ChunkView
totalSize int64
+ logicOffset int64
buffer []byte
bufferOffset int64
bufferPos int
@@ -137,8 +138,7 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F
}
func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
- _, err = c.Seek(off, io.SeekStart)
- if err != nil {
+ if err = c.prepareBufferFor(c.logicOffset); err != nil {
return
}
return c.Read(p)
@@ -151,12 +151,15 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
return n, io.EOF
}
chunkView := c.chunkViews[c.nextChunkViewIndex]
- c.fetchChunkToBuffer(chunkView)
+ if err = c.fetchChunkToBuffer(chunkView); err != nil {
+ return
+ }
c.nextChunkViewIndex++
}
t := copy(p[n:], c.buffer[c.bufferPos:])
c.bufferPos += t
n += t
+ c.logicOffset += int64(t)
}
return
}
@@ -171,19 +174,26 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
case io.SeekCurrent:
- offset += c.bufferOffset + int64(c.bufferPos)
+ offset += c.logicOffset
case io.SeekEnd:
offset = c.totalSize + offset
}
if offset > c.totalSize {
err = io.ErrUnexpectedEOF
+ } else {
+ c.logicOffset = offset
}
+ return offset, err
+
+}
+
+func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
// stay in the same chunk
if !c.isBufferEmpty() {
if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
c.bufferPos = int(offset - c.bufferOffset)
- return offset, nil
+ return nil
}
}
@@ -192,23 +202,21 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
return c.chunkViews[i].LogicOffset <= offset
})
if currentChunkIndex == len(c.chunkViews) {
- return 0, io.EOF
+ return io.EOF
}
// positioning within the new chunk
chunk := c.chunkViews[currentChunkIndex]
if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
- c.fetchChunkToBuffer(chunk)
+ if err = c.fetchChunkToBuffer(chunk); err != nil {
+ return
+ }
c.nextChunkViewIndex = currentChunkIndex + 1
}
c.bufferPos = int(offset - c.bufferOffset)
- } else {
- return 0, io.ErrUnexpectedEOF
}
-
- return offset, err
-
+ return
}
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {