aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/integration/consumer_groups_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/integration/consumer_groups_test.go')
-rw-r--r--test/kafka/integration/consumer_groups_test.go351
1 files changed, 351 insertions, 0 deletions
diff --git a/test/kafka/integration/consumer_groups_test.go b/test/kafka/integration/consumer_groups_test.go
new file mode 100644
index 000000000..5407a2999
--- /dev/null
+++ b/test/kafka/integration/consumer_groups_test.go
@@ -0,0 +1,351 @@
+package integration
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestConsumerGroups tests consumer group functionality
+// This test requires SeaweedFS masters to be running and will skip if not available
+func TestConsumerGroups(t *testing.T) {
+ gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQRequired)
+ defer gateway.CleanupAndClose()
+
+ addr := gateway.StartAndWait()
+
+ t.Logf("Running consumer group tests with SMQ backend for offset persistence")
+
+ t.Run("BasicFunctionality", func(t *testing.T) {
+ testConsumerGroupBasicFunctionality(t, addr)
+ })
+
+ t.Run("OffsetCommitAndFetch", func(t *testing.T) {
+ testConsumerGroupOffsetCommitAndFetch(t, addr)
+ })
+
+ t.Run("Rebalancing", func(t *testing.T) {
+ testConsumerGroupRebalancing(t, addr)
+ })
+}
+
+func testConsumerGroupBasicFunctionality(t *testing.T, addr string) {
+ topicName := testutil.GenerateUniqueTopicName("consumer-group-basic")
+ groupID := testutil.GenerateUniqueGroupID("basic-group")
+
+ client := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Create topic and produce messages
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ messages := msgGen.GenerateStringMessages(9) // 3 messages per consumer
+ err = client.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce messages")
+
+ // Test with multiple consumers in the same group
+ numConsumers := 3
+ handler := &ConsumerGroupHandler{
+ messages: make(chan *sarama.ConsumerMessage, len(messages)),
+ ready: make(chan bool),
+ t: t,
+ }
+
+ var wg sync.WaitGroup
+ consumerErrors := make(chan error, numConsumers)
+
+ for i := 0; i < numConsumers; i++ {
+ wg.Add(1)
+ go func(consumerID int) {
+ defer wg.Done()
+
+ consumerGroup, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig())
+ if err != nil {
+ consumerErrors <- fmt.Errorf("consumer %d: failed to create consumer group: %v", consumerID, err)
+ return
+ }
+ defer consumerGroup.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ err = consumerGroup.Consume(ctx, []string{topicName}, handler)
+ if err != nil && err != context.DeadlineExceeded {
+ consumerErrors <- fmt.Errorf("consumer %d: consumption error: %v", consumerID, err)
+ return
+ }
+ }(i)
+ }
+
+ // Wait for consumers to be ready
+ readyCount := 0
+ for readyCount < numConsumers {
+ select {
+ case <-handler.ready:
+ readyCount++
+ case <-time.After(5 * time.Second):
+ t.Fatalf("Timeout waiting for consumers to be ready")
+ }
+ }
+
+ // Collect consumed messages
+ consumedMessages := make([]*sarama.ConsumerMessage, 0, len(messages))
+ messageTimeout := time.After(10 * time.Second)
+
+ for len(consumedMessages) < len(messages) {
+ select {
+ case msg := <-handler.messages:
+ consumedMessages = append(consumedMessages, msg)
+ case err := <-consumerErrors:
+ t.Fatalf("Consumer error: %v", err)
+ case <-messageTimeout:
+ t.Fatalf("Timeout waiting for messages. Got %d/%d messages", len(consumedMessages), len(messages))
+ }
+ }
+
+ wg.Wait()
+
+ // Verify all messages were consumed exactly once
+ testutil.AssertEqual(t, len(messages), len(consumedMessages), "Message count mismatch")
+
+ // Verify message uniqueness (no duplicates)
+ messageKeys := make(map[string]bool)
+ for _, msg := range consumedMessages {
+ key := string(msg.Key)
+ if messageKeys[key] {
+ t.Errorf("Duplicate message key: %s", key)
+ }
+ messageKeys[key] = true
+ }
+}
+
+func testConsumerGroupOffsetCommitAndFetch(t *testing.T, addr string) {
+ topicName := testutil.GenerateUniqueTopicName("offset-commit-test")
+ groupID := testutil.GenerateUniqueGroupID("offset-group")
+
+ client := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Create topic and produce messages
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ messages := msgGen.GenerateStringMessages(5)
+ err = client.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce messages")
+
+ // First consumer: consume first 3 messages and commit offsets
+ handler1 := &OffsetTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, len(messages)),
+ ready: make(chan bool),
+ stopAfter: 3,
+ t: t,
+ }
+
+ consumerGroup1, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig())
+ testutil.AssertNoError(t, err, "Failed to create first consumer group")
+
+ ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel1()
+
+ go func() {
+ err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("First consumer error: %v", err)
+ }
+ }()
+
+ // Wait for first consumer to be ready and consume messages
+ <-handler1.ready
+ consumedCount := 0
+ for consumedCount < 3 {
+ select {
+ case <-handler1.messages:
+ consumedCount++
+ case <-time.After(5 * time.Second):
+ t.Fatalf("Timeout waiting for first consumer messages")
+ }
+ }
+
+ consumerGroup1.Close()
+ cancel1()
+ time.Sleep(500 * time.Millisecond) // Wait for cleanup
+
+ // Stop the first consumer after N messages
+ // Allow a brief moment for commit/heartbeat to flush
+ time.Sleep(1 * time.Second)
+
+ // Start a second consumer in the same group to verify resumption from committed offset
+ handler2 := &OffsetTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, len(messages)),
+ ready: make(chan bool),
+ stopAfter: 2,
+ t: t,
+ }
+ consumerGroup2, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig())
+ testutil.AssertNoError(t, err, "Failed to create second consumer group")
+ defer consumerGroup2.Close()
+
+ ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel2()
+
+ go func() {
+ err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("Second consumer error: %v", err)
+ }
+ }()
+
+ // Wait for second consumer and collect remaining messages
+ <-handler2.ready
+ secondConsumerMessages := make([]*sarama.ConsumerMessage, 0)
+ consumedCount = 0
+ for consumedCount < 2 {
+ select {
+ case msg := <-handler2.messages:
+ consumedCount++
+ secondConsumerMessages = append(secondConsumerMessages, msg)
+ case <-time.After(5 * time.Second):
+ t.Fatalf("Timeout waiting for second consumer messages. Got %d/2", consumedCount)
+ }
+ }
+
+ // Verify second consumer started from correct offset
+ if len(secondConsumerMessages) > 0 {
+ firstMessageOffset := secondConsumerMessages[0].Offset
+ if firstMessageOffset < 3 {
+ t.Fatalf("Second consumer should start from offset >= 3: got %d", firstMessageOffset)
+ }
+ }
+}
+
+func testConsumerGroupRebalancing(t *testing.T, addr string) {
+ topicName := testutil.GenerateUniqueTopicName("rebalancing-test")
+ groupID := testutil.GenerateUniqueGroupID("rebalance-group")
+
+ client := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Create topic with multiple partitions for rebalancing
+ err := client.CreateTopic(topicName, 4, 1) // 4 partitions
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ // Produce messages to all partitions
+ messages := msgGen.GenerateStringMessages(12) // 3 messages per partition
+ for i, msg := range messages {
+ partition := int32(i % 4)
+ err = client.ProduceMessageToPartition(topicName, partition, msg)
+ testutil.AssertNoError(t, err, "Failed to produce message")
+ }
+
+ t.Logf("Produced %d messages across 4 partitions", len(messages))
+
+ // Test scenario 1: Single consumer gets all partitions
+ t.Run("SingleConsumerAllPartitions", func(t *testing.T) {
+ testSingleConsumerAllPartitions(t, addr, topicName, groupID+"-single")
+ })
+
+ // Test scenario 2: Add second consumer, verify rebalancing
+ t.Run("TwoConsumersRebalance", func(t *testing.T) {
+ testTwoConsumersRebalance(t, addr, topicName, groupID+"-two")
+ })
+
+ // Test scenario 3: Remove consumer, verify rebalancing
+ t.Run("ConsumerLeaveRebalance", func(t *testing.T) {
+ testConsumerLeaveRebalance(t, addr, topicName, groupID+"-leave")
+ })
+
+ // Test scenario 4: Multiple consumers join simultaneously
+ t.Run("MultipleConsumersJoin", func(t *testing.T) {
+ testMultipleConsumersJoin(t, addr, topicName, groupID+"-multi")
+ })
+}
+
+// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
+type ConsumerGroupHandler struct {
+ messages chan *sarama.ConsumerMessage
+ ready chan bool
+ readyOnce sync.Once
+ t *testing.T
+}
+
+func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("Consumer group session setup")
+ h.readyOnce.Do(func() {
+ close(h.ready)
+ })
+ return nil
+}
+
+func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("Consumer group session cleanup")
+ return nil
+}
+
+func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for {
+ select {
+ case message := <-claim.Messages():
+ if message == nil {
+ return nil
+ }
+ h.messages <- message
+ session.MarkMessage(message, "")
+ case <-session.Context().Done():
+ return nil
+ }
+ }
+}
+
+// OffsetTestHandler implements sarama.ConsumerGroupHandler for offset testing
+type OffsetTestHandler struct {
+ messages chan *sarama.ConsumerMessage
+ ready chan bool
+ readyOnce sync.Once
+ stopAfter int
+ consumed int
+ t *testing.T
+}
+
+func (h *OffsetTestHandler) Setup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("Offset test consumer setup")
+ h.readyOnce.Do(func() {
+ close(h.ready)
+ })
+ return nil
+}
+
+func (h *OffsetTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("Offset test consumer cleanup")
+ return nil
+}
+
+func (h *OffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for {
+ select {
+ case message := <-claim.Messages():
+ if message == nil {
+ return nil
+ }
+ h.consumed++
+ h.messages <- message
+ session.MarkMessage(message, "")
+
+ // Stop after consuming the specified number of messages
+ if h.consumed >= h.stopAfter {
+ h.t.Logf("Stopping consumer after %d messages", h.consumed)
+ // Ensure commits are flushed before exiting the claim
+ session.Commit()
+ return nil
+ }
+ case <-session.Context().Done():
+ return nil
+ }
+ }
+}