aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/metadata_blocking_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/protocol/metadata_blocking_test.go')
-rw-r--r--weed/mq/kafka/protocol/metadata_blocking_test.go361
1 files changed, 361 insertions, 0 deletions
diff --git a/weed/mq/kafka/protocol/metadata_blocking_test.go b/weed/mq/kafka/protocol/metadata_blocking_test.go
new file mode 100644
index 000000000..403489210
--- /dev/null
+++ b/weed/mq/kafka/protocol/metadata_blocking_test.go
@@ -0,0 +1,361 @@
+package protocol
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/integration"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// TestMetadataRequestBlocking documents the original bug where Metadata requests hang
+// when the backend (broker/filer) ListTopics call blocks indefinitely.
+// This test is kept for documentation purposes and to verify the mock handler behavior.
+//
+// NOTE: The actual fix is in the broker's ListTopics implementation (weed/mq/broker/broker_grpc_lookup.go)
+// which adds a 2-second timeout for filer operations. This test uses a mock handler that
+// bypasses that fix, so it still demonstrates the original blocking behavior.
+func TestMetadataRequestBlocking(t *testing.T) {
+ t.Skip("This test documents the original bug. The fix is in the broker's ListTopics with filer timeout. Run TestMetadataRequestWithFastMock to verify fast path works.")
+
+ t.Log("Testing Metadata handler with blocking backend...")
+
+ // Create a handler with a mock backend that blocks on ListTopics
+ handler := &Handler{
+ seaweedMQHandler: &BlockingMockHandler{
+ blockDuration: 10 * time.Second, // Simulate slow backend
+ },
+ }
+
+ // Call handleMetadata in a goroutine so we can timeout
+ responseChan := make(chan []byte, 1)
+ errorChan := make(chan error, 1)
+
+ go func() {
+ // Build a simple Metadata v1 request body (empty topics array = all topics)
+ requestBody := []byte{0, 0, 0, 0} // Empty topics array
+ response, err := handler.handleMetadata(1, 1, requestBody)
+ if err != nil {
+ errorChan <- err
+ } else {
+ responseChan <- response
+ }
+ }()
+
+ // Wait for response with timeout
+ select {
+ case response := <-responseChan:
+ t.Logf("Metadata response received (%d bytes) - backend responded", len(response))
+ t.Error("UNEXPECTED: Response received before timeout - backend should have blocked")
+ case err := <-errorChan:
+ t.Logf("Metadata returned error: %v", err)
+ t.Error("UNEXPECTED: Error received - expected blocking, not error")
+ case <-time.After(3 * time.Second):
+ t.Logf("✓ BUG REPRODUCED: Metadata request blocked for 3+ seconds")
+ t.Logf(" Root cause: seaweedMQHandler.ListTopics() blocks indefinitely when broker/filer is slow")
+ t.Logf(" Impact: Entire control plane processor goroutine is frozen")
+ t.Logf(" Fix implemented: Broker's ListTopics now has 2-second timeout for filer operations")
+ // This is expected behavior with blocking mock - demonstrates the original issue
+ }
+}
+
+// TestMetadataRequestWithFastMock verifies that Metadata requests complete quickly
+// when the backend responds promptly (the common case)
+func TestMetadataRequestWithFastMock(t *testing.T) {
+ t.Log("Testing Metadata handler with fast-responding backend...")
+
+ // Create a handler with a fast mock (simulates in-memory topics only)
+ handler := &Handler{
+ seaweedMQHandler: &FastMockHandler{
+ topics: []string{"test-topic-1", "test-topic-2"},
+ },
+ }
+
+ // Call handleMetadata and measure time
+ start := time.Now()
+ requestBody := []byte{0, 0, 0, 0} // Empty topics array = list all
+ response, err := handler.handleMetadata(1, 1, requestBody)
+ duration := time.Since(start)
+
+ if err != nil {
+ t.Errorf("Metadata returned error: %v", err)
+ } else if response == nil {
+ t.Error("Metadata returned nil response")
+ } else {
+ t.Logf("✓ Metadata completed in %v (%d bytes)", duration, len(response))
+ if duration > 500*time.Millisecond {
+ t.Errorf("Metadata took too long: %v (should be < 500ms for fast backend)", duration)
+ }
+ }
+}
+
+// TestMetadataRequestWithTimeoutFix tests that Metadata requests with timeout-aware backend
+// complete within reasonable time even when underlying storage is slow
+func TestMetadataRequestWithTimeoutFix(t *testing.T) {
+ t.Log("Testing Metadata handler with timeout-aware backend...")
+
+ // Create a handler with a timeout-aware mock
+ // This simulates the broker's ListTopics with 2-second filer timeout
+ handler := &Handler{
+ seaweedMQHandler: &TimeoutAwareMockHandler{
+ timeout: 2 * time.Second,
+ blockDuration: 10 * time.Second, // Backend is slow but timeout kicks in
+ },
+ }
+
+ // Call handleMetadata and measure time
+ start := time.Now()
+ requestBody := []byte{0, 0, 0, 0} // Empty topics array
+ response, err := handler.handleMetadata(1, 1, requestBody)
+ duration := time.Since(start)
+
+ t.Logf("Metadata completed in %v", duration)
+
+ if err != nil {
+ t.Logf("✓ Metadata returned error after timeout: %v", err)
+ // This is acceptable - error response is better than hanging
+ } else if response != nil {
+ t.Logf("✓ Metadata returned response (%d bytes) without blocking", len(response))
+ // Backend timed out but still returned in-memory topics
+ if duration > 3*time.Second {
+ t.Errorf("Metadata took too long: %v (should timeout at ~2s)", duration)
+ }
+ } else {
+ t.Error("Metadata returned nil response and nil error - unexpected")
+ }
+}
+
+// FastMockHandler simulates a fast backend (in-memory topics only)
+type FastMockHandler struct {
+ topics []string
+}
+
+func (h *FastMockHandler) ListTopics() []string {
+ // Fast response - simulates in-memory topics
+ return h.topics
+}
+
+func (h *FastMockHandler) TopicExists(name string) bool {
+ for _, topic := range h.topics {
+ if topic == name {
+ return true
+ }
+ }
+ return false
+}
+
+func (h *FastMockHandler) CreateTopic(name string, partitions int32) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) DeleteTopic(name string) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo, bool) {
+ return nil, false
+}
+
+func (h *FastMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
+ return nil, fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) GetBrokerAddresses() []string {
+ return []string{"localhost:17777"}
+}
+
+func (h *FastMockHandler) CreatePerConnectionBrokerClient() (*integration.BrokerClient, error) {
+ return nil, fmt.Errorf("not implemented")
+}
+
+func (h *FastMockHandler) SetProtocolHandler(handler integration.ProtocolHandler) {
+ // No-op
+}
+
+func (h *FastMockHandler) Close() error {
+ return nil
+}
+
+// BlockingMockHandler simulates a backend that blocks indefinitely on ListTopics
+type BlockingMockHandler struct {
+ blockDuration time.Duration
+}
+
+func (h *BlockingMockHandler) ListTopics() []string {
+ // Simulate backend blocking (e.g., waiting for unresponsive broker/filer)
+ time.Sleep(h.blockDuration)
+ return []string{}
+}
+
+func (h *BlockingMockHandler) TopicExists(name string) bool {
+ return false
+}
+
+func (h *BlockingMockHandler) CreateTopic(name string, partitions int32) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) DeleteTopic(name string) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo, bool) {
+ return nil, false
+}
+
+func (h *BlockingMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
+ return nil, fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) GetBrokerAddresses() []string {
+ return []string{"localhost:17777"}
+}
+
+func (h *BlockingMockHandler) CreatePerConnectionBrokerClient() (*integration.BrokerClient, error) {
+ return nil, fmt.Errorf("not implemented")
+}
+
+func (h *BlockingMockHandler) SetProtocolHandler(handler integration.ProtocolHandler) {
+ // No-op
+}
+
+func (h *BlockingMockHandler) Close() error {
+ return nil
+}
+
+// TimeoutAwareMockHandler demonstrates expected behavior with timeout
+type TimeoutAwareMockHandler struct {
+ timeout time.Duration
+ blockDuration time.Duration
+}
+
+func (h *TimeoutAwareMockHandler) ListTopics() []string {
+ // Simulate timeout-aware backend
+ ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
+ defer cancel()
+
+ done := make(chan bool)
+ go func() {
+ time.Sleep(h.blockDuration)
+ done <- true
+ }()
+
+ select {
+ case <-done:
+ return []string{}
+ case <-ctx.Done():
+ // Timeout - return empty list rather than blocking forever
+ return []string{}
+ }
+}
+
+func (h *TimeoutAwareMockHandler) TopicExists(name string) bool {
+ return false
+}
+
+func (h *TimeoutAwareMockHandler) CreateTopic(name string, partitions int32) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) CreateTopicWithSchemas(name string, partitions int32, keyRecordType *schema_pb.RecordType, valueRecordType *schema_pb.RecordType) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) DeleteTopic(name string) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) GetTopicInfo(name string) (*integration.KafkaTopicInfo, bool) {
+ return nil, false
+}
+
+func (h *TimeoutAwareMockHandler) ProduceRecord(topicName string, partitionID int32, key, value []byte) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) ProduceRecordValue(topicName string, partitionID int32, key []byte, recordValueBytes []byte) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]integration.SMQRecord, error) {
+ return nil, fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) GetEarliestOffset(topic string, partition int32) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) GetLatestOffset(topic string, partition int32) (int64, error) {
+ return 0, fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) WithFilerClient(streamingMode bool, fn func(client filer_pb.SeaweedFilerClient) error) error {
+ return fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) GetBrokerAddresses() []string {
+ return []string{"localhost:17777"}
+}
+
+func (h *TimeoutAwareMockHandler) CreatePerConnectionBrokerClient() (*integration.BrokerClient, error) {
+ return nil, fmt.Errorf("not implemented")
+}
+
+func (h *TimeoutAwareMockHandler) SetProtocolHandler(handler integration.ProtocolHandler) {
+ // No-op
+}
+
+func (h *TimeoutAwareMockHandler) Close() error {
+ return nil
+}