Diffstat (limited to 'weed/admin/maintenance/maintenance_manager.go')
| -rw-r--r-- | weed/admin/maintenance/maintenance_manager.go | 407 |
1 files changed, 407 insertions, 0 deletions
diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go
new file mode 100644
index 000000000..17d1eef6d
--- /dev/null
+++ b/weed/admin/maintenance/maintenance_manager.go
@@ -0,0 +1,407 @@
+package maintenance
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// MaintenanceManager coordinates the maintenance system
+type MaintenanceManager struct {
+	config      *MaintenanceConfig
+	scanner     *MaintenanceScanner
+	queue       *MaintenanceQueue
+	adminClient AdminClient
+	running     bool
+	stopChan    chan struct{}
+	// Error handling and backoff
+	errorCount    int
+	lastError     error
+	lastErrorTime time.Time
+	backoffDelay  time.Duration
+	mutex         sync.RWMutex
+}
+
+// NewMaintenanceManager creates a new maintenance manager
+func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *MaintenanceManager {
+	if config == nil {
+		config = DefaultMaintenanceConfig()
+	}
+
+	queue := NewMaintenanceQueue(config.Policy)
+	scanner := NewMaintenanceScanner(adminClient, config.Policy, queue)
+
+	return &MaintenanceManager{
+		config:       config,
+		scanner:      scanner,
+		queue:        queue,
+		adminClient:  adminClient,
+		stopChan:     make(chan struct{}),
+		backoffDelay: time.Second, // Start with 1 second backoff
+	}
+}
+
+// Start begins the maintenance manager
+func (mm *MaintenanceManager) Start() error {
+	if !mm.config.Enabled {
+		glog.V(1).Infof("Maintenance system is disabled")
+		return nil
+	}
+
+	// Validate configuration durations to prevent ticker panics
+	if err := mm.validateConfig(); err != nil {
+		return fmt.Errorf("invalid maintenance configuration: %v", err)
+	}
+
+	mm.running = true
+
+	// Start background processes
+	go mm.scanLoop()
+	go mm.cleanupLoop()
+
+	glog.Infof("Maintenance manager started with scan interval %ds", mm.config.ScanIntervalSeconds)
+	return nil
+}
+
+// validateConfig validates the maintenance configuration durations
+func (mm *MaintenanceManager) validateConfig() error {
+	if mm.config.ScanIntervalSeconds <= 0 {
+		glog.Warningf("Invalid scan interval %ds, using default 30m", mm.config.ScanIntervalSeconds)
+		mm.config.ScanIntervalSeconds = 30 * 60 // 30 minutes in seconds
+	}
+
+	if mm.config.CleanupIntervalSeconds <= 0 {
+		glog.Warningf("Invalid cleanup interval %ds, using default 24h", mm.config.CleanupIntervalSeconds)
+		mm.config.CleanupIntervalSeconds = 24 * 60 * 60 // 24 hours in seconds
+	}
+
+	if mm.config.WorkerTimeoutSeconds <= 0 {
+		glog.Warningf("Invalid worker timeout %ds, using default 5m", mm.config.WorkerTimeoutSeconds)
+		mm.config.WorkerTimeoutSeconds = 5 * 60 // 5 minutes in seconds
+	}
+
+	if mm.config.TaskTimeoutSeconds <= 0 {
+		glog.Warningf("Invalid task timeout %ds, using default 2h", mm.config.TaskTimeoutSeconds)
+		mm.config.TaskTimeoutSeconds = 2 * 60 * 60 // 2 hours in seconds
+	}
+
+	if mm.config.RetryDelaySeconds <= 0 {
+		glog.Warningf("Invalid retry delay %ds, using default 15m", mm.config.RetryDelaySeconds)
+		mm.config.RetryDelaySeconds = 15 * 60 // 15 minutes in seconds
+	}
+
+	if mm.config.TaskRetentionSeconds <= 0 {
+		glog.Warningf("Invalid task retention %ds, using default 168h", mm.config.TaskRetentionSeconds)
+		mm.config.TaskRetentionSeconds = 7 * 24 * 60 * 60 // 7 days in seconds
+	}
+
+	return nil
+}
+
+// IsRunning returns whether the maintenance manager is currently running
+func (mm *MaintenanceManager) IsRunning() bool {
+	return mm.running
+}
+
+// Stop terminates the maintenance manager
+func (mm *MaintenanceManager) Stop() {
+	mm.running = false
+	close(mm.stopChan)
+	glog.Infof("Maintenance manager stopped")
+}
+
+// scanLoop periodically scans for maintenance tasks with adaptive timing
+func (mm *MaintenanceManager) scanLoop() {
+	scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
+	ticker := time.NewTicker(scanInterval)
+	defer ticker.Stop()
+
+	for mm.running {
+		select {
+		case <-mm.stopChan:
+			return
+		case <-ticker.C:
+			glog.V(1).Infof("Performing maintenance scan every %v", scanInterval)
+			mm.performScan()
+
+			// Adjust ticker interval based on error state
+			mm.mutex.RLock()
+			currentInterval := scanInterval
+			if mm.errorCount > 0 {
+				// Use backoff delay when there are errors
+				currentInterval = mm.backoffDelay
+				if currentInterval > scanInterval {
+					// Don't make it longer than the configured interval * 10
+					maxInterval := scanInterval * 10
+					if currentInterval > maxInterval {
+						currentInterval = maxInterval
+					}
+				}
+			}
+			mm.mutex.RUnlock()
+
+			// Reset ticker with new interval if needed
+			if currentInterval != scanInterval {
+				ticker.Stop()
+				ticker = time.NewTicker(currentInterval)
+			}
+		}
+	}
+}
+
+// cleanupLoop periodically cleans up old tasks and stale workers
+func (mm *MaintenanceManager) cleanupLoop() {
+	cleanupInterval := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
+	ticker := time.NewTicker(cleanupInterval)
+	defer ticker.Stop()
+
+	for mm.running {
+		select {
+		case <-mm.stopChan:
+			return
+		case <-ticker.C:
+			mm.performCleanup()
+		}
+	}
+}
+
+// performScan executes a maintenance scan with error handling and backoff
+func (mm *MaintenanceManager) performScan() {
+	mm.mutex.Lock()
+	defer mm.mutex.Unlock()
+
+	glog.V(2).Infof("Starting maintenance scan")
+
+	results, err := mm.scanner.ScanForMaintenanceTasks()
+	if err != nil {
+		mm.handleScanError(err)
+		return
+	}
+
+	// Scan succeeded, reset error tracking
+	mm.resetErrorTracking()
+
+	if len(results) > 0 {
+		mm.queue.AddTasksFromResults(results)
+		glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(results))
+	} else {
+		glog.V(2).Infof("Maintenance scan completed: no tasks needed")
+	}
+}
+
+// handleScanError handles scan errors with exponential backoff and reduced logging
+func (mm *MaintenanceManager) handleScanError(err error) {
+	now := time.Now()
+	mm.errorCount++
+	mm.lastError = err
+	mm.lastErrorTime = now
+
+	// Use exponential backoff, capped at 5 minutes
+	if mm.errorCount > 1 {
+		mm.backoffDelay = mm.backoffDelay * 2
+		if mm.backoffDelay > 5*time.Minute {
+			mm.backoffDelay = 5 * time.Minute // Cap at 5 minutes
+		}
+	}
+
+	// Reduce log frequency based on error count and time
+	shouldLog := false
+	if mm.errorCount <= 3 {
+		// Log first 3 errors immediately
+		shouldLog = true
+	} else if mm.errorCount <= 10 && mm.errorCount%3 == 0 {
+		// Log every 3rd error for errors 4-10
+		shouldLog = true
+	} else if mm.errorCount%10 == 0 {
+		// Log every 10th error after that
+		shouldLog = true
+	}
+
+	if shouldLog {
+		// Check if it's a connection error to provide better messaging
+		if isConnectionError(err) {
+			if mm.errorCount == 1 {
+				glog.Errorf("Maintenance scan failed: %v (will retry with backoff)", err)
+			} else {
+				glog.Errorf("Maintenance scan still failing after %d attempts: %v (backoff: %v)",
+					mm.errorCount, err, mm.backoffDelay)
+			}
+		} else {
+			glog.Errorf("Maintenance scan failed: %v", err)
+		}
+	} else {
+		// Use debug level for suppressed errors
+		glog.V(3).Infof("Maintenance scan failed (error #%d, suppressed): %v", mm.errorCount, err)
+	}
+}
+
+// resetErrorTracking resets error tracking when scan succeeds
+func (mm *MaintenanceManager) resetErrorTracking() {
+	if mm.errorCount > 0 {
+		glog.V(1).Infof("Maintenance scan recovered after %d failed attempts", mm.errorCount)
+		mm.errorCount = 0
+		mm.lastError = nil
+		mm.backoffDelay = time.Second // Reset to initial delay
+	}
+}
+
+// isConnectionError checks if the error is a connection-related error
+func isConnectionError(err error) bool {
+	if err == nil {
+		return false
+	}
+	errStr := err.Error()
+	return strings.Contains(errStr, "connection refused") ||
+		strings.Contains(errStr, "connection error") ||
+		strings.Contains(errStr, "dial tcp") ||
+		strings.Contains(errStr, "connection timeout") ||
+		strings.Contains(errStr, "no route to host") ||
+		strings.Contains(errStr, "network unreachable")
+}
+
+// performCleanup cleans up old tasks and stale workers
+func (mm *MaintenanceManager) performCleanup() {
+	glog.V(2).Infof("Starting maintenance cleanup")
+
+	taskRetention := time.Duration(mm.config.TaskRetentionSeconds) * time.Second
+	workerTimeout := time.Duration(mm.config.WorkerTimeoutSeconds) * time.Second
+
+	removedTasks := mm.queue.CleanupOldTasks(taskRetention)
+	removedWorkers := mm.queue.RemoveStaleWorkers(workerTimeout)
+
+	if removedTasks > 0 || removedWorkers > 0 {
+		glog.V(1).Infof("Cleanup completed: removed %d old tasks and %d stale workers", removedTasks, removedWorkers)
+	}
+}
+
+// GetQueue returns the maintenance queue
+func (mm *MaintenanceManager) GetQueue() *MaintenanceQueue {
+	return mm.queue
+}
+
+// GetConfig returns the maintenance configuration
+func (mm *MaintenanceManager) GetConfig() *MaintenanceConfig {
+	return mm.config
+}
+
+// GetStats returns maintenance statistics
+func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
+	stats := mm.queue.GetStats()
+
+	mm.mutex.RLock()
+	defer mm.mutex.RUnlock()
+
+	stats.LastScanTime = time.Now() // Would need to track this properly
+
+	// Calculate next scan time based on current error state
+	scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
+	nextScanInterval := scanInterval
+	if mm.errorCount > 0 {
+		nextScanInterval = mm.backoffDelay
+		maxInterval := scanInterval * 10
+		if nextScanInterval > maxInterval {
+			nextScanInterval = maxInterval
+		}
+	}
+	stats.NextScanTime = time.Now().Add(nextScanInterval)
+
+	return stats
+}
+
+// GetErrorState returns the current error state for monitoring
+func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
+	mm.mutex.RLock()
+	defer mm.mutex.RUnlock()
+	return mm.errorCount, mm.lastError, mm.backoffDelay
+}
+
+// GetTasks returns tasks with filtering
+func (mm *MaintenanceManager) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
+	return mm.queue.GetTasks(status, taskType, limit)
+}
+
+// GetWorkers returns all registered workers
+func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
+	return mm.queue.GetWorkers()
+}
+
+// TriggerScan manually triggers a maintenance scan
+func (mm *MaintenanceManager) TriggerScan() error {
+	if !mm.running {
+		return fmt.Errorf("maintenance manager is not running")
+	}
+
+	go mm.performScan()
+	return nil
+}
+
+// UpdateConfig updates the maintenance configuration
+func (mm *MaintenanceManager) UpdateConfig(config *MaintenanceConfig) error {
+	if config == nil {
+		return fmt.Errorf("config cannot be nil")
+	}
+
+	mm.config = config
+	mm.queue.policy = config.Policy
+	mm.scanner.policy = config.Policy
+
+	glog.V(1).Infof("Maintenance configuration updated")
+	return nil
+}
+
+// CancelTask cancels a pending task
+func (mm *MaintenanceManager) CancelTask(taskID string) error {
+	mm.queue.mutex.Lock()
+	defer mm.queue.mutex.Unlock()
+
+	task, exists := mm.queue.tasks[taskID]
+	if !exists {
+		return fmt.Errorf("task %s not found", taskID)
+	}
+
+	if task.Status == TaskStatusPending {
+		task.Status = TaskStatusCancelled
+		task.CompletedAt = &[]time.Time{time.Now()}[0]
+
+		// Remove from pending tasks
+		for i, pendingTask := range mm.queue.pendingTasks {
+			if pendingTask.ID == taskID {
+				mm.queue.pendingTasks = append(mm.queue.pendingTasks[:i], mm.queue.pendingTasks[i+1:]...)
+				break
+			}
+		}
+
+		glog.V(2).Infof("Cancelled task %s", taskID)
+		return nil
+	}
+
+	return fmt.Errorf("task %s cannot be cancelled (status: %s)", taskID, task.Status)
+}
+
+// RegisterWorker registers a new worker
+func (mm *MaintenanceManager) RegisterWorker(worker *MaintenanceWorker) {
+	mm.queue.RegisterWorker(worker)
+}
+
+// GetNextTask returns the next task for a worker
+func (mm *MaintenanceManager) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
+	return mm.queue.GetNextTask(workerID, capabilities)
+}
+
+// CompleteTask marks a task as completed
+func (mm *MaintenanceManager) CompleteTask(taskID string, error string) {
+	mm.queue.CompleteTask(taskID, error)
+}
+
+// UpdateTaskProgress updates task progress
+func (mm *MaintenanceManager) UpdateTaskProgress(taskID string, progress float64) {
+	mm.queue.UpdateTaskProgress(taskID, progress)
+}
+
+// UpdateWorkerHeartbeat updates worker heartbeat
+func (mm *MaintenanceManager) UpdateWorkerHeartbeat(workerID string) {
+	mm.queue.UpdateWorkerHeartbeat(workerID)
+}
