diff options
Diffstat (limited to 'weed/admin/topology/active_topology.go')
| -rw-r--r-- | weed/admin/topology/active_topology.go | 301 |
1 files changed, 0 insertions, 301 deletions
diff --git a/weed/admin/topology/active_topology.go b/weed/admin/topology/active_topology.go index 9ce63bfa7..bfa03a72f 100644 --- a/weed/admin/topology/active_topology.go +++ b/weed/admin/topology/active_topology.go @@ -332,307 +332,6 @@ type MultiDestinationPlan struct { SuccessfulDCs int `json:"successful_dcs"` } -// PlanBalanceDestination finds the best destination for a balance operation -func (at *ActiveTopology) PlanBalanceDestination(volumeID uint32, sourceNode string, sourceRack string, sourceDC string, volumeSize uint64) (*DestinationPlan, error) { - at.mutex.RLock() - defer at.mutex.RUnlock() - - // Get available disks, excluding the source node - availableDisks := at.getAvailableDisksForPlanning(TaskTypeBalance, sourceNode) - if len(availableDisks) == 0 { - return nil, fmt.Errorf("no available disks for balance operation") - } - - // Score each disk for balance placement - bestDisk := at.selectBestBalanceDestination(availableDisks, sourceRack, sourceDC, volumeSize) - if bestDisk == nil { - return nil, fmt.Errorf("no suitable destination found for balance operation") - } - - return &DestinationPlan{ - TargetNode: bestDisk.NodeID, - TargetDisk: bestDisk.DiskID, - TargetRack: bestDisk.Rack, - TargetDC: bestDisk.DataCenter, - ExpectedSize: volumeSize, - PlacementScore: at.calculatePlacementScore(bestDisk, sourceRack, sourceDC), - Conflicts: at.checkPlacementConflicts(bestDisk, TaskTypeBalance), - }, nil -} - -// PlanECDestinations finds multiple destinations for EC shard distribution -func (at *ActiveTopology) PlanECDestinations(volumeID uint32, sourceNode string, sourceRack string, sourceDC string, shardsNeeded int) (*MultiDestinationPlan, error) { - at.mutex.RLock() - defer at.mutex.RUnlock() - - // Get available disks for EC placement - availableDisks := at.getAvailableDisksForPlanning(TaskTypeErasureCoding, "") - if len(availableDisks) < shardsNeeded { - return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", shardsNeeded, len(availableDisks)) - } - - // Select best disks for EC placement with rack/DC diversity - selectedDisks := at.selectBestECDestinations(availableDisks, sourceRack, sourceDC, shardsNeeded) - if len(selectedDisks) < shardsNeeded { - return nil, fmt.Errorf("could not find %d suitable destinations for EC placement", shardsNeeded) - } - - var plans []*DestinationPlan - rackCount := make(map[string]int) - dcCount := make(map[string]int) - - for _, disk := range selectedDisks { - plan := &DestinationPlan{ - TargetNode: disk.NodeID, - TargetDisk: disk.DiskID, - TargetRack: disk.Rack, - TargetDC: disk.DataCenter, - ExpectedSize: 0, // EC shards don't have predetermined size - PlacementScore: at.calculatePlacementScore(disk, sourceRack, sourceDC), - Conflicts: at.checkPlacementConflicts(disk, TaskTypeErasureCoding), - } - plans = append(plans, plan) - - // Count rack and DC diversity - rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack) - rackCount[rackKey]++ - dcCount[disk.DataCenter]++ - } - - return &MultiDestinationPlan{ - Plans: plans, - TotalShards: len(plans), - SuccessfulRack: len(rackCount), - SuccessfulDCs: len(dcCount), - }, nil -} - -// getAvailableDisksForPlanning returns disks available for destination planning -func (at *ActiveTopology) getAvailableDisksForPlanning(taskType TaskType, excludeNodeID string) []*activeDisk { - var available []*activeDisk - - for _, disk := range at.disks { - if excludeNodeID != "" && disk.NodeID == excludeNodeID { - continue // Skip excluded node - } - - if at.isDiskAvailable(disk, taskType) { - available = append(available, disk) - } - } - - return available -} - -// selectBestBalanceDestination selects the best disk for balance operation -func (at *ActiveTopology) selectBestBalanceDestination(disks []*activeDisk, sourceRack string, sourceDC string, volumeSize uint64) *activeDisk { - if len(disks) == 0 { - return nil - } - - var bestDisk *activeDisk - bestScore := -1.0 - - for _, disk := range disks { - score := at.calculateBalanceScore(disk, sourceRack, sourceDC, volumeSize) - if score > bestScore { - bestScore = score - bestDisk = disk - } - } - - return bestDisk -} - -// selectBestECDestinations selects multiple disks for EC shard placement with diversity -func (at *ActiveTopology) selectBestECDestinations(disks []*activeDisk, sourceRack string, sourceDC string, shardsNeeded int) []*activeDisk { - if len(disks) == 0 { - return nil - } - - // Group disks by rack and DC for diversity - rackGroups := make(map[string][]*activeDisk) - for _, disk := range disks { - rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack) - rackGroups[rackKey] = append(rackGroups[rackKey], disk) - } - - var selected []*activeDisk - usedRacks := make(map[string]bool) - - // First pass: select one disk from each rack for maximum diversity - for rackKey, rackDisks := range rackGroups { - if len(selected) >= shardsNeeded { - break - } - - // Select best disk from this rack - bestDisk := at.selectBestFromRack(rackDisks, sourceRack, sourceDC) - if bestDisk != nil { - selected = append(selected, bestDisk) - usedRacks[rackKey] = true - } - } - - // Second pass: if we need more disks, select from racks we've already used - if len(selected) < shardsNeeded { - for _, disk := range disks { - if len(selected) >= shardsNeeded { - break - } - - // Skip if already selected - alreadySelected := false - for _, sel := range selected { - if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID { - alreadySelected = true - break - } - } - - if !alreadySelected && at.isDiskAvailable(disk, TaskTypeErasureCoding) { - selected = append(selected, disk) - } - } - } - - return selected -} - -// selectBestFromRack selects the best disk from a rack -func (at *ActiveTopology) selectBestFromRack(disks []*activeDisk, sourceRack string, sourceDC string) *activeDisk { - if len(disks) == 0 { - return nil - } - - var bestDisk *activeDisk - bestScore := -1.0 - - for _, disk := range disks { - if !at.isDiskAvailable(disk, TaskTypeErasureCoding) { - continue - } - - score := at.calculateECScore(disk, sourceRack, sourceDC) - if score > bestScore { - bestScore = score - bestDisk = disk - } - } - - return bestDisk -} - -// calculateBalanceScore calculates placement score for balance operations -func (at *ActiveTopology) calculateBalanceScore(disk *activeDisk, sourceRack string, sourceDC string, volumeSize uint64) float64 { - score := 0.0 - - // Prefer disks with lower load - activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks) - score += (2.0 - float64(activeLoad)) * 40.0 // Max 80 points for load - - // Prefer disks with more free space - if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 { - freeRatio := float64(disk.DiskInfo.DiskInfo.MaxVolumeCount-disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount) - score += freeRatio * 20.0 // Max 20 points for free space - } - - // Rack diversity bonus (prefer different rack) - if disk.Rack != sourceRack { - score += 10.0 - } - - // DC diversity bonus (prefer different DC) - if disk.DataCenter != sourceDC { - score += 5.0 - } - - return score -} - -// calculateECScore calculates placement score for EC operations -func (at *ActiveTopology) calculateECScore(disk *activeDisk, sourceRack string, sourceDC string) float64 { - score := 0.0 - - // Prefer disks with lower load - activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks) - score += (2.0 - float64(activeLoad)) * 30.0 // Max 60 points for load - - // Prefer disks with more free space - if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 { - freeRatio := float64(disk.DiskInfo.DiskInfo.MaxVolumeCount-disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount) - score += freeRatio * 20.0 // Max 20 points for free space - } - - // Strong rack diversity preference for EC - if disk.Rack != sourceRack { - score += 20.0 - } - - // Strong DC diversity preference for EC - if disk.DataCenter != sourceDC { - score += 15.0 - } - - return score -} - -// calculatePlacementScore calculates overall placement quality score -func (at *ActiveTopology) calculatePlacementScore(disk *activeDisk, sourceRack string, sourceDC string) float64 { - score := 0.0 - - // Load factor - activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks) - loadScore := (2.0 - float64(activeLoad)) / 2.0 // Normalize to 0-1 - score += loadScore * 0.4 - - // Capacity factor - if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 { - freeRatio := float64(disk.DiskInfo.DiskInfo.MaxVolumeCount-disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount) - score += freeRatio * 0.3 - } - - // Diversity factor - diversityScore := 0.0 - if disk.Rack != sourceRack { - diversityScore += 0.5 - } - if disk.DataCenter != sourceDC { - diversityScore += 0.5 - } - score += diversityScore * 0.3 - - return score // Score between 0.0 and 1.0 -} - -// checkPlacementConflicts checks for placement rule violations -func (at *ActiveTopology) checkPlacementConflicts(disk *activeDisk, taskType TaskType) []string { - var conflicts []string - - // Check load limits - activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks) - if activeLoad >= 2 { - conflicts = append(conflicts, fmt.Sprintf("disk_load_high_%d", activeLoad)) - } - - // Check capacity limits - if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 { - usageRatio := float64(disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount) - if usageRatio > 0.9 { - conflicts = append(conflicts, "disk_capacity_high") - } - } - - // Check for conflicting task types - for _, task := range disk.assignedTasks { - if at.areTaskTypesConflicting(task.TaskType, taskType) { - conflicts = append(conflicts, fmt.Sprintf("task_conflict_%s", task.TaskType)) - } - } - - return conflicts -} - // Private methods // reassignTaskStates assigns tasks to the appropriate disks |
