aboutsummaryrefslogtreecommitdiff
path: root/weed/mount
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-12-09 23:44:15 -0800
committerGitHub <noreply@github.com>2025-12-09 23:44:15 -0800
commit0cd9f341778775195fb44cc32d1434e63dba4fca (patch)
tree7b3a0bc3a71a18081b6ca4669a810845bb0c4ced /weed/mount
parent1e1473ef4ac8e0ca70f25a3bcf5ef57f79b03779 (diff)
downloadseaweedfs-0cd9f341778775195fb44cc32d1434e63dba4fca.tar.xz
seaweedfs-0cd9f341778775195fb44cc32d1434e63dba4fca.zip
mount: improve EnsureVisited performance with dedup, parallelism, and batching (#7697)
* mount: add singleflight to deduplicate concurrent EnsureVisited calls When multiple goroutines access the same uncached directory simultaneously, they would all make redundant network requests to the filer. This change uses singleflight.Group to ensure only one goroutine fetches the directory entries while others wait for the result. This fixes a race condition where concurrent lookups or readdir operations on the same uncached directory would: 1. Make duplicate network requests to the filer 2. Insert duplicate entries into LevelDB cache 3. Waste CPU and network bandwidth * mount: fetch parent directories in parallel during EnsureVisited Previously, when accessing a deep path like /a/b/c/d, the parent directories were fetched serially from target to root. This change: 1. Collects all uncached directories from target to root first 2. Fetches them all in parallel using errgroup 3. Relies on singleflight (from previous commit) for deduplication This reduces latency when accessing deep uncached paths, especially in high-latency network environments where parallel requests can significantly improve performance. * mount: add batch inserts for LevelDB meta cache When populating the meta cache from filer, entries were inserted one-by-one into LevelDB. This change: 1. Adds BatchInsertEntries method to LevelDBStore that uses LevelDB's native batch write API 2. Updates MetaCache to keep a direct reference to the LevelDB store for batch operations 3. Modifies doEnsureVisited to collect entries and insert them in batches of 100 entries Batch writes are more efficient because: - Reduces number of individual write operations - Reduces disk syncs - Improves throughput for large directories * mount: fix potential nil dereference in MarkChildrenCached Add missing check for inode existence in inode2path map before accessing the InodeEntry. This prevents a potential nil pointer dereference if the inode exists in path2inode but not in inode2path (which could happen due to race conditions or bugs). This follows the same pattern used in IsChildrenCached which properly checks for existence before accessing the entry. * mount: fix batch flush when last entry is hidden The previous batch insert implementation relied on the isLast flag to flush remaining entries. However, if the last entry is a hidden system entry (like 'topics' or 'etc' in root), the callback returns early and the remaining entries in the batch are never flushed. Fix by: 1. Only flush when batch reaches threshold inside the callback 2. Flush any remaining entries after ReadDirAllEntries completes 3. Use error wrapping instead of logging+returning to avoid duplicate logs 4. Create new slice after flush to allow GC of flushed entries 5. Add documentation for batchInsertSize constant This ensures all entries are properly inserted regardless of whether the last entry is hidden, and prevents memory retention issues. * mount: add context support for cancellation in EnsureVisited Thread context.Context through the batch insert call chain to enable proper cancellation and timeout support: 1. Use errgroup.WithContext() so if one fetch fails, others are cancelled 2. Add context parameter to BatchInsertEntries for consistency with InsertEntry 3. Pass context to ReadDirAllEntries for cancellation during network calls 4. Check context cancellation before starting work in doEnsureVisited 5. Use %w for error wrapping to preserve error types for inspection This prevents unnecessary work when one directory fetch fails and makes the batch operations consistent with the existing context-aware APIs.
Diffstat (limited to 'weed/mount')
-rw-r--r--weed/mount/inode_to_path.go4
-rw-r--r--weed/mount/meta_cache/meta_cache.go22
-rw-r--r--weed/mount/meta_cache/meta_cache_init.go103
3 files changed, 96 insertions, 33 deletions
diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go
index 021623af3..4a01e30e7 100644
--- a/weed/mount/inode_to_path.go
+++ b/weed/mount/inode_to_path.go
@@ -160,6 +160,10 @@ func (i *InodeToPath) MarkChildrenCached(fullpath util.FullPath) {
return
}
path, found := i.inode2path[inode]
+ if !found {
+ glog.Warningf("MarkChildrenCached inode %d not found in inode2path for %v", inode, fullpath)
+ return
+ }
path.isChildrenCached = true
if i.cacheMetaTtlSec > 0 {
path.cachedExpiresTime = time.Now().Add(i.cacheMetaTtlSec)
diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go
index 9578aff72..0ed76e039 100644
--- a/weed/mount/meta_cache/meta_cache.go
+++ b/weed/mount/meta_cache/meta_cache.go
@@ -6,6 +6,8 @@ import (
"sync"
"time"
+ "golang.org/x/sync/singleflight"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -17,20 +19,24 @@ import (
// e.g. fill fileId field for chunks
type MetaCache struct {
- root util.FullPath
- localStore filer.VirtualFilerStore
+ root util.FullPath
+ localStore filer.VirtualFilerStore
+ leveldbStore *leveldb.LevelDBStore // direct reference for batch operations
sync.RWMutex
uidGidMapper *UidGidMapper
markCachedFn func(fullpath util.FullPath)
isCachedFn func(fullpath util.FullPath) bool
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
+ visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path
}
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath,
markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
+ leveldbStore, virtualStore := openMetaStore(dbFolder)
return &MetaCache{
root: root,
- localStore: openMetaStore(dbFolder),
+ localStore: virtualStore,
+ leveldbStore: leveldbStore,
markCachedFn: markCachedFn,
isCachedFn: isCachedFn,
uidGidMapper: uidGidMapper,
@@ -40,7 +46,7 @@ func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPat
}
}
-func openMetaStore(dbFolder string) filer.VirtualFilerStore {
+func openMetaStore(dbFolder string) (*leveldb.LevelDBStore, filer.VirtualFilerStore) {
os.RemoveAll(dbFolder)
os.MkdirAll(dbFolder, 0755)
@@ -54,7 +60,7 @@ func openMetaStore(dbFolder string) filer.VirtualFilerStore {
glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
}
- return filer.NewFilerStoreWrapper(store)
+ return store, filer.NewFilerStoreWrapper(store)
}
@@ -68,6 +74,12 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
return mc.localStore.InsertEntry(ctx, entry)
}
+// doBatchInsertEntries inserts multiple entries using LevelDB's batch write.
+// This is more efficient than inserting entries one by one.
+func (mc *MetaCache) doBatchInsertEntries(ctx context.Context, entries []*filer.Entry) error {
+ return mc.leveldbStore.BatchInsertEntries(ctx, entries)
+}
+
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go
index 068daa3f9..10ec9dad7 100644
--- a/weed/mount/meta_cache/meta_cache_init.go
+++ b/weed/mount/meta_cache/meta_cache_init.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
+ "golang.org/x/sync/errgroup"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -11,22 +13,18 @@ import (
)
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
-
+ // Collect all uncached paths from target directory up to root
+ var uncachedPaths []util.FullPath
currentPath := dirPath
for {
-
- // the directory children are already cached
- // so no need for this and upper directories
+ // If this path is cached, all ancestors are also cached
if mc.isCachedFn(currentPath) {
- return nil
- }
-
- if err := doEnsureVisited(mc, client, currentPath); err != nil {
- return err
+ break
}
+ uncachedPaths = append(uncachedPaths, currentPath)
- // continue to parent directory
+ // Continue to parent directory
if currentPath != mc.root {
parent, _ := currentPath.DirAndName()
currentPath = util.FullPath(parent)
@@ -35,33 +33,82 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
}
}
- return nil
+ if len(uncachedPaths) == 0 {
+ return nil
+ }
+ // Fetch all uncached directories in parallel with context for cancellation
+ // If one fetch fails, cancel the others to avoid unnecessary work
+ g, ctx := errgroup.WithContext(context.Background())
+ for _, p := range uncachedPaths {
+ path := p // capture for closure
+ g.Go(func() error {
+ return doEnsureVisited(ctx, mc, client, path)
+ })
+ }
+ return g.Wait()
}
-func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
+// batchInsertSize is the number of entries to accumulate before flushing to LevelDB.
+// 100 provides a balance between memory usage (~100 Entry pointers) and write efficiency
+// (fewer disk syncs). Larger values reduce I/O overhead but increase memory and latency.
+const batchInsertSize = 100
+
+func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
+ // Use singleflight to deduplicate concurrent requests for the same path
+ _, err, _ := mc.visitGroup.Do(string(path), func() (interface{}, error) {
+ // Check for cancellation before starting
+ if ctx.Err() != nil {
+ return nil, ctx.Err()
+ }
+
+ // Double-check if already cached (another goroutine may have completed)
+ if mc.isCachedFn(path) {
+ return nil, nil
+ }
- glog.V(4).Infof("ReadDirAllEntries %s ...", path)
+ glog.V(4).Infof("ReadDirAllEntries %s ...", path)
- err := util.Retry("ReadDirAllEntries", func() error {
- return filer_pb.ReadDirAllEntries(context.Background(), client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
- entry := filer.FromPbEntry(string(path), pbEntry)
- if IsHiddenSystemEntry(string(path), entry.Name()) {
+ // Collect entries in batches for efficient LevelDB writes
+ var batch []*filer.Entry
+
+ fetchErr := util.Retry("ReadDirAllEntries", func() error {
+ batch = nil // Reset batch on retry, allow GC of previous entries
+ return filer_pb.ReadDirAllEntries(ctx, client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer.FromPbEntry(string(path), pbEntry)
+ if IsHiddenSystemEntry(string(path), entry.Name()) {
+ return nil
+ }
+
+ batch = append(batch, entry)
+
+ // Flush batch when it reaches the threshold
+ // Don't rely on isLast here - hidden entries may cause early return
+ if len(batch) >= batchInsertSize {
+ // No lock needed - LevelDB Write() is thread-safe
+ if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
+ return fmt.Errorf("batch insert for %s: %w", path, err)
+ }
+ // Create new slice to allow GC of flushed entries
+ batch = make([]*filer.Entry, 0, batchInsertSize)
+ }
return nil
- }
- if err := mc.doInsertEntry(context.Background(), entry); err != nil {
- glog.V(0).Infof("read %s: %v", entry.FullPath, err)
- return err
- }
- return nil
+ })
})
- })
- if err != nil {
- err = fmt.Errorf("list %s: %v", path, err)
- } else {
+ if fetchErr != nil {
+ return nil, fmt.Errorf("list %s: %w", path, fetchErr)
+ }
+
+ // Flush any remaining entries in the batch
+ if len(batch) > 0 {
+ if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
+ return nil, fmt.Errorf("batch insert remaining for %s: %w", path, err)
+ }
+ }
mc.markCachedFn(path)
- }
+ return nil, nil
+ })
return err
}