diff options
| author | Ibrahim Konsowa <imkonsowa@gmail.com> | 2025-07-15 21:49:37 +0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-15 10:49:37 -0700 |
| commit | d78aa3d2de1a18c9802f72c926efa8e80ff2fa64 (patch) | |
| tree | 7980858236682e2cdb3f91f6ed3f9eeaf4843b47 /weed/notification/webhook/webhook_queue_test.go | |
| parent | 74f4e9ba5ab691938e89b32017a89359ae49428f (diff) | |
| download | seaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.tar.xz seaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.zip | |
[Notifications] Improving webhook notifications (#6965)
* worker setup
* fix tests
* start worker
* graceful worker drain
* retry queue
* migrate queue to watermill
* adding filters and improvements
* add the event type to the webhook message
* eliminating redundant JSON serialization
* resolve review comments
* trigger actions
* fix tests
* typo fixes
* read max_backoff_seconds from config
* add more context to the dead letter
* close the http response on errors
* drain the http response body in case not empty
* eliminate exported typesπ
Diffstat (limited to 'weed/notification/webhook/webhook_queue_test.go')
| -rw-r--r-- | weed/notification/webhook/webhook_queue_test.go | 536 |
1 files changed, 536 insertions, 0 deletions
diff --git a/weed/notification/webhook/webhook_queue_test.go b/weed/notification/webhook/webhook_queue_test.go new file mode 100644 index 000000000..52a290149 --- /dev/null +++ b/weed/notification/webhook/webhook_queue_test.go @@ -0,0 +1,536 @@ +package webhook + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "google.golang.org/protobuf/proto" +) + +func TestConfigValidation(t *testing.T) { + tests := []struct { + name string + config *config + wantErr bool + errMsg string + }{ + { + name: "valid config", + config: &config{ + endpoint: "https://example.com/webhook", + authBearerToken: "test-token", + timeoutSeconds: 30, + maxRetries: 3, + backoffSeconds: 5, + maxBackoffSeconds: 30, + nWorkers: 5, + bufferSize: 10000, + }, + wantErr: false, + }, + { + name: "empty endpoint", + config: &config{ + endpoint: "", + timeoutSeconds: 30, + maxRetries: 3, + backoffSeconds: 5, + maxBackoffSeconds: 30, + nWorkers: 5, + bufferSize: 10000, + }, + wantErr: true, + errMsg: "endpoint is required", + }, + { + name: "invalid URL", + config: &config{ + endpoint: "://invalid-url", + timeoutSeconds: 30, + maxRetries: 3, + backoffSeconds: 5, + maxBackoffSeconds: 30, + nWorkers: 5, + bufferSize: 10000, + }, + wantErr: true, + errMsg: "invalid webhook endpoint", + }, + { + name: "timeout too large", + config: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 301, + maxRetries: 3, + backoffSeconds: 5, + maxBackoffSeconds: 30, + nWorkers: 5, + bufferSize: 10000, + }, + wantErr: true, + errMsg: "timeout must be between", + }, + { + name: "too many retries", + config: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 30, + maxRetries: 11, + backoffSeconds: 5, + maxBackoffSeconds: 30, + nWorkers: 5, + bufferSize: 10000, + }, + wantErr: true, + errMsg: "max retries must be between", + }, + { + name: "too many workers", + config: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 30, + maxRetries: 3, + backoffSeconds: 5, + maxBackoffSeconds: 30, + nWorkers: 101, + bufferSize: 10000, + }, + wantErr: true, + errMsg: "workers must be between", + }, + { + name: "buffer too large", + config: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 30, + maxRetries: 3, + backoffSeconds: 5, + maxBackoffSeconds: 30, + nWorkers: 5, + bufferSize: 1000001, + }, + wantErr: true, + errMsg: "buffer size must be between", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.validate() + if (err != nil) != tt.wantErr { + t.Errorf("validate() error = %v, wantErr %v", err, tt.wantErr) + } + if err != nil && tt.errMsg != "" { + if err.Error() == "" || !strings.Contains(err.Error(), tt.errMsg) { + t.Errorf("validate() error message = %v, want to contain %v", err.Error(), tt.errMsg) + } + } + }) + } +} + +func TestWebhookMessageSerialization(t *testing.T) { + msg := &filer_pb.EventNotification{ + OldEntry: nil, + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + IsDirectory: false, + }, + } + + webhookMsg := newWebhookMessage("/test/path", msg) + + wmMsg, err := webhookMsg.toWaterMillMessage() + if err != nil { + t.Fatalf("Failed to convert to watermill message: %v", err) + } + + // Unmarshal the protobuf payload directly + var eventNotification filer_pb.EventNotification + err = proto.Unmarshal(wmMsg.Payload, &eventNotification) + if err != nil { + t.Fatalf("Failed to unmarshal protobuf message: %v", err) + } + + // Check metadata + if wmMsg.Metadata.Get("key") != "/test/path" { + t.Errorf("Expected key '/test/path', got %v", wmMsg.Metadata.Get("key")) + } + + if wmMsg.Metadata.Get("event_type") != "create" { + t.Errorf("Expected event type 'create', got %v", wmMsg.Metadata.Get("event_type")) + } + + if eventNotification.NewEntry.Name != "test.txt" { + t.Errorf("Expected file name 'test.txt', got %v", eventNotification.NewEntry.Name) + } +} + +func TestQueueInitialize(t *testing.T) { + cfg := &config{ + endpoint: "https://example.com/webhook", + authBearerToken: "test-token", + timeoutSeconds: 10, + maxRetries: 3, + backoffSeconds: 3, + maxBackoffSeconds: 60, + nWorkers: 1, + bufferSize: 100, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Errorf("Initialize() error = %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + if q.router == nil { + t.Error("Expected router to be initialized") + } + if q.queueChannel == nil { + t.Error("Expected queueChannel to be initialized") + } + if q.client == nil { + t.Error("Expected client to be initialized") + } + if q.config == nil { + t.Error("Expected config to be initialized") + } +} + +// TestQueueSendMessage test sending messages to the queue +func TestQueueSendMessage(t *testing.T) { + cfg := &config{ + endpoint: "https://example.com/webhook", + authBearerToken: "test-token", + timeoutSeconds: 1, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Fatalf("Failed to initialize queue: %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + msg := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + err = q.SendMessage("/test/path", msg) + if err != nil { + t.Errorf("SendMessage() error = %v", err) + } +} + +func TestQueueHandleWebhook(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := &config{ + endpoint: server.URL, + authBearerToken: "test-token", + timeoutSeconds: 1, + maxRetries: 0, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + } + + client, _ := newHTTPClient(cfg) + q := &Queue{ + client: client, + } + + message := newWebhookMessage("/test/path", &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + }) + + wmMsg, err := message.toWaterMillMessage() + if err != nil { + t.Fatalf("Failed to create watermill message: %v", err) + } + + err = q.handleWebhook(wmMsg) + if err != nil { + t.Errorf("handleWebhook() error = %v", err) + } +} + +func TestQueueEndToEnd(t *testing.T) { + // Simplified test - just verify the queue can be created and message can be sent + // without needing full end-to-end processing + cfg := &config{ + endpoint: "https://example.com/webhook", + authBearerToken: "test-token", + timeoutSeconds: 1, + maxRetries: 0, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Fatalf("Failed to initialize queue: %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + msg := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{ + Name: "test.txt", + }, + } + + err = q.SendMessage("/test/path", msg) + if err != nil { + t.Errorf("SendMessage() error = %v", err) + } +} + +func TestQueueRetryMechanism(t *testing.T) { + cfg := &config{ + endpoint: "https://example.com/webhook", + authBearerToken: "test-token", + timeoutSeconds: 1, + maxRetries: 3, // Test that this config is used + backoffSeconds: 2, + maxBackoffSeconds: 10, + nWorkers: 1, + bufferSize: 10, + } + + q := &Queue{} + err := q.initialize(cfg) + if err != nil { + t.Fatalf("Failed to initialize queue: %v", err) + } + + defer func() { + if q.cancel != nil { + q.cancel() + } + time.Sleep(100 * time.Millisecond) + if q.router != nil { + q.router.Close() + } + }() + + // Verify that the queue is properly configured for retries + if q.config.maxRetries != 3 { + t.Errorf("Expected maxRetries=3, got %d", q.config.maxRetries) + } + + if q.config.backoffSeconds != 2 { + t.Errorf("Expected backoffSeconds=2, got %d", q.config.backoffSeconds) + } + + if q.config.maxBackoffSeconds != 10 { + t.Errorf("Expected maxBackoffSeconds=10, got %d", q.config.maxBackoffSeconds) + } + + // Test that we can send a message (retry behavior is handled by Watermill middleware) + msg := &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "test.txt"}, + } + + err = q.SendMessage("/test/retry", msg) + if err != nil { + t.Errorf("SendMessage() error = %v", err) + } +} + +func TestQueueSendMessageWithFilter(t *testing.T) { + tests := []struct { + name string + cfg *config + key string + notification *filer_pb.EventNotification + shouldPublish bool + }{ + { + name: "allowed event type", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"create"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: true, + }, + { + name: "filtered event type", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"update", "rename"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + { + name: "allowed path prefix", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + pathPrefixes: []string{"/data/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: true, + }, + { + name: "filtered path prefix", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + pathPrefixes: []string{"/logs/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + { + name: "combined filters - both pass", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"create", "delete"}, + pathPrefixes: []string{"/data/", "/logs/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: true, + }, + { + name: "combined filters - event fails", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"update", "delete"}, + pathPrefixes: []string{"/data/", "/logs/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + { + name: "combined filters - path fails", + cfg: &config{ + endpoint: "https://example.com/webhook", + timeoutSeconds: 10, + maxRetries: 1, + backoffSeconds: 1, + maxBackoffSeconds: 1, + nWorkers: 1, + bufferSize: 10, + eventTypes: []string{"create", "delete"}, + pathPrefixes: []string{"/logs/"}, + }, + key: "/data/file.txt", + notification: &filer_pb.EventNotification{ + NewEntry: &filer_pb.Entry{Name: "file.txt"}, + }, + shouldPublish: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + shouldPublish := newFilter(tt.cfg).shouldPublish(tt.key, tt.notification) + if shouldPublish != tt.shouldPublish { + t.Errorf("Expected shouldPublish=%v, got %v", tt.shouldPublish, shouldPublish) + } + }) + } +} |
