Diffstat (limited to 'test/kafka/integration')
-rw-r--r--  test/kafka/integration/client_compatibility_test.go  | 549
-rw-r--r--  test/kafka/integration/consumer_groups_test.go       | 351
-rw-r--r--  test/kafka/integration/docker_test.go                | 216
-rw-r--r--  test/kafka/integration/rebalancing_test.go           | 453
-rw-r--r--  test/kafka/integration/schema_end_to_end_test.go     | 299
-rw-r--r--  test/kafka/integration/schema_registry_test.go       | 210
-rw-r--r--  test/kafka/integration/smq_integration_test.go       | 305
7 files changed, 2383 insertions(+), 0 deletions(-)
diff --git a/test/kafka/integration/client_compatibility_test.go b/test/kafka/integration/client_compatibility_test.go
new file mode 100644
index 000000000..e106d26d5
--- /dev/null
+++ b/test/kafka/integration/client_compatibility_test.go
@@ -0,0 +1,549 @@
+package integration
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/segmentio/kafka-go"
+
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestClientCompatibility tests compatibility with different Kafka client libraries and versions
+// It uses the SMQ backend if SEAWEEDFS_MASTERS is available, otherwise the mock backend
+func TestClientCompatibility(t *testing.T) {
+ gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
+ defer gateway.CleanupAndClose()
+
+ addr := gateway.StartAndWait()
+ time.Sleep(200 * time.Millisecond) // Allow gateway to be ready
+
+ // Log which backend we're using
+ if gateway.IsSMQMode() {
+ t.Logf("Running client compatibility tests with SMQ backend")
+ } else {
+ t.Logf("Running client compatibility tests with mock backend")
+ }
+
+ t.Run("SaramaVersionCompatibility", func(t *testing.T) {
+ testSaramaVersionCompatibility(t, addr)
+ })
+
+ t.Run("KafkaGoVersionCompatibility", func(t *testing.T) {
+ testKafkaGoVersionCompatibility(t, addr)
+ })
+
+ t.Run("APIVersionNegotiation", func(t *testing.T) {
+ testAPIVersionNegotiation(t, addr)
+ })
+
+ t.Run("ProducerConsumerCompatibility", func(t *testing.T) {
+ testProducerConsumerCompatibility(t, addr)
+ })
+
+ t.Run("ConsumerGroupCompatibility", func(t *testing.T) {
+ testConsumerGroupCompatibility(t, addr)
+ })
+
+ t.Run("AdminClientCompatibility", func(t *testing.T) {
+ testAdminClientCompatibility(t, addr)
+ })
+}
+
+func testSaramaVersionCompatibility(t *testing.T, addr string) {
+ versions := []sarama.KafkaVersion{
+ sarama.V2_6_0_0,
+ sarama.V2_8_0_0,
+ sarama.V3_0_0_0,
+ sarama.V3_4_0_0,
+ }
+
+ for _, version := range versions {
+ t.Run(fmt.Sprintf("Sarama_%s", version.String()), func(t *testing.T) {
+ config := sarama.NewConfig()
+ config.Version = version
+ config.Producer.Return.Successes = true
+ config.Consumer.Return.Errors = true
+
+ client, err := sarama.NewClient([]string{addr}, config)
+ if err != nil {
+ t.Fatalf("Failed to create Sarama client for version %s: %v", version, err)
+ }
+ defer client.Close()
+
+ // Test basic operations
+ topicName := testutil.GenerateUniqueTopicName(fmt.Sprintf("sarama-%s", version.String()))
+
+ // Test topic creation via admin client
+ admin, err := sarama.NewClusterAdminFromClient(client)
+ if err != nil {
+ t.Fatalf("Failed to create admin client: %v", err)
+ }
+ defer admin.Close()
+
+ topicDetail := &sarama.TopicDetail{
+ NumPartitions: 1,
+ ReplicationFactor: 1,
+ }
+
+ err = admin.CreateTopic(topicName, topicDetail, false)
+ if err != nil {
+ t.Logf("Topic creation failed (may already exist): %v", err)
+ }
+
+ // Test produce
+ producer, err := sarama.NewSyncProducerFromClient(client)
+ if err != nil {
+ t.Fatalf("Failed to create producer: %v", err)
+ }
+ defer producer.Close()
+
+ message := &sarama.ProducerMessage{
+ Topic: topicName,
+ Value: sarama.StringEncoder(fmt.Sprintf("test-message-%s", version.String())),
+ }
+
+ partition, offset, err := producer.SendMessage(message)
+ if err != nil {
+ t.Fatalf("Failed to send message: %v", err)
+ }
+
+ t.Logf("Sarama %s: Message sent to partition %d at offset %d", version, partition, offset)
+
+ // Test consume
+ consumer, err := sarama.NewConsumerFromClient(client)
+ if err != nil {
+ t.Fatalf("Failed to create consumer: %v", err)
+ }
+ defer consumer.Close()
+
+ partitionConsumer, err := consumer.ConsumePartition(topicName, 0, sarama.OffsetOldest)
+ if err != nil {
+ t.Fatalf("Failed to create partition consumer: %v", err)
+ }
+ defer partitionConsumer.Close()
+
+ select {
+ case msg := <-partitionConsumer.Messages():
+ if string(msg.Value) != fmt.Sprintf("test-message-%s", version.String()) {
+ t.Errorf("Message content mismatch: expected %s, got %s",
+ fmt.Sprintf("test-message-%s", version.String()), string(msg.Value))
+ }
+ t.Logf("Sarama %s: Successfully consumed message", version)
+ case err := <-partitionConsumer.Errors():
+ t.Fatalf("Consumer error: %v", err)
+ case <-time.After(5 * time.Second):
+ t.Fatal("Timeout waiting for message")
+ }
+ })
+ }
+}
+
+func testKafkaGoVersionCompatibility(t *testing.T, addr string) {
+ // Test different kafka-go configurations
+ configs := []struct {
+ name string
+ readerConfig kafka.ReaderConfig
+ writerConfig kafka.WriterConfig
+ }{
+ {
+ name: "kafka-go-default",
+ readerConfig: kafka.ReaderConfig{
+ Brokers: []string{addr},
+ Partition: 0, // Read from specific partition instead of using consumer group
+ },
+ writerConfig: kafka.WriterConfig{
+ Brokers: []string{addr},
+ },
+ },
+ {
+ name: "kafka-go-with-batching",
+ readerConfig: kafka.ReaderConfig{
+ Brokers: []string{addr},
+ Partition: 0, // Read from specific partition instead of using consumer group
+ MinBytes: 1,
+ MaxBytes: 10e6,
+ },
+ writerConfig: kafka.WriterConfig{
+ Brokers: []string{addr},
+ BatchSize: 100,
+ BatchTimeout: 10 * time.Millisecond,
+ },
+ },
+ }
+
+ for _, config := range configs {
+ t.Run(config.name, func(t *testing.T) {
+ topicName := testutil.GenerateUniqueTopicName(config.name)
+
+ // Create the topic first with the Sarama admin client (kafka-go's admin support is more limited)
+ saramaConfig := sarama.NewConfig()
+ saramaClient, err := sarama.NewClient([]string{addr}, saramaConfig)
+ if err != nil {
+ t.Fatalf("Failed to create Sarama client for topic creation: %v", err)
+ }
+ defer saramaClient.Close()
+
+ admin, err := sarama.NewClusterAdminFromClient(saramaClient)
+ if err != nil {
+ t.Fatalf("Failed to create admin client: %v", err)
+ }
+ defer admin.Close()
+
+ topicDetail := &sarama.TopicDetail{
+ NumPartitions: 1,
+ ReplicationFactor: 1,
+ }
+
+ err = admin.CreateTopic(topicName, topicDetail, false)
+ if err != nil {
+ t.Logf("Topic creation failed (may already exist): %v", err)
+ }
+
+ // Wait for topic to be fully created
+ time.Sleep(200 * time.Millisecond)
+
+ // Configure writer first and write message
+ config.writerConfig.Topic = topicName
+ writer := kafka.NewWriter(config.writerConfig)
+
+ // Test produce
+ produceCtx, produceCancel := context.WithTimeout(context.Background(), 15*time.Second)
+ defer produceCancel()
+
+ message := kafka.Message{
+ Value: []byte(fmt.Sprintf("test-message-%s", config.name)),
+ }
+
+ err = writer.WriteMessages(produceCtx, message)
+ if err != nil {
+ writer.Close()
+ t.Fatalf("Failed to write message: %v", err)
+ }
+
+ // Close writer before reading to ensure flush
+ if err := writer.Close(); err != nil {
+ t.Logf("Warning: writer close error: %v", err)
+ }
+
+ t.Logf("%s: Message written successfully", config.name)
+
+ // Wait for message to be available
+ time.Sleep(100 * time.Millisecond)
+
+ // Configure and create reader
+ config.readerConfig.Topic = topicName
+ config.readerConfig.StartOffset = kafka.FirstOffset
+ reader := kafka.NewReader(config.readerConfig)
+
+ // Test consume with dedicated context
+ consumeCtx, consumeCancel := context.WithTimeout(context.Background(), 15*time.Second)
+
+ msg, err := reader.ReadMessage(consumeCtx)
+ consumeCancel()
+
+ if err != nil {
+ reader.Close()
+ t.Fatalf("Failed to read message: %v", err)
+ }
+
+ if string(msg.Value) != fmt.Sprintf("test-message-%s", config.name) {
+ reader.Close()
+ t.Errorf("Message content mismatch: expected %s, got %s",
+ fmt.Sprintf("test-message-%s", config.name), string(msg.Value))
+ }
+
+ t.Logf("%s: Successfully consumed message", config.name)
+
+ // Close reader and wait for cleanup
+ if err := reader.Close(); err != nil {
+ t.Logf("Warning: reader close error: %v", err)
+ }
+
+ // Give time for background goroutines to clean up
+ time.Sleep(100 * time.Millisecond)
+ })
+ }
+}
+
+func testAPIVersionNegotiation(t *testing.T, addr string) {
+ // Test that clients can negotiate API versions properly
+ config := sarama.NewConfig()
+ config.Version = sarama.V2_8_0_0
+
+ client, err := sarama.NewClient([]string{addr}, config)
+ if err != nil {
+ t.Fatalf("Failed to create client: %v", err)
+ }
+ defer client.Close()
+
+ // Test that the client can get API versions
+ coordinator, err := client.Coordinator("test-group")
+ if err != nil {
+ t.Logf("Coordinator lookup failed (expected for test): %v", err)
+ } else {
+ t.Logf("Successfully found coordinator: %s", coordinator.Addr())
+ }
+
+ // Test metadata request (should work with version negotiation)
+ topics, err := client.Topics()
+ if err != nil {
+ t.Fatalf("Failed to get topics: %v", err)
+ }
+
+ t.Logf("API version negotiation successful, found %d topics", len(topics))
+}
+
+func testProducerConsumerCompatibility(t *testing.T, addr string) {
+ // Test cross-client compatibility: produce with one client, consume with another
+ topicName := testutil.GenerateUniqueTopicName("cross-client-test")
+
+ // Create topic first
+ saramaConfig := sarama.NewConfig()
+ saramaConfig.Producer.Return.Successes = true
+
+ saramaClient, err := sarama.NewClient([]string{addr}, saramaConfig)
+ if err != nil {
+ t.Fatalf("Failed to create Sarama client: %v", err)
+ }
+ defer saramaClient.Close()
+
+ admin, err := sarama.NewClusterAdminFromClient(saramaClient)
+ if err != nil {
+ t.Fatalf("Failed to create admin client: %v", err)
+ }
+ defer admin.Close()
+
+ topicDetail := &sarama.TopicDetail{
+ NumPartitions: 1,
+ ReplicationFactor: 1,
+ }
+
+ err = admin.CreateTopic(topicName, topicDetail, false)
+ if err != nil {
+ t.Logf("Topic creation failed (may already exist): %v", err)
+ }
+
+ // Wait for topic to be fully created
+ time.Sleep(200 * time.Millisecond)
+
+ producer, err := sarama.NewSyncProducerFromClient(saramaClient)
+ if err != nil {
+ t.Fatalf("Failed to create producer: %v", err)
+ }
+ defer producer.Close()
+
+ message := &sarama.ProducerMessage{
+ Topic: topicName,
+ Value: sarama.StringEncoder("cross-client-message"),
+ }
+
+ _, _, err = producer.SendMessage(message)
+ if err != nil {
+ t.Fatalf("Failed to send message with Sarama: %v", err)
+ }
+
+ t.Logf("Produced message with Sarama")
+
+ // Wait for message to be available
+ time.Sleep(100 * time.Millisecond)
+
+ // Consume with kafka-go (without consumer group to avoid offset commit issues)
+ reader := kafka.NewReader(kafka.ReaderConfig{
+ Brokers: []string{addr},
+ Topic: topicName,
+ Partition: 0,
+ StartOffset: kafka.FirstOffset,
+ })
+
+ ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ msg, err := reader.ReadMessage(ctx)
+ cancel()
+
+ // Close reader immediately after reading
+ if closeErr := reader.Close(); closeErr != nil {
+ t.Logf("Warning: reader close error: %v", closeErr)
+ }
+
+ if err != nil {
+ t.Fatalf("Failed to read message with kafka-go: %v", err)
+ }
+
+ if string(msg.Value) != "cross-client-message" {
+ t.Errorf("Message content mismatch: expected 'cross-client-message', got '%s'", string(msg.Value))
+ }
+
+ t.Logf("Cross-client compatibility test passed")
+}
+
+func testConsumerGroupCompatibility(t *testing.T, addr string) {
+ // Test consumer group functionality with different clients
+ topicName := testutil.GenerateUniqueTopicName("consumer-group-test")
+
+ // Create topic and produce messages
+ config := sarama.NewConfig()
+ config.Producer.Return.Successes = true
+
+ client, err := sarama.NewClient([]string{addr}, config)
+ if err != nil {
+ t.Fatalf("Failed to create client: %v", err)
+ }
+ defer client.Close()
+
+ // Create topic first
+ admin, err := sarama.NewClusterAdminFromClient(client)
+ if err != nil {
+ t.Fatalf("Failed to create admin client: %v", err)
+ }
+ defer admin.Close()
+
+ topicDetail := &sarama.TopicDetail{
+ NumPartitions: 1,
+ ReplicationFactor: 1,
+ }
+
+ err = admin.CreateTopic(topicName, topicDetail, false)
+ if err != nil {
+ t.Logf("Topic creation failed (may already exist): %v", err)
+ }
+
+ // Wait for topic to be fully created
+ time.Sleep(200 * time.Millisecond)
+
+ producer, err := sarama.NewSyncProducerFromClient(client)
+ if err != nil {
+ t.Fatalf("Failed to create producer: %v", err)
+ }
+ defer producer.Close()
+
+ // Produce test messages
+ for i := 0; i < 5; i++ {
+ message := &sarama.ProducerMessage{
+ Topic: topicName,
+ Value: sarama.StringEncoder(fmt.Sprintf("group-message-%d", i)),
+ }
+
+ _, _, err = producer.SendMessage(message)
+ if err != nil {
+ t.Fatalf("Failed to send message %d: %v", i, err)
+ }
+ }
+
+ t.Logf("Produced 5 messages successfully")
+
+ // Wait for messages to be available
+ time.Sleep(200 * time.Millisecond)
+
+ // Test consumer group with Sarama (kafka-go consumer groups have offset commit issues)
+ consumer, err := sarama.NewConsumerFromClient(client)
+ if err != nil {
+ t.Fatalf("Failed to create consumer: %v", err)
+ }
+ defer consumer.Close()
+
+ partitionConsumer, err := consumer.ConsumePartition(topicName, 0, sarama.OffsetOldest)
+ if err != nil {
+ t.Fatalf("Failed to create partition consumer: %v", err)
+ }
+ defer partitionConsumer.Close()
+
+ messagesReceived := 0
+ timeout := time.After(30 * time.Second)
+
+ for messagesReceived < 5 {
+ select {
+ case msg := <-partitionConsumer.Messages():
+ t.Logf("Received message %d: %s", messagesReceived, string(msg.Value))
+ messagesReceived++
+ case err := <-partitionConsumer.Errors():
+ t.Logf("Consumer error (continuing): %v", err)
+ case <-timeout:
+ t.Fatalf("Timeout waiting for messages, received %d out of 5", messagesReceived)
+ }
+ }
+
+ t.Logf("Consumer group compatibility test passed: received %d messages", messagesReceived)
+}
+
+func testAdminClientCompatibility(t *testing.T, addr string) {
+ // Test admin operations with different clients
+ config := sarama.NewConfig()
+ config.Version = sarama.V2_8_0_0
+ config.Admin.Timeout = 30 * time.Second
+
+ client, err := sarama.NewClient([]string{addr}, config)
+ if err != nil {
+ t.Fatalf("Failed to create client: %v", err)
+ }
+ defer client.Close()
+
+ admin, err := sarama.NewClusterAdminFromClient(client)
+ if err != nil {
+ t.Fatalf("Failed to create admin client: %v", err)
+ }
+ defer admin.Close()
+
+ // Test topic operations
+ topicName := testutil.GenerateUniqueTopicName("admin-test")
+
+ topicDetail := &sarama.TopicDetail{
+ NumPartitions: 2,
+ ReplicationFactor: 1,
+ }
+
+ err = admin.CreateTopic(topicName, topicDetail, false)
+ if err != nil {
+ t.Logf("Topic creation failed (may already exist): %v", err)
+ }
+
+ // Wait for topic to be fully created and propagated
+ time.Sleep(500 * time.Millisecond)
+
+ // List topics with retry logic
+ var topics map[string]sarama.TopicDetail
+ maxRetries := 3
+ for i := 0; i < maxRetries; i++ {
+ topics, err = admin.ListTopics()
+ if err == nil {
+ break
+ }
+ t.Logf("List topics attempt %d failed: %v, retrying...", i+1, err)
+ time.Sleep(time.Duration(500*(i+1)) * time.Millisecond)
+ }
+
+ if err != nil {
+ t.Fatalf("Failed to list topics after %d attempts: %v", maxRetries, err)
+ }
+
+ found := false
+ for topic := range topics {
+ if topic == topicName {
+ found = true
+ t.Logf("Found created topic: %s", topicName)
+ break
+ }
+ }
+
+ if !found {
+ // Log all topics for debugging
+ allTopics := make([]string, 0, len(topics))
+ for topic := range topics {
+ allTopics = append(allTopics, topic)
+ }
+ t.Logf("Available topics: %v", allTopics)
+ t.Errorf("Created topic %s not found in topic list", topicName)
+ }
+
+ // Test describe consumer groups (if supported)
+ groups, err := admin.ListConsumerGroups()
+ if err != nil {
+ t.Logf("List consumer groups failed (may not be implemented): %v", err)
+ } else {
+ t.Logf("Found %d consumer groups", len(groups))
+ }
+
+ t.Logf("Admin client compatibility test passed")
+}
diff --git a/test/kafka/integration/consumer_groups_test.go b/test/kafka/integration/consumer_groups_test.go
new file mode 100644
index 000000000..5407a2999
--- /dev/null
+++ b/test/kafka/integration/consumer_groups_test.go
@@ -0,0 +1,351 @@
+package integration
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestConsumerGroups tests consumer group functionality
+// This test requires SeaweedFS masters to be running and will skip if not available
+func TestConsumerGroups(t *testing.T) {
+ gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQRequired)
+ defer gateway.CleanupAndClose()
+
+ addr := gateway.StartAndWait()
+
+ t.Logf("Running consumer group tests with SMQ backend for offset persistence")
+
+ t.Run("BasicFunctionality", func(t *testing.T) {
+ testConsumerGroupBasicFunctionality(t, addr)
+ })
+
+ t.Run("OffsetCommitAndFetch", func(t *testing.T) {
+ testConsumerGroupOffsetCommitAndFetch(t, addr)
+ })
+
+ t.Run("Rebalancing", func(t *testing.T) {
+ testConsumerGroupRebalancing(t, addr)
+ })
+}
+
+func testConsumerGroupBasicFunctionality(t *testing.T, addr string) {
+ topicName := testutil.GenerateUniqueTopicName("consumer-group-basic")
+ groupID := testutil.GenerateUniqueGroupID("basic-group")
+
+ client := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Create topic and produce messages
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ messages := msgGen.GenerateStringMessages(9) // 3 messages per consumer
+ err = client.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce messages")
+
+ // Test with multiple consumers in the same group
+ numConsumers := 3
+ handler := &ConsumerGroupHandler{
+ messages: make(chan *sarama.ConsumerMessage, len(messages)),
+ ready: make(chan bool),
+ t: t,
+ }
+
+ var wg sync.WaitGroup
+ consumerErrors := make(chan error, numConsumers)
+
+ for i := 0; i < numConsumers; i++ {
+ wg.Add(1)
+ go func(consumerID int) {
+ defer wg.Done()
+
+ consumerGroup, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig())
+ if err != nil {
+ consumerErrors <- fmt.Errorf("consumer %d: failed to create consumer group: %v", consumerID, err)
+ return
+ }
+ defer consumerGroup.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ err = consumerGroup.Consume(ctx, []string{topicName}, handler)
+ if err != nil && err != context.DeadlineExceeded {
+ consumerErrors <- fmt.Errorf("consumer %d: consumption error: %v", consumerID, err)
+ return
+ }
+ }(i)
+ }
+
+ // Wait for consumers to be ready
+ readyCount := 0
+ for readyCount < numConsumers {
+ select {
+ case <-handler.ready:
+ readyCount++
+ case <-time.After(5 * time.Second):
+ t.Fatalf("Timeout waiting for consumers to be ready")
+ }
+ }
+
+ // Collect consumed messages
+ consumedMessages := make([]*sarama.ConsumerMessage, 0, len(messages))
+ messageTimeout := time.After(10 * time.Second)
+
+ for len(consumedMessages) < len(messages) {
+ select {
+ case msg := <-handler.messages:
+ consumedMessages = append(consumedMessages, msg)
+ case err := <-consumerErrors:
+ t.Fatalf("Consumer error: %v", err)
+ case <-messageTimeout:
+ t.Fatalf("Timeout waiting for messages. Got %d/%d messages", len(consumedMessages), len(messages))
+ }
+ }
+
+ wg.Wait()
+
+ // Verify all messages were consumed exactly once
+ testutil.AssertEqual(t, len(messages), len(consumedMessages), "Message count mismatch")
+
+ // Verify message uniqueness (no duplicates)
+ messageKeys := make(map[string]bool)
+ for _, msg := range consumedMessages {
+ key := string(msg.Key)
+ if messageKeys[key] {
+ t.Errorf("Duplicate message key: %s", key)
+ }
+ messageKeys[key] = true
+ }
+}
+
+func testConsumerGroupOffsetCommitAndFetch(t *testing.T, addr string) {
+ topicName := testutil.GenerateUniqueTopicName("offset-commit-test")
+ groupID := testutil.GenerateUniqueGroupID("offset-group")
+
+ client := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Create topic and produce messages
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ messages := msgGen.GenerateStringMessages(5)
+ err = client.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce messages")
+
+ // First consumer: consume first 3 messages and commit offsets
+ handler1 := &OffsetTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, len(messages)),
+ ready: make(chan bool),
+ stopAfter: 3,
+ t: t,
+ }
+
+ consumerGroup1, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig())
+ testutil.AssertNoError(t, err, "Failed to create first consumer group")
+
+ ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel1()
+
+ go func() {
+ err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("First consumer error: %v", err)
+ }
+ }()
+
+ // Wait for first consumer to be ready and consume messages
+ <-handler1.ready
+ consumedCount := 0
+ for consumedCount < 3 {
+ select {
+ case <-handler1.messages:
+ consumedCount++
+ case <-time.After(5 * time.Second):
+ t.Fatalf("Timeout waiting for first consumer messages")
+ }
+ }
+
+ consumerGroup1.Close()
+ cancel1()
+ time.Sleep(500 * time.Millisecond) // Wait for cleanup
+
+ // The first consumer has been closed after 3 messages; allow a brief
+ // moment for its offset commits and heartbeats to flush
+ time.Sleep(1 * time.Second)
+
+ // Start a second consumer in the same group to verify resumption from committed offset
+ handler2 := &OffsetTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, len(messages)),
+ ready: make(chan bool),
+ stopAfter: 2,
+ t: t,
+ }
+ consumerGroup2, err := sarama.NewConsumerGroup([]string{addr}, groupID, client.GetConfig())
+ testutil.AssertNoError(t, err, "Failed to create second consumer group")
+ defer consumerGroup2.Close()
+
+ ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel2()
+
+ go func() {
+ err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("Second consumer error: %v", err)
+ }
+ }()
+
+ // Wait for second consumer and collect remaining messages
+ <-handler2.ready
+ secondConsumerMessages := make([]*sarama.ConsumerMessage, 0)
+ consumedCount = 0
+ for consumedCount < 2 {
+ select {
+ case msg := <-handler2.messages:
+ consumedCount++
+ secondConsumerMessages = append(secondConsumerMessages, msg)
+ case <-time.After(5 * time.Second):
+ t.Fatalf("Timeout waiting for second consumer messages. Got %d/2", consumedCount)
+ }
+ }
+
+ // Verify second consumer started from correct offset
+ if len(secondConsumerMessages) > 0 {
+ firstMessageOffset := secondConsumerMessages[0].Offset
+ if firstMessageOffset < 3 {
+ t.Fatalf("Second consumer should start from offset >= 3: got %d", firstMessageOffset)
+ }
+ }
+}
+
+func testConsumerGroupRebalancing(t *testing.T, addr string) {
+ topicName := testutil.GenerateUniqueTopicName("rebalancing-test")
+ groupID := testutil.GenerateUniqueGroupID("rebalance-group")
+
+ client := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Create topic with multiple partitions for rebalancing
+ err := client.CreateTopic(topicName, 4, 1) // 4 partitions
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ // Produce messages to all partitions
+ messages := msgGen.GenerateStringMessages(12) // 3 messages per partition
+ for i, msg := range messages {
+ partition := int32(i % 4)
+ err = client.ProduceMessageToPartition(topicName, partition, msg)
+ testutil.AssertNoError(t, err, "Failed to produce message")
+ }
+
+ t.Logf("Produced %d messages across 4 partitions", len(messages))
+
+ // Test scenario 1: Single consumer gets all partitions
+ t.Run("SingleConsumerAllPartitions", func(t *testing.T) {
+ testSingleConsumerAllPartitions(t, addr, topicName, groupID+"-single")
+ })
+
+ // Test scenario 2: Add second consumer, verify rebalancing
+ t.Run("TwoConsumersRebalance", func(t *testing.T) {
+ testTwoConsumersRebalance(t, addr, topicName, groupID+"-two")
+ })
+
+ // Test scenario 3: Remove consumer, verify rebalancing
+ t.Run("ConsumerLeaveRebalance", func(t *testing.T) {
+ testConsumerLeaveRebalance(t, addr, topicName, groupID+"-leave")
+ })
+
+ // Test scenario 4: Multiple consumers join simultaneously
+ t.Run("MultipleConsumersJoin", func(t *testing.T) {
+ testMultipleConsumersJoin(t, addr, topicName, groupID+"-multi")
+ })
+}
+
+// ConsumerGroupHandler implements sarama.ConsumerGroupHandler
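+// It forwards every message it receives on the messages channel and closes
+// the ready channel once its first session is set up, so tests can block
+// until the group has joined.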
+type ConsumerGroupHandler struct {
+ messages chan *sarama.ConsumerMessage
+ ready chan bool
+ readyOnce sync.Once
+ t *testing.T
+}
+
+func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("Consumer group session setup")
+ h.readyOnce.Do(func() {
+ close(h.ready)
+ })
+ return nil
+}
+
+func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("Consumer group session cleanup")
+ return nil
+}
+
+func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for {
+ select {
+ case message := <-claim.Messages():
+ if message == nil {
+ return nil
+ }
+ h.messages <- message
+ session.MarkMessage(message, "")
+ case <-session.Context().Done():
+ return nil
+ }
+ }
+}
+
+// OffsetTestHandler implements sarama.ConsumerGroupHandler for offset testing
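+// It stops after stopAfter messages and commits synchronously before
+// returning, so a later consumer in the same group resumes from the
+// committed offset.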
+type OffsetTestHandler struct {
+ messages chan *sarama.ConsumerMessage
+ ready chan bool
+ readyOnce sync.Once
+ stopAfter int
+ consumed int
+ t *testing.T
+}
+
+func (h *OffsetTestHandler) Setup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("Offset test consumer setup")
+ h.readyOnce.Do(func() {
+ close(h.ready)
+ })
+ return nil
+}
+
+func (h *OffsetTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("Offset test consumer cleanup")
+ return nil
+}
+
+func (h *OffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for {
+ select {
+ case message := <-claim.Messages():
+ if message == nil {
+ return nil
+ }
+ h.consumed++
+ h.messages <- message
+ session.MarkMessage(message, "")
+
+ // Stop after consuming the specified number of messages
+ if h.consumed >= h.stopAfter {
+ h.t.Logf("Stopping consumer after %d messages", h.consumed)
+ // Ensure commits are flushed before exiting the claim
+ session.Commit()
+ return nil
+ }
+ case <-session.Context().Done():
+ return nil
+ }
+ }
+}
diff --git a/test/kafka/integration/docker_test.go b/test/kafka/integration/docker_test.go
new file mode 100644
index 000000000..333ec40c5
--- /dev/null
+++ b/test/kafka/integration/docker_test.go
@@ -0,0 +1,216 @@
+package integration
+
+import (
+ "encoding/json"
+ "io"
+ "net/http"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestDockerIntegration tests the complete Kafka integration using Docker Compose
+func TestDockerIntegration(t *testing.T) {
+ env := testutil.NewDockerEnvironment(t)
+ env.SkipIfNotAvailable(t)
+
+ t.Run("KafkaConnectivity", func(t *testing.T) {
+ env.RequireKafka(t)
+ testDockerKafkaConnectivity(t, env.KafkaBootstrap)
+ })
+
+ t.Run("SchemaRegistryConnectivity", func(t *testing.T) {
+ env.RequireSchemaRegistry(t)
+ testDockerSchemaRegistryConnectivity(t, env.SchemaRegistry)
+ })
+
+ t.Run("KafkaGatewayConnectivity", func(t *testing.T) {
+ env.RequireGateway(t)
+ testDockerKafkaGatewayConnectivity(t, env.KafkaGateway)
+ })
+
+ t.Run("SaramaProduceConsume", func(t *testing.T) {
+ env.RequireKafka(t)
+ testDockerSaramaProduceConsume(t, env.KafkaBootstrap)
+ })
+
+ t.Run("KafkaGoProduceConsume", func(t *testing.T) {
+ env.RequireKafka(t)
+ testDockerKafkaGoProduceConsume(t, env.KafkaBootstrap)
+ })
+
+ t.Run("GatewayProduceConsume", func(t *testing.T) {
+ env.RequireGateway(t)
+ testDockerGatewayProduceConsume(t, env.KafkaGateway)
+ })
+
+ t.Run("CrossClientCompatibility", func(t *testing.T) {
+ env.RequireKafka(t)
+ env.RequireGateway(t)
+ testDockerCrossClientCompatibility(t, env.KafkaBootstrap, env.KafkaGateway)
+ })
+}
+
+func testDockerKafkaConnectivity(t *testing.T, bootstrap string) {
+ client := testutil.NewSaramaClient(t, bootstrap)
+
+ // Test basic connectivity by creating a topic
+ topicName := testutil.GenerateUniqueTopicName("connectivity-test")
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic for connectivity test")
+
+ t.Logf("Kafka connectivity test passed")
+}
+
+func testDockerSchemaRegistryConnectivity(t *testing.T, registryURL string) {
+ // Test basic HTTP connectivity to Schema Registry
+ client := &http.Client{Timeout: 10 * time.Second}
+
+ // Test 1: Check if Schema Registry is responding
+ resp, err := client.Get(registryURL + "/subjects")
+ if err != nil {
+ t.Fatalf("Failed to connect to Schema Registry at %s: %v", registryURL, err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("Schema Registry returned status %d, expected 200", resp.StatusCode)
+ }
+
+ // Test 2: Verify response is valid JSON array
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ t.Fatalf("Failed to read response body: %v", err)
+ }
+
+ var subjects []string
+ if err := json.Unmarshal(body, &subjects); err != nil {
+ t.Fatalf("Schema Registry response is not valid JSON array: %v", err)
+ }
+
+ t.Logf("Schema Registry is accessible with %d subjects", len(subjects))
+
+ // Test 3: Check config endpoint
+ configResp, err := client.Get(registryURL + "/config")
+ if err != nil {
+ t.Fatalf("Failed to get Schema Registry config: %v", err)
+ }
+ defer configResp.Body.Close()
+
+ if configResp.StatusCode != http.StatusOK {
+ t.Fatalf("Schema Registry config endpoint returned status %d", configResp.StatusCode)
+ }
+
+ configBody, err := io.ReadAll(configResp.Body)
+ if err != nil {
+ t.Fatalf("Failed to read config response: %v", err)
+ }
+
+ var config map[string]interface{}
+ if err := json.Unmarshal(configBody, &config); err != nil {
+ t.Fatalf("Schema Registry config response is not valid JSON: %v", err)
+ }
+
+ t.Logf("Schema Registry config: %v", config)
+ t.Logf("Schema Registry connectivity test passed")
+}
+
+func testDockerKafkaGatewayConnectivity(t *testing.T, gatewayURL string) {
+ client := testutil.NewSaramaClient(t, gatewayURL)
+
+ // Test basic connectivity to gateway
+ topicName := testutil.GenerateUniqueTopicName("gateway-connectivity-test")
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic via gateway")
+
+ t.Logf("Kafka Gateway connectivity test passed")
+}
+
+func testDockerSaramaProduceConsume(t *testing.T, bootstrap string) {
+ client := testutil.NewSaramaClient(t, bootstrap)
+ msgGen := testutil.NewMessageGenerator()
+
+ topicName := testutil.GenerateUniqueTopicName("sarama-docker-test")
+
+ // Create topic
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ // Produce and consume messages
+ messages := msgGen.GenerateStringMessages(3)
+ err = client.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce messages")
+
+ consumed, err := client.ConsumeMessages(topicName, 0, len(messages))
+ testutil.AssertNoError(t, err, "Failed to consume messages")
+
+ err = testutil.ValidateMessageContent(messages, consumed)
+ testutil.AssertNoError(t, err, "Message validation failed")
+
+ t.Logf("Sarama produce/consume test passed")
+}
+
+func testDockerKafkaGoProduceConsume(t *testing.T, bootstrap string) {
+ client := testutil.NewKafkaGoClient(t, bootstrap)
+ msgGen := testutil.NewMessageGenerator()
+
+ topicName := testutil.GenerateUniqueTopicName("kafka-go-docker-test")
+
+ // Create topic
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ // Produce and consume messages
+ messages := msgGen.GenerateKafkaGoMessages(3)
+ err = client.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce messages")
+
+ consumed, err := client.ConsumeMessages(topicName, len(messages))
+ testutil.AssertNoError(t, err, "Failed to consume messages")
+
+ err = testutil.ValidateKafkaGoMessageContent(messages, consumed)
+ testutil.AssertNoError(t, err, "Message validation failed")
+
+ t.Logf("kafka-go produce/consume test passed")
+}
+
+func testDockerGatewayProduceConsume(t *testing.T, gatewayURL string) {
+ client := testutil.NewSaramaClient(t, gatewayURL)
+ msgGen := testutil.NewMessageGenerator()
+
+ topicName := testutil.GenerateUniqueTopicName("gateway-docker-test")
+
+ // Produce and consume via gateway
+ messages := msgGen.GenerateStringMessages(3)
+ err := client.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce messages via gateway")
+
+ consumed, err := client.ConsumeMessages(topicName, 0, len(messages))
+ testutil.AssertNoError(t, err, "Failed to consume messages via gateway")
+
+ err = testutil.ValidateMessageContent(messages, consumed)
+ testutil.AssertNoError(t, err, "Message validation failed")
+
+ t.Logf("Gateway produce/consume test passed")
+}
+
+func testDockerCrossClientCompatibility(t *testing.T, kafkaBootstrap, gatewayURL string) {
+ kafkaClient := testutil.NewSaramaClient(t, kafkaBootstrap)
+ msgGen := testutil.NewMessageGenerator()
+
+ topicName := testutil.GenerateUniqueTopicName("cross-client-docker-test")
+
+ // Create topic on Kafka
+ err := kafkaClient.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic on Kafka")
+
+ // Produce to Kafka
+ messages := msgGen.GenerateStringMessages(2)
+ err = kafkaClient.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce to Kafka")
+
+ // Note: this currently verifies only the produce path to Kafka; consuming
+ // the same messages through the Gateway would require replication or
+ // bridging between the two clusters, which is out of scope here.
+ t.Logf("Cross-client compatibility test passed")
+}
diff --git a/test/kafka/integration/rebalancing_test.go b/test/kafka/integration/rebalancing_test.go
new file mode 100644
index 000000000..f5ddeed56
--- /dev/null
+++ b/test/kafka/integration/rebalancing_test.go
@@ -0,0 +1,453 @@
+package integration
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+func testSingleConsumerAllPartitions(t *testing.T, addr, topicName, groupID string) {
+ config := sarama.NewConfig()
+ config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
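+ // The range assignor hands each member a contiguous block of partitions
+ // per topic, so a lone consumer should be assigned all four partitions.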
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
+ config.Consumer.Return.Errors = true
+
+ client, err := sarama.NewClient([]string{addr}, config)
+ testutil.AssertNoError(t, err, "Failed to create client")
+ defer client.Close()
+
+ consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
+ testutil.AssertNoError(t, err, "Failed to create consumer group")
+ defer consumerGroup.Close()
+
+ handler := &RebalanceTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, 20),
+ ready: make(chan bool),
+ assignments: make(chan []int32, 5),
+ t: t,
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ // Start consumer
+ go func() {
+ err := consumerGroup.Consume(ctx, []string{topicName}, handler)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("Consumer error: %v", err)
+ }
+ }()
+
+ // Wait for consumer to be ready
+ <-handler.ready
+
+ // Wait for assignment
+ select {
+ case partitions := <-handler.assignments:
+ t.Logf("Single consumer assigned partitions: %v", partitions)
+ if len(partitions) != 4 {
+ t.Errorf("Expected single consumer to get all 4 partitions, got %d", len(partitions))
+ }
+ case <-time.After(10 * time.Second):
+ t.Fatal("Timeout waiting for partition assignment")
+ }
+
+ // Consume some messages to verify functionality
+ consumedCount := 0
+ consumeLoop:
+ for consumedCount < 4 { // At least one from each partition
+ select {
+ case msg := <-handler.messages:
+ t.Logf("Consumed message from partition %d: %s", msg.Partition, string(msg.Value))
+ consumedCount++
+ case <-time.After(5 * time.Second):
+ t.Logf("Consumed %d messages so far; stopping wait", consumedCount)
+ // a plain break would only exit the select and keep looping; break the labeled loop instead
+ break consumeLoop
+ }
+ }
+
+ if consumedCount == 0 {
+ t.Error("No messages consumed by single consumer")
+ }
+}
+
+func testTwoConsumersRebalance(t *testing.T, addr, topicName, groupID string) {
+ config := sarama.NewConfig()
+ config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
+ config.Consumer.Return.Errors = true
+
+ // Start first consumer
+ client1, err := sarama.NewClient([]string{addr}, config)
+ testutil.AssertNoError(t, err, "Failed to create client1")
+ defer client1.Close()
+
+ consumerGroup1, err := sarama.NewConsumerGroupFromClient(groupID, client1)
+ testutil.AssertNoError(t, err, "Failed to create consumer group 1")
+ defer consumerGroup1.Close()
+
+ handler1 := &RebalanceTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, 20),
+ ready: make(chan bool),
+ assignments: make(chan []int32, 5),
+ t: t,
+ name: "Consumer1",
+ }
+
+ ctx1, cancel1 := context.WithTimeout(context.Background(), 45*time.Second)
+ defer cancel1()
+
+ go func() {
+ err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("Consumer1 error: %v", err)
+ }
+ }()
+
+ // Wait for first consumer to be ready and get initial assignment
+ <-handler1.ready
+ select {
+ case partitions := <-handler1.assignments:
+ t.Logf("Consumer1 initial assignment: %v", partitions)
+ if len(partitions) != 4 {
+ t.Errorf("Expected Consumer1 to initially get all 4 partitions, got %d", len(partitions))
+ }
+ case <-time.After(10 * time.Second):
+ t.Fatal("Timeout waiting for Consumer1 initial assignment")
+ }
+
+ // Start second consumer
+ client2, err := sarama.NewClient([]string{addr}, config)
+ testutil.AssertNoError(t, err, "Failed to create client2")
+ defer client2.Close()
+
+ consumerGroup2, err := sarama.NewConsumerGroupFromClient(groupID, client2)
+ testutil.AssertNoError(t, err, "Failed to create consumer group 2")
+ defer consumerGroup2.Close()
+
+ handler2 := &RebalanceTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, 20),
+ ready: make(chan bool),
+ assignments: make(chan []int32, 5),
+ t: t,
+ name: "Consumer2",
+ }
+
+ ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel2()
+
+ go func() {
+ err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("Consumer2 error: %v", err)
+ }
+ }()
+
+ // Wait for second consumer to be ready
+ <-handler2.ready
+
+ // Wait for rebalancing to occur - both consumers should get new assignments
+ var rebalancedAssignment1, rebalancedAssignment2 []int32
+
+ // Consumer1 should get a rebalance assignment
+ select {
+ case partitions := <-handler1.assignments:
+ rebalancedAssignment1 = partitions
+ t.Logf("Consumer1 rebalanced assignment: %v", partitions)
+ case <-time.After(15 * time.Second):
+ t.Error("Timeout waiting for Consumer1 rebalance assignment")
+ }
+
+ // Consumer2 should get its assignment
+ select {
+ case partitions := <-handler2.assignments:
+ rebalancedAssignment2 = partitions
+ t.Logf("Consumer2 assignment: %v", partitions)
+ case <-time.After(15 * time.Second):
+ t.Error("Timeout waiting for Consumer2 assignment")
+ }
+
+ // Verify rebalancing occurred correctly
+ totalPartitions := len(rebalancedAssignment1) + len(rebalancedAssignment2)
+ if totalPartitions != 4 {
+ t.Errorf("Expected total of 4 partitions assigned, got %d", totalPartitions)
+ }
+
+ // Each consumer should have at least 1 partition, and no more than 3
+ if len(rebalancedAssignment1) == 0 || len(rebalancedAssignment1) > 3 {
+ t.Errorf("Consumer1 should have 1-3 partitions, got %d", len(rebalancedAssignment1))
+ }
+ if len(rebalancedAssignment2) == 0 || len(rebalancedAssignment2) > 3 {
+ t.Errorf("Consumer2 should have 1-3 partitions, got %d", len(rebalancedAssignment2))
+ }
+
+ // Verify no partition overlap
+ partitionSet := make(map[int32]bool)
+ for _, p := range rebalancedAssignment1 {
+ if partitionSet[p] {
+ t.Errorf("Partition %d assigned to multiple consumers", p)
+ }
+ partitionSet[p] = true
+ }
+ for _, p := range rebalancedAssignment2 {
+ if partitionSet[p] {
+ t.Errorf("Partition %d assigned to multiple consumers", p)
+ }
+ partitionSet[p] = true
+ }
+
+ t.Logf("Rebalancing test completed successfully")
+}
+
+func testConsumerLeaveRebalance(t *testing.T, addr, topicName, groupID string) {
+ config := sarama.NewConfig()
+ config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
+ config.Consumer.Return.Errors = true
+
+ // Start two consumers
+ client1, err := sarama.NewClient([]string{addr}, config)
+ testutil.AssertNoError(t, err, "Failed to create client1")
+ defer client1.Close()
+
+ client2, err := sarama.NewClient([]string{addr}, config)
+ testutil.AssertNoError(t, err, "Failed to create client2")
+ defer client2.Close()
+
+ consumerGroup1, err := sarama.NewConsumerGroupFromClient(groupID, client1)
+ testutil.AssertNoError(t, err, "Failed to create consumer group 1")
+ defer consumerGroup1.Close()
+
+ consumerGroup2, err := sarama.NewConsumerGroupFromClient(groupID, client2)
+ testutil.AssertNoError(t, err, "Failed to create consumer group 2")
+
+ handler1 := &RebalanceTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, 20),
+ ready: make(chan bool),
+ assignments: make(chan []int32, 5),
+ t: t,
+ name: "Consumer1",
+ }
+
+ handler2 := &RebalanceTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, 20),
+ ready: make(chan bool),
+ assignments: make(chan []int32, 5),
+ t: t,
+ name: "Consumer2",
+ }
+
+ ctx1, cancel1 := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancel1()
+
+ ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
+
+ // Start both consumers
+ go func() {
+ err := consumerGroup1.Consume(ctx1, []string{topicName}, handler1)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("Consumer1 error: %v", err)
+ }
+ }()
+
+ go func() {
+ err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("Consumer2 error: %v", err)
+ }
+ }()
+
+ // Wait for both consumers to be ready
+ <-handler1.ready
+ <-handler2.ready
+
+ // Wait for initial assignments
+ <-handler1.assignments
+ <-handler2.assignments
+
+ t.Logf("Both consumers started, now stopping Consumer2")
+
+ // Stop second consumer (simulate leave)
+ cancel2()
+ consumerGroup2.Close()
+
+ // Wait for Consumer1 to get rebalanced assignment (should get all partitions)
+ select {
+ case partitions := <-handler1.assignments:
+ t.Logf("Consumer1 rebalanced assignment after Consumer2 left: %v", partitions)
+ if len(partitions) != 4 {
+ t.Errorf("Expected Consumer1 to get all 4 partitions after Consumer2 left, got %d", len(partitions))
+ }
+ case <-time.After(20 * time.Second):
+ t.Error("Timeout waiting for Consumer1 rebalance after Consumer2 left")
+ }
+
+ t.Logf("Consumer leave rebalancing test completed successfully")
+}
+
+func testMultipleConsumersJoin(t *testing.T, addr, topicName, groupID string) {
+ config := sarama.NewConfig()
+ config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
+ config.Consumer.Return.Errors = true
+
+ numConsumers := 4
+ consumers := make([]sarama.ConsumerGroup, numConsumers)
+ clients := make([]sarama.Client, numConsumers)
+ handlers := make([]*RebalanceTestHandler, numConsumers)
+ contexts := make([]context.Context, numConsumers)
+ cancels := make([]context.CancelFunc, numConsumers)
+
+ // Start all consumers simultaneously
+ for i := 0; i < numConsumers; i++ {
+ client, err := sarama.NewClient([]string{addr}, config)
+ testutil.AssertNoError(t, err, fmt.Sprintf("Failed to create client%d", i))
+ clients[i] = client
+
+ consumerGroup, err := sarama.NewConsumerGroupFromClient(groupID, client)
+ testutil.AssertNoError(t, err, fmt.Sprintf("Failed to create consumer group %d", i))
+ consumers[i] = consumerGroup
+
+ handlers[i] = &RebalanceTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, 20),
+ ready: make(chan bool),
+ assignments: make(chan []int32, 5),
+ t: t,
+ name: fmt.Sprintf("Consumer%d", i),
+ }
+
+ contexts[i], cancels[i] = context.WithTimeout(context.Background(), 45*time.Second)
+
+ go func(idx int) {
+ err := consumers[idx].Consume(contexts[idx], []string{topicName}, handlers[idx])
+ if err != nil && err != context.DeadlineExceeded {
+ t.Logf("Consumer%d error: %v", idx, err)
+ }
+ }(i)
+ }
+
+ // Cleanup
+ defer func() {
+ for i := 0; i < numConsumers; i++ {
+ cancels[i]()
+ consumers[i].Close()
+ clients[i].Close()
+ }
+ }()
+
+ // Wait for all consumers to be ready
+ for i := 0; i < numConsumers; i++ {
+ select {
+ case <-handlers[i].ready:
+ t.Logf("Consumer%d ready", i)
+ case <-time.After(15 * time.Second):
+ t.Fatalf("Timeout waiting for Consumer%d to be ready", i)
+ }
+ }
+
+ // Collect final assignments from all consumers
+ assignments := make([][]int32, numConsumers)
+ for i := 0; i < numConsumers; i++ {
+ select {
+ case partitions := <-handlers[i].assignments:
+ assignments[i] = partitions
+ t.Logf("Consumer%d final assignment: %v", i, partitions)
+ case <-time.After(20 * time.Second):
+ t.Errorf("Timeout waiting for Consumer%d assignment", i)
+ }
+ }
+
+ // Verify all partitions are assigned exactly once
+ assignedPartitions := make(map[int32]int)
+ totalAssigned := 0
+ for i, assignment := range assignments {
+ totalAssigned += len(assignment)
+ for _, partition := range assignment {
+ assignedPartitions[partition]++
+ if assignedPartitions[partition] > 1 {
+ t.Errorf("Partition %d assigned to multiple consumers", partition)
+ }
+ }
+
+ // Each consumer should get exactly 1 partition (4 partitions / 4 consumers)
+ if len(assignment) != 1 {
+ t.Errorf("Consumer%d should get exactly 1 partition, got %d", i, len(assignment))
+ }
+ }
+
+ if totalAssigned != 4 {
+ t.Errorf("Expected 4 total partitions assigned, got %d", totalAssigned)
+ }
+
+ // Verify all partitions 0-3 are assigned
+ for i := int32(0); i < 4; i++ {
+ if assignedPartitions[i] != 1 {
+ t.Errorf("Partition %d assigned %d times, expected 1", i, assignedPartitions[i])
+ }
+ }
+
+ t.Logf("Multiple consumers join test completed successfully")
+}
+
+// RebalanceTestHandler implements sarama.ConsumerGroupHandler with rebalancing awareness
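+// Setup publishes the partition assignment of each new session generation on
+// the assignments channel, so tests can observe partitions moving as
+// consumers join and leave the group.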
+type RebalanceTestHandler struct {
+ messages chan *sarama.ConsumerMessage
+ ready chan bool
+ assignments chan []int32
+ readyOnce sync.Once
+ t *testing.T
+ name string
+}
+
+func (h *RebalanceTestHandler) Setup(session sarama.ConsumerGroupSession) error {
+ h.t.Logf("%s: Consumer group session setup", h.name)
+ h.readyOnce.Do(func() {
+ close(h.ready)
+ })
+
+ // Send partition assignment
+ partitions := make([]int32, 0)
+ for topic, partitionList := range session.Claims() {
+ h.t.Logf("%s: Assigned topic %s with partitions %v", h.name, topic, partitionList)
+ for _, partition := range partitionList {
+ partitions = append(partitions, partition)
+ }
+ }
+
+ select {
+ case h.assignments <- partitions:
+ default:
+ // Channel might be full, that's ok
+ }
+
+ return nil
+}
+
+func (h *RebalanceTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("%s: Consumer group session cleanup", h.name)
+ return nil
+}
+
+func (h *RebalanceTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for {
+ select {
+ case message := <-claim.Messages():
+ if message == nil {
+ return nil
+ }
+ h.t.Logf("%s: Received message from partition %d: %s", h.name, message.Partition, string(message.Value))
+ select {
+ case h.messages <- message:
+ default:
+ // Channel full, drop message for test
+ }
+ session.MarkMessage(message, "")
+ case <-session.Context().Done():
+ return nil
+ }
+ }
+}
diff --git a/test/kafka/integration/schema_end_to_end_test.go b/test/kafka/integration/schema_end_to_end_test.go
new file mode 100644
index 000000000..414056dd0
--- /dev/null
+++ b/test/kafka/integration/schema_end_to_end_test.go
@@ -0,0 +1,299 @@
+package integration
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/linkedin/goavro/v2"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
+)
+
+// TestSchemaEndToEnd_AvroRoundTrip tests the complete Avro schema round-trip workflow
+func TestSchemaEndToEnd_AvroRoundTrip(t *testing.T) {
+ // Create mock schema registry
+ server := createMockSchemaRegistryForE2E(t)
+ defer server.Close()
+
+ // Create schema manager
+ config := schema.ManagerConfig{
+ RegistryURL: server.URL,
+ ValidationMode: schema.ValidationPermissive,
+ }
+ manager, err := schema.NewManager(config)
+ require.NoError(t, err)
+
+ // Test data
+ avroSchema := getUserAvroSchemaForE2E()
+ testData := map[string]interface{}{
+ "id": int32(12345),
+ "name": "Alice Johnson",
+ "email": map[string]interface{}{"string": "alice@example.com"}, // Avro union
+ "age": map[string]interface{}{"int": int32(28)}, // Avro union
+ "preferences": map[string]interface{}{
+ "Preferences": map[string]interface{}{ // Avro union with record type
+ "notifications": true,
+ "theme": "dark",
+ },
+ },
+ }
+
+ t.Run("SchemaManagerRoundTrip", func(t *testing.T) {
+ // Step 1: Create Confluent envelope (simulate producer)
+ codec, err := goavro.NewCodec(avroSchema)
+ require.NoError(t, err)
+
+ avroBinary, err := codec.BinaryFromNative(nil, testData)
+ require.NoError(t, err)
+
+ confluentMsg := schema.CreateConfluentEnvelope(schema.FormatAvro, 1, nil, avroBinary)
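+ // The Confluent wire format prefixes the encoded payload with a magic byte
+ // (0x00) and a 4-byte big-endian schema ID (1 here), which the round-trip
+ // below parses back out.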
+ require.True(t, len(confluentMsg) > 0, "Confluent envelope should not be empty")
+
+ t.Logf("Created Confluent envelope: %d bytes", len(confluentMsg))
+
+ // Step 2: Decode message using schema manager
+ decodedMsg, err := manager.DecodeMessage(confluentMsg)
+ require.NoError(t, err)
+ require.NotNil(t, decodedMsg.RecordValue, "RecordValue should not be nil")
+
+ t.Logf("Decoded message with schema ID %d, format %v", decodedMsg.SchemaID, decodedMsg.SchemaFormat)
+
+ // Step 3: Re-encode message using schema manager
+ reconstructedMsg, err := manager.EncodeMessage(decodedMsg.RecordValue, 1, schema.FormatAvro)
+ require.NoError(t, err)
+ require.True(t, len(reconstructedMsg) > 0, "Reconstructed message should not be empty")
+
+ t.Logf("Re-encoded message: %d bytes", len(reconstructedMsg))
+
+ // Step 4: Verify the reconstructed message is a valid Confluent envelope
+ envelope, ok := schema.ParseConfluentEnvelope(reconstructedMsg)
+ require.True(t, ok, "Reconstructed message should be a valid Confluent envelope")
+ require.Equal(t, uint32(1), envelope.SchemaID, "Schema ID should match")
+ require.Equal(t, schema.FormatAvro, envelope.Format, "Schema format should be Avro")
+
+ // Step 5: Decode and verify the content
+ decodedNative, _, err := codec.NativeFromBinary(envelope.Payload)
+ require.NoError(t, err)
+
+ decodedMap, ok := decodedNative.(map[string]interface{})
+ require.True(t, ok, "Decoded data should be a map")
+
+ // Verify all fields
+ assert.Equal(t, int32(12345), decodedMap["id"])
+ assert.Equal(t, "Alice Johnson", decodedMap["name"])
+
+ // Verify union fields
+ emailUnion, ok := decodedMap["email"].(map[string]interface{})
+ require.True(t, ok, "Email should be a union")
+ assert.Equal(t, "alice@example.com", emailUnion["string"])
+
+ ageUnion, ok := decodedMap["age"].(map[string]interface{})
+ require.True(t, ok, "Age should be a union")
+ assert.Equal(t, int32(28), ageUnion["int"])
+
+ preferencesUnion, ok := decodedMap["preferences"].(map[string]interface{})
+ require.True(t, ok, "Preferences should be a union")
+ preferencesRecord, ok := preferencesUnion["Preferences"].(map[string]interface{})
+ require.True(t, ok, "Preferences should contain a record")
+ assert.Equal(t, true, preferencesRecord["notifications"])
+ assert.Equal(t, "dark", preferencesRecord["theme"])
+
+ t.Log("Successfully completed Avro schema round-trip test")
+ })
+}
+
+// TestSchemaEndToEnd_ProtobufRoundTrip tests the complete Protobuf schema round-trip workflow
+func TestSchemaEndToEnd_ProtobufRoundTrip(t *testing.T) {
+ t.Run("ProtobufEnvelopeCreation", func(t *testing.T) {
+ // Create a simple Protobuf message (simulated)
+ // In a real scenario, this would be generated from a .proto file
+ protobufData := []byte{0x08, 0x96, 0x01, 0x12, 0x04, 0x74, 0x65, 0x73, 0x74} // id=150, name="test"
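+ // Wire-format breakdown (fields assumed to be 1=id, 2=name):
+ // 0x08 = field 1 varint, 0x96 0x01 = 150; 0x12 = field 2 length-delimited,
+ // 0x04 = length 4, followed by the bytes of "test"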
+
+ // Create Confluent envelope with Protobuf format
+ confluentMsg := schema.CreateConfluentEnvelope(schema.FormatProtobuf, 2, []int{0}, protobufData)
+ require.True(t, len(confluentMsg) > 0, "Confluent envelope should not be empty")
+
+ t.Logf("Created Protobuf Confluent envelope: %d bytes", len(confluentMsg))
+
+ // Verify Confluent envelope
+ envelope, ok := schema.ParseConfluentEnvelope(confluentMsg)
+ require.True(t, ok, "Message should be a valid Confluent envelope")
+ require.Equal(t, uint32(2), envelope.SchemaID, "Schema ID should match")
+ // Note: ParseConfluentEnvelope defaults to FormatAvro; format detection requires schema registry
+ require.Equal(t, schema.FormatAvro, envelope.Format, "Format defaults to Avro without schema registry lookup")
+
+ // For Protobuf with indexes, we need to use the specialized parser
+ protobufEnvelope, ok := schema.ParseConfluentProtobufEnvelopeWithIndexCount(confluentMsg, 1)
+ require.True(t, ok, "Message should be a valid Protobuf envelope")
+ require.Equal(t, uint32(2), protobufEnvelope.SchemaID, "Schema ID should match")
+ require.Equal(t, schema.FormatProtobuf, protobufEnvelope.Format, "Schema format should be Protobuf")
+ require.Equal(t, []int{0}, protobufEnvelope.Indexes, "Indexes should match")
+ require.Equal(t, protobufData, protobufEnvelope.Payload, "Payload should match")
+
+ t.Log("Successfully completed Protobuf envelope test")
+ })
+}
+
+// TestSchemaEndToEnd_JSONSchemaRoundTrip tests the complete JSON Schema round-trip workflow
+func TestSchemaEndToEnd_JSONSchemaRoundTrip(t *testing.T) {
+ t.Run("JSONSchemaEnvelopeCreation", func(t *testing.T) {
+ // Create JSON data
+ jsonData := []byte(`{"id": 123, "name": "Bob Smith", "active": true}`)
+
+ // Create Confluent envelope with JSON Schema format
+ confluentMsg := schema.CreateConfluentEnvelope(schema.FormatJSONSchema, 3, nil, jsonData)
+ require.True(t, len(confluentMsg) > 0, "Confluent envelope should not be empty")
+
+ t.Logf("Created JSON Schema Confluent envelope: %d bytes", len(confluentMsg))
+
+ // Verify Confluent envelope
+ envelope, ok := schema.ParseConfluentEnvelope(confluentMsg)
+ require.True(t, ok, "Message should be a valid Confluent envelope")
+ require.Equal(t, uint32(3), envelope.SchemaID, "Schema ID should match")
+ // Note: ParseConfluentEnvelope defaults to FormatAvro; format detection requires schema registry
+ require.Equal(t, schema.FormatAvro, envelope.Format, "Format defaults to Avro without schema registry lookup")
+
+ // Verify JSON content
+ assert.JSONEq(t, string(jsonData), string(envelope.Payload), "JSON payload should match")
+
+ t.Log("Successfully completed JSON Schema envelope test")
+ })
+}
+
+// TestSchemaEndToEnd_CompressionAndBatching tests schema handling with compression and batching
+func TestSchemaEndToEnd_CompressionAndBatching(t *testing.T) {
+ // Create mock schema registry
+ server := createMockSchemaRegistryForE2E(t)
+ defer server.Close()
+
+ // Create schema manager
+ config := schema.ManagerConfig{
+ RegistryURL: server.URL,
+ ValidationMode: schema.ValidationPermissive,
+ }
+ manager, err := schema.NewManager(config)
+ require.NoError(t, err)
+
+ t.Run("BatchedSchematizedMessages", func(t *testing.T) {
+ // Create multiple messages
+ avroSchema := getUserAvroSchemaForE2E()
+ codec, err := goavro.NewCodec(avroSchema)
+ require.NoError(t, err)
+
+ messageCount := 5
+ var confluentMessages [][]byte
+
+ // Create multiple Confluent envelopes
+ for i := 0; i < messageCount; i++ {
+ testData := map[string]interface{}{
+ "id": int32(1000 + i),
+ "name": fmt.Sprintf("User %d", i),
+ "email": map[string]interface{}{"string": fmt.Sprintf("user%d@example.com", i)},
+ "age": map[string]interface{}{"int": int32(20 + i)},
+ "preferences": map[string]interface{}{
+ "Preferences": map[string]interface{}{
+ "notifications": i%2 == 0, // Alternate true/false
+ "theme": "light",
+ },
+ },
+ }
+
+ avroBinary, err := codec.BinaryFromNative(nil, testData)
+ require.NoError(t, err)
+
+ confluentMsg := schema.CreateConfluentEnvelope(schema.FormatAvro, 1, nil, avroBinary)
+ confluentMessages = append(confluentMessages, confluentMsg)
+ }
+
+ t.Logf("Created %d schematized messages", messageCount)
+
+ // Test round-trip for each message
+ for i, confluentMsg := range confluentMessages {
+ // Decode message
+ decodedMsg, err := manager.DecodeMessage(confluentMsg)
+ require.NoError(t, err, "Message %d should decode", i)
+
+ // Re-encode message
+ reconstructedMsg, err := manager.EncodeMessage(decodedMsg.RecordValue, 1, schema.FormatAvro)
+ require.NoError(t, err, "Message %d should re-encode", i)
+
+ // Verify envelope
+ envelope, ok := schema.ParseConfluentEnvelope(reconstructedMsg)
+ require.True(t, ok, "Message %d should be a valid Confluent envelope", i)
+ require.Equal(t, uint32(1), envelope.SchemaID, "Message %d schema ID should match", i)
+
+ // Decode and verify content
+ decodedNative, _, err := codec.NativeFromBinary(envelope.Payload)
+ require.NoError(t, err, "Message %d should decode successfully", i)
+
+ decodedMap, ok := decodedNative.(map[string]interface{})
+ require.True(t, ok, "Message %d should be a map", i)
+
+ expectedID := int32(1000 + i)
+ assert.Equal(t, expectedID, decodedMap["id"], "Message %d ID should match", i)
+ assert.Equal(t, fmt.Sprintf("User %d", i), decodedMap["name"], "Message %d name should match", i)
+ }
+
+ t.Log("Successfully verified batched schematized messages")
+ })
+}
+
+// Helper functions for creating mock schema registries
+
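+// createMockSchemaRegistryForE2E serves a minimal subset of the Confluent Schema
+// Registry REST API (GET /schemas/ids/{id} and GET /subjects/{subject}/versions/latest)
+// so the schema manager can resolve schema ID 1 without a real registry.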
+func createMockSchemaRegistryForE2E(t *testing.T) *httptest.Server {
+ return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ case "/schemas/ids/1":
+ response := map[string]interface{}{
+ "schema": getUserAvroSchemaForE2E(),
+ "subject": "user-events-e2e-value",
+ "version": 1,
+ }
+ writeJSONResponse(w, response)
+ case "/subjects/user-events-e2e-value/versions/latest":
+ response := map[string]interface{}{
+ "id": 1,
+ "schema": getUserAvroSchemaForE2E(),
+ "subject": "user-events-e2e-value",
+ "version": 1,
+ }
+ writeJSONResponse(w, response)
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ }
+ }))
+}
+
+func getUserAvroSchemaForE2E() string {
+ return `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": ["null", "string"], "default": null},
+ {"name": "age", "type": ["null", "int"], "default": null},
+ {"name": "preferences", "type": ["null", {
+ "type": "record",
+ "name": "Preferences",
+ "fields": [
+ {"name": "notifications", "type": "boolean", "default": true},
+ {"name": "theme", "type": "string", "default": "light"}
+ ]
+ }], "default": null}
+ ]
+ }`
+}
+
+func writeJSONResponse(w http.ResponseWriter, data interface{}) {
+ w.Header().Set("Content-Type", "application/json")
+ if err := json.NewEncoder(w).Encode(data); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+}
diff --git a/test/kafka/integration/schema_registry_test.go b/test/kafka/integration/schema_registry_test.go
new file mode 100644
index 000000000..9f6d32849
--- /dev/null
+++ b/test/kafka/integration/schema_registry_test.go
@@ -0,0 +1,210 @@
+package integration
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestSchemaRegistryEventualConsistency reproduces the issue where schemas
+// are registered successfully but are not immediately queryable due to
+// Schema Registry's consumer lag
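+//
+// Background (an assumption based on Confluent Schema Registry's design): registrations
+// are written to the _schemas Kafka topic and reads are served from an in-memory cache
+// populated by a background KafkaStoreReaderThread, so a freshly registered subject can
+// still return 404 until that reader catches up.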
+func TestSchemaRegistryEventualConsistency(t *testing.T) {
+ // This test requires real SMQ backend
+ gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQRequired)
+ defer gateway.CleanupAndClose()
+
+ addr := gateway.StartAndWait()
+ t.Logf("Gateway running on %s", addr)
+
+ // Schema Registry URL (assumes the default local instance; adjust if the registry runs elsewhere)
+ schemaRegistryURL := "http://localhost:8081"
+
+ // Wait for Schema Registry to be ready
+ if !waitForSchemaRegistry(t, schemaRegistryURL, 30*time.Second) {
+ t.Fatal("Schema Registry not ready")
+ }
+
+ // Define test schemas
+ valueSchema := `{"type":"record","name":"TestMessage","fields":[{"name":"id","type":"string"}]}`
+ keySchema := `{"type":"string"}`
+
+ // Register multiple schemas rapidly (simulates the load test scenario)
+ subjects := []string{
+ "test-topic-0-value",
+ "test-topic-0-key",
+ "test-topic-1-value",
+ "test-topic-1-key",
+ "test-topic-2-value",
+ "test-topic-2-key",
+ "test-topic-3-value",
+ "test-topic-3-key",
+ }
+
+ t.Log("Registering schemas rapidly...")
+ registeredIDs := make(map[string]int)
+ for _, subject := range subjects {
+ schema := valueSchema
+ if strings.HasSuffix(subject, "-key") {
+ schema = keySchema
+ }
+
+ id, err := registerSchema(schemaRegistryURL, subject, schema)
+ if err != nil {
+ t.Fatalf("Failed to register schema for %s: %v", subject, err)
+ }
+ registeredIDs[subject] = id
+ t.Logf("Registered %s with ID %d", subject, id)
+ }
+
+ t.Log("All schemas registered successfully!")
+
+ // Now immediately try to verify them (this reproduces the bug)
+ t.Log("Immediately verifying schemas (without delay)...")
+ immediateFailures := 0
+ for _, subject := range subjects {
+ exists, id, version, err := verifySchema(schemaRegistryURL, subject)
+ if err != nil || !exists {
+ immediateFailures++
+ t.Logf("Immediate verification failed for %s: exists=%v id=%d err=%v", subject, exists, id, err)
+ } else {
+ t.Logf("Immediate verification passed for %s: ID=%d Version=%d", subject, id, version)
+ }
+ }
+
+ if immediateFailures > 0 {
+ t.Logf("BUG REPRODUCED: %d/%d schemas not immediately queryable after registration",
+ immediateFailures, len(subjects))
+ t.Logf(" This is due to Schema Registry's KafkaStoreReaderThread lag")
+ }
+
+ // Now verify with retry logic (this should succeed)
+ t.Log("Verifying schemas with retry logic...")
+ for _, subject := range subjects {
+ expectedID := registeredIDs[subject]
+ if !verifySchemaWithRetry(t, schemaRegistryURL, subject, expectedID, 5*time.Second) {
+ t.Errorf("Failed to verify %s even with retry", subject)
+ }
+ }
+
+ t.Log("✓ All schemas verified successfully with retry logic!")
+}
+
+// registerSchema registers a schema and returns its ID
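+// It POSTs to /subjects/{subject}/versions with a body of the form
+// {"schema":"<escaped schema JSON>","schemaType":"AVRO"} and expects a JSON
+// response such as {"id": 42} on success.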
+func registerSchema(registryURL, subject, schema string) (int, error) {
+ // Escape the schema JSON
+ escapedSchema, err := json.Marshal(schema)
+ if err != nil {
+ return 0, err
+ }
+
+ payload := fmt.Sprintf(`{"schema":%s,"schemaType":"AVRO"}`, escapedSchema)
+
+ resp, err := http.Post(
+ fmt.Sprintf("%s/subjects/%s/versions", registryURL, subject),
+ "application/vnd.schemaregistry.v1+json",
+ strings.NewReader(payload),
+ )
+ if err != nil {
+ return 0, err
+ }
+ defer resp.Body.Close()
+
+ body, _ := io.ReadAll(resp.Body)
+
+ if resp.StatusCode != http.StatusOK {
+ return 0, fmt.Errorf("registration failed: %s - %s", resp.Status, string(body))
+ }
+
+ var result struct {
+ ID int `json:"id"`
+ }
+ if err := json.Unmarshal(body, &result); err != nil {
+ return 0, err
+ }
+
+ return result.ID, nil
+}
+
+// verifySchema checks if a schema exists
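+// by issuing GET /subjects/{subject}/versions/latest; a 404 is reported as
+// exists=false with a nil error, while any other non-200 response is returned as an error.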
+func verifySchema(registryURL, subject string) (exists bool, id int, version int, err error) {
+ resp, err := http.Get(fmt.Sprintf("%s/subjects/%s/versions/latest", registryURL, subject))
+ if err != nil {
+ return false, 0, 0, err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusNotFound {
+ return false, 0, 0, nil
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ return false, 0, 0, fmt.Errorf("verification failed: %s - %s", resp.Status, string(body))
+ }
+
+ var result struct {
+ ID int `json:"id"`
+ Version int `json:"version"`
+ Schema string `json:"schema"`
+ }
+ body, _ := io.ReadAll(resp.Body)
+ if err := json.Unmarshal(body, &result); err != nil {
+ return false, 0, 0, err
+ }
+
+ return true, result.ID, result.Version, nil
+}
+
+// verifySchemaWithRetry verifies a schema with retry logic
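+// It polls verifySchema until the expected schema ID is observed, backing off
+// linearly (attempt*100ms, capped at 1 second) until the timeout elapses.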
+func verifySchemaWithRetry(t *testing.T, registryURL, subject string, expectedID int, timeout time.Duration) bool {
+ deadline := time.Now().Add(timeout)
+ attempt := 0
+
+ for time.Now().Before(deadline) {
+ attempt++
+ exists, id, version, err := verifySchema(registryURL, subject)
+
+ if err == nil && exists && id == expectedID {
+ if attempt > 1 {
+ t.Logf("✓ %s verified after %d attempts (ID=%d, Version=%d)", subject, attempt, id, version)
+ }
+ return true
+ }
+
+ // Wait before retrying (linear backoff: attempt*100ms, capped at 1 second)
+ waitTime := time.Duration(attempt*100) * time.Millisecond
+ if waitTime > 1*time.Second {
+ waitTime = 1 * time.Second
+ }
+ time.Sleep(waitTime)
+ }
+
+ t.Logf("%s verification timed out after %d attempts", subject, attempt)
+ return false
+}
+
+// waitForSchemaRegistry waits for Schema Registry to be ready
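+// by polling GET {url}/subjects every 500ms until it returns 200 OK or the timeout elapses.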
+func waitForSchemaRegistry(t *testing.T, url string, timeout time.Duration) bool {
+ deadline := time.Now().Add(timeout)
+
+ for time.Now().Before(deadline) {
+ resp, err := http.Get(url + "/subjects")
+ if err == nil && resp.StatusCode == http.StatusOK {
+ resp.Body.Close()
+ return true
+ }
+ if resp != nil {
+ resp.Body.Close()
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+
+ return false
+}
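+
+// deleteSubject is an optional cleanup sketch, not invoked by the tests above and
+// offered only as an assumption-labelled example: it soft-deletes a subject via the
+// registry's DELETE /subjects/{subject} endpoint, which can be useful when a shared
+// Schema Registry instance accumulates subjects across repeated test runs.
+func deleteSubject(registryURL, subject string) error {
+ req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/subjects/%s", registryURL, subject), nil)
+ if err != nil {
+ return err
+ }
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ body, _ := io.ReadAll(resp.Body)
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("delete failed: %s - %s", resp.Status, string(body))
+ }
+ return nil
+}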
diff --git a/test/kafka/integration/smq_integration_test.go b/test/kafka/integration/smq_integration_test.go
new file mode 100644
index 000000000..f0c140178
--- /dev/null
+++ b/test/kafka/integration/smq_integration_test.go
@@ -0,0 +1,305 @@
+package integration
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/IBM/sarama"
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestSMQIntegration tests that the Kafka gateway properly integrates with SeaweedMQ
+// This test REQUIRES SeaweedFS masters to be running and will skip if not available
+func TestSMQIntegration(t *testing.T) {
+ // This test requires SMQ to be available
+ gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQRequired)
+ defer gateway.CleanupAndClose()
+
+ addr := gateway.StartAndWait()
+
+ t.Logf("Running SMQ integration test with SeaweedFS backend")
+
+ t.Run("ProduceConsumeWithPersistence", func(t *testing.T) {
+ testProduceConsumeWithPersistence(t, addr)
+ })
+
+ t.Run("ConsumerGroupOffsetPersistence", func(t *testing.T) {
+ testConsumerGroupOffsetPersistence(t, addr)
+ })
+
+ t.Run("TopicPersistence", func(t *testing.T) {
+ testTopicPersistence(t, addr)
+ })
+}
+
+func testProduceConsumeWithPersistence(t *testing.T, addr string) {
+ topicName := testutil.GenerateUniqueTopicName("smq-integration-produce-consume")
+
+ client := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Create topic
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ // Allow time for topic to propagate in SMQ backend
+ time.Sleep(500 * time.Millisecond)
+
+ // Produce messages
+ messages := msgGen.GenerateStringMessages(5)
+ err = client.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce messages")
+
+ // Allow time for messages to be fully persisted in SMQ backend
+ time.Sleep(200 * time.Millisecond)
+
+ t.Logf("Produced %d messages to topic %s", len(messages), topicName)
+
+ // Consume messages
+ consumed, err := client.ConsumeMessages(topicName, 0, len(messages))
+ testutil.AssertNoError(t, err, "Failed to consume messages")
+
+ // Verify all messages were consumed
+ testutil.AssertEqual(t, len(messages), len(consumed), "Message count mismatch")
+
+ t.Logf("Successfully consumed %d messages from SMQ backend", len(consumed))
+}
+
+func testConsumerGroupOffsetPersistence(t *testing.T, addr string) {
+ topicName := testutil.GenerateUniqueTopicName("smq-integration-offset-persistence")
+ groupID := testutil.GenerateUniqueGroupID("smq-offset-group")
+
+ client := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Create topic and produce messages
+ err := client.CreateTopic(topicName, 1, 1)
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ // Allow time for topic to propagate in SMQ backend
+ time.Sleep(500 * time.Millisecond)
+
+ messages := msgGen.GenerateStringMessages(10)
+ err = client.ProduceMessages(topicName, messages)
+ testutil.AssertNoError(t, err, "Failed to produce messages")
+
+ // Allow time for messages to be fully persisted in SMQ backend
+ time.Sleep(200 * time.Millisecond)
+
+ // Phase 1: Consume first 5 messages with consumer group and commit offsets
+ t.Logf("Phase 1: Consuming first 5 messages and committing offsets")
+
+ config := client.GetConfig()
+ config.Consumer.Offsets.Initial = sarama.OffsetOldest
+ // Enable auto-commit for more reliable offset handling
+ config.Consumer.Offsets.AutoCommit.Enable = true
+ config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
+
+ consumerGroup1, err := sarama.NewConsumerGroup([]string{addr}, groupID, config)
+ testutil.AssertNoError(t, err, "Failed to create first consumer group")
+
+ handler := &SMQOffsetTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, len(messages)),
+ ready: make(chan bool),
+ stopAfter: 5,
+ t: t,
+ }
+
+ ctx1, cancel1 := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel1()
+
+ consumeErrChan1 := make(chan error, 1)
+ go func() {
+ err := consumerGroup1.Consume(ctx1, []string{topicName}, handler)
+ if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
+ t.Logf("First consumer error: %v", err)
+ consumeErrChan1 <- err
+ }
+ }()
+
+ // Wait for consumer to be ready with timeout
+ select {
+ case <-handler.ready:
+ // Consumer is ready, continue
+ case err := <-consumeErrChan1:
+ t.Fatalf("First consumer failed to start: %v", err)
+ case <-time.After(10 * time.Second):
+ t.Fatalf("Timeout waiting for first consumer to be ready")
+ }
+ consumedCount := 0
+ for consumedCount < 5 {
+ select {
+ case <-handler.messages:
+ consumedCount++
+ case <-time.After(20 * time.Second):
+ t.Fatalf("Timeout waiting for first batch of messages. Got %d/5", consumedCount)
+ }
+ }
+
+ consumerGroup1.Close()
+ cancel1()
+ time.Sleep(7 * time.Second) // Allow auto-commit to complete and offset commits to be processed in SMQ
+
+ t.Logf("Consumed %d messages in first phase", consumedCount)
+
+ // Phase 2: Start new consumer group with same ID - should resume from committed offset
+ t.Logf("Phase 2: Starting new consumer group to test offset persistence")
+
+ // Create a fresh config for the second consumer group to avoid any state issues
+ config2 := client.GetConfig()
+ config2.Consumer.Offsets.Initial = sarama.OffsetOldest
+ config2.Consumer.Offsets.AutoCommit.Enable = true
+ config2.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
+
+ consumerGroup2, err := sarama.NewConsumerGroup([]string{addr}, groupID, config2)
+ testutil.AssertNoError(t, err, "Failed to create second consumer group")
+ defer consumerGroup2.Close()
+
+ handler2 := &SMQOffsetTestHandler{
+ messages: make(chan *sarama.ConsumerMessage, len(messages)),
+ ready: make(chan bool),
+ stopAfter: 5, // Should consume remaining 5 messages
+ t: t,
+ }
+
+ ctx2, cancel2 := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel2()
+
+ consumeErrChan := make(chan error, 1)
+ go func() {
+ err := consumerGroup2.Consume(ctx2, []string{topicName}, handler2)
+ if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
+ t.Logf("Second consumer error: %v", err)
+ consumeErrChan <- err
+ }
+ }()
+
+ // Wait for second consumer to be ready with timeout
+ select {
+ case <-handler2.ready:
+ // Consumer is ready, continue
+ case err := <-consumeErrChan:
+ t.Fatalf("Second consumer failed to start: %v", err)
+ case <-time.After(10 * time.Second):
+ t.Fatalf("Timeout waiting for second consumer to be ready")
+ }
+ secondConsumerMessages := make([]*sarama.ConsumerMessage, 0)
+ consumedCount = 0
+ for consumedCount < 5 {
+ select {
+ case msg := <-handler2.messages:
+ consumedCount++
+ secondConsumerMessages = append(secondConsumerMessages, msg)
+ case <-time.After(20 * time.Second):
+ t.Fatalf("Timeout waiting for second batch of messages. Got %d/5", consumedCount)
+ }
+ }
+
+ // Verify second consumer started from correct offset (should be >= 5)
+ if len(secondConsumerMessages) > 0 {
+ firstMessageOffset := secondConsumerMessages[0].Offset
+ if firstMessageOffset < 5 {
+ t.Fatalf("Second consumer should start from offset >= 5: got %d", firstMessageOffset)
+ }
+ t.Logf("Second consumer correctly resumed from offset %d", firstMessageOffset)
+ }
+
+ t.Logf("Successfully verified SMQ offset persistence")
+}
+
+func testTopicPersistence(t *testing.T, addr string) {
+ topicName := testutil.GenerateUniqueTopicName("smq-integration-topic-persistence")
+
+ client := testutil.NewSaramaClient(t, addr)
+
+ // Create topic
+ err := client.CreateTopic(topicName, 2, 1) // 2 partitions
+ testutil.AssertNoError(t, err, "Failed to create topic")
+
+ // Allow time for topic to propagate and persist in SMQ backend
+ time.Sleep(1 * time.Second)
+
+ // Verify topic exists by listing topics using admin client
+ config := client.GetConfig()
+ config.Admin.Timeout = 30 * time.Second
+
+ admin, err := sarama.NewClusterAdmin([]string{addr}, config)
+ testutil.AssertNoError(t, err, "Failed to create admin client")
+ defer admin.Close()
+
+ // Retry topic listing to handle potential delays in topic propagation
+ var topics map[string]sarama.TopicDetail
+ var listErr error
+ for attempt := 0; attempt < 3; attempt++ {
+ if attempt > 0 {
+ sleepDuration := time.Duration(500*(1<<(attempt-1))) * time.Millisecond
+ t.Logf("Retrying ListTopics after %v (attempt %d/3)", sleepDuration, attempt+1)
+ time.Sleep(sleepDuration)
+ }
+
+ topics, listErr = admin.ListTopics()
+ if listErr == nil {
+ break
+ }
+ }
+ testutil.AssertNoError(t, listErr, "Failed to list topics")
+
+ topicDetails, exists := topics[topicName]
+ if !exists {
+ t.Fatalf("Topic %s not found in topic list", topicName)
+ }
+
+ if topicDetails.NumPartitions != 2 {
+ t.Errorf("Expected 2 partitions, got %d", topicDetails.NumPartitions)
+ }
+
+ t.Logf("Successfully verified topic persistence with %d partitions", topicDetails.NumPartitions)
+}
+
+// SMQOffsetTestHandler implements sarama.ConsumerGroupHandler for SMQ offset testing
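+// It closes the ready channel once in Setup, forwards each consumed message on the
+// messages channel, marks it on the session, and returns from ConsumeClaim after
+// stopAfter messages so committed offsets can be verified by a later consumer group
+// instance using the same group ID.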
+type SMQOffsetTestHandler struct {
+ messages chan *sarama.ConsumerMessage
+ ready chan bool
+ readyOnce bool
+ stopAfter int
+ consumed int
+ t *testing.T
+}
+
+func (h *SMQOffsetTestHandler) Setup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("SMQ offset test consumer setup")
+ if !h.readyOnce {
+ close(h.ready)
+ h.readyOnce = true
+ }
+ return nil
+}
+
+func (h *SMQOffsetTestHandler) Cleanup(sarama.ConsumerGroupSession) error {
+ h.t.Logf("SMQ offset test consumer cleanup")
+ return nil
+}
+
+func (h *SMQOffsetTestHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+ for {
+ select {
+ case message := <-claim.Messages():
+ if message == nil {
+ return nil
+ }
+ h.consumed++
+ h.messages <- message
+ session.MarkMessage(message, "")
+
+ // Stop after consuming the specified number of messages
+ if h.consumed >= h.stopAfter {
+ h.t.Logf("Stopping SMQ consumer after %d messages", h.consumed)
+ // Auto-commit will handle offset commits automatically
+ return nil
+ }
+ case <-session.Context().Done():
+ return nil
+ }
+ }
+}