Diffstat (limited to 'weed/filer/reader_at.go')
-rw-r--r--  weed/filer/reader_at.go  23
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index aeac9b34a..93fa76a2e 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -13,6 +13,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
+// DefaultPrefetchCount is the default number of chunks to prefetch ahead during
+// sequential reads. This value is used when the prefetch count is not explicitly
+// configured (e.g., WebDAV, query engine, message queue). For mount operations,
+// the prefetch count is derived from the -concurrentReaders option.
+const DefaultPrefetchCount = 4
+
type ChunkReadAt struct {
	masterClient  *wdclient.MasterClient
	chunkViews    *IntervalList[*ChunkView]
@@ -20,6 +26,7 @@ type ChunkReadAt struct {
	readerCache   *ReaderCache
	readerPattern *ReaderPattern
	lastChunkFid  string
+	prefetchCount int // Number of chunks to prefetch ahead during sequential reads
	ctx           context.Context // Context used for cancellation during chunk read operations
}
@@ -35,8 +42,9 @@ var _ = io.Closer(&ChunkReadAt{})
// - No high availability (single filer address, no automatic failover)
//
// For NEW code, especially mount operations, use wdclient.FilerClient instead:
-// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
-// lookupFn := filerClient.GetLookupFileIdFunction()
+//
+//	filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
+//	lookupFn := filerClient.GetLookupFileIdFunction()
//
// This provides:
// - Bounded cache with configurable size
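For reference, the recommended path above can be sketched end to end. This is an illustrative sketch only: filerAddresses, grpcDialOption, dataCenter, opts, and ctx are assumed to be in scope, and the sample file id is made up.

	// Build the client once and reuse its lookup function for all chunk reads.
	filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
	lookupFn := filerClient.GetLookupFileIdFunction()
	// lookupFn resolves a chunk file id to candidate volume server URLs.
	targetUrls, err := lookupFn(ctx, "3,01637037d6")
	if err != nil {
		return err // lookup failed; no volume location for this chunk
	}
	_ = targetUrls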
@@ -56,7 +64,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
	var vidCacheLock sync.RWMutex
	cacheSize := 0
	const maxCacheSize = 10000 // Simple bound to prevent unbounded growth
-
+
	return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
		vid := VolumeId(fileId)
		vidCacheLock.RLock()
@@ -123,13 +131,14 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
	}
}

-func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64) *ChunkReadAt {
+func NewChunkReaderAtFromClient(ctx context.Context, readerCache *ReaderCache, chunkViews *IntervalList[*ChunkView], fileSize int64, prefetchCount int) *ChunkReadAt {
	return &ChunkReadAt{
		chunkViews:    chunkViews,
		fileSize:      fileSize,
		readerCache:   readerCache,
		readerPattern: NewReaderPattern(),
+		prefetchCount: prefetchCount,
		ctx:           ctx,
	}
}
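As a usage note for the new signature, a caller with no explicit prefetch configuration might construct the reader like this (a hedged sketch; every variable besides the final argument is assumed to be in scope):

	// WebDAV, the query engine, and the message queue fall back to
	// DefaultPrefetchCount; mount derives the count from -concurrentReaders.
	readerAt := NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, fileSize, DefaultPrefetchCount)
	defer readerAt.Close()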
@@ -246,8 +255,10 @@ func (c *ChunkReadAt) readChunkSliceAt(ctx context.Context, buffer []byte, chunk
if c.lastChunkFid != "" {
c.readerCache.UnCache(c.lastChunkFid)
}
- if nextChunkViews != nil {
- c.readerCache.MaybeCache(nextChunkViews) // just read the next chunk if at the very beginning
+ if nextChunkViews != nil && c.prefetchCount > 0 {
+ // Prefetch multiple chunks ahead for better sequential read throughput
+ // This keeps the network pipeline full with parallel chunk fetches
+ c.readerCache.MaybeCache(nextChunkViews, c.prefetchCount)
}
}
}
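The matching MaybeCache change lives in reader_cache.go and is not shown in this diff. Purely as an illustration of the intended behavior, a hypothetical helper with the same shape as the new call site could walk the interval list and fan out fetches (Interval's Next and Value fields come from this package's interval list; fetch is a stand-in, not a real ReaderCache method):

	// prefetchAhead starts up to prefetchCount background fetches beginning at
	// next, so sequential reads overlap network transfer with consumption.
	func prefetchAhead(next *Interval[*ChunkView], prefetchCount int, fetch func(*ChunkView)) {
		for x := next; x != nil && prefetchCount > 0; x = x.Next {
			go fetch(x.Value) // fetch is assumed to download and cache one chunk
			prefetchCount--
		}
	}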