aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/metadata_blocking_test.go
blob: 4034892101e4f6ac1ea814afdf3d8d3260d2fc9d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
package protocol

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// TestMetadataRequestBlocking documents the original bug where Metadata requests hang
// when the backend (broker/filer) ListTopics call blocks indefinitely.
// This test is kept for documentation purposes and to verify the mock handler behavior.
//
// NOTE: The actual fix is in the broker's ListTopics implementation (weed/mq/broker/broker_grpc_lookup.go)
// which adds a 2-second timeout for filer operations. This test uses a mock handler that
// bypasses that fix, so it still demonstrates the original blocking behavior.
func TestMetadataRequestBlocking(t *testing.T) {
	t.Skip("This test documents the original bug. The fix is in the broker's ListTopics with filer timeout. Run TestMetadataRequestWithFastMock to verify fast path works.")

	t.Log("Testing Metadata handler with blocking backend...")

	// Create a handler with a mock backend that blocks on ListTopics
	handler := &Handler{
		seaweedMQHandler: &BlockingMockHandler{
			blockDuration: 10 * time.Second, // Simulate slow backend
		},
	}

	// Call handleMetadata in a goroutine so we can timeout
	responseChan := make(chan []byte, 1)
	errorChan := make(chan error, 1)

	go func() {
		// Build a simple Metadata v1 request body (empty topics array = all topics)
		requestBody := []byte{0, 0, 0, 0} // Empty topics array
		response, err := handler.handleMetadata(1, 1, requestBody)
		if err != nil {
			errorChan <- err
		} else {
			responseChan <- response
		}
	}()

	// Wait for response with timeout
	select {
	case response := <-responseChan:
		t.Logf("Metadata response received (%d bytes) - backend responded", len(response))
		t.Error("UNEXPECTED: Response received before timeout - backend should have blocked")
	case err := <-errorChan:
		t.Logf("Metadata returned error: %v", err)
		t.Error("UNEXPECTED: Error received - expected blocking, not error")
	case <-time.After(3 * time.Second):
		t.Logf("✓ BUG REPRODUCED: Metadata request blocked for 3+ seconds")
		t.Logf("  Root cause: seaweedMQHandler.ListTopics() blocks indefinitely when broker/filer is slow")
		t.Logf("  Impact: Entire control plane processor goroutine is frozen")
		t.Logf("  Fix implemented: Broker's ListTopics now has 2-second timeout for filer operations")
		// This is expected behavior with blocking mock - demonstrates the original issue
	}
}

// TestMetadataRequestWithFastMock verifies that Metadata requests complete quickly
// when the backend responds promptly (the common case)
func TestMetadataRequestWithFastMock(t *testing.T) {
	t.Log("Testing Metadata handler with fast-responding backend...")

	// Create a handler with a fast mock (simulates in-memory topics only)
	handler := &Handler{
		seaweedMQHandler: &FastMockHandler{
			topics: []string{"test-topic-1", "test-topic-2"},
		},
	}

	// Call handleMetadata and measure time
	start := time.Now()
	requestBody := []byte{0, 0, 0, 0} // Empty topics array = list all
	response, err := handler.handleMetadata(1, 1, requestBody)
	duration := time.Since(start)

	if err != nil {
		t.Errorf("Metadata returned error: %v", err)
	} else if response == nil {
		t.Error("Metadata returned nil response")
	} else {
		t.Logf("✓ Metadata completed in %v (%d bytes)", duration, len(response))
		if duration > 500*time.Millisecond {
			t.Errorf("Metadata took too long: %v (should be < 500ms for fast backend)", duration)
		}
	}
}

// TestMetadataRequestWithTimeoutFix tests that Metadata requests against a
// timeout-aware backend complete within a reasonable time even when the underlying
// storage is slow.
//
// Either an error or a non-nil response is accepted as the outcome, but the call
// must return within 3 seconds in both cases: returning an error only after a long
// hang would still be the bug this test guards against.
func TestMetadataRequestWithTimeoutFix(t *testing.T) {
	t.Log("Testing Metadata handler with timeout-aware backend...")

	// Upper bound for the whole request: the mock's 2s timeout plus slack.
	const maxWait = 3 * time.Second

	// Create a handler with a timeout-aware mock.
	// This simulates the broker's ListTopics with 2-second filer timeout.
	handler := &Handler{
		seaweedMQHandler: &TimeoutAwareMockHandler{
			timeout:       2 * time.Second,
			blockDuration: 10 * time.Second, // Backend is slow but timeout kicks in
		},
	}

	// Call handleMetadata and measure time
	start := time.Now()
	requestBody := []byte{0, 0, 0, 0} // Empty topics array
	response, err := handler.handleMetadata(1, 1, requestBody)
	duration := time.Since(start)

	t.Logf("Metadata completed in %v", duration)

	// Enforce the latency bound on every path, not only on success.
	if duration > maxWait {
		t.Errorf("Metadata took too long: %v (should timeout at ~2s)", duration)
	}

	switch {
	case err != nil:
		// Acceptable - an error response is better than hanging.
		t.Logf("✓ Metadata returned error after timeout: %v", err)
	case response != nil:
		// Backend timed out but the handler still produced a response.
		t.Logf("✓ Metadata returned response (%d bytes) without blocking", len(response))
	default:
		t.Error("Metadata returned nil response and nil error - unexpected")
	}
}

// FastMockHandler is a mock backend that answers immediately from a fixed,
// in-memory list of topic names.
type FastMockHandler struct {
	topics []string // topic names reported by ListTopics and TopicExists
}

// ListTopics returns the configured topic list without any delay.
func (h *FastMockHandler) ListTopics() []string {
	return h.topics
}

// TopicExists reports whether name is one of the configured topics.
func (h *FastMockHandler) TopicExists(name string) bool {
	for i := range h.topics {
		if h.topics[i] == name {
			return true
		}
	}
	return false
}

// fastMockNotImplemented builds the error returned by every FastMockHandler
// operation that this mock does not support.
func fastMockNotImplemented() error {
	return fmt.Errorf("not implemented")
}

// CreateTopic is unsupported by this mock; it always returns a "not implemented" error.
func (h *FastMockHandler) CreateTopic(name string, partitions int32) error {
	return fastMockNotImplemented()
}

// CreateTopicWithSchemas is unsupported by this mock; it always returns a "not implemented" error.
func (h *FastMockHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
	return fastMockNotImplemented()
}

// DeleteTopic is unsupported by this mock; it always returns a "not implemented" error.
func (h *FastMockHandler) DeleteTopic(name string) error {
	return fastMockNotImplemented()
}

// GetTopicInfo never finds a topic: it always returns (nil, false).
func (h *FastMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo, bool) {
	return nil, false
}

// ProduceRecord is unsupported by this mock; it returns offset 0 and a "not implemented" error.
func (h *FastMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
	return 0, fastMockNotImplemented()
}

// ProduceRecordValue is unsupported by this mock; it returns offset 0 and a "not implemented" error.
func (h *FastMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
	return 0, fastMockNotImplemented()
}

// GetStoredRecords is unsupported by this mock; it returns no records and a "not implemented" error.
func (h *FastMockHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
	return nil, fastMockNotImplemented()
}

// GetEarliestOffset is unsupported by this mock; it returns 0 and a "not implemented" error.
func (h *FastMockHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
	return 0, fastMockNotImplemented()
}

// GetLatestOffset is unsupported by this mock; it returns 0 and a "not implemented" error.
func (h *FastMockHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
	return 0, fastMockNotImplemented()
}

// WithFilerClient never invokes fn; it always returns a "not implemented" error.
func (h *FastMockHandler) WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error {
	return fastMockNotImplemented()
}

// GetBrokerAddresses returns a single fixed placeholder address.
func (h *FastMockHandler) GetBrokerAddresses() []string {
	return []string{"localhost:17777"}
}

// CreatePerConnectionBrokerClient is unsupported by this mock; it returns nil and a "not implemented" error.
func (h *FastMockHandler) CreatePerConnectionBrokerClient() (*integration.BrokerClient, error) {
	return nil, fastMockNotImplemented()
}

// SetProtocolHandler ignores its argument; the mock keeps no protocol handler.
func (h *FastMockHandler) SetProtocolHandler(handler integration.ProtocolHandler) {
}

// Close has nothing to release and always returns nil.
func (h *FastMockHandler) Close() error {
	return nil
}

// BlockingMockHandler is a mock backend whose ListTopics stalls for a configurable
// time, standing in for an unresponsive broker/filer.
type BlockingMockHandler struct {
	blockDuration time.Duration // how long ListTopics blocks before returning
}

// ListTopics blocks for blockDuration and then returns an empty, non-nil topic list.
func (h *BlockingMockHandler) ListTopics() []string {
	// Hold the caller for the configured time to mimic a stuck backend call.
	<-time.After(h.blockDuration)
	return make([]string, 0)
}

// blockingMockNotImplemented builds the error returned by every BlockingMockHandler
// operation that this mock does not support.
func blockingMockNotImplemented() error {
	return fmt.Errorf("not implemented")
}

// TopicExists always reports false; this mock holds no topics.
func (h *BlockingMockHandler) TopicExists(name string) bool {
	return false
}

// CreateTopic is unsupported by this mock; it always returns a "not implemented" error.
func (h *BlockingMockHandler) CreateTopic(name string, partitions int32) error {
	return blockingMockNotImplemented()
}

// CreateTopicWithSchemas is unsupported by this mock; it always returns a "not implemented" error.
func (h *BlockingMockHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
	return blockingMockNotImplemented()
}

// DeleteTopic is unsupported by this mock; it always returns a "not implemented" error.
func (h *BlockingMockHandler) DeleteTopic(name string) error {
	return blockingMockNotImplemented()
}

// GetTopicInfo never finds a topic: it always returns (nil, false).
func (h *BlockingMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo, bool) {
	return nil, false
}

// ProduceRecord is unsupported by this mock; it returns offset 0 and a "not implemented" error.
func (h *BlockingMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
	return 0, blockingMockNotImplemented()
}

// ProduceRecordValue is unsupported by this mock; it returns offset 0 and a "not implemented" error.
func (h *BlockingMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
	return 0, blockingMockNotImplemented()
}

// GetStoredRecords is unsupported by this mock; it returns no records and a "not implemented" error.
func (h *BlockingMockHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
	return nil, blockingMockNotImplemented()
}

// GetEarliestOffset is unsupported by this mock; it returns 0 and a "not implemented" error.
func (h *BlockingMockHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
	return 0, blockingMockNotImplemented()
}

// GetLatestOffset is unsupported by this mock; it returns 0 and a "not implemented" error.
func (h *BlockingMockHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
	return 0, blockingMockNotImplemented()
}

// WithFilerClient never invokes fn; it always returns a "not implemented" error.
func (h *BlockingMockHandler) WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error {
	return blockingMockNotImplemented()
}

// GetBrokerAddresses returns a single fixed placeholder address.
func (h *BlockingMockHandler) GetBrokerAddresses() []string {
	return []string{"localhost:17777"}
}

// CreatePerConnectionBrokerClient is unsupported by this mock; it returns nil and a "not implemented" error.
func (h *BlockingMockHandler) CreatePerConnectionBrokerClient() (*integration.BrokerClient, error) {
	return nil, blockingMockNotImplemented()
}

// SetProtocolHandler ignores its argument; the mock keeps no protocol handler.
func (h *BlockingMockHandler) SetProtocolHandler(handler integration.ProtocolHandler) {
}

// Close has nothing to release and always returns nil.
func (h *BlockingMockHandler) Close() error {
	return nil
}

// TimeoutAwareMockHandler demonstrates the expected behavior of a backend that
// bounds a slow operation with a timeout instead of blocking its caller.
type TimeoutAwareMockHandler struct {
	timeout       time.Duration // maximum time ListTopics waits for the simulated backend
	blockDuration time.Duration // how long the simulated backend takes to answer
}

// ListTopics waits for the simulated backend for at most h.timeout and then returns
// an empty, non-nil topic list, whether the backend "answered" or the timeout fired.
//
// The backend delay is modeled with a timer that is stopped on return. A helper
// goroutine that sleeps and then sends on an unbuffered channel would stay blocked
// on that send forever whenever the timeout wins, leaking one goroutine per call.
func (h *TimeoutAwareMockHandler) ListTopics() []string {
	ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
	defer cancel()

	backend := time.NewTimer(h.blockDuration)
	defer backend.Stop()

	select {
	case <-backend.C:
		// Simulated backend finished before the deadline.
	case <-ctx.Done():
		// Timeout - return empty list rather than blocking forever.
	}
	return []string{}
}

// timeoutAwareMockNotImplemented builds the error returned by every
// TimeoutAwareMockHandler operation that this mock does not support.
func timeoutAwareMockNotImplemented() error {
	return fmt.Errorf("not implemented")
}

// TopicExists always reports false; this mock holds no topics.
func (h *TimeoutAwareMockHandler) TopicExists(name string) bool {
	return false
}

// CreateTopic is unsupported by this mock; it always returns a "not implemented" error.
func (h *TimeoutAwareMockHandler) CreateTopic(name string, partitions int32) error {
	return timeoutAwareMockNotImplemented()
}

// CreateTopicWithSchemas is unsupported by this mock; it always returns a "not implemented" error.
func (h *TimeoutAwareMockHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
	return timeoutAwareMockNotImplemented()
}

// DeleteTopic is unsupported by this mock; it always returns a "not implemented" error.
func (h *TimeoutAwareMockHandler) DeleteTopic(name string) error {
	return timeoutAwareMockNotImplemented()
}

// GetTopicInfo never finds a topic: it always returns (nil, false).
func (h *TimeoutAwareMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo, bool) {
	return nil, false
}

// ProduceRecord is unsupported by this mock; it returns offset 0 and a "not implemented" error.
func (h *TimeoutAwareMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
	return 0, timeoutAwareMockNotImplemented()
}

// ProduceRecordValue is unsupported by this mock; it returns offset 0 and a "not implemented" error.
func (h *TimeoutAwareMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
	return 0, timeoutAwareMockNotImplemented()
}

// GetStoredRecords is unsupported by this mock; it returns no records and a "not implemented" error.
func (h *TimeoutAwareMockHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
	return nil, timeoutAwareMockNotImplemented()
}

// GetEarliestOffset is unsupported by this mock; it returns 0 and a "not implemented" error.
func (h *TimeoutAwareMockHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
	return 0, timeoutAwareMockNotImplemented()
}

// GetLatestOffset is unsupported by this mock; it returns 0 and a "not implemented" error.
func (h *TimeoutAwareMockHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
	return 0, timeoutAwareMockNotImplemented()
}

// WithFilerClient never invokes fn; it always returns a "not implemented" error.
func (h *TimeoutAwareMockHandler) WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error {
	return timeoutAwareMockNotImplemented()
}

// GetBrokerAddresses returns a single fixed placeholder address.
func (h *TimeoutAwareMockHandler) GetBrokerAddresses() []string {
	return []string{"localhost:17777"}
}

// CreatePerConnectionBrokerClient is unsupported by this mock; it returns nil and a "not implemented" error.
func (h *TimeoutAwareMockHandler) CreatePerConnectionBrokerClient() (*integration.BrokerClient, error) {
	return nil, timeoutAwareMockNotImplemented()
}

// SetProtocolHandler ignores its argument; the mock keeps no protocol handler.
func (h *TimeoutAwareMockHandler) SetProtocolHandler(handler integration.ProtocolHandler) {
}

// Close has nothing to release and always returns nil.
func (h *TimeoutAwareMockHandler) Close() error {
	return nil
}