diff options
| author | Sébastien <s.berthier@bee-buzziness.com> | 2024-01-29 19:35:52 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-29 10:35:52 -0800 |
| commit | 0775d05a23b61a23f9a7f3fb180ec6d96fa94430 (patch) | |
| tree | c5cd1d9bbb914e511f2243f471a7a8b1f5c02708 /weed/filer/stream.go | |
| parent | e5c0680dbcd250f69ab414cd14ee3ecc859c5dcf (diff) | |
| download | seaweedfs-0775d05a23b61a23f9a7f3fb180ec6d96fa94430.tar.xz seaweedfs-0775d05a23b61a23f9a7f3fb180ec6d96fa94430.zip | |
fix: http range request return status 500 (#5251)
When the volume server was unavailable for at least one chunk, the request was still returning status 206.
Split `StreamContent` into two parts,
- first prepare, to get chunk info and return stream function
- then write chunk, with that stream function
That allows errors to be caught in the first step, before the response status code is set in `processRangeRequest`.
Diffstat (limited to 'weed/filer/stream.go')
| -rw-r--r-- | weed/filer/stream.go | 89 |
1 file changed, 50 insertions, 39 deletions
diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 73a2a219c..a402bc30c 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -3,13 +3,14 @@ package filer import ( "bytes" "fmt" - "golang.org/x/exp/slices" "io" "math" "strings" "sync" "time" + "golang.org/x/exp/slices" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/stats" @@ -66,13 +67,14 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R return NewChunkStreamReader(filerClient, entry.GetChunks()) } -func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { - return StreamContentWithThrottler(masterClient, writer, chunks, offset, size, 0) -} +type DoStreamContent func(writer io.Writer) error -func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) error { +func PrepareStreamContent(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64) (DoStreamContent, error) { + return PrepareStreamContentWithThrottler(masterClient, chunks, offset, size, 0) +} - glog.V(4).Infof("start to stream content for chunks: %d", len(chunks)) +func PrepareStreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) { + glog.V(4).Infof("prepare to stream content for chunks: %d", len(chunks)) chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size) fileId2Url := make(map[string][]string) @@ -91,52 +93,61 @@ func StreamContentWithThrottler(masterClient wdclient.HasLookupFileIdFunction, w } if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return 
err + return nil, err } else if len(urlStrings) == 0 { errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) glog.Error(errUrlNotFound) - return errUrlNotFound + return nil, errUrlNotFound } fileId2Url[chunkView.FileId] = urlStrings } - downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs) - remaining := size - for x := chunkViews.Front(); x != nil; x = x.Next { - chunkView := x.Value - if offset < chunkView.ViewOffset { - gap := chunkView.ViewOffset - offset - remaining -= gap - glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset) - err := writeZero(writer, gap) + return func(writer io.Writer) error { + downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs) + remaining := size + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value + if offset < chunkView.ViewOffset { + gap := chunkView.ViewOffset - offset + remaining -= gap + glog.V(4).Infof("zero [%d,%d)", offset, chunkView.ViewOffset) + err := writeZero(writer, gap) + if err != nil { + return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset) + } + offset = chunkView.ViewOffset + } + urlStrings := fileId2Url[chunkView.FileId] + start := time.Now() + err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) + offset += int64(chunkView.ViewSize) + remaining -= int64(chunkView.ViewSize) + stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) if err != nil { - return fmt.Errorf("write zero [%d,%d)", offset, chunkView.ViewOffset) + stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc() + return fmt.Errorf("read chunk: %v", err) } - offset = chunkView.ViewOffset - } - urlStrings := fileId2Url[chunkView.FileId] - start := time.Now() - err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, 
chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) - offset += int64(chunkView.ViewSize) - remaining -= int64(chunkView.ViewSize) - stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) - if err != nil { - stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc() - return fmt.Errorf("read chunk: %v", err) + stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc() + downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize)) } - stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc() - downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize)) - } - if remaining > 0 { - glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) - err := writeZero(writer, remaining) - if err != nil { - return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining) + if remaining > 0 { + glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining) + err := writeZero(writer, remaining) + if err != nil { + return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining) + } } - } - return nil + return nil + }, nil +} +func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { + streamFn, err := PrepareStreamContent(masterClient, chunks, offset, size) + if err != nil { + return err + } + return streamFn(writer) } // ---------------- ReadAllReader ---------------------------------- |
