aboutsummaryrefslogtreecommitdiff
path: root/weed/notification/webhook/http.go
diff options
context:
space:
mode:
authorIbrahim Konsowa <imkonsowa@gmail.com>2025-07-15 21:49:37 +0400
committerGitHub <noreply@github.com>2025-07-15 10:49:37 -0700
commitd78aa3d2de1a18c9802f72c926efa8e80ff2fa64 (patch)
tree7980858236682e2cdb3f91f6ed3f9eeaf4843b47 /weed/notification/webhook/http.go
parent74f4e9ba5ab691938e89b32017a89359ae49428f (diff)
downloadseaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.tar.xz
seaweedfs-d78aa3d2de1a18c9802f72c926efa8e80ff2fa64.zip
[Notifications] Improving webhook notifications (#6965)
* worker setup * fix tests * start worker * graceful worker drain * retry queue * migrate queue to watermill * adding filters and improvements * add the event type to the webhook message * eliminating redundant JSON serialization * resolve review comments * trigger actions * fix tests * typo fixes * read max_backoff_seconds from config * add more context to the dead letter * close the http response on errors * drain the http response body in case not empty * eliminate exported typesπ
Diffstat (limited to 'weed/notification/webhook/http.go')
-rw-r--r--weed/notification/webhook/http.go44
1 files changed, 40 insertions, 4 deletions
diff --git a/weed/notification/webhook/http.go b/weed/notification/webhook/http.go
index 13b7f30d9..bb6a11a09 100644
--- a/weed/notification/webhook/http.go
+++ b/weed/notification/webhook/http.go
@@ -2,30 +2,43 @@ package webhook
import (
"bytes"
+ "context"
"encoding/json"
+ "errors"
"fmt"
+ "io"
"net/http"
+ "time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
- "google.golang.org/protobuf/proto"
)
type httpClient struct {
endpoint string
token string
+ timeout time.Duration
}
func newHTTPClient(cfg *config) (*httpClient, error) {
return &httpClient{
endpoint: cfg.endpoint,
token: cfg.authBearerToken,
+ timeout: time.Duration(cfg.timeoutSeconds) * time.Second,
}, nil
}
-func (h *httpClient) sendMessage(key string, message proto.Message) error {
+func (h *httpClient) sendMessage(message *webhookMessage) error {
+ // Serialize the protobuf message to JSON for HTTP payload
+ notificationData, err := json.Marshal(message.Notification)
+ if err != nil {
+ return fmt.Errorf("failed to marshal notification: %v", err)
+ }
+
payload := map[string]interface{}{
- "key": key,
- "message": message,
+ "key": message.Key,
+ "event_type": message.EventType,
+ "message": json.RawMessage(notificationData),
}
jsonData, err := json.Marshal(payload)
@@ -43,8 +56,18 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error {
req.Header.Set("Authorization", "Bearer "+h.token)
}
+ if h.timeout > 0 {
+ ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
+ defer cancel()
+ req = req.WithContext(ctx)
+ }
+
resp, err := util_http.Do(req)
if err != nil {
+ if err = drainResponse(resp); err != nil {
+ glog.Errorf("failed to drain response: %v", err)
+ }
+
return fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
@@ -55,3 +78,16 @@ func (h *httpClient) sendMessage(key string, message proto.Message) error {
return nil
}
+
+func drainResponse(resp *http.Response) error {
+ if resp == nil || resp.Body == nil {
+ return nil
+ }
+
+ _, err := io.ReadAll(resp.Body)
+
+ return errors.Join(
+ err,
+ resp.Body.Close(),
+ )
+}