diff options
Diffstat (limited to 'test/kafka/e2e')
| -rw-r--r-- | test/kafka/e2e/comprehensive_test.go | 131 | ||||
| -rw-r--r-- | test/kafka/e2e/offset_management_test.go | 101 |
2 files changed, 232 insertions, 0 deletions
diff --git a/test/kafka/e2e/comprehensive_test.go b/test/kafka/e2e/comprehensive_test.go new file mode 100644 index 000000000..739ccd3a3 --- /dev/null +++ b/test/kafka/e2e/comprehensive_test.go @@ -0,0 +1,131 @@ +package e2e + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil" +) + +// TestComprehensiveE2E tests complete end-to-end workflows +// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock +func TestComprehensiveE2E(t *testing.T) { + gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable) + defer gateway.CleanupAndClose() + + addr := gateway.StartAndWait() + + // Log which backend we're using + if gateway.IsSMQMode() { + t.Logf("Running comprehensive E2E tests with SMQ backend") + } else { + t.Logf("Running comprehensive E2E tests with mock backend") + } + + // Create topics for different test scenarios + topics := []string{ + testutil.GenerateUniqueTopicName("e2e-kafka-go"), + testutil.GenerateUniqueTopicName("e2e-sarama"), + testutil.GenerateUniqueTopicName("e2e-mixed"), + } + gateway.AddTestTopics(topics...) + + t.Run("KafkaGo_to_KafkaGo", func(t *testing.T) { + testKafkaGoToKafkaGo(t, addr, topics[0]) + }) + + t.Run("Sarama_to_Sarama", func(t *testing.T) { + testSaramaToSarama(t, addr, topics[1]) + }) + + t.Run("KafkaGo_to_Sarama", func(t *testing.T) { + testKafkaGoToSarama(t, addr, topics[2]) + }) + + t.Run("Sarama_to_KafkaGo", func(t *testing.T) { + testSaramaToKafkaGo(t, addr, topics[2]) + }) +} + +func testKafkaGoToKafkaGo(t *testing.T, addr, topic string) { + client := testutil.NewKafkaGoClient(t, addr) + msgGen := testutil.NewMessageGenerator() + + // Generate test messages + messages := msgGen.GenerateKafkaGoMessages(2) + + // Produce with kafka-go + err := client.ProduceMessages(topic, messages) + testutil.AssertNoError(t, err, "kafka-go produce failed") + + // Consume with kafka-go + consumed, err := client.ConsumeMessages(topic, len(messages)) + testutil.AssertNoError(t, err, "kafka-go consume failed") + + // Validate message content + err = testutil.ValidateKafkaGoMessageContent(messages, consumed) + testutil.AssertNoError(t, err, "Message content validation failed") + + t.Logf("kafka-go to kafka-go test PASSED") +} + +func testSaramaToSarama(t *testing.T, addr, topic string) { + client := testutil.NewSaramaClient(t, addr) + msgGen := testutil.NewMessageGenerator() + + // Generate test messages + messages := msgGen.GenerateStringMessages(2) + + // Produce with Sarama + err := client.ProduceMessages(topic, messages) + testutil.AssertNoError(t, err, "Sarama produce failed") + + // Consume with Sarama + consumed, err := client.ConsumeMessages(topic, 0, len(messages)) + testutil.AssertNoError(t, err, "Sarama consume failed") + + // Validate message content + err = testutil.ValidateMessageContent(messages, consumed) + testutil.AssertNoError(t, err, "Message content validation failed") + + t.Logf("Sarama to Sarama test PASSED") +} + +func testKafkaGoToSarama(t *testing.T, addr, topic string) { + kafkaGoClient := testutil.NewKafkaGoClient(t, addr) + saramaClient := testutil.NewSaramaClient(t, addr) + msgGen := testutil.NewMessageGenerator() + + // Produce with kafka-go + messages := msgGen.GenerateKafkaGoMessages(2) + err := kafkaGoClient.ProduceMessages(topic, messages) + testutil.AssertNoError(t, err, "kafka-go produce failed") + + // Consume with Sarama + consumed, err := saramaClient.ConsumeMessages(topic, 0, len(messages)) + testutil.AssertNoError(t, err, "Sarama consume failed") + + // Validate that we got the expected number of messages + testutil.AssertEqual(t, len(messages), len(consumed), "Message count mismatch") + + t.Logf("kafka-go to Sarama test PASSED") +} + +func testSaramaToKafkaGo(t *testing.T, addr, topic string) { + kafkaGoClient := testutil.NewKafkaGoClient(t, addr) + saramaClient := testutil.NewSaramaClient(t, addr) + msgGen := testutil.NewMessageGenerator() + + // Produce with Sarama + messages := msgGen.GenerateStringMessages(2) + err := saramaClient.ProduceMessages(topic, messages) + testutil.AssertNoError(t, err, "Sarama produce failed") + + // Consume with kafka-go + consumed, err := kafkaGoClient.ConsumeMessages(topic, len(messages)) + testutil.AssertNoError(t, err, "kafka-go consume failed") + + // Validate that we got the expected number of messages + testutil.AssertEqual(t, len(messages), len(consumed), "Message count mismatch") + + t.Logf("Sarama to kafka-go test PASSED") +} diff --git a/test/kafka/e2e/offset_management_test.go b/test/kafka/e2e/offset_management_test.go new file mode 100644 index 000000000..398647843 --- /dev/null +++ b/test/kafka/e2e/offset_management_test.go @@ -0,0 +1,101 @@ +package e2e + +import ( + "os" + "testing" + + "github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil" +) + +// TestOffsetManagement tests end-to-end offset management scenarios +// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock +func TestOffsetManagement(t *testing.T) { + gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable) + defer gateway.CleanupAndClose() + + addr := gateway.StartAndWait() + + // If schema registry is configured, ensure gateway is in schema mode and log + if v := os.Getenv("SCHEMA_REGISTRY_URL"); v != "" { + t.Logf("Schema Registry detected at %s - running offset tests in schematized mode", v) + } + + // Log which backend we're using + if gateway.IsSMQMode() { + t.Logf("Running offset management tests with SMQ backend - offsets will be persisted") + } else { + t.Logf("Running offset management tests with mock backend - offsets are in-memory only") + } + + topic := testutil.GenerateUniqueTopicName("offset-management") + groupID := testutil.GenerateUniqueGroupID("offset-test-group") + + gateway.AddTestTopic(topic) + + t.Run("BasicOffsetCommitFetch", func(t *testing.T) { + testBasicOffsetCommitFetch(t, addr, topic, groupID) + }) + + t.Run("ConsumerGroupResumption", func(t *testing.T) { + testConsumerGroupResumption(t, addr, topic, groupID+"2") + }) +} + +func testBasicOffsetCommitFetch(t *testing.T, addr, topic, groupID string) { + client := testutil.NewKafkaGoClient(t, addr) + msgGen := testutil.NewMessageGenerator() + + // Produce test messages + if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" { + if id, err := testutil.EnsureValueSchema(t, url, topic); err == nil { + t.Logf("Ensured value schema id=%d for subject %s-value", id, topic) + } else { + t.Logf("Schema registration failed (non-fatal for test): %v", err) + } + } + messages := msgGen.GenerateKafkaGoMessages(5) + err := client.ProduceMessages(topic, messages) + testutil.AssertNoError(t, err, "Failed to produce offset test messages") + + // Phase 1: Consume first 3 messages and commit offsets + t.Logf("=== Phase 1: Consuming first 3 messages ===") + consumed1, err := client.ConsumeWithGroup(topic, groupID, 3) + testutil.AssertNoError(t, err, "Failed to consume first batch") + testutil.AssertEqual(t, 3, len(consumed1), "Should consume exactly 3 messages") + + // Phase 2: Create new consumer with same group ID - should resume from committed offset + t.Logf("=== Phase 2: Resuming from committed offset ===") + consumed2, err := client.ConsumeWithGroup(topic, groupID, 2) + testutil.AssertNoError(t, err, "Failed to consume remaining messages") + testutil.AssertEqual(t, 2, len(consumed2), "Should consume remaining 2 messages") + + // Verify that we got all messages without duplicates + totalConsumed := len(consumed1) + len(consumed2) + testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages exactly once") + + t.Logf("SUCCESS: Offset management test completed - consumed %d + %d messages", len(consumed1), len(consumed2)) +} + +func testConsumerGroupResumption(t *testing.T, addr, topic, groupID string) { + client := testutil.NewKafkaGoClient(t, addr) + msgGen := testutil.NewMessageGenerator() + + // Produce messages + messages := msgGen.GenerateKafkaGoMessages(4) + err := client.ProduceMessages(topic, messages) + testutil.AssertNoError(t, err, "Failed to produce messages for resumption test") + + // Consume some messages + consumed1, err := client.ConsumeWithGroup(topic, groupID, 2) + testutil.AssertNoError(t, err, "Failed to consume first batch") + + // Simulate consumer restart by consuming remaining messages with same group ID + consumed2, err := client.ConsumeWithGroup(topic, groupID, 2) + testutil.AssertNoError(t, err, "Failed to consume after restart") + + // Verify total consumption + totalConsumed := len(consumed1) + len(consumed2) + testutil.AssertEqual(t, len(messages), totalConsumed, "Should consume all messages after restart") + + t.Logf("SUCCESS: Consumer group resumption test completed") +} |
