aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/e2e/offset_management_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/e2e/offset_management_test.go')
-rw-r--r--test/kafka/e2e/offset_management_test.go31
1 files changed, 30 insertions, 1 deletions
diff --git a/test/kafka/e2e/offset_management_test.go b/test/kafka/e2e/offset_management_test.go
index 398647843..11bbdc5ea 100644
--- a/test/kafka/e2e/offset_management_test.go
+++ b/test/kafka/e2e/offset_management_test.go
@@ -81,21 +81,50 @@ func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) {
msgGen := testutil.NewMessageGenerator()
// Produce messages
+ t.Logf("=== Phase 1: Producing 4 messages to topic %s ===", topic)
messages := msgGen.GenerateKafkaGoMessages(4)
err := client.ProduceMessages(topic, messages)
testutil.AssertNoError(t, err, "Failed to produce messages for resumption test")
+ t.Logf("Successfully produced %d messages", len(messages))
// Consume some messages
+ t.Logf("=== Phase 2: First consumer - consuming 2 messages with group %s ===", groupID)
consumed1, err := client.ConsumeWithGroup(topic, groupID, 2)
testutil.AssertNoError(t, err, "Failed to consume first batch")
+ t.Logf("First consumer consumed %d messages:", len(consumed1))
+ for i, msg := range consumed1 {
+ t.Logf(" Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value))
+ }
// Simulate consumer restart by consuming remaining messages with same group ID
+ t.Logf("=== Phase 3: Second consumer (simulated restart) - consuming remaining messages with same group %s ===", groupID)
consumed2, err := client.ConsumeWithGroup(topic, groupID, 2)
testutil.AssertNoError(t, err, "Failed to consume after restart")
+ t.Logf("Second consumer consumed %d messages:", len(consumed2))
+ for i, msg := range consumed2 {
+ t.Logf(" Message %d: offset=%d, partition=%d, value=%s", i, msg.Offset, msg.Partition, string(msg.Value))
+ }
// Verify total consumption
totalConsumed := len(consumed1) + len(consumed2)
+ t.Logf("=== Verification: Total consumed %d messages (expected %d) ===", totalConsumed, len(messages))
+
+ // Check for duplicates
+ offsetsSeen := make(map[int64]bool)
+ duplicateCount := 0
+ for _, msg := range append(consumed1, consumed2...) {
+ if offsetsSeen[msg.Offset] {
+ t.Logf("WARNING: Duplicate offset detected: %d", msg.Offset)
+ duplicateCount++
+ }
+ offsetsSeen[msg.Offset] = true
+ }
+
+ if duplicateCount > 0 {
+ t.Logf("ERROR: Found %d duplicate messages", duplicateCount)
+ }
+
testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart")
- t.Logf("SUCCESS: Consumer group resumption test completed")
+ t.Logf("SUCCESS: Consumer group resumption test completed - no duplicates, all messages consumed exactly once")
}