| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-01 11:18:32 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-01 11:18:32 -0700 |
| commit | 0975968e71b05368d5f28f788cf863c2042c2696 (patch) | |
| tree | 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/tasks/balance/detection.go | |
| parent | 1cba609bfa2306cc2885df212febd5ff954aa693 (diff) | |
| download | seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip | |
admin: Refactor task destination planning (#7063)
* refactor planning into task detection (see the sketch after this list)
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
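
The core of the commit is the first bullet: destination planning now happens inside the detection phase, instead of being deferred to the maintenance integration. Below is a minimal sketch of that pattern; `Topology`, `Plan`, and `Task` are simplified stand-ins for illustration, not the real SeaweedFS types — the actual implementation is in the diff further down.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the real topology/task types.
type Topology struct{ Nodes []string }
type Plan struct{ TargetNode string }
type Task struct {
	VolumeID uint32
	Source   string
	Plan     *Plan // populated at detection time, not later by the scheduler
}

// detect fails fast: if there is no topology or no viable destination,
// no task is emitted at all (mirroring the `return nil, nil` paths in the diff).
func detect(topo *Topology, volumeID uint32, source string) (*Task, error) {
	if topo == nil {
		return nil, errors.New("no active topology for destination planning")
	}
	plan, err := planDestination(topo, source)
	if err != nil {
		return nil, err // skip the task when planning fails
	}
	return &Task{VolumeID: volumeID, Source: source, Plan: plan}, nil
}

func planDestination(topo *Topology, source string) (*Plan, error) {
	for _, n := range topo.Nodes {
		if n != source { // never move a volume onto its own source node
			return &Plan{TargetNode: n}, nil
		}
	}
	return nil, errors.New("no suitable destination")
}

func main() {
	topo := &Topology{Nodes: []string{"10.0.0.1:8080", "10.0.0.2:8080"}}
	task, err := detect(topo, 42, "10.0.0.1:8080")
	if err != nil {
		panic(err)
	}
	fmt.Printf("volume %d: %s -> %s\n", task.VolumeID, task.Source, task.Plan.TargetNode)
}
```

The upshot of the refactor: a detected task either carries a complete, validated destination in its typed parameters, or it is never created.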
Diffstat (limited to 'weed/worker/tasks/balance/detection.go')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | weed/worker/tasks/balance/detection.go | 154 |
1 file changed, 127 insertions(+), 27 deletions(-)
```diff
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index f4bcf3ca3..102f532a8 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -4,7 +4,9 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
 	"github.com/seaweedfs/seaweedfs/weed/worker/types"
 )
@@ -89,46 +91,144 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 		Priority:   types.TaskPriorityNormal,
 		Reason:     reason,
 		ScheduleAt: time.Now(),
-		// TypedParams will be populated by the maintenance integration
-		// with destination planning information
+	}
+
+	// Plan destination if ActiveTopology is available
+	if clusterInfo.ActiveTopology != nil {
+		destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume)
+		if err != nil {
+			glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err)
+			return nil, nil // Skip this task if destination planning fails
+		}
+
+		// Create typed parameters with destination information
+		task.TypedParams = &worker_pb.TaskParams{
+			VolumeId:   selectedVolume.VolumeID,
+			Server:     selectedVolume.Server,
+			Collection: selectedVolume.Collection,
+			VolumeSize: selectedVolume.Size, // Store original volume size for tracking changes
+			TaskParams: &worker_pb.TaskParams_BalanceParams{
+				BalanceParams: &worker_pb.BalanceTaskParams{
+					DestNode:           destinationPlan.TargetNode,
+					EstimatedSize:      destinationPlan.ExpectedSize,
+					PlacementScore:     destinationPlan.PlacementScore,
+					PlacementConflicts: destinationPlan.Conflicts,
+					ForceMove:          false,
+					TimeoutSeconds:     600, // 10 minutes default
+				},
+			},
+		}
+
+		glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s (score: %.2f)",
+			selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore)
+	} else {
+		glog.Warningf("No ActiveTopology available for destination planning in balance detection")
+		return nil, nil
 	}
 
 	return []*types.TaskDetectionResult{task}, nil
 }
 
-// Scheduling implements the scheduling logic for balance tasks
-func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
-	balanceConfig := config.(*Config)
-
-	// Count running balance tasks
-	runningBalanceCount := 0
-	for _, runningTask := range runningTasks {
-		if runningTask.Type == types.TaskTypeBalance {
-			runningBalanceCount++
+// planBalanceDestination plans the destination for a balance operation
+// This function implements destination planning logic directly in the detection phase
+func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics) (*topology.DestinationPlan, error) {
+	// Get source node information from topology
+	var sourceRack, sourceDC string
+
+	// Extract rack and DC from topology info
+	topologyInfo := activeTopology.GetTopologyInfo()
+	if topologyInfo != nil {
+		for _, dc := range topologyInfo.DataCenterInfos {
+			for _, rack := range dc.RackInfos {
+				for _, dataNodeInfo := range rack.DataNodeInfos {
+					if dataNodeInfo.Id == selectedVolume.Server {
+						sourceDC = dc.Id
+						sourceRack = rack.Id
+						break
+					}
+				}
+				if sourceRack != "" {
+					break
+				}
+			}
+			if sourceDC != "" {
+				break
+			}
 		}
 	}
 
-	// Check concurrency limit
-	if runningBalanceCount >= balanceConfig.MaxConcurrent {
-		return false
+	// Get available disks, excluding the source node
+	availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeBalance, selectedVolume.Server)
+	if len(availableDisks) == 0 {
+		return nil, fmt.Errorf("no available disks for balance operation")
 	}
 
-	// Check if we have available workers
-	availableWorkerCount := 0
-	for _, worker := range availableWorkers {
-		for _, capability := range worker.Capabilities {
-			if capability == types.TaskTypeBalance {
-				availableWorkerCount++
-				break
-			}
+	// Find the best destination disk based on balance criteria
+	var bestDisk *topology.DiskInfo
+	bestScore := -1.0
+
+	for _, disk := range availableDisks {
+		score := calculateBalanceScore(disk, sourceRack, sourceDC, selectedVolume.Size)
+		if score > bestScore {
+			bestScore = score
+			bestDisk = disk
 		}
 	}
 
-	return availableWorkerCount > 0
+	if bestDisk == nil {
+		return nil, fmt.Errorf("no suitable destination found for balance operation")
+	}
+
+	return &topology.DestinationPlan{
+		TargetNode:     bestDisk.NodeID,
+		TargetDisk:     bestDisk.DiskID,
+		TargetRack:     bestDisk.Rack,
+		TargetDC:       bestDisk.DataCenter,
+		ExpectedSize:   selectedVolume.Size,
+		PlacementScore: bestScore,
+		Conflicts:      checkPlacementConflicts(bestDisk, sourceRack, sourceDC),
+	}, nil
+}
+
+// calculateBalanceScore calculates placement score for balance operations
+func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64) float64 {
+	if disk.DiskInfo == nil {
+		return 0.0
+	}
+
+	score := 0.0
+
+	// Prefer disks with lower current volume count (better for balance)
+	if disk.DiskInfo.MaxVolumeCount > 0 {
+		utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
+		score += (1.0 - utilization) * 40.0 // Up to 40 points for low utilization
+	}
+
+	// Prefer different racks for better distribution
+	if disk.Rack != sourceRack {
+		score += 30.0
+	}
+
+	// Prefer different data centers for better distribution
+	if disk.DataCenter != sourceDC {
+		score += 20.0
+	}
+
+	// Prefer disks with lower current load
+	score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
+
+	return score
 }
 
-// CreateTask creates a new balance task instance
-func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
-	// Create and return the balance task using existing Task type
-	return NewTask(params.Server, params.VolumeID, params.Collection), nil
+// checkPlacementConflicts checks for placement rule conflicts
+func checkPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
+	var conflicts []string
+
+	// For now, implement basic conflict detection
+	// This could be extended with more sophisticated placement rules
+	if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
+		conflicts = append(conflicts, "same_rack_as_source")
+	}
+
+	return conflicts
}
```
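
To make the scoring weights concrete, here is a standalone sketch of the `calculateBalanceScore` logic with a worked example. `DiskInfo` below is a simplified stand-in carrying only the fields the function reads, not the real `topology.DiskInfo`; the weights (40 for utilization, 30 for rack, 20 for DC, up to 10 for load) are taken directly from the diff above.

```go
package main

import "fmt"

// Simplified stand-in for topology.DiskInfo.
type DiskInfo struct {
	NodeID         string
	Rack           string
	DataCenter     string
	VolumeCount    int64
	MaxVolumeCount int64
	LoadCount      int
}

// balanceScore mirrors calculateBalanceScore: up to 40 points for low
// utilization, 30 for a different rack, 20 for a different data center,
// up to 10 for low load.
func balanceScore(d DiskInfo, sourceRack, sourceDC string) float64 {
	score := 0.0
	if d.MaxVolumeCount > 0 {
		utilization := float64(d.VolumeCount) / float64(d.MaxVolumeCount)
		score += (1.0 - utilization) * 40.0
	}
	if d.Rack != sourceRack {
		score += 30.0
	}
	if d.DataCenter != sourceDC {
		score += 20.0
	}
	score += 10.0 - float64(d.LoadCount)
	return score
}

func main() {
	candidates := []DiskInfo{
		{NodeID: "nodeA", Rack: "rack1", DataCenter: "dc1", VolumeCount: 90, MaxVolumeCount: 100, LoadCount: 2},
		{NodeID: "nodeB", Rack: "rack2", DataCenter: "dc1", VolumeCount: 50, MaxVolumeCount: 100, LoadCount: 1},
		{NodeID: "nodeC", Rack: "rack3", DataCenter: "dc2", VolumeCount: 70, MaxVolumeCount: 100, LoadCount: 0},
	}
	for _, d := range candidates {
		fmt.Printf("%s: %.1f\n", d.NodeID, balanceScore(d, "rack1", "dc1"))
	}
	// nodeA:  4.0 +  0 +  0 +  8 = 12.0 (same rack and DC, nearly full)
	// nodeB: 20.0 + 30 +  0 +  9 = 59.0 (different rack)
	// nodeC: 12.0 + 30 + 20 + 10 = 72.0 (different rack and DC wins)
}
```

Note that the load term can go negative once `LoadCount` exceeds 10, which still orders candidates consistently; and `checkPlacementConflicts` only flags a destination that shares both rack and data center with the source.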
