aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/e2e/comprehensive_test.go
blob: 739ccd3a31f0895ed4449c59a493768f1ba07afa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package e2e

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/test/kafka/internal/testutil"
)

// TestComprehensiveE2E tests complete end-to-end workflows across the
// kafka-go and Sarama clients, in every producer/consumer combination.
// This test will use SMQ backend if SEAWEEDFS_MASTERS is available, otherwise mock.
//
// Every scenario gets its own uniquely named topic. The cross-client
// scenarios only assert on the number of consumed messages, so sharing a
// topic between them would let records left behind by one scenario satisfy
// the count check of the next, and would make the outcome depend on subtest
// order and on -run filtering.
func TestComprehensiveE2E(t *testing.T) {
	gateway := testutil.NewGatewayTestServerWithSMQ(t, testutil.SMQAvailable)
	defer gateway.CleanupAndClose()

	addr := gateway.StartAndWait()

	// Log which backend we're using
	if gateway.IsSMQMode() {
		t.Logf("Running comprehensive E2E tests with SMQ backend")
	} else {
		t.Logf("Running comprehensive E2E tests with mock backend")
	}

	// One entry per producer/consumer pairing; topicPrefix is only a
	// human-readable label that is made unique below.
	scenarios := []struct {
		name        string
		topicPrefix string
		run         func(t *testing.T, addr, topic string)
	}{
		{"KafkaGo_to_KafkaGo", "e2e-kafka-go", testKafkaGoToKafkaGo},
		{"Sarama_to_Sarama", "e2e-sarama", testSaramaToSarama},
		{"KafkaGo_to_Sarama", "e2e-kafka-go-to-sarama", testKafkaGoToSarama},
		{"Sarama_to_KafkaGo", "e2e-sarama-to-kafka-go", testSaramaToKafkaGo},
	}

	// Create a dedicated topic for each scenario and register them all
	// with the gateway before any subtest runs.
	topics := make([]string, 0, len(scenarios))
	for _, sc := range scenarios {
		topics = append(topics, testutil.GenerateUniqueTopicName(sc.topicPrefix))
	}
	gateway.AddTestTopics(topics...)

	for i := range scenarios {
		// Copy into per-iteration locals so the closure is correct
		// regardless of the Go version's loop-variable semantics.
		sc, topic := scenarios[i], topics[i]
		t.Run(sc.name, func(t *testing.T) {
			sc.run(t, addr, topic)
		})
	}
}

// testKafkaGoToKafkaGo round-trips generated messages through topic using
// the kafka-go test client for both producing and consuming, then checks the
// consumed messages against the produced ones with
// testutil.ValidateKafkaGoMessageContent.
func testKafkaGoToKafkaGo(t *testing.T, addr, topic string) {
	const messageCount = 2

	kgo := testutil.NewKafkaGoClient(t, addr)
	want := testutil.NewMessageGenerator().GenerateKafkaGoMessages(messageCount)

	// Write the batch with kafka-go.
	produceErr := kgo.ProduceMessages(topic, want)
	testutil.AssertNoError(t, produceErr, "kafka-go produce failed")

	// Read back exactly as many messages as were written.
	got, consumeErr := kgo.ConsumeMessages(topic, len(want))
	testutil.AssertNoError(t, consumeErr, "kafka-go consume failed")

	// Compare what was read with what was written.
	validateErr := testutil.ValidateKafkaGoMessageContent(want, got)
	testutil.AssertNoError(t, validateErr, "Message content validation failed")

	t.Log("kafka-go to kafka-go test PASSED")
}

// testSaramaToSarama round-trips generated string messages through topic
// using the Sarama test client for both producing and consuming, then checks
// the consumed messages against the produced ones with
// testutil.ValidateMessageContent.
func testSaramaToSarama(t *testing.T, addr, topic string) {
	const messageCount = 2

	sarama := testutil.NewSaramaClient(t, addr)
	want := testutil.NewMessageGenerator().GenerateStringMessages(messageCount)

	// Write the batch with Sarama.
	produceErr := sarama.ProduceMessages(topic, want)
	testutil.AssertNoError(t, produceErr, "Sarama produce failed")

	// Read back exactly as many messages as were written.
	// NOTE(review): the literal 0 is presumably the partition — confirm
	// against the testutil Sarama client.
	got, consumeErr := sarama.ConsumeMessages(topic, 0, len(want))
	testutil.AssertNoError(t, consumeErr, "Sarama consume failed")

	// Compare what was read with what was written.
	validateErr := testutil.ValidateMessageContent(want, got)
	testutil.AssertNoError(t, validateErr, "Message content validation failed")

	t.Log("Sarama to Sarama test PASSED")
}

// testKafkaGoToSarama produces generated messages to topic with the kafka-go
// test client and consumes them with the Sarama test client. Only the number
// of consumed messages is asserted; message content is not compared.
func testKafkaGoToSarama(t *testing.T, addr, topic string) {
	const messageCount = 2

	producer := testutil.NewKafkaGoClient(t, addr)
	consumer := testutil.NewSaramaClient(t, addr)
	sent := testutil.NewMessageGenerator().GenerateKafkaGoMessages(messageCount)

	// kafka-go is the writing side.
	produceErr := producer.ProduceMessages(topic, sent)
	testutil.AssertNoError(t, produceErr, "kafka-go produce failed")

	// Sarama is the reading side.
	// NOTE(review): the literal 0 is presumably the partition — confirm
	// against the testutil Sarama client.
	received, consumeErr := consumer.ConsumeMessages(topic, 0, len(sent))
	testutil.AssertNoError(t, consumeErr, "Sarama consume failed")

	// The two clients use different message types here, so only the
	// counts are compared.
	testutil.AssertEqual(t, len(sent), len(received), "Message count mismatch")

	t.Log("kafka-go to Sarama test PASSED")
}

// testSaramaToKafkaGo produces generated string messages to topic with the
// Sarama test client and consumes them with the kafka-go test client. Only
// the number of consumed messages is asserted; message content is not
// compared.
func testSaramaToKafkaGo(t *testing.T, addr, topic string) {
	const messageCount = 2

	consumer := testutil.NewKafkaGoClient(t, addr)
	producer := testutil.NewSaramaClient(t, addr)
	sent := testutil.NewMessageGenerator().GenerateStringMessages(messageCount)

	// Sarama is the writing side.
	produceErr := producer.ProduceMessages(topic, sent)
	testutil.AssertNoError(t, produceErr, "Sarama produce failed")

	// kafka-go is the reading side.
	received, consumeErr := consumer.ConsumeMessages(topic, len(sent))
	testutil.AssertNoError(t, consumeErr, "kafka-go consume failed")

	// The two clients use different message types here, so only the
	// counts are compared.
	testutil.AssertEqual(t, len(sent), len(received), "Message count mismatch")

	t.Log("Sarama to kafka-go test PASSED")
}