aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/internal/testutil/messages.go
blob: 803dc8e0d7b0975e963842cbf6f82f80a0c165dc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package testutil

import (
	"fmt"
	"os"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/kafka/schema"
	"github.com/segmentio/kafka-go"
)

// MessageGenerator provides utilities for generating test messages.
//
// It carries a running sequence number so that keys and values produced by
// successive calls on the same generator can be told apart.
//
// NOTE(review): counter is a plain int with no synchronization visible in this
// type; presumably a generator is used from one goroutine at a time — confirm.
type MessageGenerator struct {
	// counter is the sequence number embedded in generated keys and values.
	// The generator's methods increment it as they create messages.
	counter int
}

// NewMessageGenerator returns a fresh generator whose sequence counter
// starts at zero.
func NewMessageGenerator() *MessageGenerator {
	gen := new(MessageGenerator)
	gen.counter = 0
	return gen
}

// GenerateKafkaGoMessages generates count kafka-go messages for testing.
//
// Every message gets the key "test-key-<n>" and a small JSON value that embeds
// the same sequence number <n> together with the current Unix time in seconds.
// <n> is the generator's counter, which is advanced once per message.
//
// When the SCHEMA_REGISTRY_URL environment variable is non-empty, each value is
// additionally wrapped in a Confluent envelope. The schema ID is resolved once
// per call (not once per message): the latest schema of the
// "offset-management-value" subject is looked up, and if that lookup fails a
// test schema is registered on a best-effort basis before looking it up again.
// If the ID still cannot be resolved, schema ID 1 is used as a fallback.
func (m *MessageGenerator) GenerateKafkaGoMessages(count int) []kafka.Message {
	messages := make([]kafka.Message, count)

	// Build the envelope wrapper up front. Creating the registry client and
	// resolving the schema ID does not depend on the individual message, so
	// doing it inside the loop only repeats the same work count times.
	// The count > 0 guard keeps an empty request free of registry calls.
	var wrap func(payload []byte) []byte
	if url := os.Getenv("SCHEMA_REGISTRY_URL"); url != "" && count > 0 {
		const (
			subject    = "offset-management-value"
			schemaJSON = `{"type":"record","name":"TestRecord","fields":[{"name":"value","type":"string"}]}`
		)

		rc := schema.NewRegistryClient(schema.RegistryConfig{URL: url})
		if _, err := rc.GetLatestSchema(subject); err != nil {
			// Best-effort register schema: a failure here is tolerated
			// because the fallback below still yields a usable envelope.
			_, _ = rc.RegisterSchema(subject, schemaJSON)
		}

		if latest, err := rc.GetLatestSchema(subject); err == nil {
			id := latest.LatestID
			wrap = func(payload []byte) []byte {
				return schema.CreateConfluentEnvelope(schema.FormatAvro, id, nil, payload)
			}
		} else {
			// fallback to schema id 1
			wrap = func(payload []byte) []byte {
				return schema.CreateConfluentEnvelope(schema.FormatAvro, 1, nil, payload)
			}
		}
	}

	for i := range messages {
		m.counter++
		key := []byte(fmt.Sprintf("test-key-%d", m.counter))
		val := []byte(fmt.Sprintf("{\"value\":\"test-message-%d-generated-at-%d\"}", m.counter, time.Now().Unix()))

		if wrap != nil {
			val = wrap(val)
		}

		messages[i] = kafka.Message{Key: key, Value: val}
	}

	return messages
}

// GenerateStringMessages builds count plain-string payloads for Sarama.
//
// Each payload has the form "test-message-<n>-generated-at-<unix seconds>",
// where <n> is the generator's counter after being advanced for that payload.
func (m *MessageGenerator) GenerateStringMessages(count int) []string {
	out := make([]string, count)
	for idx := range out {
		m.counter++
		seq, now := m.counter, time.Now().Unix()
		out[idx] = fmt.Sprintf("test-message-%d-generated-at-%d", seq, now)
	}
	return out
}

// GenerateKafkaGoMessage generates a single kafka-go message.
//
// A non-empty key or value is used verbatim. An empty key is replaced with
// "test-key-<n>" and an empty value with
// "test-message-<n>-generated-at-<unix seconds>", where <n> is the generator's
// counter. The counter is advanced exactly once per call whenever at least one
// of the two parts has to be generated, so a generated value never reuses the
// sequence number of an earlier message (previously the counter only advanced
// when the key was empty, which made generated values repeat when callers
// supplied their own key).
func (m *MessageGenerator) GenerateKafkaGoMessage(key, value string) kafka.Message {
	if key == "" || value == "" {
		m.counter++
	}
	if key == "" {
		key = fmt.Sprintf("test-key-%d", m.counter)
	}
	if value == "" {
		value = fmt.Sprintf("test-message-%d-generated-at-%d", m.counter, time.Now().Unix())
	}

	return kafka.Message{
		Key:   []byte(key),
		Value: []byte(value),
	}
}

// GenerateUniqueTopicName returns "<prefix>-<nanosecond timestamp>" for use as
// a test topic name. An empty prefix defaults to "test-topic".
func GenerateUniqueTopicName(prefix string) string {
	const fallback = "test-topic"

	base := prefix
	if base == "" {
		base = fallback
	}

	stamp := time.Now().UnixNano()
	return fmt.Sprintf("%s-%d", base, stamp)
}

// GenerateUniqueGroupID returns "<prefix>-<nanosecond timestamp>" for use as a
// test consumer group ID. An empty prefix defaults to "test-group".
func GenerateUniqueGroupID(prefix string) string {
	const fallback = "test-group"

	base := prefix
	if base == "" {
		base = fallback
	}

	stamp := time.Now().UnixNano()
	return fmt.Sprintf("%s-%d", base, stamp)
}

// ValidateMessageContent validates that consumed messages match expected
// content.
//
// It returns an error if the two slices differ in length, or if any element of
// actual differs from the element of expected at the same index (order
// matters). The error names the first mismatching index and quotes both
// strings. It returns nil when the slices are element-wise equal.
func ValidateMessageContent(expected, actual []string) error {
	if len(expected) != len(actual) {
		return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
	}

	// The lengths are equal from here on, so actual[i] is always in range and
	// no separate "missing message" check is needed.
	for i, want := range expected {
		if got := actual[i]; got != want {
			return fmt.Errorf("message mismatch at index %d: expected %q, got %q", i, want, got)
		}
	}

	return nil
}

// ValidateKafkaGoMessageContent validates kafka-go messages.
//
// It returns an error if the two slices differ in length, or if the Key or
// Value bytes of any message in actual differ from the message of expected at
// the same index (order matters). Keys are compared before values, and the
// error names the first mismatching index. Only Key and Value are compared;
// all other message fields are ignored. It returns nil when every pair matches.
func ValidateKafkaGoMessageContent(expected, actual []kafka.Message) error {
	if len(expected) != len(actual) {
		return fmt.Errorf("message count mismatch: expected %d, got %d", len(expected), len(actual))
	}

	// The lengths are equal from here on, so actual[i] is always in range and
	// no separate "missing message" check is needed. Index into the slices
	// rather than ranging by value to avoid copying each message struct.
	for i := range expected {
		wantKey, gotKey := string(expected[i].Key), string(actual[i].Key)
		if gotKey != wantKey {
			return fmt.Errorf("key mismatch at index %d: expected %q, got %q", i, wantKey, gotKey)
		}

		wantValue, gotValue := string(expected[i].Value), string(actual[i].Value)
		if gotValue != wantValue {
			return fmt.Errorf("value mismatch at index %d: expected %q, got %q", i, wantValue, gotValue)
		}
	}

	return nil
}