Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/filechunk_group.go   | 157
-rw-r--r--  weed/filer/filechunk_section.go |   2
-rw-r--r--  weed/filer/reader_at.go         |  23
-rw-r--r--  weed/filer/reader_cache.go      |  13
4 files changed, 175 insertions(+), 20 deletions(-)
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
index 0f449735a..ed92e78a9 100644
--- a/weed/filer/filechunk_group.go
+++ b/weed/filer/filechunk_group.go
@@ -5,29 +5,64 @@ import (
"io"
"sync"
+ "golang.org/x/sync/errgroup"
+
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
type ChunkGroup struct {
- lookupFn wdclient.LookupFileIdFunctionType
- sections map[SectionIndex]*FileChunkSection
- sectionsLock sync.RWMutex
- readerCache *ReaderCache
+ lookupFn wdclient.LookupFileIdFunctionType
+ sections map[SectionIndex]*FileChunkSection
+ sectionsLock sync.RWMutex
+ readerCache *ReaderCache
+ concurrentReaders int
}
-func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
+// NewChunkGroup creates a ChunkGroup with configurable concurrency.
+// concurrentReaders controls:
+// - Maximum parallel chunk fetches during read operations
+// - Read-ahead prefetch parallelism
+// - Number of concurrent section reads for large files
+// If concurrentReaders <= 0, defaults to 16.
+func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) {
+ if concurrentReaders <= 0 {
+ concurrentReaders = 16
+ }
+ if concurrentReaders > 128 {
+ concurrentReaders = 128 // Cap to prevent excessive goroutine fan-out
+ }
+	// Size the ReaderCache at 2x concurrentReaders (minimum 32) so prefetching can run alongside in-flight reads
+ readerCacheLimit := concurrentReaders * 2
+ if readerCacheLimit < 32 {
+ readerCacheLimit = 32
+ }
group := &ChunkGroup{
- lookupFn: lookupFn,
- sections: make(map[SectionIndex]*FileChunkSection),
- readerCache: NewReaderCache(32, chunkCache, lookupFn),
+ lookupFn: lookupFn,
+ sections: make(map[SectionIndex]*FileChunkSection),
+ readerCache: NewReaderCache(readerCacheLimit, chunkCache, lookupFn),
+ concurrentReaders: concurrentReaders,
}
err := group.SetChunks(chunks)
return group, err
}
+// GetPrefetchCount returns the number of chunks to prefetch ahead during sequential reads.
+// This is derived from concurrentReaders to keep the network pipeline full.
+func (group *ChunkGroup) GetPrefetchCount() int {
+ // Prefetch at least 1, and scale with concurrency (roughly 1/4 of concurrent readers)
+ prefetch := group.concurrentReaders / 4
+ if prefetch < 1 {
+ prefetch = 1
+ }
+ if prefetch > 8 {
+ prefetch = 8 // Cap at 8 to avoid excessive memory usage
+ }
+ return prefetch
+}
+
func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
group.sectionsLock.Lock()
@@ -54,6 +89,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
defer group.sectionsLock.RUnlock()
sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
+ numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+ // For single section or when concurrency is disabled, use sequential reading
+ if numSections <= 1 || group.concurrentReaders <= 1 {
+ return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+ }
+
+ // For multiple sections, use parallel reading
+ return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+}
+
+// readDataAtSequential reads sections sequentially (original behavior)
+func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
section, found := group.sections[si]
rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
@@ -78,9 +126,98 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
return
}
+// sectionReadResult holds the result of a section read operation
+type sectionReadResult struct {
+ sectionIndex SectionIndex
+ n int
+ tsNs int64
+ err error
+}
+
+// readDataAtParallel reads multiple sections in parallel for better throughput
+func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
+ numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+ // Limit concurrency to the smaller of concurrentReaders and numSections
+ maxConcurrent := group.concurrentReaders
+ if numSections < maxConcurrent {
+ maxConcurrent = numSections
+ }
+
+ g, gCtx := errgroup.WithContext(ctx)
+ g.SetLimit(maxConcurrent)
+
+ results := make([]sectionReadResult, numSections)
+
+ for i := 0; i < numSections; i++ {
+ si := sectionIndexStart + SectionIndex(i)
+ idx := i
+
+ section, found := group.sections[si]
+ rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
+ if rangeStart >= rangeStop {
+ continue
+ }
+
+		if !found {
+			// Zero-fill missing sections synchronously; clamp to fileSize so a hole past EOF cannot yield a negative count
+			rangeStop = min(rangeStop, fileSize)
+			for j := rangeStart; j < rangeStop; j++ {
+				buff[j-offset] = 0
+			}
+			results[idx] = sectionReadResult{
+				sectionIndex: si,
+				n:            int(max(rangeStop-rangeStart, 0)),
+				tsNs:         0,
+				err:          nil,
+			}
+			continue
+		}
+
+ // Capture variables for closure
+ sectionCopy := section
+ buffSlice := buff[rangeStart-offset : rangeStop-offset]
+ rangeStartCopy := rangeStart
+
+ g.Go(func() error {
+ xn, xTsNs, xErr := sectionCopy.readDataAt(gCtx, group, fileSize, buffSlice, rangeStartCopy)
+ results[idx] = sectionReadResult{
+ sectionIndex: si,
+ n: xn,
+ tsNs: xTsNs,
+ err: xErr,
+ }
+ if xErr != nil && xErr != io.EOF {
+ return xErr
+ }
+ return nil
+ })
+ }
+
+ // Wait for all goroutines to complete
+ groupErr := g.Wait()
+
+ // Aggregate results
+ for _, result := range results {
+ n += result.n
+ tsNs = max(tsNs, result.tsNs)
+ // Collect first non-EOF error from results as fallback
+ if result.err != nil && result.err != io.EOF && err == nil {
+ err = result.err
+ }
+ }
+
+ // Prioritize errgroup error (first error that cancelled context)
+ if groupErr != nil {
+ err = groupErr
+ }
+
+ return
+}
+
func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
- group.sectionsLock.RLock()
- defer group.sectionsLock.RUnlock()
+ group.sectionsLock.Lock()
+ defer group.sectionsLock.Unlock()
var dataChunks []*filer_pb.FileChunk
for _, chunk := range chunks {
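
A minimal caller-side sketch of the new constructor and read path. This is illustrative only: lookupFn, chunkCache, entry, ctx, fileSize, buff, and offset are assumed to come from the surrounding filer code, and the concurrency value of 32 is arbitrary (0 falls back to the default of 16, values above 128 are capped).

    group, err := NewChunkGroup(lookupFn, chunkCache, entry.GetChunks(), 32)
    if err != nil {
        return err
    }
    // Reads spanning multiple sections fan out across up to 32 concurrent
    // section reads; single-section reads keep the original sequential path.
    n, tsNs, err := group.ReadDataAt(ctx, fileSize, buff, offset)
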
diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go
index 76eb84c23..dd7c2ea48 100644
--- a/weed/filer/filechunk_section.go
+++ b/weed/filer/filechunk_section.go
@@ -85,7 +85,7 @@ func (section *FileChunkSection) setupForRead(ctx context.Context, group *ChunkG
}
if section.reader == nil {
- section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
+ section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize), group.GetPrefetchCount())
}
section.isPrepared = true
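
With the default of 16 concurrent readers, group.GetPrefetchCount() evaluates to 16/4 = 4 here, so each section reader asks the ReaderCache to keep up to 4 chunks in flight ahead of a sequential read; mounts with fewer than 4 readers fall back to a prefetch of 1, and anything above 32 readers is capped at 8.
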
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index aeac9b34a..93fa76a2e 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -13,6 +13,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
+// DefaultPrefetchCount is the default number of chunks to prefetch ahead during
+// sequential reads. It is used when the prefetch count is not explicitly
+// configured by the caller (e.g., WebDAV, the query engine, the message queue).
+// For mount operations, the prefetch count is derived from the -concurrentReaders option.
+const DefaultPrefetchCount = 4
+
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews *IntervalList[*ChunkView]
@@ -20,6 +26,7 @@ type ChunkReadAt struct {
readerCache *ReaderCache
readerPattern *ReaderPattern
lastChunkFid string
+ prefetchCount int // Number of chunks to prefetch ahead during sequential reads
ctx context.Context // Context used for cancellation during chunk read operations
}
@@ -35,8 +42,9 @@ var _ = io.Closer(&ChunkReadAt{})
// - No high availability (single filer address, no automatic failover)
//
// For NEW code, especially mount operations, use wdclient.FilerClient instead:
-// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
-// lookupFn := filerClient.GetLookupFileIdFunction()
+//
+// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
+// lookupFn := filerClient.GetLookupFileIdFunction()
//
// This provides:
// - Bounded cache with configurable size
@@ -56,7 +64,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
var vidCacheLock sync.RWMutex
cacheSize := 0
const maxCacheSize = 10000 // Simple bound to prevent unbounded growth
-
+
return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
vid := VolumeId(fileId)
vidCacheLock.RLock()
@@ -123,13 +131,14 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
}
}
-func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt {
+func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64, prefetchCount int) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
fileSize: fileSize,
readerCache: readerCache,
readerPattern: NewReaderPattern(),
+ prefetchCount: prefetchCount,
ctx: ctx,
}
}
@@ -246,8 +255,10 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
if c.lastChunkFid != "" {
c.readerCache.UnCache(c.lastChunkFid)
}
- if nextChunkViews != nil {
- c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning
+ if nextChunkViews != nil && c.prefetchCount > 0 {
+ // Prefetch multiple chunks ahead for better sequential read throughput
+ // This keeps the network pipeline full with parallel chunk fetches
+ c.readerCache.MaybeCache(nextChunkViews, c.prefetchCount)
}
}
}
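
A sketch of a non-mount call site picking up the package default (readerCache, chunkViews, fileSize, buffer, and offset are assumed from the surrounding code, and the ReadAt call assumes the type's existing io.ReaderAt method):

    readerAt := NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, fileSize, DefaultPrefetchCount)
    defer readerAt.Close()
    // On a sequential access pattern this triggers MaybeCache with a prefetch depth of 4.
    n, err := readerAt.ReadAt(buffer, offset)
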
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 11382bed3..605be5e73 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -46,10 +46,16 @@ func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
}
}
-func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
+// MaybeCache prefetches up to 'count' chunks ahead in parallel.
+// This improves read throughput for sequential reads by keeping the
+// network pipeline full with parallel chunk fetches.
+func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
if rc.lookupFileIdFn == nil {
return
}
+ if count <= 0 {
+ count = 1
+ }
rc.Lock()
defer rc.Unlock()
@@ -58,7 +64,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
return
}
- for x := chunkViews; x != nil; x = x.Next {
+ cached := 0
+ for x := chunkViews; x != nil && cached < count; x = x.Next {
chunkView := x.Value
if _, found := rc.downloaders[chunkView.FileId]; found {
continue
@@ -80,7 +87,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
go cacher.startCaching()
<-cacher.cacheStartedCh
rc.downloaders[chunkView.FileId] = cacher
-
+ cached++
}
return
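
Taken together, the knobs introduced above relate as follows (values worked out from the clamping logic in filechunk_group.go; this is a summary, not a new API):

    concurrentReaders   ReaderCache limit   GetPrefetchCount()
            1                  32                   1
           16 (default)        32                   4
           32                  64                   8
          128 (cap)           256                   8

Note that MaybeCache skips chunks that already have a downloader without consuming the count, so count bounds the number of new cachers started per call.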