| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-01 11:18:32 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-01 11:18:32 -0700 |
| commit | 0975968e71b05368d5f28f788cf863c2042c2696 (patch) | |
| tree | 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/worker/tasks/erasure_coding/detection.go | |
| parent | 1cba609bfa2306cc2885df212febd5ff954aa693 (diff) | |
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only the balance task is registered
* compiles, but hits a nil pointer exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and the .ecx, .ecj, .vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
Diffstat (limited to 'weed/worker/tasks/erasure_coding/detection.go')
| -rw-r--r-- | weed/worker/tasks/erasure_coding/detection.go | 314 |
1 file changed, 295 insertions, 19 deletions
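
For orientation before the diff: the new `planECDestinations` below asks the topology for `erasure_coding.TotalShardsCount` destination disks but proceeds with as few as 4. A minimal sketch of that shard arithmetic, not code from this commit; the 10 data + 4 parity split is the assumed value of `DataShardsCount`/`ParityShardsCount` in SeaweedFS, and `minTotalDisks` mirrors the hard-coded minimum in the planner:

```go
package main

import "fmt"

const (
	dataShards    = 10                        // assumed value of erasure_coding.DataShardsCount
	parityShards  = 4                         // assumed value of erasure_coding.ParityShardsCount
	totalShards   = dataShards + parityShards // 14 shards, ids 0-13
	minTotalDisks = 4                         // mirrors the hard-coded minimum in planECDestinations
)

func main() {
	fmt.Printf("one EC volume -> %d shards (%d data + %d parity), shard ids 0-%d\n",
		totalShards, dataShards, parityShards, totalShards-1)

	for _, availableDisks := range []int{3, 6, 20} {
		if availableDisks < minTotalDisks {
			// planECDestinations returns an error in this case and the volume is skipped
			fmt.Printf("%2d usable disks: planning fails (need at least %d)\n", availableDisks, minTotalDisks)
			continue
		}
		// selectBestECDestinations caps the selection at totalShards disks,
		// so the plan covers min(availableDisks, totalShards) destinations
		destinations := totalShards
		if availableDisks < destinations {
			destinations = availableDisks
		}
		fmt.Printf("%2d usable disks: plan %d destination disks for %d shards\n",
			availableDisks, destinations, totalShards)
	}
}
```

With 20 disks every shard can get its own destination; with 6 disks the plan still goes ahead because only the 4-disk minimum is enforced.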
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index 450080a12..1122d2721 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -5,7 +5,10 @@ import (
 	"strings"
 	"time"
 
+	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
+	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
 	"github.com/seaweedfs/seaweedfs/weed/worker/types"
 )
@@ -69,6 +72,38 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 					float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
 				ScheduleAt: now,
 			}
+
+			// Plan EC destinations if ActiveTopology is available
+			if clusterInfo.ActiveTopology != nil {
+				multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
+				if err != nil {
+					glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
+					continue // Skip this volume if destination planning fails
+				}
+
+				// Find all volume replicas from topology
+				replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+				glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
+
+				// Create typed parameters with EC destination information and replicas
+				result.TypedParams = &worker_pb.TaskParams{
+					VolumeId:   metric.VolumeID,
+					Server:     metric.Server,
+					Collection: metric.Collection,
+					VolumeSize: metric.Size, // Store original volume size for tracking changes
+					Replicas:   replicas,    // Include all volume replicas for deletion
+					TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
+						ErasureCodingParams: createECTaskParams(multiPlan),
+					},
+				}
+
+				glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
+					metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
+			} else {
+				glog.Warningf("No ActiveTopology available for destination planning in EC detection")
+				continue // Skip this volume if no topology available
+			}
+
 			results = append(results, result)
 		} else {
 			// Count debug reasons
@@ -105,36 +140,277 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 	return results, nil
 }
 
-// Scheduling implements the scheduling logic for erasure coding tasks
-func Scheduling(task *types.Task, runningTasks []*types.Task, availableWorkers []*types.Worker, config base.TaskConfig) bool {
-	ecConfig := config.(*Config)
+// planECDestinations plans the destinations for erasure coding operation
+// This function implements EC destination planning logic directly in the detection phase
+func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
+	// Get source node information from topology
+	var sourceRack, sourceDC string
 
-	// Check if we have available workers
-	if len(availableWorkers) == 0 {
-		return false
+	// Extract rack and DC from topology info
+	topologyInfo := activeTopology.GetTopologyInfo()
+	if topologyInfo != nil {
+		for _, dc := range topologyInfo.DataCenterInfos {
+			for _, rack := range dc.RackInfos {
+				for _, dataNodeInfo := range rack.DataNodeInfos {
+					if dataNodeInfo.Id == metric.Server {
+						sourceDC = dc.Id
+						sourceRack = rack.Id
+						break
+					}
+				}
+				if sourceRack != "" {
+					break
+				}
+			}
+			if sourceDC != "" {
+				break
+			}
+		}
+	}
+
+	// Determine minimum shard disk locations based on configuration
+	minTotalDisks := 4
+
+	// Get available disks for EC placement (include source node for EC)
+	availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
+	if len(availableDisks) < minTotalDisks {
+		return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", minTotalDisks, len(availableDisks))
+	}
+
+	// Select best disks for EC placement with rack/DC diversity
+	selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
+	if len(selectedDisks) < minTotalDisks {
+		return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), minTotalDisks)
+	}
+
+	var plans []*topology.DestinationPlan
+	rackCount := make(map[string]int)
+	dcCount := make(map[string]int)
+
+	for _, disk := range selectedDisks {
+		plan := &topology.DestinationPlan{
+			TargetNode:     disk.NodeID,
+			TargetDisk:     disk.DiskID,
+			TargetRack:     disk.Rack,
+			TargetDC:       disk.DataCenter,
+			ExpectedSize:   0, // EC shards don't have predetermined size
+			PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
+			Conflicts:      checkECPlacementConflicts(disk, sourceRack, sourceDC),
+		}
+		plans = append(plans, plan)
+
+		// Count rack and DC diversity
+		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+		rackCount[rackKey]++
+		dcCount[disk.DataCenter]++
+	}
+
+	return &topology.MultiDestinationPlan{
+		Plans:          plans,
+		TotalShards:    len(plans),
+		SuccessfulRack: len(rackCount),
+		SuccessfulDCs:  len(dcCount),
+	}, nil
+}
+
+// createECTaskParams creates EC task parameters from the multi-destination plan
+func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
+	var destinations []*worker_pb.ECDestination
+
+	for _, plan := range multiPlan.Plans {
+		destination := &worker_pb.ECDestination{
+			Node:           plan.TargetNode,
+			DiskId:         plan.TargetDisk,
+			Rack:           plan.TargetRack,
+			DataCenter:     plan.TargetDC,
+			PlacementScore: plan.PlacementScore,
+		}
+		destinations = append(destinations, destination)
+	}
+
+	// Collect placement conflicts from all destinations
+	var placementConflicts []string
+	for _, plan := range multiPlan.Plans {
+		placementConflicts = append(placementConflicts, plan.Conflicts...)
+	}
+
+	return &worker_pb.ErasureCodingTaskParams{
+		Destinations:       destinations,
+		DataShards:         erasure_coding.DataShardsCount,   // Standard data shards
+		ParityShards:       erasure_coding.ParityShardsCount, // Standard parity shards
+		PlacementConflicts: placementConflicts,
+	}
+}
+
+// selectBestECDestinations selects multiple disks for EC shard placement with diversity
+func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
+	if len(disks) == 0 {
+		return nil
+	}
+
+	// Group disks by rack and DC for diversity
+	rackGroups := make(map[string][]*topology.DiskInfo)
+	for _, disk := range disks {
+		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+		rackGroups[rackKey] = append(rackGroups[rackKey], disk)
+	}
+
+	var selected []*topology.DiskInfo
+	usedRacks := make(map[string]bool)
+
+	// First pass: select one disk from each rack for maximum diversity
+	for rackKey, rackDisks := range rackGroups {
+		if len(selected) >= shardsNeeded {
+			break
+		}
+
+		// Select best disk from this rack
+		bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
+		if bestDisk != nil {
+			selected = append(selected, bestDisk)
+			usedRacks[rackKey] = true
+		}
+	}
+
+	// Second pass: if we need more disks, select from racks we've already used
+	if len(selected) < shardsNeeded {
+		for _, disk := range disks {
+			if len(selected) >= shardsNeeded {
+				break
+			}
+
+			// Skip if already selected
+			alreadySelected := false
+			for _, sel := range selected {
+				if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
+					alreadySelected = true
+					break
+				}
+			}
+
+			if !alreadySelected && isDiskSuitableForEC(disk) {
+				selected = append(selected, disk)
+			}
+		}
+	}
+
+	return selected
+}
+
+// selectBestFromRack selects the best disk from a rack for EC placement
+func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
+	if len(disks) == 0 {
+		return nil
 	}
 
-	// Count running EC tasks
-	runningCount := 0
-	for _, runningTask := range runningTasks {
-		if runningTask.Type == types.TaskTypeErasureCoding {
-			runningCount++
+	var bestDisk *topology.DiskInfo
+	bestScore := -1.0
+
+	for _, disk := range disks {
+		if !isDiskSuitableForEC(disk) {
+			continue
+		}
+
+		score := calculateECScore(disk, sourceRack, sourceDC)
+		if score > bestScore {
+			bestScore = score
+			bestDisk = disk
 		}
 	}
 
-	// Check concurrency limit
-	if runningCount >= ecConfig.MaxConcurrent {
+	return bestDisk
+}
+
+// calculateECScore calculates placement score for EC operations
+func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
+	if disk.DiskInfo == nil {
+		return 0.0
+	}
+
+	score := 0.0
+
+	// Prefer disks with available capacity
+	if disk.DiskInfo.MaxVolumeCount > 0 {
+		utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
+		score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity
+	}
+
+	// Prefer different racks for better distribution
+	if disk.Rack != sourceRack {
+		score += 30.0
+	}
+
+	// Prefer different data centers for better distribution
+	if disk.DataCenter != sourceDC {
+		score += 20.0
+	}
+
+	// Consider current load
+	score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
+
+	return score
+}
+
+// isDiskSuitableForEC checks if a disk is suitable for EC placement
+func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
+	if disk.DiskInfo == nil {
+		return false
+	}
+
+	// Check if disk has capacity
+	if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount {
+		return false
+	}
+
+	// Check if disk is not overloaded
+	if disk.LoadCount > 10 { // Arbitrary threshold
 		return false
 	}
 
-	// Check if any worker can handle EC tasks
-	for _, worker := range availableWorkers {
-		for _, capability := range worker.Capabilities {
-			if capability == types.TaskTypeErasureCoding {
-				return true
+	return true
+}
+
+// checkECPlacementConflicts checks for placement rule conflicts in EC operations
+func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
+	var conflicts []string
+
+	// For EC, being on the same rack as source is often acceptable
+	// but we note it as potential conflict for monitoring
+	if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
+		conflicts = append(conflicts, "same_rack_as_source")
+	}
+
+	return conflicts
+}
+
+// findVolumeReplicas finds all servers that have replicas of the specified volume
+func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
+	if activeTopology == nil {
+		return []string{}
+	}
+
+	topologyInfo := activeTopology.GetTopologyInfo()
+	if topologyInfo == nil {
+		return []string{}
+	}
+
+	var replicaServers []string
+
+	// Iterate through all nodes to find volume replicas
+	for _, dc := range topologyInfo.DataCenterInfos {
+		for _, rack := range dc.RackInfos {
+			for _, nodeInfo := range rack.DataNodeInfos {
+				for _, diskInfo := range nodeInfo.DiskInfos {
+					for _, volumeInfo := range diskInfo.VolumeInfos {
+						if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
+							replicaServers = append(replicaServers, nodeInfo.Id)
+							break // Found volume on this node, move to next node
+						}
+					}
+				}
 			}
 		}
 	}
-	return false
+	return replicaServers
 }
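
The scoring heuristic in `calculateECScore` weights free capacity (up to 50 points), rack diversity (+30), data center diversity (+20), and low load (up to 10). A self-contained sketch with a simplified candidate struct and made-up numbers, not code from this commit, shows how those weights rank destinations:

```go
package main

import "fmt"

type candidate struct {
	name           string
	rack, dc       string
	volumeCount    int64
	maxVolumeCount int64
	loadCount      int
}

// score mirrors the weights used by calculateECScore: up to 50 points for free
// capacity, +30 for a different rack, +20 for a different data center, and up
// to 10 points for a low load count.
func score(c candidate, sourceRack, sourceDC string) float64 {
	s := 0.0
	if c.maxVolumeCount > 0 {
		utilization := float64(c.volumeCount) / float64(c.maxVolumeCount)
		s += (1.0 - utilization) * 50.0
	}
	if c.rack != sourceRack {
		s += 30.0
	}
	if c.dc != sourceDC {
		s += 20.0
	}
	s += 10.0 - float64(c.loadCount)
	return s
}

func main() {
	sourceRack, sourceDC := "rack1", "dc1"
	candidates := []candidate{
		{"same rack, mostly empty", "rack1", "dc1", 10, 100, 1},
		{"other rack, half full", "rack2", "dc1", 50, 100, 2},
		{"other DC, nearly full", "rack9", "dc2", 95, 100, 0},
	}
	for _, c := range candidates {
		// prints 54.0, 63.0, 62.5 with the numbers above
		fmt.Printf("%-24s score = %.1f\n", c.name, score(c, sourceRack, sourceDC))
	}
}
```

With these hypothetical inputs, the half-full disk in another rack (63.0) and the nearly full disk in another DC (62.5) both outrank the emptiest disk that shares the source rack (54.0), which matches the diversity bias the comments in the diff describe.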
