| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-09 21:47:29 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-09 21:47:29 -0700 |
| commit | 25bbf4c3d44b1c8a9aa4980e37ed399ec249f771 (patch) | |
| tree | aabb2ce3c6f55e4cf1e26ce2b6989086c17830bc /weed/worker/tasks/erasure_coding/detection.go | |
| parent | 3ac2a2e22d863753a6b568596fbe9d76d03023b5 (diff) | |
Admin UI: Fetch task logs (#7114)
* show task details
* loading tasks
* task UI works
* generic rendering
* rendering the export link
* removing placementConflicts from task parameters
* remove TaskSourceLocation
* remove "Server ID" column
* rendering balance task source
* sources and targets
* fix ec task generation
* move info
* render timeline
* simplified worker id
* simplify
* read task logs from worker
* isValidTaskID
* address comments
* Update weed/worker/tasks/balance/execution.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/worker/tasks/erasure_coding/ec_task.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update weed/worker/tasks/task_log_handler.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix shard ids
* plan distributing shard id
* rendering planned shards in task details
* remove Conflicts
* worker logs correctly
* pass in dc and rack
* task logging
* Update weed/admin/maintenance/maintenance_queue.go
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* display log details
* logs have fields now
* sort field keys
* fix link
* fix collection filtering
* avoid hard coded ec shard counts
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Diffstat (limited to 'weed/worker/tasks/erasure_coding/detection.go')
| Mode | Path | Lines |
|---|---|---|
| -rw-r--r-- | weed/worker/tasks/erasure_coding/detection.go | 194 |
1 file changed, 117 insertions(+), 77 deletions(-)
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index ec632436f..cd74bed33 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -61,6 +61,8 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 		// Check quiet duration and fullness criteria
 		if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+			glog.Infof("EC Detection: Volume %d meets all criteria, attempting to create task", metric.VolumeID)
+
 			// Generate task ID for ActiveTopology integration
 			taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())
@@ -79,11 +81,13 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 			// Plan EC destinations if ActiveTopology is available
 			if clusterInfo.ActiveTopology != nil {
+				glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID)
 				multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
 				if err != nil {
 					glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
 					continue // Skip this volume if destination planning fails
 				}
+				glog.Infof("EC Detection: Successfully planned %d destinations for volume %d", len(multiPlan.Plans), metric.VolumeID)

 				// Calculate expected shard size for EC operation
 				// Each data shard will be approximately volumeSize / dataShards
@@ -100,23 +104,27 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 				}

 				// Find all volume replica locations (server + disk) from topology
+				glog.Infof("EC Detection: Looking for replica locations for volume %d", metric.VolumeID)
 				replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
 				if len(replicaLocations) == 0 {
 					glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
 					continue
 				}
+				glog.Infof("EC Detection: Found %d replica locations for volume %d", len(replicaLocations), metric.VolumeID)

 				// Find existing EC shards from previous failed attempts
 				existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)

 				// Combine volume replicas and existing EC shards for cleanup
-				var allSourceLocations []topology.TaskSourceLocation
+				var sources []topology.TaskSourceSpec

 				// Add volume replicas (will free volume slots)
 				for _, replica := range replicaLocations {
-					allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+					sources = append(sources, topology.TaskSourceSpec{
 						ServerID:    replica.ServerID,
 						DiskID:      replica.DiskID,
+						DataCenter:  replica.DataCenter,
+						Rack:        replica.Rack,
 						CleanupType: topology.CleanupVolumeReplica,
 					})
 				}
@@ -131,9 +139,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 				for _, shard := range existingECShards {
 					key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
 					if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
-						allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+						sources = append(sources, topology.TaskSourceSpec{
 							ServerID:    shard.ServerID,
 							DiskID:      shard.DiskID,
+							DataCenter:  shard.DataCenter,
+							Rack:        shard.Rack,
 							CleanupType: topology.CleanupECShards,
 						})
 						duplicateCheck[key] = true
@@ -141,17 +151,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 				}

 				glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
-					len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations))
-
-				// Convert TaskSourceLocation to TaskSourceSpec
-				sources := make([]topology.TaskSourceSpec, len(allSourceLocations))
-				for i, srcLoc := range allSourceLocations {
-					sources[i] = topology.TaskSourceSpec{
-						ServerID:    srcLoc.ServerID,
-						DiskID:      srcLoc.DiskID,
-						CleanupType: srcLoc.CleanupType,
-					}
-				}
+					len(replicaLocations), len(existingECShards), metric.VolumeID, len(sources))

 				// Convert shard destinations to TaskDestinationSpec
 				destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
@@ -180,27 +180,21 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 				}

 				glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
-					taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans))
-
-				// Find all volume replicas from topology (for legacy worker compatibility)
-				var replicas []string
-				serverSet := make(map[string]struct{})
-				for _, loc := range replicaLocations {
-					if _, found := serverSet[loc.ServerID]; !found {
-						replicas = append(replicas, loc.ServerID)
-						serverSet[loc.ServerID] = struct{}{}
-					}
-				}
-				glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
+					taskID, metric.VolumeID, len(sources), len(multiPlan.Plans))

-				// Create typed parameters with EC destination information and replicas
+				// Create unified sources and targets for EC task
 				result.TypedParams = &worker_pb.TaskParams{
 					TaskId:     taskID, // Link to ActiveTopology pending task
 					VolumeId:   metric.VolumeID,
-					Server:     metric.Server,
 					Collection: metric.Collection,
 					VolumeSize: metric.Size, // Store original volume size for tracking changes
-					Replicas:   replicas, // Include all volume replicas for deletion
+
+					// Unified sources - all sources that will be processed/cleaned up
+					Sources: convertTaskSourcesToProtobuf(sources, metric.VolumeID),
+
+					// Unified targets - all EC shard destinations
+					Targets: createECTargets(multiPlan),
+
 					TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
 						ErasureCodingParams: createECTaskParams(multiPlan),
 					},
@@ -213,6 +207,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
 					continue // Skip this volume if no topology available
 				}

+				glog.Infof("EC Detection: Successfully created EC task for volume %d, adding to results", metric.VolumeID)
 				results = append(results, result)
 			} else {
 				// Count debug reasons
@@ -283,7 +278,8 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 	// Get available disks for EC placement with effective capacity consideration (includes pending tasks)
 	// For EC, we typically need 1 volume slot per shard, so use minimum capacity of 1
 	// For EC, we need at least 1 available volume slot on a disk to consider it for placement.
-	availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1)
+	// Note: We don't exclude the source server since the original volume will be deleted after EC conversion
+	availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 1)
 	if len(availableDisks) < erasure_coding.MinTotalDisks {
 		return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
 	}
@@ -306,7 +302,6 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 			TargetDC:       disk.DataCenter,
 			ExpectedSize:   expectedShardSize, // Set calculated EC shard size
 			PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
-			Conflicts:      checkECPlacementConflicts(disk, sourceRack, sourceDC),
 		}
 		plans = append(plans, plan)
@@ -340,32 +335,96 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
 	}, nil
 }

-// createECTaskParams creates EC task parameters from the multi-destination plan
-func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
-	var destinations []*worker_pb.ECDestination
-
-	for _, plan := range multiPlan.Plans {
-		destination := &worker_pb.ECDestination{
-			Node:           plan.TargetNode,
-			DiskId:         plan.TargetDisk,
-			Rack:           plan.TargetRack,
-			DataCenter:     plan.TargetDC,
-			PlacementScore: plan.PlacementScore,
+// createECTargets creates unified TaskTarget structures from the multi-destination plan
+// with proper shard ID assignment during planning phase
+func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.TaskTarget {
+	var targets []*worker_pb.TaskTarget
+	numTargets := len(multiPlan.Plans)
+
+	// Create shard assignment arrays for each target (round-robin distribution)
+	targetShards := make([][]uint32, numTargets)
+	for i := range targetShards {
+		targetShards[i] = make([]uint32, 0)
+	}
+
+	// Distribute shards in round-robin fashion to spread both data and parity shards
+	// This ensures each target gets a mix of data shards (0-9) and parity shards (10-13)
+	for shardId := uint32(0); shardId < uint32(erasure_coding.TotalShardsCount); shardId++ {
+		targetIndex := int(shardId) % numTargets
+		targetShards[targetIndex] = append(targetShards[targetIndex], shardId)
+	}
+
+	// Create targets with assigned shard IDs
+	for i, plan := range multiPlan.Plans {
+		target := &worker_pb.TaskTarget{
+			Node:          plan.TargetNode,
+			DiskId:        plan.TargetDisk,
+			Rack:          plan.TargetRack,
+			DataCenter:    plan.TargetDC,
+			ShardIds:      targetShards[i], // Round-robin assigned shards
+			EstimatedSize: plan.ExpectedSize,
+		}
+		targets = append(targets, target)
+
+		// Log shard assignment with data/parity classification
+		dataShards := make([]uint32, 0)
+		parityShards := make([]uint32, 0)
+		for _, shardId := range targetShards[i] {
+			if shardId < uint32(erasure_coding.DataShardsCount) {
+				dataShards = append(dataShards, shardId)
+			} else {
+				parityShards = append(parityShards, shardId)
+			}
 		}
-		destinations = append(destinations, destination)
+		glog.V(2).Infof("EC planning: target %s assigned shards %v (data: %v, parity: %v)",
+			plan.TargetNode, targetShards[i], dataShards, parityShards)
 	}

-	// Collect placement conflicts from all destinations
-	var placementConflicts []string
-	for _, plan := range multiPlan.Plans {
-		placementConflicts = append(placementConflicts, plan.Conflicts...)
+	glog.V(1).Infof("EC planning: distributed %d shards across %d targets using round-robin (data shards 0-%d, parity shards %d-%d)",
+		erasure_coding.TotalShardsCount, numTargets,
+		erasure_coding.DataShardsCount-1, erasure_coding.DataShardsCount, erasure_coding.TotalShardsCount-1)
+	return targets
+}
+
+// convertTaskSourcesToProtobuf converts topology.TaskSourceSpec to worker_pb.TaskSource
+func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32) []*worker_pb.TaskSource {
+	var protobufSources []*worker_pb.TaskSource
+
+	for _, source := range sources {
+		pbSource := &worker_pb.TaskSource{
+			Node:       source.ServerID,
+			DiskId:     source.DiskID,
+			DataCenter: source.DataCenter,
+			Rack:       source.Rack,
+		}
+
+		// Convert storage impact to estimated size
+		if source.EstimatedSize != nil {
+			pbSource.EstimatedSize = uint64(*source.EstimatedSize)
+		}
+
+		// Set appropriate volume ID or shard IDs based on cleanup type
+		switch source.CleanupType {
+		case topology.CleanupVolumeReplica:
+			// This is a volume replica, use the actual volume ID
+			pbSource.VolumeId = volumeID
+		case topology.CleanupECShards:
+			// This is EC shards, also use the volume ID for consistency
+			pbSource.VolumeId = volumeID
+			// Note: ShardIds would need to be passed separately if we need specific shard info
+		}
+
+		protobufSources = append(protobufSources, pbSource)
 	}
+	return protobufSources
+}
+
+// createECTaskParams creates clean EC task parameters (destinations now in unified targets)
+func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
 	return &worker_pb.ErasureCodingTaskParams{
-		Destinations:       destinations,
-		DataShards:         erasure_coding.DataShardsCount,   // Standard data shards
-		ParityShards:       erasure_coding.ParityShardsCount, // Standard parity shards
-		PlacementConflicts: placementConflicts,
+		DataShards:   erasure_coding.DataShardsCount,   // Standard data shards
+		ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards
 	}
 }
@@ -456,25 +515,19 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
 	score := 0.0

-	// Prefer disks with available capacity
+	// Prefer disks with available capacity (primary factor)
 	if disk.DiskInfo.MaxVolumeCount > 0 {
 		utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
-		score += (1.0 - utilization) * 50.0 // Up to 50 points for available capacity
+		score += (1.0 - utilization) * 60.0 // Up to 60 points for available capacity
 	}

-	// Prefer different racks for better distribution
-	if disk.Rack != sourceRack {
-		score += 30.0
-	}
-
-	// Prefer different data centers for better distribution
-	if disk.DataCenter != sourceDC {
-		score += 20.0
-	}
-
-	// Consider current load
+	// Consider current load (secondary factor)
 	score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load

+	// Note: We don't penalize placing shards on the same rack/DC as source
+	// since the original volume will be deleted after EC conversion.
+	// This allows for better network efficiency and storage utilization.
+
 	return score
 }
@@ -492,19 +545,6 @@ func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
 	return true
 }

-// checkECPlacementConflicts checks for placement rule conflicts in EC operations
-func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC string) []string {
-	var conflicts []string
-
-	// For EC, being on the same rack as source is often acceptable
-	// but we note it as potential conflict for monitoring
-	if disk.Rack == sourceRack && disk.DataCenter == sourceDC {
-		conflicts = append(conflicts, "same_rack_as_source")
-	}
-
-	return conflicts
-}
-
 // findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
 // Uses O(1) indexed lookup for optimal performance on large clusters.
 func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
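
For reference, the round-robin shard assignment that `createECTargets` performs during planning can be illustrated in isolation. The sketch below assumes SeaweedFS's default 10 data + 4 parity shard layout; the `assignShardsRoundRobin` helper and the local constants are hypothetical stand-ins for the repository's `erasure_coding.DataShardsCount` / `TotalShardsCount`, not code from this PR.

```go
package main

import "fmt"

const (
	dataShardsCount   = 10 // assumed to match erasure_coding.DataShardsCount
	parityShardsCount = 4  // assumed to match erasure_coding.ParityShardsCount
	totalShardsCount  = dataShardsCount + parityShardsCount
)

// assignShardsRoundRobin distributes shard IDs 0..totalShardsCount-1 across
// numTargets destinations so that each target receives a mix of data shards
// (0-9) and parity shards (10-13), mirroring the planning-phase assignment
// done in createECTargets.
func assignShardsRoundRobin(numTargets int) [][]uint32 {
	assignments := make([][]uint32, numTargets)
	for shardID := uint32(0); shardID < totalShardsCount; shardID++ {
		targetIndex := int(shardID) % numTargets
		assignments[targetIndex] = append(assignments[targetIndex], shardID)
	}
	return assignments
}

func main() {
	// With 4 targets, shards land as: [0 4 8 12], [1 5 9 13], [2 6 10], [3 7 11]
	for i, shards := range assignShardsRoundRobin(4) {
		fmt.Printf("target %d -> shards %v\n", i, shards)
	}
}
```

With 4 targets, every destination ends up holding both data and parity shards, which is the property the admin UI relies on when rendering the planned shards in the task details.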
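Similarly, the simplified `calculateECScore` weighting (up to 60 points for free capacity, up to 10 for low load, no same-rack/same-DC penalty) can be sketched as a standalone function. The signature and parameters below are illustrative, not the actual `topology.DiskInfo` fields.

```go
package main

import "fmt"

// placementScore mirrors the simplified scoring now used in calculateECScore:
// free capacity dominates (up to 60 points), low load adds up to 10 points,
// and same-rack / same-DC placement is no longer penalized because the source
// volume is deleted after EC conversion. Parameter names are assumptions.
func placementScore(volumeCount, maxVolumeCount int64, loadCount int) float64 {
	score := 0.0
	if maxVolumeCount > 0 {
		utilization := float64(volumeCount) / float64(maxVolumeCount)
		score += (1.0 - utilization) * 60.0 // up to 60 points for free capacity
	}
	score += 10.0 - float64(loadCount) // up to 10 points for low load
	return score
}

func main() {
	// A half-full disk with 2 pending/active tasks scores 30 + 8 = 38.
	fmt.Printf("score: %.1f\n", placementScore(50, 100, 2))
}
```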
