Diffstat (limited to 'weed/admin/maintenance/maintenance_queue.go')
| Mode | File | Lines |
| --- | --- | --- |
| -rw-r--r-- | weed/admin/maintenance/maintenance_queue.go | 500 |
1 file changed, 500 insertions, 0 deletions
diff --git a/weed/admin/maintenance/maintenance_queue.go b/weed/admin/maintenance/maintenance_queue.go
new file mode 100644
index 000000000..580a98718
--- /dev/null
+++ b/weed/admin/maintenance/maintenance_queue.go
@@ -0,0 +1,500 @@
+package maintenance
+
+import (
+	"math/rand"
+	"sort"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// NewMaintenanceQueue creates a new maintenance queue
+func NewMaintenanceQueue(policy *MaintenancePolicy) *MaintenanceQueue {
+	queue := &MaintenanceQueue{
+		tasks:        make(map[string]*MaintenanceTask),
+		workers:      make(map[string]*MaintenanceWorker),
+		pendingTasks: make([]*MaintenanceTask, 0),
+		policy:       policy,
+	}
+	return queue
+}
+
+// SetIntegration sets the integration reference
+func (mq *MaintenanceQueue) SetIntegration(integration *MaintenanceIntegration) {
+	mq.integration = integration
+	glog.V(1).Infof("Maintenance queue configured with integration")
+}
+
+// AddTask adds a new maintenance task to the queue
+func (mq *MaintenanceQueue) AddTask(task *MaintenanceTask) {
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	task.ID = generateTaskID()
+	task.Status = TaskStatusPending
+	task.CreatedAt = time.Now()
+	task.MaxRetries = 3 // Default retry count
+
+	mq.tasks[task.ID] = task
+	mq.pendingTasks = append(mq.pendingTasks, task)
+
+	// Sort pending tasks by priority and schedule time
+	sort.Slice(mq.pendingTasks, func(i, j int) bool {
+		if mq.pendingTasks[i].Priority != mq.pendingTasks[j].Priority {
+			return mq.pendingTasks[i].Priority > mq.pendingTasks[j].Priority
+		}
+		return mq.pendingTasks[i].ScheduledAt.Before(mq.pendingTasks[j].ScheduledAt)
+	})
+
+	glog.V(2).Infof("Added maintenance task %s: %s for volume %d", task.ID, task.Type, task.VolumeID)
+}
+
+// AddTasksFromResults converts detection results to tasks and adds them to the queue
+func (mq *MaintenanceQueue) AddTasksFromResults(results []*TaskDetectionResult) {
+	for _, result := range results {
+		task := &MaintenanceTask{
+			Type:        result.TaskType,
+			Priority:    result.Priority,
+			VolumeID:    result.VolumeID,
+			Server:      result.Server,
+			Collection:  result.Collection,
+			Parameters:  result.Parameters,
+			Reason:      result.Reason,
+			ScheduledAt: result.ScheduleAt,
+		}
+		mq.AddTask(task)
+	}
+}
+
+// GetNextTask returns the next available task for a worker
+func (mq *MaintenanceQueue) GetNextTask(workerID string, capabilities []MaintenanceTaskType) *MaintenanceTask {
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	worker, exists := mq.workers[workerID]
+	if !exists {
+		return nil
+	}
+
+	// Check if worker has capacity
+	if worker.CurrentLoad >= worker.MaxConcurrent {
+		return nil
+	}
+
+	now := time.Now()
+
+	// Find the next suitable task
+	for i, task := range mq.pendingTasks {
+		// Check if it's time to execute the task
+		if task.ScheduledAt.After(now) {
+			continue
+		}
+
+		// Check if worker can handle this task type
+		if !mq.workerCanHandle(task.Type, capabilities) {
+			continue
+		}
+
+		// Check scheduling logic - use simplified system if available, otherwise fallback
+		if !mq.canScheduleTaskNow(task) {
+			continue
+		}
+
+		// Assign task to worker
+		task.Status = TaskStatusAssigned
+		task.WorkerID = workerID
+		startTime := now
+		task.StartedAt = &startTime
+
+		// Remove from pending tasks
+		mq.pendingTasks = append(mq.pendingTasks[:i], mq.pendingTasks[i+1:]...)
+
+		// Update worker
+		worker.CurrentTask = task
+		worker.CurrentLoad++
+		worker.Status = "busy"
+
+		glog.V(2).Infof("Assigned task %s to worker %s", task.ID, workerID)
+		return task
+	}
+
+	return nil
+}
+
+// CompleteTask marks a task as completed
+func (mq *MaintenanceQueue) CompleteTask(taskID string, error string) {
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	task, exists := mq.tasks[taskID]
+	if !exists {
+		return
+	}
+
+	completedTime := time.Now()
+	task.CompletedAt = &completedTime
+
+	if error != "" {
+		task.Status = TaskStatusFailed
+		task.Error = error
+
+		// Check if task should be retried
+		if task.RetryCount < task.MaxRetries {
+			task.RetryCount++
+			task.Status = TaskStatusPending
+			task.WorkerID = ""
+			task.StartedAt = nil
+			task.CompletedAt = nil
+			task.Error = ""
+			task.ScheduledAt = time.Now().Add(15 * time.Minute) // Retry delay
+
+			mq.pendingTasks = append(mq.pendingTasks, task)
+			glog.V(2).Infof("Retrying task %s (attempt %d/%d)", taskID, task.RetryCount, task.MaxRetries)
+		} else {
+			glog.Errorf("Task %s failed permanently after %d retries: %s", taskID, task.MaxRetries, error)
+		}
+	} else {
+		task.Status = TaskStatusCompleted
+		task.Progress = 100
+		glog.V(2).Infof("Task %s completed successfully", taskID)
+	}
+
+	// Update worker
+	if task.WorkerID != "" {
+		if worker, exists := mq.workers[task.WorkerID]; exists {
+			worker.CurrentTask = nil
+			worker.CurrentLoad--
+			if worker.CurrentLoad == 0 {
+				worker.Status = "active"
+			}
+		}
+	}
+}
+
+// UpdateTaskProgress updates the progress of a running task
+func (mq *MaintenanceQueue) UpdateTaskProgress(taskID string, progress float64) {
+	// This mutates the task, so take the write lock
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	if task, exists := mq.tasks[taskID]; exists {
+		task.Progress = progress
+		task.Status = TaskStatusInProgress
+	}
+}
+
+// RegisterWorker registers a new worker
+func (mq *MaintenanceQueue) RegisterWorker(worker *MaintenanceWorker) {
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	worker.LastHeartbeat = time.Now()
+	worker.Status = "active"
+	worker.CurrentLoad = 0
+	mq.workers[worker.ID] = worker
+
+	glog.V(1).Infof("Registered maintenance worker %s at %s", worker.ID, worker.Address)
+}
+
+// UpdateWorkerHeartbeat updates worker heartbeat
+func (mq *MaintenanceQueue) UpdateWorkerHeartbeat(workerID string) {
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	if worker, exists := mq.workers[workerID]; exists {
+		worker.LastHeartbeat = time.Now()
+	}
+}
+
+// GetRunningTaskCount returns the number of running tasks of a specific type
+func (mq *MaintenanceQueue) GetRunningTaskCount(taskType MaintenanceTaskType) int {
+	mq.mutex.RLock()
+	defer mq.mutex.RUnlock()
+
+	count := 0
+	for _, task := range mq.tasks {
+		if task.Type == taskType && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
+			count++
+		}
+	}
+	return count
+}
+
+// WasTaskRecentlyCompleted checks if a similar task was recently completed
+func (mq *MaintenanceQueue) WasTaskRecentlyCompleted(taskType MaintenanceTaskType, volumeID uint32, server string, now time.Time) bool {
+	mq.mutex.RLock()
+	defer mq.mutex.RUnlock()
+
+	// Get the repeat prevention interval for this task type
+	interval := mq.getRepeatPreventionInterval(taskType)
+	cutoff := now.Add(-interval)
+
+	for _, task := range mq.tasks {
+		if task.Type == taskType &&
+			task.VolumeID == volumeID &&
+			task.Server == server &&
+			task.Status == TaskStatusCompleted &&
+			task.CompletedAt != nil &&
+			task.CompletedAt.After(cutoff) {
+			return true
+		}
+	}
+	return false
+}
+
+// getRepeatPreventionInterval returns the interval for preventing task repetition
+func (mq *MaintenanceQueue) getRepeatPreventionInterval(taskType MaintenanceTaskType) time.Duration {
+	// First try to get a default from the task scheduler
+	if mq.integration != nil {
+		if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
+			defaultInterval := scheduler.GetDefaultRepeatInterval()
+			if defaultInterval > 0 {
+				glog.V(3).Infof("Using task scheduler default repeat interval for %s: %v", taskType, defaultInterval)
+				return defaultInterval
+			}
+		}
+	}
+
+	// Fall back to policy configuration if no scheduler is available or the scheduler provides no default
+	if mq.policy != nil {
+		repeatIntervalHours := mq.policy.GetRepeatInterval(taskType)
+		if repeatIntervalHours > 0 {
+			interval := time.Duration(repeatIntervalHours) * time.Hour
+			glog.V(3).Infof("Using policy configuration repeat interval for %s: %v", taskType, interval)
+			return interval
+		}
+	}
+
+	// Ultimate fallback - but avoid hardcoded values where possible
+	glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1h", taskType)
+	return time.Hour // Minimal safe default
+}
+
+// GetTasks returns tasks with optional filtering
+func (mq *MaintenanceQueue) GetTasks(status MaintenanceTaskStatus, taskType MaintenanceTaskType, limit int) []*MaintenanceTask {
+	mq.mutex.RLock()
+	defer mq.mutex.RUnlock()
+
+	var tasks []*MaintenanceTask
+	for _, task := range mq.tasks {
+		if status != "" && task.Status != status {
+			continue
+		}
+		if taskType != "" && task.Type != taskType {
+			continue
+		}
+		tasks = append(tasks, task)
+		if limit > 0 && len(tasks) >= limit {
+			break
+		}
+	}
+
+	// Sort by creation time (newest first)
+	sort.Slice(tasks, func(i, j int) bool {
+		return tasks[i].CreatedAt.After(tasks[j].CreatedAt)
+	})
+
+	return tasks
+}
+
+// GetWorkers returns all registered workers
+func (mq *MaintenanceQueue) GetWorkers() []*MaintenanceWorker {
+	mq.mutex.RLock()
+	defer mq.mutex.RUnlock()
+
+	var workers []*MaintenanceWorker
+	for _, worker := range mq.workers {
+		workers = append(workers, worker)
+	}
+	return workers
+}
+
+// generateTaskID generates a pseudo-random ID for tasks
+func generateTaskID() string {
+	const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
+	b := make([]byte, 8)
+	for i := range b {
+		b[i] = charset[rand.Intn(len(charset))]
+	}
+	return string(b)
+}
+
+// CleanupOldTasks removes old completed and failed tasks
+func (mq *MaintenanceQueue) CleanupOldTasks(retention time.Duration) int {
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	cutoff := time.Now().Add(-retention)
+	removed := 0
+
+	for id, task := range mq.tasks {
+		if (task.Status == TaskStatusCompleted || task.Status == TaskStatusFailed) &&
+			task.CompletedAt != nil &&
+			task.CompletedAt.Before(cutoff) {
+			delete(mq.tasks, id)
+			removed++
+		}
+	}
+
+	glog.V(2).Infof("Cleaned up %d old maintenance tasks", removed)
+	return removed
+}
+
+// RemoveStaleWorkers removes workers that haven't sent a heartbeat recently
+func (mq *MaintenanceQueue) RemoveStaleWorkers(timeout time.Duration) int {
+	mq.mutex.Lock()
+	defer mq.mutex.Unlock()
+
+	cutoff := time.Now().Add(-timeout)
+	removed := 0
+
+	for id, worker := range mq.workers {
+		if worker.LastHeartbeat.Before(cutoff) {
+			// Mark any assigned tasks as failed
+			for _, task := range mq.tasks {
+				if task.WorkerID == id && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
+					task.Status = TaskStatusFailed
+					task.Error = "Worker became unavailable"
+					completedTime := time.Now()
+					task.CompletedAt = &completedTime
+				}
+			}
+
+			delete(mq.workers, id)
+			removed++
+			glog.Warningf("Removed stale maintenance worker %s", id)
+		}
+	}
+
+	return removed
+}
+
+// GetStats returns maintenance statistics
+func (mq *MaintenanceQueue) GetStats() *MaintenanceStats {
+	mq.mutex.RLock()
+	defer mq.mutex.RUnlock()
+
+	stats := &MaintenanceStats{
+		TotalTasks:    len(mq.tasks),
+		TasksByStatus: make(map[MaintenanceTaskStatus]int),
+		TasksByType:   make(map[MaintenanceTaskType]int),
+		ActiveWorkers: 0,
+	}
+
+	today := time.Now().Truncate(24 * time.Hour)
+	var totalDuration time.Duration
+	var completedTasks int
+
+	for _, task := range mq.tasks {
+		stats.TasksByStatus[task.Status]++
+		stats.TasksByType[task.Type]++
+
+		if task.CompletedAt != nil && task.CompletedAt.After(today) {
+			if task.Status == TaskStatusCompleted {
+				stats.CompletedToday++
+			} else if task.Status == TaskStatusFailed {
+				stats.FailedToday++
+			}
+
+			if task.StartedAt != nil {
+				duration := task.CompletedAt.Sub(*task.StartedAt)
+				totalDuration += duration
+				completedTasks++
+			}
+		}
+	}
+
+	for _, worker := range mq.workers {
+		if worker.Status == "active" || worker.Status == "busy" {
+			stats.ActiveWorkers++
+		}
+	}
+
+	if completedTasks > 0 {
+		stats.AverageTaskTime = totalDuration / time.Duration(completedTasks)
+	}
+
+	return stats
+}
+
+// workerCanHandle checks if a worker can handle a specific task type
+func (mq *MaintenanceQueue) workerCanHandle(taskType MaintenanceTaskType, capabilities []MaintenanceTaskType) bool {
+	for _, capability := range capabilities {
+		if capability == taskType {
+			return true
+		}
+	}
+	return false
+}
+
+// canScheduleTaskNow determines if a task can be scheduled using task schedulers or fallback logic
+func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
+	// Try task scheduling logic first
+	if mq.integration != nil {
+		// Get all running tasks and available workers
+		runningTasks := mq.getRunningTasks()
+		availableWorkers := mq.getAvailableWorkers()
+
+		canSchedule := mq.integration.CanScheduleWithTaskSchedulers(task, runningTasks, availableWorkers)
+		glog.V(3).Infof("Task scheduler decision for task %s (%s): %v", task.ID, task.Type, canSchedule)
+		return canSchedule
+	}
+
+	// Fallback to hardcoded logic
+	return mq.canExecuteTaskType(task.Type)
+}
+
+// canExecuteTaskType checks if we can execute more tasks of this type (concurrency limits) - fallback logic.
+// The caller (GetNextTask) already holds mq.mutex, so running tasks are counted inline here instead of
+// via GetRunningTaskCount, which would re-acquire the lock and deadlock.
+func (mq *MaintenanceQueue) canExecuteTaskType(taskType MaintenanceTaskType) bool {
+	runningCount := 0
+	for _, task := range mq.tasks {
+		if task.Type == taskType && (task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress) {
+			runningCount++
+		}
+	}
+	maxConcurrent := mq.getMaxConcurrentForTaskType(taskType)
+
+	return runningCount < maxConcurrent
+}
+
+// getMaxConcurrentForTaskType returns the maximum concurrent tasks allowed for a task type
+func (mq *MaintenanceQueue) getMaxConcurrentForTaskType(taskType MaintenanceTaskType) int {
+	// First try to get a default from the task scheduler
+	if mq.integration != nil {
+		if scheduler := mq.integration.GetTaskScheduler(taskType); scheduler != nil {
+			maxConcurrent := scheduler.GetMaxConcurrent()
+			if maxConcurrent > 0 {
+				glog.V(3).Infof("Using task scheduler max concurrent for %s: %d", taskType, maxConcurrent)
+				return maxConcurrent
+			}
+		}
+	}
+
+	// Fall back to policy configuration if no scheduler is available or the scheduler provides no default
+	if mq.policy != nil {
+		maxConcurrent := mq.policy.GetMaxConcurrent(taskType)
+		if maxConcurrent > 0 {
+			glog.V(3).Infof("Using policy configuration max concurrent for %s: %d", taskType, maxConcurrent)
+			return maxConcurrent
+		}
+	}
+
+	// Ultimate fallback - minimal safe default
+	glog.V(2).Infof("No scheduler or policy configuration found for task type %s, using minimal default: 1", taskType)
+	return 1
+}
+
+// getRunningTasks returns all currently running tasks
+func (mq *MaintenanceQueue) getRunningTasks() []*MaintenanceTask {
+	var runningTasks []*MaintenanceTask
+	for _, task := range mq.tasks {
+		if task.Status == TaskStatusAssigned || task.Status == TaskStatusInProgress {
+			runningTasks = append(runningTasks, task)
+		}
+	}
+	return runningTasks
+}
+
+// getAvailableWorkers returns all workers that can take more work
+func (mq *MaintenanceQueue) getAvailableWorkers() []*MaintenanceWorker {
+	var availableWorkers []*MaintenanceWorker
+	for _, worker := range mq.workers {
+		if worker.Status == "active" && worker.CurrentLoad < worker.MaxConcurrent {
+			availableWorkers = append(availableWorkers, worker)
+		}
+	}
+	return availableWorkers
+}
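For orientation, the sketch below shows how the queue in this diff is meant to be driven end to end: register a worker, enqueue a task, hand it out, report progress, and complete it. It is a minimal, hypothetical in-package test, not part of the change: it assumes `MaintenanceTaskType` has a string underlying type, uses a made-up `"vacuum"` task type and addresses, and relies on the zero-value `MaintenancePolicy` falling through to the queue's built-in defaults.

```go
package maintenance

import (
	"testing"
	"time"
)

// TestQueueLifecycle is an illustrative sketch of the queue API shown in this diff.
// The "vacuum" task type, the addresses, and the zero-value policy are assumptions.
func TestQueueLifecycle(t *testing.T) {
	queue := NewMaintenanceQueue(&MaintenancePolicy{})

	// Worker side: announce the worker so it can be assigned tasks.
	queue.RegisterWorker(&MaintenanceWorker{
		ID:            "worker-1",
		Address:       "10.0.0.5:9333",
		MaxConcurrent: 2,
	})

	// Admin side: a detection result becomes a pending task.
	queue.AddTask(&MaintenanceTask{
		Type:        MaintenanceTaskType("vacuum"), // hypothetical task type
		VolumeID:    42,
		Server:      "10.0.0.7:8080",
		Reason:      "garbage ratio above threshold",
		ScheduledAt: time.Now(),
	})

	// Worker side: poll for the next task it is capable of running.
	task := queue.GetNextTask("worker-1", []MaintenanceTaskType{MaintenanceTaskType("vacuum")})
	if task == nil {
		t.Fatal("expected a task to be assigned")
	}
	queue.UpdateTaskProgress(task.ID, 50)
	queue.CompleteTask(task.ID, "") // an empty error string marks success
}
```

With no `MaintenanceIntegration` set, `GetNextTask` uses the fallback concurrency check against the policy (or the built-in limit of 1), which is why a single pending task is handed out immediately in this sketch.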
