Diffstat (limited to 'test/mq/integration/resilient_publisher.go')
| -rw-r--r-- | test/mq/integration/resilient_publisher.go | 336 |
1 file changed, 336 insertions, 0 deletions
diff --git a/test/mq/integration/resilient_publisher.go b/test/mq/integration/resilient_publisher.go
new file mode 100644
index 000000000..38da73fd4
--- /dev/null
+++ b/test/mq/integration/resilient_publisher.go
@@ -0,0 +1,336 @@
+package integration
+
+import (
+	"fmt"
+	"math"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// ResilientPublisher wraps TopicPublisher with enhanced error handling
+type ResilientPublisher struct {
+	publisher *pub_client.TopicPublisher
+	config    *PublisherTestConfig
+	suite     *IntegrationTestSuite
+
+	// Error tracking
+	connectionErrors  int64
+	applicationErrors int64
+	retryAttempts     int64
+	totalPublishes    int64
+
+	// Retry configuration
+	maxRetries    int
+	baseDelay     time.Duration
+	maxDelay      time.Duration
+	backoffFactor float64
+
+	// Circuit breaker
+	circuitOpen     bool
+	circuitOpenTime time.Time
+	circuitTimeout  time.Duration
+
+	mu sync.RWMutex
+}
+
+// RetryConfig holds retry configuration
+type RetryConfig struct {
+	MaxRetries     int
+	BaseDelay      time.Duration
+	MaxDelay       time.Duration
+	BackoffFactor  float64
+	CircuitTimeout time.Duration
+}
+
+// DefaultRetryConfig returns sensible defaults for retry configuration
+func DefaultRetryConfig() *RetryConfig {
+	return &RetryConfig{
+		MaxRetries:     5,
+		BaseDelay:      10 * time.Millisecond,
+		MaxDelay:       5 * time.Second,
+		BackoffFactor:  2.0,
+		CircuitTimeout: 30 * time.Second,
+	}
+}
+
+// CreateResilientPublisher creates a new resilient publisher
+func (its *IntegrationTestSuite) CreateResilientPublisher(config *PublisherTestConfig, retryConfig *RetryConfig) (*ResilientPublisher, error) {
+	if retryConfig == nil {
+		retryConfig = DefaultRetryConfig()
+	}
+
+	publisher, err := its.CreatePublisher(config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create base publisher: %v", err)
+	}
+
+	return &ResilientPublisher{
+		publisher:      publisher,
+		config:         config,
+		suite:          its,
+		maxRetries:     retryConfig.MaxRetries,
+		baseDelay:      retryConfig.BaseDelay,
+		maxDelay:       retryConfig.MaxDelay,
+		backoffFactor:  retryConfig.BackoffFactor,
+		circuitTimeout: retryConfig.CircuitTimeout,
+	}, nil
+}
+
+// PublishWithRetry publishes a message with retry logic and error handling
+func (rp *ResilientPublisher) PublishWithRetry(key, value []byte) error {
+	atomic.AddInt64(&rp.totalPublishes, 1)
+
+	// Check circuit breaker
+	if rp.isCircuitOpen() {
+		atomic.AddInt64(&rp.applicationErrors, 1)
+		return fmt.Errorf("circuit breaker is open")
+	}
+
+	var lastErr error
+	for attempt := 0; attempt <= rp.maxRetries; attempt++ {
+		if attempt > 0 {
+			atomic.AddInt64(&rp.retryAttempts, 1)
+			delay := rp.calculateDelay(attempt)
+			glog.V(1).Infof("Retrying publish after %v (attempt %d/%d)", delay, attempt, rp.maxRetries)
+			time.Sleep(delay)
+		}
+
+		err := rp.publisher.Publish(key, value)
+		if err == nil {
+			// Success - reset circuit breaker if it was open
+			rp.resetCircuitBreaker()
+			return nil
+		}
+
+		lastErr = err
+
+		// Classify error type
+		if rp.isConnectionError(err) {
+			atomic.AddInt64(&rp.connectionErrors, 1)
+			glog.V(1).Infof("Connection error on attempt %d: %v", attempt+1, err)
+
+			// For connection errors, try to recreate the publisher
+			if attempt < rp.maxRetries {
+				if recreateErr := rp.recreatePublisher(); recreateErr != nil {
+					glog.Warningf("Failed to recreate publisher: %v", recreateErr)
+				}
+			}
+			continue
+		} else {
+			// Application error - don't retry
+			atomic.AddInt64(&rp.applicationErrors, 1)
+			glog.Warningf("Application error (not retrying): %v", err)
+			break
+		}
+	}
+
+	// All retries exhausted or non-retryable error
+	rp.openCircuitBreaker()
+	return fmt.Errorf("publish failed after %d attempts, last error: %v", rp.maxRetries+1, lastErr)
+}
+
+// PublishRecord publishes a record with retry logic
+func (rp *ResilientPublisher) PublishRecord(key []byte, record *schema_pb.RecordValue) error {
+	atomic.AddInt64(&rp.totalPublishes, 1)
+
+	if rp.isCircuitOpen() {
+		atomic.AddInt64(&rp.applicationErrors, 1)
+		return fmt.Errorf("circuit breaker is open")
+	}
+
+	var lastErr error
+	for attempt := 0; attempt <= rp.maxRetries; attempt++ {
+		if attempt > 0 {
+			atomic.AddInt64(&rp.retryAttempts, 1)
+			delay := rp.calculateDelay(attempt)
+			time.Sleep(delay)
+		}
+
+		err := rp.publisher.PublishRecord(key, record)
+		if err == nil {
+			rp.resetCircuitBreaker()
+			return nil
+		}
+
+		lastErr = err
+
+		if rp.isConnectionError(err) {
+			atomic.AddInt64(&rp.connectionErrors, 1)
+			if attempt < rp.maxRetries {
+				if recreateErr := rp.recreatePublisher(); recreateErr != nil {
+					glog.Warningf("Failed to recreate publisher: %v", recreateErr)
+				}
+			}
+			continue
+		} else {
+			atomic.AddInt64(&rp.applicationErrors, 1)
+			break
+		}
+	}
+
+	rp.openCircuitBreaker()
+	return fmt.Errorf("publish record failed after %d attempts, last error: %v", rp.maxRetries+1, lastErr)
+}
+
+// recreatePublisher attempts to recreate the underlying publisher
+func (rp *ResilientPublisher) recreatePublisher() error {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	// Shutdown old publisher
+	if rp.publisher != nil {
+		rp.publisher.Shutdown()
+	}
+
+	// Create new publisher
+	newPublisher, err := rp.suite.CreatePublisher(rp.config)
+	if err != nil {
+		return fmt.Errorf("failed to recreate publisher: %v", err)
+	}
+
+	rp.publisher = newPublisher
+	glog.V(1).Infof("Successfully recreated publisher")
+	return nil
+}
+
+// isConnectionError determines if an error is a connection-level error that should be retried
+func (rp *ResilientPublisher) isConnectionError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	errStr := err.Error()
+
+	// Check for gRPC status codes
+	if st, ok := status.FromError(err); ok {
+		switch st.Code() {
+		case codes.Unavailable, codes.DeadlineExceeded, codes.Canceled, codes.Unknown:
+			return true
+		}
+	}
+
+	// Check for common connection error strings
+	connectionErrorPatterns := []string{
+		"EOF",
+		"error reading server preface",
+		"connection refused",
+		"connection reset",
+		"broken pipe",
+		"network is unreachable",
+		"no route to host",
+		"transport is closing",
+		"connection error",
+		"dial tcp",
+		"context deadline exceeded",
+	}
+
+	for _, pattern := range connectionErrorPatterns {
+		if contains(errStr, pattern) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// contains reports whether substr is within s (a simple case-sensitive substring check)
+func contains(s, substr string) bool {
+	return len(s) >= len(substr) &&
+		(s == substr ||
+			len(s) > len(substr) &&
+				(s[:len(substr)] == substr ||
+					s[len(s)-len(substr):] == substr ||
+					containsSubstring(s, substr)))
+}
+
+func containsSubstring(s, substr string) bool {
+	for i := 0; i <= len(s)-len(substr); i++ {
+		if s[i:i+len(substr)] == substr {
+			return true
+		}
+	}
+	return false
+}
+
+// calculateDelay calculates exponential backoff delay
+func (rp *ResilientPublisher) calculateDelay(attempt int) time.Duration {
+	delay := float64(rp.baseDelay) * math.Pow(rp.backoffFactor, float64(attempt-1))
+	if delay > float64(rp.maxDelay) {
+		delay = float64(rp.maxDelay)
+	}
+	return time.Duration(delay)
+}
+
+// Circuit breaker methods
+func (rp *ResilientPublisher) isCircuitOpen() bool {
+	rp.mu.RLock()
+	defer rp.mu.RUnlock()
+
+	if !rp.circuitOpen {
+		return false
+	}
+
+	// Check if circuit should be reset
+	if time.Since(rp.circuitOpenTime) > rp.circuitTimeout {
+		return false
+	}
+
+	return true
+}
+
+func (rp *ResilientPublisher) openCircuitBreaker() {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	rp.circuitOpen = true
+	rp.circuitOpenTime = time.Now()
+	glog.Warningf("Circuit breaker opened due to repeated failures")
+}
+
+func (rp *ResilientPublisher) resetCircuitBreaker() {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	if rp.circuitOpen {
+		rp.circuitOpen = false
+		glog.V(1).Infof("Circuit breaker reset")
+	}
+}
+
+// GetErrorStats returns error statistics
+func (rp *ResilientPublisher) GetErrorStats() ErrorStats {
+	return ErrorStats{
+		ConnectionErrors:  atomic.LoadInt64(&rp.connectionErrors),
+		ApplicationErrors: atomic.LoadInt64(&rp.applicationErrors),
+		RetryAttempts:     atomic.LoadInt64(&rp.retryAttempts),
+		TotalPublishes:    atomic.LoadInt64(&rp.totalPublishes),
+		CircuitOpen:       rp.isCircuitOpen(),
+	}
+}
+
+// ErrorStats holds error statistics
+type ErrorStats struct {
+	ConnectionErrors  int64
+	ApplicationErrors int64
+	RetryAttempts     int64
+	TotalPublishes    int64
+	CircuitOpen       bool
+}
+
+// Shutdown gracefully shuts down the resilient publisher
+func (rp *ResilientPublisher) Shutdown() error {
+	rp.mu.Lock()
+	defer rp.mu.Unlock()
+
+	if rp.publisher != nil {
+		return rp.publisher.Shutdown()
+	}
+	return nil
+}
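
For context, a minimal usage sketch of the new type, assuming it lives alongside the other helpers in the same integration package (IntegrationTestSuite, PublisherTestConfig, and CreatePublisher are the existing suite helpers referenced in the diff); the helper name, message count, and key/value format below are illustrative only and not part of this change:

// exercisePublisher is a hypothetical helper: it publishes a batch of
// messages through a ResilientPublisher and prints the collected error stats.
func exercisePublisher(suite *IntegrationTestSuite, config *PublisherTestConfig) error {
	// Passing nil for the retry config falls back to DefaultRetryConfig().
	rp, err := suite.CreateResilientPublisher(config, nil)
	if err != nil {
		return err
	}
	defer rp.Shutdown()

	for i := 0; i < 100; i++ {
		key := []byte(fmt.Sprintf("key-%d", i))
		value := []byte(fmt.Sprintf("value-%d", i))
		if err := rp.PublishWithRetry(key, value); err != nil {
			return err
		}
	}

	stats := rp.GetErrorStats()
	fmt.Printf("total=%d retries=%d connErrs=%d appErrs=%d circuitOpen=%v\n",
		stats.TotalPublishes, stats.RetryAttempts, stats.ConnectionErrors,
		stats.ApplicationErrors, stats.CircuitOpen)
	return nil
}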
