path: root/weed/admin/maintenance/maintenance_manager.go
Diffstat (limited to 'weed/admin/maintenance/maintenance_manager.go')
-rw-r--r--  weed/admin/maintenance/maintenance_manager.go  225
1 file changed, 193 insertions, 32 deletions
diff --git a/weed/admin/maintenance/maintenance_manager.go b/weed/admin/maintenance/maintenance_manager.go
index 5d87d817e..4aab137e0 100644
--- a/weed/admin/maintenance/maintenance_manager.go
+++ b/weed/admin/maintenance/maintenance_manager.go
@@ -7,8 +7,76 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/balance"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
)
+// buildPolicyFromTaskConfigs loads task configurations from separate files and builds a MaintenancePolicy
+func buildPolicyFromTaskConfigs() *worker_pb.MaintenancePolicy {
+ policy := &worker_pb.MaintenancePolicy{
+ GlobalMaxConcurrent: 4,
+ DefaultRepeatIntervalSeconds: 6 * 3600, // 6 hours in seconds
+ DefaultCheckIntervalSeconds: 12 * 3600, // 12 hours in seconds
+ TaskPolicies: make(map[string]*worker_pb.TaskPolicy),
+ }
+
+ // Load vacuum task configuration
+ if vacuumConfig := vacuum.LoadConfigFromPersistence(nil); vacuumConfig != nil {
+ policy.TaskPolicies["vacuum"] = &worker_pb.TaskPolicy{
+ Enabled: vacuumConfig.Enabled,
+ MaxConcurrent: int32(vacuumConfig.MaxConcurrent),
+ RepeatIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(vacuumConfig.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_VacuumConfig{
+ VacuumConfig: &worker_pb.VacuumTaskConfig{
+ GarbageThreshold: float64(vacuumConfig.GarbageThreshold),
+ MinVolumeAgeHours: int32(vacuumConfig.MinVolumeAgeSeconds / 3600), // Convert seconds to hours
+ MinIntervalSeconds: int32(vacuumConfig.MinIntervalSeconds),
+ },
+ },
+ }
+ }
+
+ // Load erasure coding task configuration
+ if ecConfig := erasure_coding.LoadConfigFromPersistence(nil); ecConfig != nil {
+ policy.TaskPolicies["erasure_coding"] = &worker_pb.TaskPolicy{
+ Enabled: ecConfig.Enabled,
+ MaxConcurrent: int32(ecConfig.MaxConcurrent),
+ RepeatIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(ecConfig.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_ErasureCodingConfig{
+ ErasureCodingConfig: &worker_pb.ErasureCodingTaskConfig{
+ FullnessRatio: float64(ecConfig.FullnessRatio),
+ QuietForSeconds: int32(ecConfig.QuietForSeconds),
+ MinVolumeSizeMb: int32(ecConfig.MinSizeMB),
+ CollectionFilter: ecConfig.CollectionFilter,
+ },
+ },
+ }
+ }
+
+ // Load balance task configuration
+ if balanceConfig := balance.LoadConfigFromPersistence(nil); balanceConfig != nil {
+ policy.TaskPolicies["balance"] = &worker_pb.TaskPolicy{
+ Enabled: balanceConfig.Enabled,
+ MaxConcurrent: int32(balanceConfig.MaxConcurrent),
+ RepeatIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
+ CheckIntervalSeconds: int32(balanceConfig.ScanIntervalSeconds),
+ TaskConfig: &worker_pb.TaskPolicy_BalanceConfig{
+ BalanceConfig: &worker_pb.BalanceTaskConfig{
+ ImbalanceThreshold: float64(balanceConfig.ImbalanceThreshold),
+ MinServerCount: int32(balanceConfig.MinServerCount),
+ },
+ },
+ }
+ }
+
+ glog.V(1).Infof("Built maintenance policy from separate task configs - %d task policies loaded", len(policy.TaskPolicies))
+ return policy
+}
+
// MaintenanceManager coordinates the maintenance system
type MaintenanceManager struct {
config *MaintenanceConfig
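The buildPolicyFromTaskConfigs helper above only adds a task policy when the corresponding LoadConfigFromPersistence(nil) call returns a non-nil config, so any of the three entries may be absent. A minimal in-package test sketch that exercises this (the test file, test name, and checks are illustrative, not part of this change):

    // maintenance_manager_test.go (sketch): lives in package maintenance so it can call the unexported helper
    package maintenance

    import "testing"

    func TestBuildPolicyFromTaskConfigs(t *testing.T) {
        policy := buildPolicyFromTaskConfigs()
        if policy.GlobalMaxConcurrent != 4 {
            t.Errorf("expected GlobalMaxConcurrent=4, got %d", policy.GlobalMaxConcurrent)
        }
        // Each task policy is present only when its persisted config could be loaded.
        for _, name := range []string{"vacuum", "erasure_coding", "balance"} {
            if _, ok := policy.TaskPolicies[name]; !ok {
                t.Logf("no %q policy loaded (persisted config unavailable)", name)
            }
        }
    }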
@@ -18,11 +86,12 @@ type MaintenanceManager struct {
running bool
stopChan chan struct{}
// Error handling and backoff
- errorCount int
- lastError error
- lastErrorTime time.Time
- backoffDelay time.Duration
- mutex sync.RWMutex
+ errorCount int
+ lastError error
+ lastErrorTime time.Time
+ backoffDelay time.Duration
+ mutex sync.RWMutex
+ scanInProgress bool
}
// NewMaintenanceManager creates a new maintenance manager
@@ -31,8 +100,15 @@ func NewMaintenanceManager(adminClient AdminClient, config *MaintenanceConfig) *
config = DefaultMaintenanceConfig()
}
- queue := NewMaintenanceQueue(config.Policy)
- scanner := NewMaintenanceScanner(adminClient, config.Policy, queue)
+ // Use the policy from the config (which is populated from separate task files in LoadMaintenanceConfig)
+ policy := config.Policy
+ if policy == nil {
+ // Fallback: build policy from separate task configuration files if not already populated
+ policy = buildPolicyFromTaskConfigs()
+ }
+
+ queue := NewMaintenanceQueue(policy)
+ scanner := NewMaintenanceScanner(adminClient, policy, queue)
return &MaintenanceManager{
config: config,
@@ -125,23 +201,14 @@ func (mm *MaintenanceManager) scanLoop() {
return
case <-ticker.C:
glog.V(1).Infof("Performing maintenance scan every %v", scanInterval)
- mm.performScan()
-
- // Adjust ticker interval based on error state
- mm.mutex.RLock()
- currentInterval := scanInterval
- if mm.errorCount > 0 {
- // Use backoff delay when there are errors
- currentInterval = mm.backoffDelay
- if currentInterval > scanInterval {
- // Don't make it longer than the configured interval * 10
- maxInterval := scanInterval * 10
- if currentInterval > maxInterval {
- currentInterval = maxInterval
- }
- }
+
+ // Use the same synchronization as TriggerScan to prevent concurrent scans
+ if err := mm.triggerScanInternal(false); err != nil {
+ glog.V(1).Infof("Scheduled scan skipped: %v", err)
}
- mm.mutex.RUnlock()
+
+ // Adjust ticker interval based on error state (read error state safely)
+ currentInterval := mm.getScanInterval(scanInterval)
// Reset ticker with new interval if needed
if currentInterval != scanInterval {
@@ -152,6 +219,26 @@ func (mm *MaintenanceManager) scanLoop() {
}
}
+// getScanInterval safely reads the current scan interval with error backoff
+func (mm *MaintenanceManager) getScanInterval(baseInterval time.Duration) time.Duration {
+ mm.mutex.RLock()
+ defer mm.mutex.RUnlock()
+
+ if mm.errorCount > 0 {
+ // Use backoff delay when there are errors
+ currentInterval := mm.backoffDelay
+ if currentInterval > baseInterval {
+ // Don't make it longer than the configured interval * 10
+ maxInterval := baseInterval * 10
+ if currentInterval > maxInterval {
+ currentInterval = maxInterval
+ }
+ }
+ return currentInterval
+ }
+ return baseInterval
+}
+
// cleanupLoop periodically cleans up old tasks and stale workers
func (mm *MaintenanceManager) cleanupLoop() {
cleanupInterval := time.Duration(mm.config.CleanupIntervalSeconds) * time.Second
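Note on the getScanInterval helper added above: the backoff delay is substituted only while errorCount is non-zero, and it is capped at ten times the base interval; a backoff shorter than the base interval is returned unchanged. A small in-package sketch with illustrative values (unexported fields require package maintenance, where fmt and time are already imported):

    func exampleScanIntervalClamp() {
        mm := &MaintenanceManager{errorCount: 3, backoffDelay: 90 * time.Minute}
        fmt.Println(mm.getScanInterval(5 * time.Minute)) // 50m0s: backoff clamped to 10x the base interval
        mm.errorCount = 0
        fmt.Println(mm.getScanInterval(5 * time.Minute)) // 5m0s: no errors, so the base interval is used
    }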
@@ -170,25 +257,54 @@ func (mm *MaintenanceManager) cleanupLoop() {
// performScan executes a maintenance scan with error handling and backoff
func (mm *MaintenanceManager) performScan() {
- mm.mutex.Lock()
- defer mm.mutex.Unlock()
+ defer func() {
+ // Always reset scan in progress flag when done
+ mm.mutex.Lock()
+ mm.scanInProgress = false
+ mm.mutex.Unlock()
+ }()
- glog.V(2).Infof("Starting maintenance scan")
+ glog.Infof("Starting maintenance scan...")
results, err := mm.scanner.ScanForMaintenanceTasks()
if err != nil {
+ // Handle scan error
+ mm.mutex.Lock()
mm.handleScanError(err)
+ mm.mutex.Unlock()
+ glog.Warningf("Maintenance scan failed: %v", err)
return
}
- // Scan succeeded, reset error tracking
+ // Scan succeeded - update state and process results
+ mm.handleScanSuccess(results)
+}
+
+// handleScanSuccess processes successful scan results with proper lock management
+func (mm *MaintenanceManager) handleScanSuccess(results []*TaskDetectionResult) {
+ // Update manager state first
+ mm.mutex.Lock()
mm.resetErrorTracking()
+ taskCount := len(results)
+ mm.mutex.Unlock()
+
+ if taskCount > 0 {
+ // Count tasks by type for logging (outside of lock)
+ taskCounts := make(map[MaintenanceTaskType]int)
+ for _, result := range results {
+ taskCounts[result.TaskType]++
+ }
- if len(results) > 0 {
+ // Add tasks to queue (no manager lock held)
mm.queue.AddTasksFromResults(results)
- glog.V(1).Infof("Maintenance scan completed: added %d tasks", len(results))
+
+ // Log detailed scan results
+ glog.Infof("Maintenance scan completed: found %d tasks", taskCount)
+ for taskType, count := range taskCounts {
+ glog.Infof(" - %s: %d tasks", taskType, count)
+ }
} else {
- glog.V(2).Infof("Maintenance scan completed: no tasks needed")
+ glog.Infof("Maintenance scan completed: no maintenance tasks needed")
}
}
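The reworked performScan/handleScanSuccess split keeps mm.mutex scoped to manager state only: error tracking and the scanInProgress flag are updated under the lock, while queue.AddTasksFromResults runs without it, since the queue does its own locking and holding both locks at once could invite lock-ordering problems. The shape of the pattern, in brief (restating the code above, names unchanged):

    mm.mutex.Lock()
    mm.resetErrorTracking()       // manager state is touched only under mm.mutex
    taskCount := len(results)     // copy whatever later steps need
    mm.mutex.Unlock()

    mm.queue.AddTasksFromResults(results) // the queue takes its own lock; mm.mutex is not held here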
@@ -272,8 +388,19 @@ func (mm *MaintenanceManager) performCleanup() {
removedTasks := mm.queue.CleanupOldTasks(taskRetention)
removedWorkers := mm.queue.RemoveStaleWorkers(workerTimeout)
- if removedTasks > 0 || removedWorkers > 0 {
- glog.V(1).Infof("Cleanup completed: removed %d old tasks and %d stale workers", removedTasks, removedWorkers)
+ // Clean up stale pending operations (operations running for more than 4 hours)
+ staleOperationTimeout := 4 * time.Hour
+ removedOperations := 0
+ if mm.scanner != nil && mm.scanner.integration != nil {
+ pendingOps := mm.scanner.integration.GetPendingOperations()
+ if pendingOps != nil {
+ removedOperations = pendingOps.CleanupStaleOperations(staleOperationTimeout)
+ }
+ }
+
+ if removedTasks > 0 || removedWorkers > 0 || removedOperations > 0 {
+ glog.V(1).Infof("Cleanup completed: removed %d old tasks, %d stale workers, and %d stale operations",
+ removedTasks, removedWorkers, removedOperations)
}
}
@@ -311,6 +438,21 @@ func (mm *MaintenanceManager) GetStats() *MaintenanceStats {
return stats
}
+// ReloadTaskConfigurations reloads task configurations from the current policy
+func (mm *MaintenanceManager) ReloadTaskConfigurations() error {
+ mm.mutex.Lock()
+ defer mm.mutex.Unlock()
+
+ // Trigger configuration reload in the integration layer
+ if mm.scanner != nil && mm.scanner.integration != nil {
+ mm.scanner.integration.ConfigureTasksFromPolicy()
+ glog.V(1).Infof("Task configurations reloaded from policy")
+ return nil
+ }
+
+ return fmt.Errorf("integration not available for configuration reload")
+}
+
// GetErrorState returns the current error state for monitoring
func (mm *MaintenanceManager) GetErrorState() (errorCount int, lastError error, backoffDelay time.Duration) {
mm.mutex.RLock()
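Callers that persist new task settings are expected to re-apply them through the new ReloadTaskConfigurations method; a minimal usage sketch (the surrounding admin-handler context and the manager variable are assumed, not part of this diff):

    // After writing updated task configuration, re-apply it to the running scanner/integration.
    if err := manager.ReloadTaskConfigurations(); err != nil {
        glog.Warningf("failed to reload task configurations: %v", err)
    }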
@@ -330,10 +472,29 @@ func (mm *MaintenanceManager) GetWorkers() []*MaintenanceWorker {
// TriggerScan manually triggers a maintenance scan
func (mm *MaintenanceManager) TriggerScan() error {
+ return mm.triggerScanInternal(true)
+}
+
+// triggerScanInternal handles both manual and automatic scan triggers
+func (mm *MaintenanceManager) triggerScanInternal(isManual bool) error {
if !mm.running {
return fmt.Errorf("maintenance manager is not running")
}
+ // Prevent multiple concurrent scans
+ mm.mutex.Lock()
+ if mm.scanInProgress {
+ mm.mutex.Unlock()
+ if isManual {
+ glog.V(1).Infof("Manual scan already in progress, ignoring trigger request")
+ } else {
+ glog.V(2).Infof("Automatic scan already in progress, ignoring scheduled scan")
+ }
+ return fmt.Errorf("scan already in progress")
+ }
+ mm.scanInProgress = true
+ mm.mutex.Unlock()
+
go mm.performScan()
return nil
}
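With the scanInProgress guard, overlapping triggers now fail fast instead of starting a second concurrent scan. A short sketch of the expected behaviour (the manager variable is assumed; the second call is rejected only while the first scan is still running):

    if err := manager.TriggerScan(); err != nil {
        glog.Warningf("manual scan not started: %v", err)
    }
    // While the first scan is still in progress, a second trigger returns
    // "scan already in progress" instead of launching another goroutine.
    if err := manager.TriggerScan(); err != nil {
        glog.V(1).Infof("second trigger rejected: %v", err)
    }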