aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filechunk_group.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/filechunk_group.go')
-rw-r--r-- weed/filer/filechunk_group.go 157
1 files changed, 147 insertions, 10 deletions
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
index 0f449735a..ed92e78a9 100644
--- a/weed/filer/filechunk_group.go
+++ b/weed/filer/filechunk_group.go
@@ -5,29 +5,64 @@ import (
"io"
"sync"
+ "golang.org/x/sync/errgroup"
+
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
+// ChunkGroup holds the chunks of one file grouped into sections keyed by
+// SectionIndex, together with the ReaderCache used to read them.
type ChunkGroup struct {
- lookupFn wdclient.LookupFileIdFunctionType
- sections map[SectionIndex]*FileChunkSection
- sectionsLock sync.RWMutex
- readerCache *ReaderCache
+ // lookupFn is stored by NewChunkGroup, which also passes it to NewReaderCache.
+ lookupFn wdclient.LookupFileIdFunctionType
+ // sections maps a section index (ReadDataAt derives it as offset/SectionSize)
+ // to that section's chunk state; an absent key is treated as a hole on read.
+ sections map[SectionIndex]*FileChunkSection
+ // sectionsLock guards sections: AddChunk and SetChunks take the write lock,
+ // ReadDataAt releases the read side via a deferred RUnlock.
+ sectionsLock sync.RWMutex
+ // readerCache is created in NewChunkGroup with a limit of
+ // max(2*concurrentReaders, 32).
+ readerCache *ReaderCache
+ // concurrentReaders is normalized by NewChunkGroup to the range 1..128
+ // (16 when the caller passes <= 0). It caps how many sections ReadDataAt
+ // reads in parallel and is the input to GetPrefetchCount.
+ concurrentReaders int
}
-func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
+// NewChunkGroup builds a ChunkGroup for chunks whose reads go through a
+// ReaderCache backed by chunkCache and lookupFn.
+//
+// concurrentReaders is normalized before use: values <= 0 become 16 and
+// values above 128 are lowered to 128. The normalized value is stored on the
+// group, where it limits how many sections ReadDataAt reads in parallel and
+// feeds GetPrefetchCount. The ReaderCache limit is twice the normalized
+// value, but never less than 32.
+//
+// The group is returned together with whatever error SetChunks reports.
+func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) {
+	switch {
+	case concurrentReaders <= 0:
+		concurrentReaders = 16 // default when the caller does not choose
+	case concurrentReaders > 128:
+		concurrentReaders = 128 // upper bound on goroutine fan-out
+	}
+	// Keep the cache at least as large as the reader count so parallel
+	// prefetching has room to work.
+	readerCacheLimit := max(32, concurrentReaders*2)
group := &ChunkGroup{
- lookupFn: lookupFn,
- sections: make(map[SectionIndex]*FileChunkSection),
- readerCache: NewReaderCache(32, chunkCache, lookupFn),
+		lookupFn:          lookupFn,
+		sections:          make(map[SectionIndex]*FileChunkSection),
+		readerCache:       NewReaderCache(readerCacheLimit, chunkCache, lookupFn),
+		concurrentReaders: concurrentReaders,
}
err := group.SetChunks(chunks)
return group, err
}
+// GetPrefetchCount reports how many chunks to prefetch ahead during
+// sequential reads. The value is one quarter of concurrentReaders (integer
+// division), clamped to the range [1, 8].
+func (group *ChunkGroup) GetPrefetchCount() int {
+	const (
+		minPrefetch = 1 // always prefetch at least one chunk
+		maxPrefetch = 8 // ceiling that keeps read-ahead memory bounded
+	)
+	return min(maxPrefetch, max(minPrefetch, group.concurrentReaders/4))
+}
+
func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
group.sectionsLock.Lock()
@@ -54,6 +89,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
defer group.sectionsLock.RUnlock()
sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
+ numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+ // For single section or when concurrency is disabled, use sequential reading
+ if numSections <= 1 || group.concurrentReaders <= 1 {
+ return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+ }
+
+ // For multiple sections, use parallel reading
+ return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+}
+
+// readDataAtSequential reads sections sequentially (original behavior)
+func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
section, found := group.sections[si]
rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
@@ -78,9 +126,98 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
return
}
+// sectionReadResult holds the result of a section read operation.
+// readDataAtParallel keeps one per section in a slice indexed by the
+// section's position within the requested range.
+type sectionReadResult struct {
+ // sectionIndex identifies the section the result belongs to; it is set
+ // by readDataAtParallel but not read back there.
+ sectionIndex SectionIndex
+ // n is the byte count for the section: the value returned by the
+ // section's readDataAt, or the zero-filled length for a missing section.
+ n int
+ // tsNs is the timestamp returned by the section's readDataAt
+ // (presumably nanoseconds, going by the name); 0 for a zero-filled section.
+ tsNs int64
+ // err is the error returned by the section's readDataAt, which may be
+ // io.EOF; nil for a zero-filled section.
+ err error
+}
+
+// readDataAtParallel fills buff with file data starting at offset by reading
+// the sections in [sectionIndexStart, sectionIndexStop] concurrently.
+//
+// Every section owns a disjoint sub-slice of buff, so the goroutines never
+// write to overlapping memory. Sections missing from group.sections are
+// treated as holes and zero-filled inline, up to fileSize. Present sections
+// are read with FileChunkSection.readDataAt on an errgroup limited to
+// min(group.concurrentReaders, number of sections) goroutines (at least 1).
+//
+// Returns:
+//   - n: bytes placed in buff, summed in section order up to and including
+//     the first section that failed with a non-EOF error, so n never counts
+//     data lying behind a failed range.
+//   - tsNs: the largest timestamp reported by the counted sections.
+//   - err: the first non-EOF error returned by any section read (the one
+//     that cancelled the shared context), or nil. io.EOF from a section is
+//     not reported as an error.
+//
+// It is called from ReadDataAt, whose deferred sectionsLock.RUnlock runs only
+// after this function returns.
+func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
+	numSections := int(sectionIndexStop - sectionIndexStart + 1)
+	if numSections <= 0 {
+		// Empty range: nothing to read, and make() below would panic.
+		return 0, 0, nil
+	}
+
+	// errgroup treats a limit of 0 as "never start a goroutine", which would
+	// block g.Go forever, so always allow at least one worker.
+	maxConcurrent := max(1, min(group.concurrentReaders, numSections))
+
+	g, gCtx := errgroup.WithContext(ctx)
+	g.SetLimit(maxConcurrent)
+
+	results := make([]sectionReadResult, numSections)
+	buffStop := offset + int64(len(buff))
+
+	for i := 0; i < numSections; i++ {
+		si := sectionIndexStart + SectionIndex(i)
+		rangeStart := max(offset, int64(si*SectionSize))
+		rangeStop := min(buffStop, int64((si+1)*SectionSize))
+		if rangeStart >= rangeStop {
+			continue
+		}
+
+		section, found := group.sections[si]
+		if !found {
+			// Hole: zero-fill up to the end of the file. When the section
+			// starts beyond fileSize the clamped stop would fall before
+			// rangeStart; pin it to rangeStart so the count is 0, not negative.
+			zeroStop := max(rangeStart, min(rangeStop, fileSize))
+			clear(buff[rangeStart-offset : zeroStop-offset])
+			results[i] = sectionReadResult{
+				sectionIndex: si,
+				n:            int(zeroStop - rangeStart),
+			}
+			continue
+		}
+
+		// All captured variables are declared inside this iteration, so each
+		// goroutine sees its own copies.
+		buffSlice := buff[rangeStart-offset : rangeStop-offset]
+		result := &results[i]
+
+		g.Go(func() error {
+			xn, xTsNs, xErr := section.readDataAt(gCtx, group, fileSize, buffSlice, rangeStart)
+			*result = sectionReadResult{
+				sectionIndex: si,
+				n:            xn,
+				tsNs:         xTsNs,
+				err:          xErr,
+			}
+			// io.EOF only marks the end of the data; it must not cancel the
+			// sibling reads.
+			if xErr != nil && xErr != io.EOF {
+				return xErr
+			}
+			return nil
+		})
+	}
+
+	// Wait returns the first non-nil error handed back by a goroutine, which
+	// is exactly the first non-EOF section error, so no separate scan of the
+	// per-section errors is needed to pick the error to report.
+	groupErr := g.Wait()
+
+	// Sum in section order and stop after the first failed section: bytes
+	// read by later sections sit behind a gap and must not be counted.
+	for i := range results {
+		r := &results[i]
+		n += r.n
+		tsNs = max(tsNs, r.tsNs)
+		if r.err != nil && r.err != io.EOF {
+			break
+		}
+	}
+
+	return n, tsNs, groupErr
+}
+
func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
- group.sectionsLock.RLock()
- defer group.sectionsLock.RUnlock()
+ group.sectionsLock.Lock()
+ defer group.sectionsLock.Unlock()
var dataChunks []*filer_pb.FileChunk
for _, chunk := range chunks {