Diffstat (limited to 'test/kafka/loadtest')
-rw-r--r--  test/kafka/loadtest/mock_million_record_test.go       622
-rw-r--r--  test/kafka/loadtest/quick_performance_test.go         139
-rw-r--r--  test/kafka/loadtest/resume_million_test.go            208
-rwxr-xr-x  test/kafka/loadtest/run_million_record_test.sh        115
-rwxr-xr-x  test/kafka/loadtest/setup_seaweed_infrastructure.sh   131
5 files changed, 1215 insertions, 0 deletions
diff --git a/test/kafka/loadtest/mock_million_record_test.go b/test/kafka/loadtest/mock_million_record_test.go new file mode 100644 index 000000000..ada018cbb --- /dev/null +++ b/test/kafka/loadtest/mock_million_record_test.go @@ -0,0 +1,622 @@ +package integration + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// TestRecord represents a record with reasonable fields for integration testing +type MockTestRecord struct { + ID string + UserID int64 + Timestamp int64 + Event string + Data map[string]interface{} + Metadata map[string]string +} + +// GenerateTestRecord creates a realistic test record +func GenerateMockTestRecord(id int) MockTestRecord { + events := []string{"user_login", "user_logout", "page_view", "purchase", "signup", "profile_update", "search"} + metadata := map[string]string{ + "source": "web", + "version": "1.0.0", + "region": "us-west-2", + "client_ip": fmt.Sprintf("192.168.%d.%d", rand.Intn(255), rand.Intn(255)), + } + + data := map[string]interface{}{ + "session_id": fmt.Sprintf("sess_%d_%d", id, time.Now().Unix()), + "user_agent": "Mozilla/5.0 (compatible; SeaweedFS-Test/1.0)", + "referrer": "https://example.com/page" + strconv.Itoa(rand.Intn(100)), + "duration": rand.Intn(3600), // seconds + "score": rand.Float64() * 100, + } + + return MockTestRecord{ + ID: fmt.Sprintf("record_%d", id), + UserID: int64(rand.Intn(10000) + 1), + Timestamp: time.Now().UnixNano(), + Event: events[rand.Intn(len(events))], + Data: data, + Metadata: metadata, + } +} + +// SerializeTestRecord converts TestRecord to key-value pair for Kafka +func SerializeMockTestRecord(record MockTestRecord) ([]byte, []byte) { + key := fmt.Sprintf("user_%d:%s", record.UserID, record.ID) + + // Create a realistic JSON-like value with reasonable size (200-500 bytes) + value := fmt.Sprintf(`{ + "id": "%s", + "user_id": %d, + "timestamp": %d, + "event": "%s", + "session_id": "%v", + "user_agent": "%v", + "referrer": "%v", + "duration": %v, + "score": %.2f, + "source": "%s", + "version": "%s", + "region": "%s", + "client_ip": "%s", + "batch_info": "This is additional data to make the record size more realistic for testing purposes. It simulates the kind of metadata and context that would typically be included in real-world event data." 
+ }`, + record.ID, + record.UserID, + record.Timestamp, + record.Event, + record.Data["session_id"], + record.Data["user_agent"], + record.Data["referrer"], + record.Data["duration"], + record.Data["score"], + record.Metadata["source"], + record.Metadata["version"], + record.Metadata["region"], + record.Metadata["client_ip"], + ) + + return []byte(key), []byte(value) +} + +// DirectBrokerClient connects directly to the broker without discovery +type DirectBrokerClient struct { + brokerAddress string + conn *grpc.ClientConn + client mq_pb.SeaweedMessagingClient + + // Publisher streams: topic-partition -> stream info + publishersLock sync.RWMutex + publishers map[string]*PublisherSession + + ctx context.Context + cancel context.CancelFunc +} + +// PublisherSession tracks a publishing stream to SeaweedMQ broker +type PublisherSession struct { + Topic string + Partition int32 + Stream mq_pb.SeaweedMessaging_PublishMessageClient + MessageCount int64 // Track messages sent for batch ack handling +} + +func NewDirectBrokerClient(brokerAddr string) (*DirectBrokerClient, error) { + ctx, cancel := context.WithCancel(context.Background()) + + // Add connection timeout and keepalive settings + conn, err := grpc.DialContext(ctx, brokerAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithTimeout(30*time.Second), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 30 * time.Second, // Increased from 10s to 30s + Timeout: 10 * time.Second, // Increased from 5s to 10s + PermitWithoutStream: false, // Changed to false to reduce pings + })) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to connect to broker: %v", err) + } + + client := mq_pb.NewSeaweedMessagingClient(conn) + + return &DirectBrokerClient{ + brokerAddress: brokerAddr, + conn: conn, + client: client, + publishers: make(map[string]*PublisherSession), + ctx: ctx, + cancel: cancel, + }, nil +} + +func (c *DirectBrokerClient) Close() { + c.cancel() + + // Close all publisher streams + c.publishersLock.Lock() + for key := range c.publishers { + delete(c.publishers, key) + } + c.publishersLock.Unlock() + + c.conn.Close() +} + +func (c *DirectBrokerClient) ConfigureTopic(topicName string, partitions int32) error { + topic := &schema_pb.Topic{ + Namespace: "kafka", + Name: topicName, + } + + // Create schema for MockTestRecord + recordType := &schema_pb.RecordType{ + Fields: []*schema_pb.Field{ + { + Name: "id", + FieldIndex: 0, + Type: &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}, + }, + }, + { + Name: "user_id", + FieldIndex: 1, + Type: &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}, + }, + }, + { + Name: "timestamp", + FieldIndex: 2, + Type: &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}, + }, + }, + { + Name: "event", + FieldIndex: 3, + Type: &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}, + }, + }, + { + Name: "data", + FieldIndex: 4, + Type: &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}, // JSON string + }, + }, + { + Name: "metadata", + FieldIndex: 5, + Type: &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}, // JSON string + }, + }, + }, + } + + // Use user_id as the key column for partitioning + keyColumns := []string{"user_id"} + + _, err := c.client.ConfigureTopic(c.ctx, &mq_pb.ConfigureTopicRequest{ + Topic: topic, + 
PartitionCount: partitions, + MessageRecordType: recordType, + KeyColumns: keyColumns, + }) + return err +} + +func (c *DirectBrokerClient) PublishRecord(topicName string, partition int32, key, value []byte) error { + session, err := c.getOrCreatePublisher(topicName, partition) + if err != nil { + return err + } + + // Send data message using broker API format + dataMsg := &mq_pb.DataMessage{ + Key: key, + Value: value, + TsNs: time.Now().UnixNano(), + } + + if err := session.Stream.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Data{ + Data: dataMsg, + }, + }); err != nil { + return fmt.Errorf("failed to send data: %v", err) + } + + // Don't wait for individual acks! AckInterval=100 means acks come in batches + // The broker will handle acknowledgments asynchronously + return nil +} + +// getOrCreatePublisher gets or creates a publisher stream for a topic-partition +func (c *DirectBrokerClient) getOrCreatePublisher(topic string, partition int32) (*PublisherSession, error) { + key := fmt.Sprintf("%s-%d", topic, partition) + + // Try to get existing publisher + c.publishersLock.RLock() + if session, exists := c.publishers[key]; exists { + c.publishersLock.RUnlock() + return session, nil + } + c.publishersLock.RUnlock() + + // Create new publisher stream + c.publishersLock.Lock() + defer c.publishersLock.Unlock() + + // Double-check after acquiring write lock + if session, exists := c.publishers[key]; exists { + return session, nil + } + + // Create the stream + stream, err := c.client.PublishMessage(c.ctx) + if err != nil { + return nil, fmt.Errorf("failed to create publish stream: %v", err) + } + + // Get the actual partition assignment from the broker + actualPartition, err := c.getActualPartitionAssignment(topic, partition) + if err != nil { + return nil, fmt.Errorf("failed to get actual partition assignment: %v", err) + } + + // Send init message using the actual partition structure that the broker allocated + if err := stream.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Init{ + Init: &mq_pb.PublishMessageRequest_InitMessage{ + Topic: &schema_pb.Topic{ + Namespace: "kafka", + Name: topic, + }, + Partition: actualPartition, + AckInterval: 200, // Ack every 200 messages for better balance + PublisherName: "direct-test", + }, + }, + }); err != nil { + return nil, fmt.Errorf("failed to send init message: %v", err) + } + + session := &PublisherSession{ + Topic: topic, + Partition: partition, + Stream: stream, + MessageCount: 0, + } + + c.publishers[key] = session + return session, nil +} + +// getActualPartitionAssignment looks up the actual partition assignment from the broker configuration +func (c *DirectBrokerClient) getActualPartitionAssignment(topic string, kafkaPartition int32) (*schema_pb.Partition, error) { + // Look up the topic configuration from the broker to get the actual partition assignments + lookupResp, err := c.client.LookupTopicBrokers(c.ctx, &mq_pb.LookupTopicBrokersRequest{ + Topic: &schema_pb.Topic{ + Namespace: "kafka", + Name: topic, + }, + }) + if err != nil { + return nil, fmt.Errorf("failed to lookup topic brokers: %v", err) + } + + if len(lookupResp.BrokerPartitionAssignments) == 0 { + return nil, fmt.Errorf("no partition assignments found for topic %s", topic) + } + + totalPartitions := int32(len(lookupResp.BrokerPartitionAssignments)) + if kafkaPartition >= totalPartitions { + return nil, fmt.Errorf("kafka partition %d out of range, topic %s has %d partitions", + kafkaPartition, topic, totalPartitions) + 
} + + // Calculate expected range for this Kafka partition + // Ring is divided equally among partitions, with last partition getting any remainder + const ringSize = int32(2520) // MaxPartitionCount constant + rangeSize := ringSize / totalPartitions + expectedRangeStart := kafkaPartition * rangeSize + var expectedRangeStop int32 + + if kafkaPartition == totalPartitions-1 { + // Last partition gets the remainder to fill the entire ring + expectedRangeStop = ringSize + } else { + expectedRangeStop = (kafkaPartition + 1) * rangeSize + } + + // Find the broker assignment that matches this range + for _, assignment := range lookupResp.BrokerPartitionAssignments { + if assignment.Partition == nil { + continue + } + + // Check if this assignment's range matches our expected range + if assignment.Partition.RangeStart == expectedRangeStart && assignment.Partition.RangeStop == expectedRangeStop { + return assignment.Partition, nil + } + } + + return nil, fmt.Errorf("no broker assignment found for Kafka partition %d with expected range [%d, %d]", + kafkaPartition, expectedRangeStart, expectedRangeStop) +} + +// TestDirectBroker_MillionRecordsIntegration tests the broker directly without discovery +func TestDirectBroker_MillionRecordsIntegration(t *testing.T) { + // Skip by default - this is a large integration test + if testing.Short() { + t.Skip("Skipping million-record integration test in short mode") + } + + // Configuration + const ( + totalRecords = 1000000 + numPartitions = int32(8) // Use multiple partitions for better performance + numProducers = 4 // Concurrent producers + brokerAddr = "localhost:17777" + ) + + // Create direct broker client for topic configuration + configClient, err := NewDirectBrokerClient(brokerAddr) + if err != nil { + t.Fatalf("Failed to create direct broker client: %v", err) + } + defer configClient.Close() + + topicName := fmt.Sprintf("million-records-direct-test-%d", time.Now().Unix()) + + // Create topic + glog.Infof("Creating topic %s with %d partitions", topicName, numPartitions) + err = configClient.ConfigureTopic(topicName, numPartitions) + if err != nil { + t.Fatalf("Failed to configure topic: %v", err) + } + + // Performance tracking + var totalProduced int64 + var totalErrors int64 + startTime := time.Now() + + // Progress tracking + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + select { + case <-ticker.C: + produced := atomic.LoadInt64(&totalProduced) + errors := atomic.LoadInt64(&totalErrors) + elapsed := time.Since(startTime) + rate := float64(produced) / elapsed.Seconds() + glog.Infof("Progress: %d/%d records (%.1f%%), rate: %.0f records/sec, errors: %d", + produced, totalRecords, float64(produced)/float64(totalRecords)*100, rate, errors) + case <-ctx.Done(): + return + } + } + }() + + // Producer function + producer := func(producerID int, recordsPerProducer int) error { + defer func() { + glog.Infof("Producer %d finished", producerID) + }() + + // Create dedicated client for this producer + producerClient, err := NewDirectBrokerClient(brokerAddr) + if err != nil { + return fmt.Errorf("Producer %d failed to create client: %v", producerID, err) + } + defer producerClient.Close() + + // Add timeout context for each producer + producerCtx, producerCancel := context.WithTimeout(ctx, 10*time.Minute) + defer producerCancel() + + glog.Infof("Producer %d: About to start producing %d records with dedicated client", producerID, 
recordsPerProducer) + + for i := 0; i < recordsPerProducer; i++ { + // Check if context is cancelled or timed out + select { + case <-producerCtx.Done(): + glog.Errorf("Producer %d timed out or cancelled after %d records", producerID, i) + return producerCtx.Err() + default: + } + + // Debug progress for all producers every 50k records + if i > 0 && i%50000 == 0 { + glog.Infof("Producer %d: Progress %d/%d records (%.1f%%)", producerID, i, recordsPerProducer, float64(i)/float64(recordsPerProducer)*100) + } + // Calculate global record ID + recordID := producerID*recordsPerProducer + i + + // Generate test record + testRecord := GenerateMockTestRecord(recordID) + key, value := SerializeMockTestRecord(testRecord) + + // Distribute across partitions based on user ID + partition := int32(testRecord.UserID % int64(numPartitions)) + + // Debug first few records for each producer + if i < 3 { + glog.Infof("Producer %d: Record %d -> UserID %d -> Partition %d", producerID, i, testRecord.UserID, partition) + } + + // Produce the record with retry logic + var err error + maxRetries := 3 + for retry := 0; retry < maxRetries; retry++ { + err = producerClient.PublishRecord(topicName, partition, key, value) + if err == nil { + break // Success + } + + // If it's an EOF error, wait a bit before retrying + if err.Error() == "failed to send data: EOF" { + time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond) + continue + } + + // For other errors, don't retry + break + } + + if err != nil { + atomic.AddInt64(&totalErrors, 1) + errorCount := atomic.LoadInt64(&totalErrors) + if errorCount < 20 { // Log first 20 errors to get more insight + glog.Errorf("Producer %d failed to produce record %d (i=%d) after %d retries: %v", producerID, recordID, i, maxRetries, err) + } + // Don't continue - this might be causing producers to exit early + // Let's see what happens if we return the error instead + if errorCount > 1000 { // If too many errors, give up + glog.Errorf("Producer %d giving up after %d errors", producerID, errorCount) + return fmt.Errorf("too many errors: %d", errorCount) + } + continue + } + + atomic.AddInt64(&totalProduced, 1) + + // Log progress for first producer + if producerID == 0 && (i+1)%10000 == 0 { + glog.Infof("Producer %d: produced %d records", producerID, i+1) + } + } + + glog.Infof("Producer %d: Completed loop, produced %d records successfully", producerID, recordsPerProducer) + return nil + } + + // Start concurrent producers + glog.Infof("Starting %d concurrent producers to produce %d records", numProducers, totalRecords) + + var wg sync.WaitGroup + recordsPerProducer := totalRecords / numProducers + + for i := 0; i < numProducers; i++ { + wg.Add(1) + go func(producerID int) { + defer wg.Done() + glog.Infof("Producer %d starting with %d records to produce", producerID, recordsPerProducer) + if err := producer(producerID, recordsPerProducer); err != nil { + glog.Errorf("Producer %d failed: %v", producerID, err) + } + }(i) + } + + // Wait for all producers to complete + wg.Wait() + cancel() // Stop progress reporting + + produceTime := time.Since(startTime) + finalProduced := atomic.LoadInt64(&totalProduced) + finalErrors := atomic.LoadInt64(&totalErrors) + + glog.Infof("Production completed: %d records in %v (%.0f records/sec), errors: %d", + finalProduced, produceTime, float64(finalProduced)/produceTime.Seconds(), finalErrors) + + // Performance summary + if finalProduced > 0 { + glog.Infof("\n"+ + "=== PERFORMANCE SUMMARY ===\n"+ + "Records produced: %d\n"+ + "Production time: 
%v\n"+ + "Production rate: %.0f records/sec\n"+ + "Errors: %d (%.2f%%)\n"+ + "Partitions: %d\n"+ + "Concurrent producers: %d\n"+ + "Average record size: ~300 bytes\n"+ + "Total data: ~%.1f MB\n"+ + "Throughput: ~%.1f MB/sec\n", + finalProduced, + produceTime, + float64(finalProduced)/produceTime.Seconds(), + finalErrors, + float64(finalErrors)/float64(totalRecords)*100, + numPartitions, + numProducers, + float64(finalProduced)*300/(1024*1024), + float64(finalProduced)*300/(1024*1024)/produceTime.Seconds(), + ) + } + + // Test assertions + if finalProduced < int64(totalRecords*0.95) { // Allow 5% tolerance for errors + t.Errorf("Too few records produced: %d < %d (95%% of target)", finalProduced, int64(float64(totalRecords)*0.95)) + } + + if finalErrors > int64(totalRecords*0.05) { // Error rate should be < 5% + t.Errorf("Too many errors: %d > %d (5%% of target)", finalErrors, int64(float64(totalRecords)*0.05)) + } + + glog.Infof("Direct broker million-record integration test completed successfully!") +} + +// BenchmarkDirectBroker_ProduceThroughput benchmarks the production throughput +func BenchmarkDirectBroker_ProduceThroughput(b *testing.B) { + if testing.Short() { + b.Skip("Skipping benchmark in short mode") + } + + client, err := NewDirectBrokerClient("localhost:17777") + if err != nil { + b.Fatalf("Failed to create client: %v", err) + } + defer client.Close() + + topicName := fmt.Sprintf("benchmark-topic-%d", time.Now().Unix()) + err = client.ConfigureTopic(topicName, 1) + if err != nil { + b.Fatalf("Failed to configure topic: %v", err) + } + + // Pre-generate test data + records := make([]MockTestRecord, b.N) + for i := 0; i < b.N; i++ { + records[i] = GenerateMockTestRecord(i) + } + + b.ResetTimer() + b.StartTimer() + + for i := 0; i < b.N; i++ { + key, value := SerializeMockTestRecord(records[i]) + err := client.PublishRecord(topicName, 0, key, value) + if err != nil { + b.Fatalf("Failed to produce record %d: %v", i, err) + } + } + + b.StopTimer() +} diff --git a/test/kafka/loadtest/quick_performance_test.go b/test/kafka/loadtest/quick_performance_test.go new file mode 100644 index 000000000..299a7d948 --- /dev/null +++ b/test/kafka/loadtest/quick_performance_test.go @@ -0,0 +1,139 @@ +package integration + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +// TestQuickPerformance_10K tests the fixed broker with 10K records +func TestQuickPerformance_10K(t *testing.T) { + const ( + totalRecords = 10000 // 10K records for quick test + numPartitions = int32(4) + numProducers = 4 + brokerAddr = "localhost:17777" + ) + + // Create direct broker client + client, err := NewDirectBrokerClient(brokerAddr) + if err != nil { + t.Fatalf("Failed to create direct broker client: %v", err) + } + defer client.Close() + + topicName := fmt.Sprintf("quick-test-%d", time.Now().Unix()) + + // Create topic + glog.Infof("Creating topic %s with %d partitions", topicName, numPartitions) + err = client.ConfigureTopic(topicName, numPartitions) + if err != nil { + t.Fatalf("Failed to configure topic: %v", err) + } + + // Performance tracking + var totalProduced int64 + var totalErrors int64 + startTime := time.Now() + + // Producer function + producer := func(producerID int, recordsPerProducer int) error { + for i := 0; i < recordsPerProducer; i++ { + recordID := producerID*recordsPerProducer + i + + // Generate test record + testRecord := GenerateMockTestRecord(recordID) + key, value := SerializeMockTestRecord(testRecord) + + partition := 
int32(testRecord.UserID % int64(numPartitions)) + + // Produce the record (now async!) + err := client.PublishRecord(topicName, partition, key, value) + if err != nil { + atomic.AddInt64(&totalErrors, 1) + if atomic.LoadInt64(&totalErrors) < 5 { + glog.Errorf("Producer %d failed to produce record %d: %v", producerID, recordID, err) + } + continue + } + + atomic.AddInt64(&totalProduced, 1) + + // Log progress + if (i+1)%1000 == 0 { + elapsed := time.Since(startTime) + rate := float64(atomic.LoadInt64(&totalProduced)) / elapsed.Seconds() + glog.Infof("Producer %d: %d records, current rate: %.0f records/sec", + producerID, i+1, rate) + } + } + return nil + } + + // Start concurrent producers + glog.Infof("Starting %d producers for %d records total", numProducers, totalRecords) + + var wg sync.WaitGroup + recordsPerProducer := totalRecords / numProducers + + for i := 0; i < numProducers; i++ { + wg.Add(1) + go func(producerID int) { + defer wg.Done() + if err := producer(producerID, recordsPerProducer); err != nil { + glog.Errorf("Producer %d failed: %v", producerID, err) + } + }(i) + } + + // Wait for completion + wg.Wait() + + produceTime := time.Since(startTime) + finalProduced := atomic.LoadInt64(&totalProduced) + finalErrors := atomic.LoadInt64(&totalErrors) + + // Performance results + throughputPerSec := float64(finalProduced) / produceTime.Seconds() + dataVolumeMB := float64(finalProduced) * 300 / (1024 * 1024) // ~300 bytes per record + throughputMBPerSec := dataVolumeMB / produceTime.Seconds() + + glog.Infof("\n"+ + "QUICK PERFORMANCE TEST RESULTS\n"+ + "=====================================\n"+ + "Records produced: %d / %d\n"+ + "Production time: %v\n"+ + "Throughput: %.0f records/sec\n"+ + "Data volume: %.1f MB\n"+ + "Bandwidth: %.1f MB/sec\n"+ + "Errors: %d (%.2f%%)\n"+ + "Success rate: %.1f%%\n", + finalProduced, totalRecords, + produceTime, + throughputPerSec, + dataVolumeMB, + throughputMBPerSec, + finalErrors, + float64(finalErrors)/float64(totalRecords)*100, + float64(finalProduced)/float64(totalRecords)*100, + ) + + // Assertions + if finalProduced < int64(totalRecords*0.90) { // Allow 10% tolerance + t.Errorf("Too few records produced: %d < %d (90%% of target)", finalProduced, int64(float64(totalRecords)*0.90)) + } + + if throughputPerSec < 100 { // Should be much higher than 1 record/sec now! + t.Errorf("Throughput too low: %.0f records/sec (expected > 100)", throughputPerSec) + } + + if finalErrors > int64(totalRecords*0.10) { // Error rate should be < 10% + t.Errorf("Too many errors: %d > %d (10%% of target)", finalErrors, int64(float64(totalRecords)*0.10)) + } + + glog.Infof("Performance test passed! 
Ready for million-record test.") +} diff --git a/test/kafka/loadtest/resume_million_test.go b/test/kafka/loadtest/resume_million_test.go new file mode 100644 index 000000000..48656c154 --- /dev/null +++ b/test/kafka/loadtest/resume_million_test.go @@ -0,0 +1,208 @@ +package integration + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +// TestResumeMillionRecords_Fixed - Fixed version with better concurrency handling +func TestResumeMillionRecords_Fixed(t *testing.T) { + const ( + totalRecords = 1000000 + numPartitions = int32(8) + numProducers = 4 + brokerAddr = "localhost:17777" + batchSize = 100 // Process in smaller batches to avoid overwhelming + ) + + // Create direct broker client + client, err := NewDirectBrokerClient(brokerAddr) + if err != nil { + t.Fatalf("Failed to create direct broker client: %v", err) + } + defer client.Close() + + topicName := fmt.Sprintf("resume-million-test-%d", time.Now().Unix()) + + // Create topic + glog.Infof("Creating topic %s with %d partitions for RESUMED test", topicName, numPartitions) + err = client.ConfigureTopic(topicName, numPartitions) + if err != nil { + t.Fatalf("Failed to configure topic: %v", err) + } + + // Performance tracking + var totalProduced int64 + var totalErrors int64 + startTime := time.Now() + + // Progress tracking + ticker := time.NewTicker(5 * time.Second) // More frequent updates + defer ticker.Stop() + + go func() { + for range ticker.C { + produced := atomic.LoadInt64(&totalProduced) + errors := atomic.LoadInt64(&totalErrors) + elapsed := time.Since(startTime) + rate := float64(produced) / elapsed.Seconds() + progressPercent := float64(produced) / float64(totalRecords) * 100 + + glog.Infof("PROGRESS: %d/%d records (%.1f%%), rate: %.0f records/sec, errors: %d", + produced, totalRecords, progressPercent, rate, errors) + + if produced >= totalRecords { + return + } + } + }() + + // Fixed producer function with better error handling + producer := func(producerID int, recordsPerProducer int) error { + defer glog.Infof("Producer %d FINISHED", producerID) + + // Create dedicated clients per producer to avoid contention + producerClient, err := NewDirectBrokerClient(brokerAddr) + if err != nil { + return fmt.Errorf("producer %d failed to create client: %v", producerID, err) + } + defer producerClient.Close() + + successCount := 0 + for i := 0; i < recordsPerProducer; i++ { + recordID := producerID*recordsPerProducer + i + + // Generate test record + testRecord := GenerateMockTestRecord(recordID) + key, value := SerializeMockTestRecord(testRecord) + + partition := int32(testRecord.UserID % int64(numPartitions)) + + // Produce with retry logic + maxRetries := 3 + var lastErr error + success := false + + for retry := 0; retry < maxRetries; retry++ { + err := producerClient.PublishRecord(topicName, partition, key, value) + if err == nil { + success = true + break + } + lastErr = err + time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond) // Exponential backoff + } + + if success { + atomic.AddInt64(&totalProduced, 1) + successCount++ + } else { + atomic.AddInt64(&totalErrors, 1) + if atomic.LoadInt64(&totalErrors) < 10 { + glog.Errorf("Producer %d failed record %d after retries: %v", producerID, recordID, lastErr) + } + } + + // Batch progress logging + if successCount > 0 && successCount%10000 == 0 { + glog.Infof("Producer %d: %d/%d records completed", producerID, successCount, recordsPerProducer) + } + + // Small delay to prevent overwhelming the broker + if 
i > 0 && i%batchSize == 0 { + time.Sleep(10 * time.Millisecond) + } + } + + glog.Infof("Producer %d completed: %d successful, %d errors", + producerID, successCount, recordsPerProducer-successCount) + return nil + } + + // Start concurrent producers + glog.Infof("Starting FIXED %d producers for %d records total", numProducers, totalRecords) + + var wg sync.WaitGroup + recordsPerProducer := totalRecords / numProducers + + for i := 0; i < numProducers; i++ { + wg.Add(1) + go func(producerID int) { + defer wg.Done() + if err := producer(producerID, recordsPerProducer); err != nil { + glog.Errorf("Producer %d FAILED: %v", producerID, err) + } + }(i) + } + + // Wait for completion with timeout + done := make(chan bool) + go func() { + wg.Wait() + done <- true + }() + + select { + case <-done: + glog.Infof("All producers completed normally") + case <-time.After(30 * time.Minute): // 30-minute timeout + glog.Errorf("Test timed out after 30 minutes") + t.Errorf("Test timed out") + return + } + + produceTime := time.Since(startTime) + finalProduced := atomic.LoadInt64(&totalProduced) + finalErrors := atomic.LoadInt64(&totalErrors) + + // Performance results + throughputPerSec := float64(finalProduced) / produceTime.Seconds() + dataVolumeMB := float64(finalProduced) * 300 / (1024 * 1024) + throughputMBPerSec := dataVolumeMB / produceTime.Seconds() + successRate := float64(finalProduced) / float64(totalRecords) * 100 + + glog.Infof("\n"+ + "=== FINAL MILLION RECORD TEST RESULTS ===\n"+ + "==========================================\n"+ + "Records produced: %d / %d\n"+ + "Production time: %v\n"+ + "Average throughput: %.0f records/sec\n"+ + "Data volume: %.1f MB\n"+ + "Bandwidth: %.1f MB/sec\n"+ + "Errors: %d (%.2f%%)\n"+ + "Success rate: %.1f%%\n"+ + "Partitions used: %d\n"+ + "Concurrent producers: %d\n", + finalProduced, totalRecords, + produceTime, + throughputPerSec, + dataVolumeMB, + throughputMBPerSec, + finalErrors, + float64(finalErrors)/float64(totalRecords)*100, + successRate, + numPartitions, + numProducers, + ) + + // Test assertions + if finalProduced < int64(totalRecords*0.95) { // Allow 5% tolerance + t.Errorf("Too few records produced: %d < %d (95%% of target)", finalProduced, int64(float64(totalRecords)*0.95)) + } + + if finalErrors > int64(totalRecords*0.05) { // Error rate should be < 5% + t.Errorf("Too many errors: %d > %d (5%% of target)", finalErrors, int64(float64(totalRecords)*0.05)) + } + + if throughputPerSec < 100 { + t.Errorf("Throughput too low: %.0f records/sec (expected > 100)", throughputPerSec) + } + + glog.Infof("🏆 MILLION RECORD KAFKA INTEGRATION TEST COMPLETED SUCCESSFULLY!") +} + diff --git a/test/kafka/loadtest/run_million_record_test.sh b/test/kafka/loadtest/run_million_record_test.sh new file mode 100755 index 000000000..0728e8121 --- /dev/null +++ b/test/kafka/loadtest/run_million_record_test.sh @@ -0,0 +1,115 @@ +#!/bin/bash + +# Script to run the Kafka Gateway Million Record Integration Test +# This test requires a running SeaweedFS infrastructure (Master, Filer, MQ Broker) + +set -e + +echo "=== SeaweedFS Kafka Gateway Million Record Integration Test ===" +echo "Test Date: $(date)" +echo "Hostname: $(hostname)" +echo "" + +# Configuration +MASTERS=${SEAWEED_MASTERS:-"localhost:9333"} +FILER_GROUP=${SEAWEED_FILER_GROUP:-"default"} +TEST_DIR="." 
+TEST_NAME="TestDirectBroker_MillionRecordsIntegration" + +echo "Configuration:" +echo " Masters: $MASTERS" +echo " Filer Group: $FILER_GROUP" +echo " Test Directory: $TEST_DIR" +echo "" + +# Check if SeaweedFS infrastructure is running +echo "=== Checking Infrastructure ===" + +# Function to check if a service is running +check_service() { + local host_port=$1 + local service_name=$2 + + if timeout 3 bash -c "</dev/tcp/${host_port//://}" 2>/dev/null; then + echo "✓ $service_name is running on $host_port" + return 0 + else + echo "✗ $service_name is NOT running on $host_port" + return 1 + fi +} + +# Check each master +IFS=',' read -ra MASTER_ARRAY <<< "$MASTERS" +MASTERS_OK=true +for master in "${MASTER_ARRAY[@]}"; do + if ! check_service "$master" "SeaweedFS Master"; then + MASTERS_OK=false + fi +done + +if [ "$MASTERS_OK" = false ]; then + echo "" + echo "ERROR: One or more SeaweedFS Masters are not running." + echo "Please start your SeaweedFS infrastructure before running this test." + echo "" + echo "Example commands to start SeaweedFS:" + echo " # Terminal 1: Start Master" + echo " weed master -defaultReplication=001 -mdir=/tmp/seaweedfs/master" + echo "" + echo " # Terminal 2: Start Filer" + echo " weed filer -master=localhost:9333 -filer.dir=/tmp/seaweedfs/filer" + echo "" + echo " # Terminal 3: Start MQ Broker" + echo " weed mq.broker -filer=localhost:8888 -master=localhost:9333" + echo "" + exit 1 +fi + +echo "" +echo "=== Infrastructure Check Passed ===" +echo "" + +# Change to the correct directory +cd "$TEST_DIR" + +# Set environment variables for the test +export SEAWEED_MASTERS="$MASTERS" +export SEAWEED_FILER_GROUP="$FILER_GROUP" + +# Run the test with verbose output +echo "=== Running Million Record Integration Test ===" +echo "This may take several minutes..." +echo "" + +# Run the specific test with timeout and verbose output +timeout 1800 go test -v -run "$TEST_NAME" -timeout=30m 2>&1 | tee /tmp/seaweed_million_record_test.log + +TEST_EXIT_CODE=${PIPESTATUS[0]} + +echo "" +echo "=== Test Completed ===" +echo "Exit Code: $TEST_EXIT_CODE" +echo "Full log available at: /tmp/seaweed_million_record_test.log" +echo "" + +# Show summary from the log +echo "=== Performance Summary ===" +if grep -q "PERFORMANCE SUMMARY" /tmp/seaweed_million_record_test.log; then + grep -A 15 "PERFORMANCE SUMMARY" /tmp/seaweed_million_record_test.log +else + echo "Performance summary not found in log" +fi + +echo "" + +if [ $TEST_EXIT_CODE -eq 0 ]; then + echo "🎉 TEST PASSED: Million record integration test completed successfully!" 
+else + echo "❌ TEST FAILED: Million record integration test failed with exit code $TEST_EXIT_CODE" + echo "Check the log file for details: /tmp/seaweed_million_record_test.log" +fi + +echo "" +echo "=== Test Run Complete ===" +exit $TEST_EXIT_CODE diff --git a/test/kafka/loadtest/setup_seaweed_infrastructure.sh b/test/kafka/loadtest/setup_seaweed_infrastructure.sh new file mode 100755 index 000000000..448119097 --- /dev/null +++ b/test/kafka/loadtest/setup_seaweed_infrastructure.sh @@ -0,0 +1,131 @@ +#!/bin/bash + +# Script to set up SeaweedFS infrastructure for Kafka Gateway testing +# This script will start Master, Filer, and MQ Broker components + +set -e + +BASE_DIR="/tmp/seaweedfs" +LOG_DIR="$BASE_DIR/logs" +DATA_DIR="$BASE_DIR/data" + +echo "=== SeaweedFS Infrastructure Setup ===" +echo "Setup Date: $(date)" +echo "Base Directory: $BASE_DIR" +echo "" + +# Create directories +mkdir -p "$BASE_DIR/master" "$BASE_DIR/filer" "$BASE_DIR/broker" "$LOG_DIR" + +# Function to check if a service is running +check_service() { + local host_port=$1 + local service_name=$2 + + if timeout 3 bash -c "</dev/tcp/${host_port//://}" 2>/dev/null; then + echo "✓ $service_name is already running on $host_port" + return 0 + else + echo "✗ $service_name is NOT running on $host_port" + return 1 + fi +} + +# Function to start a service in background +start_service() { + local cmd="$1" + local service_name="$2" + local log_file="$3" + local check_port="$4" + + echo "Starting $service_name..." + echo "Command: $cmd" + echo "Log: $log_file" + + # Start in background + nohup $cmd > "$log_file" 2>&1 & + local pid=$! + echo "PID: $pid" + + # Wait for service to be ready + local retries=30 + while [ $retries -gt 0 ]; do + if check_service "$check_port" "$service_name" 2>/dev/null; then + echo "✓ $service_name is ready" + return 0 + fi + retries=$((retries - 1)) + sleep 1 + echo -n "." + done + echo "" + echo "❌ $service_name failed to start within 30 seconds" + return 1 +} + +# Stop any existing processes +echo "=== Cleaning up existing processes ===" +pkill -f "weed master" || true +pkill -f "weed filer" || true +pkill -f "weed mq.broker" || true +sleep 2 + +echo "" +echo "=== Starting SeaweedFS Components ===" + +# Start Master +if ! check_service "localhost:9333" "SeaweedFS Master"; then + start_service \ + "weed master -defaultReplication=001 -mdir=$BASE_DIR/master" \ + "SeaweedFS Master" \ + "$LOG_DIR/master.log" \ + "localhost:9333" + echo "" +fi + +# Start Filer +if ! check_service "localhost:8888" "SeaweedFS Filer"; then + start_service \ + "weed filer -master=localhost:9333 -filer.dir=$BASE_DIR/filer" \ + "SeaweedFS Filer" \ + "$LOG_DIR/filer.log" \ + "localhost:8888" + echo "" +fi + +# Start MQ Broker +if ! 
check_service "localhost:17777" "SeaweedFS MQ Broker"; then + start_service \ + "weed mq.broker -filer=localhost:8888 -master=localhost:9333" \ + "SeaweedFS MQ Broker" \ + "$LOG_DIR/broker.log" \ + "localhost:17777" + echo "" +fi + +echo "=== Infrastructure Status ===" +check_service "localhost:9333" "Master (gRPC)" +check_service "localhost:9334" "Master (HTTP)" +check_service "localhost:8888" "Filer (HTTP)" +check_service "localhost:18888" "Filer (gRPC)" +check_service "localhost:17777" "MQ Broker" + +echo "" +echo "=== Infrastructure Ready ===" +echo "Log files:" +echo " Master: $LOG_DIR/master.log" +echo " Filer: $LOG_DIR/filer.log" +echo " Broker: $LOG_DIR/broker.log" +echo "" +echo "To view logs in real-time:" +echo " tail -f $LOG_DIR/master.log" +echo " tail -f $LOG_DIR/filer.log" +echo " tail -f $LOG_DIR/broker.log" +echo "" +echo "To stop all services:" +echo " pkill -f \"weed master\"" +echo " pkill -f \"weed filer\"" +echo " pkill -f \"weed mq.broker\"" +echo "" +echo "[OK] SeaweedFS infrastructure is ready for testing!" + |
