Diffstat (limited to 'weed/filer/filechunk_group.go')
-rw-r--r--   weed/filer/filechunk_group.go | 157
1 file changed, 147 insertions, 10 deletions
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
index 0f449735a..ed92e78a9 100644
--- a/weed/filer/filechunk_group.go
+++ b/weed/filer/filechunk_group.go
@@ -5,29 +5,64 @@ import (
 	"io"
 	"sync"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 )
 
 type ChunkGroup struct {
-	lookupFn     wdclient.LookupFileIdFunctionType
-	sections     map[SectionIndex]*FileChunkSection
-	sectionsLock sync.RWMutex
-	readerCache  *ReaderCache
+	lookupFn          wdclient.LookupFileIdFunctionType
+	sections          map[SectionIndex]*FileChunkSection
+	sectionsLock      sync.RWMutex
+	readerCache       *ReaderCache
+	concurrentReaders int
 }
 
-func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
+// NewChunkGroup creates a ChunkGroup with configurable concurrency.
+// concurrentReaders controls:
+//   - Maximum parallel chunk fetches during read operations
+//   - Read-ahead prefetch parallelism
+//   - Number of concurrent section reads for large files
+// If concurrentReaders <= 0, defaults to 16.
+func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) {
+	if concurrentReaders <= 0 {
+		concurrentReaders = 16
+	}
+	if concurrentReaders > 128 {
+		concurrentReaders = 128 // Cap to prevent excessive goroutine fan-out
+	}
+	// ReaderCache limit should be at least concurrentReaders to allow parallel prefetching
+	readerCacheLimit := concurrentReaders * 2
+	if readerCacheLimit < 32 {
+		readerCacheLimit = 32
+	}
 	group := &ChunkGroup{
-		lookupFn:    lookupFn,
-		sections:    make(map[SectionIndex]*FileChunkSection),
-		readerCache: NewReaderCache(32, chunkCache, lookupFn),
+		lookupFn:          lookupFn,
+		sections:          make(map[SectionIndex]*FileChunkSection),
+		readerCache:       NewReaderCache(readerCacheLimit, chunkCache, lookupFn),
+		concurrentReaders: concurrentReaders,
 	}
 
 	err := group.SetChunks(chunks)
 	return group, err
 }
 
+// GetPrefetchCount returns the number of chunks to prefetch ahead during sequential reads.
+// This is derived from concurrentReaders to keep the network pipeline full.
+func (group *ChunkGroup) GetPrefetchCount() int {
+	// Prefetch at least 1, and scale with concurrency (roughly 1/4 of concurrent readers)
+	prefetch := group.concurrentReaders / 4
+	if prefetch < 1 {
+		prefetch = 1
+	}
+	if prefetch > 8 {
+		prefetch = 8 // Cap at 8 to avoid excessive memory usage
+	}
+	return prefetch
+}
+
 func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
 
 	group.sectionsLock.Lock()
@@ -54,6 +89,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
 	defer group.sectionsLock.RUnlock()
 
 	sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
+	numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+	// For single section or when concurrency is disabled, use sequential reading
+	if numSections <= 1 || group.concurrentReaders <= 1 {
+		return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+	}
+
+	// For multiple sections, use parallel reading
+	return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+}
+
+// readDataAtSequential reads sections sequentially (original behavior)
+func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
 	for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
 		section, found := group.sections[si]
 		rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
@@ -78,9 +126,98 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
 	return
 }
 
+// sectionReadResult holds the result of a section read operation
+type sectionReadResult struct {
+	sectionIndex SectionIndex
+	n            int
+	tsNs         int64
+	err          error
+}
+
+// readDataAtParallel reads multiple sections in parallel for better throughput
+func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
+	numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+	// Limit concurrency to the smaller of concurrentReaders and numSections
+	maxConcurrent := group.concurrentReaders
+	if numSections < maxConcurrent {
+		maxConcurrent = numSections
+	}
+
+	g, gCtx := errgroup.WithContext(ctx)
+	g.SetLimit(maxConcurrent)
+
+	results := make([]sectionReadResult, numSections)
+
+	for i := 0; i < numSections; i++ {
+		si := sectionIndexStart + SectionIndex(i)
+		idx := i
+
+		section, found := group.sections[si]
+		rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
+		if rangeStart >= rangeStop {
+			continue
+		}
+
+		if !found {
+			// Zero-fill missing sections synchronously
+			rangeStop = min(rangeStop, fileSize)
+			for j := rangeStart; j < rangeStop; j++ {
+				buff[j-offset] = 0
+			}
+			results[idx] = sectionReadResult{
+				sectionIndex: si,
+				n:            int(rangeStop - rangeStart),
+				tsNs:         0,
+				err:          nil,
+			}
+			continue
+		}
+
+		// Capture variables for closure
+		sectionCopy := section
+		buffSlice := buff[rangeStart-offset : rangeStop-offset]
+		rangeStartCopy := rangeStart
+
+		g.Go(func() error {
+			xn, xTsNs, xErr := sectionCopy.readDataAt(gCtx, group, fileSize, buffSlice, rangeStartCopy)
+			results[idx] = sectionReadResult{
+				sectionIndex: si,
+				n:            xn,
+				tsNs:         xTsNs,
+				err:          xErr,
+			}
+			if xErr != nil && xErr != io.EOF {
+				return xErr
+			}
+			return nil
+		})
+	}
+
+	// Wait for all goroutines to complete
+	groupErr := g.Wait()
+
+	// Aggregate results
+	for _, result := range results {
+		n += result.n
+		tsNs = max(tsNs, result.tsNs)
+		// Collect first non-EOF error from results as fallback
+		if result.err != nil && result.err != io.EOF && err == nil {
+			err = result.err
+		}
+	}
+
+	// Prioritize errgroup error (first error that cancelled context)
+	if groupErr != nil {
+		err = groupErr
+	}
+
+	return
+}
+
 func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
-	group.sectionsLock.RLock()
-	defer group.sectionsLock.RUnlock()
+	group.sectionsLock.Lock()
+	defer group.sectionsLock.Unlock()
 
 	var dataChunks []*filer_pb.FileChunk
 	for _, chunk := range chunks {
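For reference, a minimal sketch of how a caller might use the new constructor signature. The package name, the readWholeFile helper, the 32-reader setting, and the whole-file read below are illustrative assumptions, not part of this change; only NewChunkGroup and ReadDataAt come from the diff. The fourth argument selects the concurrency level, and ReadDataAt then picks the sequential or parallel path based on how many sections the requested range spans.

// Sketch only: wiring and values are assumptions, not taken from this diff.
package example

import (
	"context"
	"io"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

func readWholeFile(ctx context.Context, lookupFn wdclient.LookupFileIdFunctionType,
	cache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, fileSize int64) ([]byte, error) {

	// Request 32 concurrent readers; values <= 0 fall back to the default of 16
	// and values above 128 are capped inside NewChunkGroup.
	group, err := filer.NewChunkGroup(lookupFn, cache, chunks, 32)
	if err != nil {
		return nil, err
	}

	// A read spanning several sections takes the parallel (errgroup) path;
	// a read within a single section keeps the original sequential path.
	buff := make([]byte, int(fileSize))
	n, _, err := group.ReadDataAt(ctx, fileSize, buff, 0)
	if err != nil && err != io.EOF {
		return nil, err
	}
	return buff[:n], nil
}

The fan-out stays bounded by design: errgroup's SetLimit caps in-flight section reads at the smaller of concurrentReaders and the number of sections, and the per-section results slice lets each goroutine record its byte count and timestamp without additional locking.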
