path: root/test/kafka/loadtest/quick_performance_test.go
package integration

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// TestQuickPerformance_10K verifies produce throughput against the fixed broker using 10K records
func TestQuickPerformance_10K(t *testing.T) {
	const (
		totalRecords  = 10000 // 10K records for quick test
		numPartitions = int32(4)
		numProducers  = 4
		brokerAddr    = "localhost:17777"
	)

	// Create direct broker client
	client, err := NewDirectBrokerClient(brokerAddr)
	if err != nil {
		t.Fatalf("Failed to create direct broker client: %v", err)
	}
	defer client.Close()

	topicName := fmt.Sprintf("quick-test-%d", time.Now().Unix())

	// Create topic
	glog.Infof("Creating topic %s with %d partitions", topicName, numPartitions)
	err = client.ConfigureTopic(topicName, numPartitions)
	if err != nil {
		t.Fatalf("Failed to configure topic: %v", err)
	}

	// Performance tracking
	var totalProduced int64
	var totalErrors int64
	startTime := time.Now()

	// Producer function
	producer := func(producerID int, recordsPerProducer int) error {
		for i := 0; i < recordsPerProducer; i++ {
			recordID := producerID*recordsPerProducer + i

			// Generate test record
			testRecord := GenerateMockTestRecord(recordID)
			key, value := SerializeMockTestRecord(testRecord)

			partition := int32(testRecord.UserID % int64(numPartitions))

			// Produce the record (PublishRecord is now asynchronous)
			err := client.PublishRecord(topicName, partition, key, value)
			if err != nil {
				atomic.AddInt64(&totalErrors, 1)
				if atomic.LoadInt64(&totalErrors) < 5 {
					glog.Errorf("Producer %d failed to produce record %d: %v", producerID, recordID, err)
				}
				continue
			}

			atomic.AddInt64(&totalProduced, 1)

			// Log progress; the reported rate is aggregated across all producers
			if (i+1)%1000 == 0 {
				elapsed := time.Since(startTime)
				rate := float64(atomic.LoadInt64(&totalProduced)) / elapsed.Seconds()
				glog.Infof("Producer %d: %d records, aggregate rate: %.0f records/sec",
					producerID, i+1, rate)
			}
		}
		return nil
	}

	// Start concurrent producers
	glog.Infof("Starting %d producers for %d records total", numProducers, totalRecords)

	var wg sync.WaitGroup
	recordsPerProducer := totalRecords / numProducers

	for i := 0; i < numProducers; i++ {
		wg.Add(1)
		go func(producerID int) {
			defer wg.Done()
			if err := producer(producerID, recordsPerProducer); err != nil {
				glog.Errorf("Producer %d failed: %v", producerID, err)
			}
		}(i)
	}

	// Wait for completion
	wg.Wait()

	produceTime := time.Since(startTime)
	finalProduced := atomic.LoadInt64(&totalProduced)
	finalErrors := atomic.LoadInt64(&totalErrors)

	// Performance results
	throughputPerSec := float64(finalProduced) / produceTime.Seconds()
	dataVolumeMB := float64(finalProduced) * 300 / (1024 * 1024) // ~300 bytes per record
	throughputMBPerSec := dataVolumeMB / produceTime.Seconds()

	glog.Infof("\n"+
		"QUICK PERFORMANCE TEST RESULTS\n"+
		"=====================================\n"+
		"Records produced: %d / %d\n"+
		"Production time: %v\n"+
		"Throughput: %.0f records/sec\n"+
		"Data volume: %.1f MB\n"+
		"Bandwidth: %.1f MB/sec\n"+
		"Errors: %d (%.2f%%)\n"+
		"Success rate: %.1f%%\n",
		finalProduced, totalRecords,
		produceTime,
		throughputPerSec,
		dataVolumeMB,
		throughputMBPerSec,
		finalErrors,
		float64(finalErrors)/float64(totalRecords)*100,
		float64(finalProduced)/float64(totalRecords)*100,
	)

	// Assertions
	if finalProduced < int64(totalRecords*0.90) { // Allow 10% tolerance
		t.Errorf("Too few records produced: %d < %d (90%% of target)", finalProduced, int64(float64(totalRecords)*0.90))
	}

	if throughputPerSec < 100 { // Sanity floor; actual throughput should be far higher than 1 record/sec
		t.Errorf("Throughput too low: %.0f records/sec (expected > 100)", throughputPerSec)
	}

	if finalErrors > int64(totalRecords*0.10) { // Error rate should be < 10%
		t.Errorf("Too many errors: %d > %d (10%% of target)", finalErrors, int64(float64(totalRecords)*0.10))
	}

	glog.Infof("Performance test passed! Ready for million-record test.")
}
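
For reference, below is a minimal sketch of the helper API this test relies on, with signatures inferred from how they are used above. The concrete field sets, parameter types, and return values shown here are assumptions; the real implementations live in sibling files of this package.

// Hypothetical sketch only: signatures inferred from usage in the test above.
package integration

// DirectBrokerClient is assumed to wrap a direct connection to a single broker.
type DirectBrokerClient struct{ /* connection state elided */ }

// NewDirectBrokerClient dials the broker at addr.
func NewDirectBrokerClient(addr string) (*DirectBrokerClient, error) { panic("sketch only") }

// Close releases the underlying connection.
func (c *DirectBrokerClient) Close() error { panic("sketch only") }

// ConfigureTopic creates or reconfigures a topic with the given partition count.
func (c *DirectBrokerClient) ConfigureTopic(name string, partitions int32) error { panic("sketch only") }

// PublishRecord sends one key/value pair to the given topic partition.
func (c *DirectBrokerClient) PublishRecord(topic string, partition int32, key, value []byte) error {
	panic("sketch only")
}

// MockTestRecord is assumed to serialize to roughly 300 bytes (see the data-volume math above).
type MockTestRecord struct{ UserID int64 /* other fields elided */ }

// GenerateMockTestRecord builds a deterministic synthetic record for the given ID.
func GenerateMockTestRecord(id int) *MockTestRecord { panic("sketch only") }

// SerializeMockTestRecord renders the record as a key/value byte pair.
func SerializeMockTestRecord(r *MockTestRecord) (key, value []byte) { panic("sketch only") }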