| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-08-01 11:18:32 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-08-01 11:18:32 -0700 |
| commit | 0975968e71b05368d5f28f788cf863c2042c2696 (patch) | |
| tree | 5162a5fe3d9c88fb43f49f57326a4fd5b8cde74c /weed/admin/maintenance/maintenance_integration.go | |
| parent | 1cba609bfa2306cc2885df212febd5ff954aa693 (diff) | |
| download | seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.tar.xz seaweedfs-0975968e71b05368d5f28f788cf863c2042c2696.zip | |
admin: Refactor task destination planning (#7063)
* refactor planning into task detection
* refactoring worker tasks
* refactor
* compiles, but only balance task is registered
* compiles, but has nil exception
* avoid nil logger
* add back ec task
* setting ec log directory
* implement balance and vacuum tasks
* EC tasks will no longer fail with "file not found" errors
* Use ReceiveFile API to send locally generated shards
* distributing shard files and ecx,ecj,vif files
* generate .ecx files correctly
* do not mount all possible EC shards (0-13) on every destination
* use constants
* delete all replicas
* rename files
* pass in volume size to tasks
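The core behavioral change in this file is that destination planning now happens during task detection, so the maintenance scan only keeps tasks that already carry typed protobuf parameters. Below is a minimal, self-contained sketch of that guard using simplified stand-in types (not the actual SeaweedFS structs); the real check lives in ScanWithTaskDetectors in the diff that follows.

```go
package main

import "log"

// TaskParams stands in for the worker_pb.TaskParams protobuf message.
type TaskParams struct{ VolumeID uint32 }

// TaskDetectionResult is a simplified stand-in for the detection result type.
type TaskDetectionResult struct {
	TaskType    string
	VolumeID    uint32
	TypedParams *TaskParams // populated during detection in the refactored flow
}

// filterPlannedTasks keeps only tasks whose parameters were created at detection time.
func filterPlannedTasks(results []*TaskDetectionResult) []*TaskDetectionResult {
	var planned []*TaskDetectionResult
	for _, r := range results {
		if r.TypedParams == nil {
			// Parameter creation (including destination planning) failed earlier; skip the task.
			log.Printf("task %s for volume %d has no typed parameters - skipping", r.TaskType, r.VolumeID)
			continue
		}
		planned = append(planned, r)
	}
	return planned
}

func main() {
	results := []*TaskDetectionResult{
		{TaskType: "erasure_coding", VolumeID: 1, TypedParams: &TaskParams{VolumeID: 1}},
		{TaskType: "balance", VolumeID: 2}, // planning failed: no params
	}
	log.Printf("kept %d of %d tasks", len(filterPlannedTasks(results)), len(results))
}
```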
Diffstat (limited to 'weed/admin/maintenance/maintenance_integration.go')
| -rw-r--r-- | weed/admin/maintenance/maintenance_integration.go | 467 |
1 file changed, 16 insertions, 451 deletions
diff --git a/weed/admin/maintenance/maintenance_integration.go b/weed/admin/maintenance/maintenance_integration.go
index 1bdd7ffcc..553f32eb8 100644
--- a/weed/admin/maintenance/maintenance_integration.go
+++ b/weed/admin/maintenance/maintenance_integration.go
@@ -1,20 +1,13 @@
 package maintenance
 import (
-	"context"
-	"fmt"
 	"time"
 	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
-	"github.com/seaweedfs/seaweedfs/weed/operation"
-	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
-	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
 	"github.com/seaweedfs/seaweedfs/weed/worker/tasks"
 	"github.com/seaweedfs/seaweedfs/weed/worker/types"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/credentials/insecure"
 )
 // MaintenanceIntegration bridges the task system with existing maintenance
@@ -225,8 +218,9 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo
 	// Create cluster info
 	clusterInfo := &types.ClusterInfo{
-		TotalVolumes: len(filteredMetrics),
-		LastUpdated:  time.Now(),
+		TotalVolumes:   len(filteredMetrics),
+		LastUpdated:    time.Now(),
+		ActiveTopology: s.activeTopology, // Provide ActiveTopology for destination planning
 	}
 	// Run detection for each registered task type
@@ -250,8 +244,12 @@ func (s *MaintenanceIntegration) ScanWithTaskDetectors(volumeMetrics []*types.Vo
 			// Double-check for conflicts with pending operations
 			opType := s.mapMaintenanceTaskTypeToPendingOperationType(existingResult.TaskType)
 			if !s.pendingOperations.WouldConflictWithPending(existingResult.VolumeID, opType) {
-				// Plan destination for operations that need it
-				s.planDestinationForTask(existingResult, opType)
+				// All task types should now have TypedParams populated during detection phase
+				if existingResult.TypedParams == nil {
+					glog.Warningf("Task %s for volume %d has no typed parameters - skipping (task parameter creation may have failed)",
+						existingResult.TaskType, existingResult.VolumeID)
+					continue
+				}
 				allResults = append(allResults, existingResult)
 			} else {
 				glog.V(2).Infof("Skipping task %s for volume %d due to conflict with pending operation",
@@ -342,7 +340,7 @@ func (s *MaintenanceIntegration) CanScheduleWithTaskSchedulers(task *Maintenance
 }
 // convertTaskToTaskSystem converts existing task to task system format using dynamic mapping
-func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask) *types.Task {
+func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask) *types.TaskInput {
 	// Convert task type using mapping
 	taskType, exists := s.revTaskTypeMap[task.Type]
 	if !exists {
@@ -358,7 +356,7 @@ func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask)
 		priority = types.TaskPriorityNormal
 	}
-	return &types.Task{
+	return &types.TaskInput{
 		ID:       task.ID,
 		Type:     taskType,
 		Priority: priority,
@@ -371,8 +369,8 @@ func (s *MaintenanceIntegration) convertTaskToTaskSystem(task *MaintenanceTask)
 }
 // convertTasksToTaskSystem converts multiple tasks
-func (s *MaintenanceIntegration) convertTasksToTaskSystem(tasks []*MaintenanceTask) []*types.Task {
-	var result []*types.Task
+func (s *MaintenanceIntegration) convertTasksToTaskSystem(tasks []*MaintenanceTask) []*types.TaskInput {
+	var result []*types.TaskInput
 	for _, task := range tasks {
 		converted := s.convertTaskToTaskSystem(task)
 		if converted != nil {
@@ -383,8 +381,8 @@ func (s *MaintenanceIntegration) convertTasksToTaskSystem(tasks []*MaintenanceTa
 	}
 }
 // convertWorkersToTaskSystem converts workers to task system format using dynamic mapping
-func (s *MaintenanceIntegration) convertWorkersToTaskSystem(workers []*MaintenanceWorker) []*types.Worker {
-	var result []*types.Worker
+func (s *MaintenanceIntegration) convertWorkersToTaskSystem(workers []*MaintenanceWorker) []*types.WorkerData {
+	var result []*types.WorkerData
 	for _, worker := range workers {
 		capabilities := make([]types.TaskType, 0, len(worker.Capabilities))
 		for _, cap := range worker.Capabilities {
@@ -397,7 +395,7 @@ func (s *MaintenanceIntegration) convertWorkersToTaskSystem(workers []*Maintenan
 			}
 		}
-		result = append(result, &types.Worker{
+		result = append(result, &types.WorkerData{
 			ID:           worker.ID,
 			Address:      worker.Address,
 			Capabilities: capabilities,
@@ -489,436 +487,3 @@ func (s *MaintenanceIntegration) GetPendingOperations() *PendingOperations {
 func (s *MaintenanceIntegration) GetActiveTopology() *topology.ActiveTopology {
 	return s.activeTopology
 }
-
-// planDestinationForTask plans the destination for a task that requires it and creates typed protobuf parameters
-func (s *MaintenanceIntegration) planDestinationForTask(task *TaskDetectionResult, opType PendingOperationType) {
-	// Only plan destinations for operations that move volumes/shards
-	if opType == OpTypeVacuum {
-		// For vacuum tasks, create VacuumTaskParams
-		s.createVacuumTaskParams(task)
-		return
-	}
-
-	glog.V(1).Infof("Planning destination for %s task on volume %d (server: %s)", task.TaskType, task.VolumeID, task.Server)
-
-	// Use ActiveTopology for destination planning
-	destinationPlan, err := s.planDestinationWithActiveTopology(task, opType)
-
-	if err != nil {
-		glog.Warningf("Failed to plan primary destination for %s task volume %d: %v",
-			task.TaskType, task.VolumeID, err)
-		// Don't return here - still try to create task params which might work with multiple destinations
-	}
-
-	// Create typed protobuf parameters based on operation type
-	switch opType {
-	case OpTypeErasureCoding:
-		if destinationPlan == nil {
-			glog.Warningf("Cannot create EC task for volume %d: destination planning failed", task.VolumeID)
-			return
-		}
-		s.createErasureCodingTaskParams(task, destinationPlan)
-	case OpTypeVolumeMove, OpTypeVolumeBalance:
-		if destinationPlan == nil {
-			glog.Warningf("Cannot create balance task for volume %d: destination planning failed", task.VolumeID)
-			return
-		}
-		s.createBalanceTaskParams(task, destinationPlan.(*topology.DestinationPlan))
-	case OpTypeReplication:
-		if destinationPlan == nil {
-			glog.Warningf("Cannot create replication task for volume %d: destination planning failed", task.VolumeID)
-			return
-		}
-		s.createReplicationTaskParams(task, destinationPlan.(*topology.DestinationPlan))
-	default:
-		glog.V(2).Infof("Unknown operation type for task %s: %v", task.TaskType, opType)
-	}
-
-	if destinationPlan != nil {
-		switch plan := destinationPlan.(type) {
-		case *topology.DestinationPlan:
-			glog.V(1).Infof("Completed destination planning for %s task on volume %d: %s -> %s",
-				task.TaskType, task.VolumeID, task.Server, plan.TargetNode)
-		case *topology.MultiDestinationPlan:
-			glog.V(1).Infof("Completed EC destination planning for volume %d: %s -> %d destinations (racks: %d, DCs: %d)",
-				task.VolumeID, task.Server, len(plan.Plans), plan.SuccessfulRack, plan.SuccessfulDCs)
-		}
-	} else {
-		glog.V(1).Infof("Completed destination planning for %s task on volume %d: no destination planned",
-			task.TaskType, task.VolumeID)
-	}
-}
-
-// createVacuumTaskParams creates typed parameters for vacuum tasks
-func (s *MaintenanceIntegration) createVacuumTaskParams(task *TaskDetectionResult) {
-	// Get configuration from policy instead of using hard-coded values
-	vacuumConfig := GetVacuumTaskConfig(s.maintenancePolicy, MaintenanceTaskType("vacuum"))
-
-	// Use configured values or defaults if config is not available
-	garbageThreshold := 0.3                    // Default 30%
-	verifyChecksum := true                     // Default to verify
-	batchSize := int32(1000)                   // Default batch size
-	workingDir := "/tmp/seaweedfs_vacuum_work" // Default working directory
-
-	if vacuumConfig != nil {
-		garbageThreshold = vacuumConfig.GarbageThreshold
-		// Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours, MinIntervalSeconds
-		// Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added
-		// to the protobuf definition if they should be configurable
-	}
-
-	// Create typed protobuf parameters
-	task.TypedParams = &worker_pb.TaskParams{
-		VolumeId:   task.VolumeID,
-		Server:     task.Server,
-		Collection: task.Collection,
-		TaskParams: &worker_pb.TaskParams_VacuumParams{
-			VacuumParams: &worker_pb.VacuumTaskParams{
-				GarbageThreshold: garbageThreshold,
-				ForceVacuum:      false,
-				BatchSize:        batchSize,
-				WorkingDir:       workingDir,
-				VerifyChecksum:   verifyChecksum,
-			},
-		},
-	}
-}
-
-// planDestinationWithActiveTopology uses ActiveTopology to plan destinations
-func (s *MaintenanceIntegration) planDestinationWithActiveTopology(task *TaskDetectionResult, opType PendingOperationType) (interface{}, error) {
-	// Get source node information from topology
-	var sourceRack, sourceDC string
-
-	// Extract rack and DC from topology info
-	topologyInfo := s.activeTopology.GetTopologyInfo()
-	if topologyInfo != nil {
-		for _, dc := range topologyInfo.DataCenterInfos {
-			for _, rack := range dc.RackInfos {
-				for _, dataNodeInfo := range rack.DataNodeInfos {
-					if dataNodeInfo.Id == task.Server {
-						sourceDC = dc.Id
-						sourceRack = rack.Id
-						break
-					}
-				}
-				if sourceRack != "" {
-					break
-				}
-			}
-			if sourceDC != "" {
-				break
-			}
-		}
-	}
-
-	switch opType {
-	case OpTypeVolumeBalance, OpTypeVolumeMove:
-		// Plan single destination for balance operation
-		return s.activeTopology.PlanBalanceDestination(task.VolumeID, task.Server, sourceRack, sourceDC, 0)
-
-	case OpTypeErasureCoding:
-		// Plan multiple destinations for EC operation using adaptive shard counts
-		// Start with the default configuration, but fall back to smaller configurations if insufficient disks
-		totalShards := s.getOptimalECShardCount()
-		multiPlan, err := s.activeTopology.PlanECDestinations(task.VolumeID, task.Server, sourceRack, sourceDC, totalShards)
-		if err != nil {
-			return nil, err
-		}
-		if multiPlan != nil && len(multiPlan.Plans) > 0 {
-			// Return the multi-destination plan for EC
-			return multiPlan, nil
-		}
-		return nil, fmt.Errorf("no EC destinations found")
-
-	default:
-		return nil, fmt.Errorf("unsupported operation type for destination planning: %v", opType)
-	}
-}
-
-// createErasureCodingTaskParams creates typed parameters for EC tasks
-func (s *MaintenanceIntegration) createErasureCodingTaskParams(task *TaskDetectionResult, destinationPlan interface{}) {
-	// Determine EC shard counts based on the number of planned destinations
-	multiPlan, ok := destinationPlan.(*topology.MultiDestinationPlan)
-	if !ok {
-		glog.Warningf("EC task for volume %d received unexpected destination plan type", task.VolumeID)
-		task.TypedParams = nil
-		return
-	}
-
-	// Use adaptive shard configuration based on actual planned destinations
-	totalShards := len(multiPlan.Plans)
-	dataShards, parityShards := s.getECShardCounts(totalShards)
-
-	// Extract disk-aware destinations from the multi-destination plan
-	var destinations []*worker_pb.ECDestination
-	var allConflicts []string
-
-	for _, plan := range multiPlan.Plans {
-		allConflicts = append(allConflicts, plan.Conflicts...)
-
-		// Create disk-aware destination
-		destinations = append(destinations, &worker_pb.ECDestination{
-			Node:           plan.TargetNode,
-			DiskId:         plan.TargetDisk,
-			Rack:           plan.TargetRack,
-			DataCenter:     plan.TargetDC,
-			PlacementScore: plan.PlacementScore,
-		})
-	}
-
-	glog.V(1).Infof("EC destination planning for volume %d: got %d destinations (%d+%d shards) across %d racks and %d DCs",
-		task.VolumeID, len(destinations), dataShards, parityShards, multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
-
-	if len(destinations) == 0 {
-		glog.Warningf("No destinations available for EC task volume %d - rejecting task", task.VolumeID)
-		task.TypedParams = nil
-		return
-	}
-
-	// Collect existing EC shard locations for cleanup
-	existingShardLocations := s.collectExistingEcShardLocations(task.VolumeID)
-
-	// Create EC task parameters
-	ecParams := &worker_pb.ErasureCodingTaskParams{
-		Destinations:           destinations, // Disk-aware destinations
-		DataShards:             dataShards,
-		ParityShards:           parityShards,
-		WorkingDir:             "/tmp/seaweedfs_ec_work",
-		MasterClient:           "localhost:9333",
-		CleanupSource:          true,
-		ExistingShardLocations: existingShardLocations, // Pass existing shards for cleanup
-	}
-
-	// Add placement conflicts if any
-	if len(allConflicts) > 0 {
-		// Remove duplicates
-		conflictMap := make(map[string]bool)
-		var uniqueConflicts []string
-		for _, conflict := range allConflicts {
-			if !conflictMap[conflict] {
-				conflictMap[conflict] = true
-				uniqueConflicts = append(uniqueConflicts, conflict)
-			}
-		}
-		ecParams.PlacementConflicts = uniqueConflicts
-	}
-
-	// Wrap in TaskParams
-	task.TypedParams = &worker_pb.TaskParams{
-		VolumeId:   task.VolumeID,
-		Server:     task.Server,
-		Collection: task.Collection,
-		TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
-			ErasureCodingParams: ecParams,
-		},
-	}
-
-	glog.V(1).Infof("Created EC task params with %d destinations for volume %d",
-		len(destinations), task.VolumeID)
-}
-
-// createBalanceTaskParams creates typed parameters for balance/move tasks
-func (s *MaintenanceIntegration) createBalanceTaskParams(task *TaskDetectionResult, destinationPlan *topology.DestinationPlan) {
-	// balanceConfig could be used for future config options like ImbalanceThreshold, MinServerCount
-
-	// Create balance task parameters
-	balanceParams := &worker_pb.BalanceTaskParams{
-		DestNode:       destinationPlan.TargetNode,
-		EstimatedSize:  destinationPlan.ExpectedSize,
-		DestRack:       destinationPlan.TargetRack,
-		DestDc:         destinationPlan.TargetDC,
-		PlacementScore: destinationPlan.PlacementScore,
-		ForceMove:      false, // Default to false
-		TimeoutSeconds: 300,   // Default 5 minutes
-	}
-
-	// Add placement conflicts if any
-	if len(destinationPlan.Conflicts) > 0 {
-		balanceParams.PlacementConflicts = destinationPlan.Conflicts
-	}
-
-	// Note: balanceConfig would have ImbalanceThreshold, MinServerCount if needed for future enhancements
-
-	// Wrap in TaskParams
-	task.TypedParams = &worker_pb.TaskParams{
-		VolumeId:   task.VolumeID,
-		Server:     task.Server,
-		Collection: task.Collection,
-		TaskParams: &worker_pb.TaskParams_BalanceParams{
-			BalanceParams: balanceParams,
-		},
-	}
-
-	glog.V(1).Infof("Created balance task params for volume %d: %s -> %s (score: %.2f)",
-		task.VolumeID, task.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore)
-}
-
-// createReplicationTaskParams creates typed parameters for replication tasks
-func (s *MaintenanceIntegration) createReplicationTaskParams(task *TaskDetectionResult, destinationPlan *topology.DestinationPlan) {
-	// replicationConfig could be used for future config options like TargetReplicaCount
-
-	// Create replication task parameters
-	replicationParams := &worker_pb.ReplicationTaskParams{
-		DestNode:       destinationPlan.TargetNode,
-		DestRack:       destinationPlan.TargetRack,
-		DestDc:         destinationPlan.TargetDC,
-		PlacementScore: destinationPlan.PlacementScore,
-	}
-
-	// Add placement conflicts if any
-	if len(destinationPlan.Conflicts) > 0 {
-		replicationParams.PlacementConflicts = destinationPlan.Conflicts
-	}
-
-	// Note: replicationConfig would have TargetReplicaCount if needed for future enhancements
-
-	// Wrap in TaskParams
-	task.TypedParams = &worker_pb.TaskParams{
-		VolumeId:   task.VolumeID,
-		Server:     task.Server,
-		Collection: task.Collection,
-		TaskParams: &worker_pb.TaskParams_ReplicationParams{
-			ReplicationParams: replicationParams,
-		},
-	}
-
-	glog.V(1).Infof("Created replication task params for volume %d: %s -> %s",
-		task.VolumeID, task.Server, destinationPlan.TargetNode)
-}
-
-// getOptimalECShardCount returns the optimal number of EC shards based on available disks
-// Uses a simplified approach to avoid blocking during UI access
-func (s *MaintenanceIntegration) getOptimalECShardCount() int {
-	// Try to get available disks quickly, but don't block if topology is busy
-	availableDisks := s.getAvailableDisksQuickly()
-
-	// EC configurations in order of preference: (data+parity=total)
-	// Use smaller configurations for smaller clusters
-	if availableDisks >= 14 {
-		glog.V(1).Infof("Using default EC configuration: 10+4=14 shards for %d available disks", availableDisks)
-		return 14 // Default: 10+4
-	} else if availableDisks >= 6 {
-		glog.V(1).Infof("Using small cluster EC configuration: 4+2=6 shards for %d available disks", availableDisks)
-		return 6 // Small cluster: 4+2
-	} else if availableDisks >= 4 {
-		glog.V(1).Infof("Using minimal EC configuration: 3+1=4 shards for %d available disks", availableDisks)
-		return 4 // Minimal: 3+1
-	} else {
-		glog.V(1).Infof("Using very small cluster EC configuration: 2+1=3 shards for %d available disks", availableDisks)
-		return 3 // Very small: 2+1
-	}
-}
-
-// getAvailableDisksQuickly returns available disk count with a fast path to avoid UI blocking
-func (s *MaintenanceIntegration) getAvailableDisksQuickly() int {
-	// Use ActiveTopology's optimized disk counting if available
-	// Use empty task type and node filter for general availability check
-	allDisks := s.activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
-	if len(allDisks) > 0 {
-		return len(allDisks)
-	}
-
-	// Fallback: try to count from topology but don't hold locks for too long
-	topologyInfo := s.activeTopology.GetTopologyInfo()
-	return s.countAvailableDisks(topologyInfo)
-}
-
-// countAvailableDisks counts the total number of available disks in the topology
-func (s *MaintenanceIntegration) countAvailableDisks(topologyInfo *master_pb.TopologyInfo) int {
-	if topologyInfo == nil {
-		return 0
-	}
-
-	diskCount := 0
-	for _, dc := range topologyInfo.DataCenterInfos {
-		for _, rack := range dc.RackInfos {
-			for _, node := range rack.DataNodeInfos {
-				diskCount += len(node.DiskInfos)
-			}
-		}
-	}
-
-	return diskCount
-}
-
-// getECShardCounts determines data and parity shard counts for a given total
-func (s *MaintenanceIntegration) getECShardCounts(totalShards int) (int32, int32) {
-	// Map total shards to (data, parity) configurations
-	switch totalShards {
-	case 14:
-		return 10, 4 // Default: 10+4
-	case 9:
-		return 6, 3 // Medium: 6+3
-	case 6:
-		return 4, 2 // Small: 4+2
-	case 4:
-		return 3, 1 // Minimal: 3+1
-	case 3:
-		return 2, 1 // Very small: 2+1
-	default:
-		// For any other total, try to maintain roughly 3:1 or 4:1 ratio
-		if totalShards >= 4 {
-			parityShards := totalShards / 4
-			if parityShards < 1 {
-				parityShards = 1
-			}
-			dataShards := totalShards - parityShards
-			return int32(dataShards), int32(parityShards)
-		}
-		// Fallback for very small clusters
-		return int32(totalShards - 1), 1
-	}
-}
-
-// collectExistingEcShardLocations queries the master for existing EC shard locations during planning
-func (s *MaintenanceIntegration) collectExistingEcShardLocations(volumeId uint32) []*worker_pb.ExistingECShardLocation {
-	var existingShardLocations []*worker_pb.ExistingECShardLocation
-
-	// Use insecure connection for simplicity - in production this might be configurable
-	grpcDialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
-
-	err := operation.WithMasterServerClient(false, pb.ServerAddress("localhost:9333"), grpcDialOption,
-		func(masterClient master_pb.SeaweedClient) error {
-			req := &master_pb.LookupEcVolumeRequest{
-				VolumeId: volumeId,
-			}
-			resp, err := masterClient.LookupEcVolume(context.Background(), req)
-			if err != nil {
-				// If volume doesn't exist as EC volume, that's fine - just no existing shards
-				glog.V(1).Infof("LookupEcVolume for volume %d returned: %v (this is normal if no existing EC shards)", volumeId, err)
-				return nil
-			}
-
-			// Group shard locations by server
-			serverShardMap := make(map[string][]uint32)
-			for _, shardIdLocation := range resp.ShardIdLocations {
-				shardId := uint32(shardIdLocation.ShardId)
-				for _, location := range shardIdLocation.Locations {
-					serverAddr := pb.NewServerAddressFromLocation(location)
-					serverShardMap[string(serverAddr)] = append(serverShardMap[string(serverAddr)], shardId)
-				}
-			}
-
-			// Convert to protobuf format
-			for serverAddr, shardIds := range serverShardMap {
-				existingShardLocations = append(existingShardLocations, &worker_pb.ExistingECShardLocation{
-					Node:     serverAddr,
-					ShardIds: shardIds,
-				})
-			}
-
-			return nil
-		})
-
-	if err != nil {
-		glog.Errorf("Failed to lookup existing EC shards from master for volume %d: %v", volumeId, err)
-		// Return empty list - cleanup will be skipped but task can continue
-		return []*worker_pb.ExistingECShardLocation{}
-	}
-
-	if len(existingShardLocations) > 0 {
-		glog.V(1).Infof("Found existing EC shards for volume %d on %d servers during planning", volumeId, len(existingShardLocations))
-	}
-
-	return existingShardLocations
-}
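For reference, the adaptive EC shard-count selection removed from this file (getOptimalECShardCount and getECShardCounts) chooses a total shard count from the number of available disks and then splits it into data and parity shards. A standalone restatement of that mapping, using the same thresholds as the deleted code but written outside the MaintenanceIntegration receiver, might look like this:

```go
package main

import "fmt"

// optimalECShardCount mirrors the thresholds of the removed getOptimalECShardCount:
// prefer 10+4, then fall back to 4+2, 3+1, and finally 2+1 on small clusters.
func optimalECShardCount(availableDisks int) int {
	switch {
	case availableDisks >= 14:
		return 14 // 10+4
	case availableDisks >= 6:
		return 6 // 4+2
	case availableDisks >= 4:
		return 4 // 3+1
	default:
		return 3 // 2+1
	}
}

// ecShardCounts mirrors the removed getECShardCounts: known totals map to fixed
// configurations, anything else keeps roughly a 4:1 data-to-parity ratio.
func ecShardCounts(totalShards int) (dataShards, parityShards int32) {
	switch totalShards {
	case 14:
		return 10, 4
	case 9:
		return 6, 3
	case 6:
		return 4, 2
	case 4:
		return 3, 1
	case 3:
		return 2, 1
	default:
		if totalShards >= 4 {
			parity := totalShards / 4
			if parity < 1 {
				parity = 1
			}
			return int32(totalShards - parity), int32(parity)
		}
		return int32(totalShards - 1), 1
	}
}

func main() {
	for _, disks := range []int{3, 5, 8, 20} {
		total := optimalECShardCount(disks)
		data, parity := ecShardCounts(total)
		fmt.Printf("%d disks -> %d shards (%d data + %d parity)\n", disks, total, data, parity)
	}
}
```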
