aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-06-19 09:45:27 -0700
committerChris Lu <chris.lu@gmail.com>2020-06-19 09:45:42 -0700
commitf7a45d448f5efe1bea7c8fc5ee61cb7d535995b5 (patch)
tree07c9057b7c741dc15c8e1cd5e133ca3d6e9fa709
parentc0283eee1ad18d6a3907b186d3e2fc00fab83824 (diff)
downloadseaweedfs-f7a45d448f5efe1bea7c8fc5ee61cb7d535995b5.tar.xz
seaweedfs-f7a45d448f5efe1bea7c8fc5ee61cb7d535995b5.zip
FUSE mount: lazy loading meta cache
-rw-r--r--weed/filesys/dir.go9
-rw-r--r--weed/filesys/meta_cache/meta_cache.go25
-rw-r--r--weed/filesys/meta_cache/meta_cache_init.go26
-rw-r--r--weed/filesys/xattr.go2
4 files changed, 54 insertions, 8 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 4e164726d..18d21cf7f 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -204,8 +205,10 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
- fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
+ dirPath := dir.FullPath()
+ fullFilePath := util.NewFullPath(dirPath, req.Name)
+ meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath))
cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
@@ -263,7 +266,9 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
return nil
}
- listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
+ dirPath := util.FullPath(dir.FullPath())
+ meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
+ listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int(dir.wfs.option.DirListCacheLimit))
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
return nil, fuse.EIO
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 4c9090d42..3b04040a5 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -9,16 +9,19 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
)
type MetaCache struct {
actualStore filer2.FilerStore
sync.RWMutex
+ visitedBoundary *bounded_tree.BoundedTree
}
func NewMetaCache(dbFolder string) *MetaCache {
return &MetaCache{
- actualStore: openMetaStore(dbFolder),
+ actualStore: openMetaStore(dbFolder),
+ visitedBoundary: bounded_tree.NewBoundedTree(),
}
}
@@ -49,14 +52,24 @@ func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error
func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error {
mc.Lock()
defer mc.Unlock()
- if oldPath != "" {
- if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
- return err
+
+ oldDir, _ := oldPath.DirAndName()
+ if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
+ if oldPath != "" {
+ if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
+ return err
+ }
}
+ }else{
+ // println("unknown old directory:", oldDir)
}
+
if newEntry != nil {
- if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil {
- return err
+ newDir, _ := newEntry.DirAndName()
+ if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
+ if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil {
+ return err
+ }
}
}
return nil
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index 58bf6862e..1fbc3e532 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -2,6 +2,7 @@ package meta_cache
import (
"context"
+ "fmt"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,6 +11,7 @@ import (
)
func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error {
+ return nil
glog.V(0).Infof("synchronizing meta data ...")
filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) {
entry := filer2.FromPbEntry(string(parentPath), pbEntry)
@@ -19,3 +21,27 @@ func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) erro
})
return nil
}
+
+func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) {
+
+ mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
+
+ glog.V(2).Infof("ReadDirAllEntries %s ...", path)
+
+ err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer2.FromPbEntry(string(dirPath), pbEntry)
+ if err := mc.InsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ return err
+ }
+ if entry.IsDirectory() {
+ childDirectories = append(childDirectories, entry.Name())
+ }
+ return nil
+ })
+ if err != nil {
+ err = fmt.Errorf("list %s: %v", dirPath, err)
+ }
+ return
+ })
+}
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
index 940979457..870a72ebe 100644
--- a/weed/filesys/xattr.go
+++ b/weed/filesys/xattr.go
@@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/fuse"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -113,6 +114,7 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err
// glog.V(3).Infof("read entry cache miss %s", fullpath)
// read from async meta cache
+ meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT