aboutsummaryrefslogtreecommitdiff
path: root/weed/notification/webhook/webhook_queue_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/notification/webhook/webhook_queue_test.go')
-rw-r--r--weed/notification/webhook/webhook_queue_test.go73
1 files changed, 73 insertions, 0 deletions
diff --git a/weed/notification/webhook/webhook_queue_test.go b/weed/notification/webhook/webhook_queue_test.go
index 52a290149..a084ef975 100644
--- a/weed/notification/webhook/webhook_queue_test.go
+++ b/weed/notification/webhook/webhook_queue_test.go
@@ -273,6 +273,7 @@ func TestQueueHandleWebhook(t *testing.T) {
client, _ := newHTTPClient(cfg)
q := &Queue{
client: client,
+ sem: make(chan struct{}, cfg.nWorkers),
}
message := newWebhookMessage("/test/path", &filer_pb.EventNotification{
@@ -386,6 +387,78 @@ func TestQueueRetryMechanism(t *testing.T) {
}
}
+// TestQueueNoDuplicateWebhooks verifies that webhooks are sent only once regardless of worker count
+func TestQueueNoDuplicateWebhooks(t *testing.T) {
+ tests := []struct {
+ name string
+ nWorkers int
+ expected int
+ }{
+ {"1 worker", 1, 1},
+ {"5 workers", 5, 1},
+ {"10 workers", 10, 1},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ callCount := 0
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ callCount++
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer server.Close()
+
+ cfg := &config{
+ endpoint: server.URL,
+ authBearerToken: "test-token",
+ timeoutSeconds: 5,
+ maxRetries: 0,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: tt.nWorkers,
+ bufferSize: 10,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Fatalf("Failed to initialize queue: %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ // Wait for router and subscriber to be fully ready
+ time.Sleep(200 * time.Millisecond)
+
+ msg := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ err = q.SendMessage("/test/path", msg)
+ if err != nil {
+ t.Errorf("SendMessage() error = %v", err)
+ }
+
+ // Wait for message processing
+ time.Sleep(500 * time.Millisecond)
+
+ if callCount != tt.expected {
+ t.Errorf("Expected %d webhook call(s), got %d with %d workers", tt.expected, callCount, tt.nWorkers)
+ }
+ })
+ }
+}
+
func TestQueueSendMessageWithFilter(t *testing.T) {
tests := []struct {
name string