1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
|
package tracker
import (
"encoding/json"
"fmt"
"os"
"sort"
"strings"
"sync"
"time"
)
// Record describes one message seen by the tracker. A message is located by
// its Topic, Partition and Offset; ProducerID and ConsumerID are left out of
// the JSON encoding when they are zero.
type Record struct {
	Key        string `json:"key"`                   // message key; consumed records are filtered on its prefix
	Topic      string `json:"topic"`                 // topic the message belongs to
	Partition  int32  `json:"partition"`             // partition within the topic
	Offset     int64  `json:"offset"`                // offset within the partition
	Timestamp  int64  `json:"timestamp"`             // NOTE(review): unit is not visible here — confirm with callers
	ProducerID int    `json:"producer_id,omitempty"` // set by the caller for produced records
	ConsumerID int    `json:"consumer_id,omitempty"` // set by the caller for consumed records
}
// Tracker collects the records produced and consumed during a test run so
// they can be saved to disk and compared afterwards. Every method locks mu
// before touching the fields below.
type Tracker struct {
	mu sync.Mutex

	producedRecords []Record // appended to by TrackProduced
	consumedRecords []Record // appended to by TrackConsumed (current-run messages only)

	producedFile string // destination path used by SaveProduced
	consumedFile string // destination path used by SaveConsumed

	testStartTime    int64  // Unix timestamp in nanoseconds; the run prefix is derived from it
	testRunPrefix    string // key prefix for this test run (e.g., "run-20251015-170150")
	filteredOldCount int    // consumed messages skipped because their key lacked the run prefix
}
// NewTracker builds a Tracker that will save to producedFile and consumedFile.
//
// testStartTime is a Unix timestamp in nanoseconds. It is formatted as
// "20060102-150405" (in the local time zone) and prefixed with "run-" to form
// the key prefix that TrackConsumed later filters on; per the original note
// this mirrors the producer's p.startTime.Format("20060102-150405"). The
// chosen prefix is announced on standard output.
func NewTracker(producedFile, consumedFile string, testStartTime int64) *Tracker {
	prefix := "run-" + time.Unix(0, testStartTime).Format("20060102-150405")
	fmt.Printf("Tracker initialized with prefix: %s (filtering messages not matching this prefix)\n", prefix)

	tracker := new(Tracker)
	// Both slices start empty with room for 100000 records.
	tracker.producedRecords = make([]Record, 0, 100000)
	tracker.consumedRecords = make([]Record, 0, 100000)
	tracker.producedFile = producedFile
	tracker.consumedFile = consumedFile
	tracker.testStartTime = testStartTime
	tracker.testRunPrefix = prefix
	tracker.filteredOldCount = 0
	return tracker
}
// TrackProduced appends record to the list of produced messages. The append
// is performed while holding the tracker's mutex.
func (t *Tracker) TrackProduced(record Record) {
	t.mu.Lock()
	t.producedRecords = append(t.producedRecords, record)
	t.mu.Unlock()
}
// TrackConsumed records a consumed message if it belongs to the current test
// run. Membership is decided by the key: producer keys look like
// "run-20251015-170150-key-123", and only keys starting with the tracker's
// run prefix are kept. Any other message is not stored; it only increments
// the count of filtered old messages.
func (t *Tracker) TrackConsumed(record Record) {
	t.mu.Lock()
	defer t.mu.Unlock()

	if strings.HasPrefix(record.Key, t.testRunPrefix) {
		t.consumedRecords = append(t.consumedRecords, record)
		return
	}
	// Key belongs to some other run: count it, but do not track it.
	t.filteredOldCount++
}
// SaveProduced writes every produced record to the tracker's produced file,
// one JSON object per line, truncating any existing file. On success it
// prints the number of records saved. The returned error wraps the
// underlying cause, so callers can inspect it with errors.Is / errors.As.
func (t *Tracker) SaveProduced() error {
	t.mu.Lock()
	defer t.mu.Unlock()

	if err := writeRecordsFile(t.producedFile, "produced", t.producedRecords); err != nil {
		return err
	}
	fmt.Printf("Saved %d produced records to %s\n", len(t.producedRecords), t.producedFile)
	return nil
}

// SaveConsumed writes every tracked consumed record to the tracker's consumed
// file, one JSON object per line, truncating any existing file. On success it
// prints the number of records saved. The returned error wraps the
// underlying cause, so callers can inspect it with errors.Is / errors.As.
func (t *Tracker) SaveConsumed() error {
	t.mu.Lock()
	defer t.mu.Unlock()

	if err := writeRecordsFile(t.consumedFile, "consumed", t.consumedRecords); err != nil {
		return err
	}
	fmt.Printf("Saved %d consumed records to %s\n", len(t.consumedRecords), t.consumedFile)
	return nil
}

// writeRecordsFile creates (or truncates) path and writes records to it as
// newline-delimited JSON. label ("produced" or "consumed") only appears in
// error messages.
func writeRecordsFile(path, label string, records []Record) (err error) {
	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("failed to create %s file: %w", label, err)
	}
	defer func() {
		// A failed Close can mean the data never reached the disk, so report
		// it unless an earlier error is already being returned.
		if closeErr := f.Close(); closeErr != nil && err == nil {
			err = fmt.Errorf("failed to close %s file: %w", label, closeErr)
		}
	}()

	encoder := json.NewEncoder(f)
	for i := range records {
		if err := encoder.Encode(&records[i]); err != nil {
			return fmt.Errorf("failed to encode %s record: %w", label, err)
		}
	}
	return nil
}
// Compare cross-checks the produced records against the consumed ones and
// returns the totals together with the records that were lost or re-read.
// Two records are the same message when their Topic, Partition and Offset
// are all equal.
//
//   - Missing holds each produced message that was never consumed, in the
//     order it was produced. If the same position was produced more than
//     once it is reported once, using the last record tracked for it.
//   - Duplicates holds one entry per message consumed more than once, in the
//     order the second read was observed. The entry carries the record of
//     that second read and the total number of reads.
//   - UniqueConsumed is TotalConsumed minus the repeat reads.
func (t *Tracker) Compare() ComparisonResult {
	t.mu.Lock()
	defer t.mu.Unlock()

	// position is the identity of a message. A comparable struct key cannot
	// collide the way a "topic-partition-offset" string can, and it avoids
	// formatting a string per record.
	type position struct {
		topic     string
		partition int32
		offset    int64
	}
	positionOf := func(r Record) position {
		return position{topic: r.Topic, partition: r.Partition, offset: r.Offset}
	}

	result := ComparisonResult{
		TotalProduced:    len(t.producedRecords),
		TotalConsumed:    len(t.consumedRecords),
		FilteredOldCount: t.filteredOldCount,
	}

	// Count reads per position. The moment a position is seen for the second
	// time it becomes a duplicate; recording it then keeps Duplicates in a
	// stable, input-defined order instead of map iteration order.
	consumedCount := make(map[position]int, len(t.consumedRecords))
	for _, rec := range t.consumedRecords {
		pos := positionOf(rec)
		consumedCount[pos]++
		if consumedCount[pos] == 2 {
			result.Duplicates = append(result.Duplicates, DuplicateRecord{Record: rec})
		}
	}
	// Final read counts are only known once every consumed record was seen.
	for i := range result.Duplicates {
		dup := &result.Duplicates[i]
		dup.Count = consumedCount[positionOf(dup.Record)]
	}

	// Index produced records by position, remembering first-seen order so the
	// missing list can be emitted deterministically.
	produced := make(map[position]Record, len(t.producedRecords))
	order := make([]position, 0, len(t.producedRecords))
	for _, rec := range t.producedRecords {
		pos := positionOf(rec)
		if _, seen := produced[pos]; !seen {
			order = append(order, pos)
		}
		produced[pos] = rec
	}
	for _, pos := range order {
		if consumedCount[pos] == 0 {
			result.Missing = append(result.Missing, produced[pos])
		}
	}

	result.MissingCount = len(result.Missing)
	result.DuplicateCount = len(result.Duplicates)
	result.UniqueConsumed = result.TotalConsumed - sumDuplicates(result.Duplicates)
	return result
}
// ComparisonResult is what Tracker.Compare returns: the totals for a test run
// plus the individual records that were lost or read more than once.
type ComparisonResult struct {
	TotalProduced    int // number of produced records that were tracked
	TotalConsumed    int // number of consumed records that were tracked
	UniqueConsumed   int // TotalConsumed minus the repeat reads
	MissingCount     int // len(Missing)
	DuplicateCount   int // len(Duplicates)
	FilteredOldCount int // old messages consumed but filtered out

	Missing    []Record          // produced but never consumed
	Duplicates []DuplicateRecord // consumed more than once
}
// DuplicateRecord describes a message that was consumed more than once.
type DuplicateRecord struct {
	Record Record // one of the repeated reads of the message
	Count  int    // total number of times the message was consumed
}
// PrintSummary writes a human-readable verification report to standard
// output: production and consumption totals, missing/duplicate verdicts, the
// delivery guarantee the run achieved, and up to ten sample records from
// each of the Missing and Duplicates lists.
//
// The delivery guarantee is reported as:
//   - EXACTLY-ONCE  when nothing is missing and nothing is duplicated,
//   - AT-LEAST-ONCE when nothing is missing but duplicates exist,
//   - AT-MOST-ONCE  when messages are missing and nothing is duplicated,
//   - NO GUARANTEE  when messages are both missing and duplicated.
func (r *ComparisonResult) PrintSummary() {
	// Maximum number of sample records listed per section; the section
	// headings say "first 10", so keep the two in sync.
	const sampleLimit = 10

	rule := strings.Repeat("=", 70)
	duplicateReads := r.TotalConsumed - r.UniqueConsumed

	fmt.Println("\n" + rule)
	fmt.Println(" MESSAGE VERIFICATION RESULTS")
	fmt.Println(rule)

	fmt.Printf("\nProduction Summary:\n")
	fmt.Printf(" Total Produced: %d messages\n", r.TotalProduced)

	fmt.Printf("\nConsumption Summary:\n")
	fmt.Printf(" Total Consumed: %d messages (from current test)\n", r.TotalConsumed)
	fmt.Printf(" Unique Consumed: %d messages\n", r.UniqueConsumed)
	fmt.Printf(" Duplicate Reads: %d messages\n", duplicateReads)
	if r.FilteredOldCount > 0 {
		fmt.Printf(" Filtered Old: %d messages (from previous tests, not tracked)\n", r.FilteredOldCount)
	}

	fmt.Printf("\nVerification Results:\n")
	if r.MissingCount == 0 {
		fmt.Printf(" ✅ Missing Records: 0 (all messages delivered)\n")
	} else {
		fmt.Printf(" ❌ Missing Records: %d (data loss detected!)\n", r.MissingCount)
	}
	switch {
	case r.DuplicateCount == 0:
		fmt.Printf(" ✅ Duplicate Records: 0 (no duplicates)\n")
	case r.TotalProduced > 0:
		duplicatePercent := float64(duplicateReads) * 100.0 / float64(r.TotalProduced)
		fmt.Printf(" ⚠️ Duplicate Records: %d unique messages read multiple times (%.1f%%)\n",
			r.DuplicateCount, duplicatePercent)
	default:
		// Nothing was produced, so a percentage of produced messages is
		// undefined; omit it rather than print +Inf.
		fmt.Printf(" ⚠️ Duplicate Records: %d unique messages read multiple times\n", r.DuplicateCount)
	}

	fmt.Printf("\nDelivery Guarantee:\n")
	switch {
	case r.MissingCount == 0 && r.DuplicateCount == 0:
		fmt.Printf(" ✅ EXACTLY-ONCE: All messages delivered exactly once\n")
	case r.MissingCount == 0:
		fmt.Printf(" ✅ AT-LEAST-ONCE: All messages delivered (some duplicates)\n")
	case r.DuplicateCount == 0:
		fmt.Printf(" ❌ AT-MOST-ONCE: Some messages lost\n")
	default:
		// Loss together with duplicates satisfies neither at-most-once nor
		// at-least-once delivery.
		fmt.Printf(" ❌ NO GUARANTEE: Some messages lost and some duplicated\n")
	}

	// Print sample of missing records (up to sampleLimit).
	if len(r.Missing) > 0 {
		fmt.Printf("\nSample Missing Records (first 10 of %d):\n", len(r.Missing))
		sample := r.Missing
		if len(sample) > sampleLimit {
			sample = sample[:sampleLimit]
		}
		for _, record := range sample {
			fmt.Printf(" - %s[%d]@%d (key=%s)\n",
				record.Topic, record.Partition, record.Offset, record.Key)
		}
	}

	// Print sample of duplicate records (up to sampleLimit), most-read first.
	if len(r.Duplicates) > 0 {
		fmt.Printf("\nSample Duplicate Records (first 10 of %d):\n", len(r.Duplicates))
		// Sort a copy so the caller's slice keeps its order; the stable sort
		// keeps equal counts in their original order for repeatable output.
		sorted := make([]DuplicateRecord, len(r.Duplicates))
		copy(sorted, r.Duplicates)
		sort.SliceStable(sorted, func(i, j int) bool {
			return sorted[i].Count > sorted[j].Count
		})
		if len(sorted) > sampleLimit {
			sorted = sorted[:sampleLimit]
		}
		for _, dup := range sorted {
			fmt.Printf(" - %s[%d]@%d (key=%s, read %d times)\n",
				dup.Record.Topic, dup.Record.Partition, dup.Record.Offset,
				dup.Record.Key, dup.Count)
		}
	}
	fmt.Println(rule)
}
// sumDuplicates returns the number of surplus reads across duplicates: for
// each entry, every read beyond the first (Count - 1) is counted.
func sumDuplicates(duplicates []DuplicateRecord) int {
	extra := 0
	for i := range duplicates {
		extra += duplicates[i].Count - 1
	}
	return extra
}
|