aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/notification/webhook/http.go106
-rw-r--r--weed/notification/webhook/http_test.go372
-rw-r--r--weed/notification/webhook/webhook_queue.go29
-rw-r--r--weed/notification/webhook/webhook_queue_test.go73
4 files changed, 564 insertions, 16 deletions
diff --git a/weed/notification/webhook/http.go b/weed/notification/webhook/http.go
index 6b1a0e26d..82a8f8e71 100644
--- a/weed/notification/webhook/http.go
+++ b/weed/notification/webhook/http.go
@@ -8,16 +8,24 @@ import (
"fmt"
"io"
"net/http"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
+const (
+ maxWebhookRetryDepth = 2
+)
+
type httpClient struct {
- endpoint string
- token string
- timeout time.Duration
+ endpoint string
+ token string
+ timeout time.Duration
+ client *http.Client // Reused HTTP client with redirect prevention
+ endpointMu sync.RWMutex
+ finalURL string // Cached final URL after following redirects
}
func newHTTPClient(cfg *config) (*httpClient, error) {
@@ -25,10 +33,24 @@ func newHTTPClient(cfg *config) (*httpClient, error) {
endpoint: cfg.endpoint,
token: cfg.authBearerToken,
timeout: time.Duration(cfg.timeoutSeconds) * time.Second,
+ client: &http.Client{
+ CheckRedirect: func(req *http.Request, via []*http.Request) error {
+ return http.ErrUseLastResponse
+ },
+ },
}, nil
}
func (h *httpClient) sendMessage(message *webhookMessage) error {
+ return h.sendMessageWithRetry(message, 0)
+}
+
+func (h *httpClient) sendMessageWithRetry(message *webhookMessage, depth int) error {
+ // Prevent infinite recursion
+ if depth > maxWebhookRetryDepth {
+ return fmt.Errorf("webhook max retry depth exceeded")
+ }
+
// Serialize the protobuf message to JSON for HTTP payload
notificationData, err := json.Marshal(message.Notification)
if err != nil {
@@ -46,7 +68,16 @@ func (h *httpClient) sendMessage(message *webhookMessage) error {
return fmt.Errorf("failed to marshal message: %w", err)
}
- req, err := http.NewRequest(http.MethodPost, h.endpoint, bytes.NewBuffer(jsonData))
+ // Use cached final URL if available, otherwise use original endpoint
+ h.endpointMu.RLock()
+ targetURL := h.finalURL
+ usingCachedURL := targetURL != ""
+ if targetURL == "" {
+ targetURL = h.endpoint
+ }
+ h.endpointMu.RUnlock()
+
+ req, err := http.NewRequest(http.MethodPost, targetURL, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
@@ -56,29 +87,88 @@ func (h *httpClient) sendMessage(message *webhookMessage) error {
req.Header.Set("Authorization", "Bearer "+h.token)
}
+ // Apply timeout via context (not on client) to avoid redundancy
if h.timeout > 0 {
ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
defer cancel()
req = req.WithContext(ctx)
}
- resp, err := util_http.Do(req)
+ resp, err := h.client.Do(req)
if err != nil {
- if err = drainResponse(resp); err != nil {
- glog.Errorf("failed to drain response: %v", err)
+ if drainErr := drainResponse(resp); drainErr != nil {
+ glog.Errorf("failed to drain response: %v", drainErr)
+ }
+
+ // If using cached URL and request failed, clear cache and retry with original endpoint
+ if usingCachedURL && depth == 0 {
+ glog.V(1).Infof("Webhook request to cached URL %s failed, clearing cache and retrying with original endpoint", targetURL)
+ h.setFinalURL("")
+ return h.sendMessageWithRetry(message, depth+1)
}
return fmt.Errorf("failed to send request: %w", err)
}
- defer resp.Body.Close()
+
+ // Handle redirects by caching the final destination and recreating POST request
+ if resp.StatusCode >= 300 && resp.StatusCode < 400 {
+ // Drain and close response body to enable connection reuse
+ util_http.CloseResponse(resp)
+
+ location := resp.Header.Get("Location")
+ if location == "" {
+ return fmt.Errorf("webhook returned redirect status %d without Location header", resp.StatusCode)
+ }
+
+ // Resolve relative URLs against the request URL
+ reqURL := req.URL
+ finalURL, err := reqURL.Parse(location)
+ if err != nil {
+ return fmt.Errorf("failed to parse redirect location: %w", err)
+ }
+
+ // Update finalURL to follow the redirect for this attempt
+ finalURLStr := finalURL.String()
+ h.setFinalURL(finalURLStr)
+
+ if depth == 0 {
+ glog.V(1).Infof("Webhook endpoint redirected from %s to %s, caching final destination", targetURL, finalURLStr)
+ } else {
+ glog.V(1).Infof("Webhook endpoint redirected from %s to %s (following redirect on retry)", targetURL, finalURLStr)
+ }
+
+ // Recreate the POST request to the final destination (increment depth to prevent infinite loops)
+ return h.sendMessageWithRetry(message, depth+1)
+ }
+
+ // If using cached URL and got an error response, clear cache and retry with original endpoint
+ if resp.StatusCode >= 400 && usingCachedURL && depth == 0 {
+ // Drain and close response body to enable connection reuse
+ util_http.CloseResponse(resp)
+
+ glog.V(1).Infof("Webhook request to cached URL %s returned error %d, clearing cache and retrying with original endpoint", targetURL, resp.StatusCode)
+ h.setFinalURL("")
+ return h.sendMessageWithRetry(message, depth+1)
+ }
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+ // Drain and close response body before returning error to enable connection reuse
+ util_http.CloseResponse(resp)
return fmt.Errorf("webhook returned status code: %d", resp.StatusCode)
}
+ // Drain and close response body on success to enable connection reuse
+ util_http.CloseResponse(resp)
+
return nil
}
+func (h *httpClient) setFinalURL(url string) {
+ h.endpointMu.Lock()
+ h.finalURL = url
+ h.endpointMu.Unlock()
+}
+
func drainResponse(resp *http.Response) error {
if resp == nil || resp.Body == nil {
return nil
diff --git a/weed/notification/webhook/http_test.go b/weed/notification/webhook/http_test.go
index f7ef006ae..f64c01f2d 100644
--- a/weed/notification/webhook/http_test.go
+++ b/weed/notification/webhook/http_test.go
@@ -148,3 +148,375 @@ func TestHttpClientSendMessageNetworkError(t *testing.T) {
t.Error("Expected error for network failure")
}
}
+
+// TestHttpClientFollowsRedirectAsPost verifies that redirects are followed with POST method preserved
+func TestHttpClientFollowsRedirectAsPost(t *testing.T) {
+ redirectCalled := false
+ finalCalled := false
+ var finalMethod string
+ var finalBody map[string]interface{}
+
+ // Create final destination server
+ finalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ finalCalled = true
+ finalMethod = r.Method
+ body, _ := io.ReadAll(r.Body)
+ json.Unmarshal(body, &finalBody)
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer finalServer.Close()
+
+ // Create redirect server
+ redirectServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ redirectCalled = true
+ // Return 301 redirect to final server
+ http.Redirect(w, r, finalServer.URL, http.StatusMovedPermanently)
+ }))
+ defer redirectServer.Close()
+
+ cfg := &config{
+ endpoint: redirectServer.URL,
+ authBearerToken: "test-token",
+ timeoutSeconds: 5,
+ }
+
+ client, err := newHTTPClient(cfg)
+ if err != nil {
+ t.Fatalf("Failed to create HTTP client: %v", err)
+ }
+
+ message := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ // Send message - should follow redirect and recreate POST request
+ err = client.sendMessage(newWebhookMessage("/test/path", message))
+ if err != nil {
+ t.Fatalf("Failed to send message: %v", err)
+ }
+
+ if !redirectCalled {
+ t.Error("Expected redirect server to be called")
+ }
+
+ if !finalCalled {
+ t.Error("Expected final server to be called after redirect")
+ }
+
+ if finalMethod != "POST" {
+ t.Errorf("Expected POST method at final destination, got %s", finalMethod)
+ }
+
+ if finalBody["key"] != "/test/path" {
+ t.Errorf("Expected key '/test/path' at final destination, got %v", finalBody["key"])
+ }
+
+ // Verify the final URL is cached
+ client.endpointMu.RLock()
+ cachedURL := client.finalURL
+ client.endpointMu.RUnlock()
+
+ if cachedURL != finalServer.URL {
+ t.Errorf("Expected cached URL %s, got %s", finalServer.URL, cachedURL)
+ }
+}
+
+// TestHttpClientUsesCachedRedirect verifies that subsequent requests use the cached redirect destination
+func TestHttpClientUsesCachedRedirect(t *testing.T) {
+ redirectCount := 0
+ finalCount := 0
+
+ // Create final destination server
+ finalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ finalCount++
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer finalServer.Close()
+
+ // Create redirect server
+ redirectServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ redirectCount++
+ http.Redirect(w, r, finalServer.URL, http.StatusMovedPermanently)
+ }))
+ defer redirectServer.Close()
+
+ cfg := &config{
+ endpoint: redirectServer.URL,
+ authBearerToken: "test-token",
+ timeoutSeconds: 5,
+ }
+
+ client, err := newHTTPClient(cfg)
+ if err != nil {
+ t.Fatalf("Failed to create HTTP client: %v", err)
+ }
+
+ message := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ // First request - should hit redirect server
+ err = client.sendMessage(newWebhookMessage("/test/path1", message))
+ if err != nil {
+ t.Fatalf("Failed to send first message: %v", err)
+ }
+
+ if redirectCount != 1 {
+ t.Errorf("Expected 1 redirect call, got %d", redirectCount)
+ }
+ if finalCount != 1 {
+ t.Errorf("Expected 1 final call, got %d", finalCount)
+ }
+
+ // Second request - should use cached URL and skip redirect server
+ err = client.sendMessage(newWebhookMessage("/test/path2", message))
+ if err != nil {
+ t.Fatalf("Failed to send second message: %v", err)
+ }
+
+ if redirectCount != 1 {
+ t.Errorf("Expected redirect server to be called only once (cached), got %d calls", redirectCount)
+ }
+ if finalCount != 2 {
+ t.Errorf("Expected 2 final calls, got %d", finalCount)
+ }
+}
+
+// TestHttpClientPreservesPostMethod verifies POST method is preserved and not converted to GET
+func TestHttpClientPreservesPostMethod(t *testing.T) {
+ var receivedMethod string
+
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ receivedMethod = r.Method
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer server.Close()
+
+ cfg := &config{
+ endpoint: server.URL,
+ authBearerToken: "test-token",
+ timeoutSeconds: 5,
+ }
+
+ client, err := newHTTPClient(cfg)
+ if err != nil {
+ t.Fatalf("Failed to create HTTP client: %v", err)
+ }
+
+ message := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ err = client.sendMessage(newWebhookMessage("/test/path", message))
+ if err != nil {
+ t.Fatalf("Failed to send message: %v", err)
+ }
+
+ if receivedMethod != "POST" {
+ t.Errorf("Expected POST method, got %s", receivedMethod)
+ }
+}
+
+// TestHttpClientInvalidatesCacheOnError verifies that cache is invalidated when cached URL fails
+func TestHttpClientInvalidatesCacheOnError(t *testing.T) {
+ finalServerDown := false // Start with server UP
+ originalCallCount := 0
+ finalCallCount := 0
+
+ // Create final destination server that can be toggled
+ finalServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ finalCallCount++
+ if finalServerDown {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ } else {
+ w.WriteHeader(http.StatusOK)
+ }
+ }))
+ defer finalServer.Close()
+
+ // Create redirect server
+ redirectServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ originalCallCount++
+ http.Redirect(w, r, finalServer.URL, http.StatusMovedPermanently)
+ }))
+ defer redirectServer.Close()
+
+ cfg := &config{
+ endpoint: redirectServer.URL,
+ authBearerToken: "test-token",
+ timeoutSeconds: 5,
+ }
+
+ client, err := newHTTPClient(cfg)
+ if err != nil {
+ t.Fatalf("Failed to create HTTP client: %v", err)
+ }
+
+ message := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ // First request - should follow redirect and cache the final URL
+ err = client.sendMessage(newWebhookMessage("/test/path1", message))
+ if err != nil {
+ t.Fatalf("Failed to send first message: %v", err)
+ }
+
+ if originalCallCount != 1 {
+ t.Errorf("Expected 1 original call, got %d", originalCallCount)
+ }
+ if finalCallCount != 1 {
+ t.Errorf("Expected 1 final call, got %d", finalCallCount)
+ }
+
+ // Verify cache was set
+ client.endpointMu.RLock()
+ cachedURL := client.finalURL
+ client.endpointMu.RUnlock()
+ if cachedURL != finalServer.URL {
+ t.Errorf("Expected cached URL %s, got %s", finalServer.URL, cachedURL)
+ }
+
+ // Second request with cached URL working - should use cache
+ err = client.sendMessage(newWebhookMessage("/test/path2", message))
+ if err != nil {
+ t.Fatalf("Failed to send second message: %v", err)
+ }
+
+ if originalCallCount != 1 {
+ t.Errorf("Expected still 1 original call (using cache), got %d", originalCallCount)
+ }
+ if finalCallCount != 2 {
+ t.Errorf("Expected 2 final calls, got %d", finalCallCount)
+ }
+
+ // Third request - bring final server DOWN, should invalidate cache and retry with original
+ // Flow: cached URL (fail, depth=0) -> clear cache -> retry original (depth=1) -> redirect -> final (fail, depth=2)
+ finalServerDown = true
+ err = client.sendMessage(newWebhookMessage("/test/path3", message))
+ if err == nil {
+ t.Error("Expected error when cached URL fails and retry also fails")
+ }
+
+ // originalCallCount: 1 (initial) + 1 (retry after cache invalidation) = 2
+ if originalCallCount != 2 {
+ t.Errorf("Expected 2 original calls, got %d", originalCallCount)
+ }
+ // finalCallCount: 2 (previous) + 1 (cached fail) + 1 (retry after redirect) = 4
+ if finalCallCount != 4 {
+ t.Errorf("Expected 4 final calls, got %d", finalCallCount)
+ }
+
+ // Verify final URL is still set (to the failed destination from the redirect)
+ client.endpointMu.RLock()
+ finalURLAfterError := client.finalURL
+ client.endpointMu.RUnlock()
+ if finalURLAfterError != finalServer.URL {
+ t.Errorf("Expected finalURL to be %s after error, got %s", finalServer.URL, finalURLAfterError)
+ }
+
+ // Fourth request - bring final server back UP
+ // Since cache still has the final URL, it should use it directly
+ finalServerDown = false
+ err = client.sendMessage(newWebhookMessage("/test/path4", message))
+ if err != nil {
+ t.Fatalf("Failed to send fourth message after recovery: %v", err)
+ }
+
+ // Should have used the cached URL directly (no new original call)
+ // originalCallCount: still 2
+ if originalCallCount != 2 {
+ t.Errorf("Expected 2 original calls (using cache), got %d", originalCallCount)
+ }
+ // finalCallCount: 4 + 1 = 5
+ if finalCallCount != 5 {
+ t.Errorf("Expected 5 final calls, got %d", finalCallCount)
+ }
+
+ // Verify cache was re-established
+ client.endpointMu.RLock()
+ reestablishedCache := client.finalURL
+ client.endpointMu.RUnlock()
+ if reestablishedCache != finalServer.URL {
+ t.Errorf("Expected cache to be re-established to %s, got %s", finalServer.URL, reestablishedCache)
+ }
+}
+
+// TestHttpClientInvalidatesCacheOnNetworkError verifies cache invalidation on network errors
+func TestHttpClientInvalidatesCacheOnNetworkError(t *testing.T) {
+ originalCallCount := 0
+ var finalServer *httptest.Server
+ // Create redirect server
+ redirectServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ originalCallCount++
+ if finalServer != nil {
+ http.Redirect(w, r, finalServer.URL, http.StatusMovedPermanently)
+ } else {
+ w.WriteHeader(http.StatusInternalServerError)
+ }
+ }))
+ defer redirectServer.Close()
+
+ // Create final destination server
+ finalServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ }))
+
+ cfg := &config{
+ endpoint: redirectServer.URL,
+ authBearerToken: "test-token",
+ timeoutSeconds: 5,
+ }
+
+ client, err := newHTTPClient(cfg)
+ if err != nil {
+ t.Fatalf("Failed to create HTTP client: %v", err)
+ }
+
+ message := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ // First request - establish cache
+ err = client.sendMessage(newWebhookMessage("/test/path1", message))
+ if err != nil {
+ t.Fatalf("Failed to send first message: %v", err)
+ }
+
+ if originalCallCount != 1 {
+ t.Errorf("Expected 1 original call, got %d", originalCallCount)
+ }
+
+ // Close final server to simulate network error
+ cachedURL := finalServer.URL
+ finalServer.Close()
+ finalServer = nil
+
+ // Second request - cached URL is down, should invalidate and retry with original
+ err = client.sendMessage(newWebhookMessage("/test/path2", message))
+ if err == nil {
+ t.Error("Expected error when network fails")
+ }
+
+ if originalCallCount != 2 {
+ t.Errorf("Expected 2 original calls (retry after cache invalidation), got %d", originalCallCount)
+ }
+
+ // Verify cache was cleared
+ client.endpointMu.RLock()
+ clearedCache := client.finalURL
+ client.endpointMu.RUnlock()
+ if clearedCache == cachedURL {
+ t.Errorf("Expected cache to be invalidated, but still has %s", clearedCache)
+ }
+}
diff --git a/weed/notification/webhook/webhook_queue.go b/weed/notification/webhook/webhook_queue.go
index e034e9537..2ac9db3eb 100644
--- a/weed/notification/webhook/webhook_queue.go
+++ b/weed/notification/webhook/webhook_queue.go
@@ -31,6 +31,9 @@ type Queue struct {
ctx context.Context
cancel context.CancelFunc
+
+ // Semaphore for controlling concurrent webhook requests
+ sem chan struct{}
}
func (w *Queue) GetName() string {
@@ -89,6 +92,9 @@ func (w *Queue) initialize(cfg *config) error {
w.config = cfg
w.filter = newFilter(cfg)
+ // Initialize semaphore for controlling concurrent webhook requests
+ w.sem = make(chan struct{}, cfg.nWorkers)
+
hClient, err := newHTTPClient(cfg)
if err != nil {
return fmt.Errorf("failed to create webhook http client: %w", err)
@@ -141,14 +147,17 @@ func (w *Queue) setupWatermillQueue(cfg *config) error {
router.AddPlugin(plugin.SignalsHandler)
router.AddMiddleware(retryMiddleware, poisonQueue)
- for i := 0; i < cfg.nWorkers; i++ {
- router.AddNoPublisherHandler(
- pubSubHandlerNameTemplate(i),
- pubSubTopicName,
- w.queueChannel,
- w.handleWebhook,
- )
- }
+ // Add a single handler to avoid duplicate message delivery.
+ // With gochannel's default behavior, each handler call creates
+ // a separate subscription, and all subscriptions receive their own copy of each message.
+ // Using a single handler ensures each webhook is sent only once.
+ // Concurrency is controlled via semaphore in handleWebhook based on nWorkers config.
+ router.AddConsumerHandler(
+ "webhook_handler",
+ pubSubTopicName,
+ w.queueChannel,
+ w.handleWebhook,
+ )
go func() {
// cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already
@@ -165,6 +174,10 @@ func (w *Queue) setupWatermillQueue(cfg *config) error {
}
func (w *Queue) handleWebhook(msg *message.Message) error {
+ // Acquire semaphore slot (blocks if at capacity)
+ w.sem <- struct{}{}
+ defer func() { <-w.sem }()
+
var n filer_pb.EventNotification
if err := proto.Unmarshal(msg.Payload, &n); err != nil {
glog.Errorf("failed to unmarshal protobuf message: %v", err)
diff --git a/weed/notification/webhook/webhook_queue_test.go b/weed/notification/webhook/webhook_queue_test.go
index 52a290149..a084ef975 100644
--- a/weed/notification/webhook/webhook_queue_test.go
+++ b/weed/notification/webhook/webhook_queue_test.go
@@ -273,6 +273,7 @@ func TestQueueHandleWebhook(t *testing.T) {
client, _ := newHTTPClient(cfg)
q := &Queue{
client: client,
+ sem: make(chan struct{}, cfg.nWorkers),
}
message := newWebhookMessage("/test/path", &filer_pb.EventNotification{
@@ -386,6 +387,78 @@ func TestQueueRetryMechanism(t *testing.T) {
}
}
+// TestQueueNoDuplicateWebhooks verifies that webhooks are sent only once regardless of worker count
+func TestQueueNoDuplicateWebhooks(t *testing.T) {
+ tests := []struct {
+ name string
+ nWorkers int
+ expected int
+ }{
+ {"1 worker", 1, 1},
+ {"5 workers", 5, 1},
+ {"10 workers", 10, 1},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ callCount := 0
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ callCount++
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer server.Close()
+
+ cfg := &config{
+ endpoint: server.URL,
+ authBearerToken: "test-token",
+ timeoutSeconds: 5,
+ maxRetries: 0,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: tt.nWorkers,
+ bufferSize: 10,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Fatalf("Failed to initialize queue: %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ // Wait for router and subscriber to be fully ready
+ time.Sleep(200 * time.Millisecond)
+
+ msg := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ err = q.SendMessage("/test/path", msg)
+ if err != nil {
+ t.Errorf("SendMessage() error = %v", err)
+ }
+
+ // Wait for message processing
+ time.Sleep(500 * time.Millisecond)
+
+ if callCount != tt.expected {
+ t.Errorf("Expected %d webhook call(s), got %d with %d workers", tt.expected, callCount, tt.nWorkers)
+ }
+ })
+ }
+}
+
func TestQueueSendMessageWithFilter(t *testing.T) {
tests := []struct {
name string