aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-21 18:50:30 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-21 18:50:30 -0700
commit4f02f7121d232507bbbba825fa241bc8d5e630ff (patch)
treed9facd0b9188e459c0a483cb0891b97594ea1dd7
parentb8e4238ad29a60b37a3983a9968e325dd6849103 (diff)
downloadseaweedfs-4f02f7121d232507bbbba825fa241bc8d5e630ff.tar.xz
seaweedfs-4f02f7121d232507bbbba825fa241bc8d5e630ff.zip
read from meta cache
meta cache is not initialized
-rw-r--r--weed/filesys/dir.go29
-rw-r--r--weed/filesys/meta_cache/cache_config.go32
-rw-r--r--weed/filesys/meta_cache/meta_cache.go34
-rw-r--r--weed/filesys/wfs.go6
-rw-r--r--weed/filesys/xattr.go9
5 files changed, 101 insertions, 9 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index ace958e82..5b0e17ea3 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -206,7 +206,11 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
entry := dir.wfs.cacheGet(fullFilePath)
if dir.wfs.option.AsyncMetaDataCaching {
-
+ cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
+ }
+ entry = cachedEntry.ToProtoEntry()
}
if entry == nil {
@@ -248,13 +252,8 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
- if dir.wfs.option.AsyncMetaDataCaching {
-
- }
-
cacheTtl := 5 * time.Minute
-
- readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", func(entry *filer_pb.Entry, isLast bool) {
+ processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) {
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
inode := fullpath.AsInode()
if entry.IsDirectory {
@@ -265,7 +264,21 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
ret = append(ret, dirent)
}
dir.wfs.cacheSet(fullpath, entry, cacheTtl)
- })
+ }
+
+ if dir.wfs.option.AsyncMetaDataCaching {
+ listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
+ if listErr != nil {
+ glog.Errorf("list meta cache: %v", listErr)
+ return nil, fuse.EIO
+ }
+ for _, cachedEntry := range listedEntries {
+ processEachEntryFn(cachedEntry.ToProtoEntry(), false)
+ }
+ return
+ }
+
+ readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn)
if readErr != nil {
glog.V(0).Infof("list %s: %v", dir.FullPath(), err)
return ret, fuse.EIO
diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go
new file mode 100644
index 000000000..e6593ebde
--- /dev/null
+++ b/weed/filesys/meta_cache/cache_config.go
@@ -0,0 +1,32 @@
+package meta_cache
+
+import "github.com/chrislusf/seaweedfs/weed/util"
+
+var (
+ _ = util.Configuration(&cacheConfig{})
+)
+
+// implementing util.Configuraion
+type cacheConfig struct {
+ dir string
+}
+
+func (c cacheConfig) GetString(key string) string {
+ return c.dir
+}
+
+func (c cacheConfig) GetBool(key string) bool {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetInt(key string) int {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetStringSlice(key string) []string {
+ panic("implement me")
+}
+
+func (c cacheConfig) SetDefault(key string, value interface{}) {
+ panic("implement me")
+}
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
new file mode 100644
index 000000000..4f047e824
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -0,0 +1,34 @@
+package meta_cache
+
+import (
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type MetaCache struct {
+ filer2.FilerStore
+}
+
+func NewMetaCache(dbFolder string) *MetaCache {
+ return &MetaCache{
+ FilerStore: OpenMetaStore(dbFolder),
+ }
+}
+
+func OpenMetaStore(dbFolder string) filer2.FilerStore {
+
+ os.MkdirAll(dbFolder, 0755)
+
+ store := &leveldb.LevelDBStore{}
+ config := &cacheConfig{}
+
+ if err := store.Initialize(config, ""); err != nil {
+ glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
+ }
+
+ return store
+
+}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 2013f3c03..b27ac440d 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math"
"os"
+ "path"
"strings"
"sync"
"time"
@@ -12,6 +13,7 @@ import (
"github.com/karlseguin/ccache"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -67,6 +69,7 @@ type WFS struct {
fsNodeCache *FsCache
chunkCache *chunk_cache.ChunkCache
+ metaCache *meta_cache.MetaCache
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -90,6 +93,9 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.chunkCache.Shutdown()
})
}
+ if wfs.option.AsyncMetaDataCaching {
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
+ }
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
wfs.fsNodeCache = newFsCache(wfs.root)
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
index 1768c1d6e..7e7b8c60b 100644
--- a/weed/filesys/xattr.go
+++ b/weed/filesys/xattr.go
@@ -1,6 +1,8 @@
package filesys
import (
+ "context"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -114,8 +116,13 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err
}
// glog.V(3).Infof("read entry cache miss %s", fullpath)
+ // read from async meta cache
if wfs.option.AsyncMetaDataCaching {
-
+ cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
+ }
+ return cachedEntry.ToProtoEntry(), nil
}
err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {