aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-10-06 12:55:19 -0700
committerchrislu <chris.lu@gmail.com>2024-10-06 12:55:19 -0700
commitc0e36231ad1fcddc5e436d1419a2e4d99fb27002 (patch)
tree3736cac44dfc94aa1b091cbcdbb4ab7618c73b9f
parent44b275879bf8a67af35c0b3c6e5049d95b3df3df (diff)
downloadseaweedfs-c0e36231ad1fcddc5e436d1419a2e4d99fb27002.tar.xz
seaweedfs-c0e36231ad1fcddc5e436d1419a2e4d99fb27002.zip
use only one metadata follow process
-rw-r--r--weed/mount/filer_conf.go48
-rw-r--r--weed/mount/meta_cache/meta_cache_subscribe.go41
-rw-r--r--weed/mount/weedfs.go6
3 files changed, 46 insertions, 49 deletions
diff --git a/weed/mount/filer_conf.go b/weed/mount/filer_conf.go
index 8b7d10dbc..bb5f33ce3 100644
--- a/weed/mount/filer_conf.go
+++ b/weed/mount/filer_conf.go
@@ -3,19 +3,15 @@ package mount
import (
"errors"
"fmt"
- "path/filepath"
- "sync/atomic"
- "time"
-
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "path/filepath"
)
-func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
- now := time.Now()
+func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) {
confDir := filer.DirectoryEtcSeaweedFS
confName := filer.FilerConfName
confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
@@ -71,41 +67,9 @@ func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
return nil
}
-
- metadataFollowOption := &pb.MetadataFollowOption{
- ClientName: "fuse",
- ClientId: wfs.signature,
- ClientEpoch: 1,
- SelfSignature: 0,
- PathPrefix: confFullName,
- AdditionalPathPrefixes: nil,
- StartTsNs: now.UnixNano(),
- StopTsNs: 0,
- EventErrorType: pb.FatalOnError,
- }
-
- return func() {
- // sync new conf changes
- util.RetryUntil("followFilerConfChanges", func() error {
- metadataFollowOption.ClientEpoch++
- i := atomic.LoadInt32(&wfs.option.filerIndex)
- n := len(wfs.option.FilerAddresses)
- err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn)
- if err == nil {
- atomic.StoreInt32(&wfs.option.filerIndex, i)
- return nil
- }
-
- i++
- if i >= int32(n) {
- i = 0
- }
-
- return err
- }, func(err error) bool {
- glog.V(0).Infof("fuse follow filer conf changes: %v", err)
- return true
- })
+ return &meta_cache.MetadataFollower{
+ PathPrefixToWatch: confFullName,
+ ProcessEventFn: processEventFn,
}, nil
}
diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go
index d3bb27d08..e67045013 100644
--- a/weed/mount/meta_cache/meta_cache_subscribe.go
+++ b/weed/mount/meta_cache/meta_cache_subscribe.go
@@ -10,7 +10,42 @@ import (
"strings"
)
-func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
+type MetadataFollower struct {
+ PathPrefixToWatch string
+ ProcessEventFn func(resp *filer_pb.SubscribeMetadataResponse) error
+}
+
+func mergeProceesors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse) error, followers ...*MetadataFollower) func(resp *filer_pb.SubscribeMetadataResponse) error {
+ return func(resp *filer_pb.SubscribeMetadataResponse) error {
+
+ // build the full path
+ entry := resp.EventNotification.NewEntry
+ if entry == nil {
+ entry = resp.EventNotification.OldEntry
+ }
+ dir := resp.Directory
+ if resp.EventNotification.NewParentPath != "" {
+ dir = resp.EventNotification.NewParentPath
+ }
+ fp := util.NewFullPath(dir, entry.Name)
+
+ for _, follower := range followers {
+ if strings.HasPrefix(string(fp), follower.PathPrefixToWatch) {
+ if err := follower.ProcessEventFn(resp); err != nil {
+ return err
+ }
+ }
+ }
+ return mainProcessor(resp)
+ }
+}
+
+func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, followers ...*MetadataFollower) error {
+
+ var prefixes []string
+ for _, follower := range followers {
+ prefixes = append(prefixes, follower.PathPrefixToWatch)
+ }
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
@@ -69,7 +104,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
ClientEpoch: 1,
SelfSignature: selfSignature,
PathPrefix: prefix,
- AdditionalPathPrefixes: nil,
+ AdditionalPathPrefixes: prefixes,
DirectoriesToWatch: nil,
StartTsNs: lastTsNs,
StopTsNs: 0,
@@ -77,7 +112,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
util.RetryUntil("followMetaUpdates", func() error {
metadataFollowOption.ClientEpoch++
- return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn)
+ return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, mergeProceesors(processEventFn, followers...))
}, func(err error) bool {
glog.Errorf("follow metadata updates: %v", err)
return true
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 4f029bba8..ac665ce5e 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -141,15 +141,13 @@ func NewSeaweedFileSystem(option *Option) *WFS {
}
func (wfs *WFS) StartBackgroundTasks() error {
- fn, err := wfs.subscribeFilerConfEvents()
+ follower, err := wfs.subscribeFilerConfEvents()
if err != nil {
return err
}
- go fn()
-
startTime := time.Now()
- go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), follower)
go wfs.loopCheckQuota()
return nil