-rw-r--r--  weed/command/fuse_std.go                        7
-rw-r--r--  weed/command/mount.go                           2
-rw-r--r--  weed/command/mount_std.go                       1
-rw-r--r--  weed/filer/filechunk_group.go                 157
-rw-r--r--  weed/filer/filechunk_section.go                 2
-rw-r--r--  weed/filer/reader_at.go                        23
-rw-r--r--  weed/filer/reader_cache.go                     13
-rw-r--r--  weed/mount/filehandle.go                        2
-rw-r--r--  weed/mount/weedfs.go                            1
-rw-r--r--  weed/mq/logstore/read_parquet_to_log.go         2
-rw-r--r--  weed/query/engine/hybrid_message_scanner.go     2
-rw-r--r--  weed/query/engine/parquet_scanner.go            2
-rw-r--r--  weed/server/webdav_server.go                    2
13 files changed, 191 insertions(+), 25 deletions(-)
diff --git a/weed/command/fuse_std.go b/weed/command/fuse_std.go
index b2839aaf8..2cc6fa8ab 100644
--- a/weed/command/fuse_std.go
+++ b/weed/command/fuse_std.go
@@ -155,6 +155,13 @@ func runFuse(cmd *Command, args []string) bool {
} else {
panic(fmt.Errorf("concurrentWriters: %s", err))
}
+ case "concurrentReaders":
+ if parsed, err := strconv.ParseInt(parameter.value, 0, 32); err == nil {
+ intValue := int(parsed)
+ mountOptions.concurrentReaders = &intValue
+ } else {
+ panic(fmt.Errorf("concurrentReaders: %s", err))
+ }
case "cacheDir":
mountOptions.cacheDirForRead = &parameter.value
case "cacheCapacityMB":
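The fuse_std.go hunk above routes the new option through the same strconv.ParseInt call already used for concurrentWriters; base 0 means decimal, hex, and octal forms are all accepted. A standalone sketch of just that parsing step (not part of the patch; the option string is an illustrative value):

package main

import (
	"fmt"
	"strconv"
)

func main() {
	value := "32" // e.g. taken from a fuse mount option "-o concurrentReaders=32"
	parsed, err := strconv.ParseInt(value, 0, 32)
	if err != nil {
		panic(fmt.Errorf("concurrentReaders: %s", err))
	}
	concurrentReaders := int(parsed)
	fmt.Println(concurrentReaders) // prints 32
}
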
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 98f139c6f..618bbd3ae 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -17,6 +17,7 @@ type MountOptions struct {
ttlSec *int
chunkSizeLimitMB *int
concurrentWriters *int
+ concurrentReaders *int
cacheMetaTtlSec *int
cacheDirForRead *string
cacheDirForWrite *string
@@ -65,6 +66,7 @@ func init() {
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 32, "limit concurrent goroutine writers")
+ mountOptions.concurrentReaders = cmdMount.Flag.Int("concurrentReaders", 16, "limit concurrent chunk fetches for read operations")
mountOptions.cacheDirForRead = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
mountOptions.cacheSizeMBForRead = cmdMount.Flag.Int64("cacheCapacityMB", 128, "file chunk read cache capacity in MB")
mountOptions.cacheDirForWrite = cmdMount.Flag.String("cacheDirWrite", "", "buffer writes mostly for large files")
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 53b09589d..d1593454e 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -236,6 +236,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
DiskType: types.ToDiskType(*option.diskType),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
ConcurrentWriters: *option.concurrentWriters,
+ ConcurrentReaders: *option.concurrentReaders,
CacheDirForRead: *option.cacheDirForRead,
CacheSizeMBForRead: *option.cacheSizeMBForRead,
CacheDirForWrite: cacheDirForWrite,
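Between mount.go and mount_std.go the flag value travels into the mount option struct extended later in weed/mount/weedfs.go. A minimal sketch of that wiring, using only fields visible in this patch; the literal values mirror the flag defaults and are illustrative, not part of the patch:

package main

import "github.com/seaweedfs/seaweedfs/weed/mount"

func main() {
	// Sketch only: the subset of mount.Option fields touched by this patch,
	// populated the way RunMount copies them from the parsed flags.
	opt := &mount.Option{
		ChunkSizeLimit:    2 * 1024 * 1024, // -chunkSizeLimitMB 2
		ConcurrentWriters: 32,              // -concurrentWriters 32
		ConcurrentReaders: 16,              // new: -concurrentReaders 16
	}
	_ = opt
}
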
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
index 0f449735a..ed92e78a9 100644
--- a/weed/filer/filechunk_group.go
+++ b/weed/filer/filechunk_group.go
@@ -5,29 +5,64 @@ import (
"io"
"sync"
+ "golang.org/x/sync/errgroup"
+
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
type ChunkGroup struct {
- lookupFn wdclient.LookupFileIdFunctionType
- sections map[SectionIndex]*FileChunkSection
- sectionsLock sync.RWMutex
- readerCache *ReaderCache
+ lookupFn wdclient.LookupFileIdFunctionType
+ sections map[SectionIndex]*FileChunkSection
+ sectionsLock sync.RWMutex
+ readerCache *ReaderCache
+ concurrentReaders int
}
-func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
+// NewChunkGroup creates a ChunkGroup with configurable concurrency.
+// concurrentReaders controls:
+// - Maximum parallel chunk fetches during read operations
+// - Read-ahead prefetch parallelism
+// - Number of concurrent section reads for large files
+// If concurrentReaders <= 0, defaults to 16.
+func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk, concurrentReaders int) (*ChunkGroup, error) {
+ if concurrentReaders <= 0 {
+ concurrentReaders = 16
+ }
+ if concurrentReaders > 128 {
+ concurrentReaders = 128 // Cap to prevent excessive goroutine fan-out
+ }
+ // ReaderCache limit should be at least concurrentReaders to allow parallel prefetching
+ readerCacheLimit := concurrentReaders * 2
+ if readerCacheLimit < 32 {
+ readerCacheLimit = 32
+ }
group := &ChunkGroup{
- lookupFn: lookupFn,
- sections: make(map[SectionIndex]*FileChunkSection),
- readerCache: NewReaderCache(32, chunkCache, lookupFn),
+ lookupFn: lookupFn,
+ sections: make(map[SectionIndex]*FileChunkSection),
+ readerCache: NewReaderCache(readerCacheLimit, chunkCache, lookupFn),
+ concurrentReaders: concurrentReaders,
}
err := group.SetChunks(chunks)
return group, err
}
+// GetPrefetchCount returns the number of chunks to prefetch ahead during sequential reads.
+// This is derived from concurrentReaders to keep the network pipeline full.
+func (group *ChunkGroup) GetPrefetchCount() int {
+ // Prefetch at least 1, and scale with concurrency (roughly 1/4 of concurrent readers)
+ prefetch := group.concurrentReaders / 4
+ if prefetch < 1 {
+ prefetch = 1
+ }
+ if prefetch > 8 {
+ prefetch = 8 // Cap at 8 to avoid excessive memory usage
+ }
+ return prefetch
+}
+
func (group *ChunkGroup) AddChunk(chunk *filer_pb.FileChunk) error {
group.sectionsLock.Lock()
@@ -54,6 +89,19 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
defer group.sectionsLock.RUnlock()
sectionIndexStart, sectionIndexStop := SectionIndex(offset/SectionSize), SectionIndex((offset+int64(len(buff)))/SectionSize)
+ numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+ // For single section or when concurrency is disabled, use sequential reading
+ if numSections <= 1 || group.concurrentReaders <= 1 {
+ return group.readDataAtSequential(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+ }
+
+ // For multiple sections, use parallel reading
+ return group.readDataAtParallel(ctx, fileSize, buff, offset, sectionIndexStart, sectionIndexStop)
+}
+
+// readDataAtSequential reads sections sequentially (original behavior)
+func (group *ChunkGroup) readDataAtSequential(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
section, found := group.sections[si]
rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
@@ -78,9 +126,98 @@ func (group *ChunkGroup) ReadDataAt(ctx context.Context, fileSize int64, buff []
return
}
+// sectionReadResult holds the result of a section read operation
+type sectionReadResult struct {
+ sectionIndex SectionIndex
+ n int
+ tsNs int64
+ err error
+}
+
+// readDataAtParallel reads multiple sections in parallel for better throughput
+func (group *ChunkGroup) readDataAtParallel(ctx context.Context, fileSize int64, buff []byte, offset int64, sectionIndexStart, sectionIndexStop SectionIndex) (n int, tsNs int64, err error) {
+ numSections := int(sectionIndexStop - sectionIndexStart + 1)
+
+ // Limit concurrency to the smaller of concurrentReaders and numSections
+ maxConcurrent := group.concurrentReaders
+ if numSections < maxConcurrent {
+ maxConcurrent = numSections
+ }
+
+ g, gCtx := errgroup.WithContext(ctx)
+ g.SetLimit(maxConcurrent)
+
+ results := make([]sectionReadResult, numSections)
+
+ for i := 0; i < numSections; i++ {
+ si := sectionIndexStart + SectionIndex(i)
+ idx := i
+
+ section, found := group.sections[si]
+ rangeStart, rangeStop := max(offset, int64(si*SectionSize)), min(offset+int64(len(buff)), int64((si+1)*SectionSize))
+ if rangeStart >= rangeStop {
+ continue
+ }
+
+ if !found {
+ // Zero-fill missing sections synchronously
+ rangeStop = min(rangeStop, fileSize)
+ for j := rangeStart; j < rangeStop; j++ {
+ buff[j-offset] = 0
+ }
+ results[idx] = sectionReadResult{
+ sectionIndex: si,
+ n: int(rangeStop - rangeStart),
+ tsNs: 0,
+ err: nil,
+ }
+ continue
+ }
+
+ // Capture variables for closure
+ sectionCopy := section
+ buffSlice := buff[rangeStart-offset : rangeStop-offset]
+ rangeStartCopy := rangeStart
+
+ g.Go(func() error {
+ xn, xTsNs, xErr := sectionCopy.readDataAt(gCtx, group, fileSize, buffSlice, rangeStartCopy)
+ results[idx] = sectionReadResult{
+ sectionIndex: si,
+ n: xn,
+ tsNs: xTsNs,
+ err: xErr,
+ }
+ if xErr != nil && xErr != io.EOF {
+ return xErr
+ }
+ return nil
+ })
+ }
+
+ // Wait for all goroutines to complete
+ groupErr := g.Wait()
+
+ // Aggregate results
+ for _, result := range results {
+ n += result.n
+ tsNs = max(tsNs, result.tsNs)
+ // Collect first non-EOF error from results as fallback
+ if result.err != nil && result.err != io.EOF && err == nil {
+ err = result.err
+ }
+ }
+
+ // Prioritize errgroup error (first error that cancelled context)
+ if groupErr != nil {
+ err = groupErr
+ }
+
+ return
+}
+
func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
- group.sectionsLock.RLock()
- defer group.sectionsLock.RUnlock()
+ group.sectionsLock.Lock()
+ defer group.sectionsLock.Unlock()
var dataChunks []*filer_pb.FileChunk
for _, chunk := range chunks {
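readDataAtParallel is the heart of the change: each section maps to a disjoint slice of the caller's buffer, so sections can be fetched concurrently under an errgroup whose limit is concurrentReaders, with no locking on the buffer itself. A self-contained sketch of that bounded fan-out pattern (not part of the patch; fetchSection is a hypothetical stand-in for FileChunkSection.readDataAt, and the section size and data are made up):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fetchSection stands in for FileChunkSection.readDataAt: it fills exactly
// its own slice of the destination buffer and reports how much it wrote.
func fetchSection(ctx context.Context, idx int, dst []byte) (int, error) {
	for i := range dst {
		dst[i] = byte(idx) // pretend this came over the network
	}
	return len(dst), nil
}

func main() {
	const sectionSize = 4
	buff := make([]byte, 16) // spans 4 sections

	g, gCtx := errgroup.WithContext(context.Background())
	g.SetLimit(2) // analogous to concurrentReaders

	results := make([]int, len(buff)/sectionSize)
	for i := range results {
		i := i
		slice := buff[i*sectionSize : (i+1)*sectionSize] // disjoint slices, no shared writes
		g.Go(func() error {
			n, err := fetchSection(gCtx, i, slice)
			results[i] = n
			return err
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}

	total := 0
	for _, n := range results {
		total += n
	}
	fmt.Println("bytes read:", total, buff)
}
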
diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go
index 76eb84c23..dd7c2ea48 100644
--- a/weed/filer/filechunk_section.go
+++ b/weed/filer/filechunk_section.go
@@ -85,7 +85,7 @@ func (section *FileChunkSection) setupForRead(ctx context.Context, group *ChunkG
}
if section.reader == nil {
- section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
+ section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize), group.GetPrefetchCount())
}
section.isPrepared = true
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index aeac9b34a..93fa76a2e 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -13,6 +13,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
+// DefaultPrefetchCount is the default number of chunks to prefetch ahead during
+// sequential reads. This value is used when prefetch count is not explicitly
+// configured (e.g., WebDAV, query engine, message queue). For mount operations,
+// the prefetch count is derived from the -concurrentReaders option.
+const DefaultPrefetchCount = 4
+
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews *IntervalList[*ChunkView]
@@ -20,6 +26,7 @@ type ChunkReadAt struct {
readerCache *ReaderCache
readerPattern *ReaderPattern
lastChunkFid string
+ prefetchCount int // Number of chunks to prefetch ahead during sequential reads
ctx context.Context // Context used for cancellation during chunk read operations
}
@@ -35,8 +42,9 @@ var _ = io.Closer(&ChunkReadAt{})
// - No high availability (single filer address, no automatic failover)
//
// For NEW code, especially mount operations, use wdclient.FilerClient instead:
-// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
-// lookupFn := filerClient.GetLookupFileIdFunction()
+//
+// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
+// lookupFn := filerClient.GetLookupFileIdFunction()
//
// This provides:
// - Bounded cache with configurable size
@@ -56,7 +64,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
var vidCacheLock sync.RWMutex
cacheSize := 0
const maxCacheSize = 10000 // Simple bound to prevent unbounded growth
-
+
return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
vid := VolumeId(fileId)
vidCacheLock.RLock()
@@ -123,13 +131,14 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
}
}
-func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt {
+func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64, prefetchCount int) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
fileSize: fileSize,
readerCache: readerCache,
readerPattern: NewReaderPattern(),
+ prefetchCount: prefetchCount,
ctx: ctx,
}
}
@@ -246,8 +255,10 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
if c.lastChunkFid != "" {
c.readerCache.UnCache(c.lastChunkFid)
}
- if nextChunkViews != nil {
- c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning
+ if nextChunkViews != nil && c.prefetchCount > 0 {
+ // Prefetch multiple chunks ahead for better sequential read throughput
+ // This keeps the network pipeline full with parallel chunk fetches
+ c.readerCache.MaybeCache(nextChunkViews, c.prefetchCount)
}
}
}
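With the default -concurrentReaders of 16, ChunkGroup.GetPrefetchCount returns 16/4 = 4, which is exactly the DefaultPrefetchCount that the WebDAV, query-engine, and MQ call sites below pass explicitly. A small worked sketch of that clamp arithmetic (the helper name is made up; the logic mirrors GetPrefetchCount in the patch):

package main

import "fmt"

// prefetchFor mirrors ChunkGroup.GetPrefetchCount:
// roughly concurrentReaders/4, clamped to the range [1, 8].
func prefetchFor(concurrentReaders int) int {
	p := concurrentReaders / 4
	if p < 1 {
		p = 1
	}
	if p > 8 {
		p = 8
	}
	return p
}

func main() {
	for _, cr := range []int{1, 4, 16, 64, 128} {
		fmt.Printf("concurrentReaders=%-3d -> prefetch=%d\n", cr, prefetchFor(cr))
	}
	// 1 -> 1, 4 -> 1, 16 -> 4 (matches DefaultPrefetchCount), 64 -> 8 (capped), 128 -> 8
}
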
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index 11382bed3..605be5e73 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -46,10 +46,16 @@ func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn
}
}
-func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
+// MaybeCache prefetches up to 'count' chunks ahead in parallel.
+// This improves read throughput for sequential reads by keeping the
+// network pipeline full with parallel chunk fetches.
+func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
if rc.lookupFileIdFn == nil {
return
}
+ if count <= 0 {
+ count = 1
+ }
rc.Lock()
defer rc.Unlock()
@@ -58,7 +64,8 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
return
}
- for x := chunkViews; x != nil; x = x.Next {
+ cached := 0
+ for x := chunkViews; x != nil && cached < count; x = x.Next {
chunkView := x.Value
if _, found := rc.downloaders[chunkView.FileId]; found {
continue
@@ -80,7 +87,7 @@ func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
go cacher.startCaching()
<-cacher.cacheStartedCh
rc.downloaders[chunkView.FileId] = cacher
-
+ cached++
}
return
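MaybeCache keeps its original single pass over the chunk-view list but now stops after starting count new downloaders, skipping chunks that already have one in flight. A toy sketch of that bounded walk (not part of the patch; the node type and the in-flight map are stand-ins for *Interval[*ChunkView] and the downloaders map):

package main

import "fmt"

// node stands in for *Interval[*ChunkView]: a singly linked list of chunk
// views that are candidates for prefetching.
type node struct {
	fileId string
	next   *node
}

// maybeCache mirrors the bounded loop in ReaderCache.MaybeCache: walk the
// list, skip chunks that are already being fetched, start at most count new ones.
func maybeCache(head *node, inFlight map[string]bool, count int) (started []string) {
	if count <= 0 {
		count = 1
	}
	cached := 0
	for x := head; x != nil && cached < count; x = x.next {
		if inFlight[x.fileId] {
			continue
		}
		inFlight[x.fileId] = true // in the real code: spawn a cacher goroutine
		started = append(started, x.fileId)
		cached++
	}
	return started
}

func main() {
	list := &node{"a", &node{"b", &node{"c", &node{"d", nil}}}}
	fmt.Println(maybeCache(list, map[string]bool{"b": true}, 2)) // [a c]
}
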
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index c20f9eca8..e912fe310 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -81,7 +81,7 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
fileSize := filer.FileSize(entry)
entry.Attributes.FileSize = fileSize
var resolveManifestErr error
- fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
+ fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks, fh.wfs.option.ConcurrentReaders)
if resolveManifestErr != nil {
glog.Warningf("failed to resolve manifest chunks in %+v", entry)
}
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 21c54841a..80e062c60 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -42,6 +42,7 @@ type Option struct {
DiskType types.DiskType
ChunkSizeLimit int64
ConcurrentWriters int
+ ConcurrentReaders int
CacheDirForRead string
CacheSizeMBForRead int64
CacheDirForWrite string
diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go
index 01191eaad..7214965f5 100644
--- a/weed/mq/logstore/read_parquet_to_log.go
+++ b/weed/mq/logstore/read_parquet_to_log.go
@@ -101,7 +101,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
- readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
+ readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
// create parquet reader
parquetReader := parquet.NewReader(readerAt)
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
index c09ce2f54..8fa9f4381 100644
--- a/weed/query/engine/hybrid_message_scanner.go
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -1286,7 +1286,7 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
- readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
+ readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
// Create parquet reader - this only reads metadata, not data
parquetReader := parquet.NewReader(readerAt)
diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go
index e4b5252c7..7a470817b 100644
--- a/weed/query/engine/parquet_scanner.go
+++ b/weed/query/engine/parquet_scanner.go
@@ -182,7 +182,7 @@ func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.E
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn)
- readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize))
+ readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
// Create Parquet reader
parquetReader := parquet.NewReader(readerAt)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index aa43189f5..3e0e23148 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -566,7 +566,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
}
if f.reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
- f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize)
+ f.reader = filer.NewChunkReaderAtFromClient(f.ctx, f.fs.readerCache, chunkViews, fileSize, filer.DefaultPrefetchCount)
}
readSize, err = f.reader.ReadAt(p, f.off)