diff options
| author | chrislu <chris.lu@gmail.com> | 2022-02-14 01:09:31 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-02-14 01:09:31 -0800 |
| commit | dbeeda812376eda39997cd814c3e7eefaf4ea686 (patch) | |
| tree | 18e989645dec87c977d59ea0fdaea8b073244b17 /weed/mount/meta_cache | |
| parent | 7286e525ad85dec877d506908a0ff35590b0f357 (diff) | |
| download | seaweedfs-dbeeda812376eda39997cd814c3e7eefaf4ea686.tar.xz seaweedfs-dbeeda812376eda39997cd814c3e7eefaf4ea686.zip | |
listen for metadata updates
Diffstat (limited to 'weed/mount/meta_cache')
| -rw-r--r-- | weed/mount/meta_cache/cache_config.go | 32 | ||||
| -rw-r--r-- | weed/mount/meta_cache/id_mapper.go | 101 | ||||
| -rw-r--r-- | weed/mount/meta_cache/meta_cache.go | 160 | ||||
| -rw-r--r-- | weed/mount/meta_cache/meta_cache_init.go | 67 | ||||
| -rw-r--r-- | weed/mount/meta_cache/meta_cache_subscribe.go | 68 |
5 files changed, 428 insertions, 0 deletions
diff --git a/weed/mount/meta_cache/cache_config.go b/weed/mount/meta_cache/cache_config.go new file mode 100644 index 000000000..e6593ebde --- /dev/null +++ b/weed/mount/meta_cache/cache_config.go @@ -0,0 +1,32 @@ +package meta_cache + +import "github.com/chrislusf/seaweedfs/weed/util" + +var ( + _ = util.Configuration(&cacheConfig{}) +) + +// implementing util.Configuraion +type cacheConfig struct { + dir string +} + +func (c cacheConfig) GetString(key string) string { + return c.dir +} + +func (c cacheConfig) GetBool(key string) bool { + panic("implement me") +} + +func (c cacheConfig) GetInt(key string) int { + panic("implement me") +} + +func (c cacheConfig) GetStringSlice(key string) []string { + panic("implement me") +} + +func (c cacheConfig) SetDefault(key string, value interface{}) { + panic("implement me") +} diff --git a/weed/mount/meta_cache/id_mapper.go b/weed/mount/meta_cache/id_mapper.go new file mode 100644 index 000000000..4a2179f31 --- /dev/null +++ b/weed/mount/meta_cache/id_mapper.go @@ -0,0 +1,101 @@ +package meta_cache + +import ( + "fmt" + "strconv" + "strings" +) + +type UidGidMapper struct { + uidMapper *IdMapper + gidMapper *IdMapper +} + +type IdMapper struct { + localToFiler map[uint32]uint32 + filerToLocal map[uint32]uint32 +} + +// UidGidMapper translates local uid/gid to filer uid/gid +// The local storage always persists the same as the filer. +// The local->filer translation happens when updating the filer first and later saving to meta_cache. +// And filer->local happens when reading from the meta_cache. +func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) { + uidMapper, err := newIdMapper(uidPairsStr) + if err != nil { + return nil, err + } + gidMapper, err := newIdMapper(gidPairStr) + if err != nil { + return nil, err + } + + return &UidGidMapper{ + uidMapper: uidMapper, + gidMapper: gidMapper, + }, nil +} + +func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) { + return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid) +} +func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) { + return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid) +} + +func (m *IdMapper) LocalToFiler(id uint32) uint32 { + value, found := m.localToFiler[id] + if found { + return value + } + return id +} +func (m *IdMapper) FilerToLocal(id uint32) uint32 { + value, found := m.filerToLocal[id] + if found { + return value + } + return id +} + +func newIdMapper(pairsStr string) (*IdMapper, error) { + + localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr) + if err != nil { + return nil, err + } + + return &IdMapper{ + localToFiler: localToFiler, + filerToLocal: filerToLocal, + }, nil + +} + +func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) { + + if pairsStr == "" { + return + } + + localToFiler = make(map[uint32]uint32) + filerToLocal = make(map[uint32]uint32) + for _, pairStr := range strings.Split(pairsStr, ",") { + pair := strings.Split(pairStr, ":") + localUidStr, filerUidStr := pair[0], pair[1] + localUid, localUidErr := strconv.Atoi(localUidStr) + if localUidErr != nil { + err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr) + return + } + filerUid, filerUidErr := strconv.Atoi(filerUidStr) + if filerUidErr != nil { + err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr) + return + } + localToFiler[uint32(localUid)] = uint32(filerUid) + filerToLocal[uint32(filerUid)] = uint32(localUid) + } + + return +} diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go new file mode 100644 index 000000000..7f997c5b0 --- /dev/null +++ b/weed/mount/meta_cache/meta_cache.go @@ -0,0 +1,160 @@ +package meta_cache + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/leveldb" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "os" +) + +// need to have logic similar to FilerStoreWrapper +// e.g. fill fileId field for chunks + +type MetaCache struct { + localStore filer.VirtualFilerStore + // sync.RWMutex + uidGidMapper *UidGidMapper + markCachedFn func(fullpath util.FullPath) + isCachedFn func(fullpath util.FullPath) bool + invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry) +} + +func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache { + return &MetaCache{ + localStore: openMetaStore(dbFolder), + markCachedFn: markCachedFn, + isCachedFn: isCachedFn, + uidGidMapper: uidGidMapper, + invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) { + invalidateFunc(fullpath, entry) + }, + } +} + +func openMetaStore(dbFolder string) filer.VirtualFilerStore { + + os.RemoveAll(dbFolder) + os.MkdirAll(dbFolder, 0755) + + store := &leveldb.LevelDBStore{} + config := &cacheConfig{ + dir: dbFolder, + } + + if err := store.Initialize(config, ""); err != nil { + glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) + } + + return filer.NewFilerStoreWrapper(store) + +} + +func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error { + //mc.Lock() + //defer mc.Unlock() + return mc.doInsertEntry(ctx, entry) +} + +func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error { + return mc.localStore.InsertEntry(ctx, entry) +} + +func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { + //mc.Lock() + //defer mc.Unlock() + + oldDir, _ := oldPath.DirAndName() + if mc.isCachedFn(util.FullPath(oldDir)) { + if oldPath != "" { + if newEntry != nil && oldPath == newEntry.FullPath { + // skip the unnecessary deletion + // leave the update to the following InsertEntry operation + } else { + glog.V(3).Infof("DeleteEntry %s", oldPath) + if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil { + return err + } + } + } + } else { + // println("unknown old directory:", oldDir) + } + + if newEntry != nil { + newDir, _ := newEntry.DirAndName() + if mc.isCachedFn(util.FullPath(newDir)) { + glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name()) + if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil { + return err + } + } + } + return nil +} + +func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error { + //mc.Lock() + //defer mc.Unlock() + return mc.localStore.UpdateEntry(ctx, entry) +} + +func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) { + //mc.RLock() + //defer mc.RUnlock() + entry, err = mc.localStore.FindEntry(ctx, fp) + if err != nil { + return nil, err + } + mc.mapIdFromFilerToLocal(entry) + return +} + +func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { + //mc.Lock() + //defer mc.Unlock() + return mc.localStore.DeleteEntry(ctx, fp) +} +func (mc *MetaCache) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) { + //mc.Lock() + //defer mc.Unlock() + return mc.localStore.DeleteFolderChildren(ctx, fp) +} + +func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error { + //mc.RLock() + //defer mc.RUnlock() + + if !mc.isCachedFn(dirPath) { + // if this request comes after renaming, it should be fine + glog.Warningf("unsynchronized dir: %v", dirPath) + } + + _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool { + mc.mapIdFromFilerToLocal(entry) + return eachEntryFunc(entry) + }) + if err != nil { + return err + } + return err +} + +func (mc *MetaCache) Shutdown() { + //mc.Lock() + //defer mc.Unlock() + mc.localStore.Shutdown() +} + +func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) { + entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid) +} + +func (mc *MetaCache) Debug() { + if debuggable, ok := mc.localStore.(filer.Debuggable); ok { + println("start debugging") + debuggable.Debug(os.Stderr) + } +} diff --git a/weed/mount/meta_cache/meta_cache_init.go b/weed/mount/meta_cache/meta_cache_init.go new file mode 100644 index 000000000..cd9c71668 --- /dev/null +++ b/weed/mount/meta_cache/meta_cache_init.go @@ -0,0 +1,67 @@ +package meta_cache + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error { + + for { + + // the directory children are already cached + // so no need for this and upper directories + if mc.isCachedFn(dirPath) { + return nil + } + + if err := doEnsureVisited(mc, client, dirPath); err != nil { + return err + } + + // continue to parent directory + if dirPath != "/" { + parent, _ := dirPath.DirAndName() + dirPath = util.FullPath(parent) + } else { + break + } + } + + return nil + +} + +func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error { + + glog.V(4).Infof("ReadDirAllEntries %s ...", path) + + err := util.Retry("ReadDirAllEntries", func() error { + return filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error { + entry := filer.FromPbEntry(string(path), pbEntry) + if IsHiddenSystemEntry(string(path), entry.Name()) { + return nil + } + if err := mc.doInsertEntry(context.Background(), entry); err != nil { + glog.V(0).Infof("read %s: %v", entry.FullPath, err) + return err + } + return nil + }) + }) + + if err != nil { + err = fmt.Errorf("list %s: %v", path, err) + } + mc.markCachedFn(path) + return err +} + +func IsHiddenSystemEntry(dir, name string) bool { + return dir == "/" && (name == "topics" || name == "etc") +} diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go new file mode 100644 index 000000000..881fee08f --- /dev/null +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -0,0 +1,68 @@ +package meta_cache + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { + + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + + for _, sig := range message.Signatures { + if sig == selfSignature && selfSignature != 0 { + return nil + } + } + + dir := resp.Directory + var oldPath util.FullPath + var newEntry *filer.Entry + if message.OldEntry != nil { + oldPath = util.NewFullPath(dir, message.OldEntry.Name) + glog.V(4).Infof("deleting %v", oldPath) + } + + if message.NewEntry != nil { + if message.NewParentPath != "" { + dir = message.NewParentPath + } + key := util.NewFullPath(dir, message.NewEntry.Name) + glog.V(4).Infof("creating %v", key) + newEntry = filer.FromPbEntry(dir, message.NewEntry) + } + err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) + if err == nil { + if message.OldEntry != nil && message.NewEntry != nil { + oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) + mc.invalidateFunc(oldKey, message.OldEntry) + if message.OldEntry.Name != message.NewEntry.Name { + newKey := util.NewFullPath(dir, message.NewEntry.Name) + mc.invalidateFunc(newKey, message.NewEntry) + } + } else if message.OldEntry == nil && message.NewEntry != nil { + // no need to invaalidate + } else if message.OldEntry != nil && message.NewEntry == nil { + oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) + mc.invalidateFunc(oldKey, message.OldEntry) + } + } + + return err + + } + + util.RetryForever("followMetaUpdates", func() error { + return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true) + }, func(err error) bool { + glog.Errorf("follow metadata updates: %v", err) + return true + }) + + return nil +} |
