Diffstat (limited to 'weed/admin/topology/active_topology.go')
-rw-r--r--  weed/admin/topology/active_topology.go  741
1 file changed, 741 insertions, 0 deletions
diff --git a/weed/admin/topology/active_topology.go b/weed/admin/topology/active_topology.go
new file mode 100644
index 000000000..9ce63bfa7
--- /dev/null
+++ b/weed/admin/topology/active_topology.go
@@ -0,0 +1,741 @@
+package topology
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+)
+
+// TaskType represents different types of maintenance operations
+type TaskType string
+
+// TaskStatus represents the current status of a task
+type TaskStatus string
+
+// Common task type constants
+const (
+ TaskTypeVacuum TaskType = "vacuum"
+ TaskTypeBalance TaskType = "balance"
+ TaskTypeErasureCoding TaskType = "erasure_coding"
+ TaskTypeReplication TaskType = "replication"
+)
+
+// Common task status constants
+const (
+ TaskStatusPending TaskStatus = "pending"
+ TaskStatusInProgress TaskStatus = "in_progress"
+ TaskStatusCompleted TaskStatus = "completed"
+)
+
+// taskState represents the current state of tasks affecting the topology (internal)
+type taskState struct {
+ VolumeID uint32 `json:"volume_id"`
+ TaskType TaskType `json:"task_type"`
+ SourceServer string `json:"source_server"`
+ SourceDisk uint32 `json:"source_disk"`
+ TargetServer string `json:"target_server,omitempty"`
+ TargetDisk uint32 `json:"target_disk,omitempty"`
+ Status TaskStatus `json:"status"`
+ StartedAt time.Time `json:"started_at"`
+ CompletedAt time.Time `json:"completed_at,omitempty"`
+}
+
+// DiskInfo represents a disk with its current state and ongoing tasks (public for external access)
+type DiskInfo struct {
+ NodeID string `json:"node_id"`
+ DiskID uint32 `json:"disk_id"`
+ DiskType string `json:"disk_type"`
+ DataCenter string `json:"data_center"`
+ Rack string `json:"rack"`
+ DiskInfo *master_pb.DiskInfo `json:"disk_info"`
+ LoadCount int `json:"load_count"` // Number of active tasks
+}
+
+// activeDisk represents internal disk state (private)
+type activeDisk struct {
+ *DiskInfo
+ pendingTasks []*taskState
+ assignedTasks []*taskState
+ recentTasks []*taskState // Completed in last N seconds
+}
+
+// activeNode represents a node with its disks (private)
+type activeNode struct {
+ nodeID string
+ dataCenter string
+ rack string
+ nodeInfo *master_pb.DataNodeInfo
+ disks map[uint32]*activeDisk // DiskID -> activeDisk
+}
+
+// ActiveTopology provides a real-time view of cluster state with task awareness
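+//
+// A rough lifecycle sketch (hypothetical IDs and addresses; error handling is
+// elided, and topologyInfo is assumed to come from the master):
+//
+//	at := NewActiveTopology(10) // keep completed tasks visible for 10s
+//	_ = at.UpdateTopology(topologyInfo)
+//	at.AddPendingTask("task-1", TaskTypeBalance, 1001, "10.0.0.1:8080", 0, "10.0.0.2:8080", 0)
+//	_ = at.AssignTask("task-1")   // a worker picked the task up
+//	_ = at.CompleteTask("task-1") // the worker reported completion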
+type ActiveTopology struct {
+ // Core topology from master
+ topologyInfo *master_pb.TopologyInfo
+ lastUpdated time.Time
+
+ // Structured topology for easy access (private)
+ nodes map[string]*activeNode // NodeID -> activeNode
+ disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk
+
+ // Task states affecting the topology (private)
+ pendingTasks map[string]*taskState
+ assignedTasks map[string]*taskState
+ recentTasks map[string]*taskState
+
+ // Configuration
+ recentTaskWindowSeconds int
+
+ // Synchronization
+ mutex sync.RWMutex
+}
+
+// NewActiveTopology creates a new ActiveTopology instance
+func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology {
+ if recentTaskWindowSeconds <= 0 {
+ recentTaskWindowSeconds = 10 // Default 10 seconds
+ }
+
+ return &ActiveTopology{
+ nodes: make(map[string]*activeNode),
+ disks: make(map[string]*activeDisk),
+ pendingTasks: make(map[string]*taskState),
+ assignedTasks: make(map[string]*taskState),
+ recentTasks: make(map[string]*taskState),
+ recentTaskWindowSeconds: recentTaskWindowSeconds,
+ }
+}
+
+// UpdateTopology updates the topology information from master
+func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ at.topologyInfo = topologyInfo
+ at.lastUpdated = time.Now()
+
+ // Rebuild structured topology
+ at.nodes = make(map[string]*activeNode)
+ at.disks = make(map[string]*activeDisk)
+
+ for _, dc := range topologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, nodeInfo := range rack.DataNodeInfos {
+ node := &activeNode{
+ nodeID: nodeInfo.Id,
+ dataCenter: dc.Id,
+ rack: rack.Id,
+ nodeInfo: nodeInfo,
+ disks: make(map[uint32]*activeDisk),
+ }
+
+ // Add disks for this node
+ for diskType, diskInfo := range nodeInfo.DiskInfos {
+ disk := &activeDisk{
+ DiskInfo: &DiskInfo{
+ NodeID: nodeInfo.Id,
+ DiskID: diskInfo.DiskId,
+ DiskType: diskType,
+ DataCenter: dc.Id,
+ Rack: rack.Id,
+ DiskInfo: diskInfo,
+ },
+ }
+
+ diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
+ node.disks[diskInfo.DiskId] = disk
+ at.disks[diskKey] = disk
+ }
+
+ at.nodes[nodeInfo.Id] = node
+ }
+ }
+ }
+
+ // Reassign task states to updated topology
+ at.reassignTaskStates()
+
+ glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks", len(at.nodes), len(at.disks))
+ return nil
+}
+
+// AddPendingTask adds a pending task to the topology
+func (at *ActiveTopology) AddPendingTask(taskID string, taskType TaskType, volumeID uint32,
+ sourceServer string, sourceDisk uint32, targetServer string, targetDisk uint32) {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ task := &taskState{
+ VolumeID: volumeID,
+ TaskType: taskType,
+ SourceServer: sourceServer,
+ SourceDisk: sourceDisk,
+ TargetServer: targetServer,
+ TargetDisk: targetDisk,
+ Status: TaskStatusPending,
+ StartedAt: time.Now(),
+ }
+
+ at.pendingTasks[taskID] = task
+ at.assignTaskToDisk(task)
+}
+
+// AssignTask moves a task from pending to assigned
+func (at *ActiveTopology) AssignTask(taskID string) error {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ task, exists := at.pendingTasks[taskID]
+ if !exists {
+ return fmt.Errorf("pending task %s not found", taskID)
+ }
+
+ delete(at.pendingTasks, taskID)
+ task.Status = TaskStatusInProgress
+ at.assignedTasks[taskID] = task
+ at.reassignTaskStates()
+
+ return nil
+}
+
+// CompleteTask moves a task from assigned to recent
+func (at *ActiveTopology) CompleteTask(taskID string) error {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ task, exists := at.assignedTasks[taskID]
+ if !exists {
+ return fmt.Errorf("assigned task %s not found", taskID)
+ }
+
+ delete(at.assignedTasks, taskID)
+ task.Status = TaskStatusCompleted
+ task.CompletedAt = time.Now()
+ at.recentTasks[taskID] = task
+ at.reassignTaskStates()
+
+ // Clean up old recent tasks
+ at.cleanupRecentTasks()
+
+ return nil
+}
+
+// GetAvailableDisks returns disks that can accept new tasks of the given type
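+//
+// A sketch of typical use, preferring the least-loaded candidate (sourceNode is
+// a hypothetical variable; sort is the standard library package):
+//
+//	disks := at.GetAvailableDisks(TaskTypeBalance, sourceNode)
+//	sort.Slice(disks, func(i, j int) bool { return disks[i].LoadCount < disks[j].LoadCount })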
+func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ var available []*DiskInfo
+
+ for _, disk := range at.disks {
+ if disk.NodeID == excludeNodeID {
+ continue // Skip excluded node
+ }
+
+ if at.isDiskAvailable(disk, taskType) {
+ // Create a copy with current load count
+ diskCopy := *disk.DiskInfo
+ diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
+ available = append(available, &diskCopy)
+ }
+ }
+
+ return available
+}
+
+// GetDiskLoad returns the current load on a disk (number of active tasks)
+func (at *ActiveTopology) GetDiskLoad(nodeID string, diskID uint32) int {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
+ disk, exists := at.disks[diskKey]
+ if !exists {
+ return 0
+ }
+
+ return len(disk.pendingTasks) + len(disk.assignedTasks)
+}
+
+// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
+func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ for _, task := range at.recentTasks {
+ if task.VolumeID == volumeID && task.TaskType == taskType {
+ return true
+ }
+ }
+
+ return false
+}
+
+// GetAllNodes returns information about all nodes (public interface)
+func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ result := make(map[string]*master_pb.DataNodeInfo)
+ for nodeID, node := range at.nodes {
+ result[nodeID] = node.nodeInfo
+ }
+ return result
+}
+
+// GetTopologyInfo returns the current topology information; callers must treat the returned value as read-only
+func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+ return at.topologyInfo
+}
+
+// GetNodeDisks returns all disks for a specific node
+func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ node, exists := at.nodes[nodeID]
+ if !exists {
+ return nil
+ }
+
+ var disks []*DiskInfo
+ for _, disk := range node.disks {
+ diskCopy := *disk.DiskInfo
+ diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
+ disks = append(disks, &diskCopy)
+ }
+
+ return disks
+}
+
+// DestinationPlan represents a planned destination for a volume/shard operation
+type DestinationPlan struct {
+ TargetNode string `json:"target_node"`
+ TargetDisk uint32 `json:"target_disk"`
+ TargetRack string `json:"target_rack"`
+ TargetDC string `json:"target_dc"`
+ ExpectedSize uint64 `json:"expected_size"`
+ PlacementScore float64 `json:"placement_score"`
+ Conflicts []string `json:"conflicts"`
+}
+
+// MultiDestinationPlan represents multiple planned destinations for operations like EC
+type MultiDestinationPlan struct {
+ Plans []*DestinationPlan `json:"plans"`
+ TotalShards int `json:"total_shards"`
+ SuccessfulRack int `json:"successful_racks"`
+ SuccessfulDCs int `json:"successful_dcs"`
+}
+
+// PlanBalanceDestination finds the best destination for a balance operation
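+//
+// A usage sketch (hypothetical values; 30<<30 is an example volume size in bytes):
+//
+//	plan, err := at.PlanBalanceDestination(1001, "10.0.0.1:8080", "rack1", "dc1", 30<<30)
+//	if err == nil && len(plan.Conflicts) == 0 {
+//		// schedule a move to plan.TargetNode / plan.TargetDisk
+//	}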
+func (at *ActiveTopology) PlanBalanceDestination(volumeID uint32, sourceNode string, sourceRack string, sourceDC string, volumeSize uint64) (*DestinationPlan, error) {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ // Get available disks, excluding the source node
+ availableDisks := at.getAvailableDisksForPlanning(TaskTypeBalance, sourceNode)
+ if len(availableDisks) == 0 {
+ return nil, fmt.Errorf("no available disks for balance operation")
+ }
+
+ // Score each disk for balance placement
+ bestDisk := at.selectBestBalanceDestination(availableDisks, sourceRack, sourceDC, volumeSize)
+ if bestDisk == nil {
+ return nil, fmt.Errorf("no suitable destination found for balance operation")
+ }
+
+ return &DestinationPlan{
+ TargetNode: bestDisk.NodeID,
+ TargetDisk: bestDisk.DiskID,
+ TargetRack: bestDisk.Rack,
+ TargetDC: bestDisk.DataCenter,
+ ExpectedSize: volumeSize,
+ PlacementScore: at.calculatePlacementScore(bestDisk, sourceRack, sourceDC),
+ Conflicts: at.checkPlacementConflicts(bestDisk, TaskTypeBalance),
+ }, nil
+}
+
+// PlanECDestinations finds multiple destinations for EC shard distribution
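+//
+// A usage sketch (hypothetical values; 14 shards assumes the default 10+4
+// data/parity EC layout):
+//
+//	multi, err := at.PlanECDestinations(1001, "10.0.0.1:8080", "rack1", "dc1", 14)
+//	if err == nil {
+//		glog.V(1).Infof("EC plan covers %d racks and %d DCs", multi.SuccessfulRack, multi.SuccessfulDCs)
+//	}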
+func (at *ActiveTopology) PlanECDestinations(volumeID uint32, sourceNode string, sourceRack string, sourceDC string, shardsNeeded int) (*MultiDestinationPlan, error) {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ // Get available disks for EC placement
+ availableDisks := at.getAvailableDisksForPlanning(TaskTypeErasureCoding, "")
+ if len(availableDisks) < shardsNeeded {
+ return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", shardsNeeded, len(availableDisks))
+ }
+
+ // Select best disks for EC placement with rack/DC diversity
+ selectedDisks := at.selectBestECDestinations(availableDisks, sourceRack, sourceDC, shardsNeeded)
+ if len(selectedDisks) < shardsNeeded {
+ return nil, fmt.Errorf("could not find %d suitable destinations for EC placement", shardsNeeded)
+ }
+
+ var plans []*DestinationPlan
+ rackCount := make(map[string]int)
+ dcCount := make(map[string]int)
+
+ for _, disk := range selectedDisks {
+ plan := &DestinationPlan{
+ TargetNode: disk.NodeID,
+ TargetDisk: disk.DiskID,
+ TargetRack: disk.Rack,
+ TargetDC: disk.DataCenter,
+ ExpectedSize: 0, // EC shards don't have predetermined size
+ PlacementScore: at.calculatePlacementScore(disk, sourceRack, sourceDC),
+ Conflicts: at.checkPlacementConflicts(disk, TaskTypeErasureCoding),
+ }
+ plans = append(plans, plan)
+
+ // Count rack and DC diversity
+ rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+ rackCount[rackKey]++
+ dcCount[disk.DataCenter]++
+ }
+
+ return &MultiDestinationPlan{
+ Plans: plans,
+ TotalShards: len(plans),
+ SuccessfulRack: len(rackCount),
+ SuccessfulDCs: len(dcCount),
+ }, nil
+}
+
+// getAvailableDisksForPlanning returns disks available for destination planning
+func (at *ActiveTopology) getAvailableDisksForPlanning(taskType TaskType, excludeNodeID string) []*activeDisk {
+ var available []*activeDisk
+
+ for _, disk := range at.disks {
+ if excludeNodeID != "" && disk.NodeID == excludeNodeID {
+ continue // Skip excluded node
+ }
+
+ if at.isDiskAvailable(disk, taskType) {
+ available = append(available, disk)
+ }
+ }
+
+ return available
+}
+
+// selectBestBalanceDestination selects the best disk for balance operation
+func (at *ActiveTopology) selectBestBalanceDestination(disks []*activeDisk, sourceRack string, sourceDC string, volumeSize uint64) *activeDisk {
+ if len(disks) == 0 {
+ return nil
+ }
+
+ var bestDisk *activeDisk
+ bestScore := -1.0
+
+ for _, disk := range disks {
+ score := at.calculateBalanceScore(disk, sourceRack, sourceDC, volumeSize)
+ if score > bestScore {
+ bestScore = score
+ bestDisk = disk
+ }
+ }
+
+ return bestDisk
+}
+
+// selectBestECDestinations selects multiple disks for EC shard placement with diversity
+func (at *ActiveTopology) selectBestECDestinations(disks []*activeDisk, sourceRack string, sourceDC string, shardsNeeded int) []*activeDisk {
+ if len(disks) == 0 {
+ return nil
+ }
+
+ // Group disks by rack and DC for diversity
+ rackGroups := make(map[string][]*activeDisk)
+ for _, disk := range disks {
+ rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+ rackGroups[rackKey] = append(rackGroups[rackKey], disk)
+ }
+
+ var selected []*activeDisk
+ usedRacks := make(map[string]bool)
+
+ // First pass: select one disk from each rack for maximum diversity
+ for rackKey, rackDisks := range rackGroups {
+ if len(selected) >= shardsNeeded {
+ break
+ }
+
+ // Select best disk from this rack
+ bestDisk := at.selectBestFromRack(rackDisks, sourceRack, sourceDC)
+ if bestDisk != nil {
+ selected = append(selected, bestDisk)
+ usedRacks[rackKey] = true
+ }
+ }
+
+ // Second pass: if more disks are needed, relax rack diversity and fill from any remaining available disks
+ if len(selected) < shardsNeeded {
+ for _, disk := range disks {
+ if len(selected) >= shardsNeeded {
+ break
+ }
+
+ // Skip if already selected
+ alreadySelected := false
+ for _, sel := range selected {
+ if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
+ alreadySelected = true
+ break
+ }
+ }
+
+ if !alreadySelected && at.isDiskAvailable(disk, TaskTypeErasureCoding) {
+ selected = append(selected, disk)
+ }
+ }
+ }
+
+ return selected
+}
+
+// selectBestFromRack selects the best disk from a rack
+func (at *ActiveTopology) selectBestFromRack(disks []*activeDisk, sourceRack string, sourceDC string) *activeDisk {
+ if len(disks) == 0 {
+ return nil
+ }
+
+ var bestDisk *activeDisk
+ bestScore := -1.0
+
+ for _, disk := range disks {
+ if !at.isDiskAvailable(disk, TaskTypeErasureCoding) {
+ continue
+ }
+
+ score := at.calculateECScore(disk, sourceRack, sourceDC)
+ if score > bestScore {
+ bestScore = score
+ bestDisk = disk
+ }
+ }
+
+ return bestDisk
+}
+
+// calculateBalanceScore calculates placement score for balance operations
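+// For example, a disk with no active tasks (80 points), half of its volume
+// slots free (10), on a different rack (+10) and a different DC (+5) scores 105.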
+func (at *ActiveTopology) calculateBalanceScore(disk *activeDisk, sourceRack string, sourceDC string, volumeSize uint64) float64 {
+ score := 0.0
+
+ // Prefer disks with lower load
+ activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
+ score += (2.0 - float64(activeLoad)) * 40.0 // Max 80 points for load
+
+ // Prefer disks with more free space
+ if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 {
+ freeRatio := float64(disk.DiskInfo.DiskInfo.MaxVolumeCount-disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount)
+ score += freeRatio * 20.0 // Max 20 points for free space
+ }
+
+ // Rack diversity bonus (prefer different rack)
+ if disk.Rack != sourceRack {
+ score += 10.0
+ }
+
+ // DC diversity bonus (prefer different DC)
+ if disk.DataCenter != sourceDC {
+ score += 5.0
+ }
+
+ return score
+}
+
+// calculateECScore calculates placement score for EC operations
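+// For example, a disk with one active task (30 points), 80% of its volume
+// slots free (16), on a different rack (+20) and a different DC (+15) scores 81.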
+func (at *ActiveTopology) calculateECScore(disk *activeDisk, sourceRack string, sourceDC string) float64 {
+ score := 0.0
+
+ // Prefer disks with lower load
+ activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
+ score += (2.0 - float64(activeLoad)) * 30.0 // Max 60 points for load
+
+ // Prefer disks with more free space
+ if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 {
+ freeRatio := float64(disk.DiskInfo.DiskInfo.MaxVolumeCount-disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount)
+ score += freeRatio * 20.0 // Max 20 points for free space
+ }
+
+ // Strong rack diversity preference for EC
+ if disk.Rack != sourceRack {
+ score += 20.0
+ }
+
+ // Strong DC diversity preference for EC
+ if disk.DataCenter != sourceDC {
+ score += 15.0
+ }
+
+ return score
+}
+
+// calculatePlacementScore calculates overall placement quality score
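+// For example, an idle disk (0.4) with half of its volume slots free (0.15) on
+// a different rack and DC (0.3) scores 0.85.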
+func (at *ActiveTopology) calculatePlacementScore(disk *activeDisk, sourceRack string, sourceDC string) float64 {
+ score := 0.0
+
+ // Load factor
+ activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
+ loadScore := (2.0 - float64(activeLoad)) / 2.0 // Normalize to 0-1
+ score += loadScore * 0.4
+
+ // Capacity factor
+ if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 {
+ freeRatio := float64(disk.DiskInfo.DiskInfo.MaxVolumeCount-disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount)
+ score += freeRatio * 0.3
+ }
+
+ // Diversity factor
+ diversityScore := 0.0
+ if disk.Rack != sourceRack {
+ diversityScore += 0.5
+ }
+ if disk.DataCenter != sourceDC {
+ diversityScore += 0.5
+ }
+ score += diversityScore * 0.3
+
+ return score // Score between 0.0 and 1.0
+}
+
+// checkPlacementConflicts checks for placement rule violations
+func (at *ActiveTopology) checkPlacementConflicts(disk *activeDisk, taskType TaskType) []string {
+ var conflicts []string
+
+ // Check load limits
+ activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
+ if activeLoad >= 2 {
+ conflicts = append(conflicts, fmt.Sprintf("disk_load_high_%d", activeLoad))
+ }
+
+ // Check capacity limits
+ if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 {
+ usageRatio := float64(disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount)
+ if usageRatio > 0.9 {
+ conflicts = append(conflicts, "disk_capacity_high")
+ }
+ }
+
+ // Check for conflicting task types
+ for _, task := range disk.assignedTasks {
+ if at.areTaskTypesConflicting(task.TaskType, taskType) {
+ conflicts = append(conflicts, fmt.Sprintf("task_conflict_%s", task.TaskType))
+ }
+ }
+
+ return conflicts
+}
+
+// Private methods
+
+// reassignTaskStates assigns tasks to the appropriate disks
+func (at *ActiveTopology) reassignTaskStates() {
+ // Clear existing task assignments
+ for _, disk := range at.disks {
+ disk.pendingTasks = nil
+ disk.assignedTasks = nil
+ disk.recentTasks = nil
+ }
+
+ // Reassign pending tasks
+ for _, task := range at.pendingTasks {
+ at.assignTaskToDisk(task)
+ }
+
+ // Reassign assigned tasks
+ for _, task := range at.assignedTasks {
+ at.assignTaskToDisk(task)
+ }
+
+ // Reassign recent tasks
+ for _, task := range at.recentTasks {
+ at.assignTaskToDisk(task)
+ }
+}
+
+// assignTaskToDisk assigns a task to the appropriate disk(s)
+func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
+ // Assign to source disk
+ sourceKey := fmt.Sprintf("%s:%d", task.SourceServer, task.SourceDisk)
+ if sourceDisk, exists := at.disks[sourceKey]; exists {
+ switch task.Status {
+ case TaskStatusPending:
+ sourceDisk.pendingTasks = append(sourceDisk.pendingTasks, task)
+ case TaskStatusInProgress:
+ sourceDisk.assignedTasks = append(sourceDisk.assignedTasks, task)
+ case TaskStatusCompleted:
+ sourceDisk.recentTasks = append(sourceDisk.recentTasks, task)
+ }
+ }
+
+ // Assign to target disk if it exists and is different from source
+ if task.TargetServer != "" && (task.TargetServer != task.SourceServer || task.TargetDisk != task.SourceDisk) {
+ targetKey := fmt.Sprintf("%s:%d", task.TargetServer, task.TargetDisk)
+ if targetDisk, exists := at.disks[targetKey]; exists {
+ switch task.Status {
+ case TaskStatusPending:
+ targetDisk.pendingTasks = append(targetDisk.pendingTasks, task)
+ case TaskStatusInProgress:
+ targetDisk.assignedTasks = append(targetDisk.assignedTasks, task)
+ case TaskStatusCompleted:
+ targetDisk.recentTasks = append(targetDisk.recentTasks, task)
+ }
+ }
+ }
+}
+
+// isDiskAvailable checks if a disk can accept new tasks
+func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
+ // Check if disk has too many active tasks
+ activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
+ if activeLoad >= 2 { // Max 2 concurrent tasks per disk
+ return false
+ }
+
+ // Check for conflicting task types
+ for _, task := range disk.assignedTasks {
+ if at.areTaskTypesConflicting(task.TaskType, taskType) {
+ return false
+ }
+ }
+
+ return true
+}
+
+// areTaskTypesConflicting checks if two task types conflict
+func (at *ActiveTopology) areTaskTypesConflicting(existing, newType TaskType) bool {
+ // Task types that must not run concurrently on the same disk
+ conflictMap := map[TaskType][]TaskType{
+ TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding},
+ TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding},
+ TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
+ }
+
+ if conflicts, exists := conflictMap[existing]; exists {
+ for _, conflictType := range conflicts {
+ if conflictType == newType {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// cleanupRecentTasks removes old recent tasks
+func (at *ActiveTopology) cleanupRecentTasks() {
+ cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
+
+ for taskID, task := range at.recentTasks {
+ if task.CompletedAt.Before(cutoff) {
+ delete(at.recentTasks, taskID)
+ }
+ }
+}