aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/topology/internal.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/topology/internal.go')
-rw-r--r--weed/admin/topology/internal.go114
1 files changed, 114 insertions, 0 deletions
diff --git a/weed/admin/topology/internal.go b/weed/admin/topology/internal.go
new file mode 100644
index 000000000..72e37f6c1
--- /dev/null
+++ b/weed/admin/topology/internal.go
@@ -0,0 +1,114 @@
+package topology
+
+import (
+ "fmt"
+ "time"
+)
+
+// reassignTaskStates assigns tasks to the appropriate disks
+func (at *ActiveTopology) reassignTaskStates() {
+ // Clear existing task assignments
+ for _, disk := range at.disks {
+ disk.pendingTasks = nil
+ disk.assignedTasks = nil
+ disk.recentTasks = nil
+ }
+
+ // Reassign pending tasks
+ for _, task := range at.pendingTasks {
+ at.assignTaskToDisk(task)
+ }
+
+ // Reassign assigned tasks
+ for _, task := range at.assignedTasks {
+ at.assignTaskToDisk(task)
+ }
+
+ // Reassign recent tasks
+ for _, task := range at.recentTasks {
+ at.assignTaskToDisk(task)
+ }
+}
+
+// assignTaskToDisk assigns a task to the appropriate disk(s)
+func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
+ addedDisks := make(map[string]bool)
+
+ // Local helper function to assign task to a disk and avoid code duplication
+ assign := func(server string, diskID uint32) {
+ key := fmt.Sprintf("%s:%d", server, diskID)
+ if server == "" || addedDisks[key] {
+ return
+ }
+ if disk, exists := at.disks[key]; exists {
+ switch task.Status {
+ case TaskStatusPending:
+ disk.pendingTasks = append(disk.pendingTasks, task)
+ case TaskStatusInProgress:
+ disk.assignedTasks = append(disk.assignedTasks, task)
+ case TaskStatusCompleted:
+ disk.recentTasks = append(disk.recentTasks, task)
+ }
+ addedDisks[key] = true
+ }
+ }
+
+ // Assign to all source disks
+ for _, source := range task.Sources {
+ assign(source.SourceServer, source.SourceDisk)
+ }
+
+ // Assign to all destination disks (duplicates automatically avoided by helper)
+ for _, dest := range task.Destinations {
+ assign(dest.TargetServer, dest.TargetDisk)
+ }
+}
+
+// isDiskAvailable checks if a disk can accept new tasks
+func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
+ // Check if disk has too many pending and active tasks
+ activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
+ if activeLoad >= MaxConcurrentTasksPerDisk {
+ return false
+ }
+
+ // Check for conflicting task types
+ for _, task := range disk.assignedTasks {
+ if at.areTaskTypesConflicting(task.TaskType, taskType) {
+ return false
+ }
+ }
+
+ return true
+}
+
+// areTaskTypesConflicting checks if two task types conflict
+func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
+ // Examples of conflicting task types
+ conflictMap := map[TaskType][]TaskType{
+ TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding},
+ TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding},
+ TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
+ }
+
+ if conflicts, exists := conflictMap[existing]; exists {
+ for _, conflictType := range conflicts {
+ if conflictType == new {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// cleanupRecentTasks removes old recent tasks
+func (at *ActiveTopology) cleanupRecentTasks() {
+ cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
+
+ for taskID, task := range at.recentTasks {
+ if task.CompletedAt.Before(cutoff) {
+ delete(at.recentTasks, taskID)
+ }
+ }
+}