aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/filer/stream.go89
-rw-r--r--weed/server/common.go42
-rw-r--r--weed/server/filer_server_handlers_read.go32
-rw-r--r--weed/server/volume_server_handlers_read.go19
4 files changed, 118 insertions, 64 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 73a2a219c..a402bc30c 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -3,13 +3,14 @@ package filer
import (
"bytes"
"fmt"
- "golang.org/x/exp/slices"
"io"
"math"
"strings"
"sync"
"time"
+ "golang.org/x/exp/slices"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -66,13 +67,14 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
return NewChunkStreamReader(filerClient, entry.GetChunks())
}
-func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- return StreamContentWithThrottler(masterClient, writer, chunks, offset, size, 0)
-}
+type DoStreamContent func(writer io.Writer) error
-func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) error {
+func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) {
+ return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0)
+}
- glog.V(4).Infof("start to stream content for chunks: %d", len(chunks))
+func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
+ glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks))
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@@ -91,52 +93,61 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w
}
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return err
+ return nil, err
} else if len(urlStrings) == 0 {
errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
glog.Error(errUrlNotFound)
- return errUrlNotFound
+ return nil, errUrlNotFound
}
fileId2Url[chunkView.FileId] = urlStrings
}
- downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
- remaining := size
- for x := chunkViews.Front(); x != nil; x = x.Next {
- chunkView := x.Value
- if offset < chunkView.ViewOffset {
- gap := chunkView.ViewOffset - offset
- remaining -= gap
- glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset)
- err := writeZero(writer, gap)
+ return func(writer io.Writer) error {
+ downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs)
+ remaining := size
+ for x := chunkViews.Front(); x != nil; x = x.Next {
+ chunkView := x.Value
+ if offset < chunkView.ViewOffset {
+ gap := chunkView.ViewOffset - offset
+ remaining -= gap
+ glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset)
+ err := writeZero(writer, gap)
+ if err != nil {
+ return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset)
+ }
+ offset = chunkView.ViewOffset
+ }
+ urlStrings := fileId2Url[chunkView.FileId]
+ start := time.Now()
+ err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+ offset += int64(chunkView.ViewSize)
+ remaining -= int64(chunkView.ViewSize)
+ stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
if err != nil {
- return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset)
+ stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc()
+ return fmt.Errorf("read chunk: %v", err)
}
- offset = chunkView.ViewOffset
- }
- urlStrings := fileId2Url[chunkView.FileId]
- start := time.Now()
- err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
- offset += int64(chunkView.ViewSize)
- remaining -= int64(chunkView.ViewSize)
- stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
- if err != nil {
- stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc()
- return fmt.Errorf("read chunk: %v", err)
+ stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc()
+ downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize))
}
- stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc()
- downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize))
- }
- if remaining > 0 {
- glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
- err := writeZero(writer, remaining)
- if err != nil {
- return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+ if remaining > 0 {
+ glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
+ err := writeZero(writer, remaining)
+ if err != nil {
+ return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+ }
}
- }
- return nil
+ return nil
+ }, nil
+}
+func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
+ streamFn, err := PrepareStreamContent(masterClient, chunks, offset, size)
+ if err != nil {
+ return err
+ }
+ return streamFn(writer)
}
// ---------------- ReadAllReader ----------------------------------
diff --git a/weed/server/common.go b/weed/server/common.go
index d88298402..a7d67fb2e 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"io"
"io/fs"
"mime/multipart"
@@ -18,6 +17,9 @@ import (
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -282,7 +284,7 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
}
}
-func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) error {
+func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, prepareWriteFn func(offset int64, size int64) (filer.DoStreamContent, error)) error {
rangeReq := r.Header.Get("Range")
bufferedWriter := writePool.Get().(*bufio.Writer)
bufferedWriter.Reset(w)
@@ -293,7 +295,13 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := writeFn(bufferedWriter, 0, totalSize); err != nil {
+ writeFn, err := prepareWriteFn(0, totalSize)
+ if err != nil {
+ glog.Errorf("processRangeRequest: %v", err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return fmt.Errorf("processRangeRequest: %v", err)
+ }
+ if err = writeFn(bufferedWriter); err != nil {
glog.Errorf("processRangeRequest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return fmt.Errorf("processRangeRequest: %v", err)
@@ -335,8 +343,14 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
+ writeFn, err := prepareWriteFn(ra.start, ra.length)
+ if err != nil {
+ glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err)
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return fmt.Errorf("processRangeRequest: %v", err)
+ }
w.WriteHeader(http.StatusPartialContent)
- err = writeFn(bufferedWriter, ra.start, ra.length)
+ err = writeFn(bufferedWriter)
if err != nil {
glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -346,11 +360,20 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
}
// process multiple ranges
- for _, ra := range ranges {
+ writeFnByRange := make(map[int](func(writer io.Writer) error))
+
+ for i, ra := range ranges {
if ra.start > totalSize {
http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
return fmt.Errorf("out of range: %v", err)
}
+ writeFn, err := prepareWriteFn(ra.start, ra.length)
+ if err != nil {
+ glog.Errorf("processRangeRequest range[%d] err: %v", i, err)
+ http.Error(w, "Internal Error", http.StatusInternalServerError)
+ return fmt.Errorf("processRangeRequest range[%d] err: %v", i, err)
+ }
+ writeFnByRange[i] = writeFn
}
sendSize := rangesMIMESize(ranges, mimeType, totalSize)
pr, pw := io.Pipe()
@@ -359,13 +382,18 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
sendContent := pr
defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
go func() {
- for _, ra := range ranges {
+ for i, ra := range ranges {
part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
if e != nil {
pw.CloseWithError(e)
return
}
- if e = writeFn(part, ra.start, ra.length); e != nil {
+ writeFn := writeFnByRange[i]
+ if writeFn == nil {
+ pw.CloseWithError(e)
+ return
+ }
+ if e = writeFn(part); e != nil {
pw.CloseWithError(e)
return
}
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index f5f652f83..d1cd3beae 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -231,14 +231,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
if offset+size <= int64(len(entry.Content)) {
- _, err := writer.Write(entry.Content[offset : offset+size])
- if err != nil {
- stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
- glog.Errorf("failed to write entry content: %v", err)
- }
- return err
+ return func(writer io.Writer) error {
+ _, err := writer.Write(entry.Content[offset : offset+size])
+ if err != nil {
+ stats.FilerHandlerCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
+ glog.Errorf("failed to write entry content: %v", err)
+ }
+ return err
+ }, nil
}
chunks := entry.GetChunks()
if entry.IsInRemoteOnly() {
@@ -249,17 +251,25 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}); err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadCache).Inc()
glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err)
- return fmt.Errorf("cache %s: %v", entry.FullPath, err)
+ return nil, fmt.Errorf("cache %s: %v", entry.FullPath, err)
} else {
chunks = resp.Entry.GetChunks()
}
}
- err = filer.StreamContentWithThrottler(fs.filer.MasterClient, writer, chunks, offset, size, fs.option.DownloadMaxBytesPs)
+ streamFn, err := filer.PrepareStreamContentWithThrottler(fs.filer.MasterClient, chunks, offset, size, fs.option.DownloadMaxBytesPs)
if err != nil {
stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
- glog.Errorf("failed to stream content %s: %v", r.URL, err)
+ glog.Errorf("failed to prepare stream content %s: %v", r.URL, err)
+ return nil, err
}
- return err
+ return func(writer io.Writer) error {
+ err := streamFn(writer)
+ if err != nil {
+ stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc()
+ glog.Errorf("failed to stream content %s: %v", r.URL, err)
+ }
+ return err
+ }, nil
})
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 5f4beb77d..08e536811 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -15,6 +15,7 @@ import (
"sync/atomic"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
@@ -382,12 +383,14 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
return nil
}
- return processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
- if _, e = rs.Seek(offset, 0); e != nil {
+ return processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
+ return func(writer io.Writer) error {
+ if _, e = rs.Seek(offset, 0); e != nil {
+ return e
+ }
+ _, e = io.CopyN(writer, rs, size)
return e
- }
- _, e = io.CopyN(writer, rs, size)
- return e
+ }, nil
})
}
@@ -409,8 +412,10 @@ func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType str
return
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
- return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
+ processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
+ return func(writer io.Writer) error {
+ return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
+ }, nil
})
}