aboutsummaryrefslogtreecommitdiff
path: root/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/kafka/kafka-client-loadtest/cmd/loadtest/main.go')
-rw-r--r--test/kafka/kafka-client-loadtest/cmd/loadtest/main.go45
1 files changed, 41 insertions, 4 deletions
diff --git a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go
index 2f435e600..bfd53501e 100644
--- a/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go
+++ b/test/kafka/kafka-client-loadtest/cmd/loadtest/main.go
@@ -22,6 +22,7 @@ import (
"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/metrics"
"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/producer"
"github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/schema"
+ "github.com/seaweedfs/seaweedfs/test/kafka/kafka-client-loadtest/internal/tracker"
)
var (
@@ -143,6 +144,10 @@ func main() {
func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error {
log.Printf("Starting producer-only test with %d producers", cfg.Producers.Count)
+ // Create record tracker with current timestamp to filter old messages
+ testStartTime := time.Now().UnixNano()
+ recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl", testStartTime)
+
errChan := make(chan error, cfg.Producers.Count)
for i := 0; i < cfg.Producers.Count; i++ {
@@ -150,7 +155,7 @@ func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics
go func(id int) {
defer wg.Done()
- prod, err := producer.New(cfg, collector, id)
+ prod, err := producer.New(cfg, collector, id, recordTracker)
if err != nil {
log.Printf("Failed to create producer %d: %v", id, err)
errChan <- err
@@ -179,6 +184,10 @@ func runProducerTest(ctx context.Context, cfg *config.Config, collector *metrics
func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics.Collector, wg *sync.WaitGroup) error {
log.Printf("Starting consumer-only test with %d consumers", cfg.Consumers.Count)
+ // Create record tracker with current timestamp to filter old messages
+ testStartTime := time.Now().UnixNano()
+ recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl", testStartTime)
+
errChan := make(chan error, cfg.Consumers.Count)
for i := 0; i < cfg.Consumers.Count; i++ {
@@ -186,7 +195,7 @@ func runConsumerTest(ctx context.Context, cfg *config.Config, collector *metrics
go func(id int) {
defer wg.Done()
- cons, err := consumer.New(cfg, collector, id)
+ cons, err := consumer.New(cfg, collector, id, recordTracker)
if err != nil {
log.Printf("Failed to create consumer %d: %v", id, err)
errChan <- err
@@ -206,6 +215,11 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c
log.Printf("Starting comprehensive test with %d producers and %d consumers",
cfg.Producers.Count, cfg.Consumers.Count)
+ // Create record tracker with current timestamp to filter old messages
+ testStartTime := time.Now().UnixNano()
+ log.Printf("Test run starting at %d - only tracking messages from this run", testStartTime)
+ recordTracker := tracker.NewTracker("/test-results/produced.jsonl", "/test-results/consumed.jsonl", testStartTime)
+
errChan := make(chan error, cfg.Producers.Count)
// Create separate contexts for producers and consumers
@@ -218,7 +232,7 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c
go func(id int) {
defer wg.Done()
- prod, err := producer.New(cfg, collector, id)
+ prod, err := producer.New(cfg, collector, id, recordTracker)
if err != nil {
log.Printf("Failed to create producer %d: %v", id, err)
errChan <- err
@@ -239,12 +253,13 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c
time.Sleep(2 * time.Second)
// Start consumers
+ // NOTE: With unique ClientIDs, all consumers can start simultaneously without connection storms
for i := 0; i < cfg.Consumers.Count; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
- cons, err := consumer.New(cfg, collector, id)
+ cons, err := consumer.New(cfg, collector, id, recordTracker)
if err != nil {
log.Printf("Failed to create consumer %d: %v", id, err)
return
@@ -304,6 +319,28 @@ func runComprehensiveTest(ctx context.Context, cancel context.CancelFunc, cfg *c
}()
}
+ // Wait for all producer and consumer goroutines to complete
+ log.Printf("Waiting for all producers and consumers to complete...")
+ wg.Wait()
+ log.Printf("All producers and consumers completed, starting verification...")
+
+ // Save produced and consumed records
+ log.Printf("Saving produced records...")
+ if err := recordTracker.SaveProduced(); err != nil {
+ log.Printf("Failed to save produced records: %v", err)
+ }
+
+ log.Printf("Saving consumed records...")
+ if err := recordTracker.SaveConsumed(); err != nil {
+ log.Printf("Failed to save consumed records: %v", err)
+ }
+
+ // Compare records
+ log.Printf("Comparing produced vs consumed records...")
+ result := recordTracker.Compare()
+ result.PrintSummary()
+
+ log.Printf("Verification complete!")
return nil
}