diff options
Diffstat (limited to 'weed/filer/stream.go')
| -rw-r--r-- | weed/filer/stream.go | 91 |
1 files changed, 64 insertions, 27 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go index ce0264cd3..7da9fd0a0 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -3,6 +3,7 @@ package filer import ( "bytes" "fmt" + "golang.org/x/exp/slices" "io" "math" "sort" @@ -39,11 +40,11 @@ func isSameChunks(a, b []*filer_pb.FileChunk) bool { if len(a) != len(b) { return false } - sort.Slice(a, func(i, j int) bool { - return strings.Compare(a[i].ETag, a[j].ETag) < 0 + slices.SortFunc(a, func(i, j *filer_pb.FileChunk) bool { + return strings.Compare(i.ETag, j.ETag) < 0 }) - sort.Slice(b, func(i, j int) bool { - return strings.Compare(b[i].ETag, b[j].ETag) < 0 + slices.SortFunc(b, func(i, j *filer_pb.FileChunk) bool { + return strings.Compare(i.ETag, j.ETag) < 0 }) for i := 0; i < len(a); i++ { if a[i].ETag != b[i].ETag { @@ -62,7 +63,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks) + glog.V(4).Infof("start to stream content for chunks: %+v", chunks) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) fileId2Url := make(map[string][]string) @@ -80,11 +81,23 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ fileId2Url[chunkView.FileId] = urlStrings } + remaining := size for _, chunkView := range chunkViews { - + if offset < chunkView.LogicOffset { + gap := chunkView.LogicOffset - offset + remaining -= gap + glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset) + err := writeZero(writer, gap) + if err != nil { + return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset) + } + offset = chunkView.LogicOffset + } urlStrings := fileId2Url[chunkView.FileId] start := time.Now() err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + offset += int64(chunkView.Size) + remaining -= int64(chunkView.Size) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) if err != nil { stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() @@ -92,6 +105,13 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ } stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() } + if remaining > 0 { + glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) + err := writeZero(writer, remaining) + if err != nil { + return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining) + } + } return nil @@ -99,42 +119,59 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ // ---------------- ReadAllReader ---------------------------------- -func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { +func writeZero(w io.Writer, size int64) (err error) { + zeroPadding := make([]byte, 1024) + var written int + for size > 0 { + if size > 1024 { + written, err = w.Write(zeroPadding) + } else { + written, err = w.Write(zeroPadding[:size]) + } + size -= int64(written) + if err != nil { + return + } + } + return +} - buffer := bytes.Buffer{} +func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error { lookupFileIdFn := func(fileId string) (targetUrls []string, err error) { return masterClient.LookupFileId(fileId) } - chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) + chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer))) + + idx := 0 for _, chunkView := range chunkViews { urlStrings, err := lookupFileIdFn(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return nil, err + return err } - data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset) if err != nil { - return nil, err + return err } - buffer.Write(data) + idx += n } - return buffer.Bytes(), nil + return nil } // ---------------- ChunkStreamReader ---------------------------------- type ChunkStreamReader struct { - chunkViews []*ChunkView - totalSize int64 - logicOffset int64 - buffer []byte - bufferOffset int64 - bufferLock sync.Mutex - chunk string - lookupFileId wdclient.LookupFileIdFunctionType + chunkViews []*ChunkView + totalSize int64 + logicOffset int64 + buffer []byte + bufferOffset int64 + bufferLock sync.Mutex + chunk string + lookupFileId wdclient.LookupFileIdFunctionType } var _ = io.ReadSeeker(&ChunkStreamReader{}) @@ -143,8 +180,8 @@ var _ = io.ReaderAt(&ChunkStreamReader{}) func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader { chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64) - sort.Slice(chunkViews, func(i, j int) bool { - return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset + slices.SortFunc(chunkViews, func(a, b *ChunkView) bool { + return a.LogicOffset < b.LogicOffset }) var totalSize int64 @@ -206,7 +243,7 @@ func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) { } func (c *ChunkStreamReader) isBufferEmpty() bool { - return len(c.buffer) <= int(c.logicOffset - c.bufferOffset) + return len(c.buffer) <= int(c.logicOffset-c.bufferOffset) } func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { @@ -261,7 +298,7 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) { } else if currentChunkIndex > 0 { if insideChunk(offset, c.chunkViews[currentChunkIndex]) { // good hit - } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]){ + } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) { currentChunkIndex -= 1 // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId) } else { @@ -297,7 +334,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { var buffer bytes.Buffer var shouldRetry bool for _, urlString := range urlStrings { - shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if !shouldRetry { |
