diff options
Diffstat (limited to 'test/kafka/kafka-client-loadtest/internal/tracker/tracker.go')
| -rw-r--r-- | test/kafka/kafka-client-loadtest/internal/tracker/tracker.go | 281 |
1 files changed, 281 insertions, 0 deletions
// Reconstructed from the collapsed diff view of
// test/kafka/kafka-client-loadtest/internal/tracker/tracker.go.
// The file declares `package tracker` and imports encoding/json, fmt, os,
// sort, strings, sync and time.

// initialRecordCapacity is the capacity the produced and consumed record
// slices are pre-sized with, so that early appends do not reallocate.
const initialRecordCapacity = 100000

// sampleLimit is the maximum number of missing or duplicate records that
// PrintSummary lists.
const sampleLimit = 10

// Record represents a tracked message.
type Record struct {
	Key        string `json:"key"`
	Topic      string `json:"topic"`
	Partition  int32  `json:"partition"`
	Offset     int64  `json:"offset"`
	Timestamp  int64  `json:"timestamp"`
	ProducerID int    `json:"producer_id,omitempty"`
	ConsumerID int    `json:"consumer_id,omitempty"`
}

// recordKey identifies a message by its position in the log. Compare uses it
// as a map key to match produced records against consumed ones.
type recordKey struct {
	topic     string
	partition int32
	offset    int64
}

// keyOf returns the topic/partition/offset position of rec.
func keyOf(rec Record) recordKey {
	return recordKey{topic: rec.Topic, partition: rec.Partition, offset: rec.Offset}
}

// lessByPosition orders records by topic, then partition, then offset.
func lessByPosition(a, b Record) bool {
	if a.Topic != b.Topic {
		return a.Topic < b.Topic
	}
	if a.Partition != b.Partition {
		return a.Partition < b.Partition
	}
	return a.Offset < b.Offset
}

// Tracker tracks produced and consumed records. All methods take mu, so a
// Tracker must be shared by pointer and never copied.
type Tracker struct {
	mu               sync.Mutex
	producedRecords  []Record
	consumedRecords  []Record
	producedFile     string
	consumedFile     string
	testStartTime    int64  // Unix timestamp in nanoseconds - used to filter old messages
	testRunPrefix    string // Key prefix for this test run (e.g., "run-20251015-170150")
	filteredOldCount int    // Count of old messages consumed but not tracked
}

// NewTracker creates a new record tracker.
//
// producedFile and consumedFile are the paths SaveProduced and SaveConsumed
// write to. testStartTime is a Unix timestamp in nanoseconds; it is formatted
// as "20060102-150405" and prefixed with "run-" to form the key prefix that
// TrackConsumed accepts.
func NewTracker(producedFile, consumedFile string, testStartTime int64) *Tracker {
	// Generate test run prefix from start time using same format as producer
	// Producer format: p.startTime.Format("20060102-150405") -> "20251015-170859"
	runID := time.Unix(0, testStartTime).Format("20060102-150405")
	testRunPrefix := fmt.Sprintf("run-%s", runID)

	fmt.Printf("Tracker initialized with prefix: %s (filtering messages not matching this prefix)\n", testRunPrefix)

	return &Tracker{
		producedRecords: make([]Record, 0, initialRecordCapacity),
		consumedRecords: make([]Record, 0, initialRecordCapacity),
		producedFile:    producedFile,
		consumedFile:    consumedFile,
		testStartTime:   testStartTime,
		testRunPrefix:   testRunPrefix,
	}
}

// TrackProduced records a produced message.
func (t *Tracker) TrackProduced(record Record) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.producedRecords = append(t.producedRecords, record)
}

// TrackConsumed records a consumed message.
// Only tracks messages from the current test run (filters out old messages
// from previous tests).
func (t *Tracker) TrackConsumed(record Record) {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Filter: Only track messages from current test run based on key prefix
	// Producer keys look like: "run-20251015-170150-key-123"
	// We only want messages that match our test run prefix
	if !strings.HasPrefix(record.Key, t.testRunPrefix) {
		// Count old messages consumed but not tracked
		t.filteredOldCount++
		return
	}

	t.consumedRecords = append(t.consumedRecords, record)
}

// writeRecords creates (or truncates) path and writes records to it as one
// JSON object per line. kind ("produced" or "consumed") only labels the error
// and log messages. Errors wrap their cause so callers can inspect it with
// errors.Is / errors.As.
func writeRecords(path, kind string, records []Record) error {
	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("failed to create %s file: %w", kind, err)
	}

	encoder := json.NewEncoder(f)
	for i := range records {
		if err := encoder.Encode(&records[i]); err != nil {
			// The encode failure is the error worth reporting; a close
			// failure on top of it would add nothing.
			_ = f.Close()
			return fmt.Errorf("failed to encode %s record: %w", kind, err)
		}
	}

	// Close can report a write failure that earlier Write calls did not, so
	// the file only counts as saved once Close succeeds.
	if err := f.Close(); err != nil {
		return fmt.Errorf("failed to close %s file: %w", kind, err)
	}

	fmt.Printf("Saved %d %s records to %s\n", len(records), kind, path)
	return nil
}

// SaveProduced writes produced records to file.
func (t *Tracker) SaveProduced() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return writeRecords(t.producedFile, "produced", t.producedRecords)
}

// SaveConsumed writes consumed records to file.
func (t *Tracker) SaveConsumed() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return writeRecords(t.consumedFile, "consumed", t.consumedRecords)
}

// Compare compares produced and consumed records.
//
// Records are matched by topic, partition and offset. The returned Missing
// and Duplicates slices are sorted by that position so that repeated calls
// give the same order.
func (t *Tracker) Compare() ComparisonResult {
	t.mu.Lock()
	defer t.mu.Unlock()

	result := ComparisonResult{
		TotalProduced:    len(t.producedRecords),
		TotalConsumed:    len(t.consumedRecords),
		FilteredOldCount: t.filteredOldCount,
	}

	// Build maps for efficient lookup. If the same position was produced
	// more than once, the last record tracked for it is kept.
	produced := make(map[recordKey]Record, len(t.producedRecords))
	for _, rec := range t.producedRecords {
		produced[keyOf(rec)] = rec
	}

	reads := make(map[recordKey]int, len(t.consumedRecords))
	repeated := make(map[recordKey]Record)
	for _, rec := range t.consumedRecords {
		key := keyOf(rec)
		reads[key]++
		if reads[key] == 2 {
			// Keep the record from the first repeated read as the sample
			// reported for this position.
			repeated[key] = rec
		}
	}

	// Find missing records (produced but not consumed)
	for key, rec := range produced {
		if _, found := reads[key]; !found {
			result.Missing = append(result.Missing, rec)
		}
	}
	// Map iteration order is random; sort for reproducible output.
	sort.Slice(result.Missing, func(i, j int) bool {
		return lessByPosition(result.Missing[i], result.Missing[j])
	})

	// Find duplicate records (consumed multiple times)
	for key, rec := range repeated {
		result.Duplicates = append(result.Duplicates, DuplicateRecord{
			Record: rec,
			Count:  reads[key],
		})
	}
	sort.Slice(result.Duplicates, func(i, j int) bool {
		return lessByPosition(result.Duplicates[i].Record, result.Duplicates[j].Record)
	})

	result.MissingCount = len(result.Missing)
	result.DuplicateCount = len(result.Duplicates)
	result.UniqueConsumed = result.TotalConsumed - sumDuplicates(result.Duplicates)

	return result
}

// ComparisonResult holds the comparison results.
type ComparisonResult struct {
	TotalProduced    int
	TotalConsumed    int
	UniqueConsumed   int
	MissingCount     int
	DuplicateCount   int
	FilteredOldCount int // Old messages consumed but filtered out
	Missing          []Record
	Duplicates       []DuplicateRecord
}

// DuplicateRecord represents a record consumed multiple times.
type DuplicateRecord struct {
	Record Record
	Count  int
}

// percentOf returns part as a percentage of total. It returns 0 when total is
// not positive, so the summary never prints +Inf or NaN.
func percentOf(part, total int) float64 {
	if total <= 0 {
		return 0
	}
	return float64(part) * 100.0 / float64(total)
}

// PrintSummary prints a summary of the comparison to standard output,
// including up to sampleLimit missing records and up to sampleLimit duplicate
// records (most-read first).
func (r *ComparisonResult) PrintSummary() {
	fmt.Println("\n" + strings.Repeat("=", 70))
	fmt.Println(" MESSAGE VERIFICATION RESULTS")
	fmt.Println(strings.Repeat("=", 70))

	fmt.Printf("\nProduction Summary:\n")
	fmt.Printf(" Total Produced: %d messages\n", r.TotalProduced)

	duplicateReads := r.TotalConsumed - r.UniqueConsumed

	fmt.Printf("\nConsumption Summary:\n")
	fmt.Printf(" Total Consumed: %d messages (from current test)\n", r.TotalConsumed)
	fmt.Printf(" Unique Consumed: %d messages\n", r.UniqueConsumed)
	fmt.Printf(" Duplicate Reads: %d messages\n", duplicateReads)
	if r.FilteredOldCount > 0 {
		fmt.Printf(" Filtered Old: %d messages (from previous tests, not tracked)\n", r.FilteredOldCount)
	}

	fmt.Printf("\nVerification Results:\n")
	if r.MissingCount == 0 {
		fmt.Printf(" ✅ Missing Records: 0 (all messages delivered)\n")
	} else {
		fmt.Printf(" ❌ Missing Records: %d (data loss detected!)\n", r.MissingCount)
	}

	if r.DuplicateCount == 0 {
		fmt.Printf(" ✅ Duplicate Records: 0 (no duplicates)\n")
	} else {
		fmt.Printf(" ⚠️ Duplicate Records: %d unique messages read multiple times (%.1f%%)\n",
			r.DuplicateCount, percentOf(duplicateReads, r.TotalProduced))
	}

	fmt.Printf("\nDelivery Guarantee:\n")
	switch {
	case r.MissingCount == 0 && r.DuplicateCount == 0:
		fmt.Printf(" ✅ EXACTLY-ONCE: All messages delivered exactly once\n")
	case r.MissingCount == 0:
		fmt.Printf(" ✅ AT-LEAST-ONCE: All messages delivered (some duplicates)\n")
	default:
		fmt.Printf(" ❌ AT-MOST-ONCE: Some messages lost\n")
	}

	// Print sample of missing records (up to 10)
	if len(r.Missing) > 0 {
		fmt.Printf("\nSample Missing Records (first 10 of %d):\n", len(r.Missing))
		sample := r.Missing
		if len(sample) > sampleLimit {
			sample = sample[:sampleLimit]
		}
		for _, record := range sample {
			fmt.Printf(" - %s[%d]@%d (key=%s)\n",
				record.Topic, record.Partition, record.Offset, record.Key)
		}
	}

	// Print sample of duplicate records (up to 10)
	if len(r.Duplicates) > 0 {
		fmt.Printf("\nSample Duplicate Records (first 10 of %d):\n", len(r.Duplicates))
		// Sort a copy by count descending so the caller's slice is left
		// untouched; ties fall back to log position for stable output.
		sorted := make([]DuplicateRecord, len(r.Duplicates))
		copy(sorted, r.Duplicates)
		sort.Slice(sorted, func(i, j int) bool {
			if sorted[i].Count != sorted[j].Count {
				return sorted[i].Count > sorted[j].Count
			}
			return lessByPosition(sorted[i].Record, sorted[j].Record)
		})
		if len(sorted) > sampleLimit {
			sorted = sorted[:sampleLimit]
		}

		for _, dup := range sorted {
			fmt.Printf(" - %s[%d]@%d (key=%s, read %d times)\n",
				dup.Record.Topic, dup.Record.Partition, dup.Record.Offset,
				dup.Record.Key, dup.Count)
		}
	}

	fmt.Println(strings.Repeat("=", 70))
}

// sumDuplicates returns the number of surplus reads across duplicates: for
// each entry, every read beyond the first.
func sumDuplicates(duplicates []DuplicateRecord) int {
	sum := 0
	for _, dup := range duplicates {
		sum += dup.Count - 1 // Don't count the first occurrence
	}
	return sum
}
