diff options
Diffstat (limited to 'test/kafka/kafka-client-loadtest/cmd/loadtest/main.go')
| -rw-r--r-- | test/kafka/kafka-client-loadtest/cmd/loadtest/main.go | 45 |
1 files changed, 41 insertions, 4 deletions
diff --git a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go index 2f435e600..bfd53501e 100644 --- a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go +++ b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go @@ -22,6 +22,7 @@ import ( "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics" "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/producer" "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema" + "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker" ) var ( @@ -143,6 +144,10 @@ func main() { func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error { log.Printf("Starting producer-only test with %d producers", cfg.Producers.Count) + // Create record tracker with current timestamp to filter old messages + testStartTime := time.Now().UnixNano() + recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl", testStartTime) + errChan := make(chan error, cfg.Producers.Count) for i := 0; i < cfg.Producers.Count; i++ { @@ -150,7 +155,7 @@ func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics go func(id int) { defer wg.Done() - prod, err := producer.New(cfg, collector, id) + prod, err := producer.New(cfg, collector, id, recordTracker) if err != nil { log.Printf("Failed to create producer %d: %v", id, err) errChan <- err @@ -179,6 +184,10 @@ func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error { log.Printf("Starting consumer-only test with %d consumers", cfg.Consumers.Count) + // Create record tracker with current timestamp to filter old messages + testStartTime := time.Now().UnixNano() + recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl", testStartTime) + errChan := make(chan error, cfg.Consumers.Count) for i := 0; i < cfg.Consumers.Count; i++ { @@ -186,7 +195,7 @@ func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics go func(id int) { defer wg.Done() - cons, err := consumer.New(cfg, collector, id) + cons, err := consumer.New(cfg, collector, id, recordTracker) if err != nil { log.Printf("Failed to create consumer %d: %v", id, err) errChan <- err @@ -206,6 +215,11 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c log.Printf("Starting comprehensive test with %d producers and %d consumers", cfg.Producers.Count, cfg.Consumers.Count) + // Create record tracker with current timestamp to filter old messages + testStartTime := time.Now().UnixNano() + log.Printf("Test run starting at %d - only tracking messages from this run", testStartTime) + recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl", testStartTime) + errChan := make(chan error, cfg.Producers.Count) // Create separate contexts for producers and consumers @@ -218,7 +232,7 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c go func(id int) { defer wg.Done() - prod, err := producer.New(cfg, collector, id) + prod, err := producer.New(cfg, collector, id, recordTracker) if err != nil { log.Printf("Failed to create producer %d: %v", id, err) errChan <- err @@ -239,12 +253,13 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c time.Sleep(2 * time.Second) // Start consumers + // NOTE: With unique ClientIDs, all consumers can start simultaneously without connection storms for i := 0; i < cfg.Consumers.Count; i++ { wg.Add(1) go func(id int) { defer wg.Done() - cons, err := consumer.New(cfg, collector, id) + cons, err := consumer.New(cfg, collector, id, recordTracker) if err != nil { log.Printf("Failed to create consumer %d: %v", id, err) return @@ -304,6 +319,28 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c }() } + // Wait for all producer and consumer goroutines to complete + log.Printf("Waiting for all producers and consumers to complete...") + wg.Wait() + log.Printf("All producers and consumers completed, starting verification...") + + // Save produced and consumed records + log.Printf("Saving produced records...") + if err := recordTracker.SaveProduced(); err != nil { + log.Printf("Failed to save produced records: %v", err) + } + + log.Printf("Saving consumed records...") + if err := recordTracker.SaveConsumed(); err != nil { + log.Printf("Failed to save consumed records: %v", err) + } + + // Compare records + log.Printf("Comparing produced vs consumed records...") + result := recordTracker.Compare() + result.PrintSummary() + + log.Printf("Verification complete!") return nil } |
