aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2/stream.go')
-rw-r--r--weed/filer2/stream.go58
1 files changed, 58 insertions, 0 deletions
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index 381d99144..bf985f8bd 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -1,7 +1,10 @@
package filer2
import (
+ "bytes"
+ "fmt"
"io"
+ "math"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -40,3 +43,58 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f
return nil
}
+
+type ChunkStreamReader struct {
+ masterClient *wdclient.MasterClient
+ chunkViews []*ChunkView
+ logicOffset int64
+ buffer bytes.Buffer
+ bufferOffset int64
+ chunkIndex int
+}
+
+var _ = io.ReadSeeker(&ChunkStreamReader{})
+
+func NewChunkStreamReader(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+ chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+
+ return &ChunkStreamReader{
+ masterClient: masterClient,
+ chunkViews: chunkViews,
+ bufferOffset: -1,
+ }
+}
+
+func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
+ if c.buffer.Len() == 0 {
+ if c.chunkIndex >= len(c.chunkViews) {
+ return 0, io.EOF
+ }
+ chunkView := c.chunkViews[c.chunkIndex]
+ c.fetchChunkToBuffer(chunkView)
+ c.chunkIndex++
+ }
+ return c.buffer.Read(p)
+}
+
+func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
+ return 0, fmt.Errorf("ChunkStreamReader: seek not supported")
+}
+
+func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
+ urlString, err := c.masterClient.LookupFileId(chunkView.FileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ c.buffer.Reset()
+ err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ c.buffer.Write(data)
+ })
+ if err != nil {
+ glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
+ return err
+ }
+ return nil
+}