author     chrislu <chris.lu@gmail.com>  2025-06-25 17:42:39 -0700
committer  chrislu <chris.lu@gmail.com>  2025-06-25 17:42:39 -0700
commit     e4fe657bd953d4ce0c823f362584348bfb4903fb (patch)
tree       0f95eb4221a0ee847c856d61f6595109f972eeb4 /test/mq/integration/resilient_publisher.go
parent     31473fbe1053001796388bc0832c93d47f22839e (diff)
Diffstat (limited to 'test/mq/integration/resilient_publisher.go')
-rw-r--r--   test/mq/integration/resilient_publisher.go   336
1 file changed, 336 insertions, 0 deletions
diff --git a/test/mq/integration/resilient_publisher.go b/test/mq/integration/resilient_publisher.go
new file mode 100644
index 000000000..38da73fd4
--- /dev/null
+++ b/test/mq/integration/resilient_publisher.go
@@ -0,0 +1,336 @@
+package integration
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// ResilientPublisher wraps TopicPublisher with retries, exponential backoff,
+// connection-error classification, and a simple circuit breaker
+type ResilientPublisher struct {
+ publisher *pub_client.TopicPublisher
+ config *PublisherTestConfig
+ suite *IntegrationTestSuite
+
+ // Error tracking
+ connectionErrors int64
+ applicationErrors int64
+ retryAttempts int64
+ totalPublishes int64
+
+ // Retry configuration
+ maxRetries int
+ baseDelay time.Duration
+ maxDelay time.Duration
+ backoffFactor float64
+
+ // Circuit breaker
+ circuitOpen bool
+ circuitOpenTime time.Time
+ circuitTimeout time.Duration
+
+ mu sync.RWMutex
+}
+
+// RetryConfig holds retry configuration
+type RetryConfig struct {
+ MaxRetries int
+ BaseDelay time.Duration
+ MaxDelay time.Duration
+ BackoffFactor float64
+ CircuitTimeout time.Duration
+}
+
+// DefaultRetryConfig returns sensible defaults for retry configuration
+func DefaultRetryConfig() *RetryConfig {
+ return &RetryConfig{
+ MaxRetries: 5,
+ BaseDelay: 10 * time.Millisecond,
+ MaxDelay: 5 * time.Second,
+ BackoffFactor: 2.0,
+ CircuitTimeout: 30 * time.Second,
+ }
+}
+
+// CreateResilientPublisher builds a ResilientPublisher around a base publisher
+// from the suite; a nil retryConfig falls back to DefaultRetryConfig
+func (its *IntegrationTestSuite) CreateResilientPublisher(config *PublisherTestConfig, retryConfig *RetryConfig) (*ResilientPublisher, error) {
+ if retryConfig == nil {
+ retryConfig = DefaultRetryConfig()
+ }
+
+ publisher, err := its.CreatePublisher(config)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create base publisher: %v", err)
+ }
+
+ return &ResilientPublisher{
+ publisher: publisher,
+ config: config,
+ suite: its,
+ maxRetries: retryConfig.MaxRetries,
+ baseDelay: retryConfig.BaseDelay,
+ maxDelay: retryConfig.MaxDelay,
+ backoffFactor: retryConfig.BackoffFactor,
+ circuitTimeout: retryConfig.CircuitTimeout,
+ }, nil
+}
+
+// PublishWithRetry publishes a message with retry logic and error handling
+func (rp *ResilientPublisher) PublishWithRetry(key, value []byte) error {
+ atomic.AddInt64(&rp.totalPublishes, 1)
+
+ // Check circuit breaker
+ if rp.isCircuitOpen() {
+ atomic.AddInt64(&rp.applicationErrors, 1)
+ return fmt.Errorf("circuit breaker is open")
+ }
+
+ var lastErr error
+ for attempt := 0; attempt <= rp.maxRetries; attempt++ {
+ if attempt > 0 {
+ atomic.AddInt64(&rp.retryAttempts, 1)
+ delay := rp.calculateDelay(attempt)
+ glog.V(1).Infof("Retrying publish after %v (attempt %d/%d)", delay, attempt, rp.maxRetries)
+ time.Sleep(delay)
+ }
+
+ // Read the current publisher under the read lock; recreatePublisher
+ // replaces it under the write lock.
+ rp.mu.RLock()
+ pub := rp.publisher
+ rp.mu.RUnlock()
+ err := pub.Publish(key, value)
+ if err == nil {
+ // Success - reset circuit breaker if it was open
+ rp.resetCircuitBreaker()
+ return nil
+ }
+
+ lastErr = err
+
+ // Classify error type
+ if rp.isConnectionError(err) {
+ atomic.AddInt64(&rp.connectionErrors, 1)
+ glog.V(1).Infof("Connection error on attempt %d: %v", attempt+1, err)
+
+ // For connection errors, try to recreate the publisher
+ if attempt < rp.maxRetries {
+ if recreateErr := rp.recreatePublisher(); recreateErr != nil {
+ glog.Warningf("Failed to recreate publisher: %v", recreateErr)
+ }
+ }
+ continue
+ } else {
+ // Application error - don't retry
+ atomic.AddInt64(&rp.applicationErrors, 1)
+ glog.Warningf("Application error (not retrying): %v", err)
+ break
+ }
+ }
+
+ // All retries exhausted or non-retryable error
+ rp.openCircuitBreaker()
+ return fmt.Errorf("publish failed after %d attempts, last error: %v", rp.maxRetries+1, lastErr)
+}
+
+// PublishRecord publishes a record with retry logic
+func (rp *ResilientPublisher) PublishRecord(key []byte, record *schema_pb.RecordValue) error {
+ atomic.AddInt64(&rp.totalPublishes, 1)
+
+ if rp.isCircuitOpen() {
+ atomic.AddInt64(&rp.applicationErrors, 1)
+ return fmt.Errorf("circuit breaker is open")
+ }
+
+ var lastErr error
+ for attempt := 0; attempt <= rp.maxRetries; attempt++ {
+ if attempt > 0 {
+ atomic.AddInt64(&rp.retryAttempts, 1)
+ delay := rp.calculateDelay(attempt)
+ time.Sleep(delay)
+ }
+
+ rp.mu.RLock()
+ pub := rp.publisher
+ rp.mu.RUnlock()
+ err := pub.PublishRecord(key, record)
+ if err == nil {
+ rp.resetCircuitBreaker()
+ return nil
+ }
+
+ lastErr = err
+
+ if rp.isConnectionError(err) {
+ atomic.AddInt64(&rp.connectionErrors, 1)
+ if attempt < rp.maxRetries {
+ if recreateErr := rp.recreatePublisher(); recreateErr != nil {
+ glog.Warningf("Failed to recreate publisher: %v", recreateErr)
+ }
+ }
+ continue
+ } else {
+ atomic.AddInt64(&rp.applicationErrors, 1)
+ break
+ }
+ }
+
+ rp.openCircuitBreaker()
+ return fmt.Errorf("publish record failed after %d attempts, last error: %v", rp.maxRetries+1, lastErr)
+}
+
+// recreatePublisher attempts to recreate the underlying publisher
+func (rp *ResilientPublisher) recreatePublisher() error {
+ rp.mu.Lock()
+ defer rp.mu.Unlock()
+
+ // Shutdown old publisher
+ if rp.publisher != nil {
+ rp.publisher.Shutdown()
+ }
+
+ // Create new publisher
+ newPublisher, err := rp.suite.CreatePublisher(rp.config)
+ if err != nil {
+ return fmt.Errorf("failed to recreate publisher: %v", err)
+ }
+
+ rp.publisher = newPublisher
+ glog.V(1).Infof("Successfully recreated publisher")
+ return nil
+}
+
+// isConnectionError determines if an error is a connection-level error that should be retried
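+// It combines gRPC status-code checks with substring matching on the error
+// text, since some transport failures only surface as wrapped string errors.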
+func (rp *ResilientPublisher) isConnectionError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ errStr := err.Error()
+
+ // Check for gRPC status codes
+ if st, ok := status.FromError(err); ok {
+ switch st.Code() {
+ case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown:
+ return true
+ }
+ }
+
+ // Check for common connection error strings
+ connectionErrorPatterns := []string{
+ "EOF",
+ "error reading server preface",
+ "connection refused",
+ "connection reset",
+ "broken pipe",
+ "network is unreachable",
+ "no route to host",
+ "transport is closing",
+ "connection error",
+ "dial tcp",
+ "context deadline exceeded",
+ }
+
+ for _, pattern := range connectionErrorPatterns {
+ if contains(errStr, pattern) {
+ return true
+ }
+ }
+
+ return false
+}
+
+// contains reports whether s contains substr. The match is case-sensitive
+// and delegates to strings.Contains.
+func contains(s, substr string) bool {
+ return strings.Contains(s, substr)
+}
+
+// calculateDelay calculates exponential backoff delay
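+// With DefaultRetryConfig (10ms base, factor 2.0, 5s cap) the waits before
+// retry attempts 1 through 5 are 10ms, 20ms, 40ms, 80ms and 160ms.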
+func (rp *ResilientPublisher) calculateDelay(attempt int) time.Duration {
+ delay := float64(rp.baseDelay) * math.Pow(rp.backoffFactor, float64(attempt-1))
+ if delay > float64(rp.maxDelay) {
+ delay = float64(rp.maxDelay)
+ }
+ return time.Duration(delay)
+}
+
+// Circuit breaker methods
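+//
+// The breaker opens whenever a publish call finishes without success (all
+// retries exhausted or a non-retryable application error). While open,
+// publishes fail fast. Once circuitTimeout has elapsed, isCircuitOpen
+// reports false so a trial publish can go through, and the first success
+// clears the flag via resetCircuitBreaker.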
+func (rp *ResilientPublisher) isCircuitOpen() bool {
+ rp.mu.RLock()
+ defer rp.mu.RUnlock()
+
+ if !rp.circuitOpen {
+ return false
+ }
+
+ // The open period has elapsed; treat the circuit as closed for this check
+ if time.Since(rp.circuitOpenTime) > rp.circuitTimeout {
+ return false
+ }
+
+ return true
+}
+
+func (rp *ResilientPublisher) openCircuitBreaker() {
+ rp.mu.Lock()
+ defer rp.mu.Unlock()
+
+ rp.circuitOpen = true
+ rp.circuitOpenTime = time.Now()
+ glog.Warningf("Circuit breaker opened due to repeated failures")
+}
+
+func (rp *ResilientPublisher) resetCircuitBreaker() {
+ rp.mu.Lock()
+ defer rp.mu.Unlock()
+
+ if rp.circuitOpen {
+ rp.circuitOpen = false
+ glog.V(1).Infof("Circuit breaker reset")
+ }
+}
+
+// GetErrorStats returns error statistics
+func (rp *ResilientPublisher) GetErrorStats() ErrorStats {
+ return ErrorStats{
+ ConnectionErrors: atomic.LoadInt64(&rp.connectionErrors),
+ ApplicationErrors: atomic.LoadInt64(&rp.applicationErrors),
+ RetryAttempts: atomic.LoadInt64(&rp.retryAttempts),
+ TotalPublishes: atomic.LoadInt64(&rp.totalPublishes),
+ CircuitOpen: rp.isCircuitOpen(),
+ }
+}
+
+// ErrorStats holds error statistics
+type ErrorStats struct {
+ ConnectionErrors int64
+ ApplicationErrors int64
+ RetryAttempts int64
+ TotalPublishes int64
+ CircuitOpen bool
+}
+
+// Shutdown gracefully shuts down the resilient publisher
+func (rp *ResilientPublisher) Shutdown() error {
+ rp.mu.Lock()
+ defer rp.mu.Unlock()
+
+ if rp.publisher != nil {
+ return rp.publisher.Shutdown()
+ }
+ return nil
+}
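
For orientation, a minimal usage sketch of the helper this commit adds (hypothetical test code, not part of the diff; it assumes the IntegrationTestSuite and PublisherTestConfig types referenced by this file are provided elsewhere in the test/mq/integration package, and it would live in that same package with "fmt" and "testing" imported):

	// publishResiliently is a hypothetical helper showing how the pieces
	// compose: build the resilient publisher from the suite, publish a
	// batch with retries, then inspect the error statistics.
	func publishResiliently(t *testing.T, suite *IntegrationTestSuite, cfg *PublisherTestConfig) {
		rp, err := suite.CreateResilientPublisher(cfg, DefaultRetryConfig())
		if err != nil {
			t.Fatalf("create resilient publisher: %v", err)
		}
		defer rp.Shutdown()

		for i := 0; i < 100; i++ {
			key := []byte(fmt.Sprintf("key-%04d", i))
			if err := rp.PublishWithRetry(key, []byte("payload")); err != nil {
				t.Logf("publish %d failed: %v", i, err)
			}
		}

		stats := rp.GetErrorStats()
		t.Logf("connection errors=%d, application errors=%d, retries=%d, total=%d, circuit open=%v",
			stats.ConnectionErrors, stats.ApplicationErrors, stats.RetryAttempts,
			stats.TotalPublishes, stats.CircuitOpen)
	}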