diff options
| -rw-r--r-- | weed/filer/leveldb/leveldb_store.go | 33 | ||||
| -rw-r--r-- | weed/mount/inode_to_path.go | 4 | ||||
| -rw-r--r-- | weed/mount/meta_cache/meta_cache.go | 22 | ||||
| -rw-r--r-- | weed/mount/meta_cache/meta_cache_init.go | 103 |
4 files changed, 129 insertions, 33 deletions
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index fea6e0a3d..3e001b6df 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -107,6 +107,39 @@ func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) return store.InsertEntry(ctx, entry) } +// BatchInsertEntries inserts multiple entries in a single LevelDB batch write. +// This is more efficient than inserting entries one by one as it reduces +// the number of write operations and syncs to disk. +func (store *LevelDBStore) BatchInsertEntries(ctx context.Context, entries []*filer.Entry) error { + if len(entries) == 0 { + return nil + } + + batch := new(leveldb.Batch) + + for _, entry := range entries { + key := genKey(entry.DirAndName()) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %w", entry.FullPath, entry.Attr, err) + } + + if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { + value = weed_util.MaybeGzipData(value) + } + + batch.Put(key, value) + } + + err := store.db.Write(batch, nil) + if err != nil { + return fmt.Errorf("batch write: %w", err) + } + + return nil +} + func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { key := genKey(fullpath.DirAndName()) diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go index 021623af3..4a01e30e7 100644 --- a/weed/mount/inode_to_path.go +++ b/weed/mount/inode_to_path.go @@ -160,6 +160,10 @@ func (i *InodeToPath) MarkChildrenCached(fullpath util.FullPath) { return } path, found := i.inode2path[inode] + if !found { + glog.Warningf("MarkChildrenCached inode %d not found in inode2path for %v", inode, fullpath) + return + } path.isChildrenCached = true if i.cacheMetaTtlSec > 0 { path.cachedExpiresTime = time.Now().Add(i.cacheMetaTtlSec) diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go index 9578aff72..0ed76e039 100644 --- a/weed/mount/meta_cache/meta_cache.go +++ b/weed/mount/meta_cache/meta_cache.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "golang.org/x/sync/singleflight" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer/leveldb" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -17,20 +19,24 @@ import ( // e.g. fill fileId field for chunks type MetaCache struct { - root util.FullPath - localStore filer.VirtualFilerStore + root util.FullPath + localStore filer.VirtualFilerStore + leveldbStore *leveldb.LevelDBStore // direct reference for batch operations sync.RWMutex uidGidMapper *UidGidMapper markCachedFn func(fullpath util.FullPath) isCachedFn func(fullpath util.FullPath) bool invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry) + visitGroup singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path } func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath, markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache { + leveldbStore, virtualStore := openMetaStore(dbFolder) return &MetaCache{ root: root, - localStore: openMetaStore(dbFolder), + localStore: virtualStore, + leveldbStore: leveldbStore, markCachedFn: markCachedFn, isCachedFn: isCachedFn, uidGidMapper: uidGidMapper, @@ -40,7 +46,7 @@ func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPat } } -func openMetaStore(dbFolder string) filer.VirtualFilerStore { +func openMetaStore(dbFolder string) (*leveldb.LevelDBStore, filer.VirtualFilerStore) { os.RemoveAll(dbFolder) os.MkdirAll(dbFolder, 0755) @@ -54,7 +60,7 @@ func openMetaStore(dbFolder string) filer.VirtualFilerStore { glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) } - return filer.NewFilerStoreWrapper(store) + return store, filer.NewFilerStoreWrapper(store) } @@ -68,6 +74,12 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro return mc.localStore.InsertEntry(ctx, entry) } +// doBatchInsertEntries inserts multiple entries using LevelDB's batch write. +// This is more efficient than inserting entries one by one. +func (mc *MetaCache) doBatchInsertEntries(ctx context.Context, entries []*filer.Entry) error { + return mc.leveldbStore.BatchInsertEntries(ctx, entries) +} + func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { mc.Lock() defer mc.Unlock() diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go index 068daa3f9..10ec9dad7 100644 --- a/weed/mount/meta_cache/meta_cache_init.go +++ b/weed/mount/meta_cache/meta_cache_init.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "golang.org/x/sync/errgroup" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -11,22 +13,18 @@ import ( ) func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error { - + // Collect all uncached paths from target directory up to root + var uncachedPaths []util.FullPath currentPath := dirPath for { - - // the directory children are already cached - // so no need for this and upper directories + // If this path is cached, all ancestors are also cached if mc.isCachedFn(currentPath) { - return nil - } - - if err := doEnsureVisited(mc, client, currentPath); err != nil { - return err + break } + uncachedPaths = append(uncachedPaths, currentPath) - // continue to parent directory + // Continue to parent directory if currentPath != mc.root { parent, _ := currentPath.DirAndName() currentPath = util.FullPath(parent) @@ -35,33 +33,82 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full } } - return nil + if len(uncachedPaths) == 0 { + return nil + } + // Fetch all uncached directories in parallel with context for cancellation + // If one fetch fails, cancel the others to avoid unnecessary work + g, ctx := errgroup.WithContext(context.Background()) + for _, p := range uncachedPaths { + path := p // capture for closure + g.Go(func() error { + return doEnsureVisited(ctx, mc, client, path) + }) + } + return g.Wait() } -func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error { +// batchInsertSize is the number of entries to accumulate before flushing to LevelDB. +// 100 provides a balance between memory usage (~100 Entry pointers) and write efficiency +// (fewer disk syncs). Larger values reduce I/O overhead but increase memory and latency. +const batchInsertSize = 100 + +func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error { + // Use singleflight to deduplicate concurrent requests for the same path + _, err, _ := mc.visitGroup.Do(string(path), func() (interface{}, error) { + // Check for cancellation before starting + if ctx.Err() != nil { + return nil, ctx.Err() + } + + // Double-check if already cached (another goroutine may have completed) + if mc.isCachedFn(path) { + return nil, nil + } - glog.V(4).Infof("ReadDirAllEntries %s ...", path) + glog.V(4).Infof("ReadDirAllEntries %s ...", path) - err := util.Retry("ReadDirAllEntries", func() error { - return filer_pb.ReadDirAllEntries(context.Background(), client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error { - entry := filer.FromPbEntry(string(path), pbEntry) - if IsHiddenSystemEntry(string(path), entry.Name()) { + // Collect entries in batches for efficient LevelDB writes + var batch []*filer.Entry + + fetchErr := util.Retry("ReadDirAllEntries", func() error { + batch = nil // Reset batch on retry, allow GC of previous entries + return filer_pb.ReadDirAllEntries(ctx, client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error { + entry := filer.FromPbEntry(string(path), pbEntry) + if IsHiddenSystemEntry(string(path), entry.Name()) { + return nil + } + + batch = append(batch, entry) + + // Flush batch when it reaches the threshold + // Don't rely on isLast here - hidden entries may cause early return + if len(batch) >= batchInsertSize { + // No lock needed - LevelDB Write() is thread-safe + if err := mc.doBatchInsertEntries(ctx, batch); err != nil { + return fmt.Errorf("batch insert for %s: %w", path, err) + } + // Create new slice to allow GC of flushed entries + batch = make([]*filer.Entry, 0, batchInsertSize) + } return nil - } - if err := mc.doInsertEntry(context.Background(), entry); err != nil { - glog.V(0).Infof("read %s: %v", entry.FullPath, err) - return err - } - return nil + }) }) - }) - if err != nil { - err = fmt.Errorf("list %s: %v", path, err) - } else { + if fetchErr != nil { + return nil, fmt.Errorf("list %s: %w", path, fetchErr) + } + + // Flush any remaining entries in the batch + if len(batch) > 0 { + if err := mc.doBatchInsertEntries(ctx, batch); err != nil { + return nil, fmt.Errorf("batch insert remaining for %s: %w", path, err) + } + } mc.markCachedFn(path) - } + return nil, nil + }) return err } |
