diff options
| author | chrislu <chris.lu@gmail.com> | 2024-10-06 12:55:19 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-10-06 12:55:19 -0700 |
| commit | c0e36231ad1fcddc5e436d1419a2e4d99fb27002 (patch) | |
| tree | 3736cac44dfc94aa1b091cbcdbb4ab7618c73b9f /weed/mount/meta_cache | |
| parent | 44b275879bf8a67af35c0b3c6e5049d95b3df3df (diff) | |
| download | seaweedfs-c0e36231ad1fcddc5e436d1419a2e4d99fb27002.tar.xz seaweedfs-c0e36231ad1fcddc5e436d1419a2e4d99fb27002.zip | |
use only one metadata follow process
Diffstat (limited to 'weed/mount/meta_cache')
| -rw-r--r-- | weed/mount/meta_cache/meta_cache_subscribe.go | 41 |
1 files changed, 38 insertions, 3 deletions
diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go index d3bb27d08..e67045013 100644 --- a/weed/mount/meta_cache/meta_cache_subscribe.go +++ b/weed/mount/meta_cache/meta_cache_subscribe.go @@ -10,7 +10,42 @@ import ( "strings" ) -func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { +type MetadataFollower struct { + PathPrefixToWatch string + ProcessEventFn func(resp *filer_pb.SubscribeMetadataResponse) error +} + +func mergeProceesors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse) error, followers ...*MetadataFollower) func(resp *filer_pb.SubscribeMetadataResponse) error { + return func(resp *filer_pb.SubscribeMetadataResponse) error { + + // build the full path + entry := resp.EventNotification.NewEntry + if entry == nil { + entry = resp.EventNotification.OldEntry + } + dir := resp.Directory + if resp.EventNotification.NewParentPath != "" { + dir = resp.EventNotification.NewParentPath + } + fp := util.NewFullPath(dir, entry.Name) + + for _, follower := range followers { + if strings.HasPrefix(string(fp), follower.PathPrefixToWatch) { + if err := follower.ProcessEventFn(resp); err != nil { + return err + } + } + } + return mainProcessor(resp) + } +} + +func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, followers ...*MetadataFollower) error { + + var prefixes []string + for _, follower := range followers { + prefixes = append(prefixes, follower.PathPrefixToWatch) + } processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification @@ -69,7 +104,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil ClientEpoch: 1, SelfSignature: selfSignature, PathPrefix: prefix, - AdditionalPathPrefixes: nil, + AdditionalPathPrefixes: prefixes, DirectoriesToWatch: nil, StartTsNs: lastTsNs, StopTsNs: 0, @@ -77,7 +112,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil } util.RetryUntil("followMetaUpdates", func() error { metadataFollowOption.ClientEpoch++ - return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn) + return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, mergeProceesors(processEventFn, followers...)) }, func(err error) bool { glog.Errorf("follow metadata updates: %v", err) return true |
