diff options
Diffstat (limited to 'test/kafka/internal/testutil/clients.go')
| -rw-r--r-- | test/kafka/internal/testutil/clients.go | 15 |
1 files changed, 13 insertions, 2 deletions
diff --git a/test/kafka/internal/testutil/clients.go b/test/kafka/internal/testutil/clients.go index 53cae52e0..40d29b55d 100644 --- a/test/kafka/internal/testutil/clients.go +++ b/test/kafka/internal/testutil/clients.go @@ -84,7 +84,9 @@ func (k *KafkaGoClient) ProduceMessages(topicName string, messages []kafka.Messa } defer writer.Close() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + // Increased timeout to handle slow CI environments, especially when consumer groups + // are active and holding locks or requiring offset commits + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() err := writer.WriteMessages(ctx, messages...) @@ -140,7 +142,13 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun }) defer reader.Close() - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + // Log the initial offset position + offset := reader.Offset() + k.t.Logf("Consumer group reader created for group %s, initial offset: %d", groupID, offset) + + // Increased timeout for consumer groups - they require coordinator discovery, + // offset fetching, and offset commits which can be slow in CI environments + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() var messages []kafka.Message @@ -151,14 +159,17 @@ func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCoun return messages, fmt.Errorf("read message %d: %w", i, err) } messages = append(messages, msg) + k.t.Logf(" Fetched message %d: offset=%d, partition=%d", i, msg.Offset, msg.Partition) // Commit with simple retry to handle transient connection churn var commitErr error for attempt := 0; attempt < 3; attempt++ { commitErr = reader.CommitMessages(ctx, msg) if commitErr == nil { + k.t.Logf(" Committed offset %d (attempt %d)", msg.Offset, attempt+1) break } + k.t.Logf(" Commit attempt %d failed for offset %d: %v", attempt+1, msg.Offset, commitErr) // brief backoff time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond) } |
