diff options
Diffstat (limited to 'weed/notification/webhook/webhook_queue.go')
| -rw-r--r-- | weed/notification/webhook/webhook_queue.go | 29 |
1 files changed, 21 insertions, 8 deletions
diff --git a/weed/notification/webhook/webhook_queue.go b/weed/notification/webhook/webhook_queue.go index e034e9537..2ac9db3eb 100644 --- a/weed/notification/webhook/webhook_queue.go +++ b/weed/notification/webhook/webhook_queue.go @@ -31,6 +31,9 @@ type Queue struct { ctx context.Context cancel context.CancelFunc + + // Semaphore for controlling concurrent webhook requests + sem chan struct{} } func (w *Queue) GetName() string { @@ -89,6 +92,9 @@ func (w *Queue) initialize(cfg *config) error { w.config = cfg w.filter = newFilter(cfg) + // Initialize semaphore for controlling concurrent webhook requests + w.sem = make(chan struct{}, cfg.nWorkers) + hClient, err := newHTTPClient(cfg) if err != nil { return fmt.Errorf("failed to create webhook http client: %w", err) @@ -141,14 +147,17 @@ func (w *Queue) setupWatermillQueue(cfg *config) error { router.AddPlugin(plugin.SignalsHandler) router.AddMiddleware(retryMiddleware, poisonQueue) - for i := 0; i < cfg.nWorkers; i++ { - router.AddNoPublisherHandler( - pubSubHandlerNameTemplate(i), - pubSubTopicName, - w.queueChannel, - w.handleWebhook, - ) - } + // Add a single handler to avoid duplicate message delivery. + // With gochannel's default behavior, each handler call creates + // a separate subscription, and all subscriptions receive their own copy of each message. + // Using a single handler ensures each webhook is sent only once. + // Concurrency is controlled via semaphore in handleWebhook based on nWorkers config. + router.AddConsumerHandler( + "webhook_handler", + pubSubTopicName, + w.queueChannel, + w.handleWebhook, + ) go func() { // cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already @@ -165,6 +174,10 @@ func (w *Queue) setupWatermillQueue(cfg *config) error { } func (w *Queue) handleWebhook(msg *message.Message) error { + // Acquire semaphore slot (blocks if at capacity) + w.sem <- struct{}{} + defer func() { <-w.sem }() + var n filer_pb.EventNotification if err := proto.Unmarshal(msg.Payload, &n); err != nil { glog.Errorf("failed to unmarshal protobuf message: %v", err) |
