aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/meta_cache
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys/meta_cache')
-rw-r--r--weed/filesys/meta_cache/cache_config.go32
-rw-r--r--weed/filesys/meta_cache/id_mapper.go101
-rw-r--r--weed/filesys/meta_cache/meta_cache.go154
-rw-r--r--weed/filesys/meta_cache/meta_cache_init.go47
-rw-r--r--weed/filesys/meta_cache/meta_cache_subscribe.go68
5 files changed, 0 insertions, 402 deletions
diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go
deleted file mode 100644
index e6593ebde..000000000
--- a/weed/filesys/meta_cache/cache_config.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package meta_cache
-
-import "github.com/chrislusf/seaweedfs/weed/util"
-
-var (
- _ = util.Configuration(&cacheConfig{})
-)
-
-// implementing util.Configuraion
-type cacheConfig struct {
- dir string
-}
-
-func (c cacheConfig) GetString(key string) string {
- return c.dir
-}
-
-func (c cacheConfig) GetBool(key string) bool {
- panic("implement me")
-}
-
-func (c cacheConfig) GetInt(key string) int {
- panic("implement me")
-}
-
-func (c cacheConfig) GetStringSlice(key string) []string {
- panic("implement me")
-}
-
-func (c cacheConfig) SetDefault(key string, value interface{}) {
- panic("implement me")
-}
diff --git a/weed/filesys/meta_cache/id_mapper.go b/weed/filesys/meta_cache/id_mapper.go
deleted file mode 100644
index 4a2179f31..000000000
--- a/weed/filesys/meta_cache/id_mapper.go
+++ /dev/null
@@ -1,101 +0,0 @@
-package meta_cache
-
-import (
- "fmt"
- "strconv"
- "strings"
-)
-
-type UidGidMapper struct {
- uidMapper *IdMapper
- gidMapper *IdMapper
-}
-
-type IdMapper struct {
- localToFiler map[uint32]uint32
- filerToLocal map[uint32]uint32
-}
-
-// UidGidMapper translates local uid/gid to filer uid/gid
-// The local storage always persists the same as the filer.
-// The local->filer translation happens when updating the filer first and later saving to meta_cache.
-// And filer->local happens when reading from the meta_cache.
-func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
- uidMapper, err := newIdMapper(uidPairsStr)
- if err != nil {
- return nil, err
- }
- gidMapper, err := newIdMapper(gidPairStr)
- if err != nil {
- return nil, err
- }
-
- return &UidGidMapper{
- uidMapper: uidMapper,
- gidMapper: gidMapper,
- }, nil
-}
-
-func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) {
- return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
-}
-func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) {
- return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
-}
-
-func (m *IdMapper) LocalToFiler(id uint32) uint32 {
- value, found := m.localToFiler[id]
- if found {
- return value
- }
- return id
-}
-func (m *IdMapper) FilerToLocal(id uint32) uint32 {
- value, found := m.filerToLocal[id]
- if found {
- return value
- }
- return id
-}
-
-func newIdMapper(pairsStr string) (*IdMapper, error) {
-
- localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
- if err != nil {
- return nil, err
- }
-
- return &IdMapper{
- localToFiler: localToFiler,
- filerToLocal: filerToLocal,
- }, nil
-
-}
-
-func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
-
- if pairsStr == "" {
- return
- }
-
- localToFiler = make(map[uint32]uint32)
- filerToLocal = make(map[uint32]uint32)
- for _, pairStr := range strings.Split(pairsStr, ",") {
- pair := strings.Split(pairStr, ":")
- localUidStr, filerUidStr := pair[0], pair[1]
- localUid, localUidErr := strconv.Atoi(localUidStr)
- if localUidErr != nil {
- err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr)
- return
- }
- filerUid, filerUidErr := strconv.Atoi(filerUidStr)
- if filerUidErr != nil {
- err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
- return
- }
- localToFiler[uint32(localUid)] = uint32(filerUid)
- filerToLocal[uint32(filerUid)] = uint32(localUid)
- }
-
- return
-}
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
deleted file mode 100644
index dc8e6838f..000000000
--- a/weed/filesys/meta_cache/meta_cache.go
+++ /dev/null
@@ -1,154 +0,0 @@
-package meta_cache
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
- "os"
-)
-
-// need to have logic similar to FilerStoreWrapper
-// e.g. fill fileId field for chunks
-
-type MetaCache struct {
- localStore filer.VirtualFilerStore
- // sync.RWMutex
- visitedBoundary *bounded_tree.BoundedTree
- uidGidMapper *UidGidMapper
- invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
-}
-
-func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
- return &MetaCache{
- localStore: openMetaStore(dbFolder),
- visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
- uidGidMapper: uidGidMapper,
- invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) {
- invalidateFunc(fullpath, entry)
- },
- }
-}
-
-func openMetaStore(dbFolder string) filer.VirtualFilerStore {
-
- os.RemoveAll(dbFolder)
- os.MkdirAll(dbFolder, 0755)
-
- store := &leveldb.LevelDBStore{}
- config := &cacheConfig{
- dir: dbFolder,
- }
-
- if err := store.Initialize(config, ""); err != nil {
- glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
- }
-
- return filer.NewFilerStoreWrapper(store)
-
-}
-
-func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
- //mc.Lock()
- //defer mc.Unlock()
- return mc.doInsertEntry(ctx, entry)
-}
-
-func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
- return mc.localStore.InsertEntry(ctx, entry)
-}
-
-func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
- //mc.Lock()
- //defer mc.Unlock()
-
- oldDir, _ := oldPath.DirAndName()
- if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
- if oldPath != "" {
- if newEntry != nil && oldPath == newEntry.FullPath {
- // skip the unnecessary deletion
- // leave the update to the following InsertEntry operation
- } else {
- glog.V(3).Infof("DeleteEntry %s", oldPath)
- if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
- return err
- }
- }
- }
- } else {
- // println("unknown old directory:", oldDir)
- }
-
- if newEntry != nil {
- newDir, _ := newEntry.DirAndName()
- if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
- glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
- if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
- return err
- }
- }
- }
- return nil
-}
-
-func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
- //mc.Lock()
- //defer mc.Unlock()
- return mc.localStore.UpdateEntry(ctx, entry)
-}
-
-func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
- //mc.RLock()
- //defer mc.RUnlock()
- entry, err = mc.localStore.FindEntry(ctx, fp)
- if err != nil {
- return nil, err
- }
- mc.mapIdFromFilerToLocal(entry)
- return
-}
-
-func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
- //mc.Lock()
- //defer mc.Unlock()
- return mc.localStore.DeleteEntry(ctx, fp)
-}
-
-func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
- //mc.RLock()
- //defer mc.RUnlock()
-
- if !mc.visitedBoundary.HasVisited(dirPath) {
- // if this request comes after renaming, it should be fine
- glog.Warningf("unsynchronized dir: %v", dirPath)
- }
-
- _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
- mc.mapIdFromFilerToLocal(entry)
- return eachEntryFunc(entry)
- })
- if err != nil {
- return err
- }
- return err
-}
-
-func (mc *MetaCache) Shutdown() {
- //mc.Lock()
- //defer mc.Unlock()
- mc.localStore.Shutdown()
-}
-
-func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
- entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
-}
-
-func (mc *MetaCache) Debug() {
- if debuggable, ok := mc.localStore.(filer.Debuggable); ok {
- println("start debugging")
- debuggable.Debug(os.Stderr)
- }
-}
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
deleted file mode 100644
index 07098bf6b..000000000
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package meta_cache
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
-
- return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
-
- glog.V(4).Infof("ReadDirAllEntries %s ...", path)
-
- util.Retry("ReadDirAllEntries", func() error {
- err = filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
- entry := filer.FromPbEntry(string(path), pbEntry)
- if IsHiddenSystemEntry(string(path), entry.Name()) {
- return nil
- }
- if err := mc.doInsertEntry(context.Background(), entry); err != nil {
- glog.V(0).Infof("read %s: %v", entry.FullPath, err)
- return err
- }
- if entry.IsDirectory() {
- childDirectories = append(childDirectories, entry.Name())
- }
- return nil
- })
- return err
- })
-
- if err != nil {
- err = fmt.Errorf("list %s: %v", path, err)
- }
-
- return
- })
-}
-
-func IsHiddenSystemEntry(dir, name string) bool {
- return dir == "/" && (name == "topics" || name == "etc")
-}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
deleted file mode 100644
index 881fee08f..000000000
--- a/weed/filesys/meta_cache/meta_cache_subscribe.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package meta_cache
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
-
- processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
- message := resp.EventNotification
-
- for _, sig := range message.Signatures {
- if sig == selfSignature && selfSignature != 0 {
- return nil
- }
- }
-
- dir := resp.Directory
- var oldPath util.FullPath
- var newEntry *filer.Entry
- if message.OldEntry != nil {
- oldPath = util.NewFullPath(dir, message.OldEntry.Name)
- glog.V(4).Infof("deleting %v", oldPath)
- }
-
- if message.NewEntry != nil {
- if message.NewParentPath != "" {
- dir = message.NewParentPath
- }
- key := util.NewFullPath(dir, message.NewEntry.Name)
- glog.V(4).Infof("creating %v", key)
- newEntry = filer.FromPbEntry(dir, message.NewEntry)
- }
- err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
- if err == nil {
- if message.OldEntry != nil && message.NewEntry != nil {
- oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
- mc.invalidateFunc(oldKey, message.OldEntry)
- if message.OldEntry.Name != message.NewEntry.Name {
- newKey := util.NewFullPath(dir, message.NewEntry.Name)
- mc.invalidateFunc(newKey, message.NewEntry)
- }
- } else if message.OldEntry == nil && message.NewEntry != nil {
- // no need to invaalidate
- } else if message.OldEntry != nil && message.NewEntry == nil {
- oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
- mc.invalidateFunc(oldKey, message.OldEntry)
- }
- }
-
- return err
-
- }
-
- util.RetryForever("followMetaUpdates", func() error {
- return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true)
- }, func(err error) bool {
- glog.Errorf("follow metadata updates: %v", err)
- return true
- })
-
- return nil
-}