Diffstat (limited to 'test/kafka/kafka-client-loadtest/internal/consumer/consumer.go')
| -rw-r--r-- | test/kafka/kafka-client-loadtest/internal/consumer/consumer.go | 626 |
1 file changed, 626 insertions, 0 deletions
diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
new file mode 100644
index 000000000..e1c4caa41
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
@@ -0,0 +1,626 @@
+package consumer
+
+import (
+	"context"
+	"encoding/binary"
+	"encoding/json"
+	"fmt"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/IBM/sarama"
+	"github.com/linkedin/goavro/v2"
+	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
+	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
+	pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+	"google.golang.org/protobuf/proto"
+)
+
+// Consumer represents a Kafka consumer for load testing
+type Consumer struct {
+	id               int
+	config           *config.Config
+	metricsCollector *metrics.Collector
+	saramaConsumer   sarama.ConsumerGroup
+	useConfluent     bool // Always false, Sarama only
+	topics           []string
+	consumerGroup    string
+	avroCodec        *goavro.Codec
+
+	// Schema format tracking per topic
+	schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, PROTOBUF)
+
+	// Processing tracking
+	messagesProcessed int64
+	lastOffset        map[string]map[int32]int64
+	offsetMutex       sync.RWMutex
+}
+
+// New creates a new consumer instance
+func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) {
+	consumerGroup := fmt.Sprintf("%s-%d", cfg.Consumers.GroupPrefix, id)
+
+	c := &Consumer{
+		id:               id,
+		config:           cfg,
+		metricsCollector: collector,
+		topics:           cfg.GetTopicNames(),
+		consumerGroup:    consumerGroup,
+		useConfluent:     false, // Use Sarama by default
+		lastOffset:       make(map[string]map[int32]int64),
+		schemaFormats:    make(map[string]string),
+	}
+
+	// Initialize schema formats for each topic (must match producer logic)
+	// This mirrors the format distribution in cmd/loadtest/main.go registerSchemas()
+	for i, topic := range c.topics {
+		var schemaFormat string
+		if cfg.Producers.SchemaFormat != "" {
+			// Use explicit config if provided
+			schemaFormat = cfg.Producers.SchemaFormat
+		} else {
+			// Distribute across formats (same as producer)
+			switch i % 3 {
+			case 0:
+				schemaFormat = "AVRO"
+			case 1:
+				schemaFormat = "JSON"
+			case 2:
+				schemaFormat = "PROTOBUF"
+			}
+		}
+		c.schemaFormats[topic] = schemaFormat
+		log.Printf("Consumer %d: Topic %s will use schema format: %s", id, topic, schemaFormat)
+	}
+
+	// Initialize consumer based on configuration
+	if c.useConfluent {
+		if err := c.initConfluentConsumer(); err != nil {
+			return nil, fmt.Errorf("failed to initialize Confluent consumer: %w", err)
+		}
+	} else {
+		if err := c.initSaramaConsumer(); err != nil {
+			return nil, fmt.Errorf("failed to initialize Sarama consumer: %w", err)
+		}
+	}
+
+	// Initialize Avro codec if schemas are enabled
+	if cfg.Schemas.Enabled {
+		if err := c.initAvroCodec(); err != nil {
+			return nil, fmt.Errorf("failed to initialize Avro codec: %w", err)
+		}
+	}
+
+	log.Printf("Consumer %d initialized for group %s", id, consumerGroup)
+	return c, nil
+}
+
+// initSaramaConsumer initializes the Sarama consumer group
+func (c *Consumer) initSaramaConsumer() error {
+	config := sarama.NewConfig()
+
+	// Consumer configuration
+	config.Consumer.Return.Errors = true
+	config.Consumer.Offsets.Initial = sarama.OffsetOldest
+	if c.config.Consumers.AutoOffsetReset == "latest" {
+		config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	}
+
+	// Auto commit configuration
+	config.Consumer.Offsets.AutoCommit.Enable = c.config.Consumers.EnableAutoCommit
+	config.Consumer.Offsets.AutoCommit.Interval = time.Duration(c.config.Consumers.AutoCommitIntervalMs) * time.Millisecond
+
+	// Session and heartbeat configuration
+	config.Consumer.Group.Session.Timeout = time.Duration(c.config.Consumers.SessionTimeoutMs) * time.Millisecond
+	config.Consumer.Group.Heartbeat.Interval = time.Duration(c.config.Consumers.HeartbeatIntervalMs) * time.Millisecond
+
+	// Fetch configuration
+	config.Consumer.Fetch.Min = int32(c.config.Consumers.FetchMinBytes)
+	config.Consumer.Fetch.Default = 10 * 1024 * 1024 // 10MB per partition (increased from 1MB default)
+	config.Consumer.Fetch.Max = int32(c.config.Consumers.FetchMaxBytes)
+	config.Consumer.MaxWaitTime = time.Duration(c.config.Consumers.FetchMaxWaitMs) * time.Millisecond
+	config.Consumer.MaxProcessingTime = time.Duration(c.config.Consumers.MaxPollIntervalMs) * time.Millisecond
+
+	// Channel buffer sizes for concurrent partition consumption
+	config.ChannelBufferSize = 256 // Matches the Sarama default of 256; raise to buffer more messages per partition
+
+	// Enable concurrent partition fetching by increasing the number of broker connections
+	// This allows Sarama to fetch from multiple partitions in parallel
+	config.Net.MaxOpenRequests = 20 // Increase from default 5 to allow 20 concurrent requests
+
+	// Version
+	config.Version = sarama.V2_8_0_0
+
+	// Create consumer group
+	consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config)
+	if err != nil {
+		return fmt.Errorf("failed to create Sarama consumer group: %w", err)
+	}
+
+	c.saramaConsumer = consumerGroup
+	return nil
+}
+
+// initConfluentConsumer initializes the Confluent Kafka Go consumer
+func (c *Consumer) initConfluentConsumer() error {
+	// Confluent consumer disabled, using Sarama only
+	return fmt.Errorf("confluent consumer not enabled")
+}
+
+// initAvroCodec initializes the Avro codec for schema-based messages
+func (c *Consumer) initAvroCodec() error {
+	// Use the LoadTestMessage schema (matches what producer uses)
+	loadTestSchema := `{
+		"type": "record",
+		"name": "LoadTestMessage",
+		"namespace": "com.seaweedfs.loadtest",
+		"fields": [
+			{"name": "id", "type": "string"},
+			{"name": "timestamp", "type": "long"},
+			{"name": "producer_id", "type": "int"},
+			{"name": "counter", "type": "long"},
+			{"name": "user_id", "type": "string"},
+			{"name": "event_type", "type": "string"},
+			{"name": "properties", "type": {"type": "map", "values": "string"}}
+		]
+	}`
+
+	codec, err := goavro.NewCodec(loadTestSchema)
+	if err != nil {
+		return fmt.Errorf("failed to create Avro codec: %w", err)
+	}
+
+	c.avroCodec = codec
+	return nil
+}
+
+// Run starts the consumer and consumes messages until the context is cancelled
+func (c *Consumer) Run(ctx context.Context) {
+	log.Printf("Consumer %d starting for group %s", c.id, c.consumerGroup)
+	defer log.Printf("Consumer %d stopped", c.id)
+
+	if c.useConfluent {
+		c.runConfluentConsumer(ctx)
+	} else {
+		c.runSaramaConsumer(ctx)
+	}
+}
+
+// runSaramaConsumer runs the Sarama consumer group
+func (c *Consumer) runSaramaConsumer(ctx context.Context) {
+	handler := &ConsumerGroupHandler{
+		consumer: c,
+	}
+
+	var wg sync.WaitGroup
+
+	// Start error handler
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case err, ok := <-c.saramaConsumer.Errors():
+				if !ok {
+					return
+				}
+				log.Printf("Consumer %d error: %v", c.id, err)
+				c.metricsCollector.RecordConsumerError()
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	// Start consumer group session
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+				if err := c.saramaConsumer.Consume(ctx, c.topics, handler); err != nil {
+					log.Printf("Consumer %d: Error consuming: %v", c.id, err)
+					c.metricsCollector.RecordConsumerError()
+
+					// Wait before retrying
+					select {
+					case <-time.After(5 * time.Second):
+					case <-ctx.Done():
+						return
+					}
+				}
+			}
+		}
+	}()
+
+	// Start lag monitoring
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		c.monitorConsumerLag(ctx)
+	}()
+
+	// Wait for completion
+	<-ctx.Done()
+	log.Printf("Consumer %d: Context cancelled, shutting down", c.id)
+	wg.Wait()
+}
+
+// runConfluentConsumer runs the Confluent consumer
+func (c *Consumer) runConfluentConsumer(ctx context.Context) {
+	// Confluent consumer disabled, using Sarama only
+	log.Printf("Consumer %d: Confluent consumer not enabled", c.id)
+}
+
+// processMessage processes a consumed message
+func (c *Consumer) processMessage(topicPtr *string, partition int32, offset int64, key, value []byte) error {
+	topic := ""
+	if topicPtr != nil {
+		topic = *topicPtr
+	}
+
+	// Update offset tracking
+	c.updateOffset(topic, partition, offset)
+
+	// Decode message based on topic-specific schema format
+	var decodedMessage interface{}
+	var err error
+
+	// Determine schema format for this topic (if schemas are enabled)
+	var schemaFormat string
+	if c.config.Schemas.Enabled {
+		schemaFormat = c.schemaFormats[topic]
+		if schemaFormat == "" {
+			// Fallback to config if topic not in map
+			schemaFormat = c.config.Producers.ValueType
+		}
+	} else {
+		// No schemas, use global value type
+		schemaFormat = c.config.Producers.ValueType
+	}
+
+	// Decode message based on format
+	switch schemaFormat {
+	case "avro", "AVRO":
+		decodedMessage, err = c.decodeAvroMessage(value)
+	case "json", "JSON", "JSON_SCHEMA":
+		decodedMessage, err = c.decodeJSONSchemaMessage(value)
+	case "protobuf", "PROTOBUF":
+		decodedMessage, err = c.decodeProtobufMessage(value)
+	case "binary":
+		decodedMessage, err = c.decodeBinaryMessage(value)
+	default:
+		// Fallback to plain JSON
+		decodedMessage, err = c.decodeJSONMessage(value)
+	}
+
+	if err != nil {
+		return fmt.Errorf("failed to decode message: %w", err)
+	}
+
+	// Note: Removed artificial delay to allow maximum throughput
+	// If you need to simulate processing time, add a configurable delay setting
+	// time.Sleep(time.Millisecond) // Minimal processing delay
+
+	// Record metrics
+	c.metricsCollector.RecordConsumedMessage(len(value))
+	c.messagesProcessed++
+
+	// Log progress
+	if c.id == 0 && c.messagesProcessed%1000 == 0 {
+		log.Printf("Consumer %d: Processed %d messages (latest: %s[%d]@%d)",
+			c.id, c.messagesProcessed, topic, partition, offset)
+	}
+
+	// Optional: Validate message content (for testing purposes)
+	if c.config.Chaos.Enabled {
+		if err := c.validateMessage(decodedMessage); err != nil {
+			log.Printf("Consumer %d: Message validation failed: %v", c.id, err)
+		}
+	}
+
+	return nil
+}
+
+// decodeJSONMessage decodes a JSON message
+func (c *Consumer) decodeJSONMessage(value []byte) (interface{}, error) {
+	var message map[string]interface{}
+	if err := json.Unmarshal(value, &message); err != nil {
+		// DEBUG: Log the raw bytes when JSON parsing fails
+		log.Printf("Consumer %d: JSON decode failed. Length: %d, Raw bytes (hex): %x, Raw string: %q, Error: %v",
+			c.id, len(value), value, string(value), err)
+		return nil, err
+	}
+	return message, nil
+}
+
+// decodeAvroMessage decodes an Avro message (handles Confluent Wire Format)
+func (c *Consumer) decodeAvroMessage(value []byte) (interface{}, error) {
+	if c.avroCodec == nil {
+		return nil, fmt.Errorf("Avro codec not initialized")
+	}
+
+	// Handle Confluent Wire Format when schemas are enabled
+	var avroData []byte
+	if c.config.Schemas.Enabled {
+		if len(value) < 5 {
+			return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+		}
+
+		// Check magic byte (should be 0)
+		if value[0] != 0 {
+			return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+		}
+
+		// Extract schema ID (bytes 1-4, big-endian)
+		schemaID := binary.BigEndian.Uint32(value[1:5])
+		_ = schemaID // TODO: Could validate schema ID matches expected schema
+
+		// Extract Avro data (bytes 5+)
+		avroData = value[5:]
+	} else {
+		// No wire format, use raw data
+		avroData = value
+	}
+
+	native, _, err := c.avroCodec.NativeFromBinary(avroData)
+	if err != nil {
+		return nil, fmt.Errorf("failed to decode Avro data: %w", err)
+	}
+
+	return native, nil
+}
+
+// decodeJSONSchemaMessage decodes a JSON Schema message (handles Confluent Wire Format)
+func (c *Consumer) decodeJSONSchemaMessage(value []byte) (interface{}, error) {
+	// Handle Confluent Wire Format when schemas are enabled
+	var jsonData []byte
+	if c.config.Schemas.Enabled {
+		if len(value) < 5 {
+			return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+		}
+
+		// Check magic byte (should be 0)
+		if value[0] != 0 {
+			return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+		}
+
+		// Extract schema ID (bytes 1-4, big-endian)
+		schemaID := binary.BigEndian.Uint32(value[1:5])
+		_ = schemaID // TODO: Could validate schema ID matches expected schema
+
+		// Extract JSON data (bytes 5+)
+		jsonData = value[5:]
+	} else {
+		// No wire format, use raw data
+		jsonData = value
+	}
+
+	// Decode JSON
+	var message map[string]interface{}
+	if err := json.Unmarshal(jsonData, &message); err != nil {
+		return nil, fmt.Errorf("failed to decode JSON data: %w", err)
+	}
+
+	return message, nil
+}
+
+// decodeProtobufMessage decodes a Protobuf message (handles Confluent Wire Format)
+func (c *Consumer) decodeProtobufMessage(value []byte) (interface{}, error) {
+	// Handle Confluent Wire Format when schemas are enabled
+	var protoData []byte
+	if c.config.Schemas.Enabled {
+		if len(value) < 5 {
+			return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+		}
+
+		// Check magic byte (should be 0)
+		if value[0] != 0 {
+			return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+		}
+
+		// Extract schema ID (bytes 1-4, big-endian)
+		schemaID := binary.BigEndian.Uint32(value[1:5])
+		_ = schemaID // TODO: Could validate schema ID matches expected schema
+
+		// Extract Protobuf data (bytes 5+)
+		protoData = value[5:]
+	} else {
+		// No wire format, use raw data
+		protoData = value
+	}
+
+	// Unmarshal protobuf message
+	var protoMsg pb.LoadTestMessage
+	if err := proto.Unmarshal(protoData, &protoMsg); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal Protobuf data: %w", err)
+	}
+
+	// Convert to map for consistency with other decoders
+	return map[string]interface{}{
+		"id":          protoMsg.Id,
+		"timestamp":   protoMsg.Timestamp,
+		"producer_id": protoMsg.ProducerId,
+		"counter":     protoMsg.Counter,
+		"user_id":     protoMsg.UserId,
+		"event_type":  protoMsg.EventType,
+		"properties":  protoMsg.Properties,
+	}, nil
+}
+
+// decodeBinaryMessage decodes a binary message
+func (c *Consumer) decodeBinaryMessage(value []byte) (interface{}, error) {
+	if len(value) < 20 {
+		return nil, fmt.Errorf("binary message too short")
+	}
+
+	// Extract fields from the binary format:
+	// [producer_id:4][counter:8][timestamp:8][random_data:...]
+
+	producerID := int(value[0])<<24 | int(value[1])<<16 | int(value[2])<<8 | int(value[3])
+
+	var counter int64
+	for i := 0; i < 8; i++ {
+		counter |= int64(value[4+i]) << (56 - i*8)
+	}
+
+	var timestamp int64
+	for i := 0; i < 8; i++ {
+		timestamp |= int64(value[12+i]) << (56 - i*8)
+	}
+
+	return map[string]interface{}{
+		"producer_id": producerID,
+		"counter":     counter,
+		"timestamp":   timestamp,
+		"data_size":   len(value),
+	}, nil
+}
+
+// validateMessage performs basic message validation
+func (c *Consumer) validateMessage(message interface{}) error {
+	// This is a placeholder for message validation logic
+	// In a real load test, you might validate:
+	// - Message structure
+	// - Required fields
+	// - Data consistency
+	// - Schema compliance
+
+	if message == nil {
+		return fmt.Errorf("message is nil")
+	}
+
+	return nil
+}
+
+// updateOffset updates the last seen offset for lag calculation
+func (c *Consumer) updateOffset(topic string, partition int32, offset int64) {
+	c.offsetMutex.Lock()
+	defer c.offsetMutex.Unlock()
+
+	if c.lastOffset[topic] == nil {
+		c.lastOffset[topic] = make(map[int32]int64)
+	}
+	c.lastOffset[topic][partition] = offset
+}
+
+// monitorConsumerLag monitors and reports consumer lag
+func (c *Consumer) monitorConsumerLag(ctx context.Context) {
+	ticker := time.NewTicker(30 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			c.reportConsumerLag()
+		}
+	}
+}
+
+// reportConsumerLag calculates and reports consumer lag
+func (c *Consumer) reportConsumerLag() {
+	// This is a simplified lag calculation
+	// In a real implementation, you would query the broker for high water marks
+
+	c.offsetMutex.RLock()
+	defer c.offsetMutex.RUnlock()
+
+	for topic, partitions := range c.lastOffset {
+		for partition := range partitions {
+			// For simplicity, assume lag is always 0 when we're consuming actively
+			// In a real test, you would compare against the high water mark
+			lag := int64(0)
+
+			c.metricsCollector.UpdateConsumerLag(c.consumerGroup, topic, partition, lag)
+		}
+	}
+}
+
+// Close closes the consumer and cleans up resources
+func (c *Consumer) Close() error {
+	log.Printf("Consumer %d: Closing", c.id)
+
+	if c.saramaConsumer != nil {
+		return c.saramaConsumer.Close()
+	}
+
+	return nil
+}
+
+// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
+type ConsumerGroupHandler struct {
+	consumer *Consumer
+}
+
+// Setup is run at the beginning of a new session, before ConsumeClaim
+func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+	log.Printf("Consumer %d: Consumer group session setup", h.consumer.id)
+	return nil
+}
+
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
+	log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id)
+	return nil
+}
+
+// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages()
+func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	msgCount := 0
+	for {
+		select {
+		case message, ok := <-claim.Messages():
+			if !ok {
+				return nil
+			}
+			msgCount++
+
+			// Process the message
+			var key []byte
+			if message.Key != nil {
+				key = message.Key
+			}
+
+			if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil {
+				log.Printf("Consumer %d: Error processing message: %v", h.consumer.id, err)
+				h.consumer.metricsCollector.RecordConsumerError()
+
+				// Add a small delay for schema validation or other processing errors to avoid overloading
+				// select {
+				// case <-time.After(100 * time.Millisecond):
+				// 	// Continue after brief delay
+				// case <-session.Context().Done():
+				// 	return nil
+				// }
+			} else {
+				// Mark message as processed
+				session.MarkMessage(message, "")
+			}
+
+		case <-session.Context().Done():
+			log.Printf("Consumer %d: Session context cancelled for %s[%d]",
+				h.consumer.id, claim.Topic(), claim.Partition())
+			return nil
+		}
+	}
+}
+
+// Helper functions
+
+func joinStrings(strs []string, sep string) string {
+	if len(strs) == 0 {
+		return ""
+	}
+
+	result := strs[0]
+	for i := 1; i < len(strs); i++ {
+		result += sep + strs[i]
+	}
+	return result
+}
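A note on the format selection in New(): it silently couples consumer and producer, since both sides must derive the same i % 3 round-robin over the topic list or the consumer will pick the wrong decoder. A tiny runnable sketch of that mapping (formatFor is a hypothetical name, not part of the commit):

package main

import "fmt"

// formatFor mirrors the round-robin in New(): when no explicit
// Producers.SchemaFormat is configured, topic index i rotates
// through AVRO, JSON, and PROTOBUF.
func formatFor(i int) string {
	switch i % 3 {
	case 0:
		return "AVRO"
	case 1:
		return "JSON"
	default:
		return "PROTOBUF"
	}
}

func main() {
	for i, topic := range []string{"events-0", "events-1", "events-2", "events-3"} {
		fmt.Printf("%s -> %s\n", topic, formatFor(i))
	}
}

The topic names here are made up; the real list comes from cfg.GetTopicNames().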
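All three schema-aware decoders open-code the same Confluent wire-format framing: one zero magic byte, a four-byte big-endian schema ID, then the payload. A minimal standalone sketch of that header parse (splitWireFormat is a hypothetical helper, not in the commit):

package main

import (
	"encoding/binary"
	"fmt"
)

// splitWireFormat splits a Confluent-framed message
// [magic:1][schemaID:4 big-endian][payload:...] into its parts.
func splitWireFormat(value []byte) (uint32, []byte, error) {
	if len(value) < 5 {
		return 0, nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
	}
	if value[0] != 0 {
		return 0, nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
	}
	return binary.BigEndian.Uint32(value[1:5]), value[5:], nil
}

func main() {
	msg := []byte{0, 0, 0, 0, 42, 'h', 'i'} // magic 0, schema ID 42, payload "hi"
	id, payload, err := splitWireFormat(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("schema ID %d, payload %q\n", id, payload)
}

Factoring the three decoders through one such helper would also give a single place to validate the schema ID, which all three TODO comments currently defer.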
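One caveat: Sarama runs ConsumeClaim concurrently, one goroutine per claimed partition, so the plain c.messagesProcessed++ in processMessage is a data race once a consumer owns more than one partition. A minimal sketch of the sync/atomic alternative (the counter struct here is illustrative, not the commit's Consumer):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type counter struct{ n int64 }

func main() {
	var c counter
	var wg sync.WaitGroup
	for p := 0; p < 4; p++ { // four "partitions" incrementing concurrently
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				atomic.AddInt64(&c.n, 1) // race-free replacement for c.n++
			}
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt64(&c.n)) // always 4000; a plain c.n++ would not be
}

In processMessage the same change would read processed := atomic.AddInt64(&c.messagesProcessed, 1), with the progress log testing processed%1000.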

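For reference, decodeBinaryMessage expects a big-endian [producer_id:4][counter:8][timestamp:8][random_data:...] layout. A sketch of a matching encoder, assuming millisecond timestamps (the decoder returns the raw int64 either way); encodeBinaryMessage is a hypothetical name, and the real producer lives elsewhere in the load test:

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// encodeBinaryMessage builds the 20-byte header plus payload that
// decodeBinaryMessage parses: producer ID, counter, timestamp, data.
func encodeBinaryMessage(producerID uint32, counter int64, payload []byte) []byte {
	buf := make([]byte, 20+len(payload))
	binary.BigEndian.PutUint32(buf[0:4], producerID)
	binary.BigEndian.PutUint64(buf[4:12], uint64(counter))
	binary.BigEndian.PutUint64(buf[12:20], uint64(time.Now().UnixMilli()))
	copy(buf[20:], payload)
	return buf
}

func main() {
	msg := encodeBinaryMessage(7, 1234, []byte("test"))
	fmt.Printf("header: % x, total: %d bytes\n", msg[:20], len(msg))
}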