Diffstat (limited to 'test/kafka/kafka-client-loadtest/internal')
7 files changed, 2369 insertions, 0 deletions
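Editor's note (not part of the diff): both the producer and the consumers below frame every schema-enabled message in the Confluent wire format — a 0x00 magic byte, a 4-byte big-endian schema ID, then the Avro/JSON/Protobuf payload (see createConfluentWireFormat in producer.go and the decode* helpers in consumer.go). The following standalone sketch shows that round trip in isolation; the schema ID 42 and the JSON payload are purely illustrative values, not taken from the test configuration.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeWireFormat wraps an already-encoded payload in the Confluent envelope:
// [magic byte 0x00][schema ID, 4 bytes big-endian][payload].
func encodeWireFormat(schemaID uint32, payload []byte) []byte {
	out := make([]byte, 5+len(payload))
	out[0] = 0x00
	binary.BigEndian.PutUint32(out[1:5], schemaID)
	copy(out[5:], payload)
	return out
}

// decodeWireFormat strips the envelope and returns the schema ID and payload,
// mirroring the length and magic-byte checks done in the consumer's decoders.
func decodeWireFormat(msg []byte) (uint32, []byte, error) {
	if len(msg) < 5 || msg[0] != 0x00 {
		return 0, nil, fmt.Errorf("not in Confluent wire format")
	}
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	wire := encodeWireFormat(42, []byte(`{"id":"msg-0-1"}`)) // hypothetical schema ID and payload
	id, payload, err := decodeWireFormat(wire)
	fmt.Println(id, string(payload), err) // 42 {"id":"msg-0-1"} <nil>
}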
diff --git a/test/kafka/kafka-client-loadtest/internal/config/config.go b/test/kafka/kafka-client-loadtest/internal/config/config.go new file mode 100644 index 000000000..dd9f6d6b2 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/config/config.go @@ -0,0 +1,361 @@ +package config + +import ( + "fmt" + "os" + "strconv" + "strings" + "time" + + "gopkg.in/yaml.v3" +) + +// Config represents the complete load test configuration +type Config struct { + TestMode string `yaml:"test_mode"` + Duration time.Duration `yaml:"duration"` + + Kafka KafkaConfig `yaml:"kafka"` + SchemaRegistry SchemaRegistryConfig `yaml:"schema_registry"` + Producers ProducersConfig `yaml:"producers"` + Consumers ConsumersConfig `yaml:"consumers"` + Topics TopicsConfig `yaml:"topics"` + Schemas SchemasConfig `yaml:"schemas"` + Metrics MetricsConfig `yaml:"metrics"` + Scenarios ScenariosConfig `yaml:"scenarios"` + Chaos ChaosConfig `yaml:"chaos"` + Output OutputConfig `yaml:"output"` + Logging LoggingConfig `yaml:"logging"` +} + +type KafkaConfig struct { + BootstrapServers []string `yaml:"bootstrap_servers"` + SecurityProtocol string `yaml:"security_protocol"` + SASLMechanism string `yaml:"sasl_mechanism"` + SASLUsername string `yaml:"sasl_username"` + SASLPassword string `yaml:"sasl_password"` +} + +type SchemaRegistryConfig struct { + URL string `yaml:"url"` + Auth struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + } `yaml:"auth"` +} + +type ProducersConfig struct { + Count int `yaml:"count"` + MessageRate int `yaml:"message_rate"` + MessageSize int `yaml:"message_size"` + BatchSize int `yaml:"batch_size"` + LingerMs int `yaml:"linger_ms"` + CompressionType string `yaml:"compression_type"` + Acks string `yaml:"acks"` + Retries int `yaml:"retries"` + RetryBackoffMs int `yaml:"retry_backoff_ms"` + RequestTimeoutMs int `yaml:"request_timeout_ms"` + DeliveryTimeoutMs int `yaml:"delivery_timeout_ms"` + KeyDistribution string `yaml:"key_distribution"` + ValueType string `yaml:"value_type"` // json, avro, protobuf, binary + SchemaFormat string `yaml:"schema_format"` // AVRO, JSON, PROTOBUF (schema registry format) + IncludeTimestamp bool `yaml:"include_timestamp"` + IncludeHeaders bool `yaml:"include_headers"` +} + +type ConsumersConfig struct { + Count int `yaml:"count"` + GroupPrefix string `yaml:"group_prefix"` + AutoOffsetReset string `yaml:"auto_offset_reset"` + EnableAutoCommit bool `yaml:"enable_auto_commit"` + AutoCommitIntervalMs int `yaml:"auto_commit_interval_ms"` + SessionTimeoutMs int `yaml:"session_timeout_ms"` + HeartbeatIntervalMs int `yaml:"heartbeat_interval_ms"` + MaxPollRecords int `yaml:"max_poll_records"` + MaxPollIntervalMs int `yaml:"max_poll_interval_ms"` + FetchMinBytes int `yaml:"fetch_min_bytes"` + FetchMaxBytes int `yaml:"fetch_max_bytes"` + FetchMaxWaitMs int `yaml:"fetch_max_wait_ms"` +} + +type TopicsConfig struct { + Count int `yaml:"count"` + Prefix string `yaml:"prefix"` + Partitions int `yaml:"partitions"` + ReplicationFactor int `yaml:"replication_factor"` + CleanupPolicy string `yaml:"cleanup_policy"` + RetentionMs int64 `yaml:"retention_ms"` + SegmentMs int64 `yaml:"segment_ms"` +} + +type SchemaConfig struct { + Type string `yaml:"type"` + Schema string `yaml:"schema"` +} + +type SchemasConfig struct { + Enabled bool `yaml:"enabled"` + RegistryTimeoutMs int `yaml:"registry_timeout_ms"` + UserEvent SchemaConfig `yaml:"user_event"` + Transaction SchemaConfig `yaml:"transaction"` +} + +type MetricsConfig struct { + Enabled bool `yaml:"enabled"` + 
CollectionInterval time.Duration `yaml:"collection_interval"` + PrometheusPort int `yaml:"prometheus_port"` + TrackLatency bool `yaml:"track_latency"` + TrackThroughput bool `yaml:"track_throughput"` + TrackErrors bool `yaml:"track_errors"` + TrackConsumerLag bool `yaml:"track_consumer_lag"` + LatencyPercentiles []float64 `yaml:"latency_percentiles"` +} + +type ScenarioConfig struct { + ProducerRate int `yaml:"producer_rate"` + RampUpTime time.Duration `yaml:"ramp_up_time"` + SteadyDuration time.Duration `yaml:"steady_duration"` + RampDownTime time.Duration `yaml:"ramp_down_time"` + BaseRate int `yaml:"base_rate"` + BurstRate int `yaml:"burst_rate"` + BurstDuration time.Duration `yaml:"burst_duration"` + BurstInterval time.Duration `yaml:"burst_interval"` + StartRate int `yaml:"start_rate"` + EndRate int `yaml:"end_rate"` + RampDuration time.Duration `yaml:"ramp_duration"` + StepDuration time.Duration `yaml:"step_duration"` +} + +type ScenariosConfig struct { + SteadyLoad ScenarioConfig `yaml:"steady_load"` + BurstLoad ScenarioConfig `yaml:"burst_load"` + RampTest ScenarioConfig `yaml:"ramp_test"` +} + +type ChaosConfig struct { + Enabled bool `yaml:"enabled"` + ProducerFailureRate float64 `yaml:"producer_failure_rate"` + ConsumerFailureRate float64 `yaml:"consumer_failure_rate"` + NetworkPartitionProbability float64 `yaml:"network_partition_probability"` + BrokerRestartInterval time.Duration `yaml:"broker_restart_interval"` +} + +type OutputConfig struct { + ResultsDir string `yaml:"results_dir"` + ExportPrometheus bool `yaml:"export_prometheus"` + ExportCSV bool `yaml:"export_csv"` + ExportJSON bool `yaml:"export_json"` + RealTimeStats bool `yaml:"real_time_stats"` + StatsInterval time.Duration `yaml:"stats_interval"` +} + +type LoggingConfig struct { + Level string `yaml:"level"` + Format string `yaml:"format"` + EnableKafkaLogs bool `yaml:"enable_kafka_logs"` +} + +// Load reads and parses the configuration file +func Load(configFile string) (*Config, error) { + data, err := os.ReadFile(configFile) + if err != nil { + return nil, fmt.Errorf("failed to read config file %s: %w", configFile, err) + } + + var cfg Config + if err := yaml.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("failed to parse config file %s: %w", configFile, err) + } + + // Apply default values + cfg.setDefaults() + + // Apply environment variable overrides + cfg.applyEnvOverrides() + + return &cfg, nil +} + +// ApplyOverrides applies command-line flag overrides +func (c *Config) ApplyOverrides(testMode string, duration time.Duration) { + if testMode != "" { + c.TestMode = testMode + } + if duration > 0 { + c.Duration = duration + } +} + +// setDefaults sets default values for optional fields +func (c *Config) setDefaults() { + if c.TestMode == "" { + c.TestMode = "comprehensive" + } + + if len(c.Kafka.BootstrapServers) == 0 { + c.Kafka.BootstrapServers = []string{"kafka-gateway:9093"} + } + + if c.SchemaRegistry.URL == "" { + c.SchemaRegistry.URL = "http://schema-registry:8081" + } + + // Schema support is always enabled since Kafka Gateway now enforces schema-first behavior + c.Schemas.Enabled = true + + if c.Producers.Count == 0 { + c.Producers.Count = 10 + } + + if c.Consumers.Count == 0 { + c.Consumers.Count = 5 + } + + if c.Topics.Count == 0 { + c.Topics.Count = 5 + } + + if c.Topics.Prefix == "" { + c.Topics.Prefix = "loadtest-topic" + } + + if c.Topics.Partitions == 0 { + c.Topics.Partitions = 4 // Default to 4 partitions + } + + if c.Topics.ReplicationFactor == 0 { + c.Topics.ReplicationFactor 
= 1 // Default to 1 replica + } + + if c.Consumers.GroupPrefix == "" { + c.Consumers.GroupPrefix = "loadtest-group" + } + + if c.Output.ResultsDir == "" { + c.Output.ResultsDir = "/test-results" + } + + if c.Metrics.CollectionInterval == 0 { + c.Metrics.CollectionInterval = 10 * time.Second + } + + if c.Output.StatsInterval == 0 { + c.Output.StatsInterval = 30 * time.Second + } +} + +// applyEnvOverrides applies environment variable overrides +func (c *Config) applyEnvOverrides() { + if servers := os.Getenv("KAFKA_BOOTSTRAP_SERVERS"); servers != "" { + c.Kafka.BootstrapServers = strings.Split(servers, ",") + } + + if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" { + c.SchemaRegistry.URL = url + } + + if mode := os.Getenv("TEST_MODE"); mode != "" { + c.TestMode = mode + } + + if duration := os.Getenv("TEST_DURATION"); duration != "" { + if d, err := time.ParseDuration(duration); err == nil { + c.Duration = d + } + } + + if count := os.Getenv("PRODUCER_COUNT"); count != "" { + if i, err := strconv.Atoi(count); err == nil { + c.Producers.Count = i + } + } + + if count := os.Getenv("CONSUMER_COUNT"); count != "" { + if i, err := strconv.Atoi(count); err == nil { + c.Consumers.Count = i + } + } + + if rate := os.Getenv("MESSAGE_RATE"); rate != "" { + if i, err := strconv.Atoi(rate); err == nil { + c.Producers.MessageRate = i + } + } + + if size := os.Getenv("MESSAGE_SIZE"); size != "" { + if i, err := strconv.Atoi(size); err == nil { + c.Producers.MessageSize = i + } + } + + if count := os.Getenv("TOPIC_COUNT"); count != "" { + if i, err := strconv.Atoi(count); err == nil { + c.Topics.Count = i + } + } + + if partitions := os.Getenv("PARTITIONS_PER_TOPIC"); partitions != "" { + if i, err := strconv.Atoi(partitions); err == nil { + c.Topics.Partitions = i + } + } + + if valueType := os.Getenv("VALUE_TYPE"); valueType != "" { + c.Producers.ValueType = valueType + } + + if schemaFormat := os.Getenv("SCHEMA_FORMAT"); schemaFormat != "" { + c.Producers.SchemaFormat = schemaFormat + } + + if enabled := os.Getenv("SCHEMAS_ENABLED"); enabled != "" { + c.Schemas.Enabled = enabled == "true" + } +} + +// GetTopicNames returns the list of topic names to use for testing +func (c *Config) GetTopicNames() []string { + topics := make([]string, c.Topics.Count) + for i := 0; i < c.Topics.Count; i++ { + topics[i] = fmt.Sprintf("%s-%d", c.Topics.Prefix, i) + } + return topics +} + +// GetConsumerGroupNames returns the list of consumer group names +func (c *Config) GetConsumerGroupNames() []string { + groups := make([]string, c.Consumers.Count) + for i := 0; i < c.Consumers.Count; i++ { + groups[i] = fmt.Sprintf("%s-%d", c.Consumers.GroupPrefix, i) + } + return groups +} + +// Validate validates the configuration +func (c *Config) Validate() error { + if c.TestMode != "producer" && c.TestMode != "consumer" && c.TestMode != "comprehensive" { + return fmt.Errorf("invalid test mode: %s", c.TestMode) + } + + if len(c.Kafka.BootstrapServers) == 0 { + return fmt.Errorf("kafka bootstrap servers not specified") + } + + if c.Producers.Count <= 0 && (c.TestMode == "producer" || c.TestMode == "comprehensive") { + return fmt.Errorf("producer count must be greater than 0 for producer or comprehensive tests") + } + + if c.Consumers.Count <= 0 && (c.TestMode == "consumer" || c.TestMode == "comprehensive") { + return fmt.Errorf("consumer count must be greater than 0 for consumer or comprehensive tests") + } + + if c.Topics.Count <= 0 { + return fmt.Errorf("topic count must be greater than 0") + } + + if c.Topics.Partitions <= 
0 { + return fmt.Errorf("partitions per topic must be greater than 0") + } + + return nil +} diff --git a/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go new file mode 100644 index 000000000..e1c4caa41 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/consumer/consumer.go @@ -0,0 +1,626 @@ +package consumer + +import ( + "context" + "encoding/binary" + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/IBM/sarama" + "github.com/linkedin/goavro/v2" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics" + pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb" + "google.golang.org/protobuf/proto" +) + +// Consumer represents a Kafka consumer for load testing +type Consumer struct { + id int + config *config.Config + metricsCollector *metrics.Collector + saramaConsumer sarama.ConsumerGroup + useConfluent bool // Always false, Sarama only + topics []string + consumerGroup string + avroCodec *goavro.Codec + + // Schema format tracking per topic + schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, PROTOBUF) + + // Processing tracking + messagesProcessed int64 + lastOffset map[string]map[int32]int64 + offsetMutex sync.RWMutex +} + +// New creates a new consumer instance +func New(cfg *config.Config, collector *metrics.Collector, id int) (*Consumer, error) { + consumerGroup := fmt.Sprintf("%s-%d", cfg.Consumers.GroupPrefix, id) + + c := &Consumer{ + id: id, + config: cfg, + metricsCollector: collector, + topics: cfg.GetTopicNames(), + consumerGroup: consumerGroup, + useConfluent: false, // Use Sarama by default + lastOffset: make(map[string]map[int32]int64), + schemaFormats: make(map[string]string), + } + + // Initialize schema formats for each topic (must match producer logic) + // This mirrors the format distribution in cmd/loadtest/main.go registerSchemas() + for i, topic := range c.topics { + var schemaFormat string + if cfg.Producers.SchemaFormat != "" { + // Use explicit config if provided + schemaFormat = cfg.Producers.SchemaFormat + } else { + // Distribute across formats (same as producer) + switch i % 3 { + case 0: + schemaFormat = "AVRO" + case 1: + schemaFormat = "JSON" + case 2: + schemaFormat = "PROTOBUF" + } + } + c.schemaFormats[topic] = schemaFormat + log.Printf("Consumer %d: Topic %s will use schema format: %s", id, topic, schemaFormat) + } + + // Initialize consumer based on configuration + if c.useConfluent { + if err := c.initConfluentConsumer(); err != nil { + return nil, fmt.Errorf("failed to initialize Confluent consumer: %w", err) + } + } else { + if err := c.initSaramaConsumer(); err != nil { + return nil, fmt.Errorf("failed to initialize Sarama consumer: %w", err) + } + } + + // Initialize Avro codec if schemas are enabled + if cfg.Schemas.Enabled { + if err := c.initAvroCodec(); err != nil { + return nil, fmt.Errorf("failed to initialize Avro codec: %w", err) + } + } + + log.Printf("Consumer %d initialized for group %s", id, consumerGroup) + return c, nil +} + +// initSaramaConsumer initializes the Sarama consumer group +func (c *Consumer) initSaramaConsumer() error { + config := sarama.NewConfig() + + // Consumer configuration + config.Consumer.Return.Errors = true + config.Consumer.Offsets.Initial = sarama.OffsetOldest + if c.config.Consumers.AutoOffsetReset == "latest" { + 
config.Consumer.Offsets.Initial = sarama.OffsetNewest + } + + // Auto commit configuration + config.Consumer.Offsets.AutoCommit.Enable = c.config.Consumers.EnableAutoCommit + config.Consumer.Offsets.AutoCommit.Interval = time.Duration(c.config.Consumers.AutoCommitIntervalMs) * time.Millisecond + + // Session and heartbeat configuration + config.Consumer.Group.Session.Timeout = time.Duration(c.config.Consumers.SessionTimeoutMs) * time.Millisecond + config.Consumer.Group.Heartbeat.Interval = time.Duration(c.config.Consumers.HeartbeatIntervalMs) * time.Millisecond + + // Fetch configuration + config.Consumer.Fetch.Min = int32(c.config.Consumers.FetchMinBytes) + config.Consumer.Fetch.Default = 10 * 1024 * 1024 // 10MB per partition (increased from 1MB default) + config.Consumer.Fetch.Max = int32(c.config.Consumers.FetchMaxBytes) + config.Consumer.MaxWaitTime = time.Duration(c.config.Consumers.FetchMaxWaitMs) * time.Millisecond + config.Consumer.MaxProcessingTime = time.Duration(c.config.Consumers.MaxPollIntervalMs) * time.Millisecond + + // Channel buffer sizes for concurrent partition consumption + config.ChannelBufferSize = 256 // Increase from default 256 to allow more buffering + + // Enable concurrent partition fetching by increasing the number of broker connections + // This allows Sarama to fetch from multiple partitions in parallel + config.Net.MaxOpenRequests = 20 // Increase from default 5 to allow 20 concurrent requests + + // Version + config.Version = sarama.V2_8_0_0 + + // Create consumer group + consumerGroup, err := sarama.NewConsumerGroup(c.config.Kafka.BootstrapServers, c.consumerGroup, config) + if err != nil { + return fmt.Errorf("failed to create Sarama consumer group: %w", err) + } + + c.saramaConsumer = consumerGroup + return nil +} + +// initConfluentConsumer initializes the Confluent Kafka Go consumer +func (c *Consumer) initConfluentConsumer() error { + // Confluent consumer disabled, using Sarama only + return fmt.Errorf("confluent consumer not enabled") +} + +// initAvroCodec initializes the Avro codec for schema-based messages +func (c *Consumer) initAvroCodec() error { + // Use the LoadTestMessage schema (matches what producer uses) + loadTestSchema := `{ + "type": "record", + "name": "LoadTestMessage", + "namespace": "com.seaweedfs.loadtest", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": "producer_id", "type": "int"}, + {"name": "counter", "type": "long"}, + {"name": "user_id", "type": "string"}, + {"name": "event_type", "type": "string"}, + {"name": "properties", "type": {"type": "map", "values": "string"}} + ] + }` + + codec, err := goavro.NewCodec(loadTestSchema) + if err != nil { + return fmt.Errorf("failed to create Avro codec: %w", err) + } + + c.avroCodec = codec + return nil +} + +// Run starts the consumer and consumes messages until the context is cancelled +func (c *Consumer) Run(ctx context.Context) { + log.Printf("Consumer %d starting for group %s", c.id, c.consumerGroup) + defer log.Printf("Consumer %d stopped", c.id) + + if c.useConfluent { + c.runConfluentConsumer(ctx) + } else { + c.runSaramaConsumer(ctx) + } +} + +// runSaramaConsumer runs the Sarama consumer group +func (c *Consumer) runSaramaConsumer(ctx context.Context) { + handler := &ConsumerGroupHandler{ + consumer: c, + } + + var wg sync.WaitGroup + + // Start error handler + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case err, ok := <-c.saramaConsumer.Errors(): + if !ok { + return + } + log.Printf("Consumer %d 
error: %v", c.id, err) + c.metricsCollector.RecordConsumerError() + case <-ctx.Done(): + return + } + } + }() + + // Start consumer group session + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + if err := c.saramaConsumer.Consume(ctx, c.topics, handler); err != nil { + log.Printf("Consumer %d: Error consuming: %v", c.id, err) + c.metricsCollector.RecordConsumerError() + + // Wait before retrying + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + return + } + } + } + } + }() + + // Start lag monitoring + wg.Add(1) + go func() { + defer wg.Done() + c.monitorConsumerLag(ctx) + }() + + // Wait for completion + <-ctx.Done() + log.Printf("Consumer %d: Context cancelled, shutting down", c.id) + wg.Wait() +} + +// runConfluentConsumer runs the Confluent consumer +func (c *Consumer) runConfluentConsumer(ctx context.Context) { + // Confluent consumer disabled, using Sarama only + log.Printf("Consumer %d: Confluent consumer not enabled", c.id) +} + +// processMessage processes a consumed message +func (c *Consumer) processMessage(topicPtr *string, partition int32, offset int64, key, value []byte) error { + topic := "" + if topicPtr != nil { + topic = *topicPtr + } + + // Update offset tracking + c.updateOffset(topic, partition, offset) + + // Decode message based on topic-specific schema format + var decodedMessage interface{} + var err error + + // Determine schema format for this topic (if schemas are enabled) + var schemaFormat string + if c.config.Schemas.Enabled { + schemaFormat = c.schemaFormats[topic] + if schemaFormat == "" { + // Fallback to config if topic not in map + schemaFormat = c.config.Producers.ValueType + } + } else { + // No schemas, use global value type + schemaFormat = c.config.Producers.ValueType + } + + // Decode message based on format + switch schemaFormat { + case "avro", "AVRO": + decodedMessage, err = c.decodeAvroMessage(value) + case "json", "JSON", "JSON_SCHEMA": + decodedMessage, err = c.decodeJSONSchemaMessage(value) + case "protobuf", "PROTOBUF": + decodedMessage, err = c.decodeProtobufMessage(value) + case "binary": + decodedMessage, err = c.decodeBinaryMessage(value) + default: + // Fallback to plain JSON + decodedMessage, err = c.decodeJSONMessage(value) + } + + if err != nil { + return fmt.Errorf("failed to decode message: %w", err) + } + + // Note: Removed artificial delay to allow maximum throughput + // If you need to simulate processing time, add a configurable delay setting + // time.Sleep(time.Millisecond) // Minimal processing delay + + // Record metrics + c.metricsCollector.RecordConsumedMessage(len(value)) + c.messagesProcessed++ + + // Log progress + if c.id == 0 && c.messagesProcessed%1000 == 0 { + log.Printf("Consumer %d: Processed %d messages (latest: %s[%d]@%d)", + c.id, c.messagesProcessed, topic, partition, offset) + } + + // Optional: Validate message content (for testing purposes) + if c.config.Chaos.Enabled { + if err := c.validateMessage(decodedMessage); err != nil { + log.Printf("Consumer %d: Message validation failed: %v", c.id, err) + } + } + + return nil +} + +// decodeJSONMessage decodes a JSON message +func (c *Consumer) decodeJSONMessage(value []byte) (interface{}, error) { + var message map[string]interface{} + if err := json.Unmarshal(value, &message); err != nil { + // DEBUG: Log the raw bytes when JSON parsing fails + log.Printf("Consumer %d: JSON decode failed. 
Length: %d, Raw bytes (hex): %x, Raw string: %q, Error: %v", + c.id, len(value), value, string(value), err) + return nil, err + } + return message, nil +} + +// decodeAvroMessage decodes an Avro message (handles Confluent Wire Format) +func (c *Consumer) decodeAvroMessage(value []byte) (interface{}, error) { + if c.avroCodec == nil { + return nil, fmt.Errorf("Avro codec not initialized") + } + + // Handle Confluent Wire Format when schemas are enabled + var avroData []byte + if c.config.Schemas.Enabled { + if len(value) < 5 { + return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value)) + } + + // Check magic byte (should be 0) + if value[0] != 0 { + return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0]) + } + + // Extract schema ID (bytes 1-4, big-endian) + schemaID := binary.BigEndian.Uint32(value[1:5]) + _ = schemaID // TODO: Could validate schema ID matches expected schema + + // Extract Avro data (bytes 5+) + avroData = value[5:] + } else { + // No wire format, use raw data + avroData = value + } + + native, _, err := c.avroCodec.NativeFromBinary(avroData) + if err != nil { + return nil, fmt.Errorf("failed to decode Avro data: %w", err) + } + + return native, nil +} + +// decodeJSONSchemaMessage decodes a JSON Schema message (handles Confluent Wire Format) +func (c *Consumer) decodeJSONSchemaMessage(value []byte) (interface{}, error) { + // Handle Confluent Wire Format when schemas are enabled + var jsonData []byte + if c.config.Schemas.Enabled { + if len(value) < 5 { + return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value)) + } + + // Check magic byte (should be 0) + if value[0] != 0 { + return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0]) + } + + // Extract schema ID (bytes 1-4, big-endian) + schemaID := binary.BigEndian.Uint32(value[1:5]) + _ = schemaID // TODO: Could validate schema ID matches expected schema + + // Extract JSON data (bytes 5+) + jsonData = value[5:] + } else { + // No wire format, use raw data + jsonData = value + } + + // Decode JSON + var message map[string]interface{} + if err := json.Unmarshal(jsonData, &message); err != nil { + return nil, fmt.Errorf("failed to decode JSON data: %w", err) + } + + return message, nil +} + +// decodeProtobufMessage decodes a Protobuf message (handles Confluent Wire Format) +func (c *Consumer) decodeProtobufMessage(value []byte) (interface{}, error) { + // Handle Confluent Wire Format when schemas are enabled + var protoData []byte + if c.config.Schemas.Enabled { + if len(value) < 5 { + return nil, fmt.Errorf("message too short for Confluent Wire Format: %d bytes", len(value)) + } + + // Check magic byte (should be 0) + if value[0] != 0 { + return nil, fmt.Errorf("invalid Confluent Wire Format magic byte: %d", value[0]) + } + + // Extract schema ID (bytes 1-4, big-endian) + schemaID := binary.BigEndian.Uint32(value[1:5]) + _ = schemaID // TODO: Could validate schema ID matches expected schema + + // Extract Protobuf data (bytes 5+) + protoData = value[5:] + } else { + // No wire format, use raw data + protoData = value + } + + // Unmarshal protobuf message + var protoMsg pb.LoadTestMessage + if err := proto.Unmarshal(protoData, &protoMsg); err != nil { + return nil, fmt.Errorf("failed to unmarshal Protobuf data: %w", err) + } + + // Convert to map for consistency with other decoders + return map[string]interface{}{ + "id": protoMsg.Id, + "timestamp": protoMsg.Timestamp, + "producer_id": protoMsg.ProducerId, 
+ "counter": protoMsg.Counter, + "user_id": protoMsg.UserId, + "event_type": protoMsg.EventType, + "properties": protoMsg.Properties, + }, nil +} + +// decodeBinaryMessage decodes a binary message +func (c *Consumer) decodeBinaryMessage(value []byte) (interface{}, error) { + if len(value) < 20 { + return nil, fmt.Errorf("binary message too short") + } + + // Extract fields from the binary format: + // [producer_id:4][counter:8][timestamp:8][random_data:...] + + producerID := int(value[0])<<24 | int(value[1])<<16 | int(value[2])<<8 | int(value[3]) + + var counter int64 + for i := 0; i < 8; i++ { + counter |= int64(value[4+i]) << (56 - i*8) + } + + var timestamp int64 + for i := 0; i < 8; i++ { + timestamp |= int64(value[12+i]) << (56 - i*8) + } + + return map[string]interface{}{ + "producer_id": producerID, + "counter": counter, + "timestamp": timestamp, + "data_size": len(value), + }, nil +} + +// validateMessage performs basic message validation +func (c *Consumer) validateMessage(message interface{}) error { + // This is a placeholder for message validation logic + // In a real load test, you might validate: + // - Message structure + // - Required fields + // - Data consistency + // - Schema compliance + + if message == nil { + return fmt.Errorf("message is nil") + } + + return nil +} + +// updateOffset updates the last seen offset for lag calculation +func (c *Consumer) updateOffset(topic string, partition int32, offset int64) { + c.offsetMutex.Lock() + defer c.offsetMutex.Unlock() + + if c.lastOffset[topic] == nil { + c.lastOffset[topic] = make(map[int32]int64) + } + c.lastOffset[topic][partition] = offset +} + +// monitorConsumerLag monitors and reports consumer lag +func (c *Consumer) monitorConsumerLag(ctx context.Context) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.reportConsumerLag() + } + } +} + +// reportConsumerLag calculates and reports consumer lag +func (c *Consumer) reportConsumerLag() { + // This is a simplified lag calculation + // In a real implementation, you would query the broker for high water marks + + c.offsetMutex.RLock() + defer c.offsetMutex.RUnlock() + + for topic, partitions := range c.lastOffset { + for partition, _ := range partitions { + // For simplicity, assume lag is always 0 when we're consuming actively + // In a real test, you would compare against the high water mark + lag := int64(0) + + c.metricsCollector.UpdateConsumerLag(c.consumerGroup, topic, partition, lag) + } + } +} + +// Close closes the consumer and cleans up resources +func (c *Consumer) Close() error { + log.Printf("Consumer %d: Closing", c.id) + + if c.saramaConsumer != nil { + return c.saramaConsumer.Close() + } + + return nil +} + +// ConsumerGroupHandler implements sarama.ConsumerGroupHandler +type ConsumerGroupHandler struct { + consumer *Consumer +} + +// Setup is run at the beginning of a new session, before ConsumeClaim +func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { + log.Printf("Consumer %d: Consumer group session setup", h.consumer.id) + return nil +} + +// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited +func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { + log.Printf("Consumer %d: Consumer group session cleanup", h.consumer.id) + return nil +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages() +func (h *ConsumerGroupHandler) ConsumeClaim(session 
sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + msgCount := 0 + for { + select { + case message, ok := <-claim.Messages(): + if !ok { + return nil + } + msgCount++ + + // Process the message + var key []byte + if message.Key != nil { + key = message.Key + } + + if err := h.consumer.processMessage(&message.Topic, message.Partition, message.Offset, key, message.Value); err != nil { + log.Printf("Consumer %d: Error processing message: %v", h.consumer.id, err) + h.consumer.metricsCollector.RecordConsumerError() + + // Add a small delay for schema validation or other processing errors to avoid overloading + // select { + // case <-time.After(100 * time.Millisecond): + // // Continue after brief delay + // case <-session.Context().Done(): + // return nil + // } + } else { + // Mark message as processed + session.MarkMessage(message, "") + } + + case <-session.Context().Done(): + log.Printf("Consumer %d: Session context cancelled for %s[%d]", + h.consumer.id, claim.Topic(), claim.Partition()) + return nil + } + } +} + +// Helper functions + +func joinStrings(strs []string, sep string) string { + if len(strs) == 0 { + return "" + } + + result := strs[0] + for i := 1; i < len(strs); i++ { + result += sep + strs[i] + } + return result +} diff --git a/test/kafka/kafka-client-loadtest/internal/metrics/collector.go b/test/kafka/kafka-client-loadtest/internal/metrics/collector.go new file mode 100644 index 000000000..d6a1edb8e --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/metrics/collector.go @@ -0,0 +1,353 @@ +package metrics + +import ( + "fmt" + "io" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Collector handles metrics collection for the load test +type Collector struct { + // Atomic counters for thread-safe operations + messagesProduced int64 + messagesConsumed int64 + bytesProduced int64 + bytesConsumed int64 + producerErrors int64 + consumerErrors int64 + + // Latency tracking + latencies []time.Duration + latencyMutex sync.RWMutex + + // Consumer lag tracking + consumerLag map[string]int64 + consumerLagMutex sync.RWMutex + + // Test timing + startTime time.Time + + // Prometheus metrics + prometheusMetrics *PrometheusMetrics +} + +// PrometheusMetrics holds all Prometheus metric definitions +type PrometheusMetrics struct { + MessagesProducedTotal prometheus.Counter + MessagesConsumedTotal prometheus.Counter + BytesProducedTotal prometheus.Counter + BytesConsumedTotal prometheus.Counter + ProducerErrorsTotal prometheus.Counter + ConsumerErrorsTotal prometheus.Counter + + MessageLatencyHistogram prometheus.Histogram + ProducerThroughput prometheus.Gauge + ConsumerThroughput prometheus.Gauge + ConsumerLagGauge *prometheus.GaugeVec + + ActiveProducers prometheus.Gauge + ActiveConsumers prometheus.Gauge +} + +// NewCollector creates a new metrics collector +func NewCollector() *Collector { + return &Collector{ + startTime: time.Now(), + consumerLag: make(map[string]int64), + prometheusMetrics: &PrometheusMetrics{ + MessagesProducedTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_messages_produced_total", + Help: "Total number of messages produced", + }), + MessagesConsumedTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_messages_consumed_total", + Help: "Total number of messages consumed", + }), + BytesProducedTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: 
"kafka_loadtest_bytes_produced_total", + Help: "Total bytes produced", + }), + BytesConsumedTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_bytes_consumed_total", + Help: "Total bytes consumed", + }), + ProducerErrorsTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_producer_errors_total", + Help: "Total number of producer errors", + }), + ConsumerErrorsTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "kafka_loadtest_consumer_errors_total", + Help: "Total number of consumer errors", + }), + MessageLatencyHistogram: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "kafka_loadtest_message_latency_seconds", + Help: "Message end-to-end latency in seconds", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), // 1ms to ~32s + }), + ProducerThroughput: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "kafka_loadtest_producer_throughput_msgs_per_sec", + Help: "Current producer throughput in messages per second", + }), + ConsumerThroughput: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "kafka_loadtest_consumer_throughput_msgs_per_sec", + Help: "Current consumer throughput in messages per second", + }), + ConsumerLagGauge: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "kafka_loadtest_consumer_lag_messages", + Help: "Consumer lag in messages", + }, []string{"consumer_group", "topic", "partition"}), + ActiveProducers: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "kafka_loadtest_active_producers", + Help: "Number of active producers", + }), + ActiveConsumers: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "kafka_loadtest_active_consumers", + Help: "Number of active consumers", + }), + }, + } +} + +// RecordProducedMessage records a successfully produced message +func (c *Collector) RecordProducedMessage(size int, latency time.Duration) { + atomic.AddInt64(&c.messagesProduced, 1) + atomic.AddInt64(&c.bytesProduced, int64(size)) + + c.prometheusMetrics.MessagesProducedTotal.Inc() + c.prometheusMetrics.BytesProducedTotal.Add(float64(size)) + c.prometheusMetrics.MessageLatencyHistogram.Observe(latency.Seconds()) + + // Store latency for percentile calculations + c.latencyMutex.Lock() + c.latencies = append(c.latencies, latency) + // Keep only recent latencies to avoid memory bloat + if len(c.latencies) > 100000 { + c.latencies = c.latencies[50000:] + } + c.latencyMutex.Unlock() +} + +// RecordConsumedMessage records a successfully consumed message +func (c *Collector) RecordConsumedMessage(size int) { + atomic.AddInt64(&c.messagesConsumed, 1) + atomic.AddInt64(&c.bytesConsumed, int64(size)) + + c.prometheusMetrics.MessagesConsumedTotal.Inc() + c.prometheusMetrics.BytesConsumedTotal.Add(float64(size)) +} + +// RecordProducerError records a producer error +func (c *Collector) RecordProducerError() { + atomic.AddInt64(&c.producerErrors, 1) + c.prometheusMetrics.ProducerErrorsTotal.Inc() +} + +// RecordConsumerError records a consumer error +func (c *Collector) RecordConsumerError() { + atomic.AddInt64(&c.consumerErrors, 1) + c.prometheusMetrics.ConsumerErrorsTotal.Inc() +} + +// UpdateConsumerLag updates consumer lag metrics +func (c *Collector) UpdateConsumerLag(consumerGroup, topic string, partition int32, lag int64) { + key := fmt.Sprintf("%s-%s-%d", consumerGroup, topic, partition) + + c.consumerLagMutex.Lock() + c.consumerLag[key] = lag + c.consumerLagMutex.Unlock() + + c.prometheusMetrics.ConsumerLagGauge.WithLabelValues( + consumerGroup, topic, fmt.Sprintf("%d", partition), + ).Set(float64(lag)) +} + +// UpdateThroughput 
updates throughput gauges +func (c *Collector) UpdateThroughput(producerRate, consumerRate float64) { + c.prometheusMetrics.ProducerThroughput.Set(producerRate) + c.prometheusMetrics.ConsumerThroughput.Set(consumerRate) +} + +// UpdateActiveClients updates active client counts +func (c *Collector) UpdateActiveClients(producers, consumers int) { + c.prometheusMetrics.ActiveProducers.Set(float64(producers)) + c.prometheusMetrics.ActiveConsumers.Set(float64(consumers)) +} + +// GetStats returns current statistics +func (c *Collector) GetStats() Stats { + produced := atomic.LoadInt64(&c.messagesProduced) + consumed := atomic.LoadInt64(&c.messagesConsumed) + bytesProduced := atomic.LoadInt64(&c.bytesProduced) + bytesConsumed := atomic.LoadInt64(&c.bytesConsumed) + producerErrors := atomic.LoadInt64(&c.producerErrors) + consumerErrors := atomic.LoadInt64(&c.consumerErrors) + + duration := time.Since(c.startTime) + + // Calculate throughput + producerThroughput := float64(produced) / duration.Seconds() + consumerThroughput := float64(consumed) / duration.Seconds() + + // Calculate latency percentiles + var latencyPercentiles map[float64]time.Duration + c.latencyMutex.RLock() + if len(c.latencies) > 0 { + latencyPercentiles = c.calculatePercentiles(c.latencies) + } + c.latencyMutex.RUnlock() + + // Get consumer lag summary + c.consumerLagMutex.RLock() + totalLag := int64(0) + maxLag := int64(0) + for _, lag := range c.consumerLag { + totalLag += lag + if lag > maxLag { + maxLag = lag + } + } + avgLag := float64(0) + if len(c.consumerLag) > 0 { + avgLag = float64(totalLag) / float64(len(c.consumerLag)) + } + c.consumerLagMutex.RUnlock() + + return Stats{ + Duration: duration, + MessagesProduced: produced, + MessagesConsumed: consumed, + BytesProduced: bytesProduced, + BytesConsumed: bytesConsumed, + ProducerErrors: producerErrors, + ConsumerErrors: consumerErrors, + ProducerThroughput: producerThroughput, + ConsumerThroughput: consumerThroughput, + LatencyPercentiles: latencyPercentiles, + TotalConsumerLag: totalLag, + MaxConsumerLag: maxLag, + AvgConsumerLag: avgLag, + } +} + +// PrintSummary prints a summary of the test statistics +func (c *Collector) PrintSummary() { + stats := c.GetStats() + + fmt.Printf("\n=== Load Test Summary ===\n") + fmt.Printf("Test Duration: %v\n", stats.Duration) + fmt.Printf("\nMessages:\n") + fmt.Printf(" Produced: %d (%.2f MB)\n", stats.MessagesProduced, float64(stats.BytesProduced)/1024/1024) + fmt.Printf(" Consumed: %d (%.2f MB)\n", stats.MessagesConsumed, float64(stats.BytesConsumed)/1024/1024) + fmt.Printf(" Producer Errors: %d\n", stats.ProducerErrors) + fmt.Printf(" Consumer Errors: %d\n", stats.ConsumerErrors) + + fmt.Printf("\nThroughput:\n") + fmt.Printf(" Producer: %.2f msgs/sec\n", stats.ProducerThroughput) + fmt.Printf(" Consumer: %.2f msgs/sec\n", stats.ConsumerThroughput) + + if stats.LatencyPercentiles != nil { + fmt.Printf("\nLatency Percentiles:\n") + percentiles := []float64{50, 90, 95, 99, 99.9} + for _, p := range percentiles { + if latency, exists := stats.LatencyPercentiles[p]; exists { + fmt.Printf(" p%.1f: %v\n", p, latency) + } + } + } + + fmt.Printf("\nConsumer Lag:\n") + fmt.Printf(" Total: %d messages\n", stats.TotalConsumerLag) + fmt.Printf(" Max: %d messages\n", stats.MaxConsumerLag) + fmt.Printf(" Average: %.2f messages\n", stats.AvgConsumerLag) + fmt.Printf("=========================\n") +} + +// WriteStats writes statistics to a writer (for HTTP endpoint) +func (c *Collector) WriteStats(w io.Writer) { + stats := c.GetStats() + + 
fmt.Fprintf(w, "# Load Test Statistics\n") + fmt.Fprintf(w, "duration_seconds %v\n", stats.Duration.Seconds()) + fmt.Fprintf(w, "messages_produced %d\n", stats.MessagesProduced) + fmt.Fprintf(w, "messages_consumed %d\n", stats.MessagesConsumed) + fmt.Fprintf(w, "bytes_produced %d\n", stats.BytesProduced) + fmt.Fprintf(w, "bytes_consumed %d\n", stats.BytesConsumed) + fmt.Fprintf(w, "producer_errors %d\n", stats.ProducerErrors) + fmt.Fprintf(w, "consumer_errors %d\n", stats.ConsumerErrors) + fmt.Fprintf(w, "producer_throughput_msgs_per_sec %f\n", stats.ProducerThroughput) + fmt.Fprintf(w, "consumer_throughput_msgs_per_sec %f\n", stats.ConsumerThroughput) + fmt.Fprintf(w, "total_consumer_lag %d\n", stats.TotalConsumerLag) + fmt.Fprintf(w, "max_consumer_lag %d\n", stats.MaxConsumerLag) + fmt.Fprintf(w, "avg_consumer_lag %f\n", stats.AvgConsumerLag) + + if stats.LatencyPercentiles != nil { + for percentile, latency := range stats.LatencyPercentiles { + fmt.Fprintf(w, "latency_p%g_seconds %f\n", percentile, latency.Seconds()) + } + } +} + +// calculatePercentiles calculates latency percentiles +func (c *Collector) calculatePercentiles(latencies []time.Duration) map[float64]time.Duration { + if len(latencies) == 0 { + return nil + } + + // Make a copy and sort + sorted := make([]time.Duration, len(latencies)) + copy(sorted, latencies) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i] < sorted[j] + }) + + percentiles := map[float64]time.Duration{ + 50: calculatePercentile(sorted, 50), + 90: calculatePercentile(sorted, 90), + 95: calculatePercentile(sorted, 95), + 99: calculatePercentile(sorted, 99), + 99.9: calculatePercentile(sorted, 99.9), + } + + return percentiles +} + +// calculatePercentile calculates a specific percentile from sorted data +func calculatePercentile(sorted []time.Duration, percentile float64) time.Duration { + if len(sorted) == 0 { + return 0 + } + + index := percentile / 100.0 * float64(len(sorted)-1) + if index == float64(int(index)) { + return sorted[int(index)] + } + + lower := sorted[int(index)] + upper := sorted[int(index)+1] + weight := index - float64(int(index)) + + return time.Duration(float64(lower) + weight*float64(upper-lower)) +} + +// Stats represents the current test statistics +type Stats struct { + Duration time.Duration + MessagesProduced int64 + MessagesConsumed int64 + BytesProduced int64 + BytesConsumed int64 + ProducerErrors int64 + ConsumerErrors int64 + ProducerThroughput float64 + ConsumerThroughput float64 + LatencyPercentiles map[float64]time.Duration + TotalConsumerLag int64 + MaxConsumerLag int64 + AvgConsumerLag float64 +} diff --git a/test/kafka/kafka-client-loadtest/internal/producer/producer.go b/test/kafka/kafka-client-loadtest/internal/producer/producer.go new file mode 100644 index 000000000..167bfeac6 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/producer/producer.go @@ -0,0 +1,770 @@ +package producer + +import ( + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "math/rand" + "net/http" + "strings" + "sync" + "time" + + "github.com/IBM/sarama" + "github.com/linkedin/goavro/v2" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/config" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema" + pb "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb" + "google.golang.org/protobuf/proto" +) + +// 
ErrCircuitBreakerOpen indicates that the circuit breaker is open due to consecutive failures +var ErrCircuitBreakerOpen = errors.New("circuit breaker is open") + +// Producer represents a Kafka producer for load testing +type Producer struct { + id int + config *config.Config + metricsCollector *metrics.Collector + saramaProducer sarama.SyncProducer + useConfluent bool + topics []string + avroCodec *goavro.Codec + startTime time.Time // Test run start time for generating unique keys + + // Schema management + schemaIDs map[string]int // topic -> schema ID mapping + schemaFormats map[string]string // topic -> schema format mapping (AVRO, JSON, etc.) + + // Rate limiting + rateLimiter *time.Ticker + + // Message generation + messageCounter int64 + random *rand.Rand + + // Circuit breaker detection + consecutiveFailures int +} + +// Message represents a test message +type Message struct { + ID string `json:"id"` + Timestamp int64 `json:"timestamp"` + ProducerID int `json:"producer_id"` + Counter int64 `json:"counter"` + UserID string `json:"user_id"` + EventType string `json:"event_type"` + Properties map[string]interface{} `json:"properties"` +} + +// New creates a new producer instance +func New(cfg *config.Config, collector *metrics.Collector, id int) (*Producer, error) { + p := &Producer{ + id: id, + config: cfg, + metricsCollector: collector, + topics: cfg.GetTopicNames(), + random: rand.New(rand.NewSource(time.Now().UnixNano() + int64(id))), + useConfluent: false, // Use Sarama by default, can be made configurable + schemaIDs: make(map[string]int), + schemaFormats: make(map[string]string), + startTime: time.Now(), // Record test start time for unique key generation + } + + // Initialize schema formats for each topic + // Distribute across AVRO, JSON, and PROTOBUF formats + for i, topic := range p.topics { + var schemaFormat string + if cfg.Producers.SchemaFormat != "" { + // Use explicit config if provided + schemaFormat = cfg.Producers.SchemaFormat + } else { + // Distribute across three formats: AVRO, JSON, PROTOBUF + switch i % 3 { + case 0: + schemaFormat = "AVRO" + case 1: + schemaFormat = "JSON" + case 2: + schemaFormat = "PROTOBUF" + } + } + p.schemaFormats[topic] = schemaFormat + log.Printf("Producer %d: Topic %s will use schema format: %s", id, topic, schemaFormat) + } + + // Set up rate limiter if specified + if cfg.Producers.MessageRate > 0 { + p.rateLimiter = time.NewTicker(time.Second / time.Duration(cfg.Producers.MessageRate)) + } + + // Initialize Sarama producer + if err := p.initSaramaProducer(); err != nil { + return nil, fmt.Errorf("failed to initialize Sarama producer: %w", err) + } + + // Initialize Avro codec and register/fetch schemas if schemas are enabled + if cfg.Schemas.Enabled { + if err := p.initAvroCodec(); err != nil { + return nil, fmt.Errorf("failed to initialize Avro codec: %w", err) + } + if err := p.ensureSchemasRegistered(); err != nil { + return nil, fmt.Errorf("failed to ensure schemas are registered: %w", err) + } + if err := p.fetchSchemaIDs(); err != nil { + return nil, fmt.Errorf("failed to fetch schema IDs: %w", err) + } + } + + log.Printf("Producer %d initialized successfully", id) + return p, nil +} + +// initSaramaProducer initializes the Sarama producer +func (p *Producer) initSaramaProducer() error { + config := sarama.NewConfig() + + // Producer configuration + config.Producer.RequiredAcks = sarama.WaitForAll + if p.config.Producers.Acks == "0" { + config.Producer.RequiredAcks = sarama.NoResponse + } else if p.config.Producers.Acks == 
"1" { + config.Producer.RequiredAcks = sarama.WaitForLocal + } + + config.Producer.Retry.Max = p.config.Producers.Retries + config.Producer.Retry.Backoff = time.Duration(p.config.Producers.RetryBackoffMs) * time.Millisecond + config.Producer.Return.Successes = true + config.Producer.Return.Errors = true + + // Compression + switch p.config.Producers.CompressionType { + case "gzip": + config.Producer.Compression = sarama.CompressionGZIP + case "snappy": + config.Producer.Compression = sarama.CompressionSnappy + case "lz4": + config.Producer.Compression = sarama.CompressionLZ4 + case "zstd": + config.Producer.Compression = sarama.CompressionZSTD + default: + config.Producer.Compression = sarama.CompressionNone + } + + // Batching + config.Producer.Flush.Messages = p.config.Producers.BatchSize + config.Producer.Flush.Frequency = time.Duration(p.config.Producers.LingerMs) * time.Millisecond + + // Timeouts + config.Net.DialTimeout = 30 * time.Second + config.Net.ReadTimeout = 30 * time.Second + config.Net.WriteTimeout = 30 * time.Second + + // Version + config.Version = sarama.V2_8_0_0 + + // Create producer + producer, err := sarama.NewSyncProducer(p.config.Kafka.BootstrapServers, config) + if err != nil { + return fmt.Errorf("failed to create Sarama producer: %w", err) + } + + p.saramaProducer = producer + return nil +} + +// initAvroCodec initializes the Avro codec for schema-based messages +func (p *Producer) initAvroCodec() error { + // Use the shared LoadTestMessage schema + codec, err := goavro.NewCodec(schema.GetAvroSchema()) + if err != nil { + return fmt.Errorf("failed to create Avro codec: %w", err) + } + + p.avroCodec = codec + return nil +} + +// Run starts the producer and produces messages until the context is cancelled +func (p *Producer) Run(ctx context.Context) error { + log.Printf("Producer %d starting", p.id) + defer log.Printf("Producer %d stopped", p.id) + + // Create topics if they don't exist + if err := p.createTopics(); err != nil { + log.Printf("Producer %d: Failed to create topics: %v", p.id, err) + p.metricsCollector.RecordProducerError() + return err + } + + var wg sync.WaitGroup + errChan := make(chan error, 1) + + // Main production loop + wg.Add(1) + go func() { + defer wg.Done() + if err := p.produceMessages(ctx); err != nil { + errChan <- err + } + }() + + // Wait for completion or error + select { + case <-ctx.Done(): + log.Printf("Producer %d: Context cancelled, shutting down", p.id) + case err := <-errChan: + log.Printf("Producer %d: Stopping due to error: %v", p.id, err) + return err + } + + // Stop rate limiter + if p.rateLimiter != nil { + p.rateLimiter.Stop() + } + + // Wait for goroutines to finish + wg.Wait() + return nil +} + +// produceMessages is the main message production loop +func (p *Producer) produceMessages(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + default: + // Rate limiting + if p.rateLimiter != nil { + select { + case <-p.rateLimiter.C: + // Proceed + case <-ctx.Done(): + return nil + } + } + + if err := p.produceMessage(); err != nil { + log.Printf("Producer %d: Failed to produce message: %v", p.id, err) + p.metricsCollector.RecordProducerError() + + // Check for circuit breaker error + if p.isCircuitBreakerError(err) { + p.consecutiveFailures++ + log.Printf("Producer %d: Circuit breaker error detected (%d/%d consecutive failures)", + p.id, p.consecutiveFailures, 3) + + // Progressive backoff delay to avoid overloading the gateway + backoffDelay := time.Duration(p.consecutiveFailures) * 500 * 
time.Millisecond + log.Printf("Producer %d: Backing off for %v to avoid overloading gateway", p.id, backoffDelay) + + select { + case <-time.After(backoffDelay): + // Continue after delay + case <-ctx.Done(): + return nil + } + + // If we've hit 3 consecutive circuit breaker errors, stop the producer + if p.consecutiveFailures >= 3 { + log.Printf("Producer %d: Circuit breaker is open - stopping producer after %d consecutive failures", + p.id, p.consecutiveFailures) + return fmt.Errorf("%w: stopping producer after %d consecutive failures", ErrCircuitBreakerOpen, p.consecutiveFailures) + } + } else { + // Reset counter for non-circuit breaker errors + p.consecutiveFailures = 0 + } + } else { + // Reset counter on successful message + p.consecutiveFailures = 0 + } + } + } +} + +// produceMessage produces a single message +func (p *Producer) produceMessage() error { + startTime := time.Now() + + // Select random topic + topic := p.topics[p.random.Intn(len(p.topics))] + + // Produce message using Sarama (message will be generated based on topic's schema format) + return p.produceSaramaMessage(topic, startTime) +} + +// produceSaramaMessage produces a message using Sarama +// The message is generated internally based on the topic's schema format +func (p *Producer) produceSaramaMessage(topic string, startTime time.Time) error { + // Generate key + key := p.generateMessageKey() + + // If schemas are enabled, wrap in Confluent Wire Format based on topic's schema format + var messageValue []byte + if p.config.Schemas.Enabled { + schemaID, exists := p.schemaIDs[topic] + if !exists { + return fmt.Errorf("schema ID not found for topic %s", topic) + } + + // Get the schema format for this topic + schemaFormat := p.schemaFormats[topic] + + // CRITICAL FIX: Encode based on schema format, NOT config value_type + // The encoding MUST match what the schema registry and gateway expect + var encodedMessage []byte + var err error + switch schemaFormat { + case "AVRO": + // For Avro schema, encode as Avro binary + encodedMessage, err = p.generateAvroMessage() + if err != nil { + return fmt.Errorf("failed to encode as Avro for topic %s: %w", topic, err) + } + case "JSON": + // For JSON schema, encode as JSON + encodedMessage, err = p.generateJSONMessage() + if err != nil { + return fmt.Errorf("failed to encode as JSON for topic %s: %w", topic, err) + } + case "PROTOBUF": + // For PROTOBUF schema, encode as Protobuf binary + encodedMessage, err = p.generateProtobufMessage() + if err != nil { + return fmt.Errorf("failed to encode as Protobuf for topic %s: %w", topic, err) + } + default: + // Unknown format - fallback to JSON + encodedMessage, err = p.generateJSONMessage() + if err != nil { + return fmt.Errorf("failed to encode as JSON (unknown format fallback) for topic %s: %w", topic, err) + } + } + + // Wrap in Confluent wire format (magic byte + schema ID + payload) + messageValue = p.createConfluentWireFormat(schemaID, encodedMessage) + } else { + // No schemas - generate message based on config value_type + var err error + messageValue, err = p.generateMessage() + if err != nil { + return fmt.Errorf("failed to generate message: %w", err) + } + } + + msg := &sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder(key), + Value: sarama.ByteEncoder(messageValue), + } + + // Add headers if configured + if p.config.Producers.IncludeHeaders { + msg.Headers = []sarama.RecordHeader{ + {Key: []byte("producer_id"), Value: []byte(fmt.Sprintf("%d", p.id))}, + {Key: []byte("timestamp"), Value: 
[]byte(fmt.Sprintf("%d", startTime.UnixNano()))}, + } + } + + // Produce message + _, _, err := p.saramaProducer.SendMessage(msg) + if err != nil { + return err + } + + // Record metrics + latency := time.Since(startTime) + p.metricsCollector.RecordProducedMessage(len(messageValue), latency) + + return nil +} + +// generateMessage generates a test message +func (p *Producer) generateMessage() ([]byte, error) { + p.messageCounter++ + + switch p.config.Producers.ValueType { + case "avro": + return p.generateAvroMessage() + case "json": + return p.generateJSONMessage() + case "binary": + return p.generateBinaryMessage() + default: + return p.generateJSONMessage() + } +} + +// generateJSONMessage generates a JSON test message +func (p *Producer) generateJSONMessage() ([]byte, error) { + msg := Message{ + ID: fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter), + Timestamp: time.Now().UnixNano(), + ProducerID: p.id, + Counter: p.messageCounter, + UserID: fmt.Sprintf("user-%d", p.random.Intn(10000)), + EventType: p.randomEventType(), + Properties: map[string]interface{}{ + "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)), + "page_views": fmt.Sprintf("%d", p.random.Intn(100)), // String for Avro map<string,string> + "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)), // String for Avro map<string,string> + "country": p.randomCountry(), + "device_type": p.randomDeviceType(), + "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)), + }, + } + + // Marshal to JSON (no padding - let natural message size be used) + messageBytes, err := json.Marshal(msg) + if err != nil { + return nil, err + } + + return messageBytes, nil +} + +// generateProtobufMessage generates a Protobuf-encoded message +func (p *Producer) generateProtobufMessage() ([]byte, error) { + // Create protobuf message + protoMsg := &pb.LoadTestMessage{ + Id: fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter), + Timestamp: time.Now().UnixNano(), + ProducerId: int32(p.id), + Counter: p.messageCounter, + UserId: fmt.Sprintf("user-%d", p.random.Intn(10000)), + EventType: p.randomEventType(), + Properties: map[string]string{ + "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)), + "page_views": fmt.Sprintf("%d", p.random.Intn(100)), + "duration_ms": fmt.Sprintf("%d", p.random.Intn(300000)), + "country": p.randomCountry(), + "device_type": p.randomDeviceType(), + "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)), + }, + } + + // Marshal to protobuf binary + messageBytes, err := proto.Marshal(protoMsg) + if err != nil { + return nil, err + } + + return messageBytes, nil +} + +// generateAvroMessage generates an Avro-encoded message with Confluent Wire Format +// NOTE: Avro messages are NOT padded - they have their own binary format +func (p *Producer) generateAvroMessage() ([]byte, error) { + if p.avroCodec == nil { + return nil, fmt.Errorf("Avro codec not initialized") + } + + // Create Avro-compatible record matching the LoadTestMessage schema + record := map[string]interface{}{ + "id": fmt.Sprintf("msg-%d-%d", p.id, p.messageCounter), + "timestamp": time.Now().UnixNano(), + "producer_id": p.id, + "counter": p.messageCounter, + "user_id": fmt.Sprintf("user-%d", p.random.Intn(10000)), + "event_type": p.randomEventType(), + "properties": map[string]interface{}{ + "session_id": fmt.Sprintf("sess-%d-%d", p.id, p.random.Intn(1000)), + "page_views": fmt.Sprintf("%d", p.random.Intn(100)), + "duration_ms": fmt.Sprintf("%d", 
p.random.Intn(300000)), + "country": p.randomCountry(), + "device_type": p.randomDeviceType(), + "app_version": fmt.Sprintf("v%d.%d.%d", p.random.Intn(10), p.random.Intn(10), p.random.Intn(100)), + }, + } + + // Encode to Avro binary + avroBytes, err := p.avroCodec.BinaryFromNative(nil, record) + if err != nil { + return nil, err + } + + return avroBytes, nil +} + +// generateBinaryMessage generates a binary test message (no padding) +func (p *Producer) generateBinaryMessage() ([]byte, error) { + // Create a simple binary message format: + // [producer_id:4][counter:8][timestamp:8] + message := make([]byte, 20) + + // Producer ID (4 bytes) + message[0] = byte(p.id >> 24) + message[1] = byte(p.id >> 16) + message[2] = byte(p.id >> 8) + message[3] = byte(p.id) + + // Counter (8 bytes) + for i := 0; i < 8; i++ { + message[4+i] = byte(p.messageCounter >> (56 - i*8)) + } + + // Timestamp (8 bytes) + timestamp := time.Now().UnixNano() + for i := 0; i < 8; i++ { + message[12+i] = byte(timestamp >> (56 - i*8)) + } + + return message, nil +} + +// generateMessageKey generates a message key based on the configured distribution +// Keys are prefixed with a test run ID to track messages across test runs +func (p *Producer) generateMessageKey() string { + // Use test start time as run ID (format: YYYYMMDD-HHMMSS) + runID := p.startTime.Format("20060102-150405") + + switch p.config.Producers.KeyDistribution { + case "sequential": + return fmt.Sprintf("run-%s-key-%d", runID, p.messageCounter) + case "uuid": + return fmt.Sprintf("run-%s-uuid-%d-%d-%d", runID, p.id, time.Now().UnixNano(), p.random.Intn(1000000)) + default: // random + return fmt.Sprintf("run-%s-key-%d", runID, p.random.Intn(10000)) + } +} + +// createTopics creates the test topics if they don't exist +func (p *Producer) createTopics() error { + // Use Sarama admin client to create topics + config := sarama.NewConfig() + config.Version = sarama.V2_8_0_0 + + admin, err := sarama.NewClusterAdmin(p.config.Kafka.BootstrapServers, config) + if err != nil { + return fmt.Errorf("failed to create admin client: %w", err) + } + defer admin.Close() + + // Create topic specifications + topicSpecs := make(map[string]*sarama.TopicDetail) + for _, topic := range p.topics { + topicSpecs[topic] = &sarama.TopicDetail{ + NumPartitions: int32(p.config.Topics.Partitions), + ReplicationFactor: int16(p.config.Topics.ReplicationFactor), + ConfigEntries: map[string]*string{ + "cleanup.policy": &p.config.Topics.CleanupPolicy, + "retention.ms": stringPtr(fmt.Sprintf("%d", p.config.Topics.RetentionMs)), + "segment.ms": stringPtr(fmt.Sprintf("%d", p.config.Topics.SegmentMs)), + }, + } + } + + // Create topics + for _, topic := range p.topics { + err = admin.CreateTopic(topic, topicSpecs[topic], false) + if err != nil && err != sarama.ErrTopicAlreadyExists { + log.Printf("Producer %d: Warning - failed to create topic %s: %v", p.id, topic, err) + } else { + log.Printf("Producer %d: Successfully created topic %s", p.id, topic) + } + } + + return nil +} + +// Close closes the producer and cleans up resources +func (p *Producer) Close() error { + log.Printf("Producer %d: Closing", p.id) + + if p.rateLimiter != nil { + p.rateLimiter.Stop() + } + + if p.saramaProducer != nil { + return p.saramaProducer.Close() + } + + return nil +} + +// Helper functions + +func stringPtr(s string) *string { + return &s +} + +func joinStrings(strs []string, sep string) string { + if len(strs) == 0 { + return "" + } + + result := strs[0] + for i := 1; i < len(strs); i++ { + result += sep + 
strs[i] + } + return result +} + +func (p *Producer) randomEventType() string { + events := []string{"login", "logout", "view", "click", "purchase", "signup", "search", "download"} + return events[p.random.Intn(len(events))] +} + +func (p *Producer) randomCountry() string { + countries := []string{"US", "CA", "UK", "DE", "FR", "JP", "AU", "BR", "IN", "CN"} + return countries[p.random.Intn(len(countries))] +} + +func (p *Producer) randomDeviceType() string { + devices := []string{"desktop", "mobile", "tablet", "tv", "watch"} + return devices[p.random.Intn(len(devices))] +} + +// fetchSchemaIDs fetches schema IDs from Schema Registry for all topics +func (p *Producer) fetchSchemaIDs() error { + for _, topic := range p.topics { + subject := topic + "-value" + schemaID, err := p.getSchemaID(subject) + if err != nil { + return fmt.Errorf("failed to get schema ID for subject %s: %w", subject, err) + } + p.schemaIDs[topic] = schemaID + log.Printf("Producer %d: Fetched schema ID %d for topic %s", p.id, schemaID, topic) + } + return nil +} + +// getSchemaID fetches the latest schema ID for a subject from Schema Registry +func (p *Producer) getSchemaID(subject string) (int, error) { + url := fmt.Sprintf("%s/subjects/%s/versions/latest", p.config.SchemaRegistry.URL, subject) + + resp, err := http.Get(url) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + return 0, fmt.Errorf("failed to get schema: status=%d, body=%s", resp.StatusCode, string(body)) + } + + var schemaResp struct { + ID int `json:"id"` + } + if err := json.NewDecoder(resp.Body).Decode(&schemaResp); err != nil { + return 0, err + } + + return schemaResp.ID, nil +} + +// ensureSchemasRegistered ensures that schemas are registered for all topics +// It registers schemas if they don't exist, but doesn't fail if they already do +func (p *Producer) ensureSchemasRegistered() error { + for _, topic := range p.topics { + subject := topic + "-value" + + // First check if schema already exists + schemaID, err := p.getSchemaID(subject) + if err == nil { + log.Printf("Producer %d: Schema already exists for topic %s (ID: %d), skipping registration", p.id, topic, schemaID) + continue + } + + // Schema doesn't exist, register it + log.Printf("Producer %d: Registering schema for topic %s", p.id, topic) + if err := p.registerTopicSchema(subject); err != nil { + return fmt.Errorf("failed to register schema for topic %s: %w", topic, err) + } + log.Printf("Producer %d: Schema registered successfully for topic %s", p.id, topic) + } + return nil +} + +// registerTopicSchema registers the schema for a specific topic based on configured format +func (p *Producer) registerTopicSchema(subject string) error { + // Extract topic name from subject (remove -value or -key suffix) + topicName := strings.TrimSuffix(strings.TrimSuffix(subject, "-value"), "-key") + + // Get schema format for this topic + schemaFormat, ok := p.schemaFormats[topicName] + if !ok { + // Fallback to config or default + schemaFormat = p.config.Producers.SchemaFormat + if schemaFormat == "" { + schemaFormat = "AVRO" + } + } + + var schemaStr string + var schemaType string + + switch strings.ToUpper(schemaFormat) { + case "AVRO": + schemaStr = schema.GetAvroSchema() + schemaType = "AVRO" + case "JSON", "JSON_SCHEMA": + schemaStr = schema.GetJSONSchema() + schemaType = "JSON" + case "PROTOBUF": + schemaStr = schema.GetProtobufSchema() + schemaType = "PROTOBUF" + default: + return fmt.Errorf("unsupported schema 
format: %s", schemaFormat)
+	}
+
+	url := fmt.Sprintf("%s/subjects/%s/versions", p.config.SchemaRegistry.URL, subject)
+
+	payload := map[string]interface{}{
+		"schema":     schemaStr,
+		"schemaType": schemaType,
+	}
+
+	jsonPayload, err := json.Marshal(payload)
+	if err != nil {
+		return fmt.Errorf("failed to marshal schema payload: %w", err)
+	}
+
+	resp, err := http.Post(url, "application/vnd.schemaregistry.v1+json", strings.NewReader(string(jsonPayload)))
+	if err != nil {
+		return fmt.Errorf("failed to register schema: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		body, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("schema registration failed: status=%d, body=%s", resp.StatusCode, string(body))
+	}
+
+	var registerResp struct {
+		ID int `json:"id"`
+	}
+	if err := json.NewDecoder(resp.Body).Decode(&registerResp); err != nil {
+		return fmt.Errorf("failed to decode registration response: %w", err)
+	}
+
+	log.Printf("Schema registered with ID: %d (format: %s)", registerResp.ID, schemaType)
+	return nil
+}
+
+// createConfluentWireFormat creates a message in Confluent Wire Format
+// This matches the implementation in weed/mq/kafka/schema/envelope.go CreateConfluentEnvelope
+func (p *Producer) createConfluentWireFormat(schemaID int, avroData []byte) []byte {
+	// Confluent Wire Format: [magic_byte(1)][schema_id(4)][payload(n)]
+	// magic_byte = 0x00
+	// schema_id = 4 bytes big-endian
+	wireFormat := make([]byte, 5+len(avroData))
+	wireFormat[0] = 0x00 // Magic byte
+	binary.BigEndian.PutUint32(wireFormat[1:5], uint32(schemaID))
+	copy(wireFormat[5:], avroData)
+	return wireFormat
+}
+
+// isCircuitBreakerError checks if an error indicates that the circuit breaker is open
+func (p *Producer) isCircuitBreakerError(err error) bool {
+	return errors.Is(err, ErrCircuitBreakerOpen)
+}
diff --git a/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto b/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto
new file mode 100644
index 000000000..dfe00b72f
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/schema/loadtest.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+
+package com.seaweedfs.loadtest;
+
+option go_package = "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pb";
+
+message LoadTestMessage {
+  string id = 1;
+  int64 timestamp = 2;
+  int32 producer_id = 3;
+  int64 counter = 4;
+  string user_id = 5;
+  string event_type = 6;
+  map<string, string> properties = 7;
+}
+
diff --git a/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go b/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go
new file mode 100644
index 000000000..3ed58aa9e
--- /dev/null
+++ b/test/kafka/kafka-client-loadtest/internal/schema/pb/loadtest.pb.go
@@ -0,0 +1,185 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.36.6
+// protoc v5.29.3
+// source: loadtest.proto
+
+package pb
+
+import (
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+	unsafe "unsafe"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type LoadTestMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + ProducerId int32 `protobuf:"varint,3,opt,name=producer_id,json=producerId,proto3" json:"producer_id,omitempty"` + Counter int64 `protobuf:"varint,4,opt,name=counter,proto3" json:"counter,omitempty"` + UserId string `protobuf:"bytes,5,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + EventType string `protobuf:"bytes,6,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` + Properties map[string]string `protobuf:"bytes,7,rep,name=properties,proto3" json:"properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *LoadTestMessage) Reset() { + *x = LoadTestMessage{} + mi := &file_loadtest_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *LoadTestMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LoadTestMessage) ProtoMessage() {} + +func (x *LoadTestMessage) ProtoReflect() protoreflect.Message { + mi := &file_loadtest_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LoadTestMessage.ProtoReflect.Descriptor instead. +func (*LoadTestMessage) Descriptor() ([]byte, []int) { + return file_loadtest_proto_rawDescGZIP(), []int{0} +} + +func (x *LoadTestMessage) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *LoadTestMessage) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *LoadTestMessage) GetProducerId() int32 { + if x != nil { + return x.ProducerId + } + return 0 +} + +func (x *LoadTestMessage) GetCounter() int64 { + if x != nil { + return x.Counter + } + return 0 +} + +func (x *LoadTestMessage) GetUserId() string { + if x != nil { + return x.UserId + } + return "" +} + +func (x *LoadTestMessage) GetEventType() string { + if x != nil { + return x.EventType + } + return "" +} + +func (x *LoadTestMessage) GetProperties() map[string]string { + if x != nil { + return x.Properties + } + return nil +} + +var File_loadtest_proto protoreflect.FileDescriptor + +const file_loadtest_proto_rawDesc = "" + + "\n" + + "\x0eloadtest.proto\x12\x16com.seaweedfs.loadtest\"\xca\x02\n" + + "\x0fLoadTestMessage\x12\x0e\n" + + "\x02id\x18\x01 \x01(\tR\x02id\x12\x1c\n" + + "\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\x12\x1f\n" + + "\vproducer_id\x18\x03 \x01(\x05R\n" + + "producerId\x12\x18\n" + + "\acounter\x18\x04 \x01(\x03R\acounter\x12\x17\n" + + "\auser_id\x18\x05 \x01(\tR\x06userId\x12\x1d\n" + + "\n" + + "event_type\x18\x06 \x01(\tR\teventType\x12W\n" + + "\n" + + "properties\x18\a \x03(\v27.com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntryR\n" + + "properties\x1a=\n" + + "\x0fPropertiesEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01BTZRgithub.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema/pbb\x06proto3" + +var ( + file_loadtest_proto_rawDescOnce sync.Once + file_loadtest_proto_rawDescData []byte +) + +func 
file_loadtest_proto_rawDescGZIP() []byte { + file_loadtest_proto_rawDescOnce.Do(func() { + file_loadtest_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_loadtest_proto_rawDesc), len(file_loadtest_proto_rawDesc))) + }) + return file_loadtest_proto_rawDescData +} + +var file_loadtest_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_loadtest_proto_goTypes = []any{ + (*LoadTestMessage)(nil), // 0: com.seaweedfs.loadtest.LoadTestMessage + nil, // 1: com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntry +} +var file_loadtest_proto_depIdxs = []int32{ + 1, // 0: com.seaweedfs.loadtest.LoadTestMessage.properties:type_name -> com.seaweedfs.loadtest.LoadTestMessage.PropertiesEntry + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_loadtest_proto_init() } +func file_loadtest_proto_init() { + if File_loadtest_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_loadtest_proto_rawDesc), len(file_loadtest_proto_rawDesc)), + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_loadtest_proto_goTypes, + DependencyIndexes: file_loadtest_proto_depIdxs, + MessageInfos: file_loadtest_proto_msgTypes, + }.Build() + File_loadtest_proto = out.File + file_loadtest_proto_goTypes = nil + file_loadtest_proto_depIdxs = nil +} diff --git a/test/kafka/kafka-client-loadtest/internal/schema/schemas.go b/test/kafka/kafka-client-loadtest/internal/schema/schemas.go new file mode 100644 index 000000000..011b28ef2 --- /dev/null +++ b/test/kafka/kafka-client-loadtest/internal/schema/schemas.go @@ -0,0 +1,58 @@ +package schema + +// GetAvroSchema returns the Avro schema for load test messages +func GetAvroSchema() string { + return `{ + "type": "record", + "name": "LoadTestMessage", + "namespace": "com.seaweedfs.loadtest", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "timestamp", "type": "long"}, + {"name": "producer_id", "type": "int"}, + {"name": "counter", "type": "long"}, + {"name": "user_id", "type": "string"}, + {"name": "event_type", "type": "string"}, + {"name": "properties", "type": {"type": "map", "values": "string"}} + ] + }` +} + +// GetJSONSchema returns the JSON Schema for load test messages +func GetJSONSchema() string { + return `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "LoadTestMessage", + "type": "object", + "properties": { + "id": {"type": "string"}, + "timestamp": {"type": "integer"}, + "producer_id": {"type": "integer"}, + "counter": {"type": "integer"}, + "user_id": {"type": "string"}, + "event_type": {"type": "string"}, + "properties": { + "type": "object", + "additionalProperties": {"type": "string"} + } + }, + "required": ["id", "timestamp", "producer_id", "counter", "user_id", "event_type"] + }` +} + +// GetProtobufSchema returns the Protobuf schema for load test messages +func GetProtobufSchema() string { + return `syntax = "proto3"; + +package com.seaweedfs.loadtest; + +message LoadTestMessage { + string id = 1; + int64 timestamp = 2; + int32 producer_id = 3; + int64 counter = 4; + string user_id = 5; + string event_type = 6; + map<string, string> properties = 7; +}` +} |
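
For readers of this load test code: the Confluent wire format built by createConfluentWireFormat in producer.go is straightforward to unframe on the consumer side. The standalone Go sketch below is illustrative only and is not part of this change; parseConfluentEnvelope is a hypothetical helper written here to show the inverse operation under the layout described above (magic byte 0x00, 4-byte big-endian schema ID, then the Avro payload).

package main

import (
	"encoding/binary"
	"fmt"
)

// parseConfluentEnvelope is a hypothetical inverse of createConfluentWireFormat:
// it splits [magic_byte(1)][schema_id(4)][payload(n)] back into the schema ID
// and the raw Avro payload.
func parseConfluentEnvelope(data []byte) (int, []byte, error) {
	if len(data) < 5 {
		return 0, nil, fmt.Errorf("message too short for Confluent envelope: %d bytes", len(data))
	}
	if data[0] != 0x00 {
		return 0, nil, fmt.Errorf("unexpected magic byte: 0x%02x", data[0])
	}
	schemaID := int(binary.BigEndian.Uint32(data[1:5]))
	return schemaID, data[5:], nil
}

func main() {
	// Frame a dummy payload the same way the producer does, then unframe it.
	avroData := []byte("avro-bytes")
	wire := make([]byte, 5+len(avroData))
	wire[0] = 0x00                                    // magic byte
	binary.BigEndian.PutUint32(wire[1:5], uint32(42)) // schema ID
	copy(wire[5:], avroData)

	id, payload, err := parseConfluentEnvelope(wire)
	fmt.Println(id, string(payload), err) // 42 avro-bytes <nil>
}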
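Likewise, the 20-byte payload produced by generateBinaryMessage uses the fixed layout [producer_id:4][counter:8][timestamp:8] in big-endian order. The sketch below is again illustrative only; decodeBinaryMessage is a made-up name showing one way a consumer could unpack that layout.

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// decodeBinaryMessage unpacks the fixed 20-byte layout used by generateBinaryMessage:
// bytes 0-3 producer ID, bytes 4-11 counter, bytes 12-19 timestamp (UnixNano),
// all big-endian.
func decodeBinaryMessage(msg []byte) (int32, int64, time.Time, error) {
	if len(msg) != 20 {
		return 0, 0, time.Time{}, fmt.Errorf("expected 20 bytes, got %d", len(msg))
	}
	producerID := int32(binary.BigEndian.Uint32(msg[0:4]))
	counter := int64(binary.BigEndian.Uint64(msg[4:12]))
	ts := time.Unix(0, int64(binary.BigEndian.Uint64(msg[12:20])))
	return producerID, counter, ts, nil
}

func main() {
	// Build a sample message with the same layout, then decode it.
	buf := make([]byte, 20)
	binary.BigEndian.PutUint32(buf[0:4], 7)                               // producer ID
	binary.BigEndian.PutUint64(buf[4:12], 12345)                          // counter
	binary.BigEndian.PutUint64(buf[12:20], uint64(time.Now().UnixNano())) // timestamp

	id, counter, ts, _ := decodeBinaryMessage(buf)
	fmt.Println(id, counter, ts)
}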
