diff options
Diffstat (limited to 'weed/notification/webhook/webhook_queue.go')
| -rw-r--r-- | weed/notification/webhook/webhook_queue.go | 200 |
1 files changed, 168 insertions, 32 deletions
diff --git a/weed/notification/webhook/webhook_queue.go b/weed/notification/webhook/webhook_queue.go index d209b74e2..d8f9a0734 100644 --- a/weed/notification/webhook/webhook_queue.go +++ b/weed/notification/webhook/webhook_queue.go @@ -1,51 +1,81 @@ package webhook import ( + "context" + "errors" "fmt" - "net/url" + "time" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" + "github.com/ThreeDotsLabs/watermill/message/router/plugin" + "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/notification" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/protobuf/proto" ) -// client defines the interface for transport client -// could be extended to support gRPC -type client interface { - sendMessage(key string, message proto.Message) error -} - func init() { - notification.MessageQueues = append(notification.MessageQueues, &WebhookQueue{}) + notification.MessageQueues = append(notification.MessageQueues, &Queue{}) } -type WebhookQueue struct { - client client +type Queue struct { + router *message.Router + queueChannel *gochannel.GoChannel + config *config + client client + filter *filter + + ctx context.Context + cancel context.CancelFunc } -type config struct { - endpoint string - authBearerToken string +func (w *Queue) GetName() string { + return queueName } -func (c *config) validate() error { - _, err := url.Parse(c.endpoint) +func (w *Queue) SendMessage(key string, msg proto.Message) error { + eventNotification, ok := msg.(*filer_pb.EventNotification) + if !ok { + return nil + } + + if w.filter != nil && !w.filter.shouldPublish(key, eventNotification) { + return nil + } + + m := newWebhookMessage(key, msg) + if m == nil { + return nil + } + + wMsg, err := m.toWaterMillMessage() if err != nil { - return fmt.Errorf("invalid webhook endpoint %w", err) + return err } - return nil + return w.queueChannel.Publish(pubSubTopicName, wMsg) } -func (w *WebhookQueue) GetName() string { - return "webhook" +func (w *webhookMessage) toWaterMillMessage() (*message.Message, error) { + payload, err := proto.Marshal(w.Notification) + if err != nil { + return nil, err + } + + msg := message.NewMessage(watermill.NewUUID(), payload) + // Set event type and key as metadata + msg.Metadata.Set("event_type", w.EventType) + msg.Metadata.Set("key", w.Key) + + return msg, nil } -func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix string) error { - c := &config{ - endpoint: configuration.GetString(prefix + "endpoint"), - authBearerToken: configuration.GetString(prefix + "bearer_token"), - } +func (w *Queue) Initialize(configuration util.Configuration, prefix string) error { + c := newConfigWithDefaults(configuration, prefix) if err := c.validate(); err != nil { return err @@ -54,18 +84,124 @@ func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix strin return w.initialize(c) } -func (w *WebhookQueue) initialize(cfg *config) error { - client, err := newHTTPClient(cfg) +func (w *Queue) initialize(cfg *config) error { + w.ctx, w.cancel = context.WithCancel(context.Background()) + w.config = cfg + w.filter = newFilter(cfg) + + hClient, err := newHTTPClient(cfg) + if err != nil { + return fmt.Errorf("failed to create webhook http client: %w", err) + } + w.client = hClient + + if err = w.setupWatermillQueue(cfg); err != nil { + return fmt.Errorf("failed to setup watermill queue: %w", err) + } + if err = w.logDeadLetterMessages(); err != nil { + return err + } + + return nil +} + +func (w *Queue) setupWatermillQueue(cfg *config) error { + logger := watermill.NewStdLogger(false, false) + pubSubConfig := gochannel.Config{ + OutputChannelBuffer: int64(cfg.bufferSize), + Persistent: false, + } + w.queueChannel = gochannel.NewGoChannel(pubSubConfig, logger) + + router, err := message.NewRouter( + message.RouterConfig{ + CloseTimeout: 60 * time.Second, + }, + logger, + ) if err != nil { - return fmt.Errorf("failed to create webhook client: %v", err) + return fmt.Errorf("failed to create router: %v", err) + } + w.router = router + + retryMiddleware := middleware.Retry{ + MaxRetries: cfg.maxRetries, + InitialInterval: time.Duration(cfg.backoffSeconds) * time.Second, + MaxInterval: time.Duration(cfg.maxBackoffSeconds) * time.Second, + Multiplier: 2.0, + RandomizationFactor: 0.3, + Logger: logger, + }.Middleware + + poisonQueue, err := middleware.PoisonQueue(w.queueChannel, deadLetterTopic) + if err != nil { + return fmt.Errorf("failed to create poison queue: %v", err) + } + + router.AddPlugin(plugin.SignalsHandler) + router.AddMiddleware(retryMiddleware, poisonQueue) + + for i := 0; i < cfg.nWorkers; i++ { + router.AddNoPublisherHandler( + pubSubHandlerNameTemplate(i), + pubSubTopicName, + w.queueChannel, + w.handleWebhook, + ) + } + + go func() { + // cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already + defer w.cancel() + + if err := router.Run(w.ctx); err != nil && !errors.Is(err, context.Canceled) { + glog.Errorf("webhook pubsub worker stopped with error: %v", err) + } + + glog.Info("webhook pubsub worker stopped") + }() + + return nil +} + +func (w *Queue) handleWebhook(msg *message.Message) error { + var n filer_pb.EventNotification + if err := proto.Unmarshal(msg.Payload, &n); err != nil { + glog.Errorf("failed to unmarshal protobuf message: %v", err) + return err + } + + // Reconstruct webhook message from metadata and payload + webhookMsg := &webhookMessage{ + Key: msg.Metadata.Get("key"), + EventType: msg.Metadata.Get("event_type"), + Notification: &n, + } + + if err := w.client.sendMessage(webhookMsg); err != nil { + glog.Errorf("failed to send message to webhook %s: %v", webhookMsg.Key, err) + return err } - w.client = client + return nil } -func (w *WebhookQueue) SendMessage(key string, message proto.Message) error { - if w.client == nil { - return fmt.Errorf("webhook client not initialized") +func (w *Queue) logDeadLetterMessages() error { + ch, err := w.queueChannel.Subscribe(w.ctx, deadLetterTopic) + if err != nil { + return err } - return w.client.sendMessage(key, message) + + go func() { + for { + select { + case msg := <-ch: + glog.Errorf("received dead letter message: %s, key: %s", string(msg.Payload), msg.Metadata["key"]) + case <-w.ctx.Done(): + return + } + } + }() + + return nil } |
