aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-21 21:16:13 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-21 21:16:13 -0700
commite24b25de784daf42a15daf573249d608ebc2b44a (patch)
tree53418a50125f664565572aea8e7fa10950c90a12 /weed/filesys
parent4f02f7121d232507bbbba825fa241bc8d5e630ff (diff)
downloadseaweedfs-e24b25de784daf42a15daf573249d608ebc2b44a.tar.xz
seaweedfs-e24b25de784daf42a15daf573249d608ebc2b44a.zip
async meta caching: can stream updates now
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/meta_cache/meta_cache.go6
-rw-r--r--weed/filesys/meta_cache/meta_cache_init.go10
-rw-r--r--weed/filesys/meta_cache/meta_cache_subscribe.go71
-rw-r--r--weed/filesys/wfs.go8
4 files changed, 92 insertions, 3 deletions
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 4f047e824..0aea69050 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -14,11 +14,11 @@ type MetaCache struct {
func NewMetaCache(dbFolder string) *MetaCache {
return &MetaCache{
- FilerStore: OpenMetaStore(dbFolder),
+ FilerStore: openMetaStore(dbFolder),
}
}
-func OpenMetaStore(dbFolder string) filer2.FilerStore {
+func openMetaStore(dbFolder string) filer2.FilerStore {
os.MkdirAll(dbFolder, 0755)
@@ -31,4 +31,4 @@ func OpenMetaStore(dbFolder string) filer2.FilerStore {
return store
-}
+} \ No newline at end of file
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
new file mode 100644
index 000000000..8591f190a
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -0,0 +1,10 @@
+package meta_cache
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient) error {
+
+ return nil
+}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
new file mode 100644
index 000000000..da3085fcb
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -0,0 +1,71 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string) error {
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ ctx := context.Background()
+ var err error
+ if message.OldEntry != nil {
+ key := util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ glog.V(4).Infof("deleting %v", key)
+ err = mc.DeleteEntry(ctx, key)
+ }
+
+ if message.NewEntry != nil {
+ dir := resp.Directory
+ if message.NewParentPath != "" {
+ dir = message.NewParentPath
+ }
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ glog.V(4).Infof("creating %v", key)
+ err = mc.InsertEntry(ctx, filer2.FromPbEntry(dir, message.NewEntry))
+ }
+ return err
+ }
+
+ var lastTsNs int64
+ for {
+ err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ ClientName: "mount",
+ PathPrefix: dir,
+ SinceNs: lastTsNs,
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ return fmt.Errorf("process %v: %v", resp, err)
+ }
+ lastTsNs = resp.TsNs
+ }
+ })
+ if err != nil {
+ glog.V(0).Infof("subscribing filer meta change: %v", err)
+ time.Sleep(time.Second)
+ }
+ }
+}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index b27ac440d..e67e77613 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -95,6 +95,14 @@ func NewSeaweedFileSystem(option *Option) *WFS {
}
if wfs.option.AsyncMetaDataCaching {
wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
+ if err := meta_cache.InitMetaCache(wfs.metaCache, wfs); err != nil{
+ glog.V(0).Infof("failed to init meta cache: %v", err)
+ } else {
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath)
+ util.OnInterrupt(func() {
+ wfs.metaCache.Shutdown()
+ })
+ }
}
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}