Diffstat (limited to 'test/kafka/kafka-client-loadtest/internal/consumer/consumer.go')
-rw-r--r--  test/kafka/kafka-client-loadtest/internal/consumer/consumer.go  626
1 file changed, 626 insertions(+), 0 deletions(-)
diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
new file mode 100644
index 000000000..e1c4caa41
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
@@ -0,0 +1,626 @@
+package consumer
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "log"
+ "sync"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/linkedin/goavro/v2"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
+ pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+ "google.golang.org/protobuf/proto"
+)
+
+// Consumer represents a Kafka consumer for load testing
+type Consumer struct {
+ id int
+ config *config.Config
+ metricsCollector *metrics.Collector
+ saramaConsumer sarama.ConsumerGroup
+ useConfluent bool // Always false, Sarama only
+ topics []string
+ consumerGroup string
+ avroCodec *goavro.Codec
+
+ // Schema format tracking per topic
+ schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, PROTOBUF)
+
+ // Processing tracking
+ messagesProcessed int64 // updated atomically; see processMessage
+ lastOffset map[string]map[int32]int64
+ offsetMutex sync.RWMutex
+}
+
+// New creates a new consumer instance
+func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) {
+ consumerGroup := fmt.Sprintf("%s-%d", cfg.Consumers.GroupPrefix, id)
+
+ c := &Consumer{
+ id: id,
+ config: cfg,
+ metricsCollector: collector,
+ topics: cfg.GetTopicNames(),
+ consumerGroup: consumerGroup,
+ useConfluent: false, // always false; only Sarama is supported
+ lastOffset: make(map[string]map[int32]int64),
+ schemaFormats: make(map[string]string),
+ }
+
+ // Initialize schema formats for each topic (must match producer logic)
+ // This mirrors the format distribution in cmd/loadtest/main.go registerSchemas()
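+ // For example, with three topics and no explicit SchemaFormat configured:
+ //   topics[0] -> AVRO, topics[1] -> JSON, topics[2] -> PROTOBUF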
+ for i, topic := range c.topics {
+ var schemaFormat string
+ if cfg.Producers.SchemaFormat != "" {
+ // Use explicit config if provided
+ schemaFormat = cfg.Producers.SchemaFormat
+ } else {
+ // Distribute across formats (same as producer)
+ switch i % 3 {
+ case 0:
+ schemaFormat = "AVRO"
+ case 1:
+ schemaFormat = "JSON"
+ case 2:
+ schemaFormat = "PROTOBUF"
+ }
+ }
+ c.schemaFormats[topic] = schemaFormat
+ log.Printf("Consumer %d: Topic %s will use schema format: %s", id, topic, schemaFormat)
+ }
+
+ // Initialize consumer based on configuration
+ if c.useConfluent {
+ if err := c.initConfluentConsumer(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Confluent consumer: %w", err)
+ }
+ } else {
+ if err := c.initSaramaConsumer(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Sarama consumer: %w", err)
+ }
+ }
+
+ // Initialize Avro codec if schemas are enabled
+ if cfg.Schemas.Enabled {
+ if err := c.initAvroCodec(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Avro codec: %w", err)
+ }
+ }
+
+ log.Printf("Consumer %d initialized for group %s", id, consumerGroup)
+ return c, nil
+}
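+
+// Typical wiring (a sketch; cfg, collector, and ctx come from the load test
+// harness, e.g. cmd/loadtest):
+//
+//	c, err := consumer.New(cfg, collector, 0)
+//	if err != nil {
+//		log.Fatalf("consumer init: %v", err)
+//	}
+//	defer c.Close()
+//	c.Run(ctx) // blocks until ctx is cancelled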
+
+// initSaramaConsumer initializes the Sarama consumer group
+func (c *Consumer) initSaramaConsumer() error {
+ config := sarama.NewConfig()
+
+ // Consumer configuration
+ config.Consumer.Return.Errors = true
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
+ if c.config.Consumers.AutoOffsetReset == "latest" {
+ config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ }
+
+ // Auto commit configuration
+ config.Consumer.Offsets.AutoCommit.Enable = c.config.Consumers.EnableAutoCommit
+ config.Consumer.Offsets.AutoCommit.Interval = time.Duration(c.config.Consumers.AutoCommitIntervalMs) * time.Millisecond
+
+ // Session and heartbeat configuration
+ config.Consumer.Group.Session.Timeout = time.Duration(c.config.Consumers.SessionTimeoutMs) * time.Millisecond
+ config.Consumer.Group.Heartbeat.Interval = time.Duration(c.config.Consumers.HeartbeatIntervalMs) * time.Millisecond
+
+ // Fetch configuration
+ config.Consumer.Fetch.Min = int32(c.config.Consumers.FetchMinBytes)
+ config.Consumer.Fetch.Default = 10 * 1024 * 1024 // 10MB per partition (increased from 1MB default)
+ config.Consumer.Fetch.Max = int32(c.config.Consumers.FetchMaxBytes)
+ config.Consumer.MaxWaitTime = time.Duration(c.config.Consumers.FetchMaxWaitMs) * time.Millisecond
+ config.Consumer.MaxProcessingTime = time.Duration(c.config.Consumers.MaxPollIntervalMs) * time.Millisecond
+
+ // Channel buffer sizes for concurrent partition consumption
+ config.ChannelBufferSize = 256 // matches the Sarama default; set explicitly for visibility
+
+ // Allow more in-flight requests per broker connection so Sarama can fetch
+ // from multiple partitions in parallel
+ config.Net.MaxOpenRequests = 20 // Sarama default is 5
+
+ // Version
+ config.Version = sarama.V2_8_0_0
+
+ // Create consumer group
+ consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config)
+ if err != nil {
+ return fmt.Errorf("failed to create Sarama consumer group: %w", err)
+ }
+
+ c.saramaConsumer = consumerGroup
+ return nil
+}
+
+// initConfluentConsumer initializes the Confluent Kafka Go consumer
+func (c *Consumer) initConfluentConsumer() error {
+ // Confluent consumer disabled, using Sarama only
+ return fmt.Errorf("confluent consumer not enabled")
+}
+
+// initAvroCodec initializes the Avro codec for schema-based messages
+func (c *Consumer) initAvroCodec() error {
+ // Use the LoadTestMessage schema (matches what producer uses)
+ loadTestSchema := `{
+ "type": "record",
+ "name": "LoadTestMessage",
+ "namespace": "com.seaweedfs.loadtest",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "producer_id", "type": "int"},
+ {"name": "counter", "type": "long"},
+ {"name": "user_id", "type": "string"},
+ {"name": "event_type", "type": "string"},
+ {"name": "properties", "type": {"type": "map", "values": "string"}}
+ ]
+ }`
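+
+ // For illustration, a record this schema accepts (values are made up):
+ //   {"id": "msg-1", "timestamp": 1700000000000, "producer_id": 0,
+ //    "counter": 42, "user_id": "user-7", "event_type": "click",
+ //    "properties": {"k": "v"}}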
+
+ codec, err := goavro.NewCodec(loadTestSchema)
+ if err != nil {
+ return fmt.Errorf("failed to create Avro codec: %w", err)
+ }
+
+ c.avroCodec = codec
+ return nil
+}
+
+// Run starts the consumer and consumes messages until the context is cancelled
+func (c *Consumer) Run(ctx context.Context) {
+ log.Printf("Consumer %d starting for group %s", c.id, c.consumerGroup)
+ defer log.Printf("Consumer %d stopped", c.id)
+
+ if c.useConfluent {
+ c.runConfluentConsumer(ctx)
+ } else {
+ c.runSaramaConsumer(ctx)
+ }
+}
+
+// runSaramaConsumer runs the Sarama consumer group
+func (c *Consumer) runSaramaConsumer(ctx context.Context) {
+ handler := &ConsumerGroupHandler{
+ consumer: c,
+ }
+
+ var wg sync.WaitGroup
+
+ // Start error handler
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case err, ok := <-c.saramaConsumer.Errors():
+ if !ok {
+ return
+ }
+ log.Printf("Consumer %d error: %v", c.id, err)
+ c.metricsCollector.RecordConsumerError()
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ // Start consumer group session
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err := c.saramaConsumer.Consume(ctx, c.topics, handler); err != nil {
+ log.Printf("Consumer %d: Error consuming: %v", c.id, err)
+ c.metricsCollector.RecordConsumerError()
+
+ // Wait before retrying
+ select {
+ case <-time.After(5 * time.Second):
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }
+ }()
+
+ // Start lag monitoring
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ c.monitorConsumerLag(ctx)
+ }()
+
+ // Wait for completion
+ <-ctx.Done()
+ log.Printf("Consumer %d: Context cancelled, shutting down", c.id)
+ wg.Wait()
+}
+
+// runConfluentConsumer runs the Confluent consumer
+func (c *Consumer) runConfluentConsumer(ctx context.Context) {
+ // Confluent consumer disabled, using Sarama only
+ log.Printf("Consumer %d: Confluent consumer not enabled", c.id)
+}
+
+// processMessage processes a consumed message
+func (c *Consumer) processMessage(topicPtr *string, partition int32, offset int64, key, value []byte) error {
+ topic := ""
+ if topicPtr != nil {
+ topic = *topicPtr
+ }
+
+ // Update offset tracking
+ c.updateOffset(topic, partition, offset)
+
+ // Decode message based on topic-specific schema format
+ var decodedMessage interface{}
+ var err error
+
+ // Determine schema format for this topic (if schemas are enabled)
+ var schemaFormat string
+ if c.config.Schemas.Enabled {
+ schemaFormat = c.schemaFormats[topic]
+ if schemaFormat == "" {
+ // Fallback to config if topic not in map
+ schemaFormat = c.config.Producers.ValueType
+ }
+ } else {
+ // No schemas, use global value type
+ schemaFormat = c.config.Producers.ValueType
+ }
+
+ // Decode message based on format
+ switch schemaFormat {
+ case "avro", "AVRO":
+ decodedMessage, err = c.decodeAvroMessage(value)
+ case "json", "JSON", "JSON_SCHEMA":
+ decodedMessage, err = c.decodeJSONSchemaMessage(value)
+ case "protobuf", "PROTOBUF":
+ decodedMessage, err = c.decodeProtobufMessage(value)
+ case "binary":
+ decodedMessage, err = c.decodeBinaryMessage(value)
+ default:
+ // Fallback to plain JSON
+ decodedMessage, err = c.decodeJSONMessage(value)
+ }
+
+ if err != nil {
+ return fmt.Errorf("failed to decode message: %w", err)
+ }
+
+ // Note: Removed artificial delay to allow maximum throughput
+ // If you need to simulate processing time, add a configurable delay setting
+ // time.Sleep(time.Millisecond) // Minimal processing delay
+
+ // Record metrics. The processed counter is updated atomically because
+ // ConsumeClaim runs concurrently, one goroutine per claimed partition.
+ c.metricsCollector.RecordConsumedMessage(len(value))
+ processed := atomic.AddInt64(&c.messagesProcessed, 1)
+
+ // Log progress
+ if c.id == 0 && processed%1000 == 0 {
+ log.Printf("Consumer %d: Processed %d messages (latest: %s[%d]@%d)",
+ c.id, processed, topic, partition, offset)
+ }
+
+ // Optional: Validate message content (for testing purposes)
+ if c.config.Chaos.Enabled {
+ if err := c.validateMessage(decodedMessage); err != nil {
+ log.Printf("Consumer %d: Message validation failed: %v", c.id, err)
+ }
+ }
+
+ return nil
+}
+
+// decodeJSONMessage decodes a JSON message
+func (c *Consumer) decodeJSONMessage(value []byte) (interface{}, error) {
+ var message map[string]interface{}
+ if err := json.Unmarshal(value, &message); err != nil {
+ // DEBUG: Log the raw bytes when JSON parsing fails
+ log.Printf("Consumer %d: JSON decode failed. Length: %d, Raw bytes (hex): %x, Raw string: %q, Error: %v",
+ c.id, len(value), value, string(value), err)
+ return nil, err
+ }
+ return message, nil
+}
+
+// decodeAvroMessage decodes an Avro message (handles Confluent Wire Format)
+func (c *Consumer) decodeAvroMessage(value []byte) (interface{}, error) {
+ if c.avroCodec == nil {
+ return nil, fmt.Errorf("Avro codec not initialized")
+ }
+
+ // Handle Confluent Wire Format when schemas are enabled
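+ // Confluent wire format layout:
+ //   byte 0      magic byte (always 0x00)
+ //   bytes 1-4   schema ID (big-endian uint32)
+ //   bytes 5..   Avro-encoded payload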
+ var avroData []byte
+ if c.config.Schemas.Enabled {
+ if len(value) < 5 {
+ return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+ }
+
+ // Check magic byte (should be 0)
+ if value[0] != 0 {
+ return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+ }
+
+ // Extract schema ID (bytes 1-4, big-endian)
+ schemaID := binary.BigEndian.Uint32(value[1:5])
+ _ = schemaID // TODO: Could validate schema ID matches expected schema
+
+ // Extract Avro data (bytes 5+)
+ avroData = value[5:]
+ } else {
+ // No wire format, use raw data
+ avroData = value
+ }
+
+ native, _, err := c.avroCodec.NativeFromBinary(avroData)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decode Avro data: %w", err)
+ }
+
+ return native, nil
+}
+
+// decodeJSONSchemaMessage decodes a JSON Schema message (handles Confluent Wire Format)
+func (c *Consumer) decodeJSONSchemaMessage(value []byte) (interface{}, error) {
+ // Handle Confluent Wire Format when schemas are enabled
+ var jsonData []byte
+ if c.config.Schemas.Enabled {
+ if len(value) < 5 {
+ return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+ }
+
+ // Check magic byte (should be 0)
+ if value[0] != 0 {
+ return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+ }
+
+ // Extract schema ID (bytes 1-4, big-endian)
+ schemaID := binary.BigEndian.Uint32(value[1:5])
+ _ = schemaID // TODO: Could validate schema ID matches expected schema
+
+ // Extract JSON data (bytes 5+)
+ jsonData = value[5:]
+ } else {
+ // No wire format, use raw data
+ jsonData = value
+ }
+
+ // Decode JSON
+ var message map[string]interface{}
+ if err := json.Unmarshal(jsonData, &message); err != nil {
+ return nil, fmt.Errorf("failed to decode JSON data: %w", err)
+ }
+
+ return message, nil
+}
+
+// decodeProtobufMessage decodes a Protobuf message (handles Confluent Wire Format)
+func (c *Consumer) decodeProtobufMessage(value []byte) (interface{}, error) {
+ // Handle Confluent Wire Format when schemas are enabled
+ var protoData []byte
+ if c.config.Schemas.Enabled {
+ if len(value) < 5 {
+ return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+ }
+
+ // Check magic byte (should be 0)
+ if value[0] != 0 {
+ return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+ }
+
+ // Extract schema ID (bytes 1-4, big-endian)
+ schemaID := binary.BigEndian.Uint32(value[1:5])
+ _ = schemaID // TODO: Could validate schema ID matches expected schema
+
+ // Extract Protobuf data (bytes 5+)
+ protoData = value[5:]
+ } else {
+ // No wire format, use raw data
+ protoData = value
+ }
+
+ // Unmarshal protobuf message
+ var protoMsg pb.LoadTestMessage
+ if err := proto.Unmarshal(protoData, &protoMsg); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal Protobuf data: %w", err)
+ }
+
+ // Convert to map for consistency with other decoders
+ return map[string]interface{}{
+ "id": protoMsg.Id,
+ "timestamp": protoMsg.Timestamp,
+ "producer_id": protoMsg.ProducerId,
+ "counter": protoMsg.Counter,
+ "user_id": protoMsg.UserId,
+ "event_type": protoMsg.EventType,
+ "properties": protoMsg.Properties,
+ }, nil
+}
+
+// decodeBinaryMessage decodes a binary message
+func (c *Consumer) decodeBinaryMessage(value []byte) (interface{}, error) {
+ if len(value) < 20 {
+ return nil, fmt.Errorf("binary message too short: %d bytes", len(value))
+ }
+
+ // Extract fields from the binary format (all integers are big-endian):
+ // [producer_id:4][counter:8][timestamp:8][random_data:...]
+ producerID := int(binary.BigEndian.Uint32(value[0:4]))
+ counter := int64(binary.BigEndian.Uint64(value[4:12]))
+ timestamp := int64(binary.BigEndian.Uint64(value[12:20]))
+
+ return map[string]interface{}{
+ "producer_id": producerID,
+ "counter": counter,
+ "timestamp": timestamp,
+ "data_size": len(value),
+ }, nil
+}
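+
+// For reference, the matching encoder (a sketch, assuming the producer packs
+// this exact layout; buf and randomDataLen are illustrative names):
+//
+//	buf := make([]byte, 20+randomDataLen)
+//	binary.BigEndian.PutUint32(buf[0:4], uint32(producerID))
+//	binary.BigEndian.PutUint64(buf[4:12], uint64(counter))
+//	binary.BigEndian.PutUint64(buf[12:20], uint64(timestamp))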
+
+// validateMessage performs basic message validation
+func (c *Consumer) validateMessage(message interface{}) error {
+ // This is a placeholder for message validation logic
+ // In a real load test, you might validate:
+ // - Message structure
+ // - Required fields
+ // - Data consistency
+ // - Schema compliance
+
+ if message == nil {
+ return fmt.Errorf("message is nil")
+ }
+
+ return nil
+}
+
+// updateOffset updates the last seen offset for lag calculation
+func (c *Consumer) updateOffset(topic string, partition int32, offset int64) {
+ c.offsetMutex.Lock()
+ defer c.offsetMutex.Unlock()
+
+ if c.lastOffset[topic] == nil {
+ c.lastOffset[topic] = make(map[int32]int64)
+ }
+ c.lastOffset[topic][partition] = offset
+}
+
+// monitorConsumerLag monitors and reports consumer lag
+func (c *Consumer) monitorConsumerLag(ctx context.Context) {
+ ticker := time.NewTicker(30 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ c.reportConsumerLag()
+ }
+ }
+}
+
+// reportConsumerLag calculates and reports consumer lag
+func (c *Consumer) reportConsumerLag() {
+ // This is a simplified lag calculation
+ // In a real implementation, you would query the broker for high water marks
+
+ c.offsetMutex.RLock()
+ defer c.offsetMutex.RUnlock()
+
+ for topic, partitions := range c.lastOffset {
+ for partition := range partitions {
+ // For simplicity, assume lag is always 0 when we're consuming actively
+ // In a real test, you would compare against the high water mark
+ lag := int64(0)
+
+ c.metricsCollector.UpdateConsumerLag(c.consumerGroup, topic, partition, lag)
+ }
+ }
+}
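+
+// A real lag calculation would compare the last consumed offset against the
+// partition's high water mark. A sketch, assuming a sarama.Client is wired in
+// (this struct does not hold one):
+//
+//	hwm, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
+//	if err == nil {
+//		lag := hwm - 1 - lastConsumedOffset // hwm is the next offset to be written
+//	}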
+
+// Close closes the consumer and cleans up resources
+func (c *Consumer) Close() error {
+ log.Printf("Consumer %d: Closing", c.id)
+
+ if c.saramaConsumer != nil {
+ return c.saramaConsumer.Close()
+ }
+
+ return nil
+}
+
+// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
+type ConsumerGroupHandler struct {
+ consumer *Consumer
+}
+
+// Setup is run at the beginning of a new session, before ConsumeClaim
+func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+ log.Printf("Consumer %d: Consumer group session setup", h.consumer.id)
+ return nil
+}
+
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
+ log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id)
+ return nil
+}
+
+// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages()
+func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for {
+ select {
+ case message, ok := <-claim.Messages():
+ if !ok {
+ return nil
+ }
+
+ // Process the message (message.Key may be nil for unkeyed messages)
+ if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, message.Key, message.Value); err != nil {
+ log.Printf("Consumer %d: Error processing message: %v", h.consumer.id, err)
+ h.consumer.metricsCollector.RecordConsumerError()
+
+ // The message is intentionally not marked on error, so it may be
+ // redelivered after a rebalance. A small backoff could be added here
+ // to avoid a hot loop on persistent decode failures:
+ // select {
+ // case <-time.After(100 * time.Millisecond):
+ // case <-session.Context().Done():
+ // return nil
+ // }
+ } else {
+ // Mark the message as processed so its offset can be committed
+ session.MarkMessage(message, "")
+ }
+
+ case <-session.Context().Done():
+ log.Printf("Consumer %d: Session context cancelled for %s[%d]",
+ h.consumer.id, claim.Topic(), claim.Partition())
+ return nil
+ }
+ }
+}
+
+// Helper functions
+
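+// joinStrings is equivalent to strings.Join from the standard library; it is
+// kept local to avoid an extra import and is not referenced in this file.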
+func joinStrings(strs []string, sep string) string {
+ if len(strs) == 0 {
+ return ""
+ }
+
+ result := strs[0]
+ for i := 1; i < len(strs); i++ {
+ result += sep + strs[i]
+ }
+ return result
+}