diff options
Diffstat (limited to 'weed/mount/meta_cache/meta_cache_init.go')
| -rw-r--r-- | weed/mount/meta_cache/meta_cache_init.go | 103 |
1 file changed, 75 insertions, 28 deletions
diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go
index 068daa3f9..10ec9dad7 100644
--- a/weed/mount/meta_cache/meta_cache_init.go
+++ b/weed/mount/meta_cache/meta_cache_init.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"fmt"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -11,22 +13,18 @@ import (
 )
 
 func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
-
+	// Collect all uncached paths from target directory up to root
+	var uncachedPaths []util.FullPath
 	currentPath := dirPath
 	for {
-
-		// the directory children are already cached
-		// so no need for this and upper directories
+		// If this path is cached, all ancestors are also cached
 		if mc.isCachedFn(currentPath) {
-			return nil
-		}
-
-		if err := doEnsureVisited(mc, client, currentPath); err != nil {
-			return err
+			break
 		}
+		uncachedPaths = append(uncachedPaths, currentPath)
 
-		// continue to parent directory
+		// Continue to parent directory
 		if currentPath != mc.root {
 			parent, _ := currentPath.DirAndName()
 			currentPath = util.FullPath(parent)
@@ -35,33 +33,82 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
 		}
 	}
 
-	return nil
+	if len(uncachedPaths) == 0 {
+		return nil
+	}
+
+	// Fetch all uncached directories in parallel with context for cancellation
+	// If one fetch fails, cancel the others to avoid unnecessary work
+	g, ctx := errgroup.WithContext(context.Background())
+	for _, p := range uncachedPaths {
+		path := p // capture for closure
+		g.Go(func() error {
+			return doEnsureVisited(ctx, mc, client, path)
+		})
+	}
+	return g.Wait()
 }
 
-func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
+// batchInsertSize is the number of entries to accumulate before flushing to LevelDB.
+// 100 provides a balance between memory usage (~100 Entry pointers) and write efficiency
+// (fewer disk syncs). Larger values reduce I/O overhead but increase memory and latency.
+const batchInsertSize = 100
+
+func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
+	// Use singleflight to deduplicate concurrent requests for the same path
+	_, err, _ := mc.visitGroup.Do(string(path), func() (interface{}, error) {
+		// Check for cancellation before starting
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+
+		// Double-check if already cached (another goroutine may have completed)
+		if mc.isCachedFn(path) {
+			return nil, nil
+		}
 
-	glog.V(4).Infof("ReadDirAllEntries %s ...", path)
+		glog.V(4).Infof("ReadDirAllEntries %s ...", path)
 
-	err := util.Retry("ReadDirAllEntries", func() error {
-		return filer_pb.ReadDirAllEntries(context.Background(), client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
-			entry := filer.FromPbEntry(string(path), pbEntry)
-			if IsHiddenSystemEntry(string(path), entry.Name()) {
+		// Collect entries in batches for efficient LevelDB writes
+		var batch []*filer.Entry
+
+		fetchErr := util.Retry("ReadDirAllEntries", func() error {
+			batch = nil // Reset batch on retry, allow GC of previous entries
+			return filer_pb.ReadDirAllEntries(ctx, client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+				entry := filer.FromPbEntry(string(path), pbEntry)
+				if IsHiddenSystemEntry(string(path), entry.Name()) {
+					return nil
+				}
+
+				batch = append(batch, entry)
+
+				// Flush batch when it reaches the threshold
+				// Don't rely on isLast here - hidden entries may cause early return
+				if len(batch) >= batchInsertSize {
+					// No lock needed - LevelDB Write() is thread-safe
+					if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
+						return fmt.Errorf("batch insert for %s: %w", path, err)
+					}
+					// Create new slice to allow GC of flushed entries
+					batch = make([]*filer.Entry, 0, batchInsertSize)
+				}
 				return nil
-			}
-			if err := mc.doInsertEntry(context.Background(), entry); err != nil {
-				glog.V(0).Infof("read %s: %v", entry.FullPath, err)
-				return err
-			}
-			return nil
+			})
 		})
-	})
-	if err != nil {
-		err = fmt.Errorf("list %s: %v", path, err)
-	} else {
+		if fetchErr != nil {
+			return nil, fmt.Errorf("list %s: %w", path, fetchErr)
+		}
+
+		// Flush any remaining entries in the batch
+		if len(batch) > 0 {
+			if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
+				return nil, fmt.Errorf("batch insert remaining for %s: %w", path, err)
+			}
+		}
 		mc.markCachedFn(path)
-	}
+		return nil, nil
+	})
 	return err
 }
