diff options
Diffstat (limited to 'weed/admin/topology/internal.go')
| -rw-r--r-- | weed/admin/topology/internal.go | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/weed/admin/topology/internal.go b/weed/admin/topology/internal.go new file mode 100644 index 000000000..72e37f6c1 --- /dev/null +++ b/weed/admin/topology/internal.go @@ -0,0 +1,114 @@ +package topology + +import ( + "fmt" + "time" +) + +// reassignTaskStates assigns tasks to the appropriate disks +func (at *ActiveTopology) reassignTaskStates() { + // Clear existing task assignments + for _, disk := range at.disks { + disk.pendingTasks = nil + disk.assignedTasks = nil + disk.recentTasks = nil + } + + // Reassign pending tasks + for _, task := range at.pendingTasks { + at.assignTaskToDisk(task) + } + + // Reassign assigned tasks + for _, task := range at.assignedTasks { + at.assignTaskToDisk(task) + } + + // Reassign recent tasks + for _, task := range at.recentTasks { + at.assignTaskToDisk(task) + } +} + +// assignTaskToDisk assigns a task to the appropriate disk(s) +func (at *ActiveTopology) assignTaskToDisk(task *taskState) { + addedDisks := make(map[string]bool) + + // Local helper function to assign task to a disk and avoid code duplication + assign := func(server string, diskID uint32) { + key := fmt.Sprintf("%s:%d", server, diskID) + if server == "" || addedDisks[key] { + return + } + if disk, exists := at.disks[key]; exists { + switch task.Status { + case TaskStatusPending: + disk.pendingTasks = append(disk.pendingTasks, task) + case TaskStatusInProgress: + disk.assignedTasks = append(disk.assignedTasks, task) + case TaskStatusCompleted: + disk.recentTasks = append(disk.recentTasks, task) + } + addedDisks[key] = true + } + } + + // Assign to all source disks + for _, source := range task.Sources { + assign(source.SourceServer, source.SourceDisk) + } + + // Assign to all destination disks (duplicates automatically avoided by helper) + for _, dest := range task.Destinations { + assign(dest.TargetServer, dest.TargetDisk) + } +} + +// isDiskAvailable checks if a disk can accept new tasks +func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool { + // Check if disk has too many pending and active tasks + activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks) + if activeLoad >= MaxConcurrentTasksPerDisk { + return false + } + + // Check for conflicting task types + for _, task := range disk.assignedTasks { + if at.areTaskTypesConflicting(task.TaskType, taskType) { + return false + } + } + + return true +} + +// areTaskTypesConflicting checks if two task types conflict +func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool { + // Examples of conflicting task types + conflictMap := map[TaskType][]TaskType{ + TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding}, + TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding}, + TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance}, + } + + if conflicts, exists := conflictMap[existing]; exists { + for _, conflictType := range conflicts { + if conflictType == new { + return true + } + } + } + + return false +} + +// cleanupRecentTasks removes old recent tasks +func (at *ActiveTopology) cleanupRecentTasks() { + cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second) + + for taskID, task := range at.recentTasks { + if task.CompletedAt.Before(cutoff) { + delete(at.recentTasks, taskID) + } + } +} |
