aboutsummaryrefslogtreecommitdiff
path: root/weed/admin/topology/active_topology.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/admin/topology/active_topology.go')
-rw-r--r--weed/admin/topology/active_topology.go301
1 files changed, 0 insertions, 301 deletions
diff --git a/weed/admin/topology/active_topology.go b/weed/admin/topology/active_topology.go
index 9ce63bfa7..bfa03a72f 100644
--- a/weed/admin/topology/active_topology.go
+++ b/weed/admin/topology/active_topology.go
@@ -332,307 +332,6 @@ type MultiDestinationPlan struct {
SuccessfulDCs int `json:"successful_dcs"`
}
-// PlanBalanceDestination finds the best destination for a balance operation
-func (at *ActiveTopology) PlanBalanceDestination(volumeID uint32, sourceNode string, sourceRack string, sourceDC string, volumeSize uint64) (*DestinationPlan, error) {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- // Get available disks, excluding the source node
- availableDisks := at.getAvailableDisksForPlanning(TaskTypeBalance, sourceNode)
- if len(availableDisks) == 0 {
- return nil, fmt.Errorf("no available disks for balance operation")
- }
-
- // Score each disk for balance placement
- bestDisk := at.selectBestBalanceDestination(availableDisks, sourceRack, sourceDC, volumeSize)
- if bestDisk == nil {
- return nil, fmt.Errorf("no suitable destination found for balance operation")
- }
-
- return &DestinationPlan{
- TargetNode: bestDisk.NodeID,
- TargetDisk: bestDisk.DiskID,
- TargetRack: bestDisk.Rack,
- TargetDC: bestDisk.DataCenter,
- ExpectedSize: volumeSize,
- PlacementScore: at.calculatePlacementScore(bestDisk, sourceRack, sourceDC),
- Conflicts: at.checkPlacementConflicts(bestDisk, TaskTypeBalance),
- }, nil
-}
-
-// PlanECDestinations finds multiple destinations for EC shard distribution
-func (at *ActiveTopology) PlanECDestinations(volumeID uint32, sourceNode string, sourceRack string, sourceDC string, shardsNeeded int) (*MultiDestinationPlan, error) {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- // Get available disks for EC placement
- availableDisks := at.getAvailableDisksForPlanning(TaskTypeErasureCoding, "")
- if len(availableDisks) < shardsNeeded {
- return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", shardsNeeded, len(availableDisks))
- }
-
- // Select best disks for EC placement with rack/DC diversity
- selectedDisks := at.selectBestECDestinations(availableDisks, sourceRack, sourceDC, shardsNeeded)
- if len(selectedDisks) < shardsNeeded {
- return nil, fmt.Errorf("could not find %d suitable destinations for EC placement", shardsNeeded)
- }
-
- var plans []*DestinationPlan
- rackCount := make(map[string]int)
- dcCount := make(map[string]int)
-
- for _, disk := range selectedDisks {
- plan := &DestinationPlan{
- TargetNode: disk.NodeID,
- TargetDisk: disk.DiskID,
- TargetRack: disk.Rack,
- TargetDC: disk.DataCenter,
- ExpectedSize: 0, // EC shards don't have predetermined size
- PlacementScore: at.calculatePlacementScore(disk, sourceRack, sourceDC),
- Conflicts: at.checkPlacementConflicts(disk, TaskTypeErasureCoding),
- }
- plans = append(plans, plan)
-
- // Count rack and DC diversity
- rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
- rackCount[rackKey]++
- dcCount[disk.DataCenter]++
- }
-
- return &MultiDestinationPlan{
- Plans: plans,
- TotalShards: len(plans),
- SuccessfulRack: len(rackCount),
- SuccessfulDCs: len(dcCount),
- }, nil
-}
-
-// getAvailableDisksForPlanning returns disks available for destination planning
-func (at *ActiveTopology) getAvailableDisksForPlanning(taskType TaskType, excludeNodeID string) []*activeDisk {
- var available []*activeDisk
-
- for _, disk := range at.disks {
- if excludeNodeID != "" && disk.NodeID == excludeNodeID {
- continue // Skip excluded node
- }
-
- if at.isDiskAvailable(disk, taskType) {
- available = append(available, disk)
- }
- }
-
- return available
-}
-
-// selectBestBalanceDestination selects the best disk for balance operation
-func (at *ActiveTopology) selectBestBalanceDestination(disks []*activeDisk, sourceRack string, sourceDC string, volumeSize uint64) *activeDisk {
- if len(disks) == 0 {
- return nil
- }
-
- var bestDisk *activeDisk
- bestScore := -1.0
-
- for _, disk := range disks {
- score := at.calculateBalanceScore(disk, sourceRack, sourceDC, volumeSize)
- if score > bestScore {
- bestScore = score
- bestDisk = disk
- }
- }
-
- return bestDisk
-}
-
-// selectBestECDestinations selects multiple disks for EC shard placement with diversity
-func (at *ActiveTopology) selectBestECDestinations(disks []*activeDisk, sourceRack string, sourceDC string, shardsNeeded int) []*activeDisk {
- if len(disks) == 0 {
- return nil
- }
-
- // Group disks by rack and DC for diversity
- rackGroups := make(map[string][]*activeDisk)
- for _, disk := range disks {
- rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
- rackGroups[rackKey] = append(rackGroups[rackKey], disk)
- }
-
- var selected []*activeDisk
- usedRacks := make(map[string]bool)
-
- // First pass: select one disk from each rack for maximum diversity
- for rackKey, rackDisks := range rackGroups {
- if len(selected) >= shardsNeeded {
- break
- }
-
- // Select best disk from this rack
- bestDisk := at.selectBestFromRack(rackDisks, sourceRack, sourceDC)
- if bestDisk != nil {
- selected = append(selected, bestDisk)
- usedRacks[rackKey] = true
- }
- }
-
- // Second pass: if we need more disks, select from racks we've already used
- if len(selected) < shardsNeeded {
- for _, disk := range disks {
- if len(selected) >= shardsNeeded {
- break
- }
-
- // Skip if already selected
- alreadySelected := false
- for _, sel := range selected {
- if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
- alreadySelected = true
- break
- }
- }
-
- if !alreadySelected && at.isDiskAvailable(disk, TaskTypeErasureCoding) {
- selected = append(selected, disk)
- }
- }
- }
-
- return selected
-}
-
-// selectBestFromRack selects the best disk from a rack
-func (at *ActiveTopology) selectBestFromRack(disks []*activeDisk, sourceRack string, sourceDC string) *activeDisk {
- if len(disks) == 0 {
- return nil
- }
-
- var bestDisk *activeDisk
- bestScore := -1.0
-
- for _, disk := range disks {
- if !at.isDiskAvailable(disk, TaskTypeErasureCoding) {
- continue
- }
-
- score := at.calculateECScore(disk, sourceRack, sourceDC)
- if score > bestScore {
- bestScore = score
- bestDisk = disk
- }
- }
-
- return bestDisk
-}
-
-// calculateBalanceScore calculates placement score for balance operations
-func (at *ActiveTopology) calculateBalanceScore(disk *activeDisk, sourceRack string, sourceDC string, volumeSize uint64) float64 {
- score := 0.0
-
- // Prefer disks with lower load
- activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
- score += (2.0 - float64(activeLoad)) * 40.0 // Max 80 points for load
-
- // Prefer disks with more free space
- if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 {
- freeRatio := float64(disk.DiskInfo.DiskInfo.MaxVolumeCount-disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount)
- score += freeRatio * 20.0 // Max 20 points for free space
- }
-
- // Rack diversity bonus (prefer different rack)
- if disk.Rack != sourceRack {
- score += 10.0
- }
-
- // DC diversity bonus (prefer different DC)
- if disk.DataCenter != sourceDC {
- score += 5.0
- }
-
- return score
-}
-
-// calculateECScore calculates placement score for EC operations
-func (at *ActiveTopology) calculateECScore(disk *activeDisk, sourceRack string, sourceDC string) float64 {
- score := 0.0
-
- // Prefer disks with lower load
- activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
- score += (2.0 - float64(activeLoad)) * 30.0 // Max 60 points for load
-
- // Prefer disks with more free space
- if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 {
- freeRatio := float64(disk.DiskInfo.DiskInfo.MaxVolumeCount-disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount)
- score += freeRatio * 20.0 // Max 20 points for free space
- }
-
- // Strong rack diversity preference for EC
- if disk.Rack != sourceRack {
- score += 20.0
- }
-
- // Strong DC diversity preference for EC
- if disk.DataCenter != sourceDC {
- score += 15.0
- }
-
- return score
-}
-
-// calculatePlacementScore calculates overall placement quality score
-func (at *ActiveTopology) calculatePlacementScore(disk *activeDisk, sourceRack string, sourceDC string) float64 {
- score := 0.0
-
- // Load factor
- activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
- loadScore := (2.0 - float64(activeLoad)) / 2.0 // Normalize to 0-1
- score += loadScore * 0.4
-
- // Capacity factor
- if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 {
- freeRatio := float64(disk.DiskInfo.DiskInfo.MaxVolumeCount-disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount)
- score += freeRatio * 0.3
- }
-
- // Diversity factor
- diversityScore := 0.0
- if disk.Rack != sourceRack {
- diversityScore += 0.5
- }
- if disk.DataCenter != sourceDC {
- diversityScore += 0.5
- }
- score += diversityScore * 0.3
-
- return score // Score between 0.0 and 1.0
-}
-
-// checkPlacementConflicts checks for placement rule violations
-func (at *ActiveTopology) checkPlacementConflicts(disk *activeDisk, taskType TaskType) []string {
- var conflicts []string
-
- // Check load limits
- activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
- if activeLoad >= 2 {
- conflicts = append(conflicts, fmt.Sprintf("disk_load_high_%d", activeLoad))
- }
-
- // Check capacity limits
- if disk.DiskInfo.DiskInfo.MaxVolumeCount > 0 {
- usageRatio := float64(disk.DiskInfo.DiskInfo.VolumeCount) / float64(disk.DiskInfo.DiskInfo.MaxVolumeCount)
- if usageRatio > 0.9 {
- conflicts = append(conflicts, "disk_capacity_high")
- }
- }
-
- // Check for conflicting task types
- for _, task := range disk.assignedTasks {
- if at.areTaskTypesConflicting(task.TaskType, taskType) {
- conflicts = append(conflicts, fmt.Sprintf("task_conflict_%s", task.TaskType))
- }
- }
-
- return conflicts
-}
-
// Private methods
// reassignTaskStates assigns tasks to the appropriate disks