diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-08-08 01:21:42 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-08-08 01:21:42 -0700 |
| commit | 13e45e16054d16e8d8161a8ddb02fde3cd4cde8f (patch) | |
| tree | 29d7b15932e10b0adafe2b27b3618e330805f75c /weed/filer/stream.go | |
| parent | 8f5170c1389f2d0bac75ca2f95a676a05283317b (diff) | |
| download | seaweedfs-13e45e16054d16e8d8161a8ddb02fde3cd4cde8f.tar.xz seaweedfs-13e45e16054d16e8d8161a8ddb02fde3cd4cde8f.zip | |
filer.remote.sync can work now
Diffstat (limited to 'weed/filer/stream.go')
| -rw-r--r-- | weed/filer/stream.go | 32 |
1 files changed, 20 insertions, 12 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 3859f9a67..503e6b23f 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -91,6 +91,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) type ChunkStreamReader struct { chunkViews []*ChunkView totalSize int64 + logicOffset int64 buffer []byte bufferOffset int64 bufferPos int @@ -137,8 +138,7 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F } func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) { - _, err = c.Seek(off, io.SeekStart) - if err != nil { + if err = c.prepareBufferFor(c.logicOffset); err != nil { return } return c.Read(p) @@ -151,12 +151,15 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { return n, io.EOF } chunkView := c.chunkViews[c.nextChunkViewIndex] - c.fetchChunkToBuffer(chunkView) + if err = c.fetchChunkToBuffer(chunkView); err != nil { + return + } c.nextChunkViewIndex++ } t := copy(p[n:], c.buffer[c.bufferPos:]) c.bufferPos += t n += t + c.logicOffset += int64(t) } return } @@ -171,19 +174,26 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { switch whence { case io.SeekStart: case io.SeekCurrent: - offset += c.bufferOffset + int64(c.bufferPos) + offset += c.logicOffset case io.SeekEnd: offset = c.totalSize + offset } if offset > c.totalSize { err = io.ErrUnexpectedEOF + } else { + c.logicOffset = offset } + return offset, err + +} + +func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { // stay in the same chunk if !c.isBufferEmpty() { if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) { c.bufferPos = int(offset - c.bufferOffset) - return offset, nil + return nil } } @@ -192,23 +202,21 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { return c.chunkViews[i].LogicOffset <= offset }) if currentChunkIndex == len(c.chunkViews) { - return 0, io.EOF + return io.EOF } // positioning within the new chunk chunk := c.chunkViews[currentChunkIndex] if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { - c.fetchChunkToBuffer(chunk) + if err = c.fetchChunkToBuffer(chunk); err != nil { + return + } c.nextChunkViewIndex = currentChunkIndex + 1 } c.bufferPos = int(offset - c.bufferOffset) - } else { - return 0, io.ErrUnexpectedEOF } - - return offset, err - + return } func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { |
