aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/filer_conf.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount/filer_conf.go')
-rw-r--r--weed/mount/filer_conf.go118
1 files changed, 118 insertions, 0 deletions
diff --git a/weed/mount/filer_conf.go b/weed/mount/filer_conf.go
new file mode 100644
index 000000000..a08d766e8
--- /dev/null
+++ b/weed/mount/filer_conf.go
@@ -0,0 +1,118 @@
+package mount
+
+import (
+ "fmt"
+ "path/filepath"
+ "sync/atomic"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
+ now := time.Now()
+ confDir := filer.DirectoryEtcSeaweedFS
+ confName := filer.FilerConfName
+ confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
+
+ // read current conf
+ err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ content, err := filer.ReadInsideFiler(client, confDir, confName)
+ if err != nil {
+ return err
+ }
+
+ fc := filer.NewFilerConf()
+ if len(content) > 0 {
+ if err := fc.LoadFromBytes(content); err != nil {
+ return fmt.Errorf("parse %s: %v", confFullName, err)
+ }
+ }
+
+ wfs.FilerConf = fc
+
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if message.NewEntry == nil {
+ return nil
+ }
+
+ dir := resp.Directory
+ name := resp.EventNotification.NewEntry.Name
+
+ if dir != confDir || name != confName {
+ return nil
+ }
+
+ content := message.NewEntry.Content
+ fc := filer.NewFilerConf()
+ if len(content) > 0 {
+ if err = fc.LoadFromBytes(content); err != nil {
+ return fmt.Errorf("parse %s: %v", confFullName, err)
+ }
+ }
+
+ wfs.FilerConf = fc
+
+ return nil
+ }
+
+ metadataFollowOption := &pb.MetadataFollowOption{
+ ClientName: "fuse",
+ ClientId: wfs.signature,
+ ClientEpoch: 1,
+ SelfSignature: 0,
+ PathPrefix: confFullName,
+ AdditionalPathPrefixes: nil,
+ StartTsNs: now.UnixNano(),
+ StopTsNs: 0,
+ EventErrorType: pb.FatalOnError,
+ }
+
+ return func() {
+ // sync new conf changes
+ util.RetryUntil("followFilerConfChanges", func() error {
+ metadataFollowOption.ClientEpoch++
+ i := atomic.LoadInt32(&wfs.option.filerIndex)
+ n := len(wfs.option.FilerAddresses)
+ err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn)
+ if err == nil {
+ atomic.StoreInt32(&wfs.option.filerIndex, i)
+ return nil
+ }
+
+ i++
+ if i >= int32(n) {
+ i = 0
+ }
+
+ return err
+ }, func(err error) bool {
+ glog.V(0).Infof("fuse follow filer conf changes: %v", err)
+ return true
+ })
+ }, nil
+}
+
+func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool {
+ if entry == nil || entry.Attributes == nil {
+ return false
+ }
+
+ rule := wfs.FilerConf.MatchStorageRule(string(path))
+ if !rule.Worm {
+ return false
+ }
+
+ return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime
+}