Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/filechunk_group.go    | 157
-rw-r--r--  weed/filer/filechunk_section.go  |   2
-rw-r--r--  weed/filer/reader_at.go          |  23
-rw-r--r--  weed/filer/reader_cache.go       |  13
4 files changed, 175 insertions, 20 deletions
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
index 0f449735a..ed92e78a9 100644
--- a/weed/filer/filechunk_group.go
+++ b/weed/filer/filechunk_group.go
@@ -5,29 +5,64 @@ import (
 	"io"
 	"sync"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 )
 
 type ChunkGroup struct {
-	lookupFn     wdclient.LookupFileIdFunctionType
-	sections     map[SectionIndex]*FileChunkSection
-	sectionsLock sync.RWMutex
-	readerCache  *ReaderCache
+	lookupFn          wdclient.LookupFileIdFunctionType
+	sections          map[SectionIndex]*FileChunkSection
+	sectionsLock      sync.RWMutex
+	readerCache       *ReaderCache
+	concurrentReaders int
 }
 
-func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
+// NewChunkGroup creates a ChunkGroup with configurable concurrency.
+// concurrentReaders controls:
+//   - Maximum parallel chunk fetches during read operations
+//   - Read-ahead prefetch parallelism
+//   - Number of concurrent section reads for large files
+// If concurrentReaders <= 0, defaults to 16.
+func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) {
+	if concurrentReaders <= 0 {
+		concurrentReaders = 16
+	}
+	if concurrentReaders > 128 {
+		concurrentReaders = 128 // Cap to prevent excessive goroutine fan-out
+	}
+	// ReaderCache limit should be at least concurrentReaders to allow parallel prefetching
+	readerCacheLimit := concurrentReaders * 2
+	if readerCacheLimit < 32 {
+		readerCacheLimit = 32
+	}
 	group := &ChunkGroup{
-		lookupFn:    lookupFn,
-		sections:    make(map[SectionIndex]*FileChunkSection),
-		readerCache: NewReaderCache(32, chunkCache, lookupFn),
+		lookupFn:          lookupFn,
+		sections:          make(map[SectionIndex]*FileChunkSection),
+		readerCache:       NewReaderCache(readerCacheLimit, chunkCache, lookupFn),
+		concurrentReaders: concurrentReaders,
 	}
 	err := group.SetChunks(chunks)
 	return group, err
 }
 
+// GetPrefetchCount returns the number of chunks to prefetch ahead during sequential reads.
+// This is derived from concurrentReaders to keep the network pipeline full.
+func (group *ChunkGroup) GetPrefetchCount() int {
+	// Prefetch at least 1, and scale with concurrency (roughly 1/4 of concurrent readers)
+	prefetch := group.concurrentReaders / 4
+	if prefetch < 1 {
+		prefetch = 1
+	}
+	if prefetch > 8 {
+		prefetch = 8 // Cap at 8 to avoid excessive memory usage
+	}
+	return prefetch
+}
+
 func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
 	group.sectionsLock.Lock()
@@ -54,6 +89,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
 	defer group.sectionsLock.RUnlock()
 
 	sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
+	numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+	// For single section or when concurrency is disabled, use sequential reading
+	if numSections <= 1 || group.concurrentReaders <= 1 {
+		return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+	}
+
+	// For multiple sections, use parallel reading
+	return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+}
+
+// readDataAtSequential reads sections sequentially (original behavior)
+func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
 	for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
 		section, found := group.sections[si]
 		rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
@@ -78,9 +126,98 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
 	return
 }
 
+// sectionReadResult holds the result of a section read operation
+type sectionReadResult struct {
+	sectionIndex SectionIndex
+	n            int
+	tsNs         int64
+	err          error
+}
+
+// readDataAtParallel reads multiple sections in parallel for better throughput
+func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
+	numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+	// Limit concurrency to the smaller of concurrentReaders and numSections
+	maxConcurrent := group.concurrentReaders
+	if numSections < maxConcurrent {
+		maxConcurrent = numSections
+	}
+
+	g, gCtx := errgroup.WithContext(ctx)
+	g.SetLimit(maxConcurrent)
+
+	results := make([]sectionReadResult, numSections)
+
+	for i := 0; i < numSections; i++ {
+		si := sectionIndexStart + SectionIndex(i)
+		idx := i
+
+		section, found := group.sections[si]
+		rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
+		if rangeStart >= rangeStop {
+			continue
+		}
+
+		if !found {
+			// Zero-fill missing sections synchronously
+			rangeStop = min(rangeStop, fileSize)
+			for j := rangeStart; j < rangeStop; j++ {
+				buff[j-offset] = 0
+			}
+			results[idx] = sectionReadResult{
+				sectionIndex: si,
+				n:            int(rangeStop - rangeStart),
+				tsNs:         0,
+				err:          nil,
+			}
+			continue
+		}
+
+		// Capture variables for closure
+		sectionCopy := section
+		buffSlice := buff[rangeStart-offset : rangeStop-offset]
+		rangeStartCopy := rangeStart
+
+		g.Go(func() error {
+			xn, xTsNs, xErr := sectionCopy.readDataAt(gCtx, group, fileSize, buffSlice, rangeStartCopy)
+			results[idx] = sectionReadResult{
+				sectionIndex: si,
+				n:            xn,
+				tsNs:         xTsNs,
+				err:          xErr,
+			}
+			if xErr != nil && xErr != io.EOF {
+				return xErr
+			}
+			return nil
+		})
+	}
+
+	// Wait for all goroutines to complete
+	groupErr := g.Wait()
+
+	// Aggregate results
+	for _, result := range results {
+		n += result.n
+		tsNs = max(tsNs, result.tsNs)
+		// Collect first non-EOF error from results as fallback
+		if result.err != nil && result.err != io.EOF && err == nil {
+			err = result.err
+		}
+	}
+
+	// Prioritize errgroup error (first error that cancelled context)
+	if groupErr != nil {
+		err = groupErr
+	}
+
+	return
+}
+
 func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
-	group.sectionsLock.RLock()
-	defer group.sectionsLock.RUnlock()
+	group.sectionsLock.Lock()
+	defer group.sectionsLock.Unlock()
 
 	var dataChunks []*filer_pb.FileChunk
 	for _, chunk := range chunks {
diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go
index 76eb84c23..dd7c2ea48 100644
--- a/weed/filer/filechunk_section.go
+++ b/weed/filer/filechunk_section.go
@@ -85,7 +85,7 @@ func (section *FileChunkSection) setupForRead(ctx context.Context, group *ChunkG
 	}
 
 	if section.reader == nil {
-		section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
+		section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize), group.GetPrefetchCount())
 	}
 
 	section.isPrepared = true
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index aeac9b34a..93fa76a2e 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -13,6 +13,12 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 )
 
+// DefaultPrefetchCount is the default number of chunks to prefetch ahead during
+// sequential reads. This value is used when prefetch count is not explicitly
+// configured (e.g., WebDAV, query engine, message queue). For mount operations,
+// the prefetch count is derived from the -concurrentReaders option.
+const DefaultPrefetchCount = 4
+
 type ChunkReadAt struct {
 	masterClient *wdclient.MasterClient
 	chunkViews   *IntervalList[*ChunkView]
@@ -20,6 +26,7 @@ type ChunkReadAt struct {
 	readerCache   *ReaderCache
 	readerPattern *ReaderPattern
 	lastChunkFid  string
+	prefetchCount int // Number of chunks to prefetch ahead during sequential reads
 	ctx           context.Context // Context used for cancellation during chunk read operations
 }
 
@@ -35,8 +42,9 @@ var _ = io.Closer(&ChunkReadAt{})
 // - No high availability (single filer address, no automatic failover)
 //
 // For NEW code, especially mount operations, use wdclient.FilerClient instead:
-//	filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
-//	lookupFn := filerClient.GetLookupFileIdFunction()
+//
+//	filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
+//	lookupFn := filerClient.GetLookupFileIdFunction()
 //
 // This provides:
 // - Bounded cache with configurable size
@@ -56,7 +64,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
 	var vidCacheLock sync.RWMutex
 	cacheSize := 0
 	const maxCacheSize = 10000 // Simple bound to prevent unbounded growth
-	
+
 	return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
 		vid := VolumeId(fileId)
 		vidCacheLock.RLock()
@@ -123,13 +131,14 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
 	}
 }
 
-func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt {
+func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64, prefetchCount int) *ChunkReadAt {
 
 	return &ChunkReadAt{
 		chunkViews:    chunkViews,
 		fileSize:      fileSize,
 		readerCache:   readerCache,
 		readerPattern: NewReaderPattern(),
+		prefetchCount: prefetchCount,
 		ctx:           ctx,
 	}
 }
@@ -246,8 +255,10 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
 			if c.lastChunkFid != "" {
 				c.readerCache.UnCache(c.lastChunkFid)
 			}
-			if nextChunkViews != nil {
-				c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning
+			if nextChunkViews != nil && c.prefetchCount > 0 {
+				// Prefetch multiple chunks ahead for better sequential read throughput
+				// This keeps the network pipeline full with parallel chunk fetches
+				c.readerCache.MaybeCache(nextChunkViews, c.prefetchCount)
 			}
 		}
 	}
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 11382bed3..605be5e73 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -46,10 +46,16 @@ func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
 	}
 }
 
-func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
+// MaybeCache prefetches up to 'count' chunks ahead in parallel.
+// This improves read throughput for sequential reads by keeping the
+// network pipeline full with parallel chunk fetches.
+func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
 	if rc.lookupFileIdFn == nil {
 		return
 	}
+	if count <= 0 {
+		count = 1
+	}
 
 	rc.Lock()
 	defer rc.Unlock()
@@ -58,7 +64,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
 		return
 	}
 
-	for x := chunkViews; x != nil; x = x.Next {
+	cached := 0
+	for x := chunkViews; x != nil && cached < count; x = x.Next {
 		chunkView := x.Value
 		if _, found := rc.downloaders[chunkView.FileId]; found {
 			continue
@@ -80,7 +87,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
 		go cacher.startCaching()
 		<-cacher.cacheStartedCh
 		rc.downloaders[chunkView.FileId] = cacher
-
+		cached++
 	}
 
 	return
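For reference, a minimal caller-side sketch (not part of this change) of the new NewChunkGroup signature and the prefetch depth it derives. The nil lookup function, chunk cache, and chunk list are placeholders, and 32 simply stands in for whatever the mount's -concurrentReaders option would supply:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/filer"
)

func main() {
	// 32 concurrent readers -> internal ReaderCache sized to 64 entries (32 * 2),
	// and GetPrefetchCount() returns 32/4 = 8 chunks prefetched ahead.
	group, err := filer.NewChunkGroup(nil, nil, nil, 32)
	if err != nil {
		panic(err)
	}
	fmt.Println("prefetch depth:", group.GetPrefetchCount())
}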
