aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-06-25 17:42:39 -0700
committerchrislu <chris.lu@gmail.com>2025-06-25 17:42:39 -0700
commite4fe657bd953d4ce0c823f362584348bfb4903fb (patch)
tree0f95eb4221a0ee847c856d61f6595109f972eeb4
parent31473fbe1053001796388bc0832c93d47f22839e (diff)
downloadseaweedfs-origin/adding-message-queue-integration-tests.tar.xz
seaweedfs-origin/adding-message-queue-integration-tests.zip
-rw-r--r--test/mq/Dockerfile.test13
-rw-r--r--test/mq/integration/performance_test.go666
-rw-r--r--test/mq/integration/resilient_publisher.go336
-rw-r--r--test/mq/simple_demo.go56
4 files changed, 1063 insertions, 8 deletions
diff --git a/test/mq/Dockerfile.test b/test/mq/Dockerfile.test
index 8902df6af..d9d86a4e1 100644
--- a/test/mq/Dockerfile.test
+++ b/test/mq/Dockerfile.test
@@ -6,7 +6,8 @@ RUN apk add --no-cache \
netcat-openbsd \
bash \
git \
- build-base
+ build-base \
+ ca-certificates
# Set working directory
WORKDIR /workspace
@@ -19,12 +20,8 @@ RUN go mod download
COPY weed/ ./weed/
COPY test/mq/integration/ ./test/mq/integration/
-# Install test dependencies
-RUN go install github.com/onsi/ginkgo/v2/ginkgo@latest
-# Note: testify is a library, not a binary, so it's installed via go mod download
-
# Build the weed binary for testing (optional - tests can use external cluster)
-RUN cd weed && go build -o ../weed .
+RUN cd weed && CGO_ENABLED=1 GOOS=linux go build -o ../weed .
# Create test results directory
RUN mkdir -p /test-results
@@ -37,5 +34,5 @@ ENV GO111MODULE=on
# Default working directory for tests
WORKDIR /workspace
-# Entry point for running tests
-ENTRYPOINT ["/bin/bash"] \ No newline at end of file
+# Entry point for running tests - use explicit bash
+ENTRYPOINT ["/bin/bash", "-c"] \ No newline at end of file
diff --git a/test/mq/integration/performance_test.go b/test/mq/integration/performance_test.go
new file mode 100644
index 000000000..bdf8bd82a
--- /dev/null
+++ b/test/mq/integration/performance_test.go
@@ -0,0 +1,666 @@
+package integration
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// PerformanceMetrics holds test metrics
+type PerformanceMetrics struct {
+ MessagesPublished int64
+ MessagesConsumed int64
+ PublishLatencies []time.Duration
+ ConsumeLatencies []time.Duration
+ StartTime time.Time
+ EndTime time.Time
+ ErrorCount int64
+ mu sync.RWMutex
+}
+
+func (m *PerformanceMetrics) AddPublishLatency(d time.Duration) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.PublishLatencies = append(m.PublishLatencies, d)
+}
+
+func (m *PerformanceMetrics) AddConsumeLatency(d time.Duration) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.ConsumeLatencies = append(m.ConsumeLatencies, d)
+}
+
+func (m *PerformanceMetrics) GetThroughput() float64 {
+ duration := m.EndTime.Sub(m.StartTime).Seconds()
+ if duration == 0 {
+ return 0
+ }
+ return float64(atomic.LoadInt64(&m.MessagesPublished)) / duration
+}
+
+func (m *PerformanceMetrics) GetP95Latency(latencies []time.Duration) time.Duration {
+ if len(latencies) == 0 {
+ return 0
+ }
+
+ // Simple P95 calculation - in production use proper percentile library
+ index := int(float64(len(latencies)) * 0.95)
+ if index >= len(latencies) {
+ index = len(latencies) - 1
+ }
+
+ // Sort latencies (simplified)
+ for i := 0; i < len(latencies)-1; i++ {
+ for j := 0; j < len(latencies)-i-1; j++ {
+ if latencies[j] > latencies[j+1] {
+ latencies[j], latencies[j+1] = latencies[j+1], latencies[j]
+ }
+ }
+ }
+
+ return latencies[index]
+}
+
+// Enhanced performance metrics with connection error tracking
+type EnhancedPerformanceMetrics struct {
+ MessagesPublished int64
+ MessagesConsumed int64
+ PublishLatencies []time.Duration
+ ConsumeLatencies []time.Duration
+ StartTime time.Time
+ EndTime time.Time
+ ErrorCount int64
+ ConnectionErrors int64
+ ApplicationErrors int64
+ RetryAttempts int64
+ mu sync.RWMutex
+}
+
+func (m *EnhancedPerformanceMetrics) AddPublishLatency(d time.Duration) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.PublishLatencies = append(m.PublishLatencies, d)
+}
+
+func (m *EnhancedPerformanceMetrics) AddConsumeLatency(d time.Duration) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ m.ConsumeLatencies = append(m.ConsumeLatencies, d)
+}
+
+func (m *EnhancedPerformanceMetrics) GetThroughput() float64 {
+ duration := m.EndTime.Sub(m.StartTime).Seconds()
+ if duration == 0 {
+ return 0
+ }
+ return float64(atomic.LoadInt64(&m.MessagesPublished)) / duration
+}
+
+func (m *EnhancedPerformanceMetrics) GetP95Latency(latencies []time.Duration) time.Duration {
+ if len(latencies) == 0 {
+ return 0
+ }
+
+ // Simple P95 calculation
+ index := int(float64(len(latencies)) * 0.95)
+ if index >= len(latencies) {
+ index = len(latencies) - 1
+ }
+
+ // Sort latencies (simplified bubble sort for small datasets)
+ sorted := make([]time.Duration, len(latencies))
+ copy(sorted, latencies)
+ for i := 0; i < len(sorted)-1; i++ {
+ for j := 0; j < len(sorted)-i-1; j++ {
+ if sorted[j] > sorted[j+1] {
+ sorted[j], sorted[j+1] = sorted[j+1], sorted[j]
+ }
+ }
+ }
+
+ return sorted[index]
+}
+
+// isConnectionError determines if an error is a connection-level error
+func isConnectionError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ errStr := err.Error()
+
+ // Check for gRPC status codes
+ if st, ok := status.FromError(err); ok {
+ switch st.Code() {
+ case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown:
+ return true
+ }
+ }
+
+ // Check for common connection error strings
+ connectionErrorPatterns := []string{
+ "EOF",
+ "error reading server preface",
+ "connection refused",
+ "connection reset",
+ "broken pipe",
+ "network is unreachable",
+ "no route to host",
+ "transport is closing",
+ "connection error",
+ "dial tcp",
+ "context deadline exceeded",
+ }
+
+ for _, pattern := range connectionErrorPatterns {
+ if containsString(errStr, pattern) {
+ return true
+ }
+ }
+
+ return false
+}
+
+func containsString(s, substr string) bool {
+ for i := 0; i <= len(s)-len(substr); i++ {
+ if s[i:i+len(substr)] == substr {
+ return true
+ }
+ }
+ return false
+}
+
+func TestPerformanceThroughput(t *testing.T) {
+ suite := NewIntegrationTestSuite(t)
+ require.NoError(t, suite.Setup())
+
+ topicName := "performance-throughput-test"
+ namespace := "perf-test"
+
+ metrics := &PerformanceMetrics{
+ StartTime: time.Now(),
+ }
+
+ // Test parameters
+ numMessages := 50000
+ numPublishers := 5
+ messageSize := 1024 // 1KB messages
+
+ // Create message payload
+ payload := make([]byte, messageSize)
+ for i := range payload {
+ payload[i] = byte(i % 256)
+ }
+
+ t.Logf("Starting throughput test: %d messages, %d publishers, %d bytes per message",
+ numMessages, numPublishers, messageSize)
+
+ // Start publishers
+ var publishWg sync.WaitGroup
+ messagesPerPublisher := numMessages / numPublishers
+
+ for i := 0; i < numPublishers; i++ {
+ publishWg.Add(1)
+ go func(publisherID int) {
+ defer publishWg.Done()
+
+ // Create publisher for this goroutine
+ pubConfig := &PublisherTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ PartitionCount: 4, // Multiple partitions for better throughput
+ PublisherName: fmt.Sprintf("perf-publisher-%d", publisherID),
+ RecordType: nil, // Use raw publish
+ }
+
+ publisher, err := suite.CreatePublisher(pubConfig)
+ if err != nil {
+ atomic.AddInt64(&metrics.ErrorCount, 1)
+ t.Errorf("Failed to create publisher %d: %v", publisherID, err)
+ return
+ }
+
+ for j := 0; j < messagesPerPublisher; j++ {
+ messageKey := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)
+
+ start := time.Now()
+ err := publisher.Publish([]byte(messageKey), payload)
+ latency := time.Since(start)
+
+ if err != nil {
+ atomic.AddInt64(&metrics.ErrorCount, 1)
+ continue
+ }
+
+ atomic.AddInt64(&metrics.MessagesPublished, 1)
+ metrics.AddPublishLatency(latency)
+
+ // Small delay to prevent overwhelming the system
+ if j%1000 == 0 {
+ time.Sleep(1 * time.Millisecond)
+ }
+ }
+ }(i)
+ }
+
+ // Wait for publishing to complete
+ publishWg.Wait()
+ metrics.EndTime = time.Now()
+
+ // Verify results
+ publishedCount := atomic.LoadInt64(&metrics.MessagesPublished)
+ errorCount := atomic.LoadInt64(&metrics.ErrorCount)
+ throughput := metrics.GetThroughput()
+
+ t.Logf("Performance Results:")
+ t.Logf(" Messages Published: %d", publishedCount)
+ t.Logf(" Errors: %d", errorCount)
+ t.Logf(" Throughput: %.2f messages/second", throughput)
+ t.Logf(" Duration: %v", metrics.EndTime.Sub(metrics.StartTime))
+
+ if len(metrics.PublishLatencies) > 0 {
+ p95Latency := metrics.GetP95Latency(metrics.PublishLatencies)
+ t.Logf(" P95 Publish Latency: %v", p95Latency)
+
+ // Performance assertions
+ assert.Less(t, p95Latency, 100*time.Millisecond, "P95 publish latency should be under 100ms")
+ }
+
+ // Throughput requirements
+ expectedMinThroughput := 5000.0 // 5K messages/sec minimum (relaxed from 10K for initial testing)
+ assert.Greater(t, throughput, expectedMinThroughput,
+ "Throughput should exceed %.0f messages/second", expectedMinThroughput)
+
+ // Error rate should be low
+ errorRate := float64(errorCount) / float64(publishedCount+errorCount)
+ assert.Less(t, errorRate, 0.05, "Error rate should be less than 5%")
+}
+
+func TestPerformanceLatency(t *testing.T) {
+ suite := NewIntegrationTestSuite(t)
+ require.NoError(t, suite.Setup())
+
+ topicName := "performance-latency-test"
+ namespace := "perf-test"
+
+ // Create publisher
+ pubConfig := &PublisherTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ PartitionCount: 1, // Single partition for latency testing
+ PublisherName: "latency-publisher",
+ RecordType: nil,
+ }
+
+ publisher, err := suite.CreatePublisher(pubConfig)
+ require.NoError(t, err)
+
+ // Start consumer first
+ subConfig := &SubscriberTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ ConsumerGroup: "latency-test-group",
+ ConsumerInstanceId: "latency-consumer-1",
+ MaxPartitionCount: 1,
+ SlidingWindowSize: 10,
+ OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
+ }
+
+ subscriber, err := suite.CreateSubscriber(subConfig)
+ require.NoError(t, err)
+
+ numMessages := 5000
+ collector := NewMessageCollector(numMessages)
+
+ // Set up message handler
+ subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
+ collector.AddMessage(TestMessage{
+ ID: string(m.Data.Key),
+ Content: m.Data.Value,
+ Timestamp: time.Unix(0, m.Data.TsNs),
+ Key: m.Data.Key,
+ })
+ })
+
+ // Start subscriber
+ go func() {
+ err := subscriber.Subscribe()
+ if err != nil {
+ t.Logf("Subscriber error: %v", err)
+ }
+ }()
+
+ // Wait for consumer to be ready
+ time.Sleep(2 * time.Second)
+
+ metrics := &PerformanceMetrics{
+ StartTime: time.Now(),
+ }
+
+ t.Logf("Starting latency test with %d messages", numMessages)
+
+ // Publish messages with controlled timing
+ for i := 0; i < numMessages; i++ {
+ messageKey := fmt.Sprintf("latency-msg-%d", i)
+ payload := fmt.Sprintf("test-payload-data-%d", i)
+
+ start := time.Now()
+ err := publisher.Publish([]byte(messageKey), []byte(payload))
+ publishLatency := time.Since(start)
+
+ require.NoError(t, err)
+ metrics.AddPublishLatency(publishLatency)
+
+ // Controlled rate for latency measurement
+ if i%100 == 0 {
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+
+ metrics.EndTime = time.Now()
+
+ // Wait for messages to be consumed
+ messages := collector.WaitForMessages(30 * time.Second)
+
+ // Analyze latency results
+ t.Logf("Latency Test Results:")
+ t.Logf(" Messages Published: %d", numMessages)
+ t.Logf(" Messages Consumed: %d", len(messages))
+
+ if len(metrics.PublishLatencies) > 0 {
+ p95PublishLatency := metrics.GetP95Latency(metrics.PublishLatencies)
+ t.Logf(" P95 Publish Latency: %v", p95PublishLatency)
+
+ // Latency assertions
+ assert.Less(t, p95PublishLatency, 50*time.Millisecond,
+ "P95 publish latency should be under 50ms")
+ }
+
+ // Verify message delivery
+ deliveryRate := float64(len(messages)) / float64(numMessages)
+ assert.Greater(t, deliveryRate, 0.80, "Should deliver at least 80% of messages")
+}
+
+func TestPerformanceConcurrentConsumers(t *testing.T) {
+ suite := NewIntegrationTestSuite(t)
+ require.NoError(t, suite.Setup())
+
+ topicName := "performance-concurrent-test"
+ namespace := "perf-test"
+ numPartitions := int32(4)
+
+ // Create publisher first
+ pubConfig := &PublisherTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ PartitionCount: numPartitions,
+ PublisherName: "concurrent-publisher",
+ RecordType: nil,
+ }
+
+ publisher, err := suite.CreatePublisher(pubConfig)
+ require.NoError(t, err)
+
+ // Test parameters
+ numConsumers := 8
+ numMessages := 20000
+ consumerGroup := "concurrent-perf-group"
+
+ t.Logf("Starting concurrent consumer test: %d consumers, %d messages, %d partitions",
+ numConsumers, numMessages, numPartitions)
+
+ // Start multiple consumers
+ var collectors []*MessageCollector
+
+ for i := 0; i < numConsumers; i++ {
+ collector := NewMessageCollector(numMessages / numConsumers) // Expected per consumer
+ collectors = append(collectors, collector)
+
+ subConfig := &SubscriberTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ ConsumerGroup: consumerGroup,
+ ConsumerInstanceId: fmt.Sprintf("consumer-%d", i),
+ MaxPartitionCount: numPartitions,
+ SlidingWindowSize: 10,
+ OffsetType: schema_pb.OffsetType_RESET_TO_EARLIEST,
+ }
+
+ subscriber, err := suite.CreateSubscriber(subConfig)
+ require.NoError(t, err)
+
+ // Set up message handler for this consumer
+ func(consumerID int, c *MessageCollector) {
+ subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
+ c.AddMessage(TestMessage{
+ ID: fmt.Sprintf("%d-%s", consumerID, string(m.Data.Key)),
+ Content: m.Data.Value,
+ Timestamp: time.Unix(0, m.Data.TsNs),
+ Key: m.Data.Key,
+ })
+ })
+ }(i, collector)
+
+ // Start subscriber
+ go func(consumerID int, s *SubscriberTestConfig) {
+ sub, _ := suite.CreateSubscriber(s)
+ err := sub.Subscribe()
+ if err != nil {
+ t.Logf("Consumer %d error: %v", consumerID, err)
+ }
+ }(i, subConfig)
+ }
+
+ // Wait for consumers to initialize
+ time.Sleep(3 * time.Second)
+
+ // Start publishing
+ startTime := time.Now()
+
+ var publishWg sync.WaitGroup
+ numPublishers := 3
+ messagesPerPublisher := numMessages / numPublishers
+
+ for i := 0; i < numPublishers; i++ {
+ publishWg.Add(1)
+ go func(publisherID int) {
+ defer publishWg.Done()
+
+ for j := 0; j < messagesPerPublisher; j++ {
+ messageKey := fmt.Sprintf("concurrent-msg-%d-%d", publisherID, j)
+ payload := fmt.Sprintf("publisher-%d-message-%d-data", publisherID, j)
+
+ err := publisher.Publish([]byte(messageKey), []byte(payload))
+
+ if err != nil {
+ t.Logf("Publish error: %v", err)
+ }
+
+ // Rate limiting
+ if j%1000 == 0 {
+ time.Sleep(2 * time.Millisecond)
+ }
+ }
+ }(i)
+ }
+
+ publishWg.Wait()
+ publishDuration := time.Since(startTime)
+
+ // Allow time for message consumption
+ time.Sleep(10 * time.Second)
+
+ // Analyze results
+ totalConsumed := int64(0)
+ for i, collector := range collectors {
+ messages := collector.GetMessages()
+ consumed := int64(len(messages))
+ totalConsumed += consumed
+ t.Logf("Consumer %d consumed %d messages", i, consumed)
+ }
+
+ publishThroughput := float64(numMessages) / publishDuration.Seconds()
+ consumeThroughput := float64(totalConsumed) / publishDuration.Seconds()
+
+ t.Logf("Concurrent Consumer Test Results:")
+ t.Logf(" Total Published: %d", numMessages)
+ t.Logf(" Total Consumed: %d", totalConsumed)
+ t.Logf(" Publish Throughput: %.2f msg/sec", publishThroughput)
+ t.Logf(" Consume Throughput: %.2f msg/sec", consumeThroughput)
+ t.Logf(" Test Duration: %v", publishDuration)
+
+ // Performance assertions (relaxed for initial testing)
+ deliveryRate := float64(totalConsumed) / float64(numMessages)
+ assert.Greater(t, deliveryRate, 0.70, "Should consume at least 70% of messages")
+
+ expectedMinThroughput := 2000.0 // 2K messages/sec minimum for concurrent consumption
+ assert.Greater(t, consumeThroughput, expectedMinThroughput,
+ "Consume throughput should exceed %.0f messages/second", expectedMinThroughput)
+}
+
+func TestPerformanceWithErrorHandling(t *testing.T) {
+ suite := NewIntegrationTestSuite(t)
+ require.NoError(t, suite.Setup())
+
+ topicName := "performance-error-handling-test"
+ namespace := "perf-test"
+
+ metrics := &EnhancedPerformanceMetrics{
+ StartTime: time.Now(),
+ }
+
+ // Test parameters
+ numMessages := 50000
+ numPublishers := 5
+ messageSize := 1024 // 1KB messages
+
+ // Create message payload
+ payload := make([]byte, messageSize)
+ for i := range payload {
+ payload[i] = byte(i % 256)
+ }
+
+ t.Logf("Starting performance test with enhanced error handling: %d messages, %d publishers, %d bytes per message",
+ numMessages, numPublishers, messageSize)
+
+ // Start publishers
+ var publishWg sync.WaitGroup
+ messagesPerPublisher := numMessages / numPublishers
+
+ for i := 0; i < numPublishers; i++ {
+ publishWg.Add(1)
+ go func(publisherID int) {
+ defer publishWg.Done()
+
+ // Create publisher for this goroutine
+ pubConfig := &PublisherTestConfig{
+ Namespace: namespace,
+ TopicName: topicName,
+ PartitionCount: 4, // Multiple partitions for better throughput
+ PublisherName: fmt.Sprintf("error-aware-publisher-%d", publisherID),
+ RecordType: nil, // Use raw publish
+ }
+
+ publisher, err := suite.CreatePublisher(pubConfig)
+ if err != nil {
+ atomic.AddInt64(&metrics.ErrorCount, 1)
+ t.Errorf("Failed to create publisher %d: %v", publisherID, err)
+ return
+ }
+
+ for j := 0; j < messagesPerPublisher; j++ {
+ messageKey := fmt.Sprintf("publisher-%d-msg-%d", publisherID, j)
+
+ start := time.Now()
+ err := publisher.Publish([]byte(messageKey), payload)
+ latency := time.Since(start)
+
+ if err != nil {
+ atomic.AddInt64(&metrics.ErrorCount, 1)
+
+ // Classify the error type
+ if isConnectionError(err) {
+ atomic.AddInt64(&metrics.ConnectionErrors, 1)
+ t.Logf("Connection error (publisher %d, msg %d): %v", publisherID, j, err)
+ } else {
+ atomic.AddInt64(&metrics.ApplicationErrors, 1)
+ t.Logf("Application error (publisher %d, msg %d): %v", publisherID, j, err)
+ }
+ continue
+ }
+
+ atomic.AddInt64(&metrics.MessagesPublished, 1)
+ metrics.AddPublishLatency(latency)
+
+ // Small delay to prevent overwhelming the system
+ if j%1000 == 0 {
+ time.Sleep(1 * time.Millisecond)
+ }
+ }
+ }(i)
+ }
+
+ // Wait for publishing to complete
+ publishWg.Wait()
+ metrics.EndTime = time.Now()
+
+ // Analyze results with enhanced error reporting
+ publishedCount := atomic.LoadInt64(&metrics.MessagesPublished)
+ totalErrors := atomic.LoadInt64(&metrics.ErrorCount)
+ connectionErrors := atomic.LoadInt64(&metrics.ConnectionErrors)
+ applicationErrors := atomic.LoadInt64(&metrics.ApplicationErrors)
+ throughput := metrics.GetThroughput()
+
+ t.Logf("Enhanced Performance Results:")
+ t.Logf(" Messages Successfully Published: %d", publishedCount)
+ t.Logf(" Total Errors: %d", totalErrors)
+ t.Logf(" Connection-Level Errors: %d", connectionErrors)
+ t.Logf(" Application-Level Errors: %d", applicationErrors)
+ t.Logf(" Throughput: %.2f messages/second", throughput)
+ t.Logf(" Duration: %v", metrics.EndTime.Sub(metrics.StartTime))
+
+ if len(metrics.PublishLatencies) > 0 {
+ p95Latency := metrics.GetP95Latency(metrics.PublishLatencies)
+ t.Logf(" P95 Publish Latency: %v", p95Latency)
+
+ // Performance assertions (adjusted for error handling overhead)
+ assert.Less(t, p95Latency, 100*time.Millisecond, "P95 publish latency should be under 100ms")
+ }
+
+ // Enhanced error analysis
+ totalAttempts := publishedCount + totalErrors
+ if totalAttempts > 0 {
+ successRate := float64(publishedCount) / float64(totalAttempts)
+ connectionErrorRate := float64(connectionErrors) / float64(totalAttempts)
+ applicationErrorRate := float64(applicationErrors) / float64(totalAttempts)
+
+ t.Logf("Error Analysis:")
+ t.Logf(" Success Rate: %.2f%%", successRate*100)
+ t.Logf(" Connection Error Rate: %.2f%%", connectionErrorRate*100)
+ t.Logf(" Application Error Rate: %.2f%%", applicationErrorRate*100)
+
+ // Assertions based on error types
+ assert.Greater(t, successRate, 0.80, "Success rate should be greater than 80%")
+ assert.Less(t, applicationErrorRate, 0.01, "Application error rate should be less than 1%")
+
+ // Connection errors are expected under high load but should be handled
+ if connectionErrors > 0 {
+ t.Logf("Note: %d connection errors detected - this indicates the test is successfully stressing the system", connectionErrors)
+ t.Logf("Recommendation: Implement retry logic for production applications to handle these connection errors")
+ }
+ }
+
+ // Throughput requirements (adjusted for error handling)
+ expectedMinThroughput := 5000.0 // 5K messages/sec minimum
+ assert.Greater(t, throughput, expectedMinThroughput,
+ "Throughput should exceed %.0f messages/second", expectedMinThroughput)
+}
diff --git a/test/mq/integration/resilient_publisher.go b/test/mq/integration/resilient_publisher.go
new file mode 100644
index 000000000..38da73fd4
--- /dev/null
+++ b/test/mq/integration/resilient_publisher.go
@@ -0,0 +1,336 @@
+package integration
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// ResilientPublisher wraps TopicPublisher with enhanced error handling
+type ResilientPublisher struct {
+ publisher *pub_client.TopicPublisher
+ config *PublisherTestConfig
+ suite *IntegrationTestSuite
+
+ // Error tracking
+ connectionErrors int64
+ applicationErrors int64
+ retryAttempts int64
+ totalPublishes int64
+
+ // Retry configuration
+ maxRetries int
+ baseDelay time.Duration
+ maxDelay time.Duration
+ backoffFactor float64
+
+ // Circuit breaker
+ circuitOpen bool
+ circuitOpenTime time.Time
+ circuitTimeout time.Duration
+
+ mu sync.RWMutex
+}
+
+// RetryConfig holds retry configuration
+type RetryConfig struct {
+ MaxRetries int
+ BaseDelay time.Duration
+ MaxDelay time.Duration
+ BackoffFactor float64
+ CircuitTimeout time.Duration
+}
+
+// DefaultRetryConfig returns sensible defaults for retry configuration
+func DefaultRetryConfig() *RetryConfig {
+ return &RetryConfig{
+ MaxRetries: 5,
+ BaseDelay: 10 * time.Millisecond,
+ MaxDelay: 5 * time.Second,
+ BackoffFactor: 2.0,
+ CircuitTimeout: 30 * time.Second,
+ }
+}
+
+// NewResilientPublisher creates a new resilient publisher
+func (its *IntegrationTestSuite) CreateResilientPublisher(config *PublisherTestConfig, retryConfig *RetryConfig) (*ResilientPublisher, error) {
+ if retryConfig == nil {
+ retryConfig = DefaultRetryConfig()
+ }
+
+ publisher, err := its.CreatePublisher(config)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create base publisher: %v", err)
+ }
+
+ return &ResilientPublisher{
+ publisher: publisher,
+ config: config,
+ suite: its,
+ maxRetries: retryConfig.MaxRetries,
+ baseDelay: retryConfig.BaseDelay,
+ maxDelay: retryConfig.MaxDelay,
+ backoffFactor: retryConfig.BackoffFactor,
+ circuitTimeout: retryConfig.CircuitTimeout,
+ }, nil
+}
+
+// PublishWithRetry publishes a message with retry logic and error handling
+func (rp *ResilientPublisher) PublishWithRetry(key, value []byte) error {
+ atomic.AddInt64(&rp.totalPublishes, 1)
+
+ // Check circuit breaker
+ if rp.isCircuitOpen() {
+ atomic.AddInt64(&rp.applicationErrors, 1)
+ return fmt.Errorf("circuit breaker is open")
+ }
+
+ var lastErr error
+ for attempt := 0; attempt <= rp.maxRetries; attempt++ {
+ if attempt > 0 {
+ atomic.AddInt64(&rp.retryAttempts, 1)
+ delay := rp.calculateDelay(attempt)
+ glog.V(1).Infof("Retrying publish after %v (attempt %d/%d)", delay, attempt, rp.maxRetries)
+ time.Sleep(delay)
+ }
+
+ err := rp.publisher.Publish(key, value)
+ if err == nil {
+ // Success - reset circuit breaker if it was open
+ rp.resetCircuitBreaker()
+ return nil
+ }
+
+ lastErr = err
+
+ // Classify error type
+ if rp.isConnectionError(err) {
+ atomic.AddInt64(&rp.connectionErrors, 1)
+ glog.V(1).Infof("Connection error on attempt %d: %v", attempt+1, err)
+
+ // For connection errors, try to recreate the publisher
+ if attempt < rp.maxRetries {
+ if recreateErr := rp.recreatePublisher(); recreateErr != nil {
+ glog.Warningf("Failed to recreate publisher: %v", recreateErr)
+ }
+ }
+ continue
+ } else {
+ // Application error - don't retry
+ atomic.AddInt64(&rp.applicationErrors, 1)
+ glog.Warningf("Application error (not retrying): %v", err)
+ break
+ }
+ }
+
+ // All retries exhausted or non-retryable error
+ rp.openCircuitBreaker()
+ return fmt.Errorf("publish failed after %d attempts, last error: %v", rp.maxRetries+1, lastErr)
+}
+
+// PublishRecord publishes a record with retry logic
+func (rp *ResilientPublisher) PublishRecord(key []byte, record *schema_pb.RecordValue) error {
+ atomic.AddInt64(&rp.totalPublishes, 1)
+
+ if rp.isCircuitOpen() {
+ atomic.AddInt64(&rp.applicationErrors, 1)
+ return fmt.Errorf("circuit breaker is open")
+ }
+
+ var lastErr error
+ for attempt := 0; attempt <= rp.maxRetries; attempt++ {
+ if attempt > 0 {
+ atomic.AddInt64(&rp.retryAttempts, 1)
+ delay := rp.calculateDelay(attempt)
+ time.Sleep(delay)
+ }
+
+ err := rp.publisher.PublishRecord(key, record)
+ if err == nil {
+ rp.resetCircuitBreaker()
+ return nil
+ }
+
+ lastErr = err
+
+ if rp.isConnectionError(err) {
+ atomic.AddInt64(&rp.connectionErrors, 1)
+ if attempt < rp.maxRetries {
+ if recreateErr := rp.recreatePublisher(); recreateErr != nil {
+ glog.Warningf("Failed to recreate publisher: %v", recreateErr)
+ }
+ }
+ continue
+ } else {
+ atomic.AddInt64(&rp.applicationErrors, 1)
+ break
+ }
+ }
+
+ rp.openCircuitBreaker()
+ return fmt.Errorf("publish record failed after %d attempts, last error: %v", rp.maxRetries+1, lastErr)
+}
+
+// recreatePublisher attempts to recreate the underlying publisher
+func (rp *ResilientPublisher) recreatePublisher() error {
+ rp.mu.Lock()
+ defer rp.mu.Unlock()
+
+ // Shutdown old publisher
+ if rp.publisher != nil {
+ rp.publisher.Shutdown()
+ }
+
+ // Create new publisher
+ newPublisher, err := rp.suite.CreatePublisher(rp.config)
+ if err != nil {
+ return fmt.Errorf("failed to recreate publisher: %v", err)
+ }
+
+ rp.publisher = newPublisher
+ glog.V(1).Infof("Successfully recreated publisher")
+ return nil
+}
+
+// isConnectionError determines if an error is a connection-level error that should be retried
+func (rp *ResilientPublisher) isConnectionError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ errStr := err.Error()
+
+ // Check for gRPC status codes
+ if st, ok := status.FromError(err); ok {
+ switch st.Code() {
+ case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown:
+ return true
+ }
+ }
+
+ // Check for common connection error strings
+ connectionErrorPatterns := []string{
+ "EOF",
+ "error reading server preface",
+ "connection refused",
+ "connection reset",
+ "broken pipe",
+ "network is unreachable",
+ "no route to host",
+ "transport is closing",
+ "connection error",
+ "dial tcp",
+ "context deadline exceeded",
+ }
+
+ for _, pattern := range connectionErrorPatterns {
+ if contains(errStr, pattern) {
+ return true
+ }
+ }
+
+ return false
+}
+
+// contains checks if a string contains a substring (case-insensitive)
+func contains(s, substr string) bool {
+ return len(s) >= len(substr) &&
+ (s == substr ||
+ len(s) > len(substr) &&
+ (s[:len(substr)] == substr ||
+ s[len(s)-len(substr):] == substr ||
+ containsSubstring(s, substr)))
+}
+
+func containsSubstring(s, substr string) bool {
+ for i := 0; i <= len(s)-len(substr); i++ {
+ if s[i:i+len(substr)] == substr {
+ return true
+ }
+ }
+ return false
+}
+
+// calculateDelay calculates exponential backoff delay
+func (rp *ResilientPublisher) calculateDelay(attempt int) time.Duration {
+ delay := float64(rp.baseDelay) * math.Pow(rp.backoffFactor, float64(attempt-1))
+ if delay > float64(rp.maxDelay) {
+ delay = float64(rp.maxDelay)
+ }
+ return time.Duration(delay)
+}
+
+// Circuit breaker methods
+func (rp *ResilientPublisher) isCircuitOpen() bool {
+ rp.mu.RLock()
+ defer rp.mu.RUnlock()
+
+ if !rp.circuitOpen {
+ return false
+ }
+
+ // Check if circuit should be reset
+ if time.Since(rp.circuitOpenTime) > rp.circuitTimeout {
+ return false
+ }
+
+ return true
+}
+
+func (rp *ResilientPublisher) openCircuitBreaker() {
+ rp.mu.Lock()
+ defer rp.mu.Unlock()
+
+ rp.circuitOpen = true
+ rp.circuitOpenTime = time.Now()
+ glog.Warningf("Circuit breaker opened due to repeated failures")
+}
+
+func (rp *ResilientPublisher) resetCircuitBreaker() {
+ rp.mu.Lock()
+ defer rp.mu.Unlock()
+
+ if rp.circuitOpen {
+ rp.circuitOpen = false
+ glog.V(1).Infof("Circuit breaker reset")
+ }
+}
+
+// GetErrorStats returns error statistics
+func (rp *ResilientPublisher) GetErrorStats() ErrorStats {
+ return ErrorStats{
+ ConnectionErrors: atomic.LoadInt64(&rp.connectionErrors),
+ ApplicationErrors: atomic.LoadInt64(&rp.applicationErrors),
+ RetryAttempts: atomic.LoadInt64(&rp.retryAttempts),
+ TotalPublishes: atomic.LoadInt64(&rp.totalPublishes),
+ CircuitOpen: rp.isCircuitOpen(),
+ }
+}
+
+// ErrorStats holds error statistics
+type ErrorStats struct {
+ ConnectionErrors int64
+ ApplicationErrors int64
+ RetryAttempts int64
+ TotalPublishes int64
+ CircuitOpen bool
+}
+
+// Shutdown gracefully shuts down the resilient publisher
+func (rp *ResilientPublisher) Shutdown() error {
+ rp.mu.Lock()
+ defer rp.mu.Unlock()
+
+ if rp.publisher != nil {
+ return rp.publisher.Shutdown()
+ }
+ return nil
+}
diff --git a/test/mq/simple_demo.go b/test/mq/simple_demo.go
new file mode 100644
index 000000000..35446e742
--- /dev/null
+++ b/test/mq/simple_demo.go
@@ -0,0 +1,56 @@
+package main
+
+import (
+ "fmt"
+ "log"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+)
+
+func main() {
+ log.Println("Starting SeaweedMQ logging test...")
+
+ // Create publisher configuration
+ config := &pub_client.PublisherConfiguration{
+ Topic: topic.NewTopic("test", "logging-demo"),
+ PartitionCount: 3,
+ Brokers: []string{"127.0.0.1:17777"},
+ PublisherName: "logging-test-client",
+ }
+
+ log.Println("Creating topic publisher...")
+ publisher, err := pub_client.NewTopicPublisher(config)
+ if err != nil {
+ log.Printf("Failed to create publisher: %v", err)
+ return
+ }
+ defer publisher.Shutdown()
+
+ log.Println("Publishing test messages...")
+
+ // Publish some test messages
+ for i := 0; i < 100; i++ {
+ key := fmt.Sprintf("key-%d", i)
+ value := fmt.Sprintf("message-%d-timestamp-%d", i, time.Now().Unix())
+
+ err := publisher.Publish([]byte(key), []byte(value))
+ if err != nil {
+ log.Printf("Failed to publish message %d: %v", i, err)
+ }
+
+ // Small delay to create some connection stress
+ if i%10 == 0 {
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+
+ log.Println("Finishing publish...")
+ err = publisher.FinishPublish()
+ if err != nil {
+ log.Printf("Failed to finish publish: %v", err)
+ }
+
+ log.Println("Test completed successfully!")
+}