author    chrislu <chris.lu@gmail.com>  2022-02-14 01:09:31 -0800
committer chrislu <chris.lu@gmail.com>  2022-02-14 01:09:31 -0800
commit    dbeeda812376eda39997cd814c3e7eefaf4ea686 (patch)
tree      18e989645dec87c977d59ea0fdaea8b073244b17 /weed/mount/meta_cache
parent    7286e525ad85dec877d506908a0ff35590b0f357 (diff)
download  seaweedfs-dbeeda812376eda39997cd814c3e7eefaf4ea686.tar.xz
          seaweedfs-dbeeda812376eda39997cd814c3e7eefaf4ea686.zip
listen for metadata updates
Diffstat (limited to 'weed/mount/meta_cache')
-rw-r--r--  weed/mount/meta_cache/cache_config.go           32
-rw-r--r--  weed/mount/meta_cache/id_mapper.go             101
-rw-r--r--  weed/mount/meta_cache/meta_cache.go            160
-rw-r--r--  weed/mount/meta_cache/meta_cache_init.go        67
-rw-r--r--  weed/mount/meta_cache/meta_cache_subscribe.go   68
5 files changed, 428 insertions, 0 deletions
diff --git a/weed/mount/meta_cache/cache_config.go b/weed/mount/meta_cache/cache_config.go
new file mode 100644
index 000000000..e6593ebde
--- /dev/null
+++ b/weed/mount/meta_cache/cache_config.go
@@ -0,0 +1,32 @@
+package meta_cache
+
+import "github.com/chrislusf/seaweedfs/weed/util"
+
+var (
+ _ = util.Configuration(&cacheConfig{})
+)
+
+// cacheConfig implements util.Configuration; only GetString is needed here,
+// so the remaining methods are intentionally left unimplemented.
+type cacheConfig struct {
+ dir string
+}
+
+func (c cacheConfig) GetString(key string) string {
+ return c.dir
+}
+
+func (c cacheConfig) GetBool(key string) bool {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetInt(key string) int {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetStringSlice(key string) []string {
+ panic("implement me")
+}
+
+func (c cacheConfig) SetDefault(key string, value interface{}) {
+ panic("implement me")
+}
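
Editorial note: the blank-identifier assignment above (_ = util.Configuration(&cacheConfig{})) is a compile-time check that cacheConfig satisfies util.Configuration; if a method is missing, the package fails to build. A minimal, self-contained sketch of the same idiom, using hypothetical names that are not part of this commit:

package example

import "fmt"

// Sizer is a stand-in interface (hypothetical, not from this commit).
type Sizer interface {
	Size() int
}

type fixedSize struct{}

func (fixedSize) Size() int { return 42 }

// Compile-time assertion: the build breaks if fixedSize stops satisfying Sizer.
var _ Sizer = fixedSize{}

func Demo() {
	fmt.Println(fixedSize{}.Size())
}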
diff --git a/weed/mount/meta_cache/id_mapper.go b/weed/mount/meta_cache/id_mapper.go
new file mode 100644
index 000000000..4a2179f31
--- /dev/null
+++ b/weed/mount/meta_cache/id_mapper.go
@@ -0,0 +1,101 @@
+package meta_cache
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+type UidGidMapper struct {
+ uidMapper *IdMapper
+ gidMapper *IdMapper
+}
+
+type IdMapper struct {
+ localToFiler map[uint32]uint32
+ filerToLocal map[uint32]uint32
+}
+
+// NewUidGidMapper builds a UidGidMapper that translates local uid/gid to filer uid/gid.
+// The local storage always persists the same values as the filer.
+// The local->filer translation happens when updating the filer first and later saving to the meta_cache.
+// The filer->local translation happens when reading from the meta_cache.
+func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
+ uidMapper, err := newIdMapper(uidPairsStr)
+ if err != nil {
+ return nil, err
+ }
+ gidMapper, err := newIdMapper(gidPairStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return &UidGidMapper{
+ uidMapper: uidMapper,
+ gidMapper: gidMapper,
+ }, nil
+}
+
+func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) {
+ return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
+}
+func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) {
+ return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
+}
+
+func (m *IdMapper) LocalToFiler(id uint32) uint32 {
+ value, found := m.localToFiler[id]
+ if found {
+ return value
+ }
+ return id
+}
+func (m *IdMapper) FilerToLocal(id uint32) uint32 {
+ value, found := m.filerToLocal[id]
+ if found {
+ return value
+ }
+ return id
+}
+
+func newIdMapper(pairsStr string) (*IdMapper, error) {
+
+ localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return &IdMapper{
+ localToFiler: localToFiler,
+ filerToLocal: filerToLocal,
+ }, nil
+
+}
+
+func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
+
+ if pairsStr == "" {
+ return
+ }
+
+ localToFiler = make(map[uint32]uint32)
+ filerToLocal = make(map[uint32]uint32)
+ for _, pairStr := range strings.Split(pairsStr, ",") {
+ pair := strings.Split(pairStr, ":")
+ localUidStr, filerUidStr := pair[0], pair[1]
+ localUid, localUidErr := strconv.Atoi(localUidStr)
+ if localUidErr != nil {
+ err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr)
+ return
+ }
+ filerUid, filerUidErr := strconv.Atoi(filerUidStr)
+ if filerUidErr != nil {
+ err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
+ return
+ }
+ localToFiler[uint32(localUid)] = uint32(filerUid)
+ filerToLocal[uint32(filerUid)] = uint32(localUid)
+ }
+
+ return
+}
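
Editorial note: to make the mapping format concrete, here is a small usage sketch of the mapper added above. The pair strings are comma-separated "local:filer" pairs, and any id without a configured mapping passes through unchanged; the specific ids below are illustrative only.

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
)

func main() {
	// map local uid 1000 <-> filer uid 500, and local gid 1000 <-> filer gid 500
	mapper, err := meta_cache.NewUidGidMapper("1000:500", "1000:500")
	if err != nil {
		panic(err)
	}

	uid, gid := mapper.LocalToFiler(1000, 1000)
	fmt.Println(uid, gid) // 500 500

	uid, gid = mapper.FilerToLocal(500, 500)
	fmt.Println(uid, gid) // 1000 1000

	// ids without a mapping are returned as-is
	uid, gid = mapper.LocalToFiler(2000, 2000)
	fmt.Println(uid, gid) // 2000 2000
}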
diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go
new file mode 100644
index 000000000..7f997c5b0
--- /dev/null
+++ b/weed/mount/meta_cache/meta_cache.go
@@ -0,0 +1,160 @@
+package meta_cache
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "os"
+)
+
+// MetaCache needs logic similar to FilerStoreWrapper,
+// e.g. filling the fileId field for chunks.
+
+type MetaCache struct {
+ localStore filer.VirtualFilerStore
+ // sync.RWMutex
+ uidGidMapper *UidGidMapper
+ markCachedFn func(fullpath util.FullPath)
+ isCachedFn func(fullpath util.FullPath) bool
+ invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
+}
+
+func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
+ return &MetaCache{
+ localStore: openMetaStore(dbFolder),
+ markCachedFn: markCachedFn,
+ isCachedFn: isCachedFn,
+ uidGidMapper: uidGidMapper,
+ invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
+ invalidateFunc(fullpath, entry)
+ },
+ }
+}
+
+func openMetaStore(dbFolder string) filer.VirtualFilerStore {
+
+ os.RemoveAll(dbFolder)
+ os.MkdirAll(dbFolder, 0755)
+
+ store := &leveldb.LevelDBStore{}
+ config := &cacheConfig{
+ dir: dbFolder,
+ }
+
+ if err := store.Initialize(config, ""); err != nil {
+ glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
+ }
+
+ return filer.NewFilerStoreWrapper(store)
+
+}
+
+func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ //mc.Lock()
+ //defer mc.Unlock()
+ return mc.doInsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
+ return mc.localStore.InsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
+ //mc.Lock()
+ //defer mc.Unlock()
+
+ oldDir, _ := oldPath.DirAndName()
+ if mc.isCachedFn(util.FullPath(oldDir)) {
+ if oldPath != "" {
+ if newEntry != nil && oldPath == newEntry.FullPath {
+ // skip the unnecessary deletion
+ // leave the update to the following InsertEntry operation
+ } else {
+ glog.V(3).Infof("DeleteEntry %s", oldPath)
+ if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
+ return err
+ }
+ }
+ }
+ } else {
+ // println("unknown old directory:", oldDir)
+ }
+
+ if newEntry != nil {
+ newDir, _ := newEntry.DirAndName()
+ if mc.isCachedFn(util.FullPath(newDir)) {
+ glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
+ if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
+ //mc.Lock()
+ //defer mc.Unlock()
+ return mc.localStore.UpdateEntry(ctx, entry)
+}
+
+func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
+ //mc.RLock()
+ //defer mc.RUnlock()
+ entry, err = mc.localStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+ mc.mapIdFromFilerToLocal(entry)
+ return
+}
+
+func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ //mc.Lock()
+ //defer mc.Unlock()
+ return mc.localStore.DeleteEntry(ctx, fp)
+}
+func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
+ //mc.Lock()
+ //defer mc.Unlock()
+ return mc.localStore.DeleteFolderChildren(ctx, fp)
+}
+
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
+ //mc.RLock()
+ //defer mc.RUnlock()
+
+ if !mc.isCachedFn(dirPath) {
+ // if this request comes after renaming, it should be fine
+ glog.Warningf("unsynchronized dir: %v", dirPath)
+ }
+
+ _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
+ mc.mapIdFromFilerToLocal(entry)
+ return eachEntryFunc(entry)
+ })
+ if err != nil {
+ return err
+ }
+ return err
+}
+
+func (mc *MetaCache) Shutdown() {
+ //mc.Lock()
+ //defer mc.Unlock()
+ mc.localStore.Shutdown()
+}
+
+func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
+ entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
+}
+
+func (mc *MetaCache) Debug() {
+ if debuggable, ok := mc.localStore.(filer.Debuggable); ok {
+ println("start debugging")
+ debuggable.Debug(os.Stderr)
+ }
+}
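
Editorial note: a hedged sketch of how a mount might wire up the cache above. The callback bodies, the folder path, and the function name are hypothetical placeholders; the real weed mount keeps its own bookkeeping of visited directories and its own FUSE invalidation hook.

package example

import (
	"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// NewExampleMetaCache wires the cache with toy callbacks.
func NewExampleMetaCache(mapper *meta_cache.UidGidMapper) *meta_cache.MetaCache {
	// track which directories have been fully listed from the filer
	cachedDirs := make(map[util.FullPath]bool)

	return meta_cache.NewMetaCache(
		"/tmp/swfs-meta-cache", // hypothetical cache folder; openMetaStore wipes and recreates it
		mapper,
		func(p util.FullPath) { cachedDirs[p] = true },      // markCachedFn
		func(p util.FullPath) bool { return cachedDirs[p] }, // isCachedFn
		func(p util.FullPath, entry *filer_pb.Entry) {
			// invalidateFunc: notify the FUSE layer that p changed
		},
	)
}

In real use the cachedDirs map would need locking, since the metadata subscription goroutine and the FUSE request handlers call these callbacks concurrently.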
diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go
new file mode 100644
index 000000000..cd9c71668
--- /dev/null
+++ b/weed/mount/meta_cache/meta_cache_init.go
@@ -0,0 +1,67 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
+
+ for {
+
+ // the directory children are already cached
+ // so no need for this and upper directories
+ if mc.isCachedFn(dirPath) {
+ return nil
+ }
+
+ if err := doEnsureVisited(mc, client, dirPath); err != nil {
+ return err
+ }
+
+ // continue to parent directory
+ if dirPath != "/" {
+ parent, _ := dirPath.DirAndName()
+ dirPath = util.FullPath(parent)
+ } else {
+ break
+ }
+ }
+
+ return nil
+
+}
+
+func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
+
+ glog.V(4).Infof("ReadDirAllEntries %s ...", path)
+
+ err := util.Retry("ReadDirAllEntries", func() error {
+ return filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer.FromPbEntry(string(path), pbEntry)
+ if IsHiddenSystemEntry(string(path), entry.Name()) {
+ return nil
+ }
+ if err := mc.doInsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ return err
+ }
+ return nil
+ })
+ })
+
+ if err != nil {
+ err = fmt.Errorf("list %s: %v", path, err)
+ }
+ mc.markCachedFn(path)
+ return err
+}
+
+func IsHiddenSystemEntry(dir, name string) bool {
+ return dir == "/" && (name == "topics" || name == "etc")
+}
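
Editorial note: EnsureVisited walks from the requested directory up to the root, listing each uncached ancestor from the filer before marking it cached, so later reads can be served from the local leveldb store. A hypothetical read-path sketch; filerClient and the directory path are placeholders:

package example

import (
	"context"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

func listDir(mc *meta_cache.MetaCache, filerClient filer_pb.FilerClient) error {
	dirPath := util.FullPath("/buckets/photos") // hypothetical directory

	// pull the directory (and any uncached ancestors) into the local cache first
	if err := meta_cache.EnsureVisited(mc, filerClient, dirPath); err != nil {
		return err
	}

	// subsequent listings are served from the local cache
	return mc.ListDirectoryEntries(context.Background(), dirPath, "", true, 1024,
		func(entry *filer.Entry) bool {
			fmt.Println(entry.FullPath)
			return true // keep iterating
		})
}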
diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go
new file mode 100644
index 000000000..881fee08f
--- /dev/null
+++ b/weed/mount/meta_cache/meta_cache_subscribe.go
@@ -0,0 +1,68 @@
+package meta_cache
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+
+ for _, sig := range message.Signatures {
+ if sig == selfSignature && selfSignature != 0 {
+ return nil
+ }
+ }
+
+ dir := resp.Directory
+ var oldPath util.FullPath
+ var newEntry *filer.Entry
+ if message.OldEntry != nil {
+ oldPath = util.NewFullPath(dir, message.OldEntry.Name)
+ glog.V(4).Infof("deleting %v", oldPath)
+ }
+
+ if message.NewEntry != nil {
+ if message.NewParentPath != "" {
+ dir = message.NewParentPath
+ }
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ glog.V(4).Infof("creating %v", key)
+ newEntry = filer.FromPbEntry(dir, message.NewEntry)
+ }
+ err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+ if err == nil {
+ if message.OldEntry != nil && message.NewEntry != nil {
+ oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ mc.invalidateFunc(oldKey, message.OldEntry)
+ if message.OldEntry.Name != message.NewEntry.Name {
+ newKey := util.NewFullPath(dir, message.NewEntry.Name)
+ mc.invalidateFunc(newKey, message.NewEntry)
+ }
+ } else if message.OldEntry == nil && message.NewEntry != nil {
+ // no need to invalidate
+ } else if message.OldEntry != nil && message.NewEntry == nil {
+ oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ mc.invalidateFunc(oldKey, message.OldEntry)
+ }
+ }
+
+ return err
+
+ }
+
+ util.RetryForever("followMetaUpdates", func() error {
+ return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true)
+ }, func(err error) bool {
+ glog.Errorf("follow metadata updates: %v", err)
+ return true
+ })
+
+ return nil
+}