aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/meta_cache
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys/meta_cache')
-rw-r--r--weed/filesys/meta_cache/cache_config.go32
-rw-r--r--weed/filesys/meta_cache/meta_cache.go93
-rw-r--r--weed/filesys/meta_cache/meta_cache_init.go21
-rw-r--r--weed/filesys/meta_cache/meta_cache_subscribe.go69
4 files changed, 215 insertions, 0 deletions
diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go
new file mode 100644
index 000000000..e6593ebde
--- /dev/null
+++ b/weed/filesys/meta_cache/cache_config.go
@@ -0,0 +1,32 @@
+package meta_cache
+
+import "github.com/chrislusf/seaweedfs/weed/util"
+
+var (
+ _ = util.Configuration(&cacheConfig{})
+)
+
+// implementing util.Configuraion
+type cacheConfig struct {
+ dir string
+}
+
+func (c cacheConfig) GetString(key string) string {
+ return c.dir
+}
+
+func (c cacheConfig) GetBool(key string) bool {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetInt(key string) int {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetStringSlice(key string) []string {
+ panic("implement me")
+}
+
+func (c cacheConfig) SetDefault(key string, value interface{}) {
+ panic("implement me")
+}
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
new file mode 100644
index 000000000..4c9090d42
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -0,0 +1,93 @@
+package meta_cache
+
+import (
+ "context"
+ "os"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MetaCache struct {
+ actualStore filer2.FilerStore
+ sync.RWMutex
+}
+
+func NewMetaCache(dbFolder string) *MetaCache {
+ return &MetaCache{
+ actualStore: openMetaStore(dbFolder),
+ }
+}
+
+func openMetaStore(dbFolder string) filer2.FilerStore {
+
+ os.RemoveAll(dbFolder)
+ os.MkdirAll(dbFolder, 0755)
+
+ store := &leveldb.LevelDBStore{}
+ config := &cacheConfig{
+ dir: dbFolder,
+ }
+
+ if err := store.Initialize(config, ""); err != nil {
+ glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
+ }
+
+ return store
+
+}
+
+func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.actualStore.InsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error {
+ mc.Lock()
+ defer mc.Unlock()
+ if oldPath != "" {
+ if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
+ return err
+ }
+ }
+ if newEntry != nil {
+ if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.actualStore.UpdateEntry(ctx, entry)
+}
+
+func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) {
+ mc.RLock()
+ defer mc.RUnlock()
+ return mc.actualStore.FindEntry(ctx, fp)
+}
+
+func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.actualStore.DeleteEntry(ctx, fp)
+}
+
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) {
+ mc.RLock()
+ defer mc.RUnlock()
+ return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+}
+
+func (mc *MetaCache) Shutdown() {
+ mc.Lock()
+ defer mc.Unlock()
+ mc.actualStore.Shutdown()
+}
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
new file mode 100644
index 000000000..58bf6862e
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -0,0 +1,21 @@
+package meta_cache
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error {
+ glog.V(0).Infof("synchronizing meta data ...")
+ filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) {
+ entry := filer2.FromPbEntry(string(parentPath), pbEntry)
+ if err := mc.InsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ }
+ })
+ return nil
+}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
new file mode 100644
index 000000000..2e411a48a
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -0,0 +1,69 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ var oldPath util.FullPath
+ var newEntry *filer2.Entry
+ if message.OldEntry != nil {
+ oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ glog.V(4).Infof("deleting %v", oldPath)
+ }
+
+ if message.NewEntry != nil {
+ dir := resp.Directory
+ if message.NewParentPath != "" {
+ dir = message.NewParentPath
+ }
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ glog.V(4).Infof("creating %v", key)
+ newEntry = filer2.FromPbEntry(dir, message.NewEntry)
+ }
+ return mc.AtomicUpdateEntry(context.Background(), oldPath, newEntry)
+ }
+
+ for {
+ err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ ClientName: "mount",
+ PathPrefix: dir,
+ SinceNs: lastTsNs,
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ return fmt.Errorf("process %v: %v", resp, err)
+ }
+ lastTsNs = resp.TsNs
+ }
+ })
+ if err != nil {
+ glog.V(0).Infof("subscribing filer meta change: %v", err)
+ time.Sleep(time.Second)
+ }
+ }
+}