aboutsummaryrefslogtreecommitdiff
path: root/weed/worker/tasks/balance/detection.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/worker/tasks/balance/detection.go')
-rw-r--r--weed/worker/tasks/balance/detection.go34
1 files changed, 34 insertions, 0 deletions
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index 102f532a8..be03fb92f 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -83,7 +83,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+ // Generate task ID for ActiveTopology integration
+ taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix())
+
task := &types.TaskDetectionResult{
+ TaskID: taskID, // Link to ActiveTopology pending task
TaskType: types.TaskTypeBalance,
VolumeID: selectedVolume.VolumeID,
Server: selectedVolume.Server,
@@ -103,6 +107,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Create typed parameters with destination information
task.TypedParams = &worker_pb.TaskParams{
+ TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: selectedVolume.VolumeID,
Server: selectedVolume.Server,
Collection: selectedVolume.Collection,
@@ -121,6 +126,35 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s (score: %.2f)",
selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore)
+
+ // Add pending balance task to ActiveTopology for capacity management
+
+ // Find the actual disk containing the volume on the source server
+ sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+ if !found {
+ return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
+ selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+ }
+ targetDisk := destinationPlan.TargetDisk
+
+ err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
+ TaskID: taskID,
+ TaskType: topology.TaskTypeBalance,
+ VolumeID: selectedVolume.VolumeID,
+ VolumeSize: int64(selectedVolume.Size),
+ Sources: []topology.TaskSourceSpec{
+ {ServerID: selectedVolume.Server, DiskID: sourceDisk},
+ },
+ Destinations: []topology.TaskDestinationSpec{
+ {ServerID: destinationPlan.TargetNode, DiskID: targetDisk},
+ },
+ })
+ if err != nil {
+ return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err)
+ }
+
+ glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
+ taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
} else {
glog.Warningf("No ActiveTopology available for destination planning in balance detection")
return nil, nil