aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/stream.go')
-rw-r--r--weed/filer/stream.go30
1 files changed, 16 insertions, 14 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index e5163f2d9..36278f0b1 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -62,7 +62,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
+ glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@@ -104,10 +104,12 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
}
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
}
- glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
- err := writeZero(writer, remaining)
- if err != nil {
- return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+ if remaining > 0 {
+ glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
+ err := writeZero(writer, remaining)
+ if err != nil {
+ return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+ }
}
return nil
@@ -133,30 +135,30 @@ func writeZero(w io.Writer, size int64) (err error) {
return
}
-func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
-
- buffer := bytes.Buffer{}
+func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
return masterClient.LookupFileId(fileId)
}
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
+
+ idx := 0
for _, chunkView := range chunkViews {
urlStrings, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return nil, err
+ return err
}
- data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
if err != nil {
- return nil, err
+ return err
}
- buffer.Write(data)
+ idx += n
}
- return buffer.Bytes(), nil
+ return nil
}
// ---------------- ChunkStreamReader ----------------------------------