aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/e2e
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/e2e')
-rw-r--r--test/kafka/e2e/comprehensive_test.go131
-rw-r--r--test/kafka/e2e/offset_management_test.go101
2 files changed, 232 insertions, 0 deletions
diff --git a/test/kafka/e2e/comprehensive_test.go b/test/kafka/e2e/comprehensive_test.go
new file mode 100644
index 000000000..739ccd3a3
--- /dev/null
+++ b/test/kafka/e2e/comprehensive_test.go
@@ -0,0 +1,131 @@
+package e2e
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestComprehensiveE2E tests complete end-to-end workflows across the
+// kafka-go and Sarama client libraries, in every produce/consume pairing.
+// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock.
+func TestComprehensiveE2E(t *testing.T) {
+	gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
+	defer gateway.CleanupAndClose()
+
+	addr := gateway.StartAndWait()
+
+	// Log which backend we're using so failures can be correlated with the storage mode.
+	if gateway.IsSMQMode() {
+		t.Logf("Running comprehensive E2E tests with SMQ backend")
+	} else {
+		t.Logf("Running comprehensive E2E tests with mock backend")
+	}
+
+	// One unique topic per scenario. Sharing a topic between subtests lets
+	// messages produced by an earlier subtest leak into a later consumer's
+	// reads, corrupting count/content assertions.
+	topics := []string{
+		testutil.GenerateUniqueTopicName("e2e-kafka-go"),
+		testutil.GenerateUniqueTopicName("e2e-sarama"),
+		testutil.GenerateUniqueTopicName("e2e-kafkago-to-sarama"),
+		testutil.GenerateUniqueTopicName("e2e-sarama-to-kafkago"),
+	}
+	gateway.AddTestTopics(topics...)
+
+	t.Run("KafkaGo_to_KafkaGo", func(t *testing.T) {
+		testKafkaGoToKafkaGo(t, addr, topics[0])
+	})
+
+	t.Run("Sarama_to_Sarama", func(t *testing.T) {
+		testSaramaToSarama(t, addr, topics[1])
+	})
+
+	t.Run("KafkaGo_to_Sarama", func(t *testing.T) {
+		testKafkaGoToSarama(t, addr, topics[2])
+	})
+
+	t.Run("Sarama_to_KafkaGo", func(t *testing.T) {
+		testSaramaToKafkaGo(t, addr, topics[3])
+	})
+}
+
+// testKafkaGoToKafkaGo round-trips a small batch through the gateway using
+// the kafka-go client for both produce and consume, validating content.
+func testKafkaGoToKafkaGo(t *testing.T, addr, topic string) {
+	c := testutil.NewKafkaGoClient(t, addr)
+	gen := testutil.NewMessageGenerator()
+
+	// Produce a small, known batch with kafka-go.
+	batch := gen.GenerateKafkaGoMessages(2)
+	err := c.ProduceMessages(topic, batch)
+	testutil.AssertNoError(t, err, "kafka-go produce failed")
+
+	// Read the batch back with kafka-go.
+	got, err := c.ConsumeMessages(topic, len(batch))
+	testutil.AssertNoError(t, err, "kafka-go consume failed")
+
+	// Keys and values must round-trip unchanged.
+	err = testutil.ValidateKafkaGoMessageContent(batch, got)
+	testutil.AssertNoError(t, err, "Message content validation failed")
+
+	t.Logf("kafka-go to kafka-go test PASSED")
+}
+
+// testSaramaToSarama round-trips a small batch through the gateway using
+// the Sarama client for both produce and consume, validating content.
+func testSaramaToSarama(t *testing.T, addr, topic string) {
+	c := testutil.NewSaramaClient(t, addr)
+	gen := testutil.NewMessageGenerator()
+
+	// Produce a small, known batch with Sarama.
+	batch := gen.GenerateStringMessages(2)
+	err := c.ProduceMessages(topic, batch)
+	testutil.AssertNoError(t, err, "Sarama produce failed")
+
+	// Read the batch back from partition 0 with Sarama.
+	got, err := c.ConsumeMessages(topic, 0, len(batch))
+	testutil.AssertNoError(t, err, "Sarama consume failed")
+
+	// Payloads must round-trip unchanged.
+	err = testutil.ValidateMessageContent(batch, got)
+	testutil.AssertNoError(t, err, "Message content validation failed")
+
+	t.Logf("Sarama to Sarama test PASSED")
+}
+
+// testKafkaGoToSarama checks cross-client interop: messages produced with
+// kafka-go must be readable by a Sarama consumer.
+func testKafkaGoToSarama(t *testing.T, addr, topic string) {
+	producer := testutil.NewKafkaGoClient(t, addr)
+	consumer := testutil.NewSaramaClient(t, addr)
+	gen := testutil.NewMessageGenerator()
+
+	// Produce a small batch with kafka-go.
+	batch := gen.GenerateKafkaGoMessages(2)
+	err := producer.ProduceMessages(topic, batch)
+	testutil.AssertNoError(t, err, "kafka-go produce failed")
+
+	// Read the same batch back from partition 0 with Sarama.
+	got, err := consumer.ConsumeMessages(topic, 0, len(batch))
+	testutil.AssertNoError(t, err, "Sarama consume failed")
+
+	// Cross-client runs only verify delivery count; the two libraries use
+	// different message representations, so content is not compared here.
+	testutil.AssertEqual(t, len(batch), len(got), "Message count mismatch")
+
+	t.Logf("kafka-go to Sarama test PASSED")
+}
+
+// testSaramaToKafkaGo checks cross-client interop in the opposite direction:
+// messages produced with Sarama must be readable by a kafka-go consumer.
+func testSaramaToKafkaGo(t *testing.T, addr, topic string) {
+	consumer := testutil.NewKafkaGoClient(t, addr)
+	producer := testutil.NewSaramaClient(t, addr)
+	gen := testutil.NewMessageGenerator()
+
+	// Produce a small batch with Sarama.
+	batch := gen.GenerateStringMessages(2)
+	err := producer.ProduceMessages(topic, batch)
+	testutil.AssertNoError(t, err, "Sarama produce failed")
+
+	// Read the same batch back with kafka-go.
+	got, err := consumer.ConsumeMessages(topic, len(batch))
+	testutil.AssertNoError(t, err, "kafka-go consume failed")
+
+	// Cross-client runs only verify delivery count; the two libraries use
+	// different message representations, so content is not compared here.
+	testutil.AssertEqual(t, len(batch), len(got), "Message count mismatch")
+
+	t.Logf("Sarama to kafka-go test PASSED")
+}
diff --git a/test/kafka/e2e/offset_management_test.go b/test/kafka/e2e/offset_management_test.go
new file mode 100644
index 000000000..398647843
--- /dev/null
+++ b/test/kafka/e2e/offset_management_test.go
@@ -0,0 +1,101 @@
+package e2e
+
+import (
+ "os"
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
+)
+
+// TestOffsetManagement tests end-to-end offset management scenarios.
+// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock.
+func TestOffsetManagement(t *testing.T) {
+	gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
+	defer gateway.CleanupAndClose()
+
+	addr := gateway.StartAndWait()
+
+	// If a schema registry is configured, log it so schematized runs are identifiable.
+	if v := os.Getenv("SCHEMA_REGISTRY_URL"); v != "" {
+		t.Logf("Schema Registry detected at %s - running offset tests in schematized mode", v)
+	}
+
+	// Log which backend we're using
+	if gateway.IsSMQMode() {
+		t.Logf("Running offset management tests with SMQ backend - offsets will be persisted")
+	} else {
+		t.Logf("Running offset management tests with mock backend - offsets are in-memory only")
+	}
+
+	// Each subtest gets its own topic. The resumption test starts a brand-new
+	// consumer group, which begins at the earliest offset — on a shared topic
+	// it would read the commit/fetch test's leftover messages instead of its
+	// own, making its count assertions pass against the wrong data.
+	commitTopic := testutil.GenerateUniqueTopicName("offset-management")
+	resumeTopic := testutil.GenerateUniqueTopicName("offset-resumption")
+	gateway.AddTestTopic(commitTopic)
+	gateway.AddTestTopic(resumeTopic)
+
+	t.Run("BasicOffsetCommitFetch", func(t *testing.T) {
+		testBasicOffsetCommitFetch(t, addr, commitTopic, testutil.GenerateUniqueGroupID("offset-test-group"))
+	})
+
+	t.Run("ConsumerGroupResumption", func(t *testing.T) {
+		// Use the generator for the second group too, rather than ad-hoc
+		// string concatenation, matching the convention used elsewhere.
+		testConsumerGroupResumption(t, addr, resumeTopic, testutil.GenerateUniqueGroupID("offset-resume-group"))
+	})
+}
+
+// testBasicOffsetCommitFetch exercises commit-then-resume: a group consumes
+// part of a topic, and a second pass with the same group ID must continue
+// from the committed offset so every message is seen exactly once.
+func testBasicOffsetCommitFetch(t *testing.T, addr, topic, groupID string) {
+	c := testutil.NewKafkaGoClient(t, addr)
+	gen := testutil.NewMessageGenerator()
+
+	// When a schema registry is configured, register a value schema for the
+	// topic first (best effort — failure is logged, not fatal).
+	if registry := os.Getenv("SCHEMA_REGISTRY_URL"); registry != "" {
+		id, err := testutil.EnsureValueSchema(t, registry, topic)
+		if err == nil {
+			t.Logf("Ensured value schema id=%d for subject %s-value", id, topic)
+		} else {
+			t.Logf("Schema registration failed (non-fatal for test): %v", err)
+		}
+	}
+
+	// Seed the topic with five messages.
+	batch := gen.GenerateKafkaGoMessages(5)
+	err := c.ProduceMessages(topic, batch)
+	testutil.AssertNoError(t, err, "Failed to produce offset test messages")
+
+	// Phase 1: consume three messages so the group commits its position.
+	t.Logf("=== Phase 1: Consuming first 3 messages ===")
+	firstBatch, err := c.ConsumeWithGroup(topic, groupID, 3)
+	testutil.AssertNoError(t, err, "Failed to consume first batch")
+	testutil.AssertEqual(t, 3, len(firstBatch), "Should consume exactly 3 messages")
+
+	// Phase 2: a fresh consumer in the same group must pick up at the
+	// committed offset and see only the remaining two messages.
+	t.Logf("=== Phase 2: Resuming from committed offset ===")
+	secondBatch, err := c.ConsumeWithGroup(topic, groupID, 2)
+	testutil.AssertNoError(t, err, "Failed to consume remaining messages")
+	testutil.AssertEqual(t, 2, len(secondBatch), "Should consume remaining 2 messages")
+
+	// Across both phases every message is delivered exactly once.
+	total := len(firstBatch) + len(secondBatch)
+	testutil.AssertEqual(t, len(batch), total, "Should consume all messages exactly once")
+
+	t.Logf("SUCCESS: Offset management test completed - consumed %d + %d messages", len(firstBatch), len(secondBatch))
+}
+
+// testConsumerGroupResumption simulates a consumer restart: two sequential
+// ConsumeWithGroup calls with the same group ID must together cover the
+// whole topic without re-reading messages.
+func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) {
+	c := testutil.NewKafkaGoClient(t, addr)
+	gen := testutil.NewMessageGenerator()
+
+	// Seed the topic with four messages.
+	batch := gen.GenerateKafkaGoMessages(4)
+	err := c.ProduceMessages(topic, batch)
+	testutil.AssertNoError(t, err, "Failed to produce messages for resumption test")
+
+	// First session: consume half of the batch.
+	before, err := c.ConsumeWithGroup(topic, groupID, 2)
+	testutil.AssertNoError(t, err, "Failed to consume first batch")
+
+	// "Restart": a second session in the same group reads the rest.
+	after, err := c.ConsumeWithGroup(topic, groupID, 2)
+	testutil.AssertNoError(t, err, "Failed to consume after restart")
+
+	// Both sessions together must account for every produced message.
+	testutil.AssertEqual(t, len(batch), len(before)+len(after), "Should consume all messages after restart")
+
+	t.Logf("SUCCESS: Consumer group resumption test completed")
+}