diff options
Diffstat (limited to 'weed/admin/maintenance/maintenance_integration.go')
| -rw-r--r-- | weed/admin/maintenance/maintenance_integration.go | 409 |
1 files changed, 409 insertions, 0 deletions
diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go new file mode 100644 index 000000000..9a965d38a --- /dev/null +++ b/weed/admin/maintenance/maintenance_integration.go @@ -0,0 +1,409 @@ +package maintenance + +import ( + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/worker/tasks" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// MaintenanceIntegration bridges the task system with existing maintenance +type MaintenanceIntegration struct { + taskRegistry *types.TaskRegistry + uiRegistry *types.UIRegistry + + // Bridge to existing system + maintenanceQueue *MaintenanceQueue + maintenancePolicy *MaintenancePolicy + + // Type conversion maps + taskTypeMap map[types.TaskType]MaintenanceTaskType + revTaskTypeMap map[MaintenanceTaskType]types.TaskType + priorityMap map[types.TaskPriority]MaintenanceTaskPriority + revPriorityMap map[MaintenanceTaskPriority]types.TaskPriority +} + +// NewMaintenanceIntegration creates the integration bridge +func NewMaintenanceIntegration(queue *MaintenanceQueue, policy *MaintenancePolicy) *MaintenanceIntegration { + integration := &MaintenanceIntegration{ + taskRegistry: tasks.GetGlobalTypesRegistry(), // Use global types registry with auto-registered tasks + uiRegistry: tasks.GetGlobalUIRegistry(), // Use global UI registry with auto-registered UI providers + maintenanceQueue: queue, + maintenancePolicy: policy, + } + + // Initialize type conversion maps + integration.initializeTypeMaps() + + // Register all tasks + integration.registerAllTasks() + + return integration +} + +// initializeTypeMaps creates the type conversion maps for dynamic conversion +func (s *MaintenanceIntegration) initializeTypeMaps() { + // Initialize empty maps + s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType) + s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType) + + // Build task type mappings dynamically from registered tasks after registration + // This will be called from registerAllTasks() after all tasks are registered + + // Priority mappings (these are static and don't depend on registered tasks) + s.priorityMap = map[types.TaskPriority]MaintenanceTaskPriority{ + types.TaskPriorityLow: PriorityLow, + types.TaskPriorityNormal: PriorityNormal, + types.TaskPriorityHigh: PriorityHigh, + } + + // Reverse priority mappings + s.revPriorityMap = map[MaintenanceTaskPriority]types.TaskPriority{ + PriorityLow: types.TaskPriorityLow, + PriorityNormal: types.TaskPriorityNormal, + PriorityHigh: types.TaskPriorityHigh, + PriorityCritical: types.TaskPriorityHigh, // Map critical to high + } +} + +// buildTaskTypeMappings dynamically builds task type mappings from registered tasks +func (s *MaintenanceIntegration) buildTaskTypeMappings() { + // Clear existing mappings + s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType) + s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType) + + // Build mappings from registered detectors + for workerTaskType := range s.taskRegistry.GetAllDetectors() { + // Convert types.TaskType to MaintenanceTaskType by string conversion + maintenanceTaskType := MaintenanceTaskType(string(workerTaskType)) + + s.taskTypeMap[workerTaskType] = maintenanceTaskType + s.revTaskTypeMap[maintenanceTaskType] = workerTaskType + + glog.V(3).Infof("Dynamically mapped task type: %s <-> %s", workerTaskType, maintenanceTaskType) + } + + glog.V(2).Infof("Built %d dynamic task type mappings", len(s.taskTypeMap)) +} + +// registerAllTasks registers all available tasks +func (s *MaintenanceIntegration) registerAllTasks() { + // Tasks are already auto-registered via import statements + // No manual registration needed + + // Build dynamic type mappings from registered tasks + s.buildTaskTypeMappings() + + // Configure tasks from policy + s.configureTasksFromPolicy() + + registeredTaskTypes := make([]string, 0, len(s.taskTypeMap)) + for _, maintenanceTaskType := range s.taskTypeMap { + registeredTaskTypes = append(registeredTaskTypes, string(maintenanceTaskType)) + } + glog.V(1).Infof("Registered tasks: %v", registeredTaskTypes) +} + +// configureTasksFromPolicy dynamically configures all registered tasks based on the maintenance policy +func (s *MaintenanceIntegration) configureTasksFromPolicy() { + if s.maintenancePolicy == nil { + return + } + + // Configure all registered detectors and schedulers dynamically using policy configuration + configuredCount := 0 + + // Get all registered task types from the registry + for taskType, detector := range s.taskRegistry.GetAllDetectors() { + // Configure detector using policy-based configuration + s.configureDetectorFromPolicy(taskType, detector) + configuredCount++ + } + + for taskType, scheduler := range s.taskRegistry.GetAllSchedulers() { + // Configure scheduler using policy-based configuration + s.configureSchedulerFromPolicy(taskType, scheduler) + } + + glog.V(1).Infof("Dynamically configured %d task types from maintenance policy", configuredCount) +} + +// configureDetectorFromPolicy configures a detector using policy-based configuration +func (s *MaintenanceIntegration) configureDetectorFromPolicy(taskType types.TaskType, detector types.TaskDetector) { + // Try to configure using PolicyConfigurableDetector interface if supported + if configurableDetector, ok := detector.(types.PolicyConfigurableDetector); ok { + configurableDetector.ConfigureFromPolicy(s.maintenancePolicy) + glog.V(2).Infof("Configured detector %s using policy interface", taskType) + return + } + + // Apply basic configuration that all detectors should support + if basicDetector, ok := detector.(interface{ SetEnabled(bool) }); ok { + // Convert task system type to maintenance task type for policy lookup + maintenanceTaskType, exists := s.taskTypeMap[taskType] + if exists { + enabled := s.maintenancePolicy.IsTaskEnabled(maintenanceTaskType) + basicDetector.SetEnabled(enabled) + glog.V(3).Infof("Set enabled=%v for detector %s", enabled, taskType) + } + } + + // For detectors that don't implement PolicyConfigurableDetector interface, + // they should be updated to implement it for full policy-based configuration + glog.V(2).Infof("Detector %s should implement PolicyConfigurableDetector interface for full policy support", taskType) +} + +// configureSchedulerFromPolicy configures a scheduler using policy-based configuration +func (s *MaintenanceIntegration) configureSchedulerFromPolicy(taskType types.TaskType, scheduler types.TaskScheduler) { + // Try to configure using PolicyConfigurableScheduler interface if supported + if configurableScheduler, ok := scheduler.(types.PolicyConfigurableScheduler); ok { + configurableScheduler.ConfigureFromPolicy(s.maintenancePolicy) + glog.V(2).Infof("Configured scheduler %s using policy interface", taskType) + return + } + + // Apply basic configuration that all schedulers should support + maintenanceTaskType, exists := s.taskTypeMap[taskType] + if !exists { + glog.V(3).Infof("No maintenance task type mapping for %s, skipping configuration", taskType) + return + } + + // Set enabled status if scheduler supports it + if enableableScheduler, ok := scheduler.(interface{ SetEnabled(bool) }); ok { + enabled := s.maintenancePolicy.IsTaskEnabled(maintenanceTaskType) + enableableScheduler.SetEnabled(enabled) + glog.V(3).Infof("Set enabled=%v for scheduler %s", enabled, taskType) + } + + // Set max concurrent if scheduler supports it + if concurrentScheduler, ok := scheduler.(interface{ SetMaxConcurrent(int) }); ok { + maxConcurrent := s.maintenancePolicy.GetMaxConcurrent(maintenanceTaskType) + if maxConcurrent > 0 { + concurrentScheduler.SetMaxConcurrent(maxConcurrent) + glog.V(3).Infof("Set max concurrent=%d for scheduler %s", maxConcurrent, taskType) + } + } + + // For schedulers that don't implement PolicyConfigurableScheduler interface, + // they should be updated to implement it for full policy-based configuration + glog.V(2).Infof("Scheduler %s should implement PolicyConfigurableScheduler interface for full policy support", taskType) +} + +// ScanWithTaskDetectors performs a scan using the task system +func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.VolumeHealthMetrics) ([]*TaskDetectionResult, error) { + var allResults []*TaskDetectionResult + + // Create cluster info + clusterInfo := &types.ClusterInfo{ + TotalVolumes: len(volumeMetrics), + LastUpdated: time.Now(), + } + + // Run detection for each registered task type + for taskType, detector := range s.taskRegistry.GetAllDetectors() { + if !detector.IsEnabled() { + continue + } + + glog.V(2).Infof("Running detection for task type: %s", taskType) + + results, err := detector.ScanForTasks(volumeMetrics, clusterInfo) + if err != nil { + glog.Errorf("Failed to scan for %s tasks: %v", taskType, err) + continue + } + + // Convert results to existing system format + for _, result := range results { + existingResult := s.convertToExistingFormat(result) + if existingResult != nil { + allResults = append(allResults, existingResult) + } + } + + glog.V(2).Infof("Found %d %s tasks", len(results), taskType) + } + + return allResults, nil +} + +// convertToExistingFormat converts task results to existing system format using dynamic mapping +func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetectionResult) *TaskDetectionResult { + // Convert types using mapping tables + existingType, exists := s.taskTypeMap[result.TaskType] + if !exists { + glog.Warningf("Unknown task type %s, skipping conversion", result.TaskType) + // Return nil to indicate conversion failed - caller should handle this + return nil + } + + existingPriority, exists := s.priorityMap[result.Priority] + if !exists { + glog.Warningf("Unknown priority %d, defaulting to normal", result.Priority) + existingPriority = PriorityNormal + } + + return &TaskDetectionResult{ + TaskType: existingType, + VolumeID: result.VolumeID, + Server: result.Server, + Collection: result.Collection, + Priority: existingPriority, + Reason: result.Reason, + Parameters: result.Parameters, + ScheduleAt: result.ScheduleAt, + } +} + +// CanScheduleWithTaskSchedulers determines if a task can be scheduled using task schedulers with dynamic type conversion +func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *MaintenanceTask, runningTasks []*MaintenanceTask, availableWorkers []*MaintenanceWorker) bool { + // Convert existing types to task types using mapping + taskType, exists := s.revTaskTypeMap[task.Type] + if !exists { + glog.V(2).Infof("Unknown task type %s for scheduling, falling back to existing logic", task.Type) + return false // Fallback to existing logic for unknown types + } + + // Convert task objects + taskObject := s.convertTaskToTaskSystem(task) + if taskObject == nil { + glog.V(2).Infof("Failed to convert task %s for scheduling", task.ID) + return false + } + + runningTaskObjects := s.convertTasksToTaskSystem(runningTasks) + workerObjects := s.convertWorkersToTaskSystem(availableWorkers) + + // Get the appropriate scheduler + scheduler := s.taskRegistry.GetScheduler(taskType) + if scheduler == nil { + glog.V(2).Infof("No scheduler found for task type %s", taskType) + return false + } + + return scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects) +} + +// convertTaskToTaskSystem converts existing task to task system format using dynamic mapping +func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask) *types.Task { + // Convert task type using mapping + taskType, exists := s.revTaskTypeMap[task.Type] + if !exists { + glog.Errorf("Unknown task type %s in conversion, cannot convert task", task.Type) + // Return nil to indicate conversion failed + return nil + } + + // Convert priority using mapping + priority, exists := s.revPriorityMap[task.Priority] + if !exists { + glog.Warningf("Unknown priority %d in conversion, defaulting to normal", task.Priority) + priority = types.TaskPriorityNormal + } + + return &types.Task{ + ID: task.ID, + Type: taskType, + Priority: priority, + VolumeID: task.VolumeID, + Server: task.Server, + Collection: task.Collection, + Parameters: task.Parameters, + CreatedAt: task.CreatedAt, + } +} + +// convertTasksToTaskSystem converts multiple tasks +func (s *MaintenanceIntegration) convertTasksToTaskSystem(tasks []*MaintenanceTask) []*types.Task { + var result []*types.Task + for _, task := range tasks { + converted := s.convertTaskToTaskSystem(task) + if converted != nil { + result = append(result, converted) + } + } + return result +} + +// convertWorkersToTaskSystem converts workers to task system format using dynamic mapping +func (s *MaintenanceIntegration) convertWorkersToTaskSystem(workers []*MaintenanceWorker) []*types.Worker { + var result []*types.Worker + for _, worker := range workers { + capabilities := make([]types.TaskType, 0, len(worker.Capabilities)) + for _, cap := range worker.Capabilities { + // Convert capability using mapping + taskType, exists := s.revTaskTypeMap[cap] + if exists { + capabilities = append(capabilities, taskType) + } else { + glog.V(3).Infof("Unknown capability %s for worker %s, skipping", cap, worker.ID) + } + } + + result = append(result, &types.Worker{ + ID: worker.ID, + Address: worker.Address, + Capabilities: capabilities, + MaxConcurrent: worker.MaxConcurrent, + CurrentLoad: worker.CurrentLoad, + }) + } + return result +} + +// GetTaskScheduler returns the scheduler for a task type using dynamic mapping +func (s *MaintenanceIntegration) GetTaskScheduler(taskType MaintenanceTaskType) types.TaskScheduler { + // Convert task type using mapping + taskSystemType, exists := s.revTaskTypeMap[taskType] + if !exists { + glog.V(3).Infof("Unknown task type %s for scheduler", taskType) + return nil + } + + return s.taskRegistry.GetScheduler(taskSystemType) +} + +// GetUIProvider returns the UI provider for a task type using dynamic mapping +func (s *MaintenanceIntegration) GetUIProvider(taskType MaintenanceTaskType) types.TaskUIProvider { + // Convert task type using mapping + taskSystemType, exists := s.revTaskTypeMap[taskType] + if !exists { + glog.V(3).Infof("Unknown task type %s for UI provider", taskType) + return nil + } + + return s.uiRegistry.GetProvider(taskSystemType) +} + +// GetAllTaskStats returns stats for all registered tasks +func (s *MaintenanceIntegration) GetAllTaskStats() []*types.TaskStats { + var stats []*types.TaskStats + + for taskType, detector := range s.taskRegistry.GetAllDetectors() { + uiProvider := s.uiRegistry.GetProvider(taskType) + if uiProvider == nil { + continue + } + + stat := &types.TaskStats{ + TaskType: taskType, + DisplayName: uiProvider.GetDisplayName(), + Enabled: detector.IsEnabled(), + LastScan: time.Now().Add(-detector.ScanInterval()), + NextScan: time.Now().Add(detector.ScanInterval()), + ScanInterval: detector.ScanInterval(), + MaxConcurrent: s.taskRegistry.GetScheduler(taskType).GetMaxConcurrent(), + // Would need to get these from actual queue/stats + PendingTasks: 0, + RunningTasks: 0, + CompletedToday: 0, + FailedToday: 0, + } + + stats = append(stats, stat) + } + + return stats +} |
