Diffstat (limited to 'weed/worker/tasks')
| -rw-r--r-- | weed/worker/tasks/balance/detection.go        |  34 |
| -rw-r--r-- | weed/worker/tasks/base/volume_utils.go        |  36 |
| -rw-r--r-- | weed/worker/tasks/erasure_coding/detection.go | 169 |
| -rw-r--r-- | weed/worker/tasks/vacuum/detection.go         |   6 |
4 files changed, 232 insertions, 13 deletions
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index 102f532a8..be03fb92f 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -83,7 +83,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
     reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
         imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
 
+    // Generate task ID for ActiveTopology integration
+    taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix())
+
     task := &types.TaskDetectionResult{
+        TaskID:     taskID, // Link to ActiveTopology pending task
         TaskType:   types.TaskTypeBalance,
         VolumeID:   selectedVolume.VolumeID,
         Server:     selectedVolume.Server,
@@ -103,6 +107,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 
     // Create typed parameters with destination information
     task.TypedParams = &worker_pb.TaskParams{
+        TaskId:     taskID, // Link to ActiveTopology pending task
         VolumeId:   selectedVolume.VolumeID,
         Server:     selectedVolume.Server,
         Collection: selectedVolume.Collection,
@@ -121,6 +126,35 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 
         glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s (score: %.2f)",
             selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore)
+
+        // Add pending balance task to ActiveTopology for capacity management
+
+        // Find the actual disk containing the volume on the source server
+        sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+        if !found {
+            return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
+                selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+        }
+        targetDisk := destinationPlan.TargetDisk
+
+        err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
+            TaskID:     taskID,
+            TaskType:   topology.TaskTypeBalance,
+            VolumeID:   selectedVolume.VolumeID,
+            VolumeSize: int64(selectedVolume.Size),
+            Sources: []topology.TaskSourceSpec{
+                {ServerID: selectedVolume.Server, DiskID: sourceDisk},
+            },
+            Destinations: []topology.TaskDestinationSpec{
+                {ServerID: destinationPlan.TargetNode, DiskID: targetDisk},
+            },
+        })
+        if err != nil {
+            return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err)
+        }
+
+        glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
+            taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
     } else {
         glog.Warningf("No ActiveTopology available for destination planning in balance detection")
         return nil, nil
diff --git a/weed/worker/tasks/base/volume_utils.go b/weed/worker/tasks/base/volume_utils.go
new file mode 100644
index 000000000..2aaf795b2
--- /dev/null
+++ b/weed/worker/tasks/base/volume_utils.go
@@ -0,0 +1,36 @@
+package base
+
+import (
+    "github.com/seaweedfs/seaweedfs/weed/admin/topology"
+)
+
+// FindVolumeDisk finds the disk ID where a specific volume is located on a given server.
+// Returns the disk ID and a boolean indicating whether the volume was found.
+// Uses O(1) indexed lookup for optimal performance on large clusters.
+//
+// This is a shared utility function used by multiple task detection algorithms
+// (balance, vacuum, etc.) to locate volumes efficiently.
+//
+// Example usage:
+//
+//	// In balance task: find source disk for a volume that needs to be moved
+//	sourceDisk, found := base.FindVolumeDisk(topology, volumeID, collection, sourceServer)
+//
+//	// In vacuum task: find disk containing volume that needs cleanup
+//	diskID, exists := base.FindVolumeDisk(topology, volumeID, collection, serverID)
+func FindVolumeDisk(activeTopology *topology.ActiveTopology, volumeID uint32, collection string, serverID string) (uint32, bool) {
+    if activeTopology == nil {
+        return 0, false
+    }
+
+    // Use the new O(1) indexed lookup for better performance
+    locations := activeTopology.GetVolumeLocations(volumeID, collection)
+    for _, loc := range locations {
+        if loc.ServerID == serverID {
+            return loc.DiskID, true
+        }
+    }
+
+    // Volume not found on this server
+    return 0, false
+}
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index 9cf87cdf6..ec632436f 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -61,7 +61,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 
         // Check quiet duration and fullness criteria
         if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+            // Generate task ID for ActiveTopology integration
+            taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())
+
             result := &types.TaskDetectionResult{
+                TaskID:     taskID, // Link to ActiveTopology pending task
                 TaskType:   types.TaskTypeErasureCoding,
                 VolumeID:   metric.VolumeID,
                 Server:     metric.Server,
@@ -81,12 +85,117 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
                 continue // Skip this volume if destination planning fails
             }
 
-            // Find all volume replicas from topology
-            replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+            // Calculate expected shard size for EC operation
+            // Each data shard will be approximately volumeSize / dataShards
+            expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)
+
+            // Add pending EC shard task to ActiveTopology for capacity management
+
+            // Extract shard destinations from multiPlan
+            var shardDestinations []string
+            var shardDiskIDs []uint32
+            for _, plan := range multiPlan.Plans {
+                shardDestinations = append(shardDestinations, plan.TargetNode)
+                shardDiskIDs = append(shardDiskIDs, plan.TargetDisk)
+            }
+
+            // Find all volume replica locations (server + disk) from topology
+            replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+            if len(replicaLocations) == 0 {
+                glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
+                continue
+            }
+
+            // Find existing EC shards from previous failed attempts
+            existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+
+            // Combine volume replicas and existing EC shards for cleanup
+            var allSourceLocations []topology.TaskSourceLocation
+
+            // Add volume replicas (will free volume slots)
+            for _, replica := range replicaLocations {
+                allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+                    ServerID:    replica.ServerID,
+                    DiskID:      replica.DiskID,
+                    CleanupType: topology.CleanupVolumeReplica,
+                })
+            }
+
+            // Add existing EC shards (will free shard slots)
+            duplicateCheck := make(map[string]bool)
+            for _, replica := range replicaLocations {
+                key := fmt.Sprintf("%s:%d", replica.ServerID, replica.DiskID)
+                duplicateCheck[key] = true
+            }
+
+            for _, shard := range existingECShards {
+                key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
+                if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
+                    allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+                        ServerID:    shard.ServerID,
+                        DiskID:      shard.DiskID,
+                        CleanupType: topology.CleanupECShards,
+                    })
+                    duplicateCheck[key] = true
+                }
+            }
+
+            glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
+                len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations))
+
+            // Convert TaskSourceLocation to TaskSourceSpec
+            sources := make([]topology.TaskSourceSpec, len(allSourceLocations))
+            for i, srcLoc := range allSourceLocations {
+                sources[i] = topology.TaskSourceSpec{
+                    ServerID:    srcLoc.ServerID,
+                    DiskID:      srcLoc.DiskID,
+                    CleanupType: srcLoc.CleanupType,
+                }
+            }
+
+            // Convert shard destinations to TaskDestinationSpec
+            destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
+            shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination
+            shardSize := int64(expectedShardSize)
+            for i, dest := range shardDestinations {
+                destinations[i] = topology.TaskDestinationSpec{
+                    ServerID:      dest,
+                    DiskID:        shardDiskIDs[i],
+                    StorageImpact: &shardImpact,
+                    EstimatedSize: &shardSize,
+                }
+            }
+
+            err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
+                TaskID:       taskID,
+                TaskType:     topology.TaskTypeErasureCoding,
+                VolumeID:     metric.VolumeID,
+                VolumeSize:   int64(metric.Size),
+                Sources:      sources,
+                Destinations: destinations,
+            })
+            if err != nil {
+                glog.Warningf("Failed to add pending EC shard task to ActiveTopology for volume %d: %v", metric.VolumeID, err)
+                continue // Skip this volume if topology task addition fails
+            }
+
+            glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
+                taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans))
+
+            // Find all volume replicas from topology (for legacy worker compatibility)
+            var replicas []string
+            serverSet := make(map[string]struct{})
+            for _, loc := range replicaLocations {
+                if _, found := serverSet[loc.ServerID]; !found {
+                    replicas = append(replicas, loc.ServerID)
+                    serverSet[loc.ServerID] = struct{}{}
+                }
+            }
             glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
 
             // Create typed parameters with EC destination information and replicas
             result.TypedParams = &worker_pb.TaskParams{
+                TaskId:     taskID, // Link to ActiveTopology pending task
                 VolumeId:   metric.VolumeID,
                 Server:     metric.Server,
                 Collection: metric.Collection,
@@ -143,6 +252,9 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 // planECDestinations plans the destinations for erasure coding operation
 // This function implements EC destination planning logic directly in the detection phase
 func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
+    // Calculate expected shard size for EC operation
+    expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)
+
     // Get source node information from topology
     var sourceRack, sourceDC string
 
@@ -168,10 +280,12 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
         }
     }
 
-    // Get available disks for EC placement (include source node for EC)
-    availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
+    // Get available disks for EC placement with effective capacity consideration (includes pending tasks)
+    // For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1
+    // For EC, we need at least 1 available volume slot on a disk to consider it for placement.
+    availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1)
     if len(availableDisks) < erasure_coding.MinTotalDisks {
-        return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", erasure_coding.MinTotalDisks, len(availableDisks))
+        return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
     }
 
     // Select best disks for EC placement with rack/DC diversity
@@ -190,7 +304,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
             TargetDisk:     disk.DiskID,
             TargetRack:     disk.Rack,
             TargetDC:       disk.DataCenter,
-            ExpectedSize:   0, // EC shards don't have predetermined size
+            ExpectedSize:   expectedShardSize, // Set calculated EC shard size
             PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
             Conflicts:      checkECPlacementConflicts(disk, sourceRack, sourceDC),
         }
@@ -202,6 +316,22 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
         dcCount[disk.DataCenter]++
     }
 
+    // Log capacity utilization information using ActiveTopology's encapsulated logic
+    totalEffectiveCapacity := int64(0)
+    for _, plan := range plans {
+        effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk)
+        totalEffectiveCapacity += effectiveCapacity
+    }
+
+    glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots",
+        metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)
+
+    // Log storage impact for EC task (source only - EC has multiple targets handled individually)
+    sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
+    glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d",
+        sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
+    glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact")
+
     return &topology.MultiDestinationPlan{
         Plans:       plans,
         TotalShards: len(plans),
@@ -354,13 +484,8 @@ func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
         return false
     }
 
-    // Check if disk has capacity
-    if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount {
-        return false
-    }
-
-    // Check if disk is not overloaded
-    if disk.LoadCount > 10 { // Arbitrary threshold
+    // Check if disk is not overloaded with tasks
+    if disk.LoadCount > topology.MaxTaskLoadForECPlacement {
         return false
     }
 
@@ -380,6 +505,24 @@ func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC str
     return conflicts
 }
 
+// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
+// Uses O(1) indexed lookup for optimal performance on large clusters.
+func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
+    if activeTopology == nil {
+        return nil
+    }
+    return activeTopology.GetVolumeLocations(volumeID, collection)
+}
+
+// findExistingECShards finds existing EC shards for a volume (from previous failed EC attempts)
+// Uses O(1) indexed lookup for optimal performance on large clusters.
+func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
+    if activeTopology == nil {
+        return nil
+    }
+    return activeTopology.GetECShardLocations(volumeID, collection)
+}
+
 // findVolumeReplicas finds all servers that have replicas of the specified volume
 func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
     if activeTopology == nil {
diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go
index 23f82ad34..0c14bb956 100644
--- a/weed/worker/tasks/vacuum/detection.go
+++ b/weed/worker/tasks/vacuum/detection.go
@@ -1,6 +1,7 @@
 package vacuum
 
 import (
+    "fmt"
     "time"
 
     "github.com/seaweedfs/seaweedfs/weed/glog"
@@ -31,7 +32,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
             priority = types.TaskPriorityHigh
         }
 
+        // Generate task ID for future ActiveTopology integration
+        taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix())
+
         result := &types.TaskDetectionResult{
+            TaskID:     taskID, // For future ActiveTopology integration
             TaskType:   types.TaskTypeVacuum,
             VolumeID:   metric.VolumeID,
             Server:     metric.Server,
@@ -96,6 +101,7 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum
 
     // Create typed protobuf parameters
     return &worker_pb.TaskParams{
+        TaskId:     task.TaskID, // Link to ActiveTopology pending task (if integrated)
        VolumeId:   task.VolumeID,
        Server:     task.Server,
        Collection: task.Collection,
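
For orientation, the sketch below shows how the pieces introduced in this diff fit together: base.FindVolumeDisk resolves the source disk, and ActiveTopology.AddPendingTask registers the pending move so effective-capacity checks account for it. This is an illustrative sketch only, not part of the change: the package name and the registerPendingBalanceTask helper are hypothetical, and the import paths are inferred from the file paths above.

package example // hypothetical package, not part of the change

import (
    "fmt"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/admin/topology"
    "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
)

// registerPendingBalanceTask mirrors the flow added to balance detection above:
// look up the source disk for the volume, then record a pending move in
// ActiveTopology so capacity accounting can see it.
func registerPendingBalanceTask(active *topology.ActiveTopology, volumeID uint32, collection string,
    sourceServer, targetServer string, targetDisk uint32, volumeSize int64) (string, error) {

    // Locate the disk that holds the volume on the source server (O(1) indexed lookup).
    sourceDisk, found := base.FindVolumeDisk(active, volumeID, collection, sourceServer)
    if !found {
        return "", fmt.Errorf("volume %d (collection %q) not found on %s", volumeID, collection, sourceServer)
    }

    // Same task ID scheme as the detection code: task type, volume ID, unix timestamp.
    taskID := fmt.Sprintf("balance_vol_%d_%d", volumeID, time.Now().Unix())

    // Register the pending task so the source and target disks are reserved.
    err := active.AddPendingTask(topology.TaskSpec{
        TaskID:       taskID,
        TaskType:     topology.TaskTypeBalance,
        VolumeID:     volumeID,
        VolumeSize:   volumeSize,
        Sources:      []topology.TaskSourceSpec{{ServerID: sourceServer, DiskID: sourceDisk}},
        Destinations: []topology.TaskDestinationSpec{{ServerID: targetServer, DiskID: targetDisk}},
    })
    if err != nil {
        return "", err
    }
    return taskID, nil
}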
