about summary refs log tree commit diff
path: root/weed/mount/meta_cache
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount/meta_cache')
-rw-r--r--  weed/mount/meta_cache/meta_cache.go       |  22
-rw-r--r--  weed/mount/meta_cache/meta_cache_init.go  | 103
2 files changed, 92 insertions, 33 deletions
diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go
index 9578aff72..0ed76e039 100644
--- a/weed/mount/meta_cache/meta_cache.go
+++ b/weed/mount/meta_cache/meta_cache.go
@@ -6,6 +6,8 @@ import (
"sync"
"time"
+ "golang.org/x/sync/singleflight"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -17,20 +19,24 @@ import (
// e.g. fill fileId field for chunks
type MetaCache struct {
- root util.FullPath
- localStore filer.VirtualFilerStore
+ root util.FullPath
+ localStore filer.VirtualFilerStore
+ leveldbStore *leveldb.LevelDBStore // direct reference for batch operations
sync.RWMutex
uidGidMapper *UidGidMapper
markCachedFn func(fullpath util.FullPath)
isCachedFn func(fullpath util.FullPath) bool
invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
+ visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path
}
func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath,
markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
+ leveldbStore, virtualStore := openMetaStore(dbFolder)
return &MetaCache{
root: root,
- localStore: openMetaStore(dbFolder),
+ localStore: virtualStore,
+ leveldbStore: leveldbStore,
markCachedFn: markCachedFn,
isCachedFn: isCachedFn,
uidGidMapper: uidGidMapper,
@@ -40,7 +46,7 @@ func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPat
}
}
-func openMetaStore(dbFolder string) filer.VirtualFilerStore {
+func openMetaStore(dbFolder string) (*leveldb.LevelDBStore, filer.VirtualFilerStore) {
os.RemoveAll(dbFolder)
os.MkdirAll(dbFolder, 0755)
@@ -54,7 +60,7 @@ func openMetaStore(dbFolder string) filer.VirtualFilerStore {
glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
}
- return filer.NewFilerStoreWrapper(store)
+ return store, filer.NewFilerStoreWrapper(store)
}
@@ -68,6 +74,12 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
return mc.localStore.InsertEntry(ctx, entry)
}
+// doBatchInsertEntries inserts multiple entries using LevelDB's batch write.
+// This is more efficient than inserting entries one by one.
+func (mc *MetaCache) doBatchInsertEntries(ctx context.Context, entries []*filer.Entry) error {
+ return mc.leveldbStore.BatchInsertEntries(ctx, entries)
+}
+
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go
index 068daa3f9..10ec9dad7 100644
--- a/weed/mount/meta_cache/meta_cache_init.go
+++ b/weed/mount/meta_cache/meta_cache_init.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
+ "golang.org/x/sync/errgroup"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -11,22 +13,18 @@ import (
)
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
-
+ // Collect all uncached paths from target directory up to root
+ var uncachedPaths []util.FullPath
currentPath := dirPath
for {
-
- // the directory children are already cached
- // so no need for this and upper directories
+ // If this path is cached, all ancestors are also cached
if mc.isCachedFn(currentPath) {
- return nil
- }
-
- if err := doEnsureVisited(mc, client, currentPath); err != nil {
- return err
+ break
}
+ uncachedPaths = append(uncachedPaths, currentPath)
- // continue to parent directory
+ // Continue to parent directory
if currentPath != mc.root {
parent, _ := currentPath.DirAndName()
currentPath = util.FullPath(parent)
@@ -35,33 +33,82 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
}
}
- return nil
+ if len(uncachedPaths) == 0 {
+ return nil
+ }
+ // Fetch all uncached directories in parallel with context for cancellation
+ // If one fetch fails, cancel the others to avoid unnecessary work
+ g, ctx := errgroup.WithContext(context.Background())
+ for _, p := range uncachedPaths {
+ path := p // capture for closure
+ g.Go(func() error {
+ return doEnsureVisited(ctx, mc, client, path)
+ })
+ }
+ return g.Wait()
}
-func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
+// batchInsertSize is the number of entries to accumulate before flushing to LevelDB.
+// 100 provides a balance between memory usage (~100 Entry pointers) and write efficiency
+// (fewer disk syncs). Larger values reduce I/O overhead but increase memory and latency.
+const batchInsertSize = 100
+
+func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
+ // Use singleflight to deduplicate concurrent requests for the same path
+ _, err, _ := mc.visitGroup.Do(string(path), func() (interface{}, error) {
+ // Check for cancellation before starting
+ if ctx.Err() != nil {
+ return nil, ctx.Err()
+ }
+
+ // Double-check if already cached (another goroutine may have completed)
+ if mc.isCachedFn(path) {
+ return nil, nil
+ }
- glog.V(4).Infof("ReadDirAllEntries %s ...", path)
+ glog.V(4).Infof("ReadDirAllEntries %s ...", path)
- err := util.Retry("ReadDirAllEntries", func() error {
- return filer_pb.ReadDirAllEntries(context.Background(), client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
- entry := filer.FromPbEntry(string(path), pbEntry)
- if IsHiddenSystemEntry(string(path), entry.Name()) {
+ // Collect entries in batches for efficient LevelDB writes
+ var batch []*filer.Entry
+
+ fetchErr := util.Retry("ReadDirAllEntries", func() error {
+ batch = nil // Reset batch on retry, allow GC of previous entries
+ return filer_pb.ReadDirAllEntries(ctx, client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer.FromPbEntry(string(path), pbEntry)
+ if IsHiddenSystemEntry(string(path), entry.Name()) {
+ return nil
+ }
+
+ batch = append(batch, entry)
+
+ // Flush batch when it reaches the threshold
+ // Don't rely on isLast here - hidden entries may cause early return
+ if len(batch) >= batchInsertSize {
+ // No lock needed - LevelDB Write() is thread-safe
+ if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
+ return fmt.Errorf("batch insert for %s: %w", path, err)
+ }
+ // Create new slice to allow GC of flushed entries
+ batch = make([]*filer.Entry, 0, batchInsertSize)
+ }
return nil
- }
- if err := mc.doInsertEntry(context.Background(), entry); err != nil {
- glog.V(0).Infof("read %s: %v", entry.FullPath, err)
- return err
- }
- return nil
+ })
})
- })
- if err != nil {
- err = fmt.Errorf("list %s: %v", path, err)
- } else {
+ if fetchErr != nil {
+ return nil, fmt.Errorf("list %s: %w", path, fetchErr)
+ }
+
+ // Flush any remaining entries in the batch
+ if len(batch) > 0 {
+ if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
+ return nil, fmt.Errorf("batch insert remaining for %s: %w", path, err)
+ }
+ }
mc.markCachedFn(path)
- }
+ return nil, nil
+ })
return err
}