diff options
Diffstat (limited to 'weed/mount/filer_conf.go')
| -rw-r--r-- | weed/mount/filer_conf.go | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/weed/mount/filer_conf.go b/weed/mount/filer_conf.go new file mode 100644 index 000000000..a08d766e8 --- /dev/null +++ b/weed/mount/filer_conf.go @@ -0,0 +1,118 @@ +package mount + +import ( + "fmt" + "path/filepath" + "sync/atomic" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func (wfs *WFS) subscribeFilerConfEvents() (func(), error) { + now := time.Now() + confDir := filer.DirectoryEtcSeaweedFS + confName := filer.FilerConfName + confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName) + + // read current conf + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + content, err := filer.ReadInsideFiler(client, confDir, confName) + if err != nil { + return err + } + + fc := filer.NewFilerConf() + if len(content) > 0 { + if err := fc.LoadFromBytes(content); err != nil { + return fmt.Errorf("parse %s: %v", confFullName, err) + } + } + + wfs.FilerConf = fc + + return nil + }) + if err != nil { + return nil, err + } + + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry == nil { + return nil + } + + dir := resp.Directory + name := resp.EventNotification.NewEntry.Name + + if dir != confDir || name != confName { + return nil + } + + content := message.NewEntry.Content + fc := filer.NewFilerConf() + if len(content) > 0 { + if err = fc.LoadFromBytes(content); err != nil { + return fmt.Errorf("parse %s: %v", confFullName, err) + } + } + + wfs.FilerConf = fc + + return nil + } + + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "fuse", + ClientId: wfs.signature, + ClientEpoch: 1, + SelfSignature: 0, + PathPrefix: confFullName, + AdditionalPathPrefixes: nil, + StartTsNs: now.UnixNano(), + StopTsNs: 0, + EventErrorType: pb.FatalOnError, + } + + return func() { + // sync new conf changes + util.RetryUntil("followFilerConfChanges", func() error { + metadataFollowOption.ClientEpoch++ + i := atomic.LoadInt32(&wfs.option.filerIndex) + n := len(wfs.option.FilerAddresses) + err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn) + if err == nil { + atomic.StoreInt32(&wfs.option.filerIndex, i) + return nil + } + + i++ + if i >= int32(n) { + i = 0 + } + + return err + }, func(err error) bool { + glog.V(0).Infof("fuse follow filer conf changes: %v", err) + return true + }) + }, nil +} + +func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool { + if entry == nil || entry.Attributes == nil { + return false + } + + rule := wfs.FilerConf.MatchStorageRule(string(path)) + if !rule.Worm { + return false + } + + return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime +} |
