aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/stream.go')
-rw-r--r--weed/filer/stream.go91
1 files changed, 64 insertions, 27 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index ce0264cd3..7da9fd0a0 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -3,6 +3,7 @@ package filer
import (
"bytes"
"fmt"
+ "golang.org/x/exp/slices"
"io"
"math"
"sort"
@@ -39,11 +40,11 @@ func isSameChunks(a, b []*filer_pb.FileChunk) bool {
if len(a) != len(b) {
return false
}
- sort.Slice(a, func(i, j int) bool {
- return strings.Compare(a[i].ETag, a[j].ETag) < 0
+ slices.SortFunc(a, func(i, j *filer_pb.FileChunk) bool {
+ return strings.Compare(i.ETag, j.ETag) < 0
})
- sort.Slice(b, func(i, j int) bool {
- return strings.Compare(b[i].ETag, b[j].ETag) < 0
+ slices.SortFunc(b, func(i, j *filer_pb.FileChunk) bool {
+ return strings.Compare(i.ETag, j.ETag) < 0
})
for i := 0; i < len(a); i++ {
if a[i].ETag != b[i].ETag {
@@ -62,7 +63,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
+ glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@@ -80,11 +81,23 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
fileId2Url[chunkView.FileId] = urlStrings
}
+ remaining := size
for _, chunkView := range chunkViews {
-
+ if offset < chunkView.LogicOffset {
+ gap := chunkView.LogicOffset - offset
+ remaining -= gap
+ glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset)
+ err := writeZero(writer, gap)
+ if err != nil {
+ return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset)
+ }
+ offset = chunkView.LogicOffset
+ }
urlStrings := fileId2Url[chunkView.FileId]
start := time.Now()
err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ offset += int64(chunkView.Size)
+ remaining -= int64(chunkView.Size)
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
if err != nil {
stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
@@ -92,6 +105,13 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
}
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
}
+ if remaining > 0 {
+ glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
+ err := writeZero(writer, remaining)
+ if err != nil {
+ return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+ }
+ }
return nil
@@ -99,42 +119,59 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
// ---------------- ReadAllReader ----------------------------------
-func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
+func writeZero(w io.Writer, size int64) (err error) {
+ zeroPadding := make([]byte, 1024)
+ var written int
+ for size > 0 {
+ if size > 1024 {
+ written, err = w.Write(zeroPadding)
+ } else {
+ written, err = w.Write(zeroPadding[:size])
+ }
+ size -= int64(written)
+ if err != nil {
+ return
+ }
+ }
+ return
+}
- buffer := bytes.Buffer{}
+func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
return masterClient.LookupFileId(fileId)
}
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
+
+ idx := 0
for _, chunkView := range chunkViews {
urlStrings, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return nil, err
+ return err
}
- data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
if err != nil {
- return nil, err
+ return err
}
- buffer.Write(data)
+ idx += n
}
- return buffer.Bytes(), nil
+ return nil
}
// ---------------- ChunkStreamReader ----------------------------------
type ChunkStreamReader struct {
- chunkViews []*ChunkView
- totalSize int64
- logicOffset int64
- buffer []byte
- bufferOffset int64
- bufferLock sync.Mutex
- chunk string
- lookupFileId wdclient.LookupFileIdFunctionType
+ chunkViews []*ChunkView
+ totalSize int64
+ logicOffset int64
+ buffer []byte
+ bufferOffset int64
+ bufferLock sync.Mutex
+ chunk string
+ lookupFileId wdclient.LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})
@@ -143,8 +180,8 @@ var _ = io.ReaderAt(&ChunkStreamReader{})
func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
- sort.Slice(chunkViews, func(i, j int) bool {
- return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
+ slices.SortFunc(chunkViews, func(a, b *ChunkView) bool {
+ return a.LogicOffset < b.LogicOffset
})
var totalSize int64
@@ -206,7 +243,7 @@ func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) {
}
func (c *ChunkStreamReader) isBufferEmpty() bool {
- return len(c.buffer) <= int(c.logicOffset - c.bufferOffset)
+ return len(c.buffer) <= int(c.logicOffset-c.bufferOffset)
}
func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
@@ -261,7 +298,7 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
} else if currentChunkIndex > 0 {
if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
// good hit
- } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]){
+ } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) {
currentChunkIndex -= 1
// fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
} else {
@@ -297,7 +334,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {