diff options
Diffstat (limited to 'test/kafka/e2e/offset_management_test.go')
| -rw-r--r-- | test/kafka/e2e/offset_management_test.go | 31 |
1 files changed, 30 insertions, 1 deletions
diff --git a/test/kafka/e2e/offset_management_test.go b/test/kafka/e2e/offset_management_test.go index 398647843..11bbdc5ea 100644 --- a/test/kafka/e2e/offset_management_test.go +++ b/test/kafka/e2e/offset_management_test.go @@ -81,21 +81,50 @@ func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) { msgGen := testutil.NewMessageGenerator() // Produce messages + t.Logf("=== Phase 1: Producing 4 messages to topic %s ===", topic) messages := msgGen.GenerateKafkaGoMessages(4) err := client.ProduceMessages(topic, messages) testutil.AssertNoError(t, err, "Failed to produce messages for resumption test") + t.Logf("Successfully produced %d messages", len(messages)) // Consume some messages + t.Logf("=== Phase 2: First consumer - consuming 2 messages with group %s ===", groupID) consumed1, err := client.ConsumeWithGroup(topic, groupID, 2) testutil.AssertNoError(t, err, "Failed to consume first batch") + t.Logf("First consumer consumed %d messages:", len(consumed1)) + for i, msg := range consumed1 { + t.Logf(" Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value)) + } // Simulate consumer restart by consuming remaining messages with same group ID + t.Logf("=== Phase 3: Second consumer (simulated restart) - consuming remaining messages with same group %s ===", groupID) consumed2, err := client.ConsumeWithGroup(topic, groupID, 2) testutil.AssertNoError(t, err, "Failed to consume after restart") + t.Logf("Second consumer consumed %d messages:", len(consumed2)) + for i, msg := range consumed2 { + t.Logf(" Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value)) + } // Verify total consumption totalConsumed := len(consumed1) + len(consumed2) + t.Logf("=== Verification: Total consumed %d messages (expected %d) ===", totalConsumed, len(messages)) + + // Check for duplicates + offsetsSeen := make(map[int64]bool) + duplicateCount := 0 + for _, msg := range append(consumed1, consumed2...) { + if offsetsSeen[msg.Offset] { + t.Logf("WARNING: Duplicate offset detected: %d", msg.Offset) + duplicateCount++ + } + offsetsSeen[msg.Offset] = true + } + + if duplicateCount > 0 { + t.Logf("ERROR: Found %d duplicate messages", duplicateCount) + } + testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart") - t.Logf("SUCCESS: Consumer group resumption test completed") + t.Logf("SUCCESS: Consumer group resumption test completed - no duplicates, all messages consumed exactly once") } |
