Diffstat (limited to 'weed/admin/maintenance/maintenance_manager.go')
| Mode | Path | Lines changed |
| --- | --- | --- |
| -rw-r--r-- | weed/admin/maintenance/maintenance_manager.go | 225 |
1 file changed, 193 insertions(+), 32 deletions(-)
```diff
diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go
index 5d87d817e..4aab137e0 100644
--- a/weed/admin/maintenance/maintenance_manager.go
+++ b/weed/admin/maintenance/maintenance_manager.go
@@ -7,8 +7,76 @@ import (
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
+	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
+	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
 )
 
+// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
+func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
+	policy := &worker_pb.MaintenancePolicy{
+		GlobalMaxConcurrent:          4,
+		DefaultRepeatIntervalSeconds: 6 * 3600,  // 6 hours in seconds
+		DefaultCheckIntervalSeconds:  12 * 3600, // 12 hours in seconds
+		TaskPolicies:                 make(map[string]*worker_pb.TaskPolicy),
+	}
+
+	// Load vacuum task configuration
+	if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
+		policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
+			Enabled:               vacuumConfig.Enabled,
+			MaxConcurrent:         int32(vacuumConfig.MaxConcurrent),
+			RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
+			CheckIntervalSeconds:  int32(vacuumConfig.ScanIntervalSeconds),
+			TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+				VacuumConfig: &worker_pb.VacuumTaskConfig{
+					GarbageThreshold:   float64(vacuumConfig.GarbageThreshold),
+					MinVolumeAgeHours:  int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
+					MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
+				},
+			},
+		}
+	}
+
+	// Load erasure coding task configuration
+	if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
+		policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
+			Enabled:               ecConfig.Enabled,
+			MaxConcurrent:         int32(ecConfig.MaxConcurrent),
+			RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
+			CheckIntervalSeconds:  int32(ecConfig.ScanIntervalSeconds),
+			TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+				ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+					FullnessRatio:    float64(ecConfig.FullnessRatio),
+					QuietForSeconds:  int32(ecConfig.QuietForSeconds),
+					MinVolumeSizeMb:  int32(ecConfig.MinSizeMB),
+					CollectionFilter: ecConfig.CollectionFilter,
+				},
+			},
+		}
+	}
+
+	// Load balance task configuration
+	if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
+		policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
+			Enabled:               balanceConfig.Enabled,
+			MaxConcurrent:         int32(balanceConfig.MaxConcurrent),
+			RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
+			CheckIntervalSeconds:  int32(balanceConfig.ScanIntervalSeconds),
+			TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+				BalanceConfig: &worker_pb.BalanceTaskConfig{
+					ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
+					MinServerCount:     int32(balanceConfig.MinServerCount),
+				},
+			},
+		}
+	}
+
+	glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
+	return policy
+}
+
 // MaintenanceManager coordinates the maintenance system
 type MaintenanceManager struct {
 	config *MaintenanceConfig
```
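Aside (not part of the commit): the helper above keys each policy by task type and stores the concrete settings in a protobuf oneof. As a minimal in-package sketch of reading one entry back, assuming the standard protoc-generated oneof getter `GetVacuumConfig` on `TaskPolicy` and a hypothetical helper name:

```go
// Hypothetical helper (illustration only, same package). GetVacuumConfig is
// the accessor protoc-gen-go generates for the TaskPolicy_VacuumConfig case.
func debugLogVacuumPolicy() {
	policy := buildPolicyFromTaskConfigs()
	if tp, ok := policy.TaskPolicies["vacuum"]; ok {
		if vc := tp.GetVacuumConfig(); vc != nil {
			glog.V(1).Infof("vacuum: enabled=%v garbageThreshold=%.2f minInterval=%ds",
				tp.Enabled, vc.GarbageThreshold, vc.MinIntervalSeconds)
		}
	}
}
```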
```diff
@@ -18,11 +86,12 @@ type MaintenanceManager struct {
 	running  bool
 	stopChan chan struct{}
 	// Error handling and backoff
-	errorCount    int
-	lastError     error
-	lastErrorTime time.Time
-	backoffDelay  time.Duration
-	mutex         sync.RWMutex
+	errorCount     int
+	lastError      error
+	lastErrorTime  time.Time
+	backoffDelay   time.Duration
+	mutex          sync.RWMutex
+	scanInProgress bool
 }
 
 // NewMaintenanceManager creates a new maintenance manager
@@ -31,8 +100,15 @@ func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *
 		config = DefaultMaintenanceConfig()
 	}
 
-	queue := NewMaintenanceQueue(config.Policy)
-	scanner := NewMaintenanceScanner(adminClient, config.Policy, queue)
+	// Use the policy from the config (which is populated from separate task files in LoadMaintenanceConfig)
+	policy := config.Policy
+	if policy == nil {
+		// Fallback: build policy from separate task configuration files if not already populated
+		policy = buildPolicyFromTaskConfigs()
+	}
+
+	queue := NewMaintenanceQueue(policy)
+	scanner := NewMaintenanceScanner(adminClient, policy, queue)
 
 	return &MaintenanceManager{
 		config:  config,
@@ -125,23 +201,14 @@ func (mm *MaintenanceManager) scanLoop() {
 			return
 		case <-ticker.C:
 			glog.V(1).Infof("Performing maintenance scan every %v", scanInterval)
-			mm.performScan()
-
-			// Adjust ticker interval based on error state
-			mm.mutex.RLock()
-			currentInterval := scanInterval
-			if mm.errorCount > 0 {
-				// Use backoff delay when there are errors
-				currentInterval = mm.backoffDelay
-				if currentInterval > scanInterval {
-					// Don't make it longer than the configured interval * 10
-					maxInterval := scanInterval * 10
-					if currentInterval > maxInterval {
-						currentInterval = maxInterval
-					}
-				}
+
+			// Use the same synchronization as TriggerScan to prevent concurrent scans
+			if err := mm.triggerScanInternal(false); err != nil {
+				glog.V(1).Infof("Scheduled scan skipped: %v", err)
 			}
-			mm.mutex.RUnlock()
+
+			// Adjust ticker interval based on error state (read error state safely)
+			currentInterval := mm.getScanInterval(scanInterval)
 
 			// Reset ticker with new interval if needed
 			if currentInterval != scanInterval {
@@ -152,6 +219,26 @@ func (mm *MaintenanceManager) scanLoop() {
 	}
 }
 
+// getScanInterval safely reads the current scan interval with error backoff
+func (mm *MaintenanceManager) getScanInterval(baseInterval time.Duration) time.Duration {
+	mm.mutex.RLock()
+	defer mm.mutex.RUnlock()
+
+	if mm.errorCount > 0 {
+		// Use backoff delay when there are errors
+		currentInterval := mm.backoffDelay
+		if currentInterval > baseInterval {
+			// Don't make it longer than the configured interval * 10
+			maxInterval := baseInterval * 10
+			if currentInterval > maxInterval {
+				currentInterval = maxInterval
+			}
+		}
+		return currentInterval
+	}
+	return baseInterval
+}
+
 // cleanupLoop periodically cleans up old tasks and stale workers
 func (mm *MaintenanceManager) cleanupLoop() {
 	cleanupInterval := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
```
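To pin down the clamp in `getScanInterval`: with a 30-minute base interval and a 6-hour backoff delay, the returned interval is capped at 10× the base, i.e. 5 hours. A hedged table-style test sketch (hypothetical test file in the same package; not part of the commit):

```go
package maintenance

import (
	"testing"
	"time"
)

func TestGetScanIntervalBackoffClamp(t *testing.T) {
	mm := &MaintenanceManager{}
	base := 30 * time.Minute

	// No errors recorded: the base interval passes through unchanged.
	if got := mm.getScanInterval(base); got != base {
		t.Errorf("no errors: got %v, want %v", got, base)
	}

	// Backoff above the cap is clamped to base*10 (5h for a 30m base).
	mm.errorCount = 3
	mm.backoffDelay = 6 * time.Hour
	if got, want := mm.getScanInterval(base), 10*base; got != want {
		t.Errorf("clamped: got %v, want %v", got, want)
	}

	// Backoff between the base and the cap is used as-is.
	mm.backoffDelay = time.Hour
	if got := mm.getScanInterval(base); got != time.Hour {
		t.Errorf("moderate backoff: got %v, want %v", got, time.Hour)
	}
}
```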
```diff
@@ -170,25 +257,54 @@ func (mm *MaintenanceManager) cleanupLoop() {
 
 // performScan executes a maintenance scan with error handling and backoff
 func (mm *MaintenanceManager) performScan() {
-	mm.mutex.Lock()
-	defer mm.mutex.Unlock()
+	defer func() {
+		// Always reset scan in progress flag when done
+		mm.mutex.Lock()
+		mm.scanInProgress = false
+		mm.mutex.Unlock()
+	}()
 
-	glog.V(2).Infof("Starting maintenance scan")
+	glog.Infof("Starting maintenance scan...")
 
 	results, err := mm.scanner.ScanForMaintenanceTasks()
 	if err != nil {
+		// Handle scan error
+		mm.mutex.Lock()
 		mm.handleScanError(err)
+		mm.mutex.Unlock()
+		glog.Warningf("Maintenance scan failed: %v", err)
 		return
 	}
 
-	// Scan succeeded, reset error tracking
+	// Scan succeeded - update state and process results
+	mm.handleScanSuccess(results)
+}
+
+// handleScanSuccess processes successful scan results with proper lock management
+func (mm *MaintenanceManager) handleScanSuccess(results []*TaskDetectionResult) {
+	// Update manager state first
+	mm.mutex.Lock()
 	mm.resetErrorTracking()
+	taskCount := len(results)
+	mm.mutex.Unlock()
+
+	if taskCount > 0 {
+		// Count tasks by type for logging (outside of lock)
+		taskCounts := make(map[MaintenanceTaskType]int)
+		for _, result := range results {
+			taskCounts[result.TaskType]++
+		}
 
-	if len(results) > 0 {
+		// Add tasks to queue (no manager lock held)
 		mm.queue.AddTasksFromResults(results)
-		glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(results))
+
+		// Log detailed scan results
+		glog.Infof("Maintenance scan completed: found %d tasks", taskCount)
+		for taskType, count := range taskCounts {
+			glog.Infof("  - %s: %d tasks", taskType, count)
+		}
 	} else {
-		glog.V(2).Infof("Maintenance scan completed: no tasks needed")
+		glog.Infof("Maintenance scan completed: no maintenance tasks needed")
 	}
 }
@@ -272,8 +388,19 @@ func (mm *MaintenanceManager) performCleanup() {
 	removedTasks := mm.queue.CleanupOldTasks(taskRetention)
 	removedWorkers := mm.queue.RemoveStaleWorkers(workerTimeout)
 
-	if removedTasks > 0 || removedWorkers > 0 {
-		glog.V(1).Infof("Cleanup completed: removed %d old tasks and %d stale workers", removedTasks, removedWorkers)
+	// Clean up stale pending operations (operations running for more than 4 hours)
+	staleOperationTimeout := 4 * time.Hour
+	removedOperations := 0
+	if mm.scanner != nil && mm.scanner.integration != nil {
+		pendingOps := mm.scanner.integration.GetPendingOperations()
+		if pendingOps != nil {
+			removedOperations = pendingOps.CleanupStaleOperations(staleOperationTimeout)
+		}
+	}
+
+	if removedTasks > 0 || removedWorkers > 0 || removedOperations > 0 {
+		glog.V(1).Infof("Cleanup completed: removed %d old tasks, %d stale workers, and %d stale operations",
+			removedTasks, removedWorkers, removedOperations)
 	}
 }
@@ -311,6 +438,21 @@ func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
 	return stats
 }
 
+// ReloadTaskConfigurations reloads task configurations from the current policy
+func (mm *MaintenanceManager) ReloadTaskConfigurations() error {
+	mm.mutex.Lock()
+	defer mm.mutex.Unlock()
+
+	// Trigger configuration reload in the integration layer
+	if mm.scanner != nil && mm.scanner.integration != nil {
+		mm.scanner.integration.ConfigureTasksFromPolicy()
+		glog.V(1).Infof("Task configurations reloaded from policy")
+		return nil
+	}
+
+	return fmt.Errorf("integration not available for configuration reload")
+}
+
 // GetErrorState returns the current error state for monitoring
 func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
 	mm.mutex.RLock()
```
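Usage note (an assumption, not stated in the diff): `ReloadTaskConfigurations` would presumably be called after an admin updates and persists task settings. A caller sketch with a hypothetical function name:

```go
// Hypothetical caller, same package: persist new settings elsewhere, then
// ask the manager to push the current policy into the integration layer.
func applyUpdatedTaskSettings(mm *MaintenanceManager) {
	if err := mm.ReloadTaskConfigurations(); err != nil {
		// Fails when the scanner's integration layer is not wired up,
		// e.g. in a minimal test construction.
		glog.Warningf("task configuration reload skipped: %v", err)
	}
}
```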
```diff
@@ -330,10 +472,29 @@ func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
 
 // TriggerScan manually triggers a maintenance scan
 func (mm *MaintenanceManager) TriggerScan() error {
+	return mm.triggerScanInternal(true)
+}
+
+// triggerScanInternal handles both manual and automatic scan triggers
+func (mm *MaintenanceManager) triggerScanInternal(isManual bool) error {
 	if !mm.running {
 		return fmt.Errorf("maintenance manager is not running")
 	}
 
+	// Prevent multiple concurrent scans
+	mm.mutex.Lock()
+	if mm.scanInProgress {
+		mm.mutex.Unlock()
+		if isManual {
+			glog.V(1).Infof("Manual scan already in progress, ignoring trigger request")
+		} else {
+			glog.V(2).Infof("Automatic scan already in progress, ignoring scheduled scan")
+		}
+		return fmt.Errorf("scan already in progress")
+	}
+	mm.scanInProgress = true
+	mm.mutex.Unlock()
+
 	go mm.performScan()
 	return nil
}
```
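Taken together, `scanInProgress` plus the mutex implement a try-lock: a second trigger is rejected rather than queued, whether it comes from the ticker loop or from `TriggerScan`, and `performScan`'s deferred block releases the flag. A standalone, runnable sketch of the same pattern (illustrative names only, not SeaweedFS code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type scanner struct {
	mu         sync.Mutex
	inProgress bool
}

// trigger rejects overlapping scans instead of queuing them,
// mirroring triggerScanInternal/performScan above.
func (s *scanner) trigger() error {
	s.mu.Lock()
	if s.inProgress {
		s.mu.Unlock()
		return fmt.Errorf("scan already in progress")
	}
	s.inProgress = true
	s.mu.Unlock()

	go func() {
		defer func() {
			// Always reset the flag when the scan finishes.
			s.mu.Lock()
			s.inProgress = false
			s.mu.Unlock()
		}()
		time.Sleep(100 * time.Millisecond) // stand-in for the real scan
	}()
	return nil
}

func main() {
	s := &scanner{}
	fmt.Println(s.trigger()) // <nil>
	fmt.Println(s.trigger()) // scan already in progress
	time.Sleep(200 * time.Millisecond)
	fmt.Println(s.trigger()) // <nil> again once the first scan is done
}
```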
