diff options
Diffstat (limited to 'test/kafka/internal/testutil/clients.go')
| -rw-r--r-- | test/kafka/internal/testutil/clients.go | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/test/kafka/internal/testutil/clients.go b/test/kafka/internal/testutil/clients.go new file mode 100644 index 000000000..53cae52e0 --- /dev/null +++ b/test/kafka/internal/testutil/clients.go @@ -0,0 +1,294 @@ +package testutil + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/IBM/sarama" + "github.com/segmentio/kafka-go" +) + +// KafkaGoClient wraps kafka-go client with test utilities +type KafkaGoClient struct { + brokerAddr string + t *testing.T +} + +// SaramaClient wraps Sarama client with test utilities +type SaramaClient struct { + brokerAddr string + config *sarama.Config + t *testing.T +} + +// NewKafkaGoClient creates a new kafka-go test client +func NewKafkaGoClient(t *testing.T, brokerAddr string) *KafkaGoClient { + return &KafkaGoClient{ + brokerAddr: brokerAddr, + t: t, + } +} + +// NewSaramaClient creates a new Sarama test client with default config +func NewSaramaClient(t *testing.T, brokerAddr string) *SaramaClient { + config := sarama.NewConfig() + config.Version = sarama.V2_8_0_0 + config.Producer.Return.Successes = true + config.Consumer.Return.Errors = true + config.Consumer.Offsets.Initial = sarama.OffsetOldest // Start from earliest when no committed offset + + return &SaramaClient{ + brokerAddr: brokerAddr, + config: config, + t: t, + } +} + +// CreateTopic creates a topic using kafka-go +func (k *KafkaGoClient) CreateTopic(topicName string, partitions int, replicationFactor int) error { + k.t.Helper() + + conn, err := kafka.Dial("tcp", k.brokerAddr) + if err != nil { + return fmt.Errorf("dial broker: %w", err) + } + defer conn.Close() + + topicConfig := kafka.TopicConfig{ + Topic: topicName, + NumPartitions: partitions, + ReplicationFactor: replicationFactor, + } + + err = conn.CreateTopics(topicConfig) + if err != nil { + return fmt.Errorf("create topic: %w", err) + } + + k.t.Logf("Created topic %s with %d partitions", topicName, partitions) + return nil +} + +// ProduceMessages produces messages using kafka-go +func (k *KafkaGoClient) ProduceMessages(topicName string, messages []kafka.Message) error { + k.t.Helper() + + writer := &kafka.Writer{ + Addr: kafka.TCP(k.brokerAddr), + Topic: topicName, + Balancer: &kafka.LeastBytes{}, + BatchTimeout: 50 * time.Millisecond, + RequiredAcks: kafka.RequireOne, + } + defer writer.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := writer.WriteMessages(ctx, messages...) + if err != nil { + return fmt.Errorf("write messages: %w", err) + } + + k.t.Logf("Produced %d messages to topic %s", len(messages), topicName) + return nil +} + +// ConsumeMessages consumes messages using kafka-go +func (k *KafkaGoClient) ConsumeMessages(topicName string, expectedCount int) ([]kafka.Message, error) { + k.t.Helper() + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{k.brokerAddr}, + Topic: topicName, + Partition: 0, // Explicitly set partition 0 for simple consumption + StartOffset: kafka.FirstOffset, + MinBytes: 1, + MaxBytes: 10e6, + }) + defer reader.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var messages []kafka.Message + for i := 0; i < expectedCount; i++ { + msg, err := reader.ReadMessage(ctx) + if err != nil { + return messages, fmt.Errorf("read message %d: %w", i, err) + } + messages = append(messages, msg) + } + + k.t.Logf("Consumed %d messages from topic %s", len(messages), topicName) + return messages, nil +} + +// ConsumeWithGroup consumes messages using consumer group +func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCount int) ([]kafka.Message, error) { + k.t.Helper() + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{k.brokerAddr}, + Topic: topicName, + GroupID: groupID, + MinBytes: 1, + MaxBytes: 10e6, + CommitInterval: 500 * time.Millisecond, + }) + defer reader.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var messages []kafka.Message + for i := 0; i < expectedCount; i++ { + // Fetch then explicitly commit to better control commit timing + msg, err := reader.FetchMessage(ctx) + if err != nil { + return messages, fmt.Errorf("read message %d: %w", i, err) + } + messages = append(messages, msg) + + // Commit with simple retry to handle transient connection churn + var commitErr error + for attempt := 0; attempt < 3; attempt++ { + commitErr = reader.CommitMessages(ctx, msg) + if commitErr == nil { + break + } + // brief backoff + time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond) + } + if commitErr != nil { + return messages, fmt.Errorf("committing message %d: %w", i, commitErr) + } + } + + k.t.Logf("Consumed %d messages from topic %s with group %s", len(messages), topicName, groupID) + return messages, nil +} + +// CreateTopic creates a topic using Sarama +func (s *SaramaClient) CreateTopic(topicName string, partitions int32, replicationFactor int16) error { + s.t.Helper() + + admin, err := sarama.NewClusterAdmin([]string{s.brokerAddr}, s.config) + if err != nil { + return fmt.Errorf("create admin client: %w", err) + } + defer admin.Close() + + topicDetail := &sarama.TopicDetail{ + NumPartitions: partitions, + ReplicationFactor: replicationFactor, + } + + err = admin.CreateTopic(topicName, topicDetail, false) + if err != nil { + return fmt.Errorf("create topic: %w", err) + } + + s.t.Logf("Created topic %s with %d partitions", topicName, partitions) + return nil +} + +// ProduceMessages produces messages using Sarama +func (s *SaramaClient) ProduceMessages(topicName string, messages []string) error { + s.t.Helper() + + producer, err := sarama.NewSyncProducer([]string{s.brokerAddr}, s.config) + if err != nil { + return fmt.Errorf("create producer: %w", err) + } + defer producer.Close() + + for i, msgText := range messages { + msg := &sarama.ProducerMessage{ + Topic: topicName, + Key: sarama.StringEncoder(fmt.Sprintf("Test message %d", i)), + Value: sarama.StringEncoder(msgText), + } + + partition, offset, err := producer.SendMessage(msg) + if err != nil { + return fmt.Errorf("send message %d: %w", i, err) + } + + s.t.Logf("Produced message %d: partition=%d, offset=%d", i, partition, offset) + } + + return nil +} + +// ProduceMessageToPartition produces a single message to a specific partition using Sarama +func (s *SaramaClient) ProduceMessageToPartition(topicName string, partition int32, message string) error { + s.t.Helper() + + producer, err := sarama.NewSyncProducer([]string{s.brokerAddr}, s.config) + if err != nil { + return fmt.Errorf("create producer: %w", err) + } + defer producer.Close() + + msg := &sarama.ProducerMessage{ + Topic: topicName, + Partition: partition, + Key: sarama.StringEncoder(fmt.Sprintf("key-p%d", partition)), + Value: sarama.StringEncoder(message), + } + + actualPartition, offset, err := producer.SendMessage(msg) + if err != nil { + return fmt.Errorf("send message to partition %d: %w", partition, err) + } + + s.t.Logf("Produced message to partition %d: actualPartition=%d, offset=%d", partition, actualPartition, offset) + return nil +} + +// ConsumeMessages consumes messages using Sarama +func (s *SaramaClient) ConsumeMessages(topicName string, partition int32, expectedCount int) ([]string, error) { + s.t.Helper() + + consumer, err := sarama.NewConsumer([]string{s.brokerAddr}, s.config) + if err != nil { + return nil, fmt.Errorf("create consumer: %w", err) + } + defer consumer.Close() + + partitionConsumer, err := consumer.ConsumePartition(topicName, partition, sarama.OffsetOldest) + if err != nil { + return nil, fmt.Errorf("create partition consumer: %w", err) + } + defer partitionConsumer.Close() + + var messages []string + timeout := time.After(30 * time.Second) + + for len(messages) < expectedCount { + select { + case msg := <-partitionConsumer.Messages(): + messages = append(messages, string(msg.Value)) + case err := <-partitionConsumer.Errors(): + return messages, fmt.Errorf("consumer error: %w", err) + case <-timeout: + return messages, fmt.Errorf("timeout waiting for messages, got %d/%d", len(messages), expectedCount) + } + } + + s.t.Logf("Consumed %d messages from topic %s", len(messages), topicName) + return messages, nil +} + +// GetConfig returns the Sarama configuration +func (s *SaramaClient) GetConfig() *sarama.Config { + return s.config +} + +// SetConfig sets a custom Sarama configuration +func (s *SaramaClient) SetConfig(config *sarama.Config) { + s.config = config +} |
