aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/maintenance/maintenance_integration.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/maintenance/maintenance_integration.go')
-rw-r--r--weed/admin/maintenance/maintenance_integration.go409
1 files changed, 409 insertions, 0 deletions
diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go
new file mode 100644
index 000000000..9a965d38a
--- /dev/null
+++ b/weed/admin/maintenance/maintenance_integration.go
@@ -0,0 +1,409 @@
+package maintenance
+
+import (
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/worker/tasks"
+ "github.com/seaweedfs/seaweedfs/weed/worker/types"
+)
+
+// MaintenanceIntegration bridges the task system with existing maintenance
+type MaintenanceIntegration struct {
+ taskRegistry *types.TaskRegistry
+ uiRegistry *types.UIRegistry
+
+ // Bridge to existing system
+ maintenanceQueue *MaintenanceQueue
+ maintenancePolicy *MaintenancePolicy
+
+ // Type conversion maps
+ taskTypeMap map[types.TaskType]MaintenanceTaskType
+ revTaskTypeMap map[MaintenanceTaskType]types.TaskType
+ priorityMap map[types.TaskPriority]MaintenanceTaskPriority
+ revPriorityMap map[MaintenanceTaskPriority]types.TaskPriority
+}
+
+// NewMaintenanceIntegration creates the integration bridge
+func NewMaintenanceIntegration(queue *MaintenanceQueue, policy *MaintenancePolicy) *MaintenanceIntegration {
+ integration := &MaintenanceIntegration{
+ taskRegistry: tasks.GetGlobalTypesRegistry(), // Use global types registry with auto-registered tasks
+ uiRegistry: tasks.GetGlobalUIRegistry(), // Use global UI registry with auto-registered UI providers
+ maintenanceQueue: queue,
+ maintenancePolicy: policy,
+ }
+
+ // Initialize type conversion maps
+ integration.initializeTypeMaps()
+
+ // Register all tasks
+ integration.registerAllTasks()
+
+ return integration
+}
+
+// initializeTypeMaps creates the type conversion maps for dynamic conversion
+func (s *MaintenanceIntegration) initializeTypeMaps() {
+ // Initialize empty maps
+ s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType)
+ s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType)
+
+ // Build task type mappings dynamically from registered tasks after registration
+ // This will be called from registerAllTasks() after all tasks are registered
+
+ // Priority mappings (these are static and don't depend on registered tasks)
+ s.priorityMap = map[types.TaskPriority]MaintenanceTaskPriority{
+ types.TaskPriorityLow: PriorityLow,
+ types.TaskPriorityNormal: PriorityNormal,
+ types.TaskPriorityHigh: PriorityHigh,
+ }
+
+ // Reverse priority mappings
+ s.revPriorityMap = map[MaintenanceTaskPriority]types.TaskPriority{
+ PriorityLow: types.TaskPriorityLow,
+ PriorityNormal: types.TaskPriorityNormal,
+ PriorityHigh: types.TaskPriorityHigh,
+ PriorityCritical: types.TaskPriorityHigh, // Map critical to high
+ }
+}
+
+// buildTaskTypeMappings dynamically builds task type mappings from registered tasks
+func (s *MaintenanceIntegration) buildTaskTypeMappings() {
+ // Clear existing mappings
+ s.taskTypeMap = make(map[types.TaskType]MaintenanceTaskType)
+ s.revTaskTypeMap = make(map[MaintenanceTaskType]types.TaskType)
+
+ // Build mappings from registered detectors
+ for workerTaskType := range s.taskRegistry.GetAllDetectors() {
+ // Convert types.TaskType to MaintenanceTaskType by string conversion
+ maintenanceTaskType := MaintenanceTaskType(string(workerTaskType))
+
+ s.taskTypeMap[workerTaskType] = maintenanceTaskType
+ s.revTaskTypeMap[maintenanceTaskType] = workerTaskType
+
+ glog.V(3).Infof("Dynamically mapped task type: %s <-> %s", workerTaskType, maintenanceTaskType)
+ }
+
+ glog.V(2).Infof("Built %d dynamic task type mappings", len(s.taskTypeMap))
+}
+
+// registerAllTasks registers all available tasks
+func (s *MaintenanceIntegration) registerAllTasks() {
+ // Tasks are already auto-registered via import statements
+ // No manual registration needed
+
+ // Build dynamic type mappings from registered tasks
+ s.buildTaskTypeMappings()
+
+ // Configure tasks from policy
+ s.configureTasksFromPolicy()
+
+ registeredTaskTypes := make([]string, 0, len(s.taskTypeMap))
+ for _, maintenanceTaskType := range s.taskTypeMap {
+ registeredTaskTypes = append(registeredTaskTypes, string(maintenanceTaskType))
+ }
+ glog.V(1).Infof("Registered tasks: %v", registeredTaskTypes)
+}
+
+// configureTasksFromPolicy dynamically configures all registered tasks based on the maintenance policy
+func (s *MaintenanceIntegration) configureTasksFromPolicy() {
+ if s.maintenancePolicy == nil {
+ return
+ }
+
+ // Configure all registered detectors and schedulers dynamically using policy configuration
+ configuredCount := 0
+
+ // Get all registered task types from the registry
+ for taskType, detector := range s.taskRegistry.GetAllDetectors() {
+ // Configure detector using policy-based configuration
+ s.configureDetectorFromPolicy(taskType, detector)
+ configuredCount++
+ }
+
+ for taskType, scheduler := range s.taskRegistry.GetAllSchedulers() {
+ // Configure scheduler using policy-based configuration
+ s.configureSchedulerFromPolicy(taskType, scheduler)
+ }
+
+ glog.V(1).Infof("Dynamically configured %d task types from maintenance policy", configuredCount)
+}
+
+// configureDetectorFromPolicy configures a detector using policy-based configuration
+func (s *MaintenanceIntegration) configureDetectorFromPolicy(taskType types.TaskType, detector types.TaskDetector) {
+ // Try to configure using PolicyConfigurableDetector interface if supported
+ if configurableDetector, ok := detector.(types.PolicyConfigurableDetector); ok {
+ configurableDetector.ConfigureFromPolicy(s.maintenancePolicy)
+ glog.V(2).Infof("Configured detector %s using policy interface", taskType)
+ return
+ }
+
+ // Apply basic configuration that all detectors should support
+ if basicDetector, ok := detector.(interface{ SetEnabled(bool) }); ok {
+ // Convert task system type to maintenance task type for policy lookup
+ maintenanceTaskType, exists := s.taskTypeMap[taskType]
+ if exists {
+ enabled := s.maintenancePolicy.IsTaskEnabled(maintenanceTaskType)
+ basicDetector.SetEnabled(enabled)
+ glog.V(3).Infof("Set enabled=%v for detector %s", enabled, taskType)
+ }
+ }
+
+ // For detectors that don't implement PolicyConfigurableDetector interface,
+ // they should be updated to implement it for full policy-based configuration
+ glog.V(2).Infof("Detector %s should implement PolicyConfigurableDetector interface for full policy support", taskType)
+}
+
+// configureSchedulerFromPolicy configures a scheduler using policy-based configuration
+func (s *MaintenanceIntegration) configureSchedulerFromPolicy(taskType types.TaskType, scheduler types.TaskScheduler) {
+ // Try to configure using PolicyConfigurableScheduler interface if supported
+ if configurableScheduler, ok := scheduler.(types.PolicyConfigurableScheduler); ok {
+ configurableScheduler.ConfigureFromPolicy(s.maintenancePolicy)
+ glog.V(2).Infof("Configured scheduler %s using policy interface", taskType)
+ return
+ }
+
+ // Apply basic configuration that all schedulers should support
+ maintenanceTaskType, exists := s.taskTypeMap[taskType]
+ if !exists {
+ glog.V(3).Infof("No maintenance task type mapping for %s, skipping configuration", taskType)
+ return
+ }
+
+ // Set enabled status if scheduler supports it
+ if enableableScheduler, ok := scheduler.(interface{ SetEnabled(bool) }); ok {
+ enabled := s.maintenancePolicy.IsTaskEnabled(maintenanceTaskType)
+ enableableScheduler.SetEnabled(enabled)
+ glog.V(3).Infof("Set enabled=%v for scheduler %s", enabled, taskType)
+ }
+
+ // Set max concurrent if scheduler supports it
+ if concurrentScheduler, ok := scheduler.(interface{ SetMaxConcurrent(int) }); ok {
+ maxConcurrent := s.maintenancePolicy.GetMaxConcurrent(maintenanceTaskType)
+ if maxConcurrent > 0 {
+ concurrentScheduler.SetMaxConcurrent(maxConcurrent)
+ glog.V(3).Infof("Set max concurrent=%d for scheduler %s", maxConcurrent, taskType)
+ }
+ }
+
+ // For schedulers that don't implement PolicyConfigurableScheduler interface,
+ // they should be updated to implement it for full policy-based configuration
+ glog.V(2).Infof("Scheduler %s should implement PolicyConfigurableScheduler interface for full policy support", taskType)
+}
+
+// ScanWithTaskDetectors performs a scan using the task system
+func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.VolumeHealthMetrics) ([]*TaskDetectionResult, error) {
+ var allResults []*TaskDetectionResult
+
+ // Create cluster info
+ clusterInfo := &types.ClusterInfo{
+ TotalVolumes: len(volumeMetrics),
+ LastUpdated: time.Now(),
+ }
+
+ // Run detection for each registered task type
+ for taskType, detector := range s.taskRegistry.GetAllDetectors() {
+ if !detector.IsEnabled() {
+ continue
+ }
+
+ glog.V(2).Infof("Running detection for task type: %s", taskType)
+
+ results, err := detector.ScanForTasks(volumeMetrics, clusterInfo)
+ if err != nil {
+ glog.Errorf("Failed to scan for %s tasks: %v", taskType, err)
+ continue
+ }
+
+ // Convert results to existing system format
+ for _, result := range results {
+ existingResult := s.convertToExistingFormat(result)
+ if existingResult != nil {
+ allResults = append(allResults, existingResult)
+ }
+ }
+
+ glog.V(2).Infof("Found %d %s tasks", len(results), taskType)
+ }
+
+ return allResults, nil
+}
+
+// convertToExistingFormat converts task results to existing system format using dynamic mapping
+func (s *MaintenanceIntegration) convertToExistingFormat(result *types.TaskDetectionResult) *TaskDetectionResult {
+ // Convert types using mapping tables
+ existingType, exists := s.taskTypeMap[result.TaskType]
+ if !exists {
+ glog.Warningf("Unknown task type %s, skipping conversion", result.TaskType)
+ // Return nil to indicate conversion failed - caller should handle this
+ return nil
+ }
+
+ existingPriority, exists := s.priorityMap[result.Priority]
+ if !exists {
+ glog.Warningf("Unknown priority %d, defaulting to normal", result.Priority)
+ existingPriority = PriorityNormal
+ }
+
+ return &TaskDetectionResult{
+ TaskType: existingType,
+ VolumeID: result.VolumeID,
+ Server: result.Server,
+ Collection: result.Collection,
+ Priority: existingPriority,
+ Reason: result.Reason,
+ Parameters: result.Parameters,
+ ScheduleAt: result.ScheduleAt,
+ }
+}
+
+// CanScheduleWithTaskSchedulers determines if a task can be scheduled using task schedulers with dynamic type conversion
+func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *MaintenanceTask, runningTasks []*MaintenanceTask, availableWorkers []*MaintenanceWorker) bool {
+ // Convert existing types to task types using mapping
+ taskType, exists := s.revTaskTypeMap[task.Type]
+ if !exists {
+ glog.V(2).Infof("Unknown task type %s for scheduling, falling back to existing logic", task.Type)
+ return false // Fallback to existing logic for unknown types
+ }
+
+ // Convert task objects
+ taskObject := s.convertTaskToTaskSystem(task)
+ if taskObject == nil {
+ glog.V(2).Infof("Failed to convert task %s for scheduling", task.ID)
+ return false
+ }
+
+ runningTaskObjects := s.convertTasksToTaskSystem(runningTasks)
+ workerObjects := s.convertWorkersToTaskSystem(availableWorkers)
+
+ // Get the appropriate scheduler
+ scheduler := s.taskRegistry.GetScheduler(taskType)
+ if scheduler == nil {
+ glog.V(2).Infof("No scheduler found for task type %s", taskType)
+ return false
+ }
+
+ return scheduler.CanScheduleNow(taskObject, runningTaskObjects, workerObjects)
+}
+
+// convertTaskToTaskSystem converts existing task to task system format using dynamic mapping
+func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask) *types.Task {
+ // Convert task type using mapping
+ taskType, exists := s.revTaskTypeMap[task.Type]
+ if !exists {
+ glog.Errorf("Unknown task type %s in conversion, cannot convert task", task.Type)
+ // Return nil to indicate conversion failed
+ return nil
+ }
+
+ // Convert priority using mapping
+ priority, exists := s.revPriorityMap[task.Priority]
+ if !exists {
+ glog.Warningf("Unknown priority %d in conversion, defaulting to normal", task.Priority)
+ priority = types.TaskPriorityNormal
+ }
+
+ return &types.Task{
+ ID: task.ID,
+ Type: taskType,
+ Priority: priority,
+ VolumeID: task.VolumeID,
+ Server: task.Server,
+ Collection: task.Collection,
+ Parameters: task.Parameters,
+ CreatedAt: task.CreatedAt,
+ }
+}
+
+// convertTasksToTaskSystem converts multiple tasks
+func (s *MaintenanceIntegration) convertTasksToTaskSystem(tasks []*MaintenanceTask) []*types.Task {
+ var result []*types.Task
+ for _, task := range tasks {
+ converted := s.convertTaskToTaskSystem(task)
+ if converted != nil {
+ result = append(result, converted)
+ }
+ }
+ return result
+}
+
+// convertWorkersToTaskSystem converts workers to task system format using dynamic mapping
+func (s *MaintenanceIntegration) convertWorkersToTaskSystem(workers []*MaintenanceWorker) []*types.Worker {
+ var result []*types.Worker
+ for _, worker := range workers {
+ capabilities := make([]types.TaskType, 0, len(worker.Capabilities))
+ for _, cap := range worker.Capabilities {
+ // Convert capability using mapping
+ taskType, exists := s.revTaskTypeMap[cap]
+ if exists {
+ capabilities = append(capabilities, taskType)
+ } else {
+ glog.V(3).Infof("Unknown capability %s for worker %s, skipping", cap, worker.ID)
+ }
+ }
+
+ result = append(result, &types.Worker{
+ ID: worker.ID,
+ Address: worker.Address,
+ Capabilities: capabilities,
+ MaxConcurrent: worker.MaxConcurrent,
+ CurrentLoad: worker.CurrentLoad,
+ })
+ }
+ return result
+}
+
+// GetTaskScheduler returns the scheduler for a task type using dynamic mapping
+func (s *MaintenanceIntegration) GetTaskScheduler(taskType MaintenanceTaskType) types.TaskScheduler {
+ // Convert task type using mapping
+ taskSystemType, exists := s.revTaskTypeMap[taskType]
+ if !exists {
+ glog.V(3).Infof("Unknown task type %s for scheduler", taskType)
+ return nil
+ }
+
+ return s.taskRegistry.GetScheduler(taskSystemType)
+}
+
+// GetUIProvider returns the UI provider for a task type using dynamic mapping
+func (s *MaintenanceIntegration) GetUIProvider(taskType MaintenanceTaskType) types.TaskUIProvider {
+ // Convert task type using mapping
+ taskSystemType, exists := s.revTaskTypeMap[taskType]
+ if !exists {
+ glog.V(3).Infof("Unknown task type %s for UI provider", taskType)
+ return nil
+ }
+
+ return s.uiRegistry.GetProvider(taskSystemType)
+}
+
+// GetAllTaskStats returns stats for all registered tasks
+func (s *MaintenanceIntegration) GetAllTaskStats() []*types.TaskStats {
+ var stats []*types.TaskStats
+
+ for taskType, detector := range s.taskRegistry.GetAllDetectors() {
+ uiProvider := s.uiRegistry.GetProvider(taskType)
+ if uiProvider == nil {
+ continue
+ }
+
+ stat := &types.TaskStats{
+ TaskType: taskType,
+ DisplayName: uiProvider.GetDisplayName(),
+ Enabled: detector.IsEnabled(),
+ LastScan: time.Now().Add(-detector.ScanInterval()),
+ NextScan: time.Now().Add(detector.ScanInterval()),
+ ScanInterval: detector.ScanInterval(),
+ MaxConcurrent: s.taskRegistry.GetScheduler(taskType).GetMaxConcurrent(),
+ // Would need to get these from actual queue/stats
+ PendingTasks: 0,
+ RunningTasks: 0,
+ CompletedToday: 0,
+ FailedToday: 0,
+ }
+
+ stats = append(stats, stat)
+ }
+
+ return stats
+}