path: root/test/kafka/loadtest
Diffstat (limited to 'test/kafka/loadtest')
-rw-r--r--  test/kafka/loadtest/mock_million_record_test.go      622
-rw-r--r--  test/kafka/loadtest/quick_performance_test.go        139
-rw-r--r--  test/kafka/loadtest/resume_million_test.go           208
-rwxr-xr-x  test/kafka/loadtest/run_million_record_test.sh       115
-rwxr-xr-x  test/kafka/loadtest/setup_seaweed_infrastructure.sh  131
5 files changed, 1215 insertions, 0 deletions
diff --git a/test/kafka/loadtest/mock_million_record_test.go b/test/kafka/loadtest/mock_million_record_test.go
new file mode 100644
index 000000000..ada018cbb
--- /dev/null
+++ b/test/kafka/loadtest/mock_million_record_test.go
@@ -0,0 +1,622 @@
+package integration
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/keepalive"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// MockTestRecord represents a record with realistic fields for integration testing
+type MockTestRecord struct {
+ ID string
+ UserID int64
+ Timestamp int64
+ Event string
+ Data map[string]interface{}
+ Metadata map[string]string
+}
+
+// GenerateMockTestRecord creates a realistic test record
+func GenerateMockTestRecord(id int) MockTestRecord {
+ events := []string{"user_login", "user_logout", "page_view", "purchase", "signup", "profile_update", "search"}
+ metadata := map[string]string{
+ "source": "web",
+ "version": "1.0.0",
+ "region": "us-west-2",
+ "client_ip": fmt.Sprintf("192.168.%d.%d", rand.Intn(255), rand.Intn(255)),
+ }
+
+ data := map[string]interface{}{
+ "session_id": fmt.Sprintf("sess_%d_%d", id, time.Now().Unix()),
+ "user_agent": "Mozilla/5.0 (compatible; SeaweedFS-Test/1.0)",
+ "referrer": "https://example.com/page" + strconv.Itoa(rand.Intn(100)),
+ "duration": rand.Intn(3600), // seconds
+ "score": rand.Float64() * 100,
+ }
+
+ return MockTestRecord{
+ ID: fmt.Sprintf("record_%d", id),
+ UserID: int64(rand.Intn(10000) + 1),
+ Timestamp: time.Now().UnixNano(),
+ Event: events[rand.Intn(len(events))],
+ Data: data,
+ Metadata: metadata,
+ }
+}
+
+// SerializeMockTestRecord converts a MockTestRecord into a key-value pair for Kafka
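+// The key has the form "user_<userID>:record_<n>" (e.g. "user_1234:record_42"); the value is a JSON document of a few hundred bytes.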
+func SerializeMockTestRecord(record MockTestRecord) ([]byte, []byte) {
+ key := fmt.Sprintf("user_%d:%s", record.UserID, record.ID)
+
+ // Create a realistic JSON-like value with reasonable size (200-500 bytes)
+ value := fmt.Sprintf(`{
+ "id": "%s",
+ "user_id": %d,
+ "timestamp": %d,
+ "event": "%s",
+ "session_id": "%v",
+ "user_agent": "%v",
+ "referrer": "%v",
+ "duration": %v,
+ "score": %.2f,
+ "source": "%s",
+ "version": "%s",
+ "region": "%s",
+ "client_ip": "%s",
+ "batch_info": "This is additional data to make the record size more realistic for testing purposes. It simulates the kind of metadata and context that would typically be included in real-world event data."
+ }`,
+ record.ID,
+ record.UserID,
+ record.Timestamp,
+ record.Event,
+ record.Data["session_id"],
+ record.Data["user_agent"],
+ record.Data["referrer"],
+ record.Data["duration"],
+ record.Data["score"],
+ record.Metadata["source"],
+ record.Metadata["version"],
+ record.Metadata["region"],
+ record.Metadata["client_ip"],
+ )
+
+ return []byte(key), []byte(value)
+}
+
+// DirectBrokerClient connects directly to the broker without discovery
+type DirectBrokerClient struct {
+ brokerAddress string
+ conn *grpc.ClientConn
+ client mq_pb.SeaweedMessagingClient
+
+ // Publisher streams: topic-partition -> stream info
+ publishersLock sync.RWMutex
+ publishers map[string]*PublisherSession
+
+ ctx context.Context
+ cancel context.CancelFunc
+}
+
+// PublisherSession tracks a publishing stream to SeaweedMQ broker
+type PublisherSession struct {
+ Topic string
+ Partition int32
+ Stream mq_pb.SeaweedMessaging_PublishMessageClient
+ MessageCount int64 // Track messages sent for batch ack handling
+}
+
+func NewDirectBrokerClient(brokerAddr string) (*DirectBrokerClient, error) {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // Add connection timeout and keepalive settings
+ conn, err := grpc.DialContext(ctx, brokerAddr,
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithTimeout(30*time.Second),
+ grpc.WithKeepaliveParams(keepalive.ClientParameters{
+ Time: 30 * time.Second, // Increased from 10s to 30s
+ Timeout: 10 * time.Second, // Increased from 5s to 10s
+ PermitWithoutStream: false, // Changed to false to reduce pings
+ }))
+ if err != nil {
+ cancel()
+ return nil, fmt.Errorf("failed to connect to broker: %v", err)
+ }
+
+ client := mq_pb.NewSeaweedMessagingClient(conn)
+
+ return &DirectBrokerClient{
+ brokerAddress: brokerAddr,
+ conn: conn,
+ client: client,
+ publishers: make(map[string]*PublisherSession),
+ ctx: ctx,
+ cancel: cancel,
+ }, nil
+}
+
+func (c *DirectBrokerClient) Close() {
+ c.cancel()
+
+ // Close all publisher streams
+ c.publishersLock.Lock()
+ for key := range c.publishers {
+ delete(c.publishers, key)
+ }
+ c.publishersLock.Unlock()
+
+ c.conn.Close()
+}
+
+func (c *DirectBrokerClient) ConfigureTopic(topicName string, partitions int32) error {
+ topic := &schema_pb.Topic{
+ Namespace: "kafka",
+ Name: topicName,
+ }
+
+ // Create schema for MockTestRecord
+ recordType := &schema_pb.RecordType{
+ Fields: []*schema_pb.Field{
+ {
+ Name: "id",
+ FieldIndex: 0,
+ Type: &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING},
+ },
+ },
+ {
+ Name: "user_id",
+ FieldIndex: 1,
+ Type: &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64},
+ },
+ },
+ {
+ Name: "timestamp",
+ FieldIndex: 2,
+ Type: &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64},
+ },
+ },
+ {
+ Name: "event",
+ FieldIndex: 3,
+ Type: &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING},
+ },
+ },
+ {
+ Name: "data",
+ FieldIndex: 4,
+ Type: &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}, // JSON string
+ },
+ },
+ {
+ Name: "metadata",
+ FieldIndex: 5,
+ Type: &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}, // JSON string
+ },
+ },
+ },
+ }
+
+ // Use user_id as the key column for partitioning
+ keyColumns := []string{"user_id"}
+
+ _, err := c.client.ConfigureTopic(c.ctx, &mq_pb.ConfigureTopicRequest{
+ Topic: topic,
+ PartitionCount: partitions,
+ MessageRecordType: recordType,
+ KeyColumns: keyColumns,
+ })
+ return err
+}
+
+func (c *DirectBrokerClient) PublishRecord(topicName string, partition int32, key, value []byte) error {
+ session, err := c.getOrCreatePublisher(topicName, partition)
+ if err != nil {
+ return err
+ }
+
+ // Send data message using broker API format
+ dataMsg := &mq_pb.DataMessage{
+ Key: key,
+ Value: value,
+ TsNs: time.Now().UnixNano(),
+ }
+
+ if err := session.Stream.Send(&mq_pb.PublishMessageRequest{
+ Message: &mq_pb.PublishMessageRequest_Data{
+ Data: dataMsg,
+ },
+ }); err != nil {
+ return fmt.Errorf("failed to send data: %v", err)
+ }
+
+ // Don't wait for individual acks: the stream was initialized with AckInterval=200,
+ // so the broker acknowledges in batches and handles them asynchronously.
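+ //
+ // A minimal sketch of draining those batched acks, if delivery confirmation were
+ // ever needed here (this test does not read them and relies on the final
+ // error-rate assertions instead; responses are simply discarded in the sketch):
+ //
+ //	go func() {
+ //		for {
+ //			if _, err := session.Stream.Recv(); err != nil {
+ //				return // stream closed or broker error
+ //			}
+ //		}
+ //	}()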
+ return nil
+}
+
+// getOrCreatePublisher gets or creates a publisher stream for a topic-partition
+func (c *DirectBrokerClient) getOrCreatePublisher(topic string, partition int32) (*PublisherSession, error) {
+ key := fmt.Sprintf("%s-%d", topic, partition)
+
+ // Try to get existing publisher
+ c.publishersLock.RLock()
+ if session, exists := c.publishers[key]; exists {
+ c.publishersLock.RUnlock()
+ return session, nil
+ }
+ c.publishersLock.RUnlock()
+
+ // Create new publisher stream
+ c.publishersLock.Lock()
+ defer c.publishersLock.Unlock()
+
+ // Double-check after acquiring write lock
+ if session, exists := c.publishers[key]; exists {
+ return session, nil
+ }
+
+ // Create the stream
+ stream, err := c.client.PublishMessage(c.ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create publish stream: %v", err)
+ }
+
+ // Get the actual partition assignment from the broker
+ actualPartition, err := c.getActualPartitionAssignment(topic, partition)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get actual partition assignment: %v", err)
+ }
+
+ // Send init message using the actual partition structure that the broker allocated
+ if err := stream.Send(&mq_pb.PublishMessageRequest{
+ Message: &mq_pb.PublishMessageRequest_Init{
+ Init: &mq_pb.PublishMessageRequest_InitMessage{
+ Topic: &schema_pb.Topic{
+ Namespace: "kafka",
+ Name: topic,
+ },
+ Partition: actualPartition,
+ AckInterval: 200, // Ack every 200 messages for better balance
+ PublisherName: "direct-test",
+ },
+ },
+ }); err != nil {
+ return nil, fmt.Errorf("failed to send init message: %v", err)
+ }
+
+ session := &PublisherSession{
+ Topic: topic,
+ Partition: partition,
+ Stream: stream,
+ MessageCount: 0,
+ }
+
+ c.publishers[key] = session
+ return session, nil
+}
+
+// getActualPartitionAssignment looks up the actual partition assignment from the broker configuration
+func (c *DirectBrokerClient) getActualPartitionAssignment(topic string, kafkaPartition int32) (*schema_pb.Partition, error) {
+ // Look up the topic configuration from the broker to get the actual partition assignments
+ lookupResp, err := c.client.LookupTopicBrokers(c.ctx, &mq_pb.LookupTopicBrokersRequest{
+ Topic: &schema_pb.Topic{
+ Namespace: "kafka",
+ Name: topic,
+ },
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to lookup topic brokers: %v", err)
+ }
+
+ if len(lookupResp.BrokerPartitionAssignments) == 0 {
+ return nil, fmt.Errorf("no partition assignments found for topic %s", topic)
+ }
+
+ totalPartitions := int32(len(lookupResp.BrokerPartitionAssignments))
+ if kafkaPartition >= totalPartitions {
+ return nil, fmt.Errorf("kafka partition %d out of range, topic %s has %d partitions",
+ kafkaPartition, topic, totalPartitions)
+ }
+
+ // Calculate expected range for this Kafka partition
+ // Ring is divided equally among partitions, with last partition getting any remainder
+ const ringSize = int32(2520) // MaxPartitionCount constant
+ rangeSize := ringSize / totalPartitions
+ expectedRangeStart := kafkaPartition * rangeSize
+ var expectedRangeStop int32
+
+ if kafkaPartition == totalPartitions-1 {
+ // Last partition gets the remainder to fill the entire ring
+ expectedRangeStop = ringSize
+ } else {
+ expectedRangeStop = (kafkaPartition + 1) * rangeSize
+ }
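+ // Worked example: with ringSize 2520 and totalPartitions 8, rangeSize is 315, so
+ // Kafka partition 0 maps to [0, 315), partition 2 to [630, 945), and the last
+ // partition (7) to [2205, 2520], absorbing any remainder.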
+
+ // Find the broker assignment that matches this range
+ for _, assignment := range lookupResp.BrokerPartitionAssignments {
+ if assignment.Partition == nil {
+ continue
+ }
+
+ // Check if this assignment's range matches our expected range
+ if assignment.Partition.RangeStart == expectedRangeStart && assignment.Partition.RangeStop == expectedRangeStop {
+ return assignment.Partition, nil
+ }
+ }
+
+ return nil, fmt.Errorf("no broker assignment found for Kafka partition %d with expected range [%d, %d]",
+ kafkaPartition, expectedRangeStart, expectedRangeStop)
+}
+
+// TestDirectBroker_MillionRecordsIntegration tests the broker directly without discovery
+func TestDirectBroker_MillionRecordsIntegration(t *testing.T) {
+ // Skip by default - this is a large integration test
+ if testing.Short() {
+ t.Skip("Skipping million-record integration test in short mode")
+ }
+
+ // Configuration
+ const (
+ totalRecords = 1000000
+ numPartitions = int32(8) // Use multiple partitions for better performance
+ numProducers = 4 // Concurrent producers
+ brokerAddr = "localhost:17777"
+ )
+
+ // Create direct broker client for topic configuration
+ configClient, err := NewDirectBrokerClient(brokerAddr)
+ if err != nil {
+ t.Fatalf("Failed to create direct broker client: %v", err)
+ }
+ defer configClient.Close()
+
+ topicName := fmt.Sprintf("million-records-direct-test-%d", time.Now().Unix())
+
+ // Create topic
+ glog.Infof("Creating topic %s with %d partitions", topicName, numPartitions)
+ err = configClient.ConfigureTopic(topicName, numPartitions)
+ if err != nil {
+ t.Fatalf("Failed to configure topic: %v", err)
+ }
+
+ // Performance tracking
+ var totalProduced int64
+ var totalErrors int64
+ startTime := time.Now()
+
+ // Progress tracking
+ ticker := time.NewTicker(10 * time.Second)
+ defer ticker.Stop()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ produced := atomic.LoadInt64(&totalProduced)
+ errors := atomic.LoadInt64(&totalErrors)
+ elapsed := time.Since(startTime)
+ rate := float64(produced) / elapsed.Seconds()
+ glog.Infof("Progress: %d/%d records (%.1f%%), rate: %.0f records/sec, errors: %d",
+ produced, totalRecords, float64(produced)/float64(totalRecords)*100, rate, errors)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ // Producer function
+ producer := func(producerID int, recordsPerProducer int) error {
+ defer func() {
+ glog.Infof("Producer %d finished", producerID)
+ }()
+
+ // Create dedicated client for this producer
+ producerClient, err := NewDirectBrokerClient(brokerAddr)
+ if err != nil {
+ return fmt.Errorf("Producer %d failed to create client: %v", producerID, err)
+ }
+ defer producerClient.Close()
+
+ // Add timeout context for each producer
+ producerCtx, producerCancel := context.WithTimeout(ctx, 10*time.Minute)
+ defer producerCancel()
+
+ glog.Infof("Producer %d: About to start producing %d records with dedicated client", producerID, recordsPerProducer)
+
+ for i := 0; i < recordsPerProducer; i++ {
+ // Check if context is cancelled or timed out
+ select {
+ case <-producerCtx.Done():
+ glog.Errorf("Producer %d timed out or cancelled after %d records", producerID, i)
+ return producerCtx.Err()
+ default:
+ }
+
+ // Debug progress for all producers every 50k records
+ if i > 0 && i%50000 == 0 {
+ glog.Infof("Producer %d: Progress %d/%d records (%.1f%%)", producerID, i, recordsPerProducer, float64(i)/float64(recordsPerProducer)*100)
+ }
+ // Calculate global record ID
+ recordID := producerID*recordsPerProducer + i
+
+ // Generate test record
+ testRecord := GenerateMockTestRecord(recordID)
+ key, value := SerializeMockTestRecord(testRecord)
+
+ // Distribute across partitions based on user ID
+ partition := int32(testRecord.UserID % int64(numPartitions))
+
+ // Debug first few records for each producer
+ if i < 3 {
+ glog.Infof("Producer %d: Record %d -> UserID %d -> Partition %d", producerID, i, testRecord.UserID, partition)
+ }
+
+ // Produce the record with retry logic
+ var err error
+ maxRetries := 3
+ for retry := 0; retry < maxRetries; retry++ {
+ err = producerClient.PublishRecord(topicName, partition, key, value)
+ if err == nil {
+ break // Success
+ }
+
+ // If it's an EOF error, wait a bit before retrying
+ if err.Error() == "failed to send data: EOF" {
+ time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond)
+ continue
+ }
+
+ // For other errors, don't retry
+ break
+ }
+
+ if err != nil {
+ atomic.AddInt64(&totalErrors, 1)
+ errorCount := atomic.LoadInt64(&totalErrors)
+ if errorCount < 20 { // Log first 20 errors to get more insight
+ glog.Errorf("Producer %d failed to produce record %d (i=%d) after %d retries: %v", producerID, recordID, i, maxRetries, err)
+ }
+ // Tolerate isolated failures, but abort this producer once errors pile up so a
+ // persistent broker problem surfaces instead of silently dropping records.
+ if errorCount > 1000 { // If too many errors, give up
+ glog.Errorf("Producer %d giving up after %d errors", producerID, errorCount)
+ return fmt.Errorf("too many errors: %d", errorCount)
+ }
+ continue
+ }
+
+ atomic.AddInt64(&totalProduced, 1)
+
+ // Log progress for first producer
+ if producerID == 0 && (i+1)%10000 == 0 {
+ glog.Infof("Producer %d: produced %d records", producerID, i+1)
+ }
+ }
+
+ glog.Infof("Producer %d: Completed loop, produced %d records successfully", producerID, recordsPerProducer)
+ return nil
+ }
+
+ // Start concurrent producers
+ glog.Infof("Starting %d concurrent producers to produce %d records", numProducers, totalRecords)
+
+ var wg sync.WaitGroup
+ recordsPerProducer := totalRecords / numProducers
+
+ for i := 0; i < numProducers; i++ {
+ wg.Add(1)
+ go func(producerID int) {
+ defer wg.Done()
+ glog.Infof("Producer %d starting with %d records to produce", producerID, recordsPerProducer)
+ if err := producer(producerID, recordsPerProducer); err != nil {
+ glog.Errorf("Producer %d failed: %v", producerID, err)
+ }
+ }(i)
+ }
+
+ // Wait for all producers to complete
+ wg.Wait()
+ cancel() // Stop progress reporting
+
+ produceTime := time.Since(startTime)
+ finalProduced := atomic.LoadInt64(&totalProduced)
+ finalErrors := atomic.LoadInt64(&totalErrors)
+
+ glog.Infof("Production completed: %d records in %v (%.0f records/sec), errors: %d",
+ finalProduced, produceTime, float64(finalProduced)/produceTime.Seconds(), finalErrors)
+
+ // Performance summary
+ if finalProduced > 0 {
+ glog.Infof("\n"+
+ "=== PERFORMANCE SUMMARY ===\n"+
+ "Records produced: %d\n"+
+ "Production time: %v\n"+
+ "Production rate: %.0f records/sec\n"+
+ "Errors: %d (%.2f%%)\n"+
+ "Partitions: %d\n"+
+ "Concurrent producers: %d\n"+
+ "Average record size: ~300 bytes\n"+
+ "Total data: ~%.1f MB\n"+
+ "Throughput: ~%.1f MB/sec\n",
+ finalProduced,
+ produceTime,
+ float64(finalProduced)/produceTime.Seconds(),
+ finalErrors,
+ float64(finalErrors)/float64(totalRecords)*100,
+ numPartitions,
+ numProducers,
+ float64(finalProduced)*300/(1024*1024),
+ float64(finalProduced)*300/(1024*1024)/produceTime.Seconds(),
+ )
+ }
+
+ // Test assertions
+ if finalProduced < int64(totalRecords*0.95) { // Allow 5% tolerance for errors
+ t.Errorf("Too few records produced: %d < %d (95%% of target)", finalProduced, int64(float64(totalRecords)*0.95))
+ }
+
+ if finalErrors > int64(totalRecords*0.05) { // Error rate should be < 5%
+ t.Errorf("Too many errors: %d > %d (5%% of target)", finalErrors, int64(float64(totalRecords)*0.05))
+ }
+
+ glog.Infof("Direct broker million-record integration test completed successfully!")
+}
+
+// BenchmarkDirectBroker_ProduceThroughput benchmarks the production throughput
+func BenchmarkDirectBroker_ProduceThroughput(b *testing.B) {
+ if testing.Short() {
+ b.Skip("Skipping benchmark in short mode")
+ }
+
+ client, err := NewDirectBrokerClient("localhost:17777")
+ if err != nil {
+ b.Fatalf("Failed to create client: %v", err)
+ }
+ defer client.Close()
+
+ topicName := fmt.Sprintf("benchmark-topic-%d", time.Now().Unix())
+ err = client.ConfigureTopic(topicName, 1)
+ if err != nil {
+ b.Fatalf("Failed to configure topic: %v", err)
+ }
+
+ // Pre-generate test data
+ records := make([]MockTestRecord, b.N)
+ for i := 0; i < b.N; i++ {
+ records[i] = GenerateMockTestRecord(i)
+ }
+
+ b.ResetTimer()
+ b.StartTimer()
+
+ for i := 0; i < b.N; i++ {
+ key, value := SerializeMockTestRecord(records[i])
+ err := client.PublishRecord(topicName, 0, key, value)
+ if err != nil {
+ b.Fatalf("Failed to produce record %d: %v", i, err)
+ }
+ }
+
+ b.StopTimer()
+}
diff --git a/test/kafka/loadtest/quick_performance_test.go b/test/kafka/loadtest/quick_performance_test.go
new file mode 100644
index 000000000..299a7d948
--- /dev/null
+++ b/test/kafka/loadtest/quick_performance_test.go
@@ -0,0 +1,139 @@
+package integration
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// TestQuickPerformance_10K runs a quick 10K-record smoke test against a local broker
+func TestQuickPerformance_10K(t *testing.T) {
+ const (
+ totalRecords = 10000 // 10K records for quick test
+ numPartitions = int32(4)
+ numProducers = 4
+ brokerAddr = "localhost:17777"
+ )
+
+ // Create direct broker client
+ client, err := NewDirectBrokerClient(brokerAddr)
+ if err != nil {
+ t.Fatalf("Failed to create direct broker client: %v", err)
+ }
+ defer client.Close()
+
+ topicName := fmt.Sprintf("quick-test-%d", time.Now().Unix())
+
+ // Create topic
+ glog.Infof("Creating topic %s with %d partitions", topicName, numPartitions)
+ err = client.ConfigureTopic(topicName, numPartitions)
+ if err != nil {
+ t.Fatalf("Failed to configure topic: %v", err)
+ }
+
+ // Performance tracking
+ var totalProduced int64
+ var totalErrors int64
+ startTime := time.Now()
+
+ // Producer function
+ producer := func(producerID int, recordsPerProducer int) error {
+ for i := 0; i < recordsPerProducer; i++ {
+ recordID := producerID*recordsPerProducer + i
+
+ // Generate test record
+ testRecord := GenerateMockTestRecord(recordID)
+ key, value := SerializeMockTestRecord(testRecord)
+
+ partition := int32(testRecord.UserID % int64(numPartitions))
+
+ // Produce the record (publishing is asynchronous; acks arrive in batches)
+ err := client.PublishRecord(topicName, partition, key, value)
+ if err != nil {
+ atomic.AddInt64(&totalErrors, 1)
+ if atomic.LoadInt64(&totalErrors) < 5 {
+ glog.Errorf("Producer %d failed to produce record %d: %v", producerID, recordID, err)
+ }
+ continue
+ }
+
+ atomic.AddInt64(&totalProduced, 1)
+
+ // Log progress
+ if (i+1)%1000 == 0 {
+ elapsed := time.Since(startTime)
+ rate := float64(atomic.LoadInt64(&totalProduced)) / elapsed.Seconds()
+ glog.Infof("Producer %d: %d records, current rate: %.0f records/sec",
+ producerID, i+1, rate)
+ }
+ }
+ return nil
+ }
+
+ // Start concurrent producers
+ glog.Infof("Starting %d producers for %d records total", numProducers, totalRecords)
+
+ var wg sync.WaitGroup
+ recordsPerProducer := totalRecords / numProducers
+
+ for i := 0; i < numProducers; i++ {
+ wg.Add(1)
+ go func(producerID int) {
+ defer wg.Done()
+ if err := producer(producerID, recordsPerProducer); err != nil {
+ glog.Errorf("Producer %d failed: %v", producerID, err)
+ }
+ }(i)
+ }
+
+ // Wait for completion
+ wg.Wait()
+
+ produceTime := time.Since(startTime)
+ finalProduced := atomic.LoadInt64(&totalProduced)
+ finalErrors := atomic.LoadInt64(&totalErrors)
+
+ // Performance results
+ throughputPerSec := float64(finalProduced) / produceTime.Seconds()
+ dataVolumeMB := float64(finalProduced) * 300 / (1024 * 1024) // ~300 bytes per record
+ throughputMBPerSec := dataVolumeMB / produceTime.Seconds()
+
+ glog.Infof("\n"+
+ "QUICK PERFORMANCE TEST RESULTS\n"+
+ "=====================================\n"+
+ "Records produced: %d / %d\n"+
+ "Production time: %v\n"+
+ "Throughput: %.0f records/sec\n"+
+ "Data volume: %.1f MB\n"+
+ "Bandwidth: %.1f MB/sec\n"+
+ "Errors: %d (%.2f%%)\n"+
+ "Success rate: %.1f%%\n",
+ finalProduced, totalRecords,
+ produceTime,
+ throughputPerSec,
+ dataVolumeMB,
+ throughputMBPerSec,
+ finalErrors,
+ float64(finalErrors)/float64(totalRecords)*100,
+ float64(finalProduced)/float64(totalRecords)*100,
+ )
+
+ // Assertions
+ if finalProduced < int64(totalRecords*0.90) { // Allow 10% tolerance
+ t.Errorf("Too few records produced: %d < %d (90%% of target)", finalProduced, int64(float64(totalRecords)*0.90))
+ }
+
+ if throughputPerSec < 100 { // Asynchronous publishing should comfortably exceed 100 records/sec
+ t.Errorf("Throughput too low: %.0f records/sec (expected > 100)", throughputPerSec)
+ }
+
+ if finalErrors > int64(totalRecords*0.10) { // Error rate should be < 10%
+ t.Errorf("Too many errors: %d > %d (10%% of target)", finalErrors, int64(float64(totalRecords)*0.10))
+ }
+
+ glog.Infof("Performance test passed! Ready for million-record test.")
+}
diff --git a/test/kafka/loadtest/resume_million_test.go b/test/kafka/loadtest/resume_million_test.go
new file mode 100644
index 000000000..48656c154
--- /dev/null
+++ b/test/kafka/loadtest/resume_million_test.go
@@ -0,0 +1,208 @@
+package integration
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// TestResumeMillionRecords_Fixed reruns the million-record load test with improved concurrency handling
+func TestResumeMillionRecords_Fixed(t *testing.T) {
+ const (
+ totalRecords = 1000000
+ numPartitions = int32(8)
+ numProducers = 4
+ brokerAddr = "localhost:17777"
+ batchSize = 100 // Pace each producer in small batches to avoid overwhelming the broker
+ )
+
+ // Create direct broker client
+ client, err := NewDirectBrokerClient(brokerAddr)
+ if err != nil {
+ t.Fatalf("Failed to create direct broker client: %v", err)
+ }
+ defer client.Close()
+
+ topicName := fmt.Sprintf("resume-million-test-%d", time.Now().Unix())
+
+ // Create topic
+ glog.Infof("Creating topic %s with %d partitions for RESUMED test", topicName, numPartitions)
+ err = client.ConfigureTopic(topicName, numPartitions)
+ if err != nil {
+ t.Fatalf("Failed to configure topic: %v", err)
+ }
+
+ // Performance tracking
+ var totalProduced int64
+ var totalErrors int64
+ startTime := time.Now()
+
+ // Progress tracking
+ ticker := time.NewTicker(5 * time.Second) // More frequent updates
+ defer ticker.Stop()
+
+ go func() {
+ for range ticker.C {
+ produced := atomic.LoadInt64(&totalProduced)
+ errors := atomic.LoadInt64(&totalErrors)
+ elapsed := time.Since(startTime)
+ rate := float64(produced) / elapsed.Seconds()
+ progressPercent := float64(produced) / float64(totalRecords) * 100
+
+ glog.Infof("PROGRESS: %d/%d records (%.1f%%), rate: %.0f records/sec, errors: %d",
+ produced, totalRecords, progressPercent, rate, errors)
+
+ if produced >= totalRecords {
+ return
+ }
+ }
+ }()
+
+ // Fixed producer function with better error handling
+ producer := func(producerID int, recordsPerProducer int) error {
+ defer glog.Infof("Producer %d FINISHED", producerID)
+
+ // Create a dedicated client per producer to avoid contention
+ producerClient, err := NewDirectBrokerClient(brokerAddr)
+ if err != nil {
+ return fmt.Errorf("producer %d failed to create client: %v", producerID, err)
+ }
+ defer producerClient.Close()
+
+ successCount := 0
+ for i := 0; i < recordsPerProducer; i++ {
+ recordID := producerID*recordsPerProducer + i
+
+ // Generate test record
+ testRecord := GenerateMockTestRecord(recordID)
+ key, value := SerializeMockTestRecord(testRecord)
+
+ partition := int32(testRecord.UserID % int64(numPartitions))
+
+ // Produce with retry logic
+ maxRetries := 3
+ var lastErr error
+ success := false
+
+ for retry := 0; retry < maxRetries; retry++ {
+ err := producerClient.PublishRecord(topicName, partition, key, value)
+ if err == nil {
+ success = true
+ break
+ }
+ lastErr = err
+ time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond) // Linear backoff: 100ms, 200ms, 300ms
+ }
+
+ if success {
+ atomic.AddInt64(&totalProduced, 1)
+ successCount++
+ } else {
+ atomic.AddInt64(&totalErrors, 1)
+ if atomic.LoadInt64(&totalErrors) < 10 {
+ glog.Errorf("Producer %d failed record %d after retries: %v", producerID, recordID, lastErr)
+ }
+ }
+
+ // Batch progress logging
+ if successCount > 0 && successCount%10000 == 0 {
+ glog.Infof("Producer %d: %d/%d records completed", producerID, successCount, recordsPerProducer)
+ }
+
+ // Small delay to prevent overwhelming the broker
+ if i > 0 && i%batchSize == 0 {
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+
+ glog.Infof("Producer %d completed: %d successful, %d errors",
+ producerID, successCount, recordsPerProducer-successCount)
+ return nil
+ }
+
+ // Start concurrent producers
+ glog.Infof("Starting FIXED %d producers for %d records total", numProducers, totalRecords)
+
+ var wg sync.WaitGroup
+ recordsPerProducer := totalRecords / numProducers
+
+ for i := 0; i < numProducers; i++ {
+ wg.Add(1)
+ go func(producerID int) {
+ defer wg.Done()
+ if err := producer(producerID, recordsPerProducer); err != nil {
+ glog.Errorf("Producer %d FAILED: %v", producerID, err)
+ }
+ }(i)
+ }
+
+ // Wait for completion with timeout
+ done := make(chan bool)
+ go func() {
+ wg.Wait()
+ done <- true
+ }()
+
+ select {
+ case <-done:
+ glog.Infof("All producers completed normally")
+ case <-time.After(30 * time.Minute): // 30-minute timeout
+ glog.Errorf("Test timed out after 30 minutes")
+ t.Errorf("Test timed out")
+ return
+ }
+
+ produceTime := time.Since(startTime)
+ finalProduced := atomic.LoadInt64(&totalProduced)
+ finalErrors := atomic.LoadInt64(&totalErrors)
+
+ // Performance results
+ throughputPerSec := float64(finalProduced) / produceTime.Seconds()
+ dataVolumeMB := float64(finalProduced) * 300 / (1024 * 1024)
+ throughputMBPerSec := dataVolumeMB / produceTime.Seconds()
+ successRate := float64(finalProduced) / float64(totalRecords) * 100
+
+ glog.Infof("\n"+
+ "=== FINAL MILLION RECORD TEST RESULTS ===\n"+
+ "==========================================\n"+
+ "Records produced: %d / %d\n"+
+ "Production time: %v\n"+
+ "Average throughput: %.0f records/sec\n"+
+ "Data volume: %.1f MB\n"+
+ "Bandwidth: %.1f MB/sec\n"+
+ "Errors: %d (%.2f%%)\n"+
+ "Success rate: %.1f%%\n"+
+ "Partitions used: %d\n"+
+ "Concurrent producers: %d\n",
+ finalProduced, totalRecords,
+ produceTime,
+ throughputPerSec,
+ dataVolumeMB,
+ throughputMBPerSec,
+ finalErrors,
+ float64(finalErrors)/float64(totalRecords)*100,
+ successRate,
+ numPartitions,
+ numProducers,
+ )
+
+ // Test assertions
+ if finalProduced < int64(totalRecords*0.95) { // Allow 5% tolerance
+ t.Errorf("Too few records produced: %d < %d (95%% of target)", finalProduced, int64(float64(totalRecords)*0.95))
+ }
+
+ if finalErrors > int64(totalRecords*0.05) { // Error rate should be < 5%
+ t.Errorf("Too many errors: %d > %d (5%% of target)", finalErrors, int64(float64(totalRecords)*0.05))
+ }
+
+ if throughputPerSec < 100 {
+ t.Errorf("Throughput too low: %.0f records/sec (expected > 100)", throughputPerSec)
+ }
+
+ glog.Infof("🏆 MILLION RECORD KAFKA INTEGRATION TEST COMPLETED SUCCESSFULLY!")
+}
+
diff --git a/test/kafka/loadtest/run_million_record_test.sh b/test/kafka/loadtest/run_million_record_test.sh
new file mode 100755
index 000000000..0728e8121
--- /dev/null
+++ b/test/kafka/loadtest/run_million_record_test.sh
@@ -0,0 +1,115 @@
+#!/bin/bash
+
+# Script to run the Kafka Gateway Million Record Integration Test
+# This test requires a running SeaweedFS infrastructure (Master, Filer, MQ Broker)
+
+set -e
+
+echo "=== SeaweedFS Kafka Gateway Million Record Integration Test ==="
+echo "Test Date: $(date)"
+echo "Hostname: $(hostname)"
+echo ""
+
+# Configuration
+MASTERS=${SEAWEED_MASTERS:-"localhost:9333"}
+FILER_GROUP=${SEAWEED_FILER_GROUP:-"default"}
+TEST_DIR="."
+TEST_NAME="TestDirectBroker_MillionRecordsIntegration"
+
+echo "Configuration:"
+echo " Masters: $MASTERS"
+echo " Filer Group: $FILER_GROUP"
+echo " Test Directory: $TEST_DIR"
+echo ""
+
+# Check if SeaweedFS infrastructure is running
+echo "=== Checking Infrastructure ==="
+
+# Function to check if a service is running
+check_service() {
+ local host_port=$1
+ local service_name=$2
+
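+ # ${host_port//://} rewrites "host:port" as "host/port" to form bash's /dev/tcp/<host>/<port> path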
+ if timeout 3 bash -c "</dev/tcp/${host_port//://}" 2>/dev/null; then
+ echo "✓ $service_name is running on $host_port"
+ return 0
+ else
+ echo "✗ $service_name is NOT running on $host_port"
+ return 1
+ fi
+}
+
+# Check each master
+IFS=',' read -ra MASTER_ARRAY <<< "$MASTERS"
+MASTERS_OK=true
+for master in "${MASTER_ARRAY[@]}"; do
+ if ! check_service "$master" "SeaweedFS Master"; then
+ MASTERS_OK=false
+ fi
+done
+
+if [ "$MASTERS_OK" = false ]; then
+ echo ""
+ echo "ERROR: One or more SeaweedFS Masters are not running."
+ echo "Please start your SeaweedFS infrastructure before running this test."
+ echo ""
+ echo "Example commands to start SeaweedFS:"
+ echo " # Terminal 1: Start Master"
+ echo " weed master -defaultReplication=001 -mdir=/tmp/seaweedfs/master"
+ echo ""
+ echo " # Terminal 2: Start Filer"
+ echo " weed filer -master=localhost:9333 -filer.dir=/tmp/seaweedfs/filer"
+ echo ""
+ echo " # Terminal 3: Start MQ Broker"
+ echo " weed mq.broker -filer=localhost:8888 -master=localhost:9333"
+ echo ""
+ exit 1
+fi
+
+echo ""
+echo "=== Infrastructure Check Passed ==="
+echo ""
+
+# Change to the correct directory
+cd "$TEST_DIR"
+
+# Set environment variables for the test
+export SEAWEED_MASTERS="$MASTERS"
+export SEAWEED_FILER_GROUP="$FILER_GROUP"
+
+# Run the test with verbose output
+echo "=== Running Million Record Integration Test ==="
+echo "This may take several minutes..."
+echo ""
+
+# Run the specific test with timeout and verbose output
+timeout 1800 go test -v -run "$TEST_NAME" -timeout=30m 2>&1 | tee /tmp/seaweed_million_record_test.log
+
+TEST_EXIT_CODE=${PIPESTATUS[0]}
+
+echo ""
+echo "=== Test Completed ==="
+echo "Exit Code: $TEST_EXIT_CODE"
+echo "Full log available at: /tmp/seaweed_million_record_test.log"
+echo ""
+
+# Show summary from the log
+echo "=== Performance Summary ==="
+if grep -q "PERFORMANCE SUMMARY" /tmp/seaweed_million_record_test.log; then
+ grep -A 15 "PERFORMANCE SUMMARY" /tmp/seaweed_million_record_test.log
+else
+ echo "Performance summary not found in log"
+fi
+
+echo ""
+
+if [ $TEST_EXIT_CODE -eq 0 ]; then
+ echo "🎉 TEST PASSED: Million record integration test completed successfully!"
+else
+ echo "❌ TEST FAILED: Million record integration test failed with exit code $TEST_EXIT_CODE"
+ echo "Check the log file for details: /tmp/seaweed_million_record_test.log"
+fi
+
+echo ""
+echo "=== Test Run Complete ==="
+exit $TEST_EXIT_CODE
diff --git a/test/kafka/loadtest/setup_seaweed_infrastructure.sh b/test/kafka/loadtest/setup_seaweed_infrastructure.sh
new file mode 100755
index 000000000..448119097
--- /dev/null
+++ b/test/kafka/loadtest/setup_seaweed_infrastructure.sh
@@ -0,0 +1,131 @@
+#!/bin/bash
+
+# Script to set up SeaweedFS infrastructure for Kafka Gateway testing
+# This script will start Master, Filer, and MQ Broker components
+
+set -e
+
+BASE_DIR="/tmp/seaweedfs"
+LOG_DIR="$BASE_DIR/logs"
+DATA_DIR="$BASE_DIR/data"
+
+echo "=== SeaweedFS Infrastructure Setup ==="
+echo "Setup Date: $(date)"
+echo "Base Directory: $BASE_DIR"
+echo ""
+
+# Create directories
+mkdir -p "$BASE_DIR/master" "$BASE_DIR/filer" "$BASE_DIR/broker" "$LOG_DIR"
+
+# Function to check if a service is running
+check_service() {
+ local host_port=$1
+ local service_name=$2
+
+ if timeout 3 bash -c "</dev/tcp/${host_port//://}" 2>/dev/null; then
+ echo "✓ $service_name is already running on $host_port"
+ return 0
+ else
+ echo "✗ $service_name is NOT running on $host_port"
+ return 1
+ fi
+}
+
+# Function to start a service in background
+start_service() {
+ local cmd="$1"
+ local service_name="$2"
+ local log_file="$3"
+ local check_port="$4"
+
+ echo "Starting $service_name..."
+ echo "Command: $cmd"
+ echo "Log: $log_file"
+
+ # Start in background
+ nohup $cmd > "$log_file" 2>&1 &
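+ # Note: $cmd is intentionally unquoted so it word-splits into command + arguments;
+ # individual arguments therefore must not contain spaces.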
+ local pid=$!
+ echo "PID: $pid"
+
+ # Wait for service to be ready
+ local retries=30
+ while [ $retries -gt 0 ]; do
+ if check_service "$check_port" "$service_name" 2>/dev/null; then
+ echo "✓ $service_name is ready"
+ return 0
+ fi
+ retries=$((retries - 1))
+ sleep 1
+ echo -n "."
+ done
+ echo ""
+ echo "❌ $service_name failed to start within 30 seconds"
+ return 1
+}
+
+# Stop any existing processes
+echo "=== Cleaning up existing processes ==="
+pkill -f "weed master" || true
+pkill -f "weed filer" || true
+pkill -f "weed mq.broker" || true
+sleep 2
+
+echo ""
+echo "=== Starting SeaweedFS Components ==="
+
+# Start Master
+if ! check_service "localhost:9333" "SeaweedFS Master"; then
+ start_service \
+ "weed master -defaultReplication=001 -mdir=$BASE_DIR/master" \
+ "SeaweedFS Master" \
+ "$LOG_DIR/master.log" \
+ "localhost:9333"
+ echo ""
+fi
+
+# Start Filer
+if ! check_service "localhost:8888" "SeaweedFS Filer"; then
+ start_service \
+ "weed filer -master=localhost:9333 -filer.dir=$BASE_DIR/filer" \
+ "SeaweedFS Filer" \
+ "$LOG_DIR/filer.log" \
+ "localhost:8888"
+ echo ""
+fi
+
+# Start MQ Broker
+if ! check_service "localhost:17777" "SeaweedFS MQ Broker"; then
+ start_service \
+ "weed mq.broker -filer=localhost:8888 -master=localhost:9333" \
+ "SeaweedFS MQ Broker" \
+ "$LOG_DIR/broker.log" \
+ "localhost:17777"
+ echo ""
+fi
+
+echo "=== Infrastructure Status ==="
+check_service "localhost:9333" "Master (gRPC)"
+check_service "localhost:9334" "Master (HTTP)"
+check_service "localhost:8888" "Filer (HTTP)"
+check_service "localhost:18888" "Filer (gRPC)"
+check_service "localhost:17777" "MQ Broker"
+
+echo ""
+echo "=== Infrastructure Ready ==="
+echo "Log files:"
+echo " Master: $LOG_DIR/master.log"
+echo " Filer: $LOG_DIR/filer.log"
+echo " Broker: $LOG_DIR/broker.log"
+echo ""
+echo "To view logs in real-time:"
+echo " tail -f $LOG_DIR/master.log"
+echo " tail -f $LOG_DIR/filer.log"
+echo " tail -f $LOG_DIR/broker.log"
+echo ""
+echo "To stop all services:"
+echo " pkill -f \"weed master\""
+echo " pkill -f \"weed filer\""
+echo " pkill -f \"weed mq.broker\""
+echo ""
+echo "[OK] SeaweedFS infrastructure is ready for testing!"
+