aboutsummaryrefslogtreecommitdiff
path: root/weed/notification/webhook/http.go
blob: 82a8f8e71c15904042af55ba07d7c8f70ab78877 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package webhook

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)

const (
	// maxWebhookRetryDepth is the highest retry depth at which a send is still
	// attempted: sendMessageWithRetry refuses to run once depth exceeds it, so a
	// single message is tried at depths 0 through maxWebhookRetryDepth inclusive.
	maxWebhookRetryDepth = 2
)

// httpClient posts webhook messages as JSON to a configured HTTP endpoint and
// remembers where that endpoint last redirected to.
type httpClient struct {
	endpoint   string        // Configured webhook URL; used whenever no redirect target is cached
	token      string        // Sent as "Authorization: Bearer <token>" when non-empty
	timeout    time.Duration // Per-request timeout applied through the request context; values <= 0 disable it
	client     *http.Client  // Reused HTTP client with redirect prevention
	endpointMu sync.RWMutex  // Guards finalURL
	finalURL   string        // Cached redirect target taken from a 3xx Location header; empty means "use endpoint"
}

// newHTTPClient builds an httpClient from cfg: it copies the endpoint and
// bearer token, converts cfg.timeoutSeconds into a time.Duration, and creates
// the shared *http.Client. The returned error is always nil.
func newHTTPClient(cfg *config) (*httpClient, error) {
	// Returning http.ErrUseLastResponse makes the client hand the 3xx response
	// back to the caller instead of following the redirect itself.
	keepRedirectResponse := func(_ *http.Request, _ []*http.Request) error {
		return http.ErrUseLastResponse
	}

	hc := new(httpClient)
	hc.endpoint = cfg.endpoint
	hc.token = cfg.authBearerToken
	hc.timeout = time.Duration(cfg.timeoutSeconds) * time.Second
	hc.client = &http.Client{CheckRedirect: keepRedirectResponse}

	return hc, nil
}

// sendMessage delivers message to the webhook, starting the retry chain at
// depth 0. It returns whatever error sendMessageWithRetry reports.
func (h *httpClient) sendMessage(message *webhookMessage) error {
	return h.sendMessageWithRetry(message, 0)
}

// sendMessageWithRetry serializes message once and POSTs it to the webhook,
// starting at the given retry depth.
//
// The request body is a JSON object with the keys "key", "event_type" and
// "message", where "message" carries the JSON encoding of
// message.Notification. An error is returned when depth already exceeds
// maxWebhookRetryDepth, when serialization fails, or when delivery fails (see
// postWithRetry).
func (h *httpClient) sendMessageWithRetry(message *webhookMessage, depth int) error {
	// Prevent infinite recursion
	if depth > maxWebhookRetryDepth {
		return fmt.Errorf("webhook max retry depth exceeded")
	}

	// Serialize the notification to JSON for the HTTP payload
	notificationData, err := json.Marshal(message.Notification)
	if err != nil {
		return fmt.Errorf("failed to marshal notification: %w", err)
	}

	payload := map[string]any{
		"key":        message.Key,
		"event_type": message.EventType,
		"message":    json.RawMessage(notificationData),
	}

	jsonData, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("failed to marshal message: %w", err)
	}

	// The encoded body is identical for every attempt, so it is built once here
	// and reused by all retries.
	return h.postWithRetry(jsonData, depth)
}

// postWithRetry performs one POST of body and recurses with depth+1 when the
// attempt should be repeated against a different URL.
//
// The target is the cached redirect URL when one is set, otherwise the
// configured endpoint. Outcomes:
//   - 2xx: success, nil is returned.
//   - 3xx with a usable Location: the resolved URL is cached and the POST is
//     re-issued to it.
//   - Transport error, status >= 400, or an unusable redirect (missing or
//     unparsable Location) while using the cached URL at depth 0: the cache is
//     cleared and the POST is retried once against the configured endpoint.
//   - Anything else: an error describing the failure is returned.
//
// Recursion stops with an error once depth exceeds maxWebhookRetryDepth.
func (h *httpClient) postWithRetry(body []byte, depth int) error {
	// Prevent infinite recursion
	if depth > maxWebhookRetryDepth {
		return fmt.Errorf("webhook max retry depth exceeded")
	}

	// Use cached final URL if available, otherwise use original endpoint
	h.endpointMu.RLock()
	targetURL := h.finalURL
	usingCachedURL := targetURL != ""
	if !usingCachedURL {
		targetURL = h.endpoint
	}
	h.endpointMu.RUnlock()

	// Only the first attempt may fall back from a cached URL to the configured
	// endpoint; deeper attempts report their failure directly.
	canFallBack := usingCachedURL && depth == 0

	req, err := http.NewRequest(http.MethodPost, targetURL, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")
	if h.token != "" {
		// NOTE(review): the bearer token is attached to every attempt, including
		// POSTs re-issued to a redirect target on a different host. net/http's
		// built-in redirect following drops Authorization on cross-domain
		// redirects — confirm that forwarding it here is intended.
		req.Header.Set("Authorization", "Bearer "+h.token)
	}

	// Apply timeout via context (not on client) to avoid redundancy
	if h.timeout > 0 {
		ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
		defer cancel()
		req = req.WithContext(ctx)
	}

	resp, err := h.client.Do(req)
	if err != nil {
		if drainErr := drainResponse(resp); drainErr != nil {
			glog.Errorf("failed to drain response: %v", drainErr)
		}

		// If using cached URL and request failed, clear cache and retry with original endpoint
		if canFallBack {
			glog.V(1).Infof("Webhook request to cached URL %s failed, clearing cache and retrying with original endpoint", targetURL)
			h.setFinalURL("")
			return h.postWithRetry(body, depth+1)
		}

		return fmt.Errorf("failed to send request: %w", err)
	}

	// Handle redirects by caching the final destination and recreating POST request
	if resp.StatusCode >= 300 && resp.StatusCode < 400 {
		// Drain and close response body to enable connection reuse
		util_http.CloseResponse(resp)

		// Resolve the Location header (possibly relative) against the request URL
		var (
			finalURLStr string
			redirectErr error
		)
		if location := resp.Header.Get("Location"); location == "" {
			redirectErr = fmt.Errorf("webhook returned redirect status %d without Location header", resp.StatusCode)
		} else if finalURL, parseErr := req.URL.Parse(location); parseErr != nil {
			redirectErr = fmt.Errorf("failed to parse redirect location: %w", parseErr)
		} else {
			finalURLStr = finalURL.String()
		}

		if redirectErr != nil {
			// A cached URL that now answers with an unusable redirect would
			// otherwise stay cached and fail every later message; treat it like
			// any other cached-URL failure and fall back to the original endpoint.
			if canFallBack {
				glog.V(1).Infof("Webhook request to cached URL %s returned an unusable redirect (%v), clearing cache and retrying with original endpoint", targetURL, redirectErr)
				h.setFinalURL("")
				return h.postWithRetry(body, depth+1)
			}
			return redirectErr
		}

		// Update finalURL to follow the redirect for this attempt
		h.setFinalURL(finalURLStr)

		if depth == 0 {
			glog.V(1).Infof("Webhook endpoint redirected from %s to %s, caching final destination", targetURL, finalURLStr)
		} else {
			glog.V(1).Infof("Webhook endpoint redirected from %s to %s (following redirect on retry)", targetURL, finalURLStr)
		}

		// Recreate the POST request to the final destination (increment depth to prevent infinite loops)
		return h.postWithRetry(body, depth+1)
	}

	// If using cached URL and got an error response, clear cache and retry with original endpoint
	if resp.StatusCode >= 400 && canFallBack {
		// Drain and close response body to enable connection reuse
		util_http.CloseResponse(resp)

		glog.V(1).Infof("Webhook request to cached URL %s returned error %d, clearing cache and retrying with original endpoint", targetURL, resp.StatusCode)
		h.setFinalURL("")
		return h.postWithRetry(body, depth+1)
	}

	// Drain and close response body in every remaining case to enable connection reuse
	util_http.CloseResponse(resp)

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("webhook returned status code: %d", resp.StatusCode)
	}

	return nil
}

func (h *httpClient) setFinalURL(url string) {
	h.endpointMu.Lock()
	h.finalURL = url
	h.endpointMu.Unlock()
}

// drainResponse consumes whatever is left of resp's body and closes it.
//
// A nil resp or a nil resp.Body is a no-op that returns nil. Otherwise the
// result joins the read error and the close error (nil when both succeed);
// the body is closed even if reading fails.
func drainResponse(resp *http.Response) error {
	if resp == nil || resp.Body == nil {
		return nil
	}

	// Stream into io.Discard rather than io.ReadAll: the bytes are thrown away,
	// so there is no reason to hold the whole body in memory.
	_, err := io.Copy(io.Discard, resp.Body)

	return errors.Join(
		err,
		resp.Body.Close(),
	)
}