diff options
Diffstat (limited to 'weed/notification/webhook/webhook_queue_test.go')
| -rw-r--r-- | weed/notification/webhook/webhook_queue_test.go | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/weed/notification/webhook/webhook_queue_test.go b/weed/notification/webhook/webhook_queue_test.go index 52a290149..a084ef975 100644 --- a/weed/notification/webhook/webhook_queue_test.go +++ b/weed/notification/webhook/webhook_queue_test.go @@ -273,6 +273,7 @@ func TestQueueHandleWebhook(t *testing.T) { client, _ := newHTTPClient(cfg) q := &Queue{ client: client, + sem: make(chan struct{}, cfg.nWorkers), } message := newWebhookMessage("/test/path", &filer_pb.EventNotification{ @@ -386,6 +387,78 @@ func TestQueueRetryMechanism(t *testing.T) { } } +// TestQueueNoDuplicateWebhooks verifies that webhooks are sent only once regardless of worker count +func TestQueueNoDuplicateWebhooks(t *testing.T) { + tests := []struct { + name string + nWorkers int + expected int + }{ + {"1 worker", 1, 1}, + {"5 workers", 5, 1}, + {"10 workers", 10, 1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + callCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount++ + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := &config{ + endpoint: server.URL, + authBearerToken: "test-token", + timeoutSeconds: 5, + maxRetries: 0, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: tt.nWorkers, + bufferSize: 10, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Fatalf("Failed to initialize queue: %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + // Wait for router and subscriber to be fully ready + time.Sleep(200 * time.Millisecond) + + msg := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + err = q.SendMessage("/test/path", msg) + if err != nil { + t.Errorf("SendMessage() error = %v", err) + } + + // Wait for message processing + time.Sleep(500 * time.Millisecond) + + if callCount != tt.expected { + t.Errorf("Expected %d webhook call(s), got %d with %d workers", tt.expected, callCount, tt.nWorkers) + } + }) + } +} + func TestQueueSendMessageWithFilter(t *testing.T) { tests := []struct { name string |
