aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/internal/testutil/clients.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/internal/testutil/clients.go')
-rw-r--r--test/kafka/internal/testutil/clients.go15
1 files changed, 13 insertions, 2 deletions
diff --git a/test/kafka/internal/testutil/clients.go b/test/kafka/internal/testutil/clients.go
index 53cae52e0..40d29b55d 100644
--- a/test/kafka/internal/testutil/clients.go
+++ b/test/kafka/internal/testutil/clients.go
@@ -84,7 +84,9 @@ func (k *KafkaGoClient) ProduceMessages(topicName string, messages []kafka.Messa
}
defer writer.Close()
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ // Increased timeout to handle slow CI environments, especially when consumer groups
+ // are active and holding locks or requiring offset commits
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := writer.WriteMessages(ctx, messages...)
@@ -140,7 +142,13 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun
})
defer reader.Close()
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ // Log the initial offset position
+ offset := reader.Offset()
+ k.t.Logf("Consumer group reader created for group %s, initial offset: %d", groupID, offset)
+
+ // Increased timeout for consumer groups - they require coordinator discovery,
+ // offset fetching, and offset commits which can be slow in CI environments
+ ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var messages []kafka.Message
@@ -151,14 +159,17 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun
return messages, fmt.Errorf("read message %d: %w", i, err)
}
messages = append(messages, msg)
+ k.t.Logf(" Fetched message %d: offset=%d, partition=%d", i, msg.Offset, msg.Partition)
// Commit with simple retry to handle transient connection churn
var commitErr error
for attempt := 0; attempt < 3; attempt++ {
commitErr = reader.CommitMessages(ctx, msg)
if commitErr == nil {
+ k.t.Logf(" Committed offset %d (attempt %d)", msg.Offset, attempt+1)
break
}
+ k.t.Logf(" Commit attempt %d failed for offset %d: %v", attempt+1, msg.Offset, commitErr)
// brief backoff
time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond)
}