Diffstat (limited to 'weed/filer/reader_at.go')
-rw-r--r--  weed/filer/reader_at.go  138
1 file changed, 112 insertions(+), 26 deletions(-)
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 93fa76a2e..5e8fd6154 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -7,6 +7,8 @@ import (
"math/rand"
"sync"
+ "golang.org/x/sync/errgroup"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
@@ -19,6 +21,11 @@ import (
// the prefetch count is derived from the -concurrentReaders option.
const DefaultPrefetchCount = 4
+// minReadConcurrency is the minimum number of parallel chunk fetches.
+// This ensures at least some parallelism even when prefetchCount is low,
+// improving throughput for reads spanning multiple chunks.
+const minReadConcurrency = 4
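+// The effective concurrency in doReadAt is min(max(prefetchCount, minReadConcurrency), len(tasks)).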
+
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews *IntervalList[*ChunkView]
@@ -175,67 +182,139 @@ func (c *ChunkReadAt) ReadAtWithTime(ctx context.Context, p []byte, offset int64
return c.doReadAt(ctx, p, offset)
}
+// chunkReadTask represents a single chunk read operation for parallel processing
+type chunkReadTask struct {
+ chunk *ChunkView
+ bufferStart int64 // start position in the output buffer
+ bufferEnd int64 // end position in the output buffer
+ chunkOffset uint64 // offset within the chunk to read from
+ bytesRead int // bytes copied into p; set by the parallel fetch goroutine
+ modifiedTsNs int64 // chunk modification time; folded into ts after all fetches finish
+}
+
func (c *ChunkReadAt) doReadAt(ctx context.Context, p []byte, offset int64) (n int, ts int64, err error) {
+ // Collect all chunk read tasks
+ var tasks []*chunkReadTask
+ var gaps []struct{ start, length int64 } // gaps that need zero-filling
+
startOffset, remaining := offset, int64(len(p))
- var nextChunks *Interval[*ChunkView]
+ var lastChunk *Interval[*ChunkView]
+
for x := c.chunkViews.Front(); x != nil; x = x.Next {
chunk := x.Value
if remaining <= 0 {
break
}
- if x.Next != nil {
- nextChunks = x.Next
- }
+ lastChunk = x
+
+ // Handle gap before this chunk
if startOffset < chunk.ViewOffset {
gap := chunk.ViewOffset - startOffset
- glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.ViewOffset)
- n += zero(p, startOffset-offset, gap)
+ gaps = append(gaps, struct{ start, length int64 }{startOffset - offset, gap})
startOffset, remaining = chunk.ViewOffset, remaining-gap
if remaining <= 0 {
break
}
}
- // fmt.Printf(">>> doReadAt [%d,%d), chunk[%d,%d)\n", offset, offset+int64(len(p)), chunk.ViewOffset, chunk.ViewOffset+int64(chunk.ViewSize))
+
chunkStart, chunkStop := max(chunk.ViewOffset, startOffset), min(chunk.ViewOffset+int64(chunk.ViewSize), startOffset+remaining)
if chunkStart >= chunkStop {
continue
}
- // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.ViewOffset-chunk.Offset, chunk.ViewOffset-chunk.Offset+int64(chunk.ViewSize))
+
bufferOffset := chunkStart - chunk.ViewOffset + chunk.OffsetInChunk
- ts = chunk.ModifiedTsNs
- copied, err := c.readChunkSliceAt(ctx, p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
- if err != nil {
- glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
- return copied, ts, err
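+ // Note: chunkStart == startOffset here, since any gap before this chunk was consumed above.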
+ tasks = append(tasks, &chunkReadTask{
+ chunk: chunk,
+ bufferStart: startOffset - offset,
+ bufferEnd: chunkStop - chunkStart + startOffset - offset, // bufferStart plus this chunk's read length
+ chunkOffset: uint64(bufferOffset),
+ })
+
+ startOffset, remaining = chunkStop, remaining-(chunkStop-chunkStart)
+ }
+
+ // Zero-fill gaps
+ for _, gap := range gaps {
+ glog.V(4).Infof("zero [%d,%d)", offset+gap.start, offset+gap.start+gap.length)
+ n += zero(p, gap.start, gap.length)
+ }
+
+ // If there is only one chunk, or the reader is in random access mode, read sequentially
+ if len(tasks) <= 1 || c.readerPattern.IsRandomMode() {
+ for _, task := range tasks {
+ copied, readErr := c.readChunkSliceAt(ctx, p[task.bufferStart:task.bufferEnd], task.chunk, nil, task.chunkOffset)
+ ts = max(ts, task.chunk.ModifiedTsNs)
+ if readErr != nil {
+ glog.Errorf("fetching chunk %+v: %v\n", task.chunk, readErr)
+ return n + copied, ts, readErr
+ }
+ n += copied
+ }
+ } else {
+ // Parallel chunk fetching for multiple chunks
+ // This significantly improves throughput when chunks are on different volume servers
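+ // errgroup.WithContext cancels gCtx as soon as any fetch fails, letting in-flight sibling fetches stop early.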
+ g, gCtx := errgroup.WithContext(ctx)
+
+ // Limit concurrency to avoid overwhelming the system; g.Go blocks once this many fetches are in flight
+ concurrency := c.prefetchCount
+ if concurrency < minReadConcurrency {
+ concurrency = minReadConcurrency
+ }
+ if concurrency > len(tasks) {
+ concurrency = len(tasks)
+ }
+ g.SetLimit(concurrency)
+
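+ // Tasks write to disjoint sub-slices of p (their buffer ranges tile the output), so no locking is needed.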
+ for _, task := range tasks {
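+ // task is a fresh variable on each iteration under Go 1.22+ loop semantics;
+ // on older toolchains an explicit task := task capture would be required here.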
+ g.Go(func() error {
+ // Read directly into the correct position in the output buffer
+ copied, readErr := c.readChunkSliceAtForParallel(gCtx, p[task.bufferStart:task.bufferEnd], task.chunk, task.chunkOffset)
+ task.bytesRead = copied
+ task.modifiedTsNs = task.chunk.ModifiedTsNs
+ return readErr
+ })
}
- n += copied
- startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
+ // Wait for all chunk reads to complete
+ if waitErr := g.Wait(); waitErr != nil {
+ err = waitErr
+ }
+
+ // Aggregate results (order is preserved since each task wrote directly into its own buffer slice);
+ // on error, n still includes bytes from the tasks that completed.
+ for _, task := range tasks {
+ n += task.bytesRead
+ ts = max(ts, task.modifiedTsNs)
+ }
+
+ if err != nil {
+ return n, ts, err
+ }
}
- // glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
+ // Trigger prefetch for sequential reads
+ if lastChunk != nil && lastChunk.Next != nil && c.prefetchCount > 0 && !c.readerPattern.IsRandomMode() {
+ c.readerCache.MaybeCache(lastChunk.Next, c.prefetchCount)
+ }
- // zero the remaining bytes if a gap exists at the end of the last chunk (or a fully sparse file)
- if err == nil && remaining > 0 {
+ // Zero the remaining bytes if a gap exists at the end of the last chunk (or the file is fully sparse)
+ if remaining > 0 {
var delta int64
if c.fileSize >= startOffset {
delta = min(remaining, c.fileSize-startOffset)
- startOffset -= offset
- }
- if delta > 0 {
- glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
- n += zero(p, startOffset, delta)
+ bufStart := startOffset - offset
+ if delta > 0 {
+ glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+delta, c.fileSize)
+ n += zero(p, bufStart, delta)
+ }
}
}
if err == nil && offset+int64(len(p)) >= c.fileSize {
err = io.EOF
}
- // fmt.Printf("~~~ filled %d, err: %v\n\n", n, err)
return
-
}
func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunkView *ChunkView, nextChunkViews *Interval[*ChunkView], offset uint64) (n int, err error) {
@@ -249,7 +328,7 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
}
shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
- n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
+ n, err = c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
if c.lastChunkFid != chunkView.FileId {
if chunkView.OffsetInChunk == 0 { // start of a new chunk
if c.lastChunkFid != "" {
@@ -266,6 +345,13 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
return
}
+// readChunkSliceAtForParallel is a simplified version of readChunkSliceAt for parallel fetching.
+// It does not update lastChunkFid or trigger prefetch; the caller handles both.
+func (c *ChunkReadAt) readChunkSliceAtForParallel(ctx context.Context, buffer []byte, chunkView *ChunkView, offset uint64) (n int, err error) {
+ shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= c.readerCache.chunkCache.GetMaxFilePartSizeInCache()
+ return c.readerCache.ReadChunkAt(ctx, buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), shouldCache)
+}
+
func zero(buffer []byte, start, length int64) int {
if length <= 0 {
return 0