Diffstat (limited to 'test/kafka/kafka-client-loadtest/internal')
-rw-r--r-- test/kafka/kafka-client-loadtest/internal/config/config.go | 361
-rw-r--r-- test/kafka/kafka-client-loadtest/internal/consumer/consumer.go | 626
-rw-r--r-- test/kafka/kafka-client-loadtest/internal/metrics/collector.go | 353
-rw-r--r-- test/kafka/kafka-client-loadtest/internal/producer/producer.go | 770
-rw-r--r-- test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto | 16
-rw-r--r-- test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go | 185
-rw-r--r-- test/kafka/kafka-client-loadtest/internal/schema/schemas.go | 58
7 files changed, 2369 insertions, 0 deletions
diff --git a/test/kafka/kafka-client-loadtest/internal/config/config.go b/test/kafka/kafka-client-loadtest/internal/config/config.go
new file mode 100644
index 000000000..dd9f6d6b2
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/config/config.go
@@ -0,0 +1,361 @@
+package config
+
+import (
+ "fmt"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+
+ "gopkg.in/yaml.v3"
+)
+
+// Config represents the complete load test configuration
+type Config struct {
+ TestMode string `yaml:"test_mode"`
+ Duration time.Duration `yaml:"duration"`
+
+ Kafka KafkaConfig `yaml:"kafka"`
+ SchemaRegistry SchemaRegistryConfig `yaml:"schema_registry"`
+ Producers ProducersConfig `yaml:"producers"`
+ Consumers ConsumersConfig `yaml:"consumers"`
+ Topics TopicsConfig `yaml:"topics"`
+ Schemas SchemasConfig `yaml:"schemas"`
+ Metrics MetricsConfig `yaml:"metrics"`
+ Scenarios ScenariosConfig `yaml:"scenarios"`
+ Chaos ChaosConfig `yaml:"chaos"`
+ Output OutputConfig `yaml:"output"`
+ Logging LoggingConfig `yaml:"logging"`
+}
+
+type KafkaConfig struct {
+ BootstrapServers []string `yaml:"bootstrap_servers"`
+ SecurityProtocol string `yaml:"security_protocol"`
+ SASLMechanism string `yaml:"sasl_mechanism"`
+ SASLUsername string `yaml:"sasl_username"`
+ SASLPassword string `yaml:"sasl_password"`
+}
+
+type SchemaRegistryConfig struct {
+ URL string `yaml:"url"`
+ Auth struct {
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+ } `yaml:"auth"`
+}
+
+type ProducersConfig struct {
+ Count int `yaml:"count"`
+ MessageRate int `yaml:"message_rate"`
+ MessageSize int `yaml:"message_size"`
+ BatchSize int `yaml:"batch_size"`
+ LingerMs int `yaml:"linger_ms"`
+ CompressionType string `yaml:"compression_type"`
+ Acks string `yaml:"acks"`
+ Retries int `yaml:"retries"`
+ RetryBackoffMs int `yaml:"retry_backoff_ms"`
+ RequestTimeoutMs int `yaml:"request_timeout_ms"`
+ DeliveryTimeoutMs int `yaml:"delivery_timeout_ms"`
+ KeyDistribution string `yaml:"key_distribution"`
+ ValueType string `yaml:"value_type"` // json, avro, protobuf, binary
+ SchemaFormat string `yaml:"schema_format"` // AVRO, JSON, PROTOBUF (schema registry format)
+ IncludeTimestamp bool `yaml:"include_timestamp"`
+ IncludeHeaders bool `yaml:"include_headers"`
+}
+
+type ConsumersConfig struct {
+ Count int `yaml:"count"`
+ GroupPrefix string `yaml:"group_prefix"`
+ AutoOffsetReset string `yaml:"auto_offset_reset"`
+ EnableAutoCommit bool `yaml:"enable_auto_commit"`
+ AutoCommitIntervalMs int `yaml:"auto_commit_interval_ms"`
+ SessionTimeoutMs int `yaml:"session_timeout_ms"`
+ HeartbeatIntervalMs int `yaml:"heartbeat_interval_ms"`
+ MaxPollRecords int `yaml:"max_poll_records"`
+ MaxPollIntervalMs int `yaml:"max_poll_interval_ms"`
+ FetchMinBytes int `yaml:"fetch_min_bytes"`
+ FetchMaxBytes int `yaml:"fetch_max_bytes"`
+ FetchMaxWaitMs int `yaml:"fetch_max_wait_ms"`
+}
+
+type TopicsConfig struct {
+ Count int `yaml:"count"`
+ Prefix string `yaml:"prefix"`
+ Partitions int `yaml:"partitions"`
+ ReplicationFactor int `yaml:"replication_factor"`
+ CleanupPolicy string `yaml:"cleanup_policy"`
+ RetentionMs int64 `yaml:"retention_ms"`
+ SegmentMs int64 `yaml:"segment_ms"`
+}
+
+type SchemaConfig struct {
+ Type string `yaml:"type"`
+ Schema string `yaml:"schema"`
+}
+
+type SchemasConfig struct {
+ Enabled bool `yaml:"enabled"`
+ RegistryTimeoutMs int `yaml:"registry_timeout_ms"`
+ UserEvent SchemaConfig `yaml:"user_event"`
+ Transaction SchemaConfig `yaml:"transaction"`
+}
+
+type MetricsConfig struct {
+ Enabled bool `yaml:"enabled"`
+ CollectionInterval time.Duration `yaml:"collection_interval"`
+ PrometheusPort int `yaml:"prometheus_port"`
+ TrackLatency bool `yaml:"track_latency"`
+ TrackThroughput bool `yaml:"track_throughput"`
+ TrackErrors bool `yaml:"track_errors"`
+ TrackConsumerLag bool `yaml:"track_consumer_lag"`
+ LatencyPercentiles []float64 `yaml:"latency_percentiles"`
+}
+
+type ScenarioConfig struct {
+ ProducerRate int `yaml:"producer_rate"`
+ RampUpTime time.Duration `yaml:"ramp_up_time"`
+ SteadyDuration time.Duration `yaml:"steady_duration"`
+ RampDownTime time.Duration `yaml:"ramp_down_time"`
+ BaseRate int `yaml:"base_rate"`
+ BurstRate int `yaml:"burst_rate"`
+ BurstDuration time.Duration `yaml:"burst_duration"`
+ BurstInterval time.Duration `yaml:"burst_interval"`
+ StartRate int `yaml:"start_rate"`
+ EndRate int `yaml:"end_rate"`
+ RampDuration time.Duration `yaml:"ramp_duration"`
+ StepDuration time.Duration `yaml:"step_duration"`
+}
+
+type ScenariosConfig struct {
+ SteadyLoad ScenarioConfig `yaml:"steady_load"`
+ BurstLoad ScenarioConfig `yaml:"burst_load"`
+ RampTest ScenarioConfig `yaml:"ramp_test"`
+}
+
+type ChaosConfig struct {
+ Enabled bool `yaml:"enabled"`
+ ProducerFailureRate float64 `yaml:"producer_failure_rate"`
+ ConsumerFailureRate float64 `yaml:"consumer_failure_rate"`
+ NetworkPartitionProbability float64 `yaml:"network_partition_probability"`
+ BrokerRestartInterval time.Duration `yaml:"broker_restart_interval"`
+}
+
+type OutputConfig struct {
+ ResultsDir string `yaml:"results_dir"`
+ ExportPrometheus bool `yaml:"export_prometheus"`
+ ExportCSV bool `yaml:"export_csv"`
+ ExportJSON bool `yaml:"export_json"`
+ RealTimeStats bool `yaml:"real_time_stats"`
+ StatsInterval time.Duration `yaml:"stats_interval"`
+}
+
+type LoggingConfig struct {
+ Level string `yaml:"level"`
+ Format string `yaml:"format"`
+ EnableKafkaLogs bool `yaml:"enable_kafka_logs"`
+}
+
+// Load reads and parses the configuration file
+func Load(configFile string) (*Config, error) {
+ data, err := os.ReadFile(configFile)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read config file %s: %w", configFile, err)
+ }
+
+ var cfg Config
+ if err := yaml.Unmarshal(data, &cfg); err != nil {
+ return nil, fmt.Errorf("failed to parse config file %s: %w", configFile, err)
+ }
+
+ // Apply default values
+ cfg.setDefaults()
+
+ // Apply environment variable overrides
+ cfg.applyEnvOverrides()
+
+ return &cfg, nil
+}
+
+// ApplyOverrides applies command-line flag overrides
+func (c *Config) ApplyOverrides(testMode string, duration time.Duration) {
+ if testMode != "" {
+ c.TestMode = testMode
+ }
+ if duration > 0 {
+ c.Duration = duration
+ }
+}
+
+// setDefaults sets default values for optional fields
+func (c *Config) setDefaults() {
+ if c.TestMode == "" {
+ c.TestMode = "comprehensive"
+ }
+
+ if len(c.Kafka.BootstrapServers) == 0 {
+ c.Kafka.BootstrapServers = []string{"kafka-gateway:9093"}
+ }
+
+ if c.SchemaRegistry.URL == "" {
+ c.SchemaRegistry.URL = "http://schema-registry:8081"
+ }
+
+ // Schema support is always enabled since Kafka Gateway now enforces schema-first behavior
+ c.Schemas.Enabled = true
+
+ if c.Producers.Count == 0 {
+ c.Producers.Count = 10
+ }
+
+ if c.Consumers.Count == 0 {
+ c.Consumers.Count = 5
+ }
+
+ if c.Topics.Count == 0 {
+ c.Topics.Count = 5
+ }
+
+ if c.Topics.Prefix == "" {
+ c.Topics.Prefix = "loadtest-topic"
+ }
+
+ if c.Topics.Partitions == 0 {
+ c.Topics.Partitions = 4 // Default to 4 partitions
+ }
+
+ if c.Topics.ReplicationFactor == 0 {
+ c.Topics.ReplicationFactor = 1 // Default to 1 replica
+ }
+
+ if c.Consumers.GroupPrefix == "" {
+ c.Consumers.GroupPrefix = "loadtest-group"
+ }
+
+ if c.Output.ResultsDir == "" {
+ c.Output.ResultsDir = "/test-results"
+ }
+
+ if c.Metrics.CollectionInterval == 0 {
+ c.Metrics.CollectionInterval = 10 * time.Second
+ }
+
+ if c.Output.StatsInterval == 0 {
+ c.Output.StatsInterval = 30 * time.Second
+ }
+}
+
+// applyEnvOverrides applies environment variable overrides
+func (c *Config) applyEnvOverrides() {
+ if servers := os.Getenv("KAFKA_BOOTSTRAP_SERVERS"); servers != "" {
+ c.Kafka.BootstrapServers = strings.Split(servers, ",")
+ }
+
+ if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
+ c.SchemaRegistry.URL = url
+ }
+
+ if mode := os.Getenv("TEST_MODE"); mode != "" {
+ c.TestMode = mode
+ }
+
+ if duration := os.Getenv("TEST_DURATION"); duration != "" {
+ if d, err := time.ParseDuration(duration); err == nil {
+ c.Duration = d
+ }
+ }
+
+ if count := os.Getenv("PRODUCER_COUNT"); count != "" {
+ if i, err := strconv.Atoi(count); err == nil {
+ c.Producers.Count = i
+ }
+ }
+
+ if count := os.Getenv("CONSUMER_COUNT"); count != "" {
+ if i, err := strconv.Atoi(count); err == nil {
+ c.Consumers.Count = i
+ }
+ }
+
+ if rate := os.Getenv("MESSAGE_RATE"); rate != "" {
+ if i, err := strconv.Atoi(rate); err == nil {
+ c.Producers.MessageRate = i
+ }
+ }
+
+ if size := os.Getenv("MESSAGE_SIZE"); size != "" {
+ if i, err := strconv.Atoi(size); err == nil {
+ c.Producers.MessageSize = i
+ }
+ }
+
+ if count := os.Getenv("TOPIC_COUNT"); count != "" {
+ if i, err := strconv.Atoi(count); err == nil {
+ c.Topics.Count = i
+ }
+ }
+
+ if partitions := os.Getenv("PARTITIONS_PER_TOPIC"); partitions != "" {
+ if i, err := strconv.Atoi(partitions); err == nil {
+ c.Topics.Partitions = i
+ }
+ }
+
+ if valueType := os.Getenv("VALUE_TYPE"); valueType != "" {
+ c.Producers.ValueType = valueType
+ }
+
+ if schemaFormat := os.Getenv("SCHEMA_FORMAT"); schemaFormat != "" {
+ c.Producers.SchemaFormat = schemaFormat
+ }
+
+ if enabled := os.Getenv("SCHEMAS_ENABLED"); enabled != "" {
+ c.Schemas.Enabled = enabled == "true"
+ }
+}
+
+// GetTopicNames returns the list of topic names to use for testing
+func (c *Config) GetTopicNames() []string {
+ topics := make([]string, c.Topics.Count)
+ for i := 0; i < c.Topics.Count; i++ {
+ topics[i] = fmt.Sprintf("%s-%d", c.Topics.Prefix, i)
+ }
+ return topics
+}
+
+// GetConsumerGroupNames returns the list of consumer group names
+func (c *Config) GetConsumerGroupNames() []string {
+ groups := make([]string, c.Consumers.Count)
+ for i := 0; i < c.Consumers.Count; i++ {
+ groups[i] = fmt.Sprintf("%s-%d", c.Consumers.GroupPrefix, i)
+ }
+ return groups
+}
+
+// Validate validates the configuration
+func (c *Config) Validate() error {
+ if c.TestMode != "producer" && c.TestMode != "consumer" && c.TestMode != "comprehensive" {
+ return fmt.Errorf("invalid test mode: %s", c.TestMode)
+ }
+
+ if len(c.Kafka.BootstrapServers) == 0 {
+ return fmt.Errorf("kafka bootstrap servers not specified")
+ }
+
+ if c.Producers.Count <= 0 && (c.TestMode == "producer" || c.TestMode == "comprehensive") {
+ return fmt.Errorf("producer count must be greater than 0 for producer or comprehensive tests")
+ }
+
+ if c.Consumers.Count <= 0 && (c.TestMode == "consumer" || c.TestMode == "comprehensive") {
+ return fmt.Errorf("consumer count must be greater than 0 for consumer or comprehensive tests")
+ }
+
+ if c.Topics.Count <= 0 {
+ return fmt.Errorf("topic count must be greater than 0")
+ }
+
+ if c.Topics.Partitions <= 0 {
+ return fmt.Errorf("partitions per topic must be greater than 0")
+ }
+
+ return nil
+}
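The flow above is: read the YAML file, fill defaults, apply environment overrides, then let the caller layer CLI flags on top via ApplyOverrides. A minimal usage sketch follows; the config path and override values are illustrative assumptions, not part of this change.

package main

import (
	"log"
	"time"

	"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
)

func main() {
	// "loadtest.yaml" and the override values are placeholders for this sketch.
	cfg, err := config.Load("loadtest.yaml")
	if err != nil {
		log.Fatalf("load config: %v", err)
	}
	cfg.ApplyOverrides("comprehensive", 10*time.Minute) // CLI flags applied last, so they win
	if err := cfg.Validate(); err != nil {
		log.Fatalf("invalid config: %v", err)
	}
	log.Printf("testing topics: %v", cfg.GetTopicNames())
}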
diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
new file mode 100644
index 000000000..e1c4caa41
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go
@@ -0,0 +1,626 @@
+package consumer
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "log"
+ "sync"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/linkedin/goavro/v2"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
+ pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+ "google.golang.org/protobuf/proto"
+)
+
+// Consumer represents a Kafka consumer for load testing
+type Consumer struct {
+ id int
+ config *config.Config
+ metricsCollector *metrics.Collector
+ saramaConsumer sarama.ConsumerGroup
+ useConfluent bool // Always false, Sarama only
+ topics []string
+ consumerGroup string
+ avroCodec *goavro.Codec
+
+ // Schema format tracking per topic
+ schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, PROTOBUF)
+
+ // Processing tracking
+ messagesProcessed int64
+ lastOffset map[string]map[int32]int64
+ offsetMutex sync.RWMutex
+}
+
+// New creates a new consumer instance
+func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) {
+ consumerGroup := fmt.Sprintf("%s-%d", cfg.Consumers.GroupPrefix, id)
+
+ c := &Consumer{
+ id: id,
+ config: cfg,
+ metricsCollector: collector,
+ topics: cfg.GetTopicNames(),
+ consumerGroup: consumerGroup,
+ useConfluent: false, // Use Sarama by default
+ lastOffset: make(map[string]map[int32]int64),
+ schemaFormats: make(map[string]string),
+ }
+
+ // Initialize schema formats for each topic (must match producer logic)
+ // This mirrors the format distribution in cmd/loadtest/main.go registerSchemas()
+ for i, topic := range c.topics {
+ var schemaFormat string
+ if cfg.Producers.SchemaFormat != "" {
+ // Use explicit config if provided
+ schemaFormat = cfg.Producers.SchemaFormat
+ } else {
+ // Distribute across formats (same as producer)
+ switch i % 3 {
+ case 0:
+ schemaFormat = "AVRO"
+ case 1:
+ schemaFormat = "JSON"
+ case 2:
+ schemaFormat = "PROTOBUF"
+ }
+ }
+ c.schemaFormats[topic] = schemaFormat
+ log.Printf("Consumer %d: Topic %s will use schema format: %s", id, topic, schemaFormat)
+ }
+
+ // Initialize consumer based on configuration
+ if c.useConfluent {
+ if err := c.initConfluentConsumer(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Confluent consumer: %w", err)
+ }
+ } else {
+ if err := c.initSaramaConsumer(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Sarama consumer: %w", err)
+ }
+ }
+
+ // Initialize Avro codec if schemas are enabled
+ if cfg.Schemas.Enabled {
+ if err := c.initAvroCodec(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Avro codec: %w", err)
+ }
+ }
+
+ log.Printf("Consumer %d initialized for group %s", id, consumerGroup)
+ return c, nil
+}
+
+// initSaramaConsumer initializes the Sarama consumer group
+func (c *Consumer) initSaramaConsumer() error {
+ config := sarama.NewConfig()
+
+ // Consumer configuration
+ config.Consumer.Return.Errors = true
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
+ if c.config.Consumers.AutoOffsetReset == "latest" {
+ config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ }
+
+ // Auto commit configuration
+ config.Consumer.Offsets.AutoCommit.Enable = c.config.Consumers.EnableAutoCommit
+ config.Consumer.Offsets.AutoCommit.Interval = time.Duration(c.config.Consumers.AutoCommitIntervalMs) * time.Millisecond
+
+ // Session and heartbeat configuration
+ config.Consumer.Group.Session.Timeout = time.Duration(c.config.Consumers.SessionTimeoutMs) * time.Millisecond
+ config.Consumer.Group.Heartbeat.Interval = time.Duration(c.config.Consumers.HeartbeatIntervalMs) * time.Millisecond
+
+ // Fetch configuration
+ config.Consumer.Fetch.Min = int32(c.config.Consumers.FetchMinBytes)
+ config.Consumer.Fetch.Default = 10 * 1024 * 1024 // 10MB per partition (increased from 1MB default)
+ config.Consumer.Fetch.Max = int32(c.config.Consumers.FetchMaxBytes)
+ config.Consumer.MaxWaitTime = time.Duration(c.config.Consumers.FetchMaxWaitMs) * time.Millisecond
+ config.Consumer.MaxProcessingTime = time.Duration(c.config.Consumers.MaxPollIntervalMs) * time.Millisecond
+
+ // Channel buffer sizes for concurrent partition consumption
+ config.ChannelBufferSize = 256 // Matches Sarama's default of 256; raise to buffer more messages per partition
+
+ // Allow more in-flight requests per broker connection so Sarama can
+ // fetch from multiple partitions in parallel
+ config.Net.MaxOpenRequests = 20 // Increase from default 5 to allow 20 concurrent requests
+
+ // Version
+ config.Version = sarama.V2_8_0_0
+
+ // Create consumer group
+ consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config)
+ if err != nil {
+ return fmt.Errorf("failed to create Sarama consumer group: %w", err)
+ }
+
+ c.saramaConsumer = consumerGroup
+ return nil
+}
+
+// initConfluentConsumer initializes the Confluent Kafka Go consumer
+func (c *Consumer) initConfluentConsumer() error {
+ // Confluent consumer disabled, using Sarama only
+ return fmt.Errorf("confluent consumer not enabled")
+}
+
+// initAvroCodec initializes the Avro codec for schema-based messages
+func (c *Consumer) initAvroCodec() error {
+ // Use the LoadTestMessage schema (matches what producer uses)
+ loadTestSchema := `{
+ "type": "record",
+ "name": "LoadTestMessage",
+ "namespace": "com.seaweedfs.loadtest",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "producer_id", "type": "int"},
+ {"name": "counter", "type": "long"},
+ {"name": "user_id", "type": "string"},
+ {"name": "event_type", "type": "string"},
+ {"name": "properties", "type": {"type": "map", "values": "string"}}
+ ]
+ }`
+
+ codec, err := goavro.NewCodec(loadTestSchema)
+ if err != nil {
+ return fmt.Errorf("failed to create Avro codec: %w", err)
+ }
+
+ c.avroCodec = codec
+ return nil
+}
+
+// Run starts the consumer and consumes messages until the context is cancelled
+func (c *Consumer) Run(ctx context.Context) {
+ log.Printf("Consumer %d starting for group %s", c.id, c.consumerGroup)
+ defer log.Printf("Consumer %d stopped", c.id)
+
+ if c.useConfluent {
+ c.runConfluentConsumer(ctx)
+ } else {
+ c.runSaramaConsumer(ctx)
+ }
+}
+
+// runSaramaConsumer runs the Sarama consumer group
+func (c *Consumer) runSaramaConsumer(ctx context.Context) {
+ handler := &ConsumerGroupHandler{
+ consumer: c,
+ }
+
+ var wg sync.WaitGroup
+
+ // Start error handler
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case err, ok := <-c.saramaConsumer.Errors():
+ if !ok {
+ return
+ }
+ log.Printf("Consumer %d error: %v", c.id, err)
+ c.metricsCollector.RecordConsumerError()
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ // Start consumer group session
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err := c.saramaConsumer.Consume(ctx, c.topics, handler); err != nil {
+ log.Printf("Consumer %d: Error consuming: %v", c.id, err)
+ c.metricsCollector.RecordConsumerError()
+
+ // Wait before retrying
+ select {
+ case <-time.After(5 * time.Second):
+ case <-ctx.Done():
+ return
+ }
+ }
+ }
+ }
+ }()
+
+ // Start lag monitoring
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ c.monitorConsumerLag(ctx)
+ }()
+
+ // Wait for completion
+ <-ctx.Done()
+ log.Printf("Consumer %d: Context cancelled, shutting down", c.id)
+ wg.Wait()
+}
+
+// runConfluentConsumer runs the Confluent consumer
+func (c *Consumer) runConfluentConsumer(ctx context.Context) {
+ // Confluent consumer disabled, using Sarama only
+ log.Printf("Consumer %d: Confluent consumer not enabled", c.id)
+}
+
+// processMessage processes a consumed message
+func (c *Consumer) processMessage(topicPtr *string, partition int32, offset int64, key, value []byte) error {
+ topic := ""
+ if topicPtr != nil {
+ topic = *topicPtr
+ }
+
+ // Update offset tracking
+ c.updateOffset(topic, partition, offset)
+
+ // Decode message based on topic-specific schema format
+ var decodedMessage interface{}
+ var err error
+
+ // Determine schema format for this topic (if schemas are enabled)
+ var schemaFormat string
+ if c.config.Schemas.Enabled {
+ schemaFormat = c.schemaFormats[topic]
+ if schemaFormat == "" {
+ // Fallback to config if topic not in map
+ schemaFormat = c.config.Producers.ValueType
+ }
+ } else {
+ // No schemas, use global value type
+ schemaFormat = c.config.Producers.ValueType
+ }
+
+ // Decode message based on format
+ switch schemaFormat {
+ case "avro", "AVRO":
+ decodedMessage, err = c.decodeAvroMessage(value)
+ case "json", "JSON", "JSON_SCHEMA":
+ decodedMessage, err = c.decodeJSONSchemaMessage(value)
+ case "protobuf", "PROTOBUF":
+ decodedMessage, err = c.decodeProtobufMessage(value)
+ case "binary":
+ decodedMessage, err = c.decodeBinaryMessage(value)
+ default:
+ // Fallback to plain JSON
+ decodedMessage, err = c.decodeJSONMessage(value)
+ }
+
+ if err != nil {
+ return fmt.Errorf("failed to decode message: %w", err)
+ }
+
+ // Note: Removed artificial delay to allow maximum throughput
+ // If you need to simulate processing time, add a configurable delay setting
+ // time.Sleep(time.Millisecond) // Minimal processing delay
+
+ // Record metrics
+ c.metricsCollector.RecordConsumedMessage(len(value))
+ // ConsumeClaim runs concurrently for each partition claim, so update the counter atomically
+ processed := atomic.AddInt64(&c.messagesProcessed, 1)
+
+ // Log progress
+ if c.id == 0 && processed%1000 == 0 {
+ log.Printf("Consumer %d: Processed %d messages (latest: %s[%d]@%d)",
+ c.id, processed, topic, partition, offset)
+ }
+
+ // Optional: Validate message content (for testing purposes)
+ if c.config.Chaos.Enabled {
+ if err := c.validateMessage(decodedMessage); err != nil {
+ log.Printf("Consumer %d: Message validation failed: %v", c.id, err)
+ }
+ }
+
+ return nil
+}
+
+// decodeJSONMessage decodes a JSON message
+func (c *Consumer) decodeJSONMessage(value []byte) (interface{}, error) {
+ var message map[string]interface{}
+ if err := json.Unmarshal(value, &message); err != nil {
+ // DEBUG: Log the raw bytes when JSON parsing fails
+ log.Printf("Consumer %d: JSON decode failed. Length: %d, Raw bytes (hex): %x, Raw string: %q, Error: %v",
+ c.id, len(value), value, string(value), err)
+ return nil, err
+ }
+ return message, nil
+}
+
+// decodeAvroMessage decodes an Avro message (handles Confluent Wire Format)
+func (c *Consumer) decodeAvroMessage(value []byte) (interface{}, error) {
+ if c.avroCodec == nil {
+ return nil, fmt.Errorf("Avro codec not initialized")
+ }
+
+ // Handle Confluent Wire Format when schemas are enabled
+ var avroData []byte
+ if c.config.Schemas.Enabled {
+ if len(value) < 5 {
+ return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+ }
+
+ // Check magic byte (should be 0)
+ if value[0] != 0 {
+ return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+ }
+
+ // Extract schema ID (bytes 1-4, big-endian)
+ schemaID := binary.BigEndian.Uint32(value[1:5])
+ _ = schemaID // TODO: Could validate schema ID matches expected schema
+
+ // Extract Avro data (bytes 5+)
+ avroData = value[5:]
+ } else {
+ // No wire format, use raw data
+ avroData = value
+ }
+
+ native, _, err := c.avroCodec.NativeFromBinary(avroData)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decode Avro data: %w", err)
+ }
+
+ return native, nil
+}
+
+// decodeJSONSchemaMessage decodes a JSON Schema message (handles Confluent Wire Format)
+func (c *Consumer) decodeJSONSchemaMessage(value []byte) (interface{}, error) {
+ // Handle Confluent Wire Format when schemas are enabled
+ var jsonData []byte
+ if c.config.Schemas.Enabled {
+ if len(value) < 5 {
+ return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+ }
+
+ // Check magic byte (should be 0)
+ if value[0] != 0 {
+ return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+ }
+
+ // Extract schema ID (bytes 1-4, big-endian)
+ schemaID := binary.BigEndian.Uint32(value[1:5])
+ _ = schemaID // TODO: Could validate schema ID matches expected schema
+
+ // Extract JSON data (bytes 5+)
+ jsonData = value[5:]
+ } else {
+ // No wire format, use raw data
+ jsonData = value
+ }
+
+ // Decode JSON
+ var message map[string]interface{}
+ if err := json.Unmarshal(jsonData, &message); err != nil {
+ return nil, fmt.Errorf("failed to decode JSON data: %w", err)
+ }
+
+ return message, nil
+}
+
+// decodeProtobufMessage decodes a Protobuf message (handles Confluent Wire Format)
+func (c *Consumer) decodeProtobufMessage(value []byte) (interface{}, error) {
+ // Handle Confluent Wire Format when schemas are enabled
+ var protoData []byte
+ if c.config.Schemas.Enabled {
+ if len(value) < 5 {
+ return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
+ }
+
+ // Check magic byte (should be 0)
+ if value[0] != 0 {
+ return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
+ }
+
+ // Extract schema ID (bytes 1-4, big-endian)
+ schemaID := binary.BigEndian.Uint32(value[1:5])
+ _ = schemaID // TODO: Could validate schema ID matches expected schema
+
+ // Extract Protobuf data (bytes 5+)
+ protoData = value[5:]
+ } else {
+ // No wire format, use raw data
+ protoData = value
+ }
+
+ // Unmarshal protobuf message
+ var protoMsg pb.LoadTestMessage
+ if err := proto.Unmarshal(protoData, &protoMsg); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal Protobuf data: %w", err)
+ }
+
+ // Convert to map for consistency with other decoders
+ return map[string]interface{}{
+ "id": protoMsg.Id,
+ "timestamp": protoMsg.Timestamp,
+ "producer_id": protoMsg.ProducerId,
+ "counter": protoMsg.Counter,
+ "user_id": protoMsg.UserId,
+ "event_type": protoMsg.EventType,
+ "properties": protoMsg.Properties,
+ }, nil
+}
+
+// decodeBinaryMessage decodes a binary message
+func (c *Consumer) decodeBinaryMessage(value []byte) (interface{}, error) {
+ if len(value) < 20 {
+ return nil, fmt.Errorf("binary message too short")
+ }
+
+ // Extract fields from the binary format:
+ // [producer_id:4][counter:8][timestamp:8][random_data:...]
+
+ producerID := int(value[0])<<24 | int(value[1])<<16 | int(value[2])<<8 | int(value[3])
+
+ var counter int64
+ for i := 0; i < 8; i++ {
+ counter |= int64(value[4+i]) << (56 - i*8)
+ }
+
+ var timestamp int64
+ for i := 0; i < 8; i++ {
+ timestamp |= int64(value[12+i]) << (56 - i*8)
+ }
+
+ return map[string]interface{}{
+ "producer_id": producerID,
+ "counter": counter,
+ "timestamp": timestamp,
+ "data_size": len(value),
+ }, nil
+}
+
+// validateMessage performs basic message validation
+func (c *Consumer) validateMessage(message interface{}) error {
+ // This is a placeholder for message validation logic
+ // In a real load test, you might validate:
+ // - Message structure
+ // - Required fields
+ // - Data consistency
+ // - Schema compliance
+
+ if message == nil {
+ return fmt.Errorf("message is nil")
+ }
+
+ return nil
+}
+
+// updateOffset updates the last seen offset for lag calculation
+func (c *Consumer) updateOffset(topic string, partition int32, offset int64) {
+ c.offsetMutex.Lock()
+ defer c.offsetMutex.Unlock()
+
+ if c.lastOffset[topic] == nil {
+ c.lastOffset[topic] = make(map[int32]int64)
+ }
+ c.lastOffset[topic][partition] = offset
+}
+
+// monitorConsumerLag monitors and reports consumer lag
+func (c *Consumer) monitorConsumerLag(ctx context.Context) {
+ ticker := time.NewTicker(30 * time.Second)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ c.reportConsumerLag()
+ }
+ }
+}
+
+// reportConsumerLag calculates and reports consumer lag
+func (c *Consumer) reportConsumerLag() {
+ // This is a simplified lag calculation
+ // In a real implementation, you would query the broker for high water marks
+
+ c.offsetMutex.RLock()
+ defer c.offsetMutex.RUnlock()
+
+ for topic, partitions := range c.lastOffset {
+ for partition := range partitions {
+ // For simplicity, assume lag is always 0 when we're consuming actively
+ // In a real test, you would compare against the high water mark
+ lag := int64(0)
+
+ c.metricsCollector.UpdateConsumerLag(c.consumerGroup, topic, partition, lag)
+ }
+ }
+}
+
+// Close closes the consumer and cleans up resources
+func (c *Consumer) Close() error {
+ log.Printf("Consumer %d: Closing", c.id)
+
+ if c.saramaConsumer != nil {
+ return c.saramaConsumer.Close()
+ }
+
+ return nil
+}
+
+// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
+type ConsumerGroupHandler struct {
+ consumer *Consumer
+}
+
+// Setup is run at the beginning of a new session, before ConsumeClaim
+func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+ log.Printf("Consumer %d: Consumer group session setup", h.consumer.id)
+ return nil
+}
+
+// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
+func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
+ log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id)
+ return nil
+}
+
+// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages()
+func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ msgCount := 0
+ for {
+ select {
+ case message, ok := <-claim.Messages():
+ if !ok {
+ return nil
+ }
+ msgCount++
+
+ // Process the message
+ var key []byte
+ if message.Key != nil {
+ key = message.Key
+ }
+
+ if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil {
+ log.Printf("Consumer %d: Error processing message: %v", h.consumer.id, err)
+ h.consumer.metricsCollector.RecordConsumerError()
+
+ // Add a small delay for schema validation or other processing errors to avoid overloading
+ // select {
+ // case <-time.After(100 * time.Millisecond):
+ // // Continue after brief delay
+ // case <-session.Context().Done():
+ // return nil
+ // }
+ } else {
+ // Mark message as processed
+ session.MarkMessage(message, "")
+ }
+
+ case <-session.Context().Done():
+ log.Printf("Consumer %d: Session context cancelled for %s[%d]",
+ h.consumer.id, claim.Topic(), claim.Partition())
+ return nil
+ }
+ }
+}
+
+// Helper functions
+
+func joinStrings(strs []string, sep string) string {
+ if len(strs) == 0 {
+ return ""
+ }
+
+ result := strs[0]
+ for i := 1; i < len(strs); i++ {
+ result += sep + strs[i]
+ }
+ return result
+}
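The Avro, JSON Schema, and Protobuf decoders above each repeat the same 5-byte Confluent wire-format check (magic byte 0 followed by a big-endian uint32 schema ID). A shared helper in this package is one possible cleanup; the sketch below is an assumption, not code from this change, and relies only on fmt and encoding/binary, which consumer.go already imports.

// stripConfluentHeader is a hypothetical helper that validates the framing
// and returns the schema ID plus the remaining payload bytes.
func stripConfluentHeader(value []byte) (uint32, []byte, error) {
	if len(value) < 5 {
		return 0, nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value))
	}
	if value[0] != 0 {
		return 0, nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0])
	}
	return binary.BigEndian.Uint32(value[1:5]), value[5:], nil
}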
diff --git a/test/kafka/kafka-client-loadtest/internal/metrics/collector.go b/test/kafka/kafka-client-loadtest/internal/metrics/collector.go
new file mode 100644
index 000000000..d6a1edb8e
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/metrics/collector.go
@@ -0,0 +1,353 @@
+package metrics
+
+import (
+ "fmt"
+ "io"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+// Collector handles metrics collection for the load test
+type Collector struct {
+ // Atomic counters for thread-safe operations
+ messagesProduced int64
+ messagesConsumed int64
+ bytesProduced int64
+ bytesConsumed int64
+ producerErrors int64
+ consumerErrors int64
+
+ // Latency tracking
+ latencies []time.Duration
+ latencyMutex sync.RWMutex
+
+ // Consumer lag tracking
+ consumerLag map[string]int64
+ consumerLagMutex sync.RWMutex
+
+ // Test timing
+ startTime time.Time
+
+ // Prometheus metrics
+ prometheusMetrics *PrometheusMetrics
+}
+
+// PrometheusMetrics holds all Prometheus metric definitions
+type PrometheusMetrics struct {
+ MessagesProducedTotal prometheus.Counter
+ MessagesConsumedTotal prometheus.Counter
+ BytesProducedTotal prometheus.Counter
+ BytesConsumedTotal prometheus.Counter
+ ProducerErrorsTotal prometheus.Counter
+ ConsumerErrorsTotal prometheus.Counter
+
+ MessageLatencyHistogram prometheus.Histogram
+ ProducerThroughput prometheus.Gauge
+ ConsumerThroughput prometheus.Gauge
+ ConsumerLagGauge *prometheus.GaugeVec
+
+ ActiveProducers prometheus.Gauge
+ ActiveConsumers prometheus.Gauge
+}
+
+// NewCollector creates a new metrics collector
+func NewCollector() *Collector {
+ return &Collector{
+ startTime: time.Now(),
+ consumerLag: make(map[string]int64),
+ prometheusMetrics: &PrometheusMetrics{
+ MessagesProducedTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_messages_produced_total",
+ Help: "Total number of messages produced",
+ }),
+ MessagesConsumedTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_messages_consumed_total",
+ Help: "Total number of messages consumed",
+ }),
+ BytesProducedTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_bytes_produced_total",
+ Help: "Total bytes produced",
+ }),
+ BytesConsumedTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_bytes_consumed_total",
+ Help: "Total bytes consumed",
+ }),
+ ProducerErrorsTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_producer_errors_total",
+ Help: "Total number of producer errors",
+ }),
+ ConsumerErrorsTotal: promauto.NewCounter(prometheus.CounterOpts{
+ Name: "kafka_loadtest_consumer_errors_total",
+ Help: "Total number of consumer errors",
+ }),
+ MessageLatencyHistogram: promauto.NewHistogram(prometheus.HistogramOpts{
+ Name: "kafka_loadtest_message_latency_seconds",
+ Help: "Message end-to-end latency in seconds",
+ Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 15 buckets from 1ms up to ~16.4s
+ }),
+ ProducerThroughput: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_producer_throughput_msgs_per_sec",
+ Help: "Current producer throughput in messages per second",
+ }),
+ ConsumerThroughput: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_consumer_throughput_msgs_per_sec",
+ Help: "Current consumer throughput in messages per second",
+ }),
+ ConsumerLagGauge: promauto.NewGaugeVec(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_consumer_lag_messages",
+ Help: "Consumer lag in messages",
+ }, []string{"consumer_group", "topic", "partition"}),
+ ActiveProducers: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_active_producers",
+ Help: "Number of active producers",
+ }),
+ ActiveConsumers: promauto.NewGauge(prometheus.GaugeOpts{
+ Name: "kafka_loadtest_active_consumers",
+ Help: "Number of active consumers",
+ }),
+ },
+ }
+}
+
+// RecordProducedMessage records a successfully produced message
+func (c *Collector) RecordProducedMessage(size int, latency time.Duration) {
+ atomic.AddInt64(&c.messagesProduced, 1)
+ atomic.AddInt64(&c.bytesProduced, int64(size))
+
+ c.prometheusMetrics.MessagesProducedTotal.Inc()
+ c.prometheusMetrics.BytesProducedTotal.Add(float64(size))
+ c.prometheusMetrics.MessageLatencyHistogram.Observe(latency.Seconds())
+
+ // Store latency for percentile calculations
+ c.latencyMutex.Lock()
+ c.latencies = append(c.latencies, latency)
+ // Keep only recent latencies to avoid memory bloat
+ if len(c.latencies) > 100000 {
+ c.latencies = c.latencies[50000:]
+ }
+ c.latencyMutex.Unlock()
+}
+
+// RecordConsumedMessage records a successfully consumed message
+func (c *Collector) RecordConsumedMessage(size int) {
+ atomic.AddInt64(&c.messagesConsumed, 1)
+ atomic.AddInt64(&c.bytesConsumed, int64(size))
+
+ c.prometheusMetrics.MessagesConsumedTotal.Inc()
+ c.prometheusMetrics.BytesConsumedTotal.Add(float64(size))
+}
+
+// RecordProducerError records a producer error
+func (c *Collector) RecordProducerError() {
+ atomic.AddInt64(&c.producerErrors, 1)
+ c.prometheusMetrics.ProducerErrorsTotal.Inc()
+}
+
+// RecordConsumerError records a consumer error
+func (c *Collector) RecordConsumerError() {
+ atomic.AddInt64(&c.consumerErrors, 1)
+ c.prometheusMetrics.ConsumerErrorsTotal.Inc()
+}
+
+// UpdateConsumerLag updates consumer lag metrics
+func (c *Collector) UpdateConsumerLag(consumerGroup, topic string, partition int32, lag int64) {
+ key := fmt.Sprintf("%s-%s-%d", consumerGroup, topic, partition)
+
+ c.consumerLagMutex.Lock()
+ c.consumerLag[key] = lag
+ c.consumerLagMutex.Unlock()
+
+ c.prometheusMetrics.ConsumerLagGauge.WithLabelValues(
+ consumerGroup, topic, fmt.Sprintf("%d", partition),
+ ).Set(float64(lag))
+}
+
+// UpdateThroughput updates throughput gauges
+func (c *Collector) UpdateThroughput(producerRate, consumerRate float64) {
+ c.prometheusMetrics.ProducerThroughput.Set(producerRate)
+ c.prometheusMetrics.ConsumerThroughput.Set(consumerRate)
+}
+
+// UpdateActiveClients updates active client counts
+func (c *Collector) UpdateActiveClients(producers, consumers int) {
+ c.prometheusMetrics.ActiveProducers.Set(float64(producers))
+ c.prometheusMetrics.ActiveConsumers.Set(float64(consumers))
+}
+
+// GetStats returns current statistics
+func (c *Collector) GetStats() Stats {
+ produced := atomic.LoadInt64(&c.messagesProduced)
+ consumed := atomic.LoadInt64(&c.messagesConsumed)
+ bytesProduced := atomic.LoadInt64(&c.bytesProduced)
+ bytesConsumed := atomic.LoadInt64(&c.bytesConsumed)
+ producerErrors := atomic.LoadInt64(&c.producerErrors)
+ consumerErrors := atomic.LoadInt64(&c.consumerErrors)
+
+ duration := time.Since(c.startTime)
+
+ // Calculate throughput
+ producerThroughput := float64(produced) / duration.Seconds()
+ consumerThroughput := float64(consumed) / duration.Seconds()
+
+ // Calculate latency percentiles
+ var latencyPercentiles map[float64]time.Duration
+ c.latencyMutex.RLock()
+ if len(c.latencies) > 0 {
+ latencyPercentiles = c.calculatePercentiles(c.latencies)
+ }
+ c.latencyMutex.RUnlock()
+
+ // Get consumer lag summary
+ c.consumerLagMutex.RLock()
+ totalLag := int64(0)
+ maxLag := int64(0)
+ for _, lag := range c.consumerLag {
+ totalLag += lag
+ if lag > maxLag {
+ maxLag = lag
+ }
+ }
+ avgLag := float64(0)
+ if len(c.consumerLag) > 0 {
+ avgLag = float64(totalLag) / float64(len(c.consumerLag))
+ }
+ c.consumerLagMutex.RUnlock()
+
+ return Stats{
+ Duration: duration,
+ MessagesProduced: produced,
+ MessagesConsumed: consumed,
+ BytesProduced: bytesProduced,
+ BytesConsumed: bytesConsumed,
+ ProducerErrors: producerErrors,
+ ConsumerErrors: consumerErrors,
+ ProducerThroughput: producerThroughput,
+ ConsumerThroughput: consumerThroughput,
+ LatencyPercentiles: latencyPercentiles,
+ TotalConsumerLag: totalLag,
+ MaxConsumerLag: maxLag,
+ AvgConsumerLag: avgLag,
+ }
+}
+
+// PrintSummary prints a summary of the test statistics
+func (c *Collector) PrintSummary() {
+ stats := c.GetStats()
+
+ fmt.Printf("\n=== Load Test Summary ===\n")
+ fmt.Printf("Test Duration: %v\n", stats.Duration)
+ fmt.Printf("\nMessages:\n")
+ fmt.Printf(" Produced: %d (%.2f MB)\n", stats.MessagesProduced, float64(stats.BytesProduced)/1024/1024)
+ fmt.Printf(" Consumed: %d (%.2f MB)\n", stats.MessagesConsumed, float64(stats.BytesConsumed)/1024/1024)
+ fmt.Printf(" Producer Errors: %d\n", stats.ProducerErrors)
+ fmt.Printf(" Consumer Errors: %d\n", stats.ConsumerErrors)
+
+ fmt.Printf("\nThroughput:\n")
+ fmt.Printf(" Producer: %.2f msgs/sec\n", stats.ProducerThroughput)
+ fmt.Printf(" Consumer: %.2f msgs/sec\n", stats.ConsumerThroughput)
+
+ if stats.LatencyPercentiles != nil {
+ fmt.Printf("\nLatency Percentiles:\n")
+ percentiles := []float64{50, 90, 95, 99, 99.9}
+ for _, p := range percentiles {
+ if latency, exists := stats.LatencyPercentiles[p]; exists {
+ fmt.Printf(" p%.1f: %v\n", p, latency)
+ }
+ }
+ }
+
+ fmt.Printf("\nConsumer Lag:\n")
+ fmt.Printf(" Total: %d messages\n", stats.TotalConsumerLag)
+ fmt.Printf(" Max: %d messages\n", stats.MaxConsumerLag)
+ fmt.Printf(" Average: %.2f messages\n", stats.AvgConsumerLag)
+ fmt.Printf("=========================\n")
+}
+
+// WriteStats writes statistics to a writer (for HTTP endpoint)
+func (c *Collector) WriteStats(w io.Writer) {
+ stats := c.GetStats()
+
+ fmt.Fprintf(w, "# Load Test Statistics\n")
+ fmt.Fprintf(w, "duration_seconds %v\n", stats.Duration.Seconds())
+ fmt.Fprintf(w, "messages_produced %d\n", stats.MessagesProduced)
+ fmt.Fprintf(w, "messages_consumed %d\n", stats.MessagesConsumed)
+ fmt.Fprintf(w, "bytes_produced %d\n", stats.BytesProduced)
+ fmt.Fprintf(w, "bytes_consumed %d\n", stats.BytesConsumed)
+ fmt.Fprintf(w, "producer_errors %d\n", stats.ProducerErrors)
+ fmt.Fprintf(w, "consumer_errors %d\n", stats.ConsumerErrors)
+ fmt.Fprintf(w, "producer_throughput_msgs_per_sec %f\n", stats.ProducerThroughput)
+ fmt.Fprintf(w, "consumer_throughput_msgs_per_sec %f\n", stats.ConsumerThroughput)
+ fmt.Fprintf(w, "total_consumer_lag %d\n", stats.TotalConsumerLag)
+ fmt.Fprintf(w, "max_consumer_lag %d\n", stats.MaxConsumerLag)
+ fmt.Fprintf(w, "avg_consumer_lag %f\n", stats.AvgConsumerLag)
+
+ if stats.LatencyPercentiles != nil {
+ for percentile, latency := range stats.LatencyPercentiles {
+ fmt.Fprintf(w, "latency_p%g_seconds %f\n", percentile, latency.Seconds())
+ }
+ }
+}
+
+// calculatePercentiles calculates latency percentiles
+func (c *Collector) calculatePercentiles(latencies []time.Duration) map[float64]time.Duration {
+ if len(latencies) == 0 {
+ return nil
+ }
+
+ // Make a copy and sort
+ sorted := make([]time.Duration, len(latencies))
+ copy(sorted, latencies)
+ sort.Slice(sorted, func(i, j int) bool {
+ return sorted[i] < sorted[j]
+ })
+
+ percentiles := map[float64]time.Duration{
+ 50: calculatePercentile(sorted, 50),
+ 90: calculatePercentile(sorted, 90),
+ 95: calculatePercentile(sorted, 95),
+ 99: calculatePercentile(sorted, 99),
+ 99.9: calculatePercentile(sorted, 99.9),
+ }
+
+ return percentiles
+}
+
+// calculatePercentile calculates a specific percentile from sorted data
+func calculatePercentile(sorted []time.Duration, percentile float64) time.Duration {
+ if len(sorted) == 0 {
+ return 0
+ }
+
+ index := percentile / 100.0 * float64(len(sorted)-1)
+ if index == float64(int(index)) {
+ return sorted[int(index)]
+ }
+
+ lower := sorted[int(index)]
+ upper := sorted[int(index)+1]
+ weight := index - float64(int(index))
+
+ return time.Duration(float64(lower) + weight*float64(upper-lower))
+}
+
+// Stats represents the current test statistics
+type Stats struct {
+ Duration time.Duration
+ MessagesProduced int64
+ MessagesConsumed int64
+ BytesProduced int64
+ BytesConsumed int64
+ ProducerErrors int64
+ ConsumerErrors int64
+ ProducerThroughput float64
+ ConsumerThroughput float64
+ LatencyPercentiles map[float64]time.Duration
+ TotalConsumerLag int64
+ MaxConsumerLag int64
+ AvgConsumerLag float64
+}
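calculatePercentile interpolates linearly when the percentile index falls between two samples. A quick illustration from inside the metrics package; the four latency values are made up for the example.

// p50 index = 0.5*(4-1) = 1.5, so the result is halfway between
// sorted[1]=20ms and sorted[2]=30ms.
sorted := []time.Duration{10 * time.Millisecond, 20 * time.Millisecond, 30 * time.Millisecond, 40 * time.Millisecond}
fmt.Println(calculatePercentile(sorted, 50)) // prints 25ms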
diff --git a/test/kafka/kafka-client-loadtest/internal/producer/producer.go b/test/kafka/kafka-client-loadtest/internal/producer/producer.go
new file mode 100644
index 000000000..167bfeac6
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/producer/producer.go
@@ -0,0 +1,770 @@
+package producer
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "log"
+ "math/rand"
+ "net/http"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/linkedin/goavro/v2"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema"
+ pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb"
+ "google.golang.org/protobuf/proto"
+)
+
+// ErrCircuitBreakerOpen indicates that the circuit breaker is open due to consecutive failures
+var ErrCircuitBreakerOpen = errors.New("circuit breaker is open")
+
+// Producer represents a Kafka producer for load testing
+type Producer struct {
+ id int
+ config *config.Config
+ metricsCollector *metrics.Collector
+ saramaProducer sarama.SyncProducer
+ useConfluent bool
+ topics []string
+ avroCodec *goavro.Codec
+ startTime time.Time // Test run start time for generating unique keys
+
+ // Schema management
+ schemaIDs map[string]int // topic -> schema ID mapping
+ schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, etc.)
+
+ // Rate limiting
+ rateLimiter *time.Ticker
+
+ // Message generation
+ messageCounter int64
+ random *rand.Rand
+
+ // Circuit breaker detection
+ consecutiveFailures int
+}
+
+// Message represents a test message
+type Message struct {
+ ID string `json:"id"`
+ Timestamp int64 `json:"timestamp"`
+ ProducerID int `json:"producer_id"`
+ Counter int64 `json:"counter"`
+ UserID string `json:"user_id"`
+ EventType string `json:"event_type"`
+ Properties map[string]interface{} `json:"properties"`
+}
+
+// New creates a new producer instance
+func New(cfg *config.Config, collector *metrics.Collector, id int) (*Producer, error) {
+ p := &Producer{
+ id: id,
+ config: cfg,
+ metricsCollector: collector,
+ topics: cfg.GetTopicNames(),
+ random: rand.New(rand.NewSource(time.Now().UnixNano() + int64(id))),
+ useConfluent: false, // Use Sarama by default, can be made configurable
+ schemaIDs: make(map[string]int),
+ schemaFormats: make(map[string]string),
+ startTime: time.Now(), // Record test start time for unique key generation
+ }
+
+ // Initialize schema formats for each topic
+ // Distribute across AVRO, JSON, and PROTOBUF formats
+ for i, topic := range p.topics {
+ var schemaFormat string
+ if cfg.Producers.SchemaFormat != "" {
+ // Use explicit config if provided
+ schemaFormat = cfg.Producers.SchemaFormat
+ } else {
+ // Distribute across three formats: AVRO, JSON, PROTOBUF
+ switch i % 3 {
+ case 0:
+ schemaFormat = "AVRO"
+ case 1:
+ schemaFormat = "JSON"
+ case 2:
+ schemaFormat = "PROTOBUF"
+ }
+ }
+ p.schemaFormats[topic] = schemaFormat
+ log.Printf("Producer %d: Topic %s will use schema format: %s", id, topic, schemaFormat)
+ }
+
+ // Set up rate limiter if specified
+ if cfg.Producers.MessageRate > 0 {
+ p.rateLimiter = time.NewTicker(time.Second / time.Duration(cfg.Producers.MessageRate))
+ }
+
+ // Initialize Sarama producer
+ if err := p.initSaramaProducer(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Sarama producer: %w", err)
+ }
+
+ // Initialize Avro codec and register/fetch schemas if schemas are enabled
+ if cfg.Schemas.Enabled {
+ if err := p.initAvroCodec(); err != nil {
+ return nil, fmt.Errorf("failed to initialize Avro codec: %w", err)
+ }
+ if err := p.ensureSchemasRegistered(); err != nil {
+ return nil, fmt.Errorf("failed to ensure schemas are registered: %w", err)
+ }
+ if err := p.fetchSchemaIDs(); err != nil {
+ return nil, fmt.Errorf("failed to fetch schema IDs: %w", err)
+ }
+ }
+
+ log.Printf("Producer %d initialized successfully", id)
+ return p, nil
+}
+
+// initSaramaProducer initializes the Sarama producer
+func (p *Producer) initSaramaProducer() error {
+ config := sarama.NewConfig()
+
+ // Producer configuration
+ config.Producer.RequiredAcks = sarama.WaitForAll
+ if p.config.Producers.Acks == "0" {
+ config.Producer.RequiredAcks = sarama.NoResponse
+ } else if p.config.Producers.Acks == "1" {
+ config.Producer.RequiredAcks = sarama.WaitForLocal
+ }
+
+ config.Producer.Retry.Max = p.config.Producers.Retries
+ config.Producer.Retry.Backoff = time.Duration(p.config.Producers.RetryBackoffMs) * time.Millisecond
+ config.Producer.Return.Successes = true
+ config.Producer.Return.Errors = true
+
+ // Compression
+ switch p.config.Producers.CompressionType {
+ case "gzip":
+ config.Producer.Compression = sarama.CompressionGZIP
+ case "snappy":
+ config.Producer.Compression = sarama.CompressionSnappy
+ case "lz4":
+ config.Producer.Compression = sarama.CompressionLZ4
+ case "zstd":
+ config.Producer.Compression = sarama.CompressionZSTD
+ default:
+ config.Producer.Compression = sarama.CompressionNone
+ }
+
+ // Batching
+ config.Producer.Flush.Messages = p.config.Producers.BatchSize
+ config.Producer.Flush.Frequency = time.Duration(p.config.Producers.LingerMs) * time.Millisecond
+
+ // Timeouts
+ config.Net.DialTimeout = 30 * time.Second
+ config.Net.ReadTimeout = 30 * time.Second
+ config.Net.WriteTimeout = 30 * time.Second
+
+ // Version
+ config.Version = sarama.V2_8_0_0
+
+ // Create producer
+ producer, err := sarama.NewSyncProducer(p.config.Kafka.BootstrapServers, config)
+ if err != nil {
+ return fmt.Errorf("failed to create Sarama producer: %w", err)
+ }
+
+ p.saramaProducer = producer
+ return nil
+}
+
+// initAvroCodec initializes the Avro codec for schema-based messages
+func (p *Producer) initAvroCodec() error {
+ // Use the shared LoadTestMessage schema
+ codec, err := goavro.NewCodec(schema.GetAvroSchema())
+ if err != nil {
+ return fmt.Errorf("failed to create Avro codec: %w", err)
+ }
+
+ p.avroCodec = codec
+ return nil
+}
+
+// Run starts the producer and produces messages until the context is cancelled
+func (p *Producer) Run(ctx context.Context) error {
+ log.Printf("Producer %d starting", p.id)
+ defer log.Printf("Producer %d stopped", p.id)
+
+ // Create topics if they don't exist
+ if err := p.createTopics(); err != nil {
+ log.Printf("Producer %d: Failed to create topics: %v", p.id, err)
+ p.metricsCollector.RecordProducerError()
+ return err
+ }
+
+ var wg sync.WaitGroup
+ errChan := make(chan error, 1)
+
+ // Main production loop
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if err := p.produceMessages(ctx); err != nil {
+ errChan <- err
+ }
+ }()
+
+ // Wait for completion or error
+ select {
+ case <-ctx.Done():
+ log.Printf("Producer %d: Context cancelled, shutting down", p.id)
+ case err := <-errChan:
+ log.Printf("Producer %d: Stopping due to error: %v", p.id, err)
+ return err
+ }
+
+ // Stop rate limiter
+ if p.rateLimiter != nil {
+ p.rateLimiter.Stop()
+ }
+
+ // Wait for goroutines to finish
+ wg.Wait()
+ return nil
+}
+
+// produceMessages is the main message production loop
+func (p *Producer) produceMessages(ctx context.Context) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ default:
+ // Rate limiting
+ if p.rateLimiter != nil {
+ select {
+ case <-p.rateLimiter.C:
+ // Proceed
+ case <-ctx.Done():
+ return nil
+ }
+ }
+
+ if err := p.produceMessage(); err != nil {
+ log.Printf("Producer %d: Failed to produce message: %v", p.id, err)
+ p.metricsCollector.RecordProducerError()
+
+ // Check for circuit breaker error
+ if p.isCircuitBreakerError(err) {
+ p.consecutiveFailures++
+ log.Printf("Producer %d: Circuit breaker error detected (%d/%d consecutive failures)",
+ p.id, p.consecutiveFailures, 3)
+
+ // Progressive backoff delay to avoid overloading the gateway
+ backoffDelay := time.Duration(p.consecutiveFailures) * 500 * time.Millisecond
+ log.Printf("Producer %d: Backing off for %v to avoid overloading gateway", p.id, backoffDelay)
+
+ select {
+ case <-time.After(backoffDelay):
+ // Continue after delay
+ case <-ctx.Done():
+ return nil
+ }
+
+ // If we've hit 3 consecutive circuit breaker errors, stop the producer
+ if p.consecutiveFailures >= 3 {
+ log.Printf("Producer %d: Circuit breaker is open - stopping producer after %d consecutive failures",
+ p.id, p.consecutiveFailures)
+ return fmt.Errorf("%w: stopping producer after %d consecutive failures", ErrCircuitBreakerOpen, p.consecutiveFailures)
+ }
+ } else {
+ // Reset counter for non-circuit breaker errors
+ p.consecutiveFailures = 0
+ }
+ } else {
+ // Reset counter on successful message
+ p.consecutiveFailures = 0
+ }
+ }
+ }
+}
+
+// produceMessage produces a single message
+func (p *Producer) produceMessage() error {
+ startTime := time.Now()
+
+ // Select random topic
+ topic := p.topics[p.random.Intn(len(p.topics))]
+
+ // Produce message using Sarama (message will be generated based on topic's schema format)
+ return p.produceSaramaMessage(topic, startTime)
+}
+
+// produceSaramaMessage produces a message using Sarama
+// The message is generated internally based on the topic's schema format
+func (p *Producer) produceSaramaMessage(topic string, startTime time.Time) error {
+ // Generate key
+ key := p.generateMessageKey()
+
+ // If schemas are enabled, wrap in Confluent Wire Format based on topic's schema format
+ var messageValue []byte
+ if p.config.Schemas.Enabled {
+ schemaID, exists := p.schemaIDs[topic]
+ if !exists {
+ return fmt.Errorf("schema ID not found for topic %s", topic)
+ }
+
+ // Get the schema format for this topic
+ schemaFormat := p.schemaFormats[topic]
+
+ // CRITICAL FIX: Encode based on schema format, NOT config value_type
+ // The encoding MUST match what the schema registry and gateway expect
+ var encodedMessage []byte
+ var err error
+ switch schemaFormat {
+ case "AVRO":
+ // For Avro schema, encode as Avro binary
+ encodedMessage, err = p.generateAvroMessage()
+ if err != nil {
+ return fmt.Errorf("failed to encode as Avro for topic %s: %w", topic, err)
+ }
+ case "JSON":
+ // For JSON schema, encode as JSON
+ encodedMessage, err = p.generateJSONMessage()
+ if err != nil {
+ return fmt.Errorf("failed to encode as JSON for topic %s: %w", topic, err)
+ }
+ case "PROTOBUF":
+ // For PROTOBUF schema, encode as Protobuf binary
+ encodedMessage, err = p.generateProtobufMessage()
+ if err != nil {
+ return fmt.Errorf("failed to encode as Protobuf for topic %s: %w", topic, err)
+ }
+ default:
+ // Unknown format - fallback to JSON
+ encodedMessage, err = p.generateJSONMessage()
+ if err != nil {
+ return fmt.Errorf("failed to encode as JSON (unknown format fallback) for topic %s: %w", topic, err)
+ }
+ }
+
+ // Wrap in Confluent wire format (magic byte + schema ID + payload)
+ messageValue = p.createConfluentWireFormat(schemaID, encodedMessage)
+ } else {
+ // No schemas - generate message based on config value_type
+ var err error
+ messageValue, err = p.generateMessage()
+ if err != nil {
+ return fmt.Errorf("failed to generate message: %w", err)
+ }
+ }
+
+ msg := &sarama.ProducerMessage{
+ Topic: topic,
+ Key: sarama.StringEncoder(key),
+ Value: sarama.ByteEncoder(messageValue),
+ }
+
+ // Add headers if configured
+ if p.config.Producers.IncludeHeaders {
+ msg.Headers = []sarama.RecordHeader{
+ {Key: []byte("producer_id"), Value: []byte(fmt.Sprintf("%d", p.id))},
+ {Key: []byte("timestamp"), Value: []byte(fmt.Sprintf("%d", startTime.UnixNano()))},
+ }
+ }
+
+ // Produce message
+ _, _, err := p.saramaProducer.SendMessage(msg)
+ if err != nil {
+ return err
+ }
+
+ // Record metrics
+ latency := time.Since(startTime)
+ p.metricsCollector.RecordProducedMessage(len(messageValue), latency)
+
+ return nil
+}
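createConfluentWireFormat is defined later in this file and is not visible in this hunk; judging from the comment above and the consumer-side decoders, it presumably builds the standard 5-byte header. A hedged sketch of that framing, using an assumed helper name and only the encoding/binary import this file already has:

// confluentFrame is a hypothetical illustration: magic byte 0, 4-byte
// big-endian schema ID, then the encoded payload.
func confluentFrame(schemaID int, payload []byte) []byte {
	framed := make([]byte, 5+len(payload))
	framed[0] = 0 // magic byte
	binary.BigEndian.PutUint32(framed[1:5], uint32(schemaID))
	copy(framed[5:], payload)
	return framed
}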
+
+// generateMessage generates a test message
+func (p *Producer) generateMessage() ([]byte, error) {
+ p.messageCounter++
+
+ switch p.config.Producers.ValueType {
+ case "avro":
+ return p.generateAvroMessage()
+ case "json":
+ return p.generateJSONMessage()
+ case "binary":
+ return p.generateBinaryMessage()
+ default:
+ return p.generateJSONMessage()
+ }
+}
+
+// generateJSONMessage generates a JSON test message
+func (p *Producer) generateJSONMessage() ([]byte, error) {
+ msg := Message{
+ ID: fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter),
+ Timestamp: time.Now().UnixNano(),
+ ProducerID: p.id,
+ Counter: p.messageCounter,
+ UserID: fmt.Sprintf("user-%d", p.random.Intn(10000)),
+ EventType: p.randomEventType(),
+ Properties: map[string]interface{}{
+ "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)),
+ "page_views": fmt.Sprintf("%d", p.random.Intn(100)), // String for Avro map<string,string>
+ "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)), // String for Avro map<string,string>
+ "country": p.randomCountry(),
+ "device_type": p.randomDeviceType(),
+ "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)),
+ },
+ }
+
+ // Marshal to JSON (no padding - let natural message size be used)
+ messageBytes, err := json.Marshal(msg)
+ if err != nil {
+ return nil, err
+ }
+
+ return messageBytes, nil
+}
+
+// generateProtobufMessage generates a Protobuf-encoded message
+func (p *Producer) generateProtobufMessage() ([]byte, error) {
+ // Create protobuf message
+ protoMsg := &pb.LoadTestMessage{
+ Id: fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter),
+ Timestamp: time.Now().UnixNano(),
+ ProducerId: int32(p.id),
+ Counter: p.messageCounter,
+ UserId: fmt.Sprintf("user-%d", p.random.Intn(10000)),
+ EventType: p.randomEventType(),
+ Properties: map[string]string{
+ "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)),
+ "page_views": fmt.Sprintf("%d", p.random.Intn(100)),
+ "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)),
+ "country": p.randomCountry(),
+ "device_type": p.randomDeviceType(),
+ "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)),
+ },
+ }
+
+ // Marshal to protobuf binary
+ messageBytes, err := proto.Marshal(protoMsg)
+ if err != nil {
+ return nil, err
+ }
+
+ return messageBytes, nil
+}
+
+// generateAvroMessage generates an Avro-encoded message with Confluent Wire Format
+// NOTE: Avro messages are NOT padded - they have their own binary format
+func (p *Producer) generateAvroMessage() ([]byte, error) {
+ if p.avroCodec == nil {
+ return nil, fmt.Errorf("Avro codec not initialized")
+ }
+
+ // Create Avro-compatible record matching the LoadTestMessage schema
+ record := map[string]interface{}{
+ "id": fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter),
+ "timestamp": time.Now().UnixNano(),
+ "producer_id": p.id,
+ "counter": p.messageCounter,
+ "user_id": fmt.Sprintf("user-%d", p.random.Intn(10000)),
+ "event_type": p.randomEventType(),
+ "properties": map[string]interface{}{
+ "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)),
+ "page_views": fmt.Sprintf("%d", p.random.Intn(100)),
+ "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)),
+ "country": p.randomCountry(),
+ "device_type": p.randomDeviceType(),
+ "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)),
+ },
+ }
+
+ // Encode to Avro binary
+ avroBytes, err := p.avroCodec.BinaryFromNative(nil, record)
+ if err != nil {
+ return nil, err
+ }
+
+ return avroBytes, nil
+}
+
+// generateBinaryMessage generates a binary test message (no padding)
+func (p *Producer) generateBinaryMessage() ([]byte, error) {
+ // Fixed 20-byte big-endian layout:
+ // [producer_id:4][counter:8][timestamp:8]
+ message := make([]byte, 20)
+ binary.BigEndian.PutUint32(message[0:4], uint32(p.id))
+ binary.BigEndian.PutUint64(message[4:12], uint64(p.messageCounter))
+ binary.BigEndian.PutUint64(message[12:20], uint64(time.Now().UnixNano()))
+
+ return message, nil
+}
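+
+// Illustrative decode of the layout above (e.g. in a consumer-side check):
+//   producerID := binary.BigEndian.Uint32(msg[0:4])
+//   counter := binary.BigEndian.Uint64(msg[4:12])
+//   timestampNs := binary.BigEndian.Uint64(msg[12:20])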
+
+// generateMessageKey generates a message key based on the configured distribution
+// Keys are prefixed with a test run ID to track messages across test runs
+func (p *Producer) generateMessageKey() string {
+ // Use test start time as run ID (format: YYYYMMDD-HHMMSS)
+ runID := p.startTime.Format("20060102-150405")
+
+ switch p.config.Producers.KeyDistribution {
+ case "sequential":
+ return fmt.Sprintf("run-%s-key-%d", runID, p.messageCounter)
+ case "uuid":
+ return fmt.Sprintf("run-%s-uuid-%d-%d-%d", runID, p.id, time.Now().UnixNano(), p.random.Intn(1000000))
+ default: // random
+ return fmt.Sprintf("run-%s-key-%d", runID, p.random.Intn(10000))
+ }
+}
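+
+// Illustrative keys produced above (run ID comes from the producer start time):
+//   sequential: run-20250102-150405-key-17
+//   uuid:       run-20250102-150405-uuid-3-1736954400123456789-42 (pseudo-unique, not an RFC 4122 UUID)
+//   random:     run-20250102-150405-key-8131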
+
+// createTopics creates the test topics if they don't exist
+func (p *Producer) createTopics() error {
+ // Use Sarama admin client to create topics
+ config := sarama.NewConfig()
+ config.Version = sarama.V2_8_0_0
+
+ admin, err := sarama.NewClusterAdmin(p.config.Kafka.BootstrapServers, config)
+ if err != nil {
+ return fmt.Errorf("failed to create admin client: %w", err)
+ }
+ defer admin.Close()
+
+ // Create topic specifications
+ topicSpecs := make(map[string]*sarama.TopicDetail)
+ for _, topic := range p.topics {
+ topicSpecs[topic] = &sarama.TopicDetail{
+ NumPartitions: int32(p.config.Topics.Partitions),
+ ReplicationFactor: int16(p.config.Topics.ReplicationFactor),
+ ConfigEntries: map[string]*string{
+ "cleanup.policy": &p.config.Topics.CleanupPolicy,
+ "retention.ms": stringPtr(fmt.Sprintf("%d", p.config.Topics.RetentionMs)),
+ "segment.ms": stringPtr(fmt.Sprintf("%d", p.config.Topics.SegmentMs)),
+ },
+ }
+ }
+
+ // Create topics (treat "already exists" as success so reruns are idempotent)
+ for _, topic := range p.topics {
+ err = admin.CreateTopic(topic, topicSpecs[topic], false)
+ switch {
+ case err == nil:
+ log.Printf("Producer %d: Successfully created topic %s", p.id, topic)
+ case errors.Is(err, sarama.ErrTopicAlreadyExists):
+ log.Printf("Producer %d: Topic %s already exists", p.id, topic)
+ default:
+ log.Printf("Producer %d: Warning - failed to create topic %s: %v", p.id, topic, err)
+ }
+ }
+
+ return nil
+}
+
+// Close closes the producer and cleans up resources
+func (p *Producer) Close() error {
+ log.Printf("Producer %d: Closing", p.id)
+
+ if p.rateLimiter != nil {
+ p.rateLimiter.Stop()
+ }
+
+ if p.saramaProducer != nil {
+ return p.saramaProducer.Close()
+ }
+
+ return nil
+}
+
+// Helper functions
+
+func stringPtr(s string) *string {
+ return &s
+}
+
+func joinStrings(strs []string, sep string) string {
+ return strings.Join(strs, sep)
+}
+
+func (p *Producer) randomEventType() string {
+ events := []string{"login", "logout", "view", "click", "purchase", "signup", "search", "download"}
+ return events[p.random.Intn(len(events))]
+}
+
+func (p *Producer) randomCountry() string {
+ countries := []string{"US", "CA", "UK", "DE", "FR", "JP", "AU", "BR", "IN", "CN"}
+ return countries[p.random.Intn(len(countries))]
+}
+
+func (p *Producer) randomDeviceType() string {
+ devices := []string{"desktop", "mobile", "tablet", "tv", "watch"}
+ return devices[p.random.Intn(len(devices))]
+}
+
+// fetchSchemaIDs fetches schema IDs from Schema Registry for all topics
+func (p *Producer) fetchSchemaIDs() error {
+ for _, topic := range p.topics {
+ subject := topic + "-value"
+ schemaID, err := p.getSchemaID(subject)
+ if err != nil {
+ return fmt.Errorf("failed to get schema ID for subject %s: %w", subject, err)
+ }
+ p.schemaIDs[topic] = schemaID
+ log.Printf("Producer %d: Fetched schema ID %d for topic %s", p.id, schemaID, topic)
+ }
+ return nil
+}
+
+// getSchemaID fetches the latest schema ID for a subject from Schema Registry
+func (p *Producer) getSchemaID(subject string) (int, error) {
+ url := fmt.Sprintf("%s/subjects/%s/versions/latest", p.config.SchemaRegistry.URL, subject)
+
+ resp, err := http.Get(url)
+ if err != nil {
+ return 0, err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ body, _ := io.ReadAll(resp.Body)
+ return 0, fmt.Errorf("failed to get schema: status=%d, body=%s", resp.StatusCode, string(body))
+ }
+
+ var schemaResp struct {
+ ID int `json:"id"`
+ }
+ if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil {
+ return 0, err
+ }
+
+ return schemaResp.ID, nil
+}
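+
+// For reference, a successful "versions/latest" response from the Schema Registry
+// looks like {"subject":"<topic>-value","version":1,"id":42,"schema":"..."};
+// only the "id" field is decoded above.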
+
+// ensureSchemasRegistered ensures that schemas are registered for all topics,
+// registering any that are missing and skipping those that already exist
+func (p *Producer) ensureSchemasRegistered() error {
+ for _, topic := range p.topics {
+ subject := topic + "-value"
+
+ // First check if schema already exists
+ schemaID, err := p.getSchemaID(subject)
+ if err == nil {
+ log.Printf("Producer %d: Schema already exists for topic %s (ID: %d), skipping registration", p.id, topic, schemaID)
+ continue
+ }
+
+ // Schema doesn't exist, register it
+ log.Printf("Producer %d: Registering schema for topic %s", p.id, topic)
+ if err := p.registerTopicSchema(subject); err != nil {
+ return fmt.Errorf("failed to register schema for topic %s: %w", topic, err)
+ }
+ log.Printf("Producer %d: Schema registered successfully for topic %s", p.id, topic)
+ }
+ return nil
+}
+
+// registerTopicSchema registers the schema for a specific topic based on configured format
+func (p *Producer) registerTopicSchema(subject string) error {
+ // Extract topic name from subject (remove -value or -key suffix)
+ topicName := strings.TrimSuffix(strings.TrimSuffix(subject, "-value"), "-key")
+
+ // Get schema format for this topic
+ schemaFormat, ok := p.schemaFormats[topicName]
+ if !ok {
+ // Fallback to config or default
+ schemaFormat = p.config.Producers.SchemaFormat
+ if schemaFormat == "" {
+ schemaFormat = "AVRO"
+ }
+ }
+
+ var schemaStr string
+ var schemaType string
+
+ switch strings.ToUpper(schemaFormat) {
+ case "AVRO":
+ schemaStr = schema.GetAvroSchema()
+ schemaType = "AVRO"
+ case "JSON", "JSON_SCHEMA":
+ schemaStr = schema.GetJSONSchema()
+ schemaType = "JSON"
+ case "PROTOBUF":
+ schemaStr = schema.GetProtobufSchema()
+ schemaType = "PROTOBUF"
+ default:
+ return fmt.Errorf("unsupported schema format: %s", schemaFormat)
+ }
+
+ url := fmt.Sprintf("%s/subjects/%s/versions", p.config.SchemaRegistry.URL, subject)
+
+ payload := map[string]interface{}{
+ "schema": schemaStr,
+ "schemaType": schemaType,
+ }
+
+ jsonPayload, err := json.Marshal(payload)
+ if err != nil {
+ return fmt.Errorf("failed to marshal schema payload: %w", err)
+ }
+
+ resp, err := http.Post(url, "application/vnd.schemaregistry.v1+json", strings.NewReader(string(jsonPayload)))
+ if err != nil {
+ return fmt.Errorf("failed to register schema: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ body, _ := io.ReadAll(resp.Body)
+ return fmt.Errorf("schema registration failed: status=%d, body=%s", resp.StatusCode, string(body))
+ }
+
+ var registerResp struct {
+ ID int `json:"id"`
+ }
+ if err := json.NewDecoder(resp.Body).Decode(&registerResp); err != nil {
+ return fmt.Errorf("failed to decode registration response: %w", err)
+ }
+
+ log.Printf("Schema registered with ID: %d (format: %s)", registerResp.ID, schemaType)
+ return nil
+}
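+
+// For manual debugging, roughly the same registration can be issued with curl:
+//   curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
+//        --data '{"schema":"<escaped schema string>","schemaType":"AVRO"}' \
+//        http://<schema-registry>/subjects/<topic>-value/versions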
+
+// createConfluentWireFormat creates a message in Confluent Wire Format
+// This matches the implementation in weed/mq/kafka/schema/envelope.go CreateConfluentEnvelope
+func (p *Producer) createConfluentWireFormat(schemaID int, avroData []byte) []byte {
+ // Confluent Wire Format: [magic_byte(1)][schema_id(4)][payload(n)]
+ // magic_byte = 0x00
+ // schema_id = 4 bytes big-endian
+ wireFormat := make([]byte, 5+len(avroData))
+ wireFormat[0] = 0x00 // Magic byte
+ binary.BigEndian.PutUint32(wireFormat[1:5], uint32(schemaID))
+ copy(wireFormat[5:], avroData)
+ return wireFormat
+}
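+
+// Example: with schemaID=7 and a 3-byte Avro payload, the wire bytes are
+// 0x00 0x00 0x00 0x00 0x07 <payload...>; a consumer reads the schema ID back with
+// binary.BigEndian.Uint32(msg[1:5]) and strips the 5-byte header before decoding.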
+
+// isCircuitBreakerError checks if an error indicates that the circuit breaker is open
+func (p *Producer) isCircuitBreakerError(err error) bool {
+ return errors.Is(err, ErrCircuitBreakerOpen)
+}
diff --git a/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto b/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto
new file mode 100644
index 000000000..dfe00b72f
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+
+package com.seaweedfs.loadtest;
+
+option go_package = "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb";
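+
+// To regenerate pb/loadtest.pb.go after editing this file (exact invocation depends
+// on the local protoc setup, roughly):
+//   protoc --go_out=pb --go_opt=paths=source_relative loadtest.proto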
+
+message LoadTestMessage {
+ string id = 1;
+ int64 timestamp = 2;
+ int32 producer_id = 3;
+ int64 counter = 4;
+ string user_id = 5;
+ string event_type = 6;
+ map<string, string> properties = 7;
+}
+
diff --git a/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go b/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go
new file mode 100644
index 000000000..3ed58aa9e
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go
@@ -0,0 +1,185 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.36.6
+// protoc v5.29.3
+// source: loadtest.proto
+
+package pb
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+ unsafe "unsafe"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type LoadTestMessage struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
+ Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
+ ProducerId int32 `protobuf:"varint,3,opt,name=producer_id,json=producerId,proto3" json:"producer_id,omitempty"`
+ Counter int64 `protobuf:"varint,4,opt,name=counter,proto3" json:"counter,omitempty"`
+ UserId string `protobuf:"bytes,5,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
+ EventType string `protobuf:"bytes,6,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
+ Properties map[string]string `protobuf:"bytes,7,rep,name=properties,proto3" json:"properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *LoadTestMessage) Reset() {
+ *x = LoadTestMessage{}
+ mi := &file_loadtest_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *LoadTestMessage) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*LoadTestMessage) ProtoMessage() {}
+
+func (x *LoadTestMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_loadtest_proto_msgTypes[0]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use LoadTestMessage.ProtoReflect.Descriptor instead.
+func (*LoadTestMessage) Descriptor() ([]byte, []int) {
+ return file_loadtest_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *LoadTestMessage) GetId() string {
+ if x != nil {
+ return x.Id
+ }
+ return ""
+}
+
+func (x *LoadTestMessage) GetTimestamp() int64 {
+ if x != nil {
+ return x.Timestamp
+ }
+ return 0
+}
+
+func (x *LoadTestMessage) GetProducerId() int32 {
+ if x != nil {
+ return x.ProducerId
+ }
+ return 0
+}
+
+func (x *LoadTestMessage) GetCounter() int64 {
+ if x != nil {
+ return x.Counter
+ }
+ return 0
+}
+
+func (x *LoadTestMessage) GetUserId() string {
+ if x != nil {
+ return x.UserId
+ }
+ return ""
+}
+
+func (x *LoadTestMessage) GetEventType() string {
+ if x != nil {
+ return x.EventType
+ }
+ return ""
+}
+
+func (x *LoadTestMessage) GetProperties() map[string]string {
+ if x != nil {
+ return x.Properties
+ }
+ return nil
+}
+
+var File_loadtest_proto protoreflect.FileDescriptor
+
+const file_loadtest_proto_rawDesc = "" +
+ "\n" +
+ "\x0eloadtest.proto\x12\x16com.seaweedfs.loadtest\"\xca\x02\n" +
+ "\x0fLoadTestMessage\x12\x0e\n" +
+ "\x02id\x18\x01 \x01(\tR\x02id\x12\x1c\n" +
+ "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12\x1f\n" +
+ "\vproducer_id\x18\x03 \x01(\x05R\n" +
+ "producerId\x12\x18\n" +
+ "\acounter\x18\x04 \x01(\x03R\acounter\x12\x17\n" +
+ "\auser_id\x18\x05 \x01(\tR\x06userId\x12\x1d\n" +
+ "\n" +
+ "event_type\x18\x06 \x01(\tR\teventType\x12W\n" +
+ "\n" +
+ "properties\x18\a \x03(\v27.com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntryR\n" +
+ "properties\x1a=\n" +
+ "\x0fPropertiesEntry\x12\x10\n" +
+ "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
+ "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01BTZRgithub.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pbb\x06proto3"
+
+var (
+ file_loadtest_proto_rawDescOnce sync.Once
+ file_loadtest_proto_rawDescData []byte
+)
+
+func file_loadtest_proto_rawDescGZIP() []byte {
+ file_loadtest_proto_rawDescOnce.Do(func() {
+ file_loadtest_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_loadtest_proto_rawDesc), len(file_loadtest_proto_rawDesc)))
+ })
+ return file_loadtest_proto_rawDescData
+}
+
+var file_loadtest_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_loadtest_proto_goTypes = []any{
+ (*LoadTestMessage)(nil), // 0: com.seaweedfs.loadtest.LoadTestMessage
+ nil, // 1: com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntry
+}
+var file_loadtest_proto_depIdxs = []int32{
+ 1, // 0: com.seaweedfs.loadtest.LoadTestMessage.properties:type_name -> com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntry
+ 1, // [1:1] is the sub-list for method output_type
+ 1, // [1:1] is the sub-list for method input_type
+ 1, // [1:1] is the sub-list for extension type_name
+ 1, // [1:1] is the sub-list for extension extendee
+ 0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_loadtest_proto_init() }
+func file_loadtest_proto_init() {
+ if File_loadtest_proto != nil {
+ return
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: unsafe.Slice(unsafe.StringData(file_loadtest_proto_rawDesc), len(file_loadtest_proto_rawDesc)),
+ NumEnums: 0,
+ NumMessages: 2,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_loadtest_proto_goTypes,
+ DependencyIndexes: file_loadtest_proto_depIdxs,
+ MessageInfos: file_loadtest_proto_msgTypes,
+ }.Build()
+ File_loadtest_proto = out.File
+ file_loadtest_proto_goTypes = nil
+ file_loadtest_proto_depIdxs = nil
+}
diff --git a/test/kafka/kafka-client-loadtest/internal/schema/schemas.go b/test/kafka/kafka-client-loadtest/internal/schema/schemas.go
new file mode 100644
index 000000000..011b28ef2
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/schema/schemas.go
@@ -0,0 +1,58 @@
+package schema
+
+// GetAvroSchema returns the Avro schema for load test messages
+func GetAvroSchema() string {
+ return `{
+ "type": "record",
+ "name": "LoadTestMessage",
+ "namespace": "com.seaweedfs.loadtest",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "producer_id", "type": "int"},
+ {"name": "counter", "type": "long"},
+ {"name": "user_id", "type": "string"},
+ {"name": "event_type", "type": "string"},
+ {"name": "properties", "type": {"type": "map", "values": "string"}}
+ ]
+ }`
+}
+
+// GetJSONSchema returns the JSON Schema for load test messages
+func GetJSONSchema() string {
+ return `{
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "LoadTestMessage",
+ "type": "object",
+ "properties": {
+ "id": {"type": "string"},
+ "timestamp": {"type": "integer"},
+ "producer_id": {"type": "integer"},
+ "counter": {"type": "integer"},
+ "user_id": {"type": "string"},
+ "event_type": {"type": "string"},
+ "properties": {
+ "type": "object",
+ "additionalProperties": {"type": "string"}
+ }
+ },
+ "required": ["id", "timestamp", "producer_id", "counter", "user_id", "event_type"]
+ }`
+}
+
+// GetProtobufSchema returns the Protobuf schema for load test messages
+func GetProtobufSchema() string {
+ return `syntax = "proto3";
+
+package com.seaweedfs.loadtest;
+
+message LoadTestMessage {
+ string id = 1;
+ int64 timestamp = 2;
+ int32 producer_id = 3;
+ int64 counter = 4;
+ string user_id = 5;
+ string event_type = 6;
+ map<string, string> properties = 7;
+}`
+}
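+
+// Note: the Avro, JSON Schema, and Protobuf definitions above describe the same
+// logical record and are expected to stay in sync with pb.LoadTestMessage and the
+// producer's message generators.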