aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/internal/testutil/messages.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/internal/testutil/messages.go')
-rw-r--r--test/kafka/internal/testutil/messages.go135
1 files changed, 135 insertions, 0 deletions
diff --git a/test/kafka/internal/testutil/messages.go b/test/kafka/internal/testutil/messages.go
new file mode 100644
index 000000000..803dc8e0d
--- /dev/null
+++ b/test/kafka/internal/testutil/messages.go
@@ -0,0 +1,135 @@
+package testutil
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
+ "github.com/segmentio/kafka-go"
+)
+
+// MessageGenerator provides utilities for generating test messages
+type MessageGenerator struct {
+ counter int
+}
+
+// NewMessageGenerator creates a new message generator
+func NewMessageGenerator() *MessageGenerator {
+ return &MessageGenerator{counter: 0}
+}
+
+// GenerateKafkaGoMessages generates kafka-go messages for testing
+func (m *MessageGenerator) GenerateKafkaGoMessages(count int) []kafka.Message {
+ messages := make([]kafka.Message, count)
+
+ for i := 0; i < count; i++ {
+ m.counter++
+ key := []byte(fmt.Sprintf("test-key-%d", m.counter))
+ val := []byte(fmt.Sprintf("{\"value\":\"test-message-%d-generated-at-%d\"}", m.counter, time.Now().Unix()))
+
+ // If schema mode is requested, ensure a test schema exists and wrap with Confluent envelope
+ if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" {
+ subject := "offset-management-value"
+ schemaJSON := `{"type":"record","name":"TestRecord","fields":[{"name":"value","type":"string"}]}`
+ rc := schema.NewRegistryClient(schema.RegistryConfig{URL: url})
+ if _, err := rc.GetLatestSchema(subject); err != nil {
+ // Best-effort register schema
+ _, _ = rc.RegisterSchema(subject, schemaJSON)
+ }
+ if latest, err := rc.GetLatestSchema(subject); err == nil {
+ val = schema.CreateConfluentEnvelope(schema.FormatAvro, latest.LatestID, nil, val)
+ } else {
+ // fallback to schema id 1
+ val = schema.CreateConfluentEnvelope(schema.FormatAvro, 1, nil, val)
+ }
+ }
+
+ messages[i] = kafka.Message{Key: key, Value: val}
+ }
+
+ return messages
+}
+
+// GenerateStringMessages generates string messages for Sarama
+func (m *MessageGenerator) GenerateStringMessages(count int) []string {
+ messages := make([]string, count)
+
+ for i := 0; i < count; i++ {
+ m.counter++
+ messages[i] = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix())
+ }
+
+ return messages
+}
+
+// GenerateKafkaGoMessage generates a single kafka-go message
+func (m *MessageGenerator) GenerateKafkaGoMessage(key, value string) kafka.Message {
+ if key == "" {
+ m.counter++
+ key = fmt.Sprintf("test-key-%d", m.counter)
+ }
+ if value == "" {
+ value = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix())
+ }
+
+ return kafka.Message{
+ Key: []byte(key),
+ Value: []byte(value),
+ }
+}
+
+// GenerateUniqueTopicName generates a unique topic name for testing
+func GenerateUniqueTopicName(prefix string) string {
+ if prefix == "" {
+ prefix = "test-topic"
+ }
+ return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
+}
+
+// GenerateUniqueGroupID generates a unique consumer group ID for testing
+func GenerateUniqueGroupID(prefix string) string {
+ if prefix == "" {
+ prefix = "test-group"
+ }
+ return fmt.Sprintf("%s-%d", prefix, time.Now().UnixNano())
+}
+
+// ValidateMessageContent validates that consumed messages match expected content
+func ValidateMessageContent(expected, actual []string) error {
+ if len(expected) != len(actual) {
+ return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
+ }
+
+ for i, expectedMsg := range expected {
+ if i >= len(actual) {
+ return fmt.Errorf("missing message at index %d", i)
+ }
+ if actual[i] != expectedMsg {
+ return fmt.Errorf("message mismatch at index %d: expected %q, got %q", i, expectedMsg, actual[i])
+ }
+ }
+
+ return nil
+}
+
+// ValidateKafkaGoMessageContent validates kafka-go messages
+func ValidateKafkaGoMessageContent(expected, actual []kafka.Message) error {
+ if len(expected) != len(actual) {
+ return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
+ }
+
+ for i, expectedMsg := range expected {
+ if i >= len(actual) {
+ return fmt.Errorf("missing message at index %d", i)
+ }
+ if string(actual[i].Key) != string(expectedMsg.Key) {
+ return fmt.Errorf("key mismatch at index %d: expected %q, got %q", i, string(expectedMsg.Key), string(actual[i].Key))
+ }
+ if string(actual[i].Value) != string(expectedMsg.Value) {
+ return fmt.Errorf("value mismatch at index %d: expected %q, got %q", i, string(expectedMsg.Value), string(actual[i].Value))
+ }
+ }
+
+ return nil
+}