aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/e2e/comprehensive_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/e2e/comprehensive_test.go')
-rw-r--r--test/kafka/e2e/comprehensive_test.go131
1 files changed, 131 insertions, 0 deletions
diff --git a/test/kafka/e2e/comprehensive_test.go b/test/kafka/e2e/comprehensive_test.go
new file mode 100644
index 000000000..739ccd3a3
--- /dev/null
+++ b/test/kafka/e2e/comprehensive_test.go
@@ -0,0 +1,131 @@
+package e2e
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestComprehensiveE2E tests complete end-to-end workflows
+// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock
+func TestComprehensiveE2E(t *testing.T) {
+ gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
+ defer gateway.CleanupAndClose()
+
+ addr := gateway.StartAndWait()
+
+ // Log which backend we're using
+ if gateway.IsSMQMode() {
+ t.Logf("Running comprehensive E2E tests with SMQ backend")
+ } else {
+ t.Logf("Running comprehensive E2E tests with mock backend")
+ }
+
+ // Create topics for different test scenarios
+ topics := []string{
+ testutil.GenerateUniqueTopicName("e2e-kafka-go"),
+ testutil.GenerateUniqueTopicName("e2e-sarama"),
+ testutil.GenerateUniqueTopicName("e2e-mixed"),
+ }
+ gateway.AddTestTopics(topics...)
+
+ t.Run("KafkaGo_to_KafkaGo", func(t *testing.T) {
+ testKafkaGoToKafkaGo(t, addr, topics[0])
+ })
+
+ t.Run("Sarama_to_Sarama", func(t *testing.T) {
+ testSaramaToSarama(t, addr, topics[1])
+ })
+
+ t.Run("KafkaGo_to_Sarama", func(t *testing.T) {
+ testKafkaGoToSarama(t, addr, topics[2])
+ })
+
+ t.Run("Sarama_to_KafkaGo", func(t *testing.T) {
+ testSaramaToKafkaGo(t, addr, topics[2])
+ })
+}
+
+func testKafkaGoToKafkaGo(t *testing.T, addr, topic string) {
+ client := testutil.NewKafkaGoClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Generate test messages
+ messages := msgGen.GenerateKafkaGoMessages(2)
+
+ // Produce with kafka-go
+ err := client.ProduceMessages(topic, messages)
+ testutil.AssertNoError(t, err, "kafka-go produce failed")
+
+ // Consume with kafka-go
+ consumed, err := client.ConsumeMessages(topic, len(messages))
+ testutil.AssertNoError(t, err, "kafka-go consume failed")
+
+ // Validate message content
+ err = testutil.ValidateKafkaGoMessageContent(messages, consumed)
+ testutil.AssertNoError(t, err, "Message content validation failed")
+
+ t.Logf("kafka-go to kafka-go test PASSED")
+}
+
+func testSaramaToSarama(t *testing.T, addr, topic string) {
+ client := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Generate test messages
+ messages := msgGen.GenerateStringMessages(2)
+
+ // Produce with Sarama
+ err := client.ProduceMessages(topic, messages)
+ testutil.AssertNoError(t, err, "Sarama produce failed")
+
+ // Consume with Sarama
+ consumed, err := client.ConsumeMessages(topic, 0, len(messages))
+ testutil.AssertNoError(t, err, "Sarama consume failed")
+
+ // Validate message content
+ err = testutil.ValidateMessageContent(messages, consumed)
+ testutil.AssertNoError(t, err, "Message content validation failed")
+
+ t.Logf("Sarama to Sarama test PASSED")
+}
+
+func testKafkaGoToSarama(t *testing.T, addr, topic string) {
+ kafkaGoClient := testutil.NewKafkaGoClient(t, addr)
+ saramaClient := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Produce with kafka-go
+ messages := msgGen.GenerateKafkaGoMessages(2)
+ err := kafkaGoClient.ProduceMessages(topic, messages)
+ testutil.AssertNoError(t, err, "kafka-go produce failed")
+
+ // Consume with Sarama
+ consumed, err := saramaClient.ConsumeMessages(topic, 0, len(messages))
+ testutil.AssertNoError(t, err, "Sarama consume failed")
+
+ // Validate that we got the expected number of messages
+ testutil.AssertEqual(t, len(messages), len(consumed), "Message count mismatch")
+
+ t.Logf("kafka-go to Sarama test PASSED")
+}
+
+func testSaramaToKafkaGo(t *testing.T, addr, topic string) {
+ kafkaGoClient := testutil.NewKafkaGoClient(t, addr)
+ saramaClient := testutil.NewSaramaClient(t, addr)
+ msgGen := testutil.NewMessageGenerator()
+
+ // Produce with Sarama
+ messages := msgGen.GenerateStringMessages(2)
+ err := saramaClient.ProduceMessages(topic, messages)
+ testutil.AssertNoError(t, err, "Sarama produce failed")
+
+ // Consume with kafka-go
+ consumed, err := kafkaGoClient.ConsumeMessages(topic, len(messages))
+ testutil.AssertNoError(t, err, "kafka-go consume failed")
+
+ // Validate that we got the expected number of messages
+ testutil.AssertEqual(t, len(messages), len(consumed), "Message count mismatch")
+
+ t.Logf("Sarama to kafka-go test PASSED")
+}