Diffstat (limited to 'weed/admin/maintenance/maintenance_manager.go')
-rw-r--r--  weed/admin/maintenance/maintenance_manager.go  407
1 file changed, 407 insertions(+), 0 deletions(-)
diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go
new file mode 100644
index 000000000..17d1eef6d
--- /dev/null
+++ b/weed/admin/maintenance/maintenance_manager.go
@@ -0,0 +1,407 @@
+package maintenance
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// MaintenanceManager coordinates the maintenance system
+type MaintenanceManager struct {
+ config *MaintenanceConfig
+ scanner *MaintenanceScanner
+ queue *MaintenanceQueue
+ adminClient AdminClient
+ running bool
+ stopChan chan struct{}
+ // Error handling and backoff
+ errorCount int
+ lastError error
+ lastErrorTime time.Time
+ backoffDelay time.Duration
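+ // mutex guards running and the error/backoff fields above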
+ mutex sync.RWMutex
+}
+
+// NewMaintenanceManager creates a new maintenance manager
+func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *MaintenanceManager {
+ if config == nil {
+ config = DefaultMaintenanceConfig()
+ }
+
+ queue := NewMaintenanceQueue(config.Policy)
+ scanner := NewMaintenanceScanner(adminClient, config.Policy, queue)
+
+ return &MaintenanceManager{
+ config: config,
+ scanner: scanner,
+ queue: queue,
+ adminClient: adminClient,
+ stopChan: make(chan struct{}),
+ backoffDelay: time.Second, // Start with 1 second backoff
+ }
+}
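+
+// A minimal usage sketch (illustrative, not part of this change; assumes an
+// adminClient value that satisfies AdminClient):
+//
+// mgr := NewMaintenanceManager(adminClient, DefaultMaintenanceConfig())
+// if err := mgr.Start(); err != nil {
+// glog.Fatalf("failed to start maintenance manager: %v", err)
+// }
+// defer mgr.Stop()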
+
+// Start begins the maintenance manager
+func (mm *MaintenanceManager) Start() error {
+ if !mm.config.Enabled {
+ glog.V(1).Infof("Maintenance system is disabled")
+ return nil
+ }
+
+ // Normalize configuration durations to prevent ticker panics
+ if err := mm.validateConfig(); err != nil {
+ return fmt.Errorf("invalid maintenance configuration: %w", err)
+ }
+
+ mm.mutex.Lock()
+ mm.running = true
+ mm.mutex.Unlock()
+
+ // Start background processes
+ go mm.scanLoop()
+ go mm.cleanupLoop()
+
+ glog.Infof("Maintenance manager started with scan interval %ds", mm.config.ScanIntervalSeconds)
+ return nil
+}
+
+// validateConfig replaces invalid (non-positive) configuration durations
+// with safe defaults, logging a warning for each; it currently never
+// returns an error
+func (mm *MaintenanceManager) validateConfig() error {
+ if mm.config.ScanIntervalSeconds <= 0 {
+ glog.Warningf("Invalid scan interval %ds, using default 30m", mm.config.ScanIntervalSeconds)
+ mm.config.ScanIntervalSeconds = 30 * 60 // 30 minutes in seconds
+ }
+
+ if mm.config.CleanupIntervalSeconds <= 0 {
+ glog.Warningf("Invalid cleanup interval %ds, using default 24h", mm.config.CleanupIntervalSeconds)
+ mm.config.CleanupIntervalSeconds = 24 * 60 * 60 // 24 hours in seconds
+ }
+
+ if mm.config.WorkerTimeoutSeconds <= 0 {
+ glog.Warningf("Invalid worker timeout %ds, using default 5m", mm.config.WorkerTimeoutSeconds)
+ mm.config.WorkerTimeoutSeconds = 5 * 60 // 5 minutes in seconds
+ }
+
+ if mm.config.TaskTimeoutSeconds <= 0 {
+ glog.Warningf("Invalid task timeout %ds, using default 2h", mm.config.TaskTimeoutSeconds)
+ mm.config.TaskTimeoutSeconds = 2 * 60 * 60 // 2 hours in seconds
+ }
+
+ if mm.config.RetryDelaySeconds <= 0 {
+ glog.Warningf("Invalid retry delay %ds, using default 15m", mm.config.RetryDelaySeconds)
+ mm.config.RetryDelaySeconds = 15 * 60 // 15 minutes in seconds
+ }
+
+ if mm.config.TaskRetentionSeconds <= 0 {
+ glog.Warningf("Invalid task retention %ds, using default 168h", mm.config.TaskRetentionSeconds)
+ mm.config.TaskRetentionSeconds = 7 * 24 * 60 * 60 // 7 days in seconds
+ }
+
+ return nil
+}
+
+// IsRunning returns whether the maintenance manager is currently running
+func (mm *MaintenanceManager) IsRunning() bool {
+ mm.mutex.RLock()
+ defer mm.mutex.RUnlock()
+ return mm.running
+}
+
+// Stop terminates the maintenance manager
+func (mm *MaintenanceManager) Stop() {
+ mm.mutex.Lock()
+ if !mm.running {
+ // Already stopped; avoid closing stopChan twice
+ mm.mutex.Unlock()
+ return
+ }
+ mm.running = false
+ mm.mutex.Unlock()
+
+ close(mm.stopChan)
+ glog.Infof("Maintenance manager stopped")
+}
+
+// scanLoop periodically scans for maintenance tasks, tightening the tick
+// interval to the backoff delay while scans are failing
+func (mm *MaintenanceManager) scanLoop() {
+ scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
+ ticker := time.NewTicker(scanInterval)
+ defer ticker.Stop()
+
+ lastInterval := scanInterval
+ for {
+ select {
+ case <-mm.stopChan:
+ return
+ case <-ticker.C:
+ glog.V(1).Infof("Performing maintenance scan (current interval %v)", lastInterval)
+ mm.performScan()
+
+ // Pick the next tick interval from the error state: while scans are
+ // failing, tick at the backoff delay, capped at 10x the configured
+ // interval; once they recover, use the configured interval again
+ mm.mutex.RLock()
+ currentInterval := scanInterval
+ if mm.errorCount > 0 {
+ currentInterval = mm.backoffDelay
+ if maxInterval := scanInterval * 10; currentInterval > maxInterval {
+ currentInterval = maxInterval
+ }
+ }
+ mm.mutex.RUnlock()
+
+ // Reset the ticker whenever the desired interval changes, including
+ // back down to the configured interval once scans recover
+ if currentInterval != lastInterval {
+ ticker.Reset(currentInterval)
+ lastInterval = currentInterval
+ }
+ }
+ }
+}
+
+// cleanupLoop periodically cleans up old tasks and stale workers
+func (mm *MaintenanceManager) cleanupLoop() {
+ cleanupInterval := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
+ ticker := time.NewTicker(cleanupInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-mm.stopChan:
+ return
+ case <-ticker.C:
+ mm.performCleanup()
+ }
+ }
+}
+
+// performScan executes a maintenance scan with error handling and backoff
+func (mm *MaintenanceManager) performScan() {
+ mm.mutex.Lock()
+ defer mm.mutex.Unlock()
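+
+ // The write lock is held for the entire scan, so the periodic loop and
+ // any manual TriggerScan calls serialize rather than scanning concurrently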
+
+ glog.V(2).Infof("Starting maintenance scan")
+
+ results, err := mm.scanner.ScanForMaintenanceTasks()
+ if err != nil {
+ mm.handleScanError(err)
+ return
+ }
+
+ // Scan succeeded, reset error tracking
+ mm.resetErrorTracking()
+
+ if len(results) > 0 {
+ mm.queue.AddTasksFromResults(results)
+ glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(results))
+ } else {
+ glog.V(2).Infof("Maintenance scan completed: no tasks needed")
+ }
+}
+
+// handleScanError handles scan errors with exponential backoff and reduced logging
+func (mm *MaintenanceManager) handleScanError(err error) {
+ now := time.Now()
+ mm.errorCount++
+ mm.lastError = err
+ mm.lastErrorTime = now
+
+ // Double the backoff on each consecutive failure (exponential, no jitter)
+ if mm.errorCount > 1 {
+ mm.backoffDelay = mm.backoffDelay * 2
+ if mm.backoffDelay > 5*time.Minute {
+ mm.backoffDelay = 5 * time.Minute // Cap at 5 minutes
+ }
+ }
+
+ // Reduce log frequency based on error count and time
+ shouldLog := false
+ if mm.errorCount <= 3 {
+ // Log first 3 errors immediately
+ shouldLog = true
+ } else if mm.errorCount <= 10 && mm.errorCount%3 == 0 {
+ // Log every 3rd error for errors 4-10
+ shouldLog = true
+ } else if mm.errorCount%10 == 0 {
+ // Log every 10th error after that
+ shouldLog = true
+ }
+
+ if shouldLog {
+ // Check if it's a connection error to provide better messaging
+ if isConnectionError(err) {
+ if mm.errorCount == 1 {
+ glog.Errorf("Maintenance scan failed: %v (will retry with backoff)", err)
+ } else {
+ glog.Errorf("Maintenance scan still failing after %d attempts: %v (backoff: %v)",
+ mm.errorCount, err, mm.backoffDelay)
+ }
+ } else {
+ glog.Errorf("Maintenance scan failed: %v", err)
+ }
+ } else {
+ // Use debug level for suppressed errors
+ glog.V(3).Infof("Maintenance scan failed (error #%d, suppressed): %v", mm.errorCount, err)
+ }
+}
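+
+// The schedule above retries ~1s after the first failure, then doubles per
+// consecutive failure (2s, 4s, 8s, ...) up to the 5m cap; resetErrorTracking
+// below restores the 1s delay once a scan succeeds.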
+
+// resetErrorTracking resets error tracking when scan succeeds
+func (mm *MaintenanceManager) resetErrorTracking() {
+ if mm.errorCount > 0 {
+ glog.V(1).Infof("Maintenance scan recovered after %d failed attempts", mm.errorCount)
+ mm.errorCount = 0
+ mm.lastError = nil
+ mm.backoffDelay = time.Second // Reset to initial delay
+ }
+}
+
+// isConnectionError checks if the error is a connection-related error
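+// by substring matching (e.g. a failed dial such as
+// "dial tcp 127.0.0.1:9333: connect: connection refused" is treated as a
+// connection error; the address here is illustrative)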
+func isConnectionError(err error) bool {
+ if err == nil {
+ return false
+ }
+ errStr := err.Error()
+ return strings.Contains(errStr, "connection refused") ||
+ strings.Contains(errStr, "connection error") ||
+ strings.Contains(errStr, "dial tcp") ||
+ strings.Contains(errStr, "connection timeout") ||
+ strings.Contains(errStr, "no route to host") ||
+ strings.Contains(errStr, "network unreachable")
+}
+
+// performCleanup cleans up old tasks and stale workers
+func (mm *MaintenanceManager) performCleanup() {
+ glog.V(2).Infof("Starting maintenance cleanup")
+
+ taskRetention := time.Duration(mm.config.TaskRetentionSeconds) * time.Second
+ workerTimeout := time.Duration(mm.config.WorkerTimeoutSeconds) * time.Second
+
+ removedTasks := mm.queue.CleanupOldTasks(taskRetention)
+ removedWorkers := mm.queue.RemoveStaleWorkers(workerTimeout)
+
+ if removedTasks > 0 || removedWorkers > 0 {
+ glog.V(1).Infof("Cleanup completed: removed %d old tasks and %d stale workers", removedTasks, removedWorkers)
+ }
+}
+
+// GetQueue returns the maintenance queue
+func (mm *MaintenanceManager) GetQueue() *MaintenanceQueue {
+ return mm.queue
+}
+
+// GetConfig returns the maintenance configuration
+func (mm *MaintenanceManager) GetConfig() *MaintenanceConfig {
+ mm.mutex.RLock()
+ defer mm.mutex.RUnlock()
+ return mm.config
+}
+
+// GetStats returns maintenance statistics
+func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
+ stats := mm.queue.GetStats()
+
+ mm.mutex.RLock()
+ defer mm.mutex.RUnlock()
+
+ stats.LastScanTime = time.Now() // TODO: record the actual last scan time; this approximates it with the current time
+
+ // Calculate next scan time based on current error state
+ scanInterval := time.Duration(mm.config.ScanIntervalSeconds) * time.Second
+ nextScanInterval := scanInterval
+ if mm.errorCount > 0 {
+ nextScanInterval = mm.backoffDelay
+ maxInterval := scanInterval * 10
+ if nextScanInterval > maxInterval {
+ nextScanInterval = maxInterval
+ }
+ }
+ stats.NextScanTime = time.Now().Add(nextScanInterval)
+
+ return stats
+}
+
+// GetErrorState returns the current error state for monitoring
+func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
+ mm.mutex.RLock()
+ defer mm.mutex.RUnlock()
+ return mm.errorCount, mm.lastError, mm.backoffDelay
+}
+
+// GetTasks returns tasks with filtering
+func (mm *MaintenanceManager) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
+ return mm.queue.GetTasks(status, taskType, limit)
+}
+
+// GetWorkers returns all registered workers
+func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
+ return mm.queue.GetWorkers()
+}
+
+// TriggerScan manually triggers a maintenance scan
+func (mm *MaintenanceManager) TriggerScan() error {
+ mm.mutex.RLock()
+ running := mm.running
+ mm.mutex.RUnlock()
+ if !running {
+ return fmt.Errorf("maintenance manager is not running")
+ }
+
+ go mm.performScan()
+ return nil
+}
+
+// UpdateConfig updates the maintenance configuration
+func (mm *MaintenanceManager) UpdateConfig(config *MaintenanceConfig) error {
+ if config == nil {
+ return fmt.Errorf("config cannot be nil")
+ }
+
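+ // Note: scanLoop and cleanupLoop capture their intervals when Start runs,
+ // so changes to the interval settings take effect only after a restart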
+ mm.mutex.Lock()
+ mm.config = config
+ mm.queue.policy = config.Policy
+ mm.scanner.policy = config.Policy
+ mm.mutex.Unlock()
+
+ glog.V(1).Infof("Maintenance configuration updated")
+ return nil
+}
+
+// CancelTask cancels a pending task
+func (mm *MaintenanceManager) CancelTask(taskID string) error {
+ mm.queue.mutex.Lock()
+ defer mm.queue.mutex.Unlock()
+
+ task, exists := mm.queue.tasks[taskID]
+ if !exists {
+ return fmt.Errorf("task %s not found", taskID)
+ }
+
+ if task.Status == TaskStatusPending {
+ task.Status = TaskStatusCancelled
+ now := time.Now()
+ task.CompletedAt = &now
+
+ // Remove from pending tasks
+ for i, pendingTask := range mm.queue.pendingTasks {
+ if pendingTask.ID == taskID {
+ mm.queue.pendingTasks = append(mm.queue.pendingTasks[:i], mm.queue.pendingTasks[i+1:]...)
+ break
+ }
+ }
+
+ glog.V(2).Infof("Cancelled task %s", taskID)
+ return nil
+ }
+
+ return fmt.Errorf("task %s cannot be cancelled (status: %s)", taskID, task.Status)
+}
+
+// RegisterWorker registers a new worker
+func (mm *MaintenanceManager) RegisterWorker(worker *MaintenanceWorker) {
+ mm.queue.RegisterWorker(worker)
+}
+
+// GetNextTask returns the next task for a worker
+func (mm *MaintenanceManager) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
+ return mm.queue.GetNextTask(workerID, capabilities)
+}
+
+// CompleteTask marks a task as completed
+func (mm *MaintenanceManager) CompleteTask(taskID string, errMsg string) {
+ mm.queue.CompleteTask(taskID, errMsg)
+}
+
+// UpdateTaskProgress updates task progress
+func (mm *MaintenanceManager) UpdateTaskProgress(taskID string, progress float64) {
+ mm.queue.UpdateTaskProgress(taskID, progress)
+}
+
+// UpdateWorkerHeartbeat updates worker heartbeat
+func (mm *MaintenanceManager) UpdateWorkerHeartbeat(workerID string) {
+ mm.queue.UpdateWorkerHeartbeat(workerID)
+}
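+
+// The worker-facing wrappers above are expected to be driven by a loop of
+// roughly this shape (a sketch; the worker fields, runTask helper, and
+// pollInterval are assumptions, not part of this file):
+//
+// mm.RegisterWorker(worker)
+// for {
+// mm.UpdateWorkerHeartbeat(worker.ID)
+// if task := mm.GetNextTask(worker.ID, worker.Capabilities); task != nil {
+// errMsg := runTask(task) // hypothetical executor; empty string on success
+// mm.CompleteTask(task.ID, errMsg)
+// }
+// time.Sleep(pollInterval)
+// }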