aboutsummaryrefslogtreecommitdiff
path: root/weed/notification/webhook/webhook_queue_test.go
diff options
context:
space:
mode:
authorIbrahim Konsowa <imkonsowa@gmail.com>2025-07-15 21:49:37 +0400
committerGitHub <noreply@github.com>2025-07-15 10:49:37 -0700
commitd78aa3d2de1a18c9802f72c926efa8e80ff2fa64 (patch)
tree7980858236682e2cdb3f91f6ed3f9eeaf4843b47 /weed/notification/webhook/webhook_queue_test.go
parent74f4e9ba5ab691938e89b32017a89359ae49428f (diff)
downloadseaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.tar.xz
seaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.zip
[Notifications] Improving webhook notifications (#6965)
* worker setup * fix tests * start worker * graceful worker drain * retry queue * migrate queue to watermill * adding filters and improvements * add the event type to the webhook message * eliminating redundant JSON serialization * resolve review comments * trigger actions * fix tests * typo fixes * read max_backoff_seconds from config * add more context to the dead letter * close the http response on errors * drain the http response body in case not empty * eliminate exported typesπ
Diffstat (limited to 'weed/notification/webhook/webhook_queue_test.go')
-rw-r--r--weed/notification/webhook/webhook_queue_test.go536
1 files changed, 536 insertions, 0 deletions
diff --git a/weed/notification/webhook/webhook_queue_test.go b/weed/notification/webhook/webhook_queue_test.go
new file mode 100644
index 000000000..52a290149
--- /dev/null
+++ b/weed/notification/webhook/webhook_queue_test.go
@@ -0,0 +1,536 @@
+package webhook
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "google.golang.org/protobuf/proto"
+)
+
+func TestConfigValidation(t *testing.T) {
+ tests := []struct {
+ name string
+ config *config
+ wantErr bool
+ errMsg string
+ }{
+ {
+ name: "valid config",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: false,
+ },
+ {
+ name: "empty endpoint",
+ config: &config{
+ endpoint: "",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "endpoint is required",
+ },
+ {
+ name: "invalid URL",
+ config: &config{
+ endpoint: "://invalid-url",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "invalid webhook endpoint",
+ },
+ {
+ name: "timeout too large",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 301,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "timeout must be between",
+ },
+ {
+ name: "too many retries",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 30,
+ maxRetries: 11,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "max retries must be between",
+ },
+ {
+ name: "too many workers",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 101,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "workers must be between",
+ },
+ {
+ name: "buffer too large",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 1000001,
+ },
+ wantErr: true,
+ errMsg: "buffer size must be between",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ err := tt.config.validate()
+ if (err != nil) != tt.wantErr {
+ t.Errorf("validate() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ if err != nil && tt.errMsg != "" {
+ if err.Error() == "" || !strings.Contains(err.Error(), tt.errMsg) {
+ t.Errorf("validate() error message = %v, want to contain %v", err.Error(), tt.errMsg)
+ }
+ }
+ })
+ }
+}
+
+func TestWebhookMessageSerialization(t *testing.T) {
+ msg := &filer_pb.EventNotification{
+ OldEntry: nil,
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ IsDirectory: false,
+ },
+ }
+
+ webhookMsg := newWebhookMessage("/test/path", msg)
+
+ wmMsg, err := webhookMsg.toWaterMillMessage()
+ if err != nil {
+ t.Fatalf("Failed to convert to watermill message: %v", err)
+ }
+
+ // Unmarshal the protobuf payload directly
+ var eventNotification filer_pb.EventNotification
+ err = proto.Unmarshal(wmMsg.Payload, &eventNotification)
+ if err != nil {
+ t.Fatalf("Failed to unmarshal protobuf message: %v", err)
+ }
+
+ // Check metadata
+ if wmMsg.Metadata.Get("key") != "/test/path" {
+ t.Errorf("Expected key '/test/path', got %v", wmMsg.Metadata.Get("key"))
+ }
+
+ if wmMsg.Metadata.Get("event_type") != "create" {
+ t.Errorf("Expected event type 'create', got %v", wmMsg.Metadata.Get("event_type"))
+ }
+
+ if eventNotification.NewEntry.Name != "test.txt" {
+ t.Errorf("Expected file name 'test.txt', got %v", eventNotification.NewEntry.Name)
+ }
+}
+
+func TestQueueInitialize(t *testing.T) {
+ cfg := &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 10,
+ maxRetries: 3,
+ backoffSeconds: 3,
+ maxBackoffSeconds: 60,
+ nWorkers: 1,
+ bufferSize: 100,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Errorf("Initialize() error = %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ if q.router == nil {
+ t.Error("Expected router to be initialized")
+ }
+ if q.queueChannel == nil {
+ t.Error("Expected queueChannel to be initialized")
+ }
+ if q.client == nil {
+ t.Error("Expected client to be initialized")
+ }
+ if q.config == nil {
+ t.Error("Expected config to be initialized")
+ }
+}
+
+// TestQueueSendMessage test sending messages to the queue
+func TestQueueSendMessage(t *testing.T) {
+ cfg := &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 1,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Fatalf("Failed to initialize queue: %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ msg := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ err = q.SendMessage("/test/path", msg)
+ if err != nil {
+ t.Errorf("SendMessage() error = %v", err)
+ }
+}
+
+func TestQueueHandleWebhook(t *testing.T) {
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer server.Close()
+
+ cfg := &config{
+ endpoint: server.URL,
+ authBearerToken: "test-token",
+ timeoutSeconds: 1,
+ maxRetries: 0,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ }
+
+ client, _ := newHTTPClient(cfg)
+ q := &Queue{
+ client: client,
+ }
+
+ message := newWebhookMessage("/test/path", &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ })
+
+ wmMsg, err := message.toWaterMillMessage()
+ if err != nil {
+ t.Fatalf("Failed to create watermill message: %v", err)
+ }
+
+ err = q.handleWebhook(wmMsg)
+ if err != nil {
+ t.Errorf("handleWebhook() error = %v", err)
+ }
+}
+
+func TestQueueEndToEnd(t *testing.T) {
+ // Simplified test - just verify the queue can be created and message can be sent
+ // without needing full end-to-end processing
+ cfg := &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 1,
+ maxRetries: 0,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Fatalf("Failed to initialize queue: %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ msg := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ err = q.SendMessage("/test/path", msg)
+ if err != nil {
+ t.Errorf("SendMessage() error = %v", err)
+ }
+}
+
+func TestQueueRetryMechanism(t *testing.T) {
+ cfg := &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 1,
+ maxRetries: 3, // Test that this config is used
+ backoffSeconds: 2,
+ maxBackoffSeconds: 10,
+ nWorkers: 1,
+ bufferSize: 10,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Fatalf("Failed to initialize queue: %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ // Verify that the queue is properly configured for retries
+ if q.config.maxRetries != 3 {
+ t.Errorf("Expected maxRetries=3, got %d", q.config.maxRetries)
+ }
+
+ if q.config.backoffSeconds != 2 {
+ t.Errorf("Expected backoffSeconds=2, got %d", q.config.backoffSeconds)
+ }
+
+ if q.config.maxBackoffSeconds != 10 {
+ t.Errorf("Expected maxBackoffSeconds=10, got %d", q.config.maxBackoffSeconds)
+ }
+
+ // Test that we can send a message (retry behavior is handled by Watermill middleware)
+ msg := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "test.txt"},
+ }
+
+ err = q.SendMessage("/test/retry", msg)
+ if err != nil {
+ t.Errorf("SendMessage() error = %v", err)
+ }
+}
+
+func TestQueueSendMessageWithFilter(t *testing.T) {
+ tests := []struct {
+ name string
+ cfg *config
+ key string
+ notification *filer_pb.EventNotification
+ shouldPublish bool
+ }{
+ {
+ name: "allowed event type",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"create"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: true,
+ },
+ {
+ name: "filtered event type",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"update", "rename"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ {
+ name: "allowed path prefix",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ pathPrefixes: []string{"/data/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: true,
+ },
+ {
+ name: "filtered path prefix",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ pathPrefixes: []string{"/logs/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ {
+ name: "combined filters - both pass",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"create", "delete"},
+ pathPrefixes: []string{"/data/", "/logs/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: true,
+ },
+ {
+ name: "combined filters - event fails",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"update", "delete"},
+ pathPrefixes: []string{"/data/", "/logs/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ {
+ name: "combined filters - path fails",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"create", "delete"},
+ pathPrefixes: []string{"/logs/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ shouldPublish := newFilter(tt.cfg).shouldPublish(tt.key, tt.notification)
+ if shouldPublish != tt.shouldPublish {
+ t.Errorf("Expected shouldPublish=%v, got %v", tt.shouldPublish, shouldPublish)
+ }
+ })
+ }
+}