diff options
| author | Ibrahim Konsowa <imkonsowa@gmail.com> | 2025-07-15 21:49:37 +0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-15 10:49:37 -0700 |
| commit | d78aa3d2de1a18c9802f72c926efa8e80ff2fa64 (patch) | |
| tree | 7980858236682e2cdb3f91f6ed3f9eeaf4843b47 /weed/notification/webhook/http.go | |
| parent | 74f4e9ba5ab691938e89b32017a89359ae49428f (diff) | |
| download | seaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.tar.xz seaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.zip | |
[Notifications] Improving webhook notifications (#6965)
* worker setup
* fix tests
* start worker
* graceful worker drain
* retry queue
* migrate queue to watermill
* adding filters and improvements
* add the event type to the webhook message
* eliminating redundant JSON serialization
* resolve review comments
* trigger actions
* fix tests
* typo fixes
* read max_backoff_seconds from config
* add more context to the dead letter
* close the http response on errors
* drain the http response body in case not empty
* eliminate exported typesπ
Diffstat (limited to 'weed/notification/webhook/http.go')
| -rw-r--r-- | weed/notification/webhook/http.go | 44 |
1 files changed, 40 insertions, 4 deletions
diff --git a/weed/notification/webhook/http.go b/weed/notification/webhook/http.go index 13b7f30d9..bb6a11a09 100644 --- a/weed/notification/webhook/http.go +++ b/weed/notification/webhook/http.go @@ -2,30 +2,43 @@ package webhook import ( "bytes" + "context" "encoding/json" + "errors" "fmt" + "io" "net/http" + "time" + "github.com/seaweedfs/seaweedfs/weed/glog" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" - "google.golang.org/protobuf/proto" ) type httpClient struct { endpoint string token string + timeout time.Duration } func newHTTPClient(cfg *config) (*httpClient, error) { return &httpClient{ endpoint: cfg.endpoint, token: cfg.authBearerToken, + timeout: time.Duration(cfg.timeoutSeconds) * time.Second, }, nil } -func (h *httpClient) sendMessage(key string, message proto.Message) error { +func (h *httpClient) sendMessage(message *webhookMessage) error { + // Serialize the protobuf message to JSON for HTTP payload + notificationData, err := json.Marshal(message.Notification) + if err != nil { + return fmt.Errorf("failed to marshal notification: %v", err) + } + payload := map[string]interface{}{ - "key": key, - "message": message, + "key": message.Key, + "event_type": message.EventType, + "message": json.RawMessage(notificationData), } jsonData, err := json.Marshal(payload) @@ -43,8 +56,18 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error { req.Header.Set("Authorization", "Bearer "+h.token) } + if h.timeout > 0 { + ctx, cancel := context.WithTimeout(context.Background(), h.timeout) + defer cancel() + req = req.WithContext(ctx) + } + resp, err := util_http.Do(req) if err != nil { + if err = drainResponse(resp); err != nil { + glog.Errorf("failed to drain response: %v", err) + } + return fmt.Errorf("failed to send request: %v", err) } defer resp.Body.Close() @@ -55,3 +78,16 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error { return nil } + +func drainResponse(resp *http.Response) error { + if resp == nil || resp.Body == nil { + return nil + } + + _, err := io.ReadAll(resp.Body) + + return errors.Join( + err, + resp.Body.Close(), + ) +} |
