diff options
| author | chrislu <chris.lu@gmail.com> | 2022-01-13 13:03:07 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-01-13 13:03:07 -0800 |
| commit | 3c8b74318eec916bcb9ccfe599003d2f4e15e48d (patch) | |
| tree | 06fe6007c5952c58d1523aba9735565d975fa4da | |
| parent | 8907e6a40ac8c97e7c97f73d45cc528b72deb5ae (diff) | |
| parent | 4cd446717997d44122656cd34cc56ec4c1db595f (diff) | |
| download | seaweedfs-3c8b74318eec916bcb9ccfe599003d2f4e15e48d.tar.xz seaweedfs-3c8b74318eec916bcb9ccfe599003d2f4e15e48d.zip | |
Merge branch 'master' of https://github.com/chrislusf/seaweedfs
| -rw-r--r-- | weed/filer/reader_at.go | 2 | ||||
| -rw-r--r-- | weed/filer/stream.go | 36 |
2 files changed, 36 insertions, 2 deletions
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 670de382c..b1c15152f 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -129,7 +129,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { } if startOffset < chunk.LogicOffset { gap := int(chunk.LogicOffset - startOffset) - glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap)) + glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.LogicOffset) n += int(min(int64(gap), remaining)) startOffset, remaining = chunk.LogicOffset, remaining-int64(gap) if remaining <= 0 { diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 8baf7aeaa..e5163f2d9 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -80,11 +80,23 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ fileId2Url[chunkView.FileId] = urlStrings } + remaining := size for _, chunkView := range chunkViews { - + if offset < chunkView.LogicOffset { + gap := chunkView.LogicOffset - offset + remaining -= gap + glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset) + err := writeZero(writer, gap) + if err != nil { + return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset) + } + offset = chunkView.LogicOffset + } urlStrings := fileId2Url[chunkView.FileId] start := time.Now() err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) + offset += int64(chunkView.Size) + remaining -= int64(chunkView.Size) stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) if err != nil { stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc() @@ -92,6 +104,11 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ } stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc() } + glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) + err := writeZero(writer, remaining) + if err != nil { + return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining) + } return nil @@ -99,6 +116,23 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ // ---------------- ReadAllReader ---------------------------------- +func writeZero(w io.Writer, size int64) (err error) { + zeroPadding := make([]byte, 1024) + var written int + for size > 0 { + if size > 1024 { + written, err = w.Write(zeroPadding) + } else { + written, err = w.Write(zeroPadding[:size]) + } + size -= int64(written) + if err != nil { + return + } + } + return +} + func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) { buffer := bytes.Buffer{} |
