aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/integration/smq_integration_test.go
blob: f0c14017846a708e8408ef309cd778de3126480b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package integration

import (
	"context"
	"testing"
	"time"

	"github.com/IBM/sarama"
	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
)

// TestSMQIntegration verifies that the Kafka gateway integrates correctly
// with SeaweedMQ. It REQUIRES running SeaweedFS masters (enforced by
// testutil.SMQRequired) and skips when they are unavailable.
func TestSMQIntegration(t *testing.T) {
	// SMQRequired makes the helper skip the test if no SMQ backend is reachable.
	gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQRequired)
	defer gateway.CleanupAndClose()

	addr := gateway.StartAndWait()

	t.Logf("Running SMQ integration test with SeaweedFS backend")

	// Run each scenario as a named subtest against the same gateway address.
	subtests := []struct {
		name string
		run  func(*testing.T, string)
	}{
		{"ProduceConsumeWithPersistence", testProduceConsumeWithPersistence},
		{"ConsumerGroupOffsetPersistence", testConsumerGroupOffsetPersistence},
		{"TopicPersistence", testTopicPersistence},
	}
	for _, st := range subtests {
		st := st
		t.Run(st.name, func(t *testing.T) { st.run(t, addr) })
	}
}

// testProduceConsumeWithPersistence produces a small batch of messages to a
// fresh single-partition topic and reads them all back, confirming the SMQ
// backend persisted and serves them.
func testProduceConsumeWithPersistence(t *testing.T, addr string) {
	topicName := testutil.GenerateUniqueTopicName("smq-integration-produce-consume")

	client := testutil.NewSaramaClient(t, addr)
	msgGen := testutil.NewMessageGenerator()

	// Single partition, replication factor 1.
	testutil.AssertNoError(t, client.CreateTopic(topicName, 1, 1), "Failed to create topic")

	// Give the SMQ backend time to propagate the new topic.
	time.Sleep(500 * time.Millisecond)

	messages := msgGen.GenerateStringMessages(5)
	testutil.AssertNoError(t, client.ProduceMessages(topicName, messages), "Failed to produce messages")

	// Give the SMQ backend time to fully persist the batch.
	time.Sleep(200 * time.Millisecond)

	t.Logf("Produced %d messages to topic %s", len(messages), topicName)

	// Read everything back from partition 0 and check we got all of it.
	consumed, err := client.ConsumeMessages(topicName, 0, len(messages))
	testutil.AssertNoError(t, err, "Failed to consume messages")
	testutil.AssertEqual(t, len(messages), len(consumed), "Message count mismatch")

	t.Logf("Successfully consumed %d messages from SMQ backend", len(consumed))
}

// testConsumerGroupOffsetPersistence verifies that consumer-group offsets
// committed to the SMQ backend survive a consumer restart: a first consumer
// group reads 5 of 10 messages and commits, then a second group with the
// same group ID must resume at offset >= 5 rather than re-reading from 0.
func testConsumerGroupOffsetPersistence(t *testing.T, addr string) {
	topicName := testutil.GenerateUniqueTopicName("smq-integration-offset-persistence")
	groupID := testutil.GenerateUniqueGroupID("smq-offset-group")

	client := testutil.NewSaramaClient(t, addr)
	msgGen := testutil.NewMessageGenerator()

	// Create topic and produce messages
	err := client.CreateTopic(topicName, 1, 1)
	testutil.AssertNoError(t, err, "Failed to create topic")

	// Allow time for topic to propagate in SMQ backend
	time.Sleep(500 * time.Millisecond)

	messages := msgGen.GenerateStringMessages(10)
	err = client.ProduceMessages(topicName, messages)
	testutil.AssertNoError(t, err, "Failed to produce messages")

	// Allow time for messages to be fully persisted in SMQ backend
	time.Sleep(200 * time.Millisecond)

	// Phase 1: Consume first 5 messages with consumer group and commit offsets
	t.Logf("Phase 1: Consuming first 5 messages and committing offsets")

	config := client.GetConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	// Enable auto-commit for more reliable offset handling
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

	consumerGroup1, err := sarama.NewConsumerGroup([]string{addr}, groupID, config)
	testutil.AssertNoError(t, err, "Failed to create first consumer group")

	// stopAfter=5 makes the handler return from ConsumeClaim after 5 messages,
	// leaving the remaining 5 for the second consumer group.
	handler := &SMQOffsetTestHandler{
		messages:  make(chan *sarama.ConsumerMessage, len(messages)),
		ready:     make(chan bool),
		stopAfter: 5,
		t:         t,
	}

	ctx1, cancel1 := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel1()

	// Consume runs in a goroutine; only unexpected errors (not ctx
	// cancellation/deadline) are forwarded on the buffered error channel.
	consumeErrChan1 := make(chan error, 1)
	go func() {
		err := consumerGroup1.Consume(ctx1, []string{topicName}, handler)
		if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
			t.Logf("First consumer error: %v", err)
			consumeErrChan1 <- err
		}
	}()

	// Wait for consumer to be ready with timeout
	select {
	case <-handler.ready:
		// Consumer is ready, continue
	case err := <-consumeErrChan1:
		t.Fatalf("First consumer failed to start: %v", err)
	case <-time.After(10 * time.Second):
		t.Fatalf("Timeout waiting for first consumer to be ready")
	}
	// Drain exactly 5 messages from the handler before shutting down phase 1.
	consumedCount := 0
	for consumedCount < 5 {
		select {
		case <-handler.messages:
			consumedCount++
		case <-time.After(20 * time.Second):
			t.Fatalf("Timeout waiting for first batch of messages. Got %d/5", consumedCount)
		}
	}

	consumerGroup1.Close()
	cancel1()
	time.Sleep(7 * time.Second) // Allow auto-commit to complete and offset commits to be processed in SMQ

	t.Logf("Consumed %d messages in first phase", consumedCount)

	// Phase 2: Start new consumer group with same ID - should resume from committed offset
	t.Logf("Phase 2: Starting new consumer group to test offset persistence")

	// Create a fresh config for the second consumer group to avoid any state issues
	config2 := client.GetConfig()
	// OffsetOldest would restart from 0 only if no committed offset exists;
	// with a persisted commit the group must resume where phase 1 stopped.
	config2.Consumer.Offsets.Initial = sarama.OffsetOldest
	config2.Consumer.Offsets.AutoCommit.Enable = true
	config2.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

	consumerGroup2, err := sarama.NewConsumerGroup([]string{addr}, groupID, config2)
	testutil.AssertNoError(t, err, "Failed to create second consumer group")
	defer consumerGroup2.Close()

	handler2 := &SMQOffsetTestHandler{
		messages:  make(chan *sarama.ConsumerMessage, len(messages)),
		ready:     make(chan bool),
		stopAfter: 5, // Should consume remaining 5 messages
		t:         t,
	}

	ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel2()

	consumeErrChan := make(chan error, 1)
	go func() {
		err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
		if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
			t.Logf("Second consumer error: %v", err)
			consumeErrChan <- err
		}
	}()

	// Wait for second consumer to be ready with timeout
	select {
	case <-handler2.ready:
		// Consumer is ready, continue
	case err := <-consumeErrChan:
		t.Fatalf("Second consumer failed to start: %v", err)
	case <-time.After(10 * time.Second):
		t.Fatalf("Timeout waiting for second consumer to be ready")
	}
	// Collect the 5 remaining messages; keep them so we can inspect offsets.
	secondConsumerMessages := make([]*sarama.ConsumerMessage, 0)
	consumedCount = 0
	for consumedCount < 5 {
		select {
		case msg := <-handler2.messages:
			consumedCount++
			secondConsumerMessages = append(secondConsumerMessages, msg)
		case <-time.After(20 * time.Second):
			t.Fatalf("Timeout waiting for second batch of messages. Got %d/5", consumedCount)
		}
	}

	// Verify second consumer started from correct offset (should be >= 5)
	if len(secondConsumerMessages) > 0 {
		firstMessageOffset := secondConsumerMessages[0].Offset
		if firstMessageOffset < 5 {
			t.Fatalf("Second consumer should start from offset >= 5: got %d", firstMessageOffset)
		}
		t.Logf("Second consumer correctly resumed from offset %d", firstMessageOffset)
	}

	t.Logf("Successfully verified SMQ offset persistence")
}

// testTopicPersistence creates a two-partition topic and verifies it appears
// in the admin ListTopics response with the correct partition count, proving
// the topic metadata was persisted in the SMQ backend.
func testTopicPersistence(t *testing.T, addr string) {
	topicName := testutil.GenerateUniqueTopicName("smq-integration-topic-persistence")

	client := testutil.NewSaramaClient(t, addr)

	// Two partitions so the partition count round-trips through metadata.
	err := client.CreateTopic(topicName, 2, 1)
	testutil.AssertNoError(t, err, "Failed to create topic")

	// Give the SMQ backend time to propagate and persist the topic.
	time.Sleep(1 * time.Second)

	config := client.GetConfig()
	config.Admin.Timeout = 30 * time.Second

	admin, err := sarama.NewClusterAdmin([]string{addr}, config)
	testutil.AssertNoError(t, err, "Failed to create admin client")
	defer admin.Close()

	// List topics with exponential backoff (500ms, then 1s) to tolerate
	// slow metadata propagation.
	var topics map[string]sarama.TopicDetail
	var listErr error
	for attempt := 0; attempt < 3; attempt++ {
		if attempt > 0 {
			backoff := time.Duration(500*(1<<(attempt-1))) * time.Millisecond
			t.Logf("Retrying ListTopics after %v (attempt %d/3)", backoff, attempt+1)
			time.Sleep(backoff)
		}
		if topics, listErr = admin.ListTopics(); listErr == nil {
			break
		}
	}
	testutil.AssertNoError(t, listErr, "Failed to list topics")

	details, ok := topics[topicName]
	if !ok {
		t.Fatalf("Topic %s not found in topic list", topicName)
	}

	if details.NumPartitions != 2 {
		t.Errorf("Expected 2 partitions, got %d", details.NumPartitions)
	}

	t.Logf("Successfully verified topic persistence with %d partitions", details.NumPartitions)
}

// SMQOffsetTestHandler implements sarama.ConsumerGroupHandler for SMQ offset testing.
// It forwards each consumed message on the messages channel, marks it on the
// session, and stops consuming once stopAfter messages have been handled.
type SMQOffsetTestHandler struct {
	messages  chan *sarama.ConsumerMessage // consumed messages, read by the test body
	ready     chan bool                    // closed once by Setup to signal the consumer joined the group
	readyOnce bool                         // guards ready so a rebalance-triggered Setup doesn't close it twice
	stopAfter int                          // number of messages to consume before ConsumeClaim returns
	consumed  int                          // running count of messages handled so far
	t         *testing.T                   // test context for logging
}

// Setup signals readiness to the test by closing the ready channel. The
// readyOnce flag ensures the channel is closed only on the first session
// setup, so a rebalance does not panic on a double close.
func (h *SMQOffsetTestHandler) Setup(sarama.ConsumerGroupSession) error {
	h.t.Logf("SMQ offset test consumer setup")
	if h.readyOnce {
		return nil
	}
	h.readyOnce = true
	close(h.ready)
	return nil
}

// Cleanup logs session teardown; no handler state needs to be released.
func (h *SMQOffsetTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
	h.t.Logf("SMQ offset test consumer cleanup")
	return nil
}

// ConsumeClaim pumps messages from the claim to the test via h.messages,
// marking each one on the session so auto-commit can persist the offset.
// It returns once stopAfter messages have been consumed, when the claim's
// message channel closes, or when the session context is cancelled.
func (h *SMQOffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message := <-claim.Messages():
			// A nil message means claim.Messages() was closed (session ending).
			if message == nil {
				return nil
			}
			h.consumed++
			h.messages <- message
			session.MarkMessage(message, "")

			// Stop after consuming the specified number of messages
			if h.consumed >= h.stopAfter {
				h.t.Logf("Stopping SMQ consumer after %d messages", h.consumed)
				// Auto-commit will handle offset commits automatically
				return nil
			}
		case <-session.Context().Done():
			return nil
		}
	}
}