diff options
Diffstat (limited to 'weed/worker/tasks/balance/detection.go')
| -rw-r--r-- | weed/worker/tasks/balance/detection.go | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go index 102f532a8..be03fb92f 100644 --- a/weed/worker/tasks/balance/detection.go +++ b/weed/worker/tasks/balance/detection.go @@ -83,7 +83,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)", imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer) + // Generate task ID for ActiveTopology integration + taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix()) + task := &types.TaskDetectionResult{ + TaskID: taskID, // Link to ActiveTopology pending task TaskType: types.TaskTypeBalance, VolumeID: selectedVolume.VolumeID, Server: selectedVolume.Server, @@ -103,6 +107,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI // Create typed parameters with destination information task.TypedParams = &worker_pb.TaskParams{ + TaskId: taskID, // Link to ActiveTopology pending task VolumeId: selectedVolume.VolumeID, Server: selectedVolume.Server, Collection: selectedVolume.Collection, @@ -121,6 +126,35 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s (score: %.2f)", selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore) + + // Add pending balance task to ActiveTopology for capacity management + + // Find the actual disk containing the volume on the source server + sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + if !found { + return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task", + selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server) + } + targetDisk := destinationPlan.TargetDisk + + err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{ + TaskID: taskID, + TaskType: topology.TaskTypeBalance, + VolumeID: selectedVolume.VolumeID, + VolumeSize: int64(selectedVolume.Size), + Sources: []topology.TaskSourceSpec{ + {ServerID: selectedVolume.Server, DiskID: sourceDisk}, + }, + Destinations: []topology.TaskDestinationSpec{ + {ServerID: destinationPlan.TargetNode, DiskID: targetDisk}, + }, + }) + if err != nil { + return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err) + } + + glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d", + taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk) } else { glog.Warningf("No ActiveTopology available for destination planning in balance detection") return nil, nil |
