aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/notification/webhook/filter.go64
-rw-r--r--weed/notification/webhook/filter_test.go225
-rw-r--r--weed/notification/webhook/http.go44
-rw-r--r--weed/notification/webhook/http_test.go12
-rw-r--r--weed/notification/webhook/types.go182
-rw-r--r--weed/notification/webhook/webhook_queue.go200
-rw-r--r--weed/notification/webhook/webhook_queue_test.go536
7 files changed, 1223 insertions, 40 deletions
diff --git a/weed/notification/webhook/filter.go b/weed/notification/webhook/filter.go
new file mode 100644
index 000000000..f346d6c93
--- /dev/null
+++ b/weed/notification/webhook/filter.go
@@ -0,0 +1,64 @@
+package webhook
+
+import (
+ "strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+type filter struct {
+ eventTypes map[eventType]bool
+ pathPrefixes []string
+}
+
+func newFilter(cfg *config) *filter {
+ f := &filter{
+ eventTypes: make(map[eventType]bool),
+ pathPrefixes: cfg.pathPrefixes,
+ }
+
+ if len(cfg.eventTypes) == 0 {
+ f.eventTypes[eventTypeCreate] = true
+ f.eventTypes[eventTypeDelete] = true
+ f.eventTypes[eventTypeUpdate] = true
+ f.eventTypes[eventTypeRename] = true
+ } else {
+ for _, et := range cfg.eventTypes {
+ t := eventType(et)
+ if !t.valid() {
+ glog.Warningf("invalid event type: %v", t)
+
+ continue
+ }
+
+ f.eventTypes[t] = true
+ }
+ }
+
+ return f
+}
+
+func (f *filter) shouldPublish(key string, notification *filer_pb.EventNotification) bool {
+ if !f.matchesPath(key) {
+ return false
+ }
+
+ eventType := detectEventType(notification)
+
+ return f.eventTypes[eventType]
+}
+
+func (f *filter) matchesPath(key string) bool {
+ if len(f.pathPrefixes) == 0 {
+ return true
+ }
+
+ for _, prefix := range f.pathPrefixes {
+ if strings.HasPrefix(key, prefix) {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/weed/notification/webhook/filter_test.go b/weed/notification/webhook/filter_test.go
new file mode 100644
index 000000000..e95a085fe
--- /dev/null
+++ b/weed/notification/webhook/filter_test.go
@@ -0,0 +1,225 @@
+package webhook
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestFilterEventTypes(t *testing.T) {
+ tests := []struct {
+ name string
+ eventTypes []string
+ notification *filer_pb.EventNotification
+ expectedType eventType
+ shouldPublish bool
+ }{
+ {
+ name: "create event - allowed",
+ eventTypes: []string{"create", "delete"},
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "test.txt"},
+ },
+ expectedType: eventTypeCreate,
+ shouldPublish: true,
+ },
+ {
+ name: "create event - not allowed",
+ eventTypes: []string{"delete", "update"},
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "test.txt"},
+ },
+ expectedType: eventTypeCreate,
+ shouldPublish: false,
+ },
+ {
+ name: "delete event - allowed",
+ eventTypes: []string{"create", "delete"},
+ notification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{Name: "test.txt"},
+ },
+ expectedType: eventTypeDelete,
+ shouldPublish: true,
+ },
+ {
+ name: "update event - allowed",
+ eventTypes: []string{"update"},
+ notification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{Name: "test.txt"},
+ NewEntry: &filer_pb.Entry{Name: "test.txt"},
+ },
+ expectedType: eventTypeUpdate,
+ shouldPublish: true,
+ },
+ {
+ name: "rename event - allowed",
+ eventTypes: []string{"rename"},
+ notification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{Name: "old.txt"},
+ NewEntry: &filer_pb.Entry{Name: "new.txt"},
+ NewParentPath: "/new/path",
+ },
+ expectedType: eventTypeRename,
+ shouldPublish: true,
+ },
+ {
+ name: "rename event - not allowed",
+ eventTypes: []string{"create", "delete", "update"},
+ notification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{Name: "old.txt"},
+ NewEntry: &filer_pb.Entry{Name: "new.txt"},
+ NewParentPath: "/new/path",
+ },
+ expectedType: eventTypeRename,
+ shouldPublish: false,
+ },
+ {
+ name: "all events allowed when empty",
+ eventTypes: []string{},
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "test.txt"},
+ },
+ expectedType: eventTypeCreate,
+ shouldPublish: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ cfg := &config{eventTypes: tt.eventTypes}
+ f := newFilter(cfg)
+
+ eventType := detectEventType(tt.notification)
+ if eventType != tt.expectedType {
+ t.Errorf("detectEventType() = %v, want %v", eventType, tt.expectedType)
+ }
+
+ shouldPublish := f.shouldPublish("/test/path", tt.notification)
+ if shouldPublish != tt.shouldPublish {
+ t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish)
+ }
+ })
+ }
+}
+
+func TestFilterPathPrefixes(t *testing.T) {
+ tests := []struct {
+ name string
+ pathPrefixes []string
+ key string
+ shouldPublish bool
+ }{
+ {
+ name: "matches single prefix",
+ pathPrefixes: []string{"/data/"},
+ key: "/data/file.txt",
+ shouldPublish: true,
+ },
+ {
+ name: "matches one of multiple prefixes",
+ pathPrefixes: []string{"/data/", "/logs/", "/tmp/"},
+ key: "/logs/app.log",
+ shouldPublish: true,
+ },
+ {
+ name: "no match",
+ pathPrefixes: []string{"/data/", "/logs/"},
+ key: "/other/file.txt",
+ shouldPublish: false,
+ },
+ {
+ name: "empty prefixes allows all",
+ pathPrefixes: []string{},
+ key: "/any/path/file.txt",
+ shouldPublish: true,
+ },
+ {
+ name: "exact prefix match",
+ pathPrefixes: []string{"/data"},
+ key: "/data",
+ shouldPublish: true,
+ },
+ {
+ name: "partial match not allowed",
+ pathPrefixes: []string{"/data/"},
+ key: "/database/file.txt",
+ shouldPublish: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ cfg := &config{
+ pathPrefixes: tt.pathPrefixes,
+ eventTypes: []string{"create"},
+ }
+ f := newFilter(cfg)
+
+ notification := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "test.txt"},
+ }
+
+ shouldPublish := f.shouldPublish(tt.key, notification)
+ if shouldPublish != tt.shouldPublish {
+ t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish)
+ }
+ })
+ }
+}
+
+func TestFilterCombined(t *testing.T) {
+ cfg := &config{
+ eventTypes: []string{"create", "update"},
+ pathPrefixes: []string{"/data/", "/logs/"},
+ }
+ f := newFilter(cfg)
+
+ tests := []struct {
+ name string
+ key string
+ notification *filer_pb.EventNotification
+ shouldPublish bool
+ }{
+ {
+ name: "allowed event and path",
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: true,
+ },
+ {
+ name: "allowed event but wrong path",
+ key: "/other/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ {
+ name: "wrong event but allowed path",
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ {
+ name: "wrong event and wrong path",
+ key: "/other/file.txt",
+ notification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ shouldPublish := f.shouldPublish(tt.key, tt.notification)
+ if shouldPublish != tt.shouldPublish {
+ t.Errorf("shouldPublish() = %v, want %v", shouldPublish, tt.shouldPublish)
+ }
+ })
+ }
+}
diff --git a/weed/notification/webhook/http.go b/weed/notification/webhook/http.go
index 13b7f30d9..bb6a11a09 100644
--- a/weed/notification/webhook/http.go
+++ b/weed/notification/webhook/http.go
@@ -2,30 +2,43 @@ package webhook
import (
"bytes"
+ "context"
"encoding/json"
+ "errors"
"fmt"
+ "io"
"net/http"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
- "google.golang.org/protobuf/proto"
)
type httpClient struct {
endpoint string
token string
+ timeout time.Duration
}
func newHTTPClient(cfg *config) (*httpClient, error) {
return &httpClient{
endpoint: cfg.endpoint,
token: cfg.authBearerToken,
+ timeout: time.Duration(cfg.timeoutSeconds) * time.Second,
}, nil
}
-func (h *httpClient) sendMessage(key string, message proto.Message) error {
+func (h *httpClient) sendMessage(message *webhookMessage) error {
+ // Serialize the protobuf message to JSON for HTTP payload
+ notificationData, err := json.Marshal(message.Notification)
+ if err != nil {
+ return fmt.Errorf("failed to marshal notification: %v", err)
+ }
+
payload := map[string]interface{}{
- "key": key,
- "message": message,
+ "key": message.Key,
+ "event_type": message.EventType,
+ "message": json.RawMessage(notificationData),
}
jsonData, err := json.Marshal(payload)
@@ -43,8 +56,18 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error {
req.Header.Set("Authorization", "Bearer "+h.token)
}
+ if h.timeout > 0 {
+ ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
+ defer cancel()
+ req = req.WithContext(ctx)
+ }
+
resp, err := util_http.Do(req)
if err != nil {
+ if err = drainResponse(resp); err != nil {
+ glog.Errorf("failed to drain response: %v", err)
+ }
+
return fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
@@ -55,3 +78,16 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error {
return nil
}
+
+func drainResponse(resp *http.Response) error {
+ if resp == nil || resp.Body == nil {
+ return nil
+ }
+
+ _, err := io.ReadAll(resp.Body)
+
+ return errors.Join(
+ err,
+ resp.Body.Close(),
+ )
+}
diff --git a/weed/notification/webhook/http_test.go b/weed/notification/webhook/http_test.go
index 5a008d2a5..f7ef006ae 100644
--- a/weed/notification/webhook/http_test.go
+++ b/weed/notification/webhook/http_test.go
@@ -48,7 +48,7 @@ func TestHttpClientSendMessage(t *testing.T) {
},
}
- err = client.sendMessage("/test/path", message)
+ err = client.sendMessage(newWebhookMessage("/test/path", message))
if err != nil {
t.Fatalf("Failed to send message: %v", err)
}
@@ -57,6 +57,10 @@ func TestHttpClientSendMessage(t *testing.T) {
t.Errorf("Expected key '/test/path', got %v", receivedPayload["key"])
}
+ if receivedPayload["event_type"] != "create" {
+ t.Errorf("Expected event_type 'create', got %v", receivedPayload["event_type"])
+ }
+
if receivedPayload["message"] == nil {
t.Error("Expected message to be present")
}
@@ -92,7 +96,7 @@ func TestHttpClientSendMessageWithoutToken(t *testing.T) {
message := &filer_pb.EventNotification{}
- err = client.sendMessage("/test/path", message)
+ err = client.sendMessage(newWebhookMessage("/test/path", message))
if err != nil {
t.Fatalf("Failed to send message: %v", err)
}
@@ -120,7 +124,7 @@ func TestHttpClientSendMessageServerError(t *testing.T) {
message := &filer_pb.EventNotification{}
- err = client.sendMessage("/test/path", message)
+ err = client.sendMessage(newWebhookMessage("/test/path", message))
if err == nil {
t.Error("Expected error for server error response")
}
@@ -139,7 +143,7 @@ func TestHttpClientSendMessageNetworkError(t *testing.T) {
message := &filer_pb.EventNotification{}
- err = client.sendMessage("/test/path", message)
+ err = client.sendMessage(newWebhookMessage("/test/path", message))
if err == nil {
t.Error("Expected error for network failure")
}
diff --git a/weed/notification/webhook/types.go b/weed/notification/webhook/types.go
new file mode 100644
index 000000000..5cd79c7da
--- /dev/null
+++ b/weed/notification/webhook/types.go
@@ -0,0 +1,182 @@
+package webhook
+
+import (
+ "fmt"
+ "net/url"
+ "slices"
+ "strconv"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "google.golang.org/protobuf/proto"
+)
+
+const (
+ queueName = "webhook"
+ pubSubTopicName = "webhook_topic"
+ deadLetterTopic = "webhook_dead_letter"
+)
+
+type eventType string
+
+const (
+ eventTypeCreate eventType = "create"
+ eventTypeDelete eventType = "delete"
+ eventTypeUpdate eventType = "update"
+ eventTypeRename eventType = "rename"
+)
+
+func (e eventType) valid() bool {
+ return slices.Contains([]eventType{
+ eventTypeCreate,
+ eventTypeDelete,
+ eventTypeUpdate,
+ eventTypeRename,
+ },
+ e,
+ )
+}
+
+var (
+ pubSubHandlerNameTemplate = func(n int) string {
+ return "webhook_handler_" + strconv.Itoa(n)
+ }
+)
+
+type client interface {
+ sendMessage(message *webhookMessage) error
+}
+
+type webhookMessage struct {
+ Key string `json:"key"`
+ EventType string `json:"event_type"`
+ Notification *filer_pb.EventNotification `json:"message_data"`
+}
+
+func newWebhookMessage(key string, message proto.Message) *webhookMessage {
+ notification, ok := message.(*filer_pb.EventNotification)
+ if !ok {
+ return nil
+ }
+
+ eventType := string(detectEventType(notification))
+
+ return &webhookMessage{
+ Key: key,
+ EventType: eventType,
+ Notification: notification,
+ }
+}
+
+type config struct {
+ endpoint string
+ authBearerToken string
+ timeoutSeconds int
+
+ maxRetries int
+ backoffSeconds int
+ maxBackoffSeconds int
+ nWorkers int
+ bufferSize int
+
+ eventTypes []string
+ pathPrefixes []string
+}
+
+func newConfigWithDefaults(configuration util.Configuration, prefix string) *config {
+ c := &config{
+ endpoint: configuration.GetString(prefix + "endpoint"),
+ authBearerToken: configuration.GetString(prefix + "bearer_token"),
+ timeoutSeconds: 10,
+ maxRetries: 3,
+ backoffSeconds: 3,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10_000,
+ }
+
+ if bufferSize := configuration.GetInt(prefix + "buffer_size"); bufferSize > 0 {
+ c.bufferSize = bufferSize
+ }
+ if workers := configuration.GetInt(prefix + "workers"); workers > 0 {
+ c.nWorkers = workers
+ }
+ if maxRetries := configuration.GetInt(prefix + "max_retries"); maxRetries > 0 {
+ c.maxRetries = maxRetries
+ }
+ if backoffSeconds := configuration.GetInt(prefix + "backoff_seconds"); backoffSeconds > 0 {
+ c.backoffSeconds = backoffSeconds
+ }
+ if maxBackoffSeconds := configuration.GetInt(prefix + "max_backoff_seconds"); maxBackoffSeconds > 0 {
+ c.maxBackoffSeconds = maxBackoffSeconds
+ }
+ if timeout := configuration.GetInt(prefix + "timeout_seconds"); timeout > 0 {
+ c.timeoutSeconds = timeout
+ }
+
+ c.eventTypes = configuration.GetStringSlice(prefix + "event_types")
+ c.pathPrefixes = configuration.GetStringSlice(prefix + "path_prefixes")
+
+ return c
+}
+
+func (c *config) validate() error {
+ if c.endpoint == "" {
+ return fmt.Errorf("webhook endpoint is required")
+ }
+
+ _, err := url.Parse(c.endpoint)
+ if err != nil {
+ return fmt.Errorf("invalid webhook endpoint: %w", err)
+ }
+
+ if c.timeoutSeconds < 1 || c.timeoutSeconds > 300 {
+ return fmt.Errorf("timeout must be between 1 and 300 seconds, got %d", c.timeoutSeconds)
+ }
+
+ if c.maxRetries < 0 || c.maxRetries > 10 {
+ return fmt.Errorf("max retries must be between 0 and 10, got %d", c.maxRetries)
+ }
+
+ if c.backoffSeconds < 1 || c.backoffSeconds > 60 {
+ return fmt.Errorf("backoff seconds must be between 1 and 60, got %d", c.backoffSeconds)
+ }
+
+ if c.maxBackoffSeconds < c.backoffSeconds || c.maxBackoffSeconds > 300 {
+ return fmt.Errorf("max backoff seconds must be between %d and 300, got %d", c.backoffSeconds, c.maxBackoffSeconds)
+ }
+
+ if c.nWorkers < 1 || c.nWorkers > 100 {
+ return fmt.Errorf("workers must be between 1 and 100, got %d", c.nWorkers)
+ }
+
+ if c.bufferSize < 100 || c.bufferSize > 1_000_000 {
+ return fmt.Errorf("buffer size must be between 100 and 1,000,000, got %d", c.bufferSize)
+ }
+
+ return nil
+}
+
+func detectEventType(notification *filer_pb.EventNotification) eventType {
+ hasOldEntry := notification.OldEntry != nil
+ hasNewEntry := notification.NewEntry != nil
+ hasNewParentPath := notification.NewParentPath != ""
+
+ if !hasOldEntry && hasNewEntry {
+ return eventTypeCreate
+ }
+
+ if hasOldEntry && !hasNewEntry {
+ return eventTypeDelete
+ }
+
+ if hasOldEntry && hasNewEntry {
+ if hasNewParentPath {
+ return eventTypeRename
+ }
+
+ return eventTypeUpdate
+ }
+
+ return eventTypeUpdate
+}
diff --git a/weed/notification/webhook/webhook_queue.go b/weed/notification/webhook/webhook_queue.go
index d209b74e2..d8f9a0734 100644
--- a/weed/notification/webhook/webhook_queue.go
+++ b/weed/notification/webhook/webhook_queue.go
@@ -1,51 +1,81 @@
package webhook
import (
+ "context"
+ "errors"
"fmt"
- "net/url"
+ "time"
+ "github.com/ThreeDotsLabs/watermill"
+ "github.com/ThreeDotsLabs/watermill/message"
+ "github.com/ThreeDotsLabs/watermill/message/router/middleware"
+ "github.com/ThreeDotsLabs/watermill/message/router/plugin"
+ "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/notification"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
)
-// client defines the interface for transport client
-// could be extended to support gRPC
-type client interface {
- sendMessage(key string, message proto.Message) error
-}
-
func init() {
- notification.MessageQueues = append(notification.MessageQueues, &WebhookQueue{})
+ notification.MessageQueues = append(notification.MessageQueues, &Queue{})
}
-type WebhookQueue struct {
- client client
+type Queue struct {
+ router *message.Router
+ queueChannel *gochannel.GoChannel
+ config *config
+ client client
+ filter *filter
+
+ ctx context.Context
+ cancel context.CancelFunc
}
-type config struct {
- endpoint string
- authBearerToken string
+func (w *Queue) GetName() string {
+ return queueName
}
-func (c *config) validate() error {
- _, err := url.Parse(c.endpoint)
+func (w *Queue) SendMessage(key string, msg proto.Message) error {
+ eventNotification, ok := msg.(*filer_pb.EventNotification)
+ if !ok {
+ return nil
+ }
+
+ if w.filter != nil && !w.filter.shouldPublish(key, eventNotification) {
+ return nil
+ }
+
+ m := newWebhookMessage(key, msg)
+ if m == nil {
+ return nil
+ }
+
+ wMsg, err := m.toWaterMillMessage()
if err != nil {
- return fmt.Errorf("invalid webhook endpoint %w", err)
+ return err
}
- return nil
+ return w.queueChannel.Publish(pubSubTopicName, wMsg)
}
-func (w *WebhookQueue) GetName() string {
- return "webhook"
+func (w *webhookMessage) toWaterMillMessage() (*message.Message, error) {
+ payload, err := proto.Marshal(w.Notification)
+ if err != nil {
+ return nil, err
+ }
+
+ msg := message.NewMessage(watermill.NewUUID(), payload)
+ // Set event type and key as metadata
+ msg.Metadata.Set("event_type", w.EventType)
+ msg.Metadata.Set("key", w.Key)
+
+ return msg, nil
}
-func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix string) error {
- c := &config{
- endpoint: configuration.GetString(prefix + "endpoint"),
- authBearerToken: configuration.GetString(prefix + "bearer_token"),
- }
+func (w *Queue) Initialize(configuration util.Configuration, prefix string) error {
+ c := newConfigWithDefaults(configuration, prefix)
if err := c.validate(); err != nil {
return err
@@ -54,18 +84,124 @@ func (w *WebhookQueue) Initialize(configuration util.Configuration, prefix strin
return w.initialize(c)
}
-func (w *WebhookQueue) initialize(cfg *config) error {
- client, err := newHTTPClient(cfg)
+func (w *Queue) initialize(cfg *config) error {
+ w.ctx, w.cancel = context.WithCancel(context.Background())
+ w.config = cfg
+ w.filter = newFilter(cfg)
+
+ hClient, err := newHTTPClient(cfg)
+ if err != nil {
+ return fmt.Errorf("failed to create webhook http client: %w", err)
+ }
+ w.client = hClient
+
+ if err = w.setupWatermillQueue(cfg); err != nil {
+ return fmt.Errorf("failed to setup watermill queue: %w", err)
+ }
+ if err = w.logDeadLetterMessages(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (w *Queue) setupWatermillQueue(cfg *config) error {
+ logger := watermill.NewStdLogger(false, false)
+ pubSubConfig := gochannel.Config{
+ OutputChannelBuffer: int64(cfg.bufferSize),
+ Persistent: false,
+ }
+ w.queueChannel = gochannel.NewGoChannel(pubSubConfig, logger)
+
+ router, err := message.NewRouter(
+ message.RouterConfig{
+ CloseTimeout: 60 * time.Second,
+ },
+ logger,
+ )
if err != nil {
- return fmt.Errorf("failed to create webhook client: %v", err)
+ return fmt.Errorf("failed to create router: %v", err)
+ }
+ w.router = router
+
+ retryMiddleware := middleware.Retry{
+ MaxRetries: cfg.maxRetries,
+ InitialInterval: time.Duration(cfg.backoffSeconds) * time.Second,
+ MaxInterval: time.Duration(cfg.maxBackoffSeconds) * time.Second,
+ Multiplier: 2.0,
+ RandomizationFactor: 0.3,
+ Logger: logger,
+ }.Middleware
+
+ poisonQueue, err := middleware.PoisonQueue(w.queueChannel, deadLetterTopic)
+ if err != nil {
+ return fmt.Errorf("failed to create poison queue: %v", err)
+ }
+
+ router.AddPlugin(plugin.SignalsHandler)
+ router.AddMiddleware(retryMiddleware, poisonQueue)
+
+ for i := 0; i < cfg.nWorkers; i++ {
+ router.AddNoPublisherHandler(
+ pubSubHandlerNameTemplate(i),
+ pubSubTopicName,
+ w.queueChannel,
+ w.handleWebhook,
+ )
+ }
+
+ go func() {
+ // cancels the queue context so the dead letter logger exists in case context not canceled by the shutdown signal already
+ defer w.cancel()
+
+ if err := router.Run(w.ctx); err != nil && !errors.Is(err, context.Canceled) {
+ glog.Errorf("webhook pubsub worker stopped with error: %v", err)
+ }
+
+ glog.Info("webhook pubsub worker stopped")
+ }()
+
+ return nil
+}
+
+func (w *Queue) handleWebhook(msg *message.Message) error {
+ var n filer_pb.EventNotification
+ if err := proto.Unmarshal(msg.Payload, &n); err != nil {
+ glog.Errorf("failed to unmarshal protobuf message: %v", err)
+ return err
+ }
+
+ // Reconstruct webhook message from metadata and payload
+ webhookMsg := &webhookMessage{
+ Key: msg.Metadata.Get("key"),
+ EventType: msg.Metadata.Get("event_type"),
+ Notification: &n,
+ }
+
+ if err := w.client.sendMessage(webhookMsg); err != nil {
+ glog.Errorf("failed to send message to webhook %s: %v", webhookMsg.Key, err)
+ return err
}
- w.client = client
+
return nil
}
-func (w *WebhookQueue) SendMessage(key string, message proto.Message) error {
- if w.client == nil {
- return fmt.Errorf("webhook client not initialized")
+func (w *Queue) logDeadLetterMessages() error {
+ ch, err := w.queueChannel.Subscribe(w.ctx, deadLetterTopic)
+ if err != nil {
+ return err
}
- return w.client.sendMessage(key, message)
+
+ go func() {
+ for {
+ select {
+ case msg := <-ch:
+ glog.Errorf("received dead letter message: %s, key: %s", string(msg.Payload), msg.Metadata["key"])
+ case <-w.ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return nil
}
diff --git a/weed/notification/webhook/webhook_queue_test.go b/weed/notification/webhook/webhook_queue_test.go
new file mode 100644
index 000000000..52a290149
--- /dev/null
+++ b/weed/notification/webhook/webhook_queue_test.go
@@ -0,0 +1,536 @@
+package webhook
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "google.golang.org/protobuf/proto"
+)
+
+func TestConfigValidation(t *testing.T) {
+ tests := []struct {
+ name string
+ config *config
+ wantErr bool
+ errMsg string
+ }{
+ {
+ name: "valid config",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: false,
+ },
+ {
+ name: "empty endpoint",
+ config: &config{
+ endpoint: "",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "endpoint is required",
+ },
+ {
+ name: "invalid URL",
+ config: &config{
+ endpoint: "://invalid-url",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "invalid webhook endpoint",
+ },
+ {
+ name: "timeout too large",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 301,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "timeout must be between",
+ },
+ {
+ name: "too many retries",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 30,
+ maxRetries: 11,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "max retries must be between",
+ },
+ {
+ name: "too many workers",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 101,
+ bufferSize: 10000,
+ },
+ wantErr: true,
+ errMsg: "workers must be between",
+ },
+ {
+ name: "buffer too large",
+ config: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 30,
+ maxRetries: 3,
+ backoffSeconds: 5,
+ maxBackoffSeconds: 30,
+ nWorkers: 5,
+ bufferSize: 1000001,
+ },
+ wantErr: true,
+ errMsg: "buffer size must be between",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ err := tt.config.validate()
+ if (err != nil) != tt.wantErr {
+ t.Errorf("validate() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ if err != nil && tt.errMsg != "" {
+ if err.Error() == "" || !strings.Contains(err.Error(), tt.errMsg) {
+ t.Errorf("validate() error message = %v, want to contain %v", err.Error(), tt.errMsg)
+ }
+ }
+ })
+ }
+}
+
+func TestWebhookMessageSerialization(t *testing.T) {
+ msg := &filer_pb.EventNotification{
+ OldEntry: nil,
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ IsDirectory: false,
+ },
+ }
+
+ webhookMsg := newWebhookMessage("/test/path", msg)
+
+ wmMsg, err := webhookMsg.toWaterMillMessage()
+ if err != nil {
+ t.Fatalf("Failed to convert to watermill message: %v", err)
+ }
+
+ // Unmarshal the protobuf payload directly
+ var eventNotification filer_pb.EventNotification
+ err = proto.Unmarshal(wmMsg.Payload, &eventNotification)
+ if err != nil {
+ t.Fatalf("Failed to unmarshal protobuf message: %v", err)
+ }
+
+ // Check metadata
+ if wmMsg.Metadata.Get("key") != "/test/path" {
+ t.Errorf("Expected key '/test/path', got %v", wmMsg.Metadata.Get("key"))
+ }
+
+ if wmMsg.Metadata.Get("event_type") != "create" {
+ t.Errorf("Expected event type 'create', got %v", wmMsg.Metadata.Get("event_type"))
+ }
+
+ if eventNotification.NewEntry.Name != "test.txt" {
+ t.Errorf("Expected file name 'test.txt', got %v", eventNotification.NewEntry.Name)
+ }
+}
+
+func TestQueueInitialize(t *testing.T) {
+ cfg := &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 10,
+ maxRetries: 3,
+ backoffSeconds: 3,
+ maxBackoffSeconds: 60,
+ nWorkers: 1,
+ bufferSize: 100,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Errorf("Initialize() error = %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ if q.router == nil {
+ t.Error("Expected router to be initialized")
+ }
+ if q.queueChannel == nil {
+ t.Error("Expected queueChannel to be initialized")
+ }
+ if q.client == nil {
+ t.Error("Expected client to be initialized")
+ }
+ if q.config == nil {
+ t.Error("Expected config to be initialized")
+ }
+}
+
+// TestQueueSendMessage test sending messages to the queue
+func TestQueueSendMessage(t *testing.T) {
+ cfg := &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 1,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Fatalf("Failed to initialize queue: %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ msg := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ err = q.SendMessage("/test/path", msg)
+ if err != nil {
+ t.Errorf("SendMessage() error = %v", err)
+ }
+}
+
+func TestQueueHandleWebhook(t *testing.T) {
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+ }))
+ defer server.Close()
+
+ cfg := &config{
+ endpoint: server.URL,
+ authBearerToken: "test-token",
+ timeoutSeconds: 1,
+ maxRetries: 0,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ }
+
+ client, _ := newHTTPClient(cfg)
+ q := &Queue{
+ client: client,
+ }
+
+ message := newWebhookMessage("/test/path", &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ })
+
+ wmMsg, err := message.toWaterMillMessage()
+ if err != nil {
+ t.Fatalf("Failed to create watermill message: %v", err)
+ }
+
+ err = q.handleWebhook(wmMsg)
+ if err != nil {
+ t.Errorf("handleWebhook() error = %v", err)
+ }
+}
+
+func TestQueueEndToEnd(t *testing.T) {
+ // Simplified test - just verify the queue can be created and message can be sent
+ // without needing full end-to-end processing
+ cfg := &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 1,
+ maxRetries: 0,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Fatalf("Failed to initialize queue: %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ msg := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{
+ Name: "test.txt",
+ },
+ }
+
+ err = q.SendMessage("/test/path", msg)
+ if err != nil {
+ t.Errorf("SendMessage() error = %v", err)
+ }
+}
+
+func TestQueueRetryMechanism(t *testing.T) {
+ cfg := &config{
+ endpoint: "https://example.com/webhook",
+ authBearerToken: "test-token",
+ timeoutSeconds: 1,
+ maxRetries: 3, // Test that this config is used
+ backoffSeconds: 2,
+ maxBackoffSeconds: 10,
+ nWorkers: 1,
+ bufferSize: 10,
+ }
+
+ q := &Queue{}
+ err := q.initialize(cfg)
+ if err != nil {
+ t.Fatalf("Failed to initialize queue: %v", err)
+ }
+
+ defer func() {
+ if q.cancel != nil {
+ q.cancel()
+ }
+ time.Sleep(100 * time.Millisecond)
+ if q.router != nil {
+ q.router.Close()
+ }
+ }()
+
+ // Verify that the queue is properly configured for retries
+ if q.config.maxRetries != 3 {
+ t.Errorf("Expected maxRetries=3, got %d", q.config.maxRetries)
+ }
+
+ if q.config.backoffSeconds != 2 {
+ t.Errorf("Expected backoffSeconds=2, got %d", q.config.backoffSeconds)
+ }
+
+ if q.config.maxBackoffSeconds != 10 {
+ t.Errorf("Expected maxBackoffSeconds=10, got %d", q.config.maxBackoffSeconds)
+ }
+
+ // Test that we can send a message (retry behavior is handled by Watermill middleware)
+ msg := &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "test.txt"},
+ }
+
+ err = q.SendMessage("/test/retry", msg)
+ if err != nil {
+ t.Errorf("SendMessage() error = %v", err)
+ }
+}
+
+func TestQueueSendMessageWithFilter(t *testing.T) {
+ tests := []struct {
+ name string
+ cfg *config
+ key string
+ notification *filer_pb.EventNotification
+ shouldPublish bool
+ }{
+ {
+ name: "allowed event type",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"create"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: true,
+ },
+ {
+ name: "filtered event type",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"update", "rename"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ {
+ name: "allowed path prefix",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ pathPrefixes: []string{"/data/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: true,
+ },
+ {
+ name: "filtered path prefix",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ pathPrefixes: []string{"/logs/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ {
+ name: "combined filters - both pass",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"create", "delete"},
+ pathPrefixes: []string{"/data/", "/logs/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: true,
+ },
+ {
+ name: "combined filters - event fails",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"update", "delete"},
+ pathPrefixes: []string{"/data/", "/logs/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ {
+ name: "combined filters - path fails",
+ cfg: &config{
+ endpoint: "https://example.com/webhook",
+ timeoutSeconds: 10,
+ maxRetries: 1,
+ backoffSeconds: 1,
+ maxBackoffSeconds: 1,
+ nWorkers: 1,
+ bufferSize: 10,
+ eventTypes: []string{"create", "delete"},
+ pathPrefixes: []string{"/logs/"},
+ },
+ key: "/data/file.txt",
+ notification: &filer_pb.EventNotification{
+ NewEntry: &filer_pb.Entry{Name: "file.txt"},
+ },
+ shouldPublish: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ shouldPublish := newFilter(tt.cfg).shouldPublish(tt.key, tt.notification)
+ if shouldPublish != tt.shouldPublish {
+ t.Errorf("Expected shouldPublish=%v, got %v", tt.shouldPublish, shouldPublish)
+ }
+ })
+ }
+}