aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/stream.go')
-rw-r--r--weed/filer/stream.go245
1 files changed, 245 insertions, 0 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
new file mode 100644
index 000000000..661a210ea
--- /dev/null
+++ b/weed/filer/stream.go
@@ -0,0 +1,245 @@
+package filer
+
+import (
+ "bytes"
+ "fmt"
+ "golang.org/x/sync/errgroup"
+ "io"
+ "math"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+)
+
+// StreamContent writes the content described by chunks to w.
+//
+// offset and size are handed to ViewFromChunks to select the chunk views to
+// stream. Every chunk view's file id is resolved to its URLs up front, so a
+// failed or empty lookup aborts before any data is written.
+//
+// When isCheck is true nothing is written to w: all chunk views are fetched
+// concurrently through CheckAllChunkViews and the first fetch error, if any,
+// is returned. Otherwise the chunk views are fetched one by one, in order,
+// and written to w.
+func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, isCheck bool) error {
+
+	glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
+
+	// Obtain the lookup function once instead of once per chunk view.
+	lookupFileIdFn := masterClient.GetLookupFileIdFunction()
+	chunkViews := ViewFromChunks(lookupFileIdFn, chunks, offset, size)
+
+	fileId2Url := make(map[string][]string, len(chunkViews))
+
+	for _, chunkView := range chunkViews {
+		urlStrings, err := lookupFileIdFn(chunkView.FileId)
+		if err != nil {
+			glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+			return err
+		}
+		if len(urlStrings) == 0 {
+			glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
+			return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
+		}
+		fileId2Url[chunkView.FileId] = urlStrings
+	}
+
+	if isCheck {
+		// Pre-check all chunkViews urls
+		gErr := new(errgroup.Group)
+		CheckAllChunkViews(chunkViews, &fileId2Url, gErr)
+		if err := gErr.Wait(); err != nil {
+			glog.Errorf("check all chunks: %v", err)
+			return fmt.Errorf("check all chunks: %v", err)
+		}
+		return nil
+	}
+
+	for _, chunkView := range chunkViews {
+		urlStrings := fileId2Url[chunkView.FileId]
+		data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+		if err != nil {
+			glog.Errorf("read chunk: %v", err)
+			return fmt.Errorf("read chunk: %v", err)
+		}
+
+		if _, err = w.Write(data); err != nil {
+			glog.Errorf("write chunk: %v", err)
+			return fmt.Errorf("write chunk: %v", err)
+		}
+	}
+
+	return nil
+
+}
+
+// CheckAllChunkViews starts one goroutine per chunk view on gErr; each one
+// fetches its chunk view with retriedFetchChunkData and discards the data,
+// reporting only the error. The URLs for a chunk view are taken from
+// fileId2Url, keyed by the chunk view's FileId.
+//
+// The function does not wait for the goroutines; the caller collects the
+// result with gErr.Wait().
+func CheckAllChunkViews(chunkViews []*ChunkView, fileId2Url *map[string][]string, gErr *errgroup.Group) {
+	for _, chunkView := range chunkViews {
+		// Take a per-iteration copy: before Go 1.22 the range variable is
+		// shared by all iterations, so the goroutines started below could
+		// otherwise observe a later chunk view than the one they were
+		// started for.
+		chunkView := chunkView
+		urlStrings := (*fileId2Url)[chunkView.FileId]
+		glog.V(9).Infof("Check chunk: %+v\n url: %v", chunkView, urlStrings)
+		gErr.Go(func() error {
+			_, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+			return err
+		})
+	}
+}
+
+// ---------------- ReadAllReader ----------------------------------
+
+func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
+
+ buffer := bytes.Buffer{}
+
+ lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
+ return masterClient.LookupFileId(fileId)
+ }
+
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+
+ for _, chunkView := range chunkViews {
+ urlStrings, err := lookupFileIdFn(chunkView.FileId)
+ if err != nil {
+ glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
+ return nil, err
+ }
+
+ data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ if err != nil {
+ return nil, err
+ }
+ buffer.Write(data)
+ }
+ return buffer.Bytes(), nil
+}
+
+// ---------------- ChunkStreamReader ----------------------------------
+
+// ChunkStreamReader reads the content of a list of chunk views as one
+// continuous stream, holding the data of a single chunk view in memory at a
+// time. It provides Read and Seek (see the io.ReadSeeker assertion below).
+type ChunkStreamReader struct {
+ // chunkViews are the chunk views to read, consumed by Read in slice order.
+ chunkViews []*ChunkView
+ // logicOffset is not referenced by any method visible in this file.
+ logicOffset int64
+ // buffer holds the data of the most recently fetched chunk view.
+ buffer []byte
+ // bufferOffset is the LogicOffset of the chunk view held in buffer.
+ bufferOffset int64
+ // bufferPos is the index in buffer of the next byte to hand out.
+ bufferPos int
+ // chunkIndex is the index in chunkViews that Read fetches next.
+ chunkIndex int
+ // lookupFileId resolves a chunk view's FileId to the URLs to fetch from.
+ lookupFileId wdclient.LookupFileIdFunctionType
+}
+
+// Compile-time check that *ChunkStreamReader satisfies io.ReadSeeker.
+var _ = io.ReadSeeker(&ChunkStreamReader{})
+
+// NewChunkStreamReaderFromFiler returns a ChunkStreamReader over chunks whose
+// file ids are resolved with masterClient.LookupFileId. The chunk views are
+// built by ViewFromChunks for offset 0 and size math.MaxInt64.
+func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+	resolve := func(fileId string) (targetUrl []string, err error) {
+		return masterClient.LookupFileId(fileId)
+	}
+	views := ViewFromChunks(resolve, chunks, 0, math.MaxInt64)
+
+	reader := &ChunkStreamReader{}
+	reader.chunkViews = views
+	reader.lookupFileId = resolve
+	return reader
+}
+
+// NewChunkStreamReader returns a ChunkStreamReader over chunks whose file ids
+// are resolved with the function obtained from LookupFn(filerClient). The
+// chunk views are built by ViewFromChunks for offset 0 and size
+// math.MaxInt64.
+func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+	resolve := LookupFn(filerClient)
+	views := ViewFromChunks(resolve, chunks, 0, math.MaxInt64)
+
+	reader := &ChunkStreamReader{}
+	reader.chunkViews = views
+	reader.lookupFileId = resolve
+	return reader
+}
+
+// Read implements io.Reader. It copies bytes from the buffered chunk view
+// into p and fetches the next chunk view whenever the buffer is exhausted,
+// until p is full or no chunk views remain.
+//
+// It returns io.EOF, together with the bytes copied so far, once every chunk
+// view has been consumed. If fetching a chunk view fails, the bytes copied so
+// far are returned with that error and the reader does not advance past the
+// failed chunk view, so its data is never silently skipped.
+func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
+	for n < len(p) {
+		if c.isBufferEmpty() {
+			if c.chunkIndex >= len(c.chunkViews) {
+				return n, io.EOF
+			}
+			if err = c.fetchChunkToBuffer(c.chunkViews[c.chunkIndex]); err != nil {
+				return n, err
+			}
+			// Advance only after a successful fetch.
+			c.chunkIndex++
+		}
+		copied := copy(p[n:], c.buffer[c.bufferPos:])
+		c.bufferPos += copied
+		n += copied
+	}
+	return n, nil
+}
+
+// isBufferEmpty reports whether the read position has reached or passed the
+// end of the buffered data, i.e. there is nothing left to copy out of buffer.
+func (c *ChunkStreamReader) isBufferEmpty() bool {
+	remaining := len(c.buffer) - c.bufferPos
+	return remaining <= 0
+}
+
+// Seek implements io.Seeker. The stream length is taken to be the sum of the
+// Size of all chunk views.
+//
+// whence must be io.SeekStart, io.SeekCurrent or io.SeekEnd; any other value
+// is an error, as is a resulting position before the start of the stream. In
+// both cases the reader is left untouched.
+//
+// If the target position lies inside a chunk view, that chunk view is
+// fetched unless it is already buffered; a fetch error is returned and the
+// reader keeps its previous position. If no chunk view contains the position
+// (for example when seeking to the end), the buffer is dropped and the next
+// Read continues with the first chunk view that starts after the position,
+// or returns io.EOF when there is none.
+//
+// A position beyond the stream length is still applied but reported with
+// io.ErrUnexpectedEOF.
+func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
+
+	var totalSize int64
+	for _, chunk := range c.chunkViews {
+		totalSize += int64(chunk.Size)
+	}
+
+	switch whence {
+	case io.SeekStart:
+	case io.SeekCurrent:
+		offset += c.bufferOffset + int64(c.bufferPos)
+	case io.SeekEnd:
+		offset += totalSize
+	default:
+		return 0, fmt.Errorf("seek: invalid whence %d", whence)
+	}
+	if offset < 0 {
+		// A negative position would become a negative bufferPos and make a
+		// later Read slice the buffer out of range.
+		return 0, fmt.Errorf("seek: negative position %d", offset)
+	}
+
+	var err error
+	if offset > totalSize {
+		err = io.ErrUnexpectedEOF
+	}
+
+	// nextIndex is where Read resumes when no chunk view contains offset.
+	// NOTE(review): assumes chunkViews is ordered by ascending LogicOffset,
+	// which is also what sequential Read relies on - confirm.
+	nextIndex := len(c.chunkViews)
+	for i, chunk := range c.chunkViews {
+		if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
+			if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
+				if fetchErr := c.fetchChunkToBuffer(chunk); fetchErr != nil {
+					return 0, fetchErr
+				}
+			}
+			c.chunkIndex = i + 1
+			c.bufferPos = int(offset - c.bufferOffset)
+			return offset, err
+		}
+		if nextIndex == len(c.chunkViews) && chunk.LogicOffset > offset {
+			nextIndex = i
+		}
+	}
+
+	// No chunk view contains offset: drop the stale buffer so the next Read
+	// does not hand out bytes that belong to a different position.
+	c.buffer = nil
+	c.bufferPos = 0
+	c.bufferOffset = offset
+	c.chunkIndex = nextIndex
+
+	return offset, err
+
+}
+
+// fetchChunkToBuffer loads the data of chunkView into the reader's buffer.
+//
+// The chunk view's FileId is resolved to URLs with c.lookupFileId and the
+// URLs are tried in order with util.ReadUrlAsStream. After a failed read that
+// is reported as retryable, the partial data is discarded and the next URL is
+// tried; a successful read or a non-retryable result ends the loop.
+//
+// On success the buffer is replaced, bufferPos is reset to 0 and bufferOffset
+// is set to the chunk view's LogicOffset. On error the reader's buffer state
+// is left as it was.
+func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
+	urls, lookupErr := c.lookupFileId(chunkView.FileId)
+	if lookupErr != nil {
+		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, lookupErr)
+		return lookupErr
+	}
+
+	var fetched bytes.Buffer
+	collect := func(data []byte) {
+		fetched.Write(data)
+	}
+
+	var readErr error
+	for _, url := range urls {
+		var retryable bool
+		retryable, readErr = util.ReadUrlAsStream(url, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), collect)
+		if readErr == nil || !retryable {
+			break
+		}
+		glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, readErr)
+		fetched.Reset()
+	}
+	if readErr != nil {
+		return readErr
+	}
+
+	c.buffer, c.bufferPos, c.bufferOffset = fetched.Bytes(), 0, chunkView.LogicOffset
+
+	return nil
+}
+
+// Close currently does nothing and returns nothing; the reader's buffer is
+// not released here (see the TODO below).
+func (c *ChunkStreamReader) Close() {
+ // TODO try to release and reuse buffer
+}
+
+// VolumeId returns the part of fileId that precedes its last comma. When
+// fileId contains no comma, or its only comma is the first character, fileId
+// is returned unchanged.
+func VolumeId(fileId string) string {
+	cut := strings.LastIndex(fileId, ",")
+	if cut <= 0 {
+		return fileId
+	}
+	return fileId[:cut]
+}