Diffstat (limited to 'weed/worker/tasks/balance/detection.go')
-rw-r--r--   weed/worker/tasks/balance/detection.go   154
1 file changed, 127 insertions(+), 27 deletions(-)
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index f4bcf3ca3..102f532a8 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -4,7 +4,9 @@ import (
"fmt"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -89,46 +91,144 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
Priority: types.TaskPriorityNormal,
Reason: reason,
ScheduleAt: time.Now(),
- // TypedParams will be populated by the maintenance integration
- // with destination planning information
+ }
+
+ // Plan destination if ActiveTopology is available
+ if clusterInfo.ActiveTopology != nil {
+ destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume)
+ if err != nil {
+ glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err)
+ return nil, nil // Skip this task if destination planning fails
+ }
+
+ // Create typed parameters with destination information
+ task.TypedParams = &worker_pb.TaskParams{
+ VolumeId: selectedVolume.VolumeID,
+ Server: selectedVolume.Server,
+ Collection: selectedVolume.Collection,
+ VolumeSize: selectedVolume.Size, // Store original volume size for tracking changes
+ TaskParams: &worker_pb.TaskParams_BalanceParams{
+ BalanceParams: &worker_pb.BalanceTaskParams{
+ DestNode: destinationPlan.TargetNode,
+ EstimatedSize: destinationPlan.ExpectedSize,
+ PlacementScore: destinationPlan.PlacementScore,
+ PlacementConflicts: destinationPlan.Conflicts,
+ ForceMove: false,
+ TimeoutSeconds: 600, // 10 minutes default
+ },
+ },
+ }
+
+ glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s (score: %.2f)",
+ selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore)
+ } else {
+ glog.Warningf("No ActiveTopology available for destination planning in balance detection")
+ return nil, nil
}
return []*types.TaskDetectionResult{task}, nil
}
-// Scheduling implements the scheduling logic for balance tasks
-func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
- balanceConfig := config.(*Config)
-
- // Count running balance tasks
- runningBalanceCount := 0
- for _, runningTask := range runningTasks {
- if runningTask.Type == types.TaskTypeBalance {
- runningBalanceCount++
+// planBalanceDestination plans the destination for a balance operation
+// This function implements destination planning logic directly in the detection phase
+func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics) (*topology.DestinationPlan, error) {
+ // Get source node information from topology
+ var sourceRack, sourceDC string
+
+ // Extract rack and DC from topology info
+ topologyInfo := activeTopology.GetTopologyInfo()
+ if topologyInfo != nil {
+ for _, dc := range topologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, dataNodeInfo := range rack.DataNodeInfos {
+ if dataNodeInfo.Id == selectedVolume.Server {
+ sourceDC = dc.Id
+ sourceRack = rack.Id
+ break
+ }
+ }
+ if sourceRack != "" {
+ break
+ }
+ }
+ if sourceDC != "" {
+ break
+ }
}
}
- // Check concurrency limit
- if runningBalanceCount >= balanceConfig.MaxConcurrent {
- return false
+ // Get available disks, excluding the source node
+ availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeBalance, selectedVolume.Server)
+ if len(availableDisks) == 0 {
+ return nil, fmt.Errorf("no available disks for balance operation")
}
- // Check if we have available workers
- availableWorkerCount := 0
- for _, worker := range availableWorkers {
- for _, capability := range worker.Capabilities {
- if capability == types.TaskTypeBalance {
- availableWorkerCount++
- break
- }
+ // Find the best destination disk based on balance criteria
+ var bestDisk *topology.DiskInfo
+ bestScore := -1.0
+
+ for _, disk := range availableDisks {
+ score := calculateBalanceScore(disk, sourceRack, sourceDC, selectedVolume.Size)
+ if score > bestScore {
+ bestScore = score
+ bestDisk = disk
}
}
- return availableWorkerCount > 0
+ if bestDisk == nil {
+ return nil, fmt.Errorf("no suitable destination found for balance operation")
+ }
+
+ return &topology.DestinationPlan{
+ TargetNode: bestDisk.NodeID,
+ TargetDisk: bestDisk.DiskID,
+ TargetRack: bestDisk.Rack,
+ TargetDC: bestDisk.DataCenter,
+ ExpectedSize: selectedVolume.Size,
+ PlacementScore: bestScore,
+ Conflicts: checkPlacementConflicts(bestDisk, sourceRack, sourceDC),
+ }, nil
+}
+
+// calculateBalanceScore calculates placement score for balance operations
+func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64) float64 {
+ if disk.DiskInfo == nil {
+ return 0.0
+ }
+
+ score := 0.0
+
+ // Prefer disks with lower current volume count (better for balance)
+ if disk.DiskInfo.MaxVolumeCount > 0 {
+ utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
+ score += (1.0 - utilization) * 40.0 // Up to 40 points for low utilization
+ }
+
+ // Prefer different racks for better distribution
+ if disk.Rack != sourceRack {
+ score += 30.0
+ }
+
+ // Prefer different data centers for better distribution
+ if disk.DataCenter != sourceDC {
+ score += 20.0
+ }
+
+ // Prefer disks with lower current load
+ score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
+
+ return score
}
-// CreateTask creates a new balance task instance
-func CreateTask(params types.TaskParams) (types.TaskInterface, error) {
- // Create and return the balance task using existing Task type
- return NewTask(params.Server, params.VolumeID, params.Collection), nil
+// checkPlacementConflicts checks for placement rule conflicts
+func checkPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
+ var conflicts []string
+
+ // For now, implement basic conflict detection
+ // This could be extended with more sophisticated placement rules
+ if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
+ conflicts = append(conflicts, "same_rack_as_source")
+ }
+
+ return conflicts
}