aboutsummaryrefslogtreecommitdiff
path: root/weed/notification/webhook/webhook_queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/notification/webhook/webhook_queue.go')
-rw-r--r--weed/notification/webhook/webhook_queue.go29
1 files changed, 21 insertions, 8 deletions
diff --git a/weed/notification/webhook/webhook_queue.go b/weed/notification/webhook/webhook_queue.go
index e034e9537..2ac9db3eb 100644
--- a/weed/notification/webhook/webhook_queue.go
+++ b/weed/notification/webhook/webhook_queue.go
@@ -31,6 +31,9 @@ type Queue struct {
ctx context.Context
cancel context.CancelFunc
+
+ // Semaphore for controlling concurrent webhook requests
+ sem chan struct{}
}
func (w *Queue) GetName() string {
@@ -89,6 +92,9 @@ func (w *Queue) initialize(cfg *config) error {
w.config = cfg
w.filter = newFilter(cfg)
+ // Initialize semaphore for controlling concurrent webhook requests
+ w.sem = make(chan struct{}, cfg.nWorkers)
+
hClient, err := newHTTPClient(cfg)
if err != nil {
return fmt.Errorf("failed to create webhook http client: %w", err)
@@ -141,14 +147,17 @@ func (w *Queue) setupWatermillQueue(cfg *config) error {
router.AddPlugin(plugin.SignalsHandler)
router.AddMiddleware(retryMiddleware, poisonQueue)
- for i := 0; i < cfg.nWorkers; i++ {
- router.AddNoPublisherHandler(
- pubSubHandlerNameTemplate(i),
- pubSubTopicName,
- w.queueChannel,
- w.handleWebhook,
- )
- }
+ // Add a single handler to avoid duplicate message delivery.
+ // With gochannel's default behavior, each handler call creates
+ // a separate subscription, and all subscriptions receive their own copy of each message.
+ // Using a single handler ensures each webhook is sent only once.
+ // Concurrency is controlled via semaphore in handleWebhook based on nWorkers config.
+ router.AddConsumerHandler(
+ "webhook_handler",
+ pubSubTopicName,
+ w.queueChannel,
+ w.handleWebhook,
+ )
go func() {
// cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already
@@ -165,6 +174,10 @@ func (w *Queue) setupWatermillQueue(cfg *config) error {
}
func (w *Queue) handleWebhook(msg *message.Message) error {
+ // Acquire semaphore slot (blocks if at capacity)
+ w.sem <- struct{}{}
+ defer func() { <-w.sem }()
+
var n filer_pb.EventNotification
if err := proto.Unmarshal(msg.Payload, &n); err != nil {
glog.Errorf("failed to unmarshal protobuf message: %v", err)