diff options
Diffstat (limited to 'weed/filer2/stream.go')
| -rw-r--r-- | weed/filer2/stream.go | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 381d99144..bf985f8bd 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -1,7 +1,10 @@ package filer2 import ( + "bytes" + "fmt" "io" + "math" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -40,3 +43,58 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f return nil } + +type ChunkStreamReader struct { + masterClient *wdclient.MasterClient + chunkViews []*ChunkView + logicOffset int64 + buffer bytes.Buffer + bufferOffset int64 + chunkIndex int +} + +var _ = io.ReadSeeker(&ChunkStreamReader{}) + +func NewChunkStreamReader(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { + + chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + + return &ChunkStreamReader{ + masterClient: masterClient, + chunkViews: chunkViews, + bufferOffset: -1, + } +} + +func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { + if c.buffer.Len() == 0 { + if c.chunkIndex >= len(c.chunkViews) { + return 0, io.EOF + } + chunkView := c.chunkViews[c.chunkIndex] + c.fetchChunkToBuffer(chunkView) + c.chunkIndex++ + } + return c.buffer.Read(p) +} + +func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { + return 0, fmt.Errorf("ChunkStreamReader: seek not supported") +} + +func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { + urlString, err := c.masterClient.LookupFileId(chunkView.FileId) + if err != nil { + glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) + return err + } + c.buffer.Reset() + err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) { + c.buffer.Write(data) + }) + if err != nil { + glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) + return err + } + return nil +} |
