aboutsummaryrefslogtreecommitdiff
path: root/weed/notification/webhook/webhook_queue.go
diff options
context:
space:
mode:
authorIbrahim Konsowa <imkonsowa@gmail.com>2025-07-15 21:49:37 +0400
committerGitHub <noreply@github.com>2025-07-15 10:49:37 -0700
commitd78aa3d2de1a18c9802f72c926efa8e80ff2fa64 (patch)
tree7980858236682e2cdb3f91f6ed3f9eeaf4843b47 /weed/notification/webhook/webhook_queue.go
parent74f4e9ba5ab691938e89b32017a89359ae49428f (diff)
downloadseaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.tar.xz
seaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.zip
[Notifications] Improving webhook notifications (#6965)
* worker setup * fix tests * start worker * graceful worker drain * retry queue * migrate queue to watermill * adding filters and improvements * add the event type to the webhook message * eliminating redundant JSON serialization * resolve review comments * trigger actions * fix tests * typo fixes * read max_backoff_seconds from config * add more context to the dead letter * close the http response on errors * drain the http response body in case not empty * eliminate exported typesπ
Diffstat (limited to 'weed/notification/webhook/webhook_queue.go')
-rw-r--r--weed/notification/webhook/webhook_queue.go200
1 files changed, 168 insertions, 32 deletions
diff --git a/weed/notification/webhook/webhook_queue.go b/weed/notification/webhook/webhook_queue.go
index d209b74e2..d8f9a0734 100644
--- a/weed/notification/webhook/webhook_queue.go
+++ b/weed/notification/webhook/webhook_queue.go
@@ -1,51 +1,81 @@
package webhook
import (
+ "context"
+ "errors"
"fmt"
- "net/url"
+ "time"
+ "github.com/ThreeDotsLabs/watermill"
+ "github.com/ThreeDotsLabs/watermill/message"
+ "github.com/ThreeDotsLabs/watermill/message/router/middleware"
+ "github.com/ThreeDotsLabs/watermill/message/router/plugin"
+ "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/notification"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
)
-// client defines the interface for transport client
-// could be extended to support gRPC
-type client interface {
- sendMessage(key string, message proto.Message) error
-}
-
func init() {
- notification.MessageQueues = append(notification.MessageQueues, &WebhookQueue{})
+ notification.MessageQueues = append(notification.MessageQueues, &Queue{})
}
-type WebhookQueue struct {
- client client
+type Queue struct {
+ router *message.Router
+ queueChannel *gochannel.GoChannel
+ config *config
+ client client
+ filter *filter
+
+ ctx context.Context
+ cancel context.CancelFunc
}
-type config struct {
- endpoint string
- authBearerToken string
+func (w *Queue) GetName() string {
+ return queueName
}
-func (c *config) validate() error {
- _, err := url.Parse(c.endpoint)
+func (w *Queue) SendMessage(key string, msg proto.Message) error {
+ eventNotification, ok := msg.(*filer_pb.EventNotification)
+ if !ok {
+ return nil
+ }
+
+ if w.filter != nil && !w.filter.shouldPublish(key, eventNotification) {
+ return nil
+ }
+
+ m := newWebhookMessage(key, msg)
+ if m == nil {
+ return nil
+ }
+
+ wMsg, err := m.toWaterMillMessage()
if err != nil {
- return fmt.Errorf("invalid webhook endpoint %w", err)
+ return err
}
- return nil
+ return w.queueChannel.Publish(pubSubTopicName, wMsg)
}
-func (w *WebhookQueue) GetName() string {
- return "webhook"
+func (w *webhookMessage) toWaterMillMessage() (*message.Message, error) {
+ payload, err := proto.Marshal(w.Notification)
+ if err != nil {
+ return nil, err
+ }
+
+ msg := message.NewMessage(watermill.NewUUID(), payload)
+ // Set event type and key as metadata
+ msg.Metadata.Set("event_type", w.EventType)
+ msg.Metadata.Set("key", w.Key)
+
+ return msg, nil
}
-func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix string) error {
- c := &config{
- endpoint: configuration.GetString(prefix + "endpoint"),
- authBearerToken: configuration.GetString(prefix + "bearer_token"),
- }
+func (w *Queue) Initialize(configuration util.Configuration, prefix string) error {
+ c := newConfigWithDefaults(configuration, prefix)
if err := c.validate(); err != nil {
return err
@@ -54,18 +84,124 @@ func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix strin
return w.initialize(c)
}
-func (w *WebhookQueue) initialize(cfg *config) error {
- client, err := newHTTPClient(cfg)
+func (w *Queue) initialize(cfg *config) error {
+ w.ctx, w.cancel = context.WithCancel(context.Background())
+ w.config = cfg
+ w.filter = newFilter(cfg)
+
+ hClient, err := newHTTPClient(cfg)
+ if err != nil {
+ return fmt.Errorf("failed to create webhook http client: %w", err)
+ }
+ w.client = hClient
+
+ if err = w.setupWatermillQueue(cfg); err != nil {
+ return fmt.Errorf("failed to setup watermill queue: %w", err)
+ }
+ if err = w.logDeadLetterMessages(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (w *Queue) setupWatermillQueue(cfg *config) error {
+ logger := watermill.NewStdLogger(false, false)
+ pubSubConfig := gochannel.Config{
+ OutputChannelBuffer: int64(cfg.bufferSize),
+ Persistent: false,
+ }
+ w.queueChannel = gochannel.NewGoChannel(pubSubConfig, logger)
+
+ router, err := message.NewRouter(
+ message.RouterConfig{
+ CloseTimeout: 60 * time.Second,
+ },
+ logger,
+ )
if err != nil {
- return fmt.Errorf("failed to create webhook client: %v", err)
+ return fmt.Errorf("failed to create router: %v", err)
+ }
+ w.router = router
+
+ retryMiddleware := middleware.Retry{
+ MaxRetries: cfg.maxRetries,
+ InitialInterval: time.Duration(cfg.backoffSeconds) * time.Second,
+ MaxInterval: time.Duration(cfg.maxBackoffSeconds) * time.Second,
+ Multiplier: 2.0,
+ RandomizationFactor: 0.3,
+ Logger: logger,
+ }.Middleware
+
+ poisonQueue, err := middleware.PoisonQueue(w.queueChannel, deadLetterTopic)
+ if err != nil {
+ return fmt.Errorf("failed to create poison queue: %v", err)
+ }
+
+ router.AddPlugin(plugin.SignalsHandler)
+ router.AddMiddleware(retryMiddleware, poisonQueue)
+
+ for i := 0; i < cfg.nWorkers; i++ {
+ router.AddNoPublisherHandler(
+ pubSubHandlerNameTemplate(i),
+ pubSubTopicName,
+ w.queueChannel,
+ w.handleWebhook,
+ )
+ }
+
+ go func() {
+ // cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already
+ defer w.cancel()
+
+ if err := router.Run(w.ctx); err != nil && !errors.Is(err, context.Canceled) {
+ glog.Errorf("webhook pubsub worker stopped with error: %v", err)
+ }
+
+ glog.Info("webhook pubsub worker stopped")
+ }()
+
+ return nil
+}
+
+func (w *Queue) handleWebhook(msg *message.Message) error {
+ var n filer_pb.EventNotification
+ if err := proto.Unmarshal(msg.Payload, &n); err != nil {
+ glog.Errorf("failed to unmarshal protobuf message: %v", err)
+ return err
+ }
+
+ // Reconstruct webhook message from metadata and payload
+ webhookMsg := &webhookMessage{
+ Key: msg.Metadata.Get("key"),
+ EventType: msg.Metadata.Get("event_type"),
+ Notification: &n,
+ }
+
+ if err := w.client.sendMessage(webhookMsg); err != nil {
+ glog.Errorf("failed to send message to webhook %s: %v", webhookMsg.Key, err)
+ return err
}
- w.client = client
+
return nil
}
-func (w *WebhookQueue) SendMessage(key string, message proto.Message) error {
- if w.client == nil {
- return fmt.Errorf("webhook client not initialized")
+func (w *Queue) logDeadLetterMessages() error {
+ ch, err := w.queueChannel.Subscribe(w.ctx, deadLetterTopic)
+ if err != nil {
+ return err
}
- return w.client.sendMessage(key, message)
+
+ go func() {
+ for {
+ select {
+ case msg := <-ch:
+ glog.Errorf("received dead letter message: %s, key: %s", string(msg.Payload), msg.Metadata["key"])
+ case <-w.ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return nil
}