author    Chris Lu <chris.lu@gmail.com>  2020-06-28 10:14:17 -0700
committer Chris Lu <chris.lu@gmail.com>  2020-06-28 10:14:17 -0700
commit    1bb8cae65d499dc48cc2d7db2dc5692482f3f305 (patch)
tree      e467980c5973dc2e3aeb6a2f1586b79df02e3c9c
parent    b813fac4a39e9cec311a17706f70796d72a69bf2 (diff)
download  seaweedfs-1bb8cae65d499dc48cc2d7db2dc5692482f3f305.tar.xz
          seaweedfs-1bb8cae65d499dc48cc2d7db2dc5692482f3f305.zip
reverting and working
-rw-r--r--  weed/command/mount.go         |   2
-rw-r--r--  weed/command/mount_std.go     |   1
-rw-r--r--  weed/filesys/dir.go           |  99
-rw-r--r--  weed/filesys/dir_link.go      |   4
-rw-r--r--  weed/filesys/dir_rename.go    |   9
-rw-r--r--  weed/filesys/file.go          |  13
-rw-r--r--  weed/filesys/filehandle.go    |   4
-rw-r--r--  weed/filesys/fscache.go       | 207
-rw-r--r--  weed/filesys/fscache_test.go  |  96
-rw-r--r--  weed/filesys/wfs.go           |  52
-rw-r--r--  weed/filesys/xattr.go         |  42
11 files changed, 477 insertions(+), 52 deletions(-)
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 440aca8c6..97207f7f9 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -21,6 +21,7 @@ type MountOptions struct {
umaskString *string
nonempty *bool
outsideContainerClusterMode *bool
+ asyncMetaDataCaching *bool
}
var (
@@ -49,6 +50,7 @@ func init() {
mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
+ mountOptions.asyncMetaDataCaching = cmdMount.Flag.Bool("asyncMetaDataCaching", true, "async meta data caching. this feature will be permanent and this option will be removed.")
}
var cmdMount = &Command{
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 915754166..abcf85110 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -170,6 +170,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
MountMtime: time.Now(),
Umask: umask,
OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode,
+ AsyncMetaDataCaching: *mountOptions.asyncMetaDataCaching,
Cipher: cipher,
})
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 18d21cf7f..2a4a6a1a5 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -7,13 +7,14 @@ import (
"strings"
"time"
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
)
type Dir struct {
@@ -90,18 +91,22 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
}
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
- return &File{
- Name: name,
- dir: dir,
- wfs: dir.wfs,
- entry: entry,
- entryViewCache: nil,
- }
+ return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
+ return &File{
+ Name: name,
+ dir: dir,
+ wfs: dir.wfs,
+ entry: entry,
+ entryViewCache: nil,
+ }
+ })
}
func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
- return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
+ return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
+ return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
+ })
}
@@ -136,7 +141,9 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
return fuse.EIO
}
- dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
return nil
}); err != nil {
@@ -185,7 +192,9 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
return err
}
- dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
return nil
})
@@ -205,15 +214,18 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
- dirPath := dir.FullPath()
- fullFilePath := util.NewFullPath(dirPath, req.Name)
+ fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
+ entry := dir.wfs.cacheGet(fullFilePath)
+ dirPath := util.FullPath(dir.FullPath())
meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath))
- cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
- if cacheErr == filer_pb.ErrNotFound {
- return nil, fuse.ENOENT
+ if dir.wfs.option.AsyncMetaDataCaching {
+ cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
+ }
+ entry = cachedEntry.ToProtoEntry()
}
- entry := cachedEntry.ToProtoEntry()
if entry == nil {
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
@@ -222,6 +234,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
}
+ dir.wfs.cacheSet(fullFilePath, entry, 5*time.Minute)
} else {
glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
}
@@ -253,6 +266,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
+ cacheTtl := 5 * time.Minute
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
inode := fullpath.AsInode()
@@ -263,21 +277,31 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File}
ret = append(ret, dirent)
}
+ dir.wfs.cacheSet(fullpath, entry, cacheTtl)
return nil
}
dirPath := util.FullPath(dir.FullPath())
meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
- listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int(dir.wfs.option.DirListCacheLimit))
- if listErr != nil {
- glog.Errorf("list meta cache: %v", listErr)
- return nil, fuse.EIO
+ if dir.wfs.option.AsyncMetaDataCaching {
+ listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
+ if listErr != nil {
+ glog.Errorf("list meta cache: %v", listErr)
+ return nil, fuse.EIO
+ }
+ for _, cachedEntry := range listedEntries {
+ processEachEntryFn(cachedEntry.ToProtoEntry(), false)
+ }
+ return
}
- for _, cachedEntry := range listedEntries {
- processEachEntryFn(cachedEntry.ToProtoEntry(), false)
+
+ readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn)
+ if readErr != nil {
+ glog.V(0).Infof("list %s: %v", dir.FullPath(), readErr)
+ return ret, fuse.EIO
}
- return
+ return ret, err
}
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
@@ -303,7 +327,12 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
dir.wfs.deleteFileChunks(entry.Chunks)
- dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+ dir.wfs.cacheDelete(filePath)
+ dir.wfs.fsNodeCache.DeleteFsNode(filePath)
+
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+ }
glog.V(3).Infof("remove file: %v", req)
err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false)
@@ -319,8 +348,12 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
t := util.NewFullPath(dir.FullPath(), req.Name)
+ dir.wfs.cacheDelete(t)
+ dir.wfs.fsNodeCache.DeleteFsNode(t)
- dir.wfs.metaCache.DeleteEntry(context.Background(), t)
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.DeleteEntry(context.Background(), t)
+ }
glog.V(3).Infof("remove directory entry: %v", req)
err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false)
@@ -357,6 +390,8 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
dir.entry.Attributes.Mtime = req.Mtime.Unix()
}
+ dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
+
return dir.saveEntry()
}
@@ -373,6 +408,8 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
return err
}
+ dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
+
return dir.saveEntry()
}
@@ -389,6 +426,8 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e
return err
}
+ dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
+
return dir.saveEntry()
}
@@ -411,6 +450,8 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
func (dir *Dir) Forget() {
glog.V(3).Infof("Forget dir %s", dir.FullPath())
+
+ dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
}
func (dir *Dir) maybeLoadEntry() error {
@@ -443,7 +484,9 @@ func (dir *Dir) saveEntry() error {
return fuse.EIO
}
- dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
return nil
})
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 4990e743c..d1858e99b 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -42,7 +42,9 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
return fuse.EIO
}
- dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
return nil
})
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index 92d667c57..ea40f5c31 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -38,5 +38,14 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
})
+ if err == nil {
+ dir.wfs.cacheDelete(newPath)
+ dir.wfs.cacheDelete(oldPath)
+
+ // fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
+ dir.wfs.fsNodeCache.Move(oldPath, newPath)
+
+ }
+
return err
}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 0f788a888..bafbd7cc8 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -150,6 +150,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
return nil
}
+ file.wfs.cacheDelete(file.fullpath())
+
return file.saveEntry()
}
@@ -166,6 +168,8 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error
return err
}
+ file.wfs.cacheDelete(file.fullpath())
+
return file.saveEntry()
}
@@ -182,6 +186,8 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest)
return err
}
+ file.wfs.cacheDelete(file.fullpath())
+
return file.saveEntry()
}
@@ -212,7 +218,8 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
func (file *File) Forget() {
t := util.NewFullPath(file.dir.FullPath(), file.Name)
- glog.V(4).Infof("Forget file %s", t)
+ glog.V(3).Infof("Forget file %s", t)
+ file.wfs.fsNodeCache.DeleteFsNode(t)
}
func (file *File) maybeLoadEntry(ctx context.Context) error {
@@ -271,7 +278,9 @@ func (file *File) saveEntry() error {
return fuse.EIO
}
- file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ if file.wfs.option.AsyncMetaDataCaching {
+ file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
return nil
})
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 9b9df916c..83daccad1 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -215,7 +215,9 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
}
- fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ if fh.f.wfs.option.AsyncMetaDataCaching {
+ fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
fh.f.wfs.deleteFileChunks(garbages)
for i, chunk := range garbages {
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
new file mode 100644
index 000000000..b146f0615
--- /dev/null
+++ b/weed/filesys/fscache.go
@@ -0,0 +1,207 @@
+package filesys
+
+import (
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/seaweedfs/fuse/fs"
+)
+
+type FsCache struct {
+ root *FsNode
+ sync.RWMutex
+}
+type FsNode struct {
+ parent *FsNode
+ node fs.Node
+ name string
+ childrenLock sync.RWMutex
+ children map[string]*FsNode
+}
+
+func newFsCache(root fs.Node) *FsCache {
+ return &FsCache{
+ root: &FsNode{
+ node: root,
+ },
+ }
+}
+
+func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
+
+ c.RLock()
+ defer c.RUnlock()
+
+ return c.doGetFsNode(path)
+}
+
+func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node {
+ t := c.root
+ for _, p := range path.Split() {
+ t = t.findChild(p)
+ if t == nil {
+ return nil
+ }
+ }
+ return t.node
+}
+
+func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
+
+ c.Lock()
+ defer c.Unlock()
+
+ c.doSetFsNode(path, node)
+}
+
+func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) {
+ t := c.root
+ for _, p := range path.Split() {
+ t = t.ensureChild(p)
+ }
+ t.node = node
+}
+
+func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
+
+ c.Lock()
+ defer c.Unlock()
+
+ t := c.doGetFsNode(path)
+ if t != nil {
+ return t
+ }
+ t = genNodeFn()
+ c.doSetFsNode(path, t)
+ return t
+}
+
+func (c *FsCache) DeleteFsNode(path util.FullPath) {
+
+ c.Lock()
+ defer c.Unlock()
+
+ t := c.root
+ for _, p := range path.Split() {
+ t = t.findChild(p)
+ if t == nil {
+ return
+ }
+ }
+ if t.parent != nil {
+ t.parent.disconnectChild(t)
+ }
+ t.deleteSelf()
+}
+
+// oldPath and newPath are full path including the new name
+func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
+
+ c.Lock()
+ defer c.Unlock()
+
+ // find old node
+ src := c.root
+ for _, p := range oldPath.Split() {
+ src = src.findChild(p)
+ if src == nil {
+ return src
+ }
+ }
+ if src.parent != nil {
+ src.parent.disconnectChild(src)
+ }
+
+ // find new node
+ target := c.root
+ for _, p := range newPath.Split() {
+ target = target.ensureChild(p)
+ }
+ parent := target.parent
+ src.name = target.name
+ if dir, ok := src.node.(*Dir); ok {
+ dir.name = target.name // target is not Dir, but a shortcut
+ }
+ if f, ok := src.node.(*File); ok {
+ f.Name = target.name
+ if f.entry != nil {
+ f.entry.Name = f.Name
+ }
+ }
+ parent.disconnectChild(target)
+
+ target.deleteSelf()
+
+ src.connectToParent(parent)
+
+ return src
+}
+
+func (n *FsNode) connectToParent(parent *FsNode) {
+ n.parent = parent
+ oldNode := parent.findChild(n.name)
+ if oldNode != nil {
+ oldNode.deleteSelf()
+ }
+ if dir, ok := n.node.(*Dir); ok {
+ dir.parent = parent.node.(*Dir)
+ }
+ if f, ok := n.node.(*File); ok {
+ f.dir = parent.node.(*Dir)
+ }
+ n.childrenLock.Lock()
+ parent.children[n.name] = n
+ n.childrenLock.Unlock()
+}
+
+func (n *FsNode) findChild(name string) *FsNode {
+ n.childrenLock.RLock()
+ defer n.childrenLock.RUnlock()
+
+ child, found := n.children[name]
+ if found {
+ return child
+ }
+ return nil
+}
+
+func (n *FsNode) ensureChild(name string) *FsNode {
+ n.childrenLock.Lock()
+ defer n.childrenLock.Unlock()
+
+ if n.children == nil {
+ n.children = make(map[string]*FsNode)
+ }
+ child, found := n.children[name]
+ if found {
+ return child
+ }
+ t := &FsNode{
+ parent: n,
+ node: nil,
+ name: name,
+ children: nil,
+ }
+ n.children[name] = t
+ return t
+}
+
+func (n *FsNode) disconnectChild(child *FsNode) {
+ n.childrenLock.Lock()
+ delete(n.children, child.name)
+ n.childrenLock.Unlock()
+ child.parent = nil
+}
+
+func (n *FsNode) deleteSelf() {
+ n.childrenLock.Lock()
+ for _, child := range n.children {
+ child.deleteSelf()
+ }
+ n.children = nil
+ n.childrenLock.Unlock()
+
+ n.node = nil
+ n.parent = nil
+
+}
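
The fscache.go file added above introduces an in-memory path tree that maps full paths to FUSE fs.Node objects, so repeated lookups hand the kernel the same node and a rename can re-parent a cached subtree instead of discarding it. Below is a minimal usage sketch in the same package, using only newFsCache, EnsureFsNode, GetFsNode, and Move from this file plus the existing File node type; the exampleFsCacheUsage helper itself is hypothetical and not part of the commit.

package filesys

import (
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/seaweedfs/fuse/fs"
)

// exampleFsCacheUsage illustrates the intended behavior of the FsCache above.
func exampleFsCacheUsage() {
	cache := newFsCache(nil)

	// EnsureFsNode creates the node once and then keeps returning that instance,
	// so every Lookup of /a/b/c hands the kernel the same *File.
	first := cache.EnsureFsNode(util.FullPath("/a/b/c"), func() fs.Node {
		return &File{Name: "c"}
	})
	second := cache.EnsureFsNode(util.FullPath("/a/b/c"), func() fs.Node {
		return &File{Name: "never used"} // the generator is skipped on a cache hit
	})
	_ = first == second // true: both calls yield the node created by the first call

	// On rename, Move re-parents the cached subtree, mirroring what Dir.Rename
	// now does through wfs.fsNodeCache.Move.
	cache.Move(util.FullPath("/a/b"), util.FullPath("/x/y"))
	_ = cache.GetFsNode(util.FullPath("/x/y/c")) // the same *File, under its new path
}
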
diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go
new file mode 100644
index 000000000..67f9aacc8
--- /dev/null
+++ b/weed/filesys/fscache_test.go
@@ -0,0 +1,96 @@
+package filesys
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestPathSplit(t *testing.T) {
+ parts := util.FullPath("/").Split()
+ if len(parts) != 0 {
+ t.Errorf("expecting an empty list, but getting %d", len(parts))
+ }
+
+ parts = util.FullPath("/readme.md").Split()
+ if len(parts) != 1 {
+ t.Errorf("expecting an empty list, but getting %d", len(parts))
+ }
+
+}
+
+func TestFsCache(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ x := cache.GetFsNode(util.FullPath("/y/x"))
+ if x != nil {
+ t.Errorf("wrong node!")
+ }
+
+ p := util.FullPath("/a/b/c")
+ cache.SetFsNode(p, &File{Name: "cc"})
+ tNode := cache.GetFsNode(p)
+ tFile := tNode.(*File)
+ if tFile.Name != "cc" {
+ t.Errorf("expecting a FsNode")
+ }
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+ cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
+ cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
+ cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
+
+ b := cache.GetFsNode(util.FullPath("/a/b"))
+ if b != nil {
+ t.Errorf("unexpected node!")
+ }
+
+ a := cache.GetFsNode(util.FullPath("/a"))
+ if a == nil {
+ t.Errorf("missing node!")
+ }
+
+ cache.DeleteFsNode(util.FullPath("/a"))
+ if b != nil {
+ t.Errorf("unexpected node!")
+ }
+
+ a = cache.GetFsNode(util.FullPath("/a"))
+ if a != nil {
+ t.Errorf("wrong DeleteFsNode!")
+ }
+
+ z := cache.GetFsNode(util.FullPath("/z"))
+ if z == nil {
+ t.Errorf("missing node!")
+ }
+
+ y := cache.GetFsNode(util.FullPath("/x/y"))
+ if y != nil {
+ t.Errorf("wrong node!")
+ }
+
+}
+
+func TestFsCacheMove(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+ cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
+ cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
+
+ cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))
+
+ d := cache.GetFsNode(util.FullPath("/z/x/d"))
+ if d == nil {
+ t.Errorf("unexpected nil node!")
+ }
+ if d.(*File).Name != "dd" {
+ t.Errorf("unexpected non dd node!")
+ }
+
+}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index ee4dcc916..2b0ef64c2 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -10,12 +10,9 @@ import (
"sync"
"time"
- "google.golang.org/grpc"
-
"github.com/chrislusf/seaweedfs/weed/util/grace"
-
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
+ "github.com/karlseguin/ccache"
+ "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -23,6 +20,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
)
type Option struct {
@@ -48,6 +47,7 @@ type Option struct {
OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
Cipher bool // whether encrypt data on volume server
+ AsyncMetaDataCaching bool // whether asynchronously cache meta data
}
@@ -56,6 +56,7 @@ var _ = fs.FSStatfser(&WFS{})
type WFS struct {
option *Option
+ listDirectoryEntriesCache *ccache.Cache
// contains all open handles, protected by handlesLock
handlesLock sync.Mutex
@@ -66,6 +67,7 @@ type WFS struct {
stats statsCache
root fs.Node
+ fsNodeCache *FsCache
chunkCache *chunk_cache.ChunkCache
metaCache *meta_cache.MetaCache
@@ -78,6 +80,7 @@ type statsCache struct {
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
option: option,
+ listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
handles: make(map[uint64]*FileHandle),
bufPool: sync.Pool{
New: func() interface{} {
@@ -92,18 +95,21 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.chunkCache.Shutdown()
})
}
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
- startTime := time.Now()
- if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
- glog.V(0).Infof("failed to init meta cache: %v", err)
- } else {
- go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
- grace.OnInterrupt(func() {
- wfs.metaCache.Shutdown()
- })
+ if wfs.option.AsyncMetaDataCaching {
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
+ startTime := time.Now()
+ if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
+ glog.V(0).Infof("failed to init meta cache: %v", err)
+ } else {
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
+ grace.OnInterrupt(func() {
+ wfs.metaCache.Shutdown()
+ })
+ }
}
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
+ wfs.fsNodeCache = newFsCache(wfs.root)
return wfs
}
@@ -223,6 +229,24 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
return nil
}
+func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
+ item := wfs.listDirectoryEntriesCache.Get(string(path))
+ if item != nil && !item.Expired() {
+ return item.Value().(*filer_pb.Entry)
+ }
+ return nil
+}
+func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
+ if entry == nil {
+ wfs.listDirectoryEntriesCache.Delete(string(path))
+ } else {
+ wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
+ }
+}
+func (wfs *WFS) cacheDelete(path util.FullPath) {
+ wfs.listDirectoryEntriesCache.Delete(string(path))
+}
+
func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
if !wfs.option.OutsideContainerClusterMode {
return hostAndPort
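
The new cacheGet/cacheSet/cacheDelete helpers above wrap listDirectoryEntriesCache, a TTL-bounded cache from github.com/karlseguin/ccache: Lookup and ReadDirAll populate it for a few minutes, and the mutation paths (Setattr, Setxattr, Removexattr, Remove, Rename) delete the affected paths. A minimal standalone sketch of the same ccache pattern follows; the max size of 3000 and the example path are illustrative values, not taken from the commit.

package main

import (
	"fmt"
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/karlseguin/ccache"
)

func main() {
	// Same configuration shape as in NewSeaweedFileSystem above: a bounded cache
	// that prunes a batch of items once it grows past its maximum size.
	entryCache := ccache.New(ccache.Configure().MaxSize(3000).ItemsToPrune(100))

	// cacheSet: remember an entry under its full path for a limited time.
	entryCache.Set("/a/b/readme.md", &filer_pb.Entry{Name: "readme.md"}, 5*time.Minute)

	// cacheGet: a hit is only trusted while the item has not expired.
	if item := entryCache.Get("/a/b/readme.md"); item != nil && !item.Expired() {
		fmt.Println("cached:", item.Value().(*filer_pb.Entry).Name)
	}

	// cacheDelete: drop the path after a mutation so the next read refetches it.
	entryCache.Delete("/a/b/readme.md")
}
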
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
index 870a72ebe..9603ece79 100644
--- a/weed/filesys/xattr.go
+++ b/weed/filesys/xattr.go
@@ -3,11 +3,11 @@ package filesys
import (
"context"
- "github.com/seaweedfs/fuse"
-
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/seaweedfs/fuse"
)
func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
@@ -111,14 +111,44 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis
func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
fullpath := util.NewFullPath(dir, name)
+ entry = wfs.cacheGet(fullpath)
+ if entry != nil {
+ return
+ }
// glog.V(3).Infof("read entry cache miss %s", fullpath)
// read from async meta cache
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
- cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
- if cacheErr == filer_pb.ErrNotFound {
- return nil, fuse.ENOENT
+ if wfs.option.AsyncMetaDataCaching {
+ cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
+ }
+ return cachedEntry.ToProtoEntry(), nil
}
- return cachedEntry.ToProtoEntry(), nil
+ err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Name: name,
+ Directory: dir,
+ }
+
+ resp, err := filer_pb.LookupEntry(client, request)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ glog.V(3).Infof("file attr read not found file %v: %v", request, err)
+ return fuse.ENOENT
+ }
+ glog.V(3).Infof("attr read %v: %v", request, err)
+ return fuse.EIO
+ }
+
+ entry = resp.Entry
+ wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl)
+
+ return nil
+ })
+
+ return
}