Diffstat (limited to 'test/kafka/internal')
-rw-r--r--  test/kafka/internal/testutil/assertions.go     150
-rw-r--r--  test/kafka/internal/testutil/clients.go        294
-rw-r--r--  test/kafka/internal/testutil/docker.go          68
-rw-r--r--  test/kafka/internal/testutil/gateway.go        220
-rw-r--r--  test/kafka/internal/testutil/messages.go       135
-rw-r--r--  test/kafka/internal/testutil/schema_helper.go   33
6 files changed, 900 insertions, 0 deletions
diff --git a/test/kafka/internal/testutil/assertions.go b/test/kafka/internal/testutil/assertions.go
new file mode 100644
index 000000000..605c61f8e
--- /dev/null
+++ b/test/kafka/internal/testutil/assertions.go
@@ -0,0 +1,150 @@
+package testutil
+
+import (
+ "fmt"
+ "testing"
+ "time"
+)
+
+// formatMsg renders an optional printf-style message from msgAndArgs,
+// falling back to defaultMsg when no message is supplied.
+func formatMsg(defaultMsg string, msgAndArgs ...interface{}) string {
+ if len(msgAndArgs) == 0 {
+ return defaultMsg
+ }
+ if format, ok := msgAndArgs[0].(string); ok {
+ return fmt.Sprintf(format, msgAndArgs[1:]...)
+ }
+ return fmt.Sprint(msgAndArgs...)
+}
+
+// AssertEventually retries an assertion until it passes or times out
+func AssertEventually(t *testing.T, assertion func() error, timeout time.Duration, interval time.Duration, msgAndArgs ...interface{}) {
+ t.Helper()
+
+ deadline := time.Now().Add(timeout)
+ var lastErr error
+
+ for time.Now().Before(deadline) {
+ if lastErr = assertion(); lastErr == nil {
+ return // Success
+ }
+ time.Sleep(interval)
+ }
+
+ t.Fatalf("%s after %v: %v", formatMsg("assertion failed", msgAndArgs...), timeout, lastErr)
+}
+
+// AssertNoError fails the test if err is not nil
+func AssertNoError(t *testing.T, err error, msgAndArgs ...interface{}) {
+ t.Helper()
+ if err != nil {
+ t.Fatalf("%s: %v", formatMsg("unexpected error", msgAndArgs...), err)
+ }
+}
+
+// AssertError fails the test if err is nil
+func AssertError(t *testing.T, err error, msgAndArgs ...interface{}) {
+ t.Helper()
+ if err == nil {
+ t.Fatal(formatMsg("expected error but got nil", msgAndArgs...))
+ }
+}
+
+// AssertEqual fails the test if expected != actual
+func AssertEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) {
+ t.Helper()
+ if expected != actual {
+ t.Fatalf("%s: expected %v, got %v", formatMsg("values not equal", msgAndArgs...), expected, actual)
+ }
+}
+
+// AssertNotEqual fails the test if expected == actual
+func AssertNotEqual(t *testing.T, expected, actual interface{}, msgAndArgs ...interface{}) {
+ t.Helper()
+ if expected == actual {
+ t.Fatalf("%s: both values are %v", formatMsg("values should not be equal", msgAndArgs...), expected)
+ }
+}
+
+// AssertGreaterThan fails the test if actual <= expected
+func AssertGreaterThan(t *testing.T, expected, actual int, msgAndArgs ...interface{}) {
+ t.Helper()
+ if actual <= expected {
+ t.Fatalf("%s: expected > %d, got %d", formatMsg("value not greater than expected", msgAndArgs...), expected, actual)
+ }
+}
+
+// AssertContains fails the test if slice doesn't contain item
+func AssertContains(t *testing.T, slice []string, item string, msgAndArgs ...interface{}) {
+ t.Helper()
+ for _, s := range slice {
+ if s == item {
+ return // Found it
+ }
+ }
+ t.Fatalf("%s: %q not found in %v", formatMsg("item not found in slice", msgAndArgs...), item, slice)
+}
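These assertion helpers behave like testify-style variadic assertions without the external dependency. A minimal sketch of how a test might call them — the import path is assumed from the repository layout, and the readiness condition is purely hypothetical:

```go
package kafka_test

import (
	"fmt"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil" // assumed path
)

func TestAssertionHelpers(t *testing.T) {
	start := time.Now()

	// Poll a (hypothetical) readiness condition until it holds or 2s elapse.
	testutil.AssertEventually(t, func() error {
		if time.Since(start) < 100*time.Millisecond {
			return fmt.Errorf("not ready yet")
		}
		return nil
	}, 2*time.Second, 20*time.Millisecond, "service never became ready")

	testutil.AssertEqual(t, 3, 1+2, "arithmetic sanity check")
	testutil.AssertContains(t, []string{"alpha", "beta"}, "beta")
}
```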
diff --git a/test/kafka/internal/testutil/clients.go b/test/kafka/internal/testutil/clients.go
new file mode 100644
index 000000000..53cae52e0
--- /dev/null
+++ b/test/kafka/internal/testutil/clients.go
@@ -0,0 +1,294 @@
+package testutil
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/segmentio/kafka-go"
+)
+
+// KafkaGoClient wraps kafka-go client with test utilities
+type KafkaGoClient struct {
+ brokerAddr string
+ t *testing.T
+}
+
+// SaramaClient wraps Sarama client with test utilities
+type SaramaClient struct {
+ brokerAddr string
+ config *sarama.Config
+ t *testing.T
+}
+
+// NewKafkaGoClient creates a new kafka-go test client
+func NewKafkaGoClient(t *testing.T, brokerAddr string) *KafkaGoClient {
+ return &KafkaGoClient{
+ brokerAddr: brokerAddr,
+ t: t,
+ }
+}
+
+// NewSaramaClient creates a new Sarama test client with default config
+func NewSaramaClient(t *testing.T, brokerAddr string) *SaramaClient {
+ config := sarama.NewConfig()
+ config.Version = sarama.V2_8_0_0
+ config.Producer.Return.Successes = true
+ config.Consumer.Return.Errors = true
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest // Start from earliest when no committed offset
+
+ return &SaramaClient{
+ brokerAddr: brokerAddr,
+ config: config,
+ t: t,
+ }
+}
+
+// CreateTopic creates a topic using kafka-go
+func (k *KafkaGoClient) CreateTopic(topicName string, partitions int, replicationFactor int) error {
+ k.t.Helper()
+
+ conn, err := kafka.Dial("tcp", k.brokerAddr)
+ if err != nil {
+ return fmt.Errorf("dial broker: %w", err)
+ }
+ defer conn.Close()
+
+ topicConfig := kafka.TopicConfig{
+ Topic: topicName,
+ NumPartitions: partitions,
+ ReplicationFactor: replicationFactor,
+ }
+
+ err = conn.CreateTopics(topicConfig)
+ if err != nil {
+ return fmt.Errorf("create topic: %w", err)
+ }
+
+ k.t.Logf("Created topic %s with %d partitions", topicName, partitions)
+ return nil
+}
+
+// ProduceMessages produces messages using kafka-go
+func (k *KafkaGoClient) ProduceMessages(topicName string, messages []kafka.Message) error {
+ k.t.Helper()
+
+ writer := &kafka.Writer{
+ Addr: kafka.TCP(k.brokerAddr),
+ Topic: topicName,
+ Balancer: &kafka.LeastBytes{},
+ BatchTimeout: 50 * time.Millisecond,
+ RequiredAcks: kafka.RequireOne,
+ }
+ defer writer.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ err := writer.WriteMessages(ctx, messages...)
+ if err != nil {
+ return fmt.Errorf("write messages: %w", err)
+ }
+
+ k.t.Logf("Produced %d messages to topic %s", len(messages), topicName)
+ return nil
+}
+
+// ConsumeMessages consumes messages using kafka-go
+func (k *KafkaGoClient) ConsumeMessages(topicName string, expectedCount int) ([]kafka.Message, error) {
+ k.t.Helper()
+
+ reader := kafka.NewReader(kafka.ReaderConfig{
+ Brokers: []string{k.brokerAddr},
+ Topic: topicName,
+ Partition: 0, // Explicitly set partition 0 for simple consumption
+ StartOffset: kafka.FirstOffset,
+ MinBytes: 1,
+ MaxBytes: 10e6,
+ })
+ defer reader.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ var messages []kafka.Message
+ for i := 0; i < expectedCount; i++ {
+ msg, err := reader.ReadMessage(ctx)
+ if err != nil {
+ return messages, fmt.Errorf("read message %d: %w", i, err)
+ }
+ messages = append(messages, msg)
+ }
+
+ k.t.Logf("Consumed %d messages from topic %s", len(messages), topicName)
+ return messages, nil
+}
+
+// ConsumeWithGroup consumes messages using consumer group
+func (k *KafkaGoClient) ConsumeWithGroup(topicName, groupID string, expectedCount int) ([]kafka.Message, error) {
+ k.t.Helper()
+
+ reader := kafka.NewReader(kafka.ReaderConfig{
+ Brokers: []string{k.brokerAddr},
+ Topic: topicName,
+ GroupID: groupID,
+ MinBytes: 1,
+ MaxBytes: 10e6,
+ CommitInterval: 500 * time.Millisecond,
+ })
+ defer reader.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ var messages []kafka.Message
+ for i := 0; i < expectedCount; i++ {
+ // Fetch then explicitly commit to better control commit timing
+ msg, err := reader.FetchMessage(ctx)
+ if err != nil {
+ return messages, fmt.Errorf("read message %d: %w", i, err)
+ }
+ messages = append(messages, msg)
+
+ // Commit with simple retry to handle transient connection churn
+ var commitErr error
+ for attempt := 0; attempt < 3; attempt++ {
+ commitErr = reader.CommitMessages(ctx, msg)
+ if commitErr == nil {
+ break
+ }
+ // brief backoff
+ time.Sleep(time.Duration(50*(1<<attempt)) * time.Millisecond)
+ }
+ if commitErr != nil {
+ return messages, fmt.Errorf("committing message %d: %w", i, commitErr)
+ }
+ }
+
+ k.t.Logf("Consumed %d messages from topic %s with group %s", len(messages), topicName, groupID)
+ return messages, nil
+}
+
+// CreateTopic creates a topic using Sarama
+func (s *SaramaClient) CreateTopic(topicName string, partitions int32, replicationFactor int16) error {
+ s.t.Helper()
+
+ admin, err := sarama.NewClusterAdmin([]string{s.brokerAddr}, s.config)
+ if err != nil {
+ return fmt.Errorf("create admin client: %w", err)
+ }
+ defer admin.Close()
+
+ topicDetail := &sarama.TopicDetail{
+ NumPartitions: partitions,
+ ReplicationFactor: replicationFactor,
+ }
+
+ err = admin.CreateTopic(topicName, topicDetail, false)
+ if err != nil {
+ return fmt.Errorf("create topic: %w", err)
+ }
+
+ s.t.Logf("Created topic %s with %d partitions", topicName, partitions)
+ return nil
+}
+
+// ProduceMessages produces messages using Sarama
+func (s *SaramaClient) ProduceMessages(topicName string, messages []string) error {
+ s.t.Helper()
+
+ producer, err := sarama.NewSyncProducer([]string{s.brokerAddr}, s.config)
+ if err != nil {
+ return fmt.Errorf("create producer: %w", err)
+ }
+ defer producer.Close()
+
+ for i, msgText := range messages {
+ msg := &sarama.ProducerMessage{
+ Topic: topicName,
+ Key: sarama.StringEncoder(fmt.Sprintf("Test message %d", i)),
+ Value: sarama.StringEncoder(msgText),
+ }
+
+ partition, offset, err := producer.SendMessage(msg)
+ if err != nil {
+ return fmt.Errorf("send message %d: %w", i, err)
+ }
+
+ s.t.Logf("Produced message %d: partition=%d, offset=%d", i, partition, offset)
+ }
+
+ return nil
+}
+
+// ProduceMessageToPartition produces a single message to a specific partition using Sarama
+func (s *SaramaClient) ProduceMessageToPartition(topicName string, partition int32, message string) error {
+ s.t.Helper()
+
+ // The default hash partitioner ignores msg.Partition, so copy the config
+ // and switch to the manual partitioner so the target partition is honored
+ cfg := *s.config
+ cfg.Producer.Partitioner = sarama.NewManualPartitioner
+
+ producer, err := sarama.NewSyncProducer([]string{s.brokerAddr}, &cfg)
+ if err != nil {
+ return fmt.Errorf("create producer: %w", err)
+ }
+ defer producer.Close()
+
+ msg := &sarama.ProducerMessage{
+ Topic: topicName,
+ Partition: partition,
+ Key: sarama.StringEncoder(fmt.Sprintf("key-p%d", partition)),
+ Value: sarama.StringEncoder(message),
+ }
+
+ actualPartition, offset, err := producer.SendMessage(msg)
+ if err != nil {
+ return fmt.Errorf("send message to partition %d: %w", partition, err)
+ }
+
+ s.t.Logf("Produced message to partition %d: actualPartition=%d, offset=%d", partition, actualPartition, offset)
+ return nil
+}
+
+// ConsumeMessages consumes messages using Sarama
+func (s *SaramaClient) ConsumeMessages(topicName string, partition int32, expectedCount int) ([]string, error) {
+ s.t.Helper()
+
+ consumer, err := sarama.NewConsumer([]string{s.brokerAddr}, s.config)
+ if err != nil {
+ return nil, fmt.Errorf("create consumer: %w", err)
+ }
+ defer consumer.Close()
+
+ partitionConsumer, err := consumer.ConsumePartition(topicName, partition, sarama.OffsetOldest)
+ if err != nil {
+ return nil, fmt.Errorf("create partition consumer: %w", err)
+ }
+ defer partitionConsumer.Close()
+
+ var messages []string
+ timeout := time.After(30 * time.Second)
+
+ for len(messages) < expectedCount {
+ select {
+ case msg := <-partitionConsumer.Messages():
+ messages = append(messages, string(msg.Value))
+ case err := <-partitionConsumer.Errors():
+ return messages, fmt.Errorf("consumer error: %w", err)
+ case <-timeout:
+ return messages, fmt.Errorf("timeout waiting for messages, got %d/%d", len(messages), expectedCount)
+ }
+ }
+
+ s.t.Logf("Consumed %d messages from topic %s", len(messages), topicName)
+ return messages, nil
+}
+
+// GetConfig returns the Sarama configuration
+func (s *SaramaClient) GetConfig() *sarama.Config {
+ return s.config
+}
+
+// SetConfig sets a custom Sarama configuration
+func (s *SaramaClient) SetConfig(config *sarama.Config) {
+ s.config = config
+}
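A sketch of a full produce/consume round trip through the kafka-go wrapper, assuming a gateway or broker already listens at brokerAddr (the address is hypothetical; the topic and message helpers come from messages.go below):

```go
package kafka_test

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil" // assumed path
)

func TestProduceConsumeRoundTrip(t *testing.T) {
	brokerAddr := "127.0.0.1:9092" // hypothetical; point at the running gateway
	topic := testutil.GenerateUniqueTopicName("roundtrip")

	kc := testutil.NewKafkaGoClient(t, brokerAddr)
	testutil.AssertNoError(t, kc.CreateTopic(topic, 1, 1), "create topic %s", topic)

	gen := testutil.NewMessageGenerator()
	msgs := gen.GenerateKafkaGoMessages(5)
	testutil.AssertNoError(t, kc.ProduceMessages(topic, msgs), "produce")

	got, err := kc.ConsumeMessages(topic, len(msgs))
	testutil.AssertNoError(t, err, "consume")
	testutil.AssertNoError(t, testutil.ValidateKafkaGoMessageContent(msgs, got), "validate")
}
```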
diff --git a/test/kafka/internal/testutil/docker.go b/test/kafka/internal/testutil/docker.go
new file mode 100644
index 000000000..e839fe28c
--- /dev/null
+++ b/test/kafka/internal/testutil/docker.go
@@ -0,0 +1,68 @@
+package testutil
+
+import (
+ "os"
+ "testing"
+)
+
+// DockerEnvironment provides utilities for Docker-based integration tests
+type DockerEnvironment struct {
+ KafkaBootstrap string
+ KafkaGateway string
+ SchemaRegistry string
+ Available bool
+}
+
+// NewDockerEnvironment creates a new Docker environment helper
+func NewDockerEnvironment(t *testing.T) *DockerEnvironment {
+ t.Helper()
+
+ env := &DockerEnvironment{
+ KafkaBootstrap: os.Getenv("KAFKA_BOOTSTRAP_SERVERS"),
+ KafkaGateway: os.Getenv("KAFKA_GATEWAY_URL"),
+ SchemaRegistry: os.Getenv("SCHEMA_REGISTRY_URL"),
+ }
+
+ env.Available = env.KafkaBootstrap != ""
+
+ if env.Available {
+ t.Logf("Docker environment detected:")
+ t.Logf(" Kafka Bootstrap: %s", env.KafkaBootstrap)
+ t.Logf(" Kafka Gateway: %s", env.KafkaGateway)
+ t.Logf(" Schema Registry: %s", env.SchemaRegistry)
+ }
+
+ return env
+}
+
+// SkipIfNotAvailable skips the test if Docker environment is not available
+func (d *DockerEnvironment) SkipIfNotAvailable(t *testing.T) {
+ t.Helper()
+ if !d.Available {
+ t.Skip("Skipping Docker integration test - set KAFKA_BOOTSTRAP_SERVERS to run")
+ }
+}
+
+// RequireKafka ensures Kafka is available or skips the test
+func (d *DockerEnvironment) RequireKafka(t *testing.T) {
+ t.Helper()
+ if d.KafkaBootstrap == "" {
+ t.Skip("Kafka bootstrap servers not available")
+ }
+}
+
+// RequireGateway ensures Kafka Gateway is available or skips the test
+func (d *DockerEnvironment) RequireGateway(t *testing.T) {
+ t.Helper()
+ if d.KafkaGateway == "" {
+ t.Skip("Kafka Gateway not available")
+ }
+}
+
+// RequireSchemaRegistry ensures Schema Registry is available or skips the test
+func (d *DockerEnvironment) RequireSchemaRegistry(t *testing.T) {
+ t.Helper()
+ if d.SchemaRegistry == "" {
+ t.Skip("Schema Registry not available")
+ }
+}
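A sketch of an environment-gated integration test; which Require* calls a test uses depends on what it exercises, and the topic creation here is only illustrative:

```go
package kafka_test

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil" // assumed path
)

func TestAgainstDockerStack(t *testing.T) {
	env := testutil.NewDockerEnvironment(t)
	env.SkipIfNotAvailable(t)    // skips unless KAFKA_BOOTSTRAP_SERVERS is set
	env.RequireSchemaRegistry(t) // additionally needs SCHEMA_REGISTRY_URL

	client := testutil.NewKafkaGoClient(t, env.KafkaBootstrap)
	topic := testutil.GenerateUniqueTopicName("docker")
	testutil.AssertNoError(t, client.CreateTopic(topic, 1, 1))
}
```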
diff --git a/test/kafka/internal/testutil/gateway.go b/test/kafka/internal/testutil/gateway.go
new file mode 100644
index 000000000..8021abcb6
--- /dev/null
+++ b/test/kafka/internal/testutil/gateway.go
@@ -0,0 +1,220 @@
+package testutil
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/gateway"
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
+)
+
+// GatewayTestServer wraps the gateway server with common test utilities
+type GatewayTestServer struct {
+ *gateway.Server
+ t *testing.T
+}
+
+// GatewayOptions contains configuration for test gateway
+type GatewayOptions struct {
+ Listen string
+ Masters string
+ UseProduction bool
+ // Add more options as needed
+}
+
+// NewGatewayTestServer creates a new test gateway server with common setup
+func NewGatewayTestServer(t *testing.T, opts GatewayOptions) *GatewayTestServer {
+ if opts.Listen == "" {
+ opts.Listen = "127.0.0.1:0" // Use random port by default
+ }
+
+ // Allow switching to production gateway if requested (requires masters)
+ var srv *gateway.Server
+ if opts.UseProduction {
+ if opts.Masters == "" {
+ // Fallback to env variable for convenience in CI
+ if v := os.Getenv("SEAWEEDFS_MASTERS"); v != "" {
+ opts.Masters = v
+ } else {
+ opts.Masters = "localhost:9333"
+ }
+ }
+ srv = gateway.NewServer(gateway.Options{
+ Listen: opts.Listen,
+ Masters: opts.Masters,
+ })
+ } else {
+ // For unit testing without real SeaweedMQ masters
+ srv = gateway.NewTestServerForUnitTests(gateway.Options{
+ Listen: opts.Listen,
+ })
+ }
+
+ return &GatewayTestServer{
+ Server: srv,
+ t: t,
+ }
+}
+
+// StartAndWait starts the gateway and waits for it to be ready
+func (g *GatewayTestServer) StartAndWait() string {
+ g.t.Helper()
+
+ // Start server in goroutine
+ go func() {
+ // Enable schema mode automatically when SCHEMA_REGISTRY_URL is set
+ if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
+ h := g.GetHandler()
+ if h != nil {
+ _ = h.EnableSchemaManagement(schema.ManagerConfig{RegistryURL: url})
+ }
+ }
+ if err := g.Start(); err != nil {
+ g.t.Errorf("Failed to start gateway: %v", err)
+ }
+ }()
+
+ // Best-effort wait for the listener to bind before reporting its address
+ time.Sleep(100 * time.Millisecond)
+
+ host, port := g.GetListenerAddr()
+ addr := fmt.Sprintf("%s:%d", host, port)
+ g.t.Logf("Gateway running on %s", addr)
+
+ return addr
+}
+
+// AddTestTopic adds a topic for testing with default configuration
+func (g *GatewayTestServer) AddTestTopic(name string) {
+ g.t.Helper()
+ g.GetHandler().AddTopicForTesting(name, 1)
+ g.t.Logf("Added test topic: %s", name)
+}
+
+// AddTestTopics adds multiple topics for testing
+func (g *GatewayTestServer) AddTestTopics(names ...string) {
+ g.t.Helper()
+ for _, name := range names {
+ g.AddTestTopic(name)
+ }
+}
+
+// CleanupAndClose properly closes the gateway server
+func (g *GatewayTestServer) CleanupAndClose() {
+ g.t.Helper()
+ if err := g.Close(); err != nil {
+ g.t.Errorf("Failed to close gateway: %v", err)
+ }
+}
+
+// SMQAvailabilityMode indicates whether SeaweedMQ is available for testing
+type SMQAvailabilityMode int
+
+const (
+ SMQUnavailable SMQAvailabilityMode = iota // Use mock handler only
+ SMQAvailable // SMQ is available, can use production mode
+ SMQRequired // SMQ is required, skip test if unavailable
+)
+
+// CheckSMQAvailability checks if SeaweedFS masters are available for testing
+func CheckSMQAvailability() (bool, string) {
+ masters := os.Getenv("SEAWEEDFS_MASTERS")
+ if masters == "" {
+ return false, ""
+ }
+
+ // SEAWEEDFS_MASTERS may be a comma-separated list; probing the first
+ // entry is enough to verify availability
+ first := masters
+ if idx := strings.Index(masters, ","); idx >= 0 {
+ first = masters[:idx]
+ }
+
+ // Try to connect to the first master to verify availability
+ conn, err := net.DialTimeout("tcp", first, 2*time.Second)
+ if err != nil {
+ return false, masters // Masters specified but unreachable
+ }
+ conn.Close()
+ return true, masters
+}
+
+// NewGatewayTestServerWithSMQ creates a gateway server that automatically uses SMQ if available
+func NewGatewayTestServerWithSMQ(t *testing.T, mode SMQAvailabilityMode) *GatewayTestServer {
+ smqAvailable, masters := CheckSMQAvailability()
+
+ switch mode {
+ case SMQRequired:
+ if !smqAvailable {
+ if masters != "" {
+ t.Skipf("Skipping test: SEAWEEDFS_MASTERS=%s specified but unreachable", masters)
+ } else {
+ t.Skip("Skipping test: SEAWEEDFS_MASTERS required but not set")
+ }
+ }
+ t.Logf("Using SMQ-backed gateway with masters: %s", masters)
+ return newGatewayTestServerWithTimeout(t, GatewayOptions{
+ UseProduction: true,
+ Masters: masters,
+ }, 120*time.Second)
+
+ case SMQAvailable:
+ if smqAvailable {
+ t.Logf("SMQ available, using production gateway with masters: %s", masters)
+ return newGatewayTestServerWithTimeout(t, GatewayOptions{
+ UseProduction: true,
+ Masters: masters,
+ }, 120*time.Second)
+ } else {
+ t.Logf("SMQ not available, using mock gateway")
+ return NewGatewayTestServer(t, GatewayOptions{})
+ }
+
+ default: // SMQUnavailable
+ t.Logf("Using mock gateway (SMQ integration disabled)")
+ return NewGatewayTestServer(t, GatewayOptions{})
+ }
+}
+
+// newGatewayTestServerWithTimeout creates a gateway server with a timeout to prevent hanging
+func newGatewayTestServerWithTimeout(t *testing.T, opts GatewayOptions, timeout time.Duration) *GatewayTestServer {
+ ctx, cancel := context.WithTimeout(context.Background(), timeout)
+ defer cancel()
+
+ done := make(chan *GatewayTestServer, 1)
+ errChan := make(chan error, 1)
+
+ go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ errChan <- fmt.Errorf("panic creating gateway: %v", r)
+ }
+ }()
+
+ // Create the gateway in a goroutine so we can timeout if it hangs
+ t.Logf("Creating gateway with masters: %s (with %v timeout)", opts.Masters, timeout)
+ srv := NewGatewayTestServer(t, opts) // named srv to avoid shadowing the gateway package
+ t.Logf("Gateway created successfully")
+ done <- srv
+ }()
+
+ select {
+ case srv := <-done:
+ return srv
+ case err := <-errChan:
+ t.Fatalf("Error creating gateway: %v", err)
+ case <-ctx.Done():
+ t.Fatalf("Timeout creating gateway after %v - likely SMQ broker discovery failed. Check if MQ brokers are running and accessible.", timeout)
+ }
+
+ return nil // This should never be reached
+}
+
+// IsSMQMode returns true if the gateway is using the real SMQ backend,
+// determined by whether SEAWEEDFS_MASTERS is set and the first master is reachable
+func (g *GatewayTestServer) IsSMQMode() bool {
+ available, _ := CheckSMQAvailability()
+ return available
+}
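A sketch of the intended gateway lifecycle in a test: whether the real SMQ backend is used depends on SEAWEEDFS_MASTERS at run time, and the topic names are placeholders:

```go
package kafka_test

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil" // assumed path
)

func TestGatewayLifecycle(t *testing.T) {
	// Mock handler by default; SMQ-backed when SEAWEEDFS_MASTERS is reachable.
	gw := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
	defer gw.CleanupAndClose()

	addr := gw.StartAndWait()
	gw.AddTestTopics("topic-a", "topic-b")

	client := testutil.NewKafkaGoClient(t, addr)
	msgs := testutil.NewMessageGenerator().GenerateKafkaGoMessages(3)
	testutil.AssertNoError(t, client.ProduceMessages("topic-a", msgs))
}
```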
diff --git a/test/kafka/internal/testutil/messages.go b/test/kafka/internal/testutil/messages.go
new file mode 100644
index 000000000..803dc8e0d
--- /dev/null
+++ b/test/kafka/internal/testutil/messages.go
@@ -0,0 +1,135 @@
+package testutil
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
+ "github.com/segmentio/kafka-go"
+)
+
+// MessageGenerator provides utilities for generating test messages
+type MessageGenerator struct {
+ counter int
+}
+
+// NewMessageGenerator creates a new message generator
+func NewMessageGenerator() *MessageGenerator {
+ return &MessageGenerator{counter: 0}
+}
+
+// GenerateKafkaGoMessages generates kafka-go messages for testing
+func (m *MessageGenerator) GenerateKafkaGoMessages(count int) []kafka.Message {
+ messages := make([]kafka.Message, count)
+
+ // If schema mode is requested, resolve (or best-effort register) the test
+ // schema once up front instead of once per message
+ schemaMode := false
+ schemaID := uint32(1) // fallback schema id
+ if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
+ schemaMode = true
+ subject := "offset-management-value"
+ schemaJSON := `{"type":"record","name":"TestRecord","fields":[{"name":"value","type":"string"}]}`
+ rc := schema.NewRegistryClient(schema.RegistryConfig{URL: url})
+ if _, err := rc.GetLatestSchema(subject); err != nil {
+ // Best-effort register schema
+ _, _ = rc.RegisterSchema(subject, schemaJSON)
+ }
+ if latest, err := rc.GetLatestSchema(subject); err == nil {
+ schemaID = latest.LatestID
+ }
+ }
+
+ for i := 0; i < count; i++ {
+ m.counter++
+ key := []byte(fmt.Sprintf("test-key-%d", m.counter))
+ val := []byte(fmt.Sprintf(`{"value":"test-message-%d-generated-at-%d"}`, m.counter, time.Now().Unix()))
+
+ // Wrap the value in a Confluent envelope when schema mode is on
+ if schemaMode {
+ val = schema.CreateConfluentEnvelope(schema.FormatAvro, schemaID, nil, val)
+ }
+
+ messages[i] = kafka.Message{Key: key, Value: val}
+ }
+
+ return messages
+}
+
+// GenerateStringMessages generates string messages for Sarama
+func (m *MessageGenerator) GenerateStringMessages(count int) []string {
+ messages := make([]string, count)
+
+ for i := 0; i < count; i++ {
+ m.counter++
+ messages[i] = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix())
+ }
+
+ return messages
+}
+
+// GenerateKafkaGoMessage generates a single kafka-go message
+func (m *MessageGenerator) GenerateKafkaGoMessage(key, value string) kafka.Message {
+ if key == "" {
+ m.counter++
+ key = fmt.Sprintf("test-key-%d", m.counter)
+ }
+ if value == "" {
+ value = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix())
+ }
+
+ return kafka.Message{
+ Key: []byte(key),
+ Value: []byte(value),
+ }
+}
+
+// GenerateUniqueTopicName generates a unique topic name for testing
+func GenerateUniqueTopicName(prefix string) string {
+ if prefix == "" {
+ prefix = "test-topic"
+ }
+ return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
+}
+
+// GenerateUniqueGroupID generates a unique consumer group ID for testing
+func GenerateUniqueGroupID(prefix string) string {
+ if prefix == "" {
+ prefix = "test-group"
+ }
+ return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
+}
+
+// ValidateMessageContent validates that consumed messages match expected content
+func ValidateMessageContent(expected, actual []string) error {
+ if len(expected) != len(actual) {
+ return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
+ }
+
+ for i, expectedMsg := range expected {
+ if actual[i] != expectedMsg {
+ return fmt.Errorf("message mismatch at index %d: expected %q, got %q", i, expectedMsg, actual[i])
+ }
+ }
+
+ return nil
+}
+
+// ValidateKafkaGoMessageContent validates kafka-go messages
+func ValidateKafkaGoMessageContent(expected, actual []kafka.Message) error {
+ if len(expected) != len(actual) {
+ return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
+ }
+
+ for i, expectedMsg := range expected {
+ if string(actual[i].Key) != string(expectedMsg.Key) {
+ return fmt.Errorf("key mismatch at index %d: expected %q, got %q", i, string(expectedMsg.Key), string(actual[i].Key))
+ }
+ if string(actual[i].Value) != string(expectedMsg.Value) {
+ return fmt.Errorf("value mismatch at index %d: expected %q, got %q", i, string(expectedMsg.Value), string(actual[i].Value))
+ }
+ }
+
+ return nil
+}
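The Sarama-side helpers compose the same way; a sketch, again with a hypothetical broker address:

```go
package kafka_test

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil" // assumed path
)

func TestSaramaRoundTrip(t *testing.T) {
	brokerAddr := "127.0.0.1:9092" // hypothetical gateway address
	topic := testutil.GenerateUniqueTopicName("sarama-roundtrip")

	sc := testutil.NewSaramaClient(t, brokerAddr)
	testutil.AssertNoError(t, sc.CreateTopic(topic, 1, 1))

	produced := testutil.NewMessageGenerator().GenerateStringMessages(3)
	testutil.AssertNoError(t, sc.ProduceMessages(topic, produced))

	consumed, err := sc.ConsumeMessages(topic, 0, len(produced))
	testutil.AssertNoError(t, err)
	testutil.AssertNoError(t, testutil.ValidateMessageContent(produced, consumed))
}
```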
diff --git a/test/kafka/internal/testutil/schema_helper.go b/test/kafka/internal/testutil/schema_helper.go
new file mode 100644
index 000000000..868cc286b
--- /dev/null
+++ b/test/kafka/internal/testutil/schema_helper.go
@@ -0,0 +1,33 @@
+package testutil
+
+import (
+ "testing"
+
+ kschema "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
+)
+
+// EnsureValueSchema registers a minimal Avro value schema for the given topic if not present.
+// Returns the latest schema ID if successful.
+func EnsureValueSchema(t *testing.T, registryURL, topic string) (uint32, error) {
+ t.Helper()
+ subject := topic + "-value"
+ rc := kschema.NewRegistryClient(kschema.RegistryConfig{URL: registryURL})
+
+ // Minimal Avro record schema with string field "value"
+ schemaJSON := `{"type":"record","name":"TestRecord","fields":[{"name":"value","type":"string"}]}`
+
+ // Try to get existing
+ if latest, err := rc.GetLatestSchema(subject); err == nil {
+ return latest.LatestID, nil
+ }
+
+ // Register and fetch latest
+ if _, err := rc.RegisterSchema(subject, schemaJSON); err != nil {
+ return 0, err
+ }
+ latest, err := rc.GetLatestSchema(subject)
+ if err != nil {
+ return 0, err
+ }
+ return latest.LatestID, nil
+}
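Finally, a sketch of the schema helper in a registry-backed test; the registry URL comes from the Docker environment helper above:

```go
package kafka_test

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil" // assumed path
)

func TestEnsureValueSchema(t *testing.T) {
	env := testutil.NewDockerEnvironment(t)
	env.RequireSchemaRegistry(t)

	topic := testutil.GenerateUniqueTopicName("schema")
	id, err := testutil.EnsureValueSchema(t, env.SchemaRegistry, topic)
	testutil.AssertNoError(t, err, "ensure schema for %s", topic)
	t.Logf("subject %s-value has schema id %d", topic, id)
}
```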