diff options
Diffstat (limited to 'weed/filesys/meta_cache')
| -rw-r--r-- | weed/filesys/meta_cache/cache_config.go | 32 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/id_mapper.go | 101 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache.go | 154 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_init.go | 47 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_subscribe.go | 68 |
5 files changed, 0 insertions, 402 deletions
diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go deleted file mode 100644 index e6593ebde..000000000 --- a/weed/filesys/meta_cache/cache_config.go +++ /dev/null @@ -1,32 +0,0 @@ -package meta_cache - -import "github.com/chrislusf/seaweedfs/weed/util" - -var ( - _ = util.Configuration(&cacheConfig{}) -) - -// implementing util.Configuraion -type cacheConfig struct { - dir string -} - -func (c cacheConfig) GetString(key string) string { - return c.dir -} - -func (c cacheConfig) GetBool(key string) bool { - panic("implement me") -} - -func (c cacheConfig) GetInt(key string) int { - panic("implement me") -} - -func (c cacheConfig) GetStringSlice(key string) []string { - panic("implement me") -} - -func (c cacheConfig) SetDefault(key string, value interface{}) { - panic("implement me") -} diff --git a/weed/filesys/meta_cache/id_mapper.go b/weed/filesys/meta_cache/id_mapper.go deleted file mode 100644 index 4a2179f31..000000000 --- a/weed/filesys/meta_cache/id_mapper.go +++ /dev/null @@ -1,101 +0,0 @@ -package meta_cache - -import ( - "fmt" - "strconv" - "strings" -) - -type UidGidMapper struct { - uidMapper *IdMapper - gidMapper *IdMapper -} - -type IdMapper struct { - localToFiler map[uint32]uint32 - filerToLocal map[uint32]uint32 -} - -// UidGidMapper translates local uid/gid to filer uid/gid -// The local storage always persists the same as the filer. -// The local->filer translation happens when updating the filer first and later saving to meta_cache. -// And filer->local happens when reading from the meta_cache. -func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) { - uidMapper, err := newIdMapper(uidPairsStr) - if err != nil { - return nil, err - } - gidMapper, err := newIdMapper(gidPairStr) - if err != nil { - return nil, err - } - - return &UidGidMapper{ - uidMapper: uidMapper, - gidMapper: gidMapper, - }, nil -} - -func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) { - return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid) -} -func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) { - return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid) -} - -func (m *IdMapper) LocalToFiler(id uint32) uint32 { - value, found := m.localToFiler[id] - if found { - return value - } - return id -} -func (m *IdMapper) FilerToLocal(id uint32) uint32 { - value, found := m.filerToLocal[id] - if found { - return value - } - return id -} - -func newIdMapper(pairsStr string) (*IdMapper, error) { - - localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr) - if err != nil { - return nil, err - } - - return &IdMapper{ - localToFiler: localToFiler, - filerToLocal: filerToLocal, - }, nil - -} - -func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) { - - if pairsStr == "" { - return - } - - localToFiler = make(map[uint32]uint32) - filerToLocal = make(map[uint32]uint32) - for _, pairStr := range strings.Split(pairsStr, ",") { - pair := strings.Split(pairStr, ":") - localUidStr, filerUidStr := pair[0], pair[1] - localUid, localUidErr := strconv.Atoi(localUidStr) - if localUidErr != nil { - err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr) - return - } - filerUid, filerUidErr := strconv.Atoi(filerUidStr) - if filerUidErr != nil { - err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr) - return - } - localToFiler[uint32(localUid)] = uint32(filerUid) - filerToLocal[uint32(filerUid)] = uint32(localUid) - } - - return -} diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go deleted file mode 100644 index dc8e6838f..000000000 --- a/weed/filesys/meta_cache/meta_cache.go +++ /dev/null @@ -1,154 +0,0 @@ -package meta_cache - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filer/leveldb" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" - "os" -) - -// need to have logic similar to FilerStoreWrapper -// e.g. fill fileId field for chunks - -type MetaCache struct { - localStore filer.VirtualFilerStore - // sync.RWMutex - visitedBoundary *bounded_tree.BoundedTree - uidGidMapper *UidGidMapper - invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry) -} - -func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache { - return &MetaCache{ - localStore: openMetaStore(dbFolder), - visitedBoundary: bounded_tree.NewBoundedTree(baseDir), - uidGidMapper: uidGidMapper, - invalidateFunc: func(fullpath util.FullPath, entry *filer_pb.Entry) { - invalidateFunc(fullpath, entry) - }, - } -} - -func openMetaStore(dbFolder string) filer.VirtualFilerStore { - - os.RemoveAll(dbFolder) - os.MkdirAll(dbFolder, 0755) - - store := &leveldb.LevelDBStore{} - config := &cacheConfig{ - dir: dbFolder, - } - - if err := store.Initialize(config, ""); err != nil { - glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) - } - - return filer.NewFilerStoreWrapper(store) - -} - -func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error { - //mc.Lock() - //defer mc.Unlock() - return mc.doInsertEntry(ctx, entry) -} - -func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error { - return mc.localStore.InsertEntry(ctx, entry) -} - -func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { - //mc.Lock() - //defer mc.Unlock() - - oldDir, _ := oldPath.DirAndName() - if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) { - if oldPath != "" { - if newEntry != nil && oldPath == newEntry.FullPath { - // skip the unnecessary deletion - // leave the update to the following InsertEntry operation - } else { - glog.V(3).Infof("DeleteEntry %s", oldPath) - if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil { - return err - } - } - } - } else { - // println("unknown old directory:", oldDir) - } - - if newEntry != nil { - newDir, _ := newEntry.DirAndName() - if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) { - glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name()) - if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil { - return err - } - } - } - return nil -} - -func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error { - //mc.Lock() - //defer mc.Unlock() - return mc.localStore.UpdateEntry(ctx, entry) -} - -func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) { - //mc.RLock() - //defer mc.RUnlock() - entry, err = mc.localStore.FindEntry(ctx, fp) - if err != nil { - return nil, err - } - mc.mapIdFromFilerToLocal(entry) - return -} - -func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { - //mc.Lock() - //defer mc.Unlock() - return mc.localStore.DeleteEntry(ctx, fp) -} - -func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error { - //mc.RLock() - //defer mc.RUnlock() - - if !mc.visitedBoundary.HasVisited(dirPath) { - // if this request comes after renaming, it should be fine - glog.Warningf("unsynchronized dir: %v", dirPath) - } - - _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool { - mc.mapIdFromFilerToLocal(entry) - return eachEntryFunc(entry) - }) - if err != nil { - return err - } - return err -} - -func (mc *MetaCache) Shutdown() { - //mc.Lock() - //defer mc.Unlock() - mc.localStore.Shutdown() -} - -func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) { - entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid) -} - -func (mc *MetaCache) Debug() { - if debuggable, ok := mc.localStore.(filer.Debuggable); ok { - println("start debugging") - debuggable.Debug(os.Stderr) - } -} diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go deleted file mode 100644 index 07098bf6b..000000000 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ /dev/null @@ -1,47 +0,0 @@ -package meta_cache - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error { - - return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) { - - glog.V(4).Infof("ReadDirAllEntries %s ...", path) - - util.Retry("ReadDirAllEntries", func() error { - err = filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error { - entry := filer.FromPbEntry(string(path), pbEntry) - if IsHiddenSystemEntry(string(path), entry.Name()) { - return nil - } - if err := mc.doInsertEntry(context.Background(), entry); err != nil { - glog.V(0).Infof("read %s: %v", entry.FullPath, err) - return err - } - if entry.IsDirectory() { - childDirectories = append(childDirectories, entry.Name()) - } - return nil - }) - return err - }) - - if err != nil { - err = fmt.Errorf("list %s: %v", path, err) - } - - return - }) -} - -func IsHiddenSystemEntry(dir, name string) bool { - return dir == "/" && (name == "topics" || name == "etc") -} diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go deleted file mode 100644 index 881fee08f..000000000 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ /dev/null @@ -1,68 +0,0 @@ -package meta_cache - -import ( - "context" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { - - processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { - message := resp.EventNotification - - for _, sig := range message.Signatures { - if sig == selfSignature && selfSignature != 0 { - return nil - } - } - - dir := resp.Directory - var oldPath util.FullPath - var newEntry *filer.Entry - if message.OldEntry != nil { - oldPath = util.NewFullPath(dir, message.OldEntry.Name) - glog.V(4).Infof("deleting %v", oldPath) - } - - if message.NewEntry != nil { - if message.NewParentPath != "" { - dir = message.NewParentPath - } - key := util.NewFullPath(dir, message.NewEntry.Name) - glog.V(4).Infof("creating %v", key) - newEntry = filer.FromPbEntry(dir, message.NewEntry) - } - err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) - if err == nil { - if message.OldEntry != nil && message.NewEntry != nil { - oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) - mc.invalidateFunc(oldKey, message.OldEntry) - if message.OldEntry.Name != message.NewEntry.Name { - newKey := util.NewFullPath(dir, message.NewEntry.Name) - mc.invalidateFunc(newKey, message.NewEntry) - } - } else if message.OldEntry == nil && message.NewEntry != nil { - // no need to invaalidate - } else if message.OldEntry != nil && message.NewEntry == nil { - oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) - mc.invalidateFunc(oldKey, message.OldEntry) - } - } - - return err - - } - - util.RetryForever("followMetaUpdates", func() error { - return pb.WithFilerClientFollowMetadata(client, "mount", selfSignature, dir, &lastTsNs, selfSignature, processEventFn, true) - }, func(err error) bool { - glog.Errorf("follow metadata updates: %v", err) - return true - }) - - return nil -} |
