diff options
Diffstat (limited to 'test/kafka/internal')
| -rw-r--r-- | test/kafka/internal/testutil/assertions.go | 150 | ||||
| -rw-r--r-- | test/kafka/internal/testutil/clients.go | 294 | ||||
| -rw-r--r-- | test/kafka/internal/testutil/docker.go | 68 | ||||
| -rw-r--r-- | test/kafka/internal/testutil/gateway.go | 220 | ||||
| -rw-r--r-- | test/kafka/internal/testutil/messages.go | 135 | ||||
| -rw-r--r-- | test/kafka/internal/testutil/schema_helper.go | 33 |
6 files changed, 900 insertions, 0 deletions
diff --git a/test/kafka/internal/testutil/assertions.go b/test/kafka/internal/testutil/assertions.go new file mode 100644 index 000000000..605c61f8e --- /dev/null +++ b/test/kafka/internal/testutil/assertions.go @@ -0,0 +1,150 @@ +package testutil + +import ( + "fmt" + "testing" + "time" +) + +// AssertEventually retries an assertion until it passes or times out +func AssertEventually(t *testing.T, assertion func() error, timeout time.Duration, interval time.Duration, msgAndArgs ...interface{}) { + t.Helper() + + deadline := time.Now().Add(timeout) + var lastErr error + + for time.Now().Before(deadline) { + if err := assertion(); err == nil { + return // Success + } else { + lastErr = err + } + time.Sleep(interval) + } + + // Format the failure message + var msg string + if len(msgAndArgs) > 0 { + if format, ok := msgAndArgs[0].(string); ok { + msg = fmt.Sprintf(format, msgAndArgs[1:]...) + } else { + msg = fmt.Sprint(msgAndArgs...) + } + } else { + msg = "assertion failed" + } + + t.Fatalf("%s after %v: %v", msg, timeout, lastErr) +} + +// AssertNoError fails the test if err is not nil +func AssertNoError(t *testing.T, err error, msgAndArgs ...interface{}) { + t.Helper() + if err != nil { + var msg string + if len(msgAndArgs) > 0 { + if format, ok := msgAndArgs[0].(string); ok { + msg = fmt.Sprintf(format, msgAndArgs[1:]...) + } else { + msg = fmt.Sprint(msgAndArgs...) + } + } else { + msg = "unexpected error" + } + t.Fatalf("%s: %v", msg, err) + } +} + +// AssertError fails the test if err is nil +func AssertError(t *testing.T, err error, msgAndArgs ...interface{}) { + t.Helper() + if err == nil { + var msg string + if len(msgAndArgs) > 0 { + if format, ok := msgAndArgs[0].(string); ok { + msg = fmt.Sprintf(format, msgAndArgs[1:]...) + } else { + msg = fmt.Sprint(msgAndArgs...) + } + } else { + msg = "expected error but got nil" + } + t.Fatal(msg) + } +} + +// AssertEqual fails the test if expected != actual +func AssertEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) { + t.Helper() + if expected != actual { + var msg string + if len(msgAndArgs) > 0 { + if format, ok := msgAndArgs[0].(string); ok { + msg = fmt.Sprintf(format, msgAndArgs[1:]...) + } else { + msg = fmt.Sprint(msgAndArgs...) + } + } else { + msg = "values not equal" + } + t.Fatalf("%s: expected %v, got %v", msg, expected, actual) + } +} + +// AssertNotEqual fails the test if expected == actual +func AssertNotEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) { + t.Helper() + if expected == actual { + var msg string + if len(msgAndArgs) > 0 { + if format, ok := msgAndArgs[0].(string); ok { + msg = fmt.Sprintf(format, msgAndArgs[1:]...) + } else { + msg = fmt.Sprint(msgAndArgs...) + } + } else { + msg = "values should not be equal" + } + t.Fatalf("%s: both values are %v", msg, expected) + } +} + +// AssertGreaterThan fails the test if actual <= expected +func AssertGreaterThan(t *testing.T, expected, actual int, msgAndArgs ...interface{}) { + t.Helper() + if actual <= expected { + var msg string + if len(msgAndArgs) > 0 { + if format, ok := msgAndArgs[0].(string); ok { + msg = fmt.Sprintf(format, msgAndArgs[1:]...) + } else { + msg = fmt.Sprint(msgAndArgs...) + } + } else { + msg = "value not greater than expected" + } + t.Fatalf("%s: expected > %d, got %d", msg, expected, actual) + } +} + +// AssertContains fails the test if slice doesn't contain item +func AssertContains(t *testing.T, slice []string, item string, msgAndArgs ...interface{}) { + t.Helper() + for _, s := range slice { + if s == item { + return // Found it + } + } + + var msg string + if len(msgAndArgs) > 0 { + if format, ok := msgAndArgs[0].(string); ok { + msg = fmt.Sprintf(format, msgAndArgs[1:]...) + } else { + msg = fmt.Sprint(msgAndArgs...) + } + } else { + msg = "item not found in slice" + } + t.Fatalf("%s: %q not found in %v", msg, item, slice) +} diff --git a/test/kafka/internal/testutil/clients.go b/test/kafka/internal/testutil/clients.go new file mode 100644 index 000000000..53cae52e0 --- /dev/null +++ b/test/kafka/internal/testutil/clients.go @@ -0,0 +1,294 @@ +package testutil + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/IBM/sarama" + "github.com/segmentio/kafka-go" +) + +// KafkaGoClient wraps kafka-go client with test utilities +type KafkaGoClient struct { + brokerAddr string + t *testing.T +} + +// SaramaClient wraps Sarama client with test utilities +type SaramaClient struct { + brokerAddr string + config *sarama.Config + t *testing.T +} + +// NewKafkaGoClient creates a new kafka-go test client +func NewKafkaGoClient(t *testing.T, brokerAddr string) *KafkaGoClient { + return &KafkaGoClient{ + brokerAddr: brokerAddr, + t: t, + } +} + +// NewSaramaClient creates a new Sarama test client with default config +func NewSaramaClient(t *testing.T, brokerAddr string) *SaramaClient { + config := sarama.NewConfig() + config.Version = sarama.V2_8_0_0 + config.Producer.Return.Successes = true + config.Consumer.Return.Errors = true + config.Consumer.Offsets.Initial = sarama.OffsetOldest // Start from earliest when no committed offset + + return &SaramaClient{ + brokerAddr: brokerAddr, + config: config, + t: t, + } +} + +// CreateTopic creates a topic using kafka-go +func (k *KafkaGoClient) CreateTopic(topicName string, partitions int, replicationFactor int) error { + k.t.Helper() + + conn, err := kafka.Dial("tcp", k.brokerAddr) + if err != nil { + return fmt.Errorf("dial broker: %w", err) + } + defer conn.Close() + + topicConfig := kafka.TopicConfig{ + Topic: topicName, + NumPartitions: partitions, + ReplicationFactor: replicationFactor, + } + + err = conn.CreateTopics(topicConfig) + if err != nil { + return fmt.Errorf("create topic: %w", err) + } + + k.t.Logf("Created topic %s with %d partitions", topicName, partitions) + return nil +} + +// ProduceMessages produces messages using kafka-go +func (k *KafkaGoClient) ProduceMessages(topicName string, messages []kafka.Message) error { + k.t.Helper() + + writer := &kafka.Writer{ + Addr: kafka.TCP(k.brokerAddr), + Topic: topicName, + Balancer: &kafka.LeastBytes{}, + BatchTimeout: 50 * time.Millisecond, + RequiredAcks: kafka.RequireOne, + } + defer writer.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + err := writer.WriteMessages(ctx, messages...) + if err != nil { + return fmt.Errorf("write messages: %w", err) + } + + k.t.Logf("Produced %d messages to topic %s", len(messages), topicName) + return nil +} + +// ConsumeMessages consumes messages using kafka-go +func (k *KafkaGoClient) ConsumeMessages(topicName string, expectedCount int) ([]kafka.Message, error) { + k.t.Helper() + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{k.brokerAddr}, + Topic: topicName, + Partition: 0, // Explicitly set partition 0 for simple consumption + StartOffset: kafka.FirstOffset, + MinBytes: 1, + MaxBytes: 10e6, + }) + defer reader.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var messages []kafka.Message + for i := 0; i < expectedCount; i++ { + msg, err := reader.ReadMessage(ctx) + if err != nil { + return messages, fmt.Errorf("read message %d: %w", i, err) + } + messages = append(messages, msg) + } + + k.t.Logf("Consumed %d messages from topic %s", len(messages), topicName) + return messages, nil +} + +// ConsumeWithGroup consumes messages using consumer group +func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCount int) ([]kafka.Message, error) { + k.t.Helper() + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{k.brokerAddr}, + Topic: topicName, + GroupID: groupID, + MinBytes: 1, + MaxBytes: 10e6, + CommitInterval: 500 * time.Millisecond, + }) + defer reader.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + var messages []kafka.Message + for i := 0; i < expectedCount; i++ { + // Fetch then explicitly commit to better control commit timing + msg, err := reader.FetchMessage(ctx) + if err != nil { + return messages, fmt.Errorf("read message %d: %w", i, err) + } + messages = append(messages, msg) + + // Commit with simple retry to handle transient connection churn + var commitErr error + for attempt := 0; attempt < 3; attempt++ { + commitErr = reader.CommitMessages(ctx, msg) + if commitErr == nil { + break + } + // brief backoff + time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond) + } + if commitErr != nil { + return messages, fmt.Errorf("committing message %d: %w", i, commitErr) + } + } + + k.t.Logf("Consumed %d messages from topic %s with group %s", len(messages), topicName, groupID) + return messages, nil +} + +// CreateTopic creates a topic using Sarama +func (s *SaramaClient) CreateTopic(topicName string, partitions int32, replicationFactor int16) error { + s.t.Helper() + + admin, err := sarama.NewClusterAdmin([]string{s.brokerAddr}, s.config) + if err != nil { + return fmt.Errorf("create admin client: %w", err) + } + defer admin.Close() + + topicDetail := &sarama.TopicDetail{ + NumPartitions: partitions, + ReplicationFactor: replicationFactor, + } + + err = admin.CreateTopic(topicName, topicDetail, false) + if err != nil { + return fmt.Errorf("create topic: %w", err) + } + + s.t.Logf("Created topic %s with %d partitions", topicName, partitions) + return nil +} + +// ProduceMessages produces messages using Sarama +func (s *SaramaClient) ProduceMessages(topicName string, messages []string) error { + s.t.Helper() + + producer, err := sarama.NewSyncProducer([]string{s.brokerAddr}, s.config) + if err != nil { + return fmt.Errorf("create producer: %w", err) + } + defer producer.Close() + + for i, msgText := range messages { + msg := &sarama.ProducerMessage{ + Topic: topicName, + Key: sarama.StringEncoder(fmt.Sprintf("Test message %d", i)), + Value: sarama.StringEncoder(msgText), + } + + partition, offset, err := producer.SendMessage(msg) + if err != nil { + return fmt.Errorf("send message %d: %w", i, err) + } + + s.t.Logf("Produced message %d: partition=%d, offset=%d", i, partition, offset) + } + + return nil +} + +// ProduceMessageToPartition produces a single message to a specific partition using Sarama +func (s *SaramaClient) ProduceMessageToPartition(topicName string, partition int32, message string) error { + s.t.Helper() + + producer, err := sarama.NewSyncProducer([]string{s.brokerAddr}, s.config) + if err != nil { + return fmt.Errorf("create producer: %w", err) + } + defer producer.Close() + + msg := &sarama.ProducerMessage{ + Topic: topicName, + Partition: partition, + Key: sarama.StringEncoder(fmt.Sprintf("key-p%d", partition)), + Value: sarama.StringEncoder(message), + } + + actualPartition, offset, err := producer.SendMessage(msg) + if err != nil { + return fmt.Errorf("send message to partition %d: %w", partition, err) + } + + s.t.Logf("Produced message to partition %d: actualPartition=%d, offset=%d", partition, actualPartition, offset) + return nil +} + +// ConsumeMessages consumes messages using Sarama +func (s *SaramaClient) ConsumeMessages(topicName string, partition int32, expectedCount int) ([]string, error) { + s.t.Helper() + + consumer, err := sarama.NewConsumer([]string{s.brokerAddr}, s.config) + if err != nil { + return nil, fmt.Errorf("create consumer: %w", err) + } + defer consumer.Close() + + partitionConsumer, err := consumer.ConsumePartition(topicName, partition, sarama.OffsetOldest) + if err != nil { + return nil, fmt.Errorf("create partition consumer: %w", err) + } + defer partitionConsumer.Close() + + var messages []string + timeout := time.After(30 * time.Second) + + for len(messages) < expectedCount { + select { + case msg := <-partitionConsumer.Messages(): + messages = append(messages, string(msg.Value)) + case err := <-partitionConsumer.Errors(): + return messages, fmt.Errorf("consumer error: %w", err) + case <-timeout: + return messages, fmt.Errorf("timeout waiting for messages, got %d/%d", len(messages), expectedCount) + } + } + + s.t.Logf("Consumed %d messages from topic %s", len(messages), topicName) + return messages, nil +} + +// GetConfig returns the Sarama configuration +func (s *SaramaClient) GetConfig() *sarama.Config { + return s.config +} + +// SetConfig sets a custom Sarama configuration +func (s *SaramaClient) SetConfig(config *sarama.Config) { + s.config = config +} diff --git a/test/kafka/internal/testutil/docker.go b/test/kafka/internal/testutil/docker.go new file mode 100644 index 000000000..e839fe28c --- /dev/null +++ b/test/kafka/internal/testutil/docker.go @@ -0,0 +1,68 @@ +package testutil + +import ( + "os" + "testing" +) + +// DockerEnvironment provides utilities for Docker-based integration tests +type DockerEnvironment struct { + KafkaBootstrap string + KafkaGateway string + SchemaRegistry string + Available bool +} + +// NewDockerEnvironment creates a new Docker environment helper +func NewDockerEnvironment(t *testing.T) *DockerEnvironment { + t.Helper() + + env := &DockerEnvironment{ + KafkaBootstrap: os.Getenv("KAFKA_BOOTSTRAP_SERVERS"), + KafkaGateway: os.Getenv("KAFKA_GATEWAY_URL"), + SchemaRegistry: os.Getenv("SCHEMA_REGISTRY_URL"), + } + + env.Available = env.KafkaBootstrap != "" + + if env.Available { + t.Logf("Docker environment detected:") + t.Logf(" Kafka Bootstrap: %s", env.KafkaBootstrap) + t.Logf(" Kafka Gateway: %s", env.KafkaGateway) + t.Logf(" Schema Registry: %s", env.SchemaRegistry) + } + + return env +} + +// SkipIfNotAvailable skips the test if Docker environment is not available +func (d *DockerEnvironment) SkipIfNotAvailable(t *testing.T) { + t.Helper() + if !d.Available { + t.Skip("Skipping Docker integration test - set KAFKA_BOOTSTRAP_SERVERS to run") + } +} + +// RequireKafka ensures Kafka is available or skips the test +func (d *DockerEnvironment) RequireKafka(t *testing.T) { + t.Helper() + if d.KafkaBootstrap == "" { + t.Skip("Kafka bootstrap servers not available") + } +} + +// RequireGateway ensures Kafka Gateway is available or skips the test +func (d *DockerEnvironment) RequireGateway(t *testing.T) { + t.Helper() + if d.KafkaGateway == "" { + t.Skip("Kafka Gateway not available") + } +} + +// RequireSchemaRegistry ensures Schema Registry is available or skips the test +func (d *DockerEnvironment) RequireSchemaRegistry(t *testing.T) { + t.Helper() + if d.SchemaRegistry == "" { + t.Skip("Schema Registry not available") + } +} diff --git a/test/kafka/internal/testutil/gateway.go b/test/kafka/internal/testutil/gateway.go new file mode 100644 index 000000000..8021abcb6 --- /dev/null +++ b/test/kafka/internal/testutil/gateway.go @@ -0,0 +1,220 @@ +package testutil + +import ( + "context" + "fmt" + "net" + "os" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway" + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" +) + +// GatewayTestServer wraps the gateway server with common test utilities +type GatewayTestServer struct { + *gateway.Server + t *testing.T +} + +// GatewayOptions contains configuration for test gateway +type GatewayOptions struct { + Listen string + Masters string + UseProduction bool + // Add more options as needed +} + +// NewGatewayTestServer creates a new test gateway server with common setup +func NewGatewayTestServer(t *testing.T, opts GatewayOptions) *GatewayTestServer { + if opts.Listen == "" { + opts.Listen = "127.0.0.1:0" // Use random port by default + } + + // Allow switching to production gateway if requested (requires masters) + var srv *gateway.Server + if opts.UseProduction { + if opts.Masters == "" { + // Fallback to env variable for convenience in CI + if v := os.Getenv("SEAWEEDFS_MASTERS"); v != "" { + opts.Masters = v + } else { + opts.Masters = "localhost:9333" + } + } + srv = gateway.NewServer(gateway.Options{ + Listen: opts.Listen, + Masters: opts.Masters, + }) + } else { + // For unit testing without real SeaweedMQ masters + srv = gateway.NewTestServerForUnitTests(gateway.Options{ + Listen: opts.Listen, + }) + } + + return &GatewayTestServer{ + Server: srv, + t: t, + } +} + +// StartAndWait starts the gateway and waits for it to be ready +func (g *GatewayTestServer) StartAndWait() string { + g.t.Helper() + + // Start server in goroutine + go func() { + // Enable schema mode automatically when SCHEMA_REGISTRY_URL is set + if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" { + h := g.GetHandler() + if h != nil { + _ = h.EnableSchemaManagement(schema.ManagerConfig{RegistryURL: url}) + } + } + if err := g.Start(); err != nil { + g.t.Errorf("Failed to start gateway: %v", err) + } + }() + + // Wait for server to be ready + time.Sleep(100 * time.Millisecond) + + host, port := g.GetListenerAddr() + addr := fmt.Sprintf("%s:%d", host, port) + g.t.Logf("Gateway running on %s", addr) + + return addr +} + +// AddTestTopic adds a topic for testing with default configuration +func (g *GatewayTestServer) AddTestTopic(name string) { + g.t.Helper() + g.GetHandler().AddTopicForTesting(name, 1) + g.t.Logf("Added test topic: %s", name) +} + +// AddTestTopics adds multiple topics for testing +func (g *GatewayTestServer) AddTestTopics(names ...string) { + g.t.Helper() + for _, name := range names { + g.AddTestTopic(name) + } +} + +// CleanupAndClose properly closes the gateway server +func (g *GatewayTestServer) CleanupAndClose() { + g.t.Helper() + if err := g.Close(); err != nil { + g.t.Errorf("Failed to close gateway: %v", err) + } +} + +// SMQAvailabilityMode indicates whether SeaweedMQ is available for testing +type SMQAvailabilityMode int + +const ( + SMQUnavailable SMQAvailabilityMode = iota // Use mock handler only + SMQAvailable // SMQ is available, can use production mode + SMQRequired // SMQ is required, skip test if unavailable +) + +// CheckSMQAvailability checks if SeaweedFS masters are available for testing +func CheckSMQAvailability() (bool, string) { + masters := os.Getenv("SEAWEEDFS_MASTERS") + if masters == "" { + return false, "" + } + + // Test if at least one master is reachable + if masters != "" { + // Try to connect to the first master to verify availability + conn, err := net.DialTimeout("tcp", masters, 2*time.Second) + if err != nil { + return false, masters // Masters specified but unreachable + } + conn.Close() + return true, masters + } + + return false, "" +} + +// NewGatewayTestServerWithSMQ creates a gateway server that automatically uses SMQ if available +func NewGatewayTestServerWithSMQ(t *testing.T, mode SMQAvailabilityMode) *GatewayTestServer { + smqAvailable, masters := CheckSMQAvailability() + + switch mode { + case SMQRequired: + if !smqAvailable { + if masters != "" { + t.Skipf("Skipping test: SEAWEEDFS_MASTERS=%s specified but unreachable", masters) + } else { + t.Skip("Skipping test: SEAWEEDFS_MASTERS required but not set") + } + } + t.Logf("Using SMQ-backed gateway with masters: %s", masters) + return newGatewayTestServerWithTimeout(t, GatewayOptions{ + UseProduction: true, + Masters: masters, + }, 120*time.Second) + + case SMQAvailable: + if smqAvailable { + t.Logf("SMQ available, using production gateway with masters: %s", masters) + return newGatewayTestServerWithTimeout(t, GatewayOptions{ + UseProduction: true, + Masters: masters, + }, 120*time.Second) + } else { + t.Logf("SMQ not available, using mock gateway") + return NewGatewayTestServer(t, GatewayOptions{}) + } + + default: // SMQUnavailable + t.Logf("Using mock gateway (SMQ integration disabled)") + return NewGatewayTestServer(t, GatewayOptions{}) + } +} + +// newGatewayTestServerWithTimeout creates a gateway server with a timeout to prevent hanging +func newGatewayTestServerWithTimeout(t *testing.T, opts GatewayOptions, timeout time.Duration) *GatewayTestServer { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + done := make(chan *GatewayTestServer, 1) + errChan := make(chan error, 1) + + go func() { + defer func() { + if r := recover(); r != nil { + errChan <- fmt.Errorf("panic creating gateway: %v", r) + } + }() + + // Create the gateway in a goroutine so we can timeout if it hangs + t.Logf("Creating gateway with masters: %s (with %v timeout)", opts.Masters, timeout) + gateway := NewGatewayTestServer(t, opts) + t.Logf("Gateway created successfully") + done <- gateway + }() + + select { + case gateway := <-done: + return gateway + case err := <-errChan: + t.Fatalf("Error creating gateway: %v", err) + case <-ctx.Done(): + t.Fatalf("Timeout creating gateway after %v - likely SMQ broker discovery failed. Check if MQ brokers are running and accessible.", timeout) + } + + return nil // This should never be reached +} + +// IsSMQMode returns true if the gateway is using real SMQ backend +// This is determined by checking if we have the SEAWEEDFS_MASTERS environment variable +func (g *GatewayTestServer) IsSMQMode() bool { + available, _ := CheckSMQAvailability() + return available +} diff --git a/test/kafka/internal/testutil/messages.go b/test/kafka/internal/testutil/messages.go new file mode 100644 index 000000000..803dc8e0d --- /dev/null +++ b/test/kafka/internal/testutil/messages.go @@ -0,0 +1,135 @@ +package testutil + +import ( + "fmt" + "os" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" + "github.com/segmentio/kafka-go" +) + +// MessageGenerator provides utilities for generating test messages +type MessageGenerator struct { + counter int +} + +// NewMessageGenerator creates a new message generator +func NewMessageGenerator() *MessageGenerator { + return &MessageGenerator{counter: 0} +} + +// GenerateKafkaGoMessages generates kafka-go messages for testing +func (m *MessageGenerator) GenerateKafkaGoMessages(count int) []kafka.Message { + messages := make([]kafka.Message, count) + + for i := 0; i < count; i++ { + m.counter++ + key := []byte(fmt.Sprintf("test-key-%d", m.counter)) + val := []byte(fmt.Sprintf("{\"value\":\"test-message-%d-generated-at-%d\"}", m.counter, time.Now().Unix())) + + // If schema mode is requested, ensure a test schema exists and wrap with Confluent envelope + if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" { + subject := "offset-management-value" + schemaJSON := `{"type":"record","name":"TestRecord","fields":[{"name":"value","type":"string"}]}` + rc := schema.NewRegistryClient(schema.RegistryConfig{URL: url}) + if _, err := rc.GetLatestSchema(subject); err != nil { + // Best-effort register schema + _, _ = rc.RegisterSchema(subject, schemaJSON) + } + if latest, err := rc.GetLatestSchema(subject); err == nil { + val = schema.CreateConfluentEnvelope(schema.FormatAvro, latest.LatestID, nil, val) + } else { + // fallback to schema id 1 + val = schema.CreateConfluentEnvelope(schema.FormatAvro, 1, nil, val) + } + } + + messages[i] = kafka.Message{Key: key, Value: val} + } + + return messages +} + +// GenerateStringMessages generates string messages for Sarama +func (m *MessageGenerator) GenerateStringMessages(count int) []string { + messages := make([]string, count) + + for i := 0; i < count; i++ { + m.counter++ + messages[i] = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix()) + } + + return messages +} + +// GenerateKafkaGoMessage generates a single kafka-go message +func (m *MessageGenerator) GenerateKafkaGoMessage(key, value string) kafka.Message { + if key == "" { + m.counter++ + key = fmt.Sprintf("test-key-%d", m.counter) + } + if value == "" { + value = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix()) + } + + return kafka.Message{ + Key: []byte(key), + Value: []byte(value), + } +} + +// GenerateUniqueTopicName generates a unique topic name for testing +func GenerateUniqueTopicName(prefix string) string { + if prefix == "" { + prefix = "test-topic" + } + return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano()) +} + +// GenerateUniqueGroupID generates a unique consumer group ID for testing +func GenerateUniqueGroupID(prefix string) string { + if prefix == "" { + prefix = "test-group" + } + return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano()) +} + +// ValidateMessageContent validates that consumed messages match expected content +func ValidateMessageContent(expected, actual []string) error { + if len(expected) != len(actual) { + return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual)) + } + + for i, expectedMsg := range expected { + if i >= len(actual) { + return fmt.Errorf("missing message at index %d", i) + } + if actual[i] != expectedMsg { + return fmt.Errorf("message mismatch at index %d: expected %q, got %q", i, expectedMsg, actual[i]) + } + } + + return nil +} + +// ValidateKafkaGoMessageContent validates kafka-go messages +func ValidateKafkaGoMessageContent(expected, actual []kafka.Message) error { + if len(expected) != len(actual) { + return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual)) + } + + for i, expectedMsg := range expected { + if i >= len(actual) { + return fmt.Errorf("missing message at index %d", i) + } + if string(actual[i].Key) != string(expectedMsg.Key) { + return fmt.Errorf("key mismatch at index %d: expected %q, got %q", i, string(expectedMsg.Key), string(actual[i].Key)) + } + if string(actual[i].Value) != string(expectedMsg.Value) { + return fmt.Errorf("value mismatch at index %d: expected %q, got %q", i, string(expectedMsg.Value), string(actual[i].Value)) + } + } + + return nil +} diff --git a/test/kafka/internal/testutil/schema_helper.go b/test/kafka/internal/testutil/schema_helper.go new file mode 100644 index 000000000..868cc286b --- /dev/null +++ b/test/kafka/internal/testutil/schema_helper.go @@ -0,0 +1,33 @@ +package testutil + +import ( + "testing" + + kschema "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema" +) + +// EnsureValueSchema registers a minimal Avro value schema for the given topic if not present. +// Returns the latest schema ID if successful. +func EnsureValueSchema(t *testing.T, registryURL, topic string) (uint32, error) { + t.Helper() + subject := topic + "-value" + rc := kschema.NewRegistryClient(kschema.RegistryConfig{URL: registryURL}) + + // Minimal Avro record schema with string field "value" + schemaJSON := `{"type":"record","name":"TestRecord","fields":[{"name":"value","type":"string"}]}` + + // Try to get existing + if latest, err := rc.GetLatestSchema(subject); err == nil { + return latest.LatestID, nil + } + + // Register and fetch latest + if _, err := rc.RegisterSchema(subject, schemaJSON); err != nil { + return 0, err + } + latest, err := rc.GetLatestSchema(subject) + if err != nil { + return 0, err + } + return latest.LatestID, nil +} |
