aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-02-14 01:09:31 -0800
committerchrislu <chris.lu@gmail.com>2022-02-14 01:09:31 -0800
commitdbeeda812376eda39997cd814c3e7eefaf4ea686 (patch)
tree18e989645dec87c977d59ea0fdaea8b073244b17
parent7286e525ad85dec877d506908a0ff35590b0f357 (diff)
downloadseaweedfs-dbeeda812376eda39997cd814c3e7eefaf4ea686.tar.xz
seaweedfs-dbeeda812376eda39997cd814c3e7eefaf4ea686.zip
listen for metadata updates
-rw-r--r--weed/command/mount2_std.go4
-rw-r--r--weed/mount/directory_read.go2
-rw-r--r--weed/mount/inode_to_path.go65
-rw-r--r--weed/mount/meta_cache/cache_config.go32
-rw-r--r--weed/mount/meta_cache/id_mapper.go101
-rw-r--r--weed/mount/meta_cache/meta_cache.go160
-rw-r--r--weed/mount/meta_cache/meta_cache_init.go67
-rw-r--r--weed/mount/meta_cache/meta_cache_subscribe.go68
-rw-r--r--weed/mount/weedfs.go13
-rw-r--r--weed/mount/weedfs_dir_lookup.go4
-rw-r--r--weed/mount/weedfs_dir_mkrm.go2
-rw-r--r--weed/mount/weedfs_dir_read.go2
-rw-r--r--weed/mount/weedfs_file_mkrm.go4
-rw-r--r--weed/mount/weedfs_forget.go9
-rw-r--r--weed/mount/weedfs_link.go2
-rw-r--r--weed/mount/weedfs_rename.go2
-rw-r--r--weed/mount/weedfs_symlink.go2
17 files changed, 499 insertions, 40 deletions
diff --git a/weed/command/mount2_std.go b/weed/command/mount2_std.go
index cb2b46556..584a72fc1 100644
--- a/weed/command/mount2_std.go
+++ b/weed/command/mount2_std.go
@@ -3,9 +3,9 @@ package command
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mount"
+ "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/mount/unmount"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -200,6 +200,8 @@ func RunMount2(option *Mount2Options, umask os.FileMode) bool {
unmount.Unmount(dir)
})
+ seaweedFileSystem.StartBackgroundTasks()
+
fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH)
server.Serve()
diff --git a/weed/mount/directory_read.go b/weed/mount/directory_read.go
index 51c51ae16..6034856f0 100644
--- a/weed/mount/directory_read.go
+++ b/weed/mount/directory_read.go
@@ -3,8 +3,8 @@ package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
diff --git a/weed/mount/inode_to_path.go b/weed/mount/inode_to_path.go
index 590531397..ffb0cc02f 100644
--- a/weed/mount/inode_to_path.go
+++ b/weed/mount/inode_to_path.go
@@ -14,21 +14,23 @@ type InodeToPath struct {
}
type InodeEntry struct {
util.FullPath
- nlookup uint64
+ nlookup uint64
+ isDirectory bool
+ isChildrenCached bool
}
func NewInodeToPath() *InodeToPath {
- return &InodeToPath{
+ t := &InodeToPath{
inode2path: make(map[uint64]*InodeEntry),
path2inode: make(map[util.FullPath]uint64),
nextInodeId: 2, // the root inode id is 1
}
+ t.inode2path[1] = &InodeEntry{"/", 1, true, false}
+ t.path2inode["/"] = 1
+ return t
}
-func (i *InodeToPath) Lookup(path util.FullPath) uint64 {
- if path == "/" {
- return 1
- }
+func (i *InodeToPath) Lookup(path util.FullPath, isDirectory bool) uint64 {
i.Lock()
defer i.Unlock()
inode, found := i.path2inode[path]
@@ -36,7 +38,7 @@ func (i *InodeToPath) Lookup(path util.FullPath) uint64 {
inode = i.nextInodeId
i.nextInodeId++
i.path2inode[path] = inode
- i.inode2path[inode] = &InodeEntry{path, 1}
+ i.inode2path[inode] = &InodeEntry{path, 1, isDirectory, false}
} else {
i.inode2path[inode].nlookup++
}
@@ -58,9 +60,6 @@ func (i *InodeToPath) GetInode(path util.FullPath) uint64 {
}
func (i *InodeToPath) GetPath(inode uint64) util.FullPath {
- if inode == 1 {
- return "/"
- }
i.RLock()
defer i.RUnlock()
path, found := i.inode2path[inode]
@@ -71,15 +70,37 @@ func (i *InodeToPath) GetPath(inode uint64) util.FullPath {
}
func (i *InodeToPath) HasPath(path util.FullPath) bool {
- if path == "/" {
- return true
- }
i.RLock()
defer i.RUnlock()
_, found := i.path2inode[path]
return found
}
+func (i *InodeToPath) MarkChildrenCached(fullpath util.FullPath) {
+ i.RLock()
+ defer i.RUnlock()
+ inode, found := i.path2inode[fullpath]
+ if !found {
+ glog.Fatalf("MarkChildrenCached not found inode %v", fullpath)
+ }
+ path, found := i.inode2path[inode]
+ path.isChildrenCached = true
+}
+
+func (i *InodeToPath) IsChildrenCached(fullpath util.FullPath) bool {
+ i.RLock()
+ defer i.RUnlock()
+ inode, found := i.path2inode[fullpath]
+ if !found {
+ return false
+ }
+ path, found := i.inode2path[inode]
+ if found {
+ return path.isChildrenCached
+ }
+ return false
+}
+
func (i *InodeToPath) HasInode(inode uint64) bool {
if inode == 1 {
return true
@@ -91,9 +112,6 @@ func (i *InodeToPath) HasInode(inode uint64) bool {
}
func (i *InodeToPath) RemovePath(path util.FullPath) {
- if path == "/" {
- return
- }
i.Lock()
defer i.Unlock()
inode, found := i.path2inode[path]
@@ -104,9 +122,6 @@ func (i *InodeToPath) RemovePath(path util.FullPath) {
}
func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) {
- if sourcePath == "/" || targetPath == "/" {
- return
- }
i.Lock()
defer i.Unlock()
sourceInode, sourceFound := i.path2inode[sourcePath]
@@ -127,12 +142,8 @@ func (i *InodeToPath) MovePath(sourcePath, targetPath util.FullPath) {
}
}
-func (i *InodeToPath) Forget(inode, nlookup uint64) {
- if inode == 1 {
- return
- }
+func (i *InodeToPath) Forget(inode, nlookup uint64, onForgetDir func(dir util.FullPath)) {
i.Lock()
- defer i.Unlock()
path, found := i.inode2path[inode]
if found {
path.nlookup -= nlookup
@@ -141,4 +152,10 @@ func (i *InodeToPath) Forget(inode, nlookup uint64) {
delete(i.inode2path, inode)
}
}
+ i.Unlock()
+ if found {
+ if path.isDirectory && onForgetDir != nil {
+ onForgetDir(path.FullPath)
+ }
+ }
}
diff --git a/weed/mount/meta_cache/cache_config.go b/weed/mount/meta_cache/cache_config.go
new file mode 100644
index 000000000..e6593ebde
--- /dev/null
+++ b/weed/mount/meta_cache/cache_config.go
@@ -0,0 +1,32 @@
+package meta_cache
+
+import "github.com/chrislusf/seaweedfs/weed/util"
+
+var (
+ _ = util.Configuration(&cacheConfig{})
+)
+
+// implementing util.Configuraion
+type cacheConfig struct {
+ dir string
+}
+
+func (c cacheConfig) GetString(key string) string {
+ return c.dir
+}
+
+func (c cacheConfig) GetBool(key string) bool {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetInt(key string) int {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetStringSlice(key string) []string {
+ panic("implement me")
+}
+
+func (c cacheConfig) SetDefault(key string, value interface{}) {
+ panic("implement me")
+}
diff --git a/weed/mount/meta_cache/id_mapper.go b/weed/mount/meta_cache/id_mapper.go
new file mode 100644
index 000000000..4a2179f31
--- /dev/null
+++ b/weed/mount/meta_cache/id_mapper.go
@@ -0,0 +1,101 @@
+package meta_cache
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+type UidGidMapper struct {
+ uidMapper *IdMapper
+ gidMapper *IdMapper
+}
+
+type IdMapper struct {
+ localToFiler map[uint32]uint32
+ filerToLocal map[uint32]uint32
+}
+
+// UidGidMapper translates local uid/gid to filer uid/gid
+// The local storage always persists the same as the filer.
+// The local->filer translation happens when updating the filer first and later saving to meta_cache.
+// And filer->local happens when reading from the meta_cache.
+func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
+ uidMapper, err := newIdMapper(uidPairsStr)
+ if err != nil {
+ return nil, err
+ }
+ gidMapper, err := newIdMapper(gidPairStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return &UidGidMapper{
+ uidMapper: uidMapper,
+ gidMapper: gidMapper,
+ }, nil
+}
+
+func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) {
+ return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
+}
+func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) {
+ return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
+}
+
+func (m *IdMapper) LocalToFiler(id uint32) uint32 {
+ value, found := m.localToFiler[id]
+ if found {
+ return value
+ }
+ return id
+}
+func (m *IdMapper) FilerToLocal(id uint32) uint32 {
+ value, found := m.filerToLocal[id]
+ if found {
+ return value
+ }
+ return id
+}
+
+func newIdMapper(pairsStr string) (*IdMapper, error) {
+
+ localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return &IdMapper{
+ localToFiler: localToFiler,
+ filerToLocal: filerToLocal,
+ }, nil
+
+}
+
+func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
+
+ if pairsStr == "" {
+ return
+ }
+
+ localToFiler = make(map[uint32]uint32)
+ filerToLocal = make(map[uint32]uint32)
+ for _, pairStr := range strings.Split(pairsStr, ",") {
+ pair := strings.Split(pairStr, ":")
+ localUidStr, filerUidStr := pair[0], pair[1]
+ localUid, localUidErr := strconv.Atoi(localUidStr)
+ if localUidErr != nil {
+ err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr)
+ return
+ }
+ filerUid, filerUidErr := strconv.Atoi(filerUidStr)
+ if filerUidErr != nil {
+ err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
+ return
+ }
+ localToFiler[uint32(localUid)] = uint32(filerUid)
+ filerToLocal[uint32(filerUid)] = uint32(localUid)
+ }
+
+ return
+}
diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go
new file mode 100644
index 000000000..7f997c5b0
--- /dev/null
+++ b/weed/mount/meta_cache/meta_cache.go
@@ -0,0 +1,160 @@
+package meta_cache
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "os"
+)
+
+// need to have logic similar to FilerStoreWrapper
+// e.g. fill fileId field for chunks
+
+type MetaCache struct {
+ localStore filer.VirtualFilerStore
+ // sync.RWMutex
+ uidGidMapper *UidGidMapper
+ markCachedFn func(fullpath util.FullPath)
+ isCachedFn func(fullpath util.FullPath) bool
+ invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
+}
+
+func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
+ return &MetaCache{
+ localStore: openMetaStore(dbFolder),
+ markCachedFn: markCachedFn,
+ isCachedFn: isCachedFn,
+ uidGidMapper: uidGidMapper,
+ invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
+ invalidateFunc(fullpath, entry)
+ },
+ }
+}
+
+func openMetaStore(dbFolder string) filer.VirtualFilerStore {
+
+ os.RemoveAll(dbFolder)
+ os.MkdirAll(dbFolder, 0755)
+
+ store := &leveldb.LevelDBStore{}
+ config := &cacheConfig{
+ dir: dbFolder,
+ }
+
+ if err := store.Initialize(config, ""); err != nil {
+ glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
+ }
+
+ return filer.NewFilerStoreWrapper(store)
+
+}
+
+func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ //mc.Lock()
+ //defer mc.Unlock()
+ return mc.doInsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
+ return mc.localStore.InsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
+ //mc.Lock()
+ //defer mc.Unlock()
+
+ oldDir, _ := oldPath.DirAndName()
+ if mc.isCachedFn(util.FullPath(oldDir)) {
+ if oldPath != "" {
+ if newEntry != nil && oldPath == newEntry.FullPath {
+ // skip the unnecessary deletion
+ // leave the update to the following InsertEntry operation
+ } else {
+ glog.V(3).Infof("DeleteEntry %s", oldPath)
+ if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
+ return err
+ }
+ }
+ }
+ } else {
+ // println("unknown old directory:", oldDir)
+ }
+
+ if newEntry != nil {
+ newDir, _ := newEntry.DirAndName()
+ if mc.isCachedFn(util.FullPath(newDir)) {
+ glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
+ if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
+ //mc.Lock()
+ //defer mc.Unlock()
+ return mc.localStore.UpdateEntry(ctx, entry)
+}
+
+func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
+ //mc.RLock()
+ //defer mc.RUnlock()
+ entry, err = mc.localStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+ mc.mapIdFromFilerToLocal(entry)
+ return
+}
+
+func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ //mc.Lock()
+ //defer mc.Unlock()
+ return mc.localStore.DeleteEntry(ctx, fp)
+}
+func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
+ //mc.Lock()
+ //defer mc.Unlock()
+ return mc.localStore.DeleteFolderChildren(ctx, fp)
+}
+
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
+ //mc.RLock()
+ //defer mc.RUnlock()
+
+ if !mc.isCachedFn(dirPath) {
+ // if this request comes after renaming, it should be fine
+ glog.Warningf("unsynchronized dir: %v", dirPath)
+ }
+
+ _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
+ mc.mapIdFromFilerToLocal(entry)
+ return eachEntryFunc(entry)
+ })
+ if err != nil {
+ return err
+ }
+ return err
+}
+
+func (mc *MetaCache) Shutdown() {
+ //mc.Lock()
+ //defer mc.Unlock()
+ mc.localStore.Shutdown()
+}
+
+func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
+ entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
+}
+
+func (mc *MetaCache) Debug() {
+ if debuggable, ok := mc.localStore.(filer.Debuggable); ok {
+ println("start debugging")
+ debuggable.Debug(os.Stderr)
+ }
+}
diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go
new file mode 100644
index 000000000..cd9c71668
--- /dev/null
+++ b/weed/mount/meta_cache/meta_cache_init.go
@@ -0,0 +1,67 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
+
+ for {
+
+ // the directory children are already cached
+ // so no need for this and upper directories
+ if mc.isCachedFn(dirPath) {
+ return nil
+ }
+
+ if err := doEnsureVisited(mc, client, dirPath); err != nil {
+ return err
+ }
+
+ // continue to parent directory
+ if dirPath != "/" {
+ parent, _ := dirPath.DirAndName()
+ dirPath = util.FullPath(parent)
+ } else {
+ break
+ }
+ }
+
+ return nil
+
+}
+
+func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
+
+ glog.V(4).Infof("ReadDirAllEntries %s ...", path)
+
+ err := util.Retry("ReadDirAllEntries", func() error {
+ return filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer.FromPbEntry(string(path), pbEntry)
+ if IsHiddenSystemEntry(string(path), entry.Name()) {
+ return nil
+ }
+ if err := mc.doInsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ return err
+ }
+ return nil
+ })
+ })
+
+ if err != nil {
+ err = fmt.Errorf("list %s: %v", path, err)
+ }
+ mc.markCachedFn(path)
+ return err
+}
+
+func IsHiddenSystemEntry(dir, name string) bool {
+ return dir == "/" && (name == "topics" || name == "etc")
+}
diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go
new file mode 100644
index 000000000..881fee08f
--- /dev/null
+++ b/weed/mount/meta_cache/meta_cache_subscribe.go
@@ -0,0 +1,68 @@
+package meta_cache
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+
+ for _, sig := range message.Signatures {
+ if sig == selfSignature && selfSignature != 0 {
+ return nil
+ }
+ }
+
+ dir := resp.Directory
+ var oldPath util.FullPath
+ var newEntry *filer.Entry
+ if message.OldEntry != nil {
+ oldPath = util.NewFullPath(dir, message.OldEntry.Name)
+ glog.V(4).Infof("deleting %v", oldPath)
+ }
+
+ if message.NewEntry != nil {
+ if message.NewParentPath != "" {
+ dir = message.NewParentPath
+ }
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ glog.V(4).Infof("creating %v", key)
+ newEntry = filer.FromPbEntry(dir, message.NewEntry)
+ }
+ err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+ if err == nil {
+ if message.OldEntry != nil && message.NewEntry != nil {
+ oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ mc.invalidateFunc(oldKey, message.OldEntry)
+ if message.OldEntry.Name != message.NewEntry.Name {
+ newKey := util.NewFullPath(dir, message.NewEntry.Name)
+ mc.invalidateFunc(newKey, message.NewEntry)
+ }
+ } else if message.OldEntry == nil && message.NewEntry != nil {
+ // no need to invaalidate
+ } else if message.OldEntry != nil && message.NewEntry == nil {
+ oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ mc.invalidateFunc(oldKey, message.OldEntry)
+ }
+ }
+
+ return err
+
+ }
+
+ util.RetryForever("followMetaUpdates", func() error {
+ return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true)
+ }, func(err error) bool {
+ glog.Errorf("follow metadata updates: %v", err)
+ return true
+ })
+
+ return nil
+}
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index b7f50cd13..0fdd9bd28 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -3,7 +3,7 @@ package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -91,7 +91,11 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
}
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) {
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper, func(path util.FullPath) {
+ wfs.inodeToPath.MarkChildrenCached(path)
+ }, func(path util.FullPath) bool {
+ return wfs.inodeToPath.IsChildrenCached(path)
+ }, func(filePath util.FullPath, entry *filer_pb.Entry) {
})
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
@@ -103,6 +107,11 @@ func NewSeaweedFileSystem(option *Option) *WFS {
return wfs
}
+func (wfs *WFS) StartBackgroundTasks() {
+ startTime := time.Now()
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
+}
+
func (wfs *WFS) Root() *Directory {
return &wfs.root
}
diff --git a/weed/mount/weedfs_dir_lookup.go b/weed/mount/weedfs_dir_lookup.go
index 733e31908..30b61d75f 100644
--- a/weed/mount/weedfs_dir_lookup.go
+++ b/weed/mount/weedfs_dir_lookup.go
@@ -3,8 +3,8 @@ package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/hanwen/go-fuse/v2/fuse"
)
@@ -50,7 +50,7 @@ func (wfs *WFS) Lookup(cancel <-chan struct{}, header *fuse.InHeader, name strin
return fuse.ENOENT
}
- inode := wfs.inodeToPath.Lookup(fullFilePath)
+ inode := wfs.inodeToPath.Lookup(fullFilePath, localEntry.IsDirectory())
wfs.outputFilerEntry(out, inode, localEntry)
diff --git a/weed/mount/weedfs_dir_mkrm.go b/weed/mount/weedfs_dir_mkrm.go
index 4efab078f..839fa493b 100644
--- a/weed/mount/weedfs_dir_mkrm.go
+++ b/weed/mount/weedfs_dir_mkrm.go
@@ -71,7 +71,7 @@ func (wfs *WFS) Mkdir(cancel <-chan struct{}, in *fuse.MkdirIn, name string, out
return fuse.EIO
}
- inode := wfs.inodeToPath.Lookup(entryFullPath)
+ inode := wfs.inodeToPath.Lookup(entryFullPath, true)
wfs.outputPbEntry(out, inode, newEntry)
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go
index 3a187aa1c..0177c1863 100644
--- a/weed/mount/weedfs_dir_read.go
+++ b/weed/mount/weedfs_dir_read.go
@@ -3,8 +3,8 @@ package mount
import (
"context"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/hanwen/go-fuse/v2/fuse"
"math"
diff --git a/weed/mount/weedfs_file_mkrm.go b/weed/mount/weedfs_file_mkrm.go
index 218ce24f1..c3fd04661 100644
--- a/weed/mount/weedfs_file_mkrm.go
+++ b/weed/mount/weedfs_file_mkrm.go
@@ -88,7 +88,7 @@ func (wfs *WFS) Mknod(cancel <-chan struct{}, in *fuse.MknodIn, name string, out
return fuse.EIO
}
- inode := wfs.inodeToPath.Lookup(entryFullPath)
+ inode := wfs.inodeToPath.Lookup(entryFullPath, false)
wfs.outputPbEntry(out, inode, newEntry)
@@ -125,8 +125,6 @@ func (wfs *WFS) Unlink(cancel <-chan struct{}, header *fuse.InHeader, name strin
wfs.metaCache.DeleteEntry(context.Background(), entryFullPath)
wfs.inodeToPath.RemovePath(entryFullPath)
- // TODO handle open files, hardlink
-
return fuse.OK
}
diff --git a/weed/mount/weedfs_forget.go b/weed/mount/weedfs_forget.go
index 14b39882e..62946b216 100644
--- a/weed/mount/weedfs_forget.go
+++ b/weed/mount/weedfs_forget.go
@@ -1,5 +1,10 @@
package mount
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
// Forget is called when the kernel discards entries from its
// dentry cache. This happens on unmount, and when the kernel
// is short on memory. Since it is not guaranteed to occur at
@@ -57,5 +62,7 @@ Side effects: increments the lookup count on success
*/
func (wfs *WFS) Forget(nodeid, nlookup uint64) {
- wfs.inodeToPath.Forget(nodeid, nlookup)
+ wfs.inodeToPath.Forget(nodeid, nlookup, func(dir util.FullPath) {
+ wfs.metaCache.DeleteFolderChildren(context.Background(), dir)
+ })
}
diff --git a/weed/mount/weedfs_link.go b/weed/mount/weedfs_link.go
index 05710e5a0..ca252d639 100644
--- a/weed/mount/weedfs_link.go
+++ b/weed/mount/weedfs_link.go
@@ -85,7 +85,7 @@ func (wfs *WFS) Link(cancel <-chan struct{}, in *fuse.LinkIn, name string, out *
return fuse.EIO
}
- inode := wfs.inodeToPath.Lookup(newEntryPath)
+ inode := wfs.inodeToPath.Lookup(newEntryPath, false)
wfs.outputPbEntry(out, inode, request.Entry)
diff --git a/weed/mount/weedfs_rename.go b/weed/mount/weedfs_rename.go
index a4054b64a..9e461abce 100644
--- a/weed/mount/weedfs_rename.go
+++ b/weed/mount/weedfs_rename.go
@@ -223,8 +223,6 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
wfs.inodeToPath.MovePath(oldPath, newPath)
- // TODO change file handle
-
} else if resp.EventNotification.OldEntry != nil {
// without new entry, only old entry name exists. This is the second step to delete old entry
if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil {
diff --git a/weed/mount/weedfs_symlink.go b/weed/mount/weedfs_symlink.go
index 86a7b50e4..c47ad0a2e 100644
--- a/weed/mount/weedfs_symlink.go
+++ b/weed/mount/weedfs_symlink.go
@@ -56,7 +56,7 @@ func (wfs *WFS) Symlink(cancel <-chan struct{}, header *fuse.InHeader, target st
return fuse.EIO
}
- inode := wfs.inodeToPath.Lookup(entryFullPath)
+ inode := wfs.inodeToPath.Lookup(entryFullPath, false)
wfs.outputPbEntry(out, inode, request.Entry)