Diffstat (limited to 'weed/admin/topology/task_management.go')
| -rw-r--r-- | weed/admin/topology/task_management.go | 264 |
1 file changed, 264 insertions, 0 deletions
diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go
new file mode 100644
index 000000000..b240adcd8
--- /dev/null
+++ b/weed/admin/topology/task_management.go
@@ -0,0 +1,264 @@
+package topology
+
+import (
+    "fmt"
+    "time"
+
+    "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// AssignTask moves a task from pending to assigned and reserves capacity
+func (at *ActiveTopology) AssignTask(taskID string) error {
+    at.mutex.Lock()
+    defer at.mutex.Unlock()
+
+    task, exists := at.pendingTasks[taskID]
+    if !exists {
+        return fmt.Errorf("pending task %s not found", taskID)
+    }
+
+    // Check if all destination disks have sufficient capacity to reserve
+    for _, dest := range task.Destinations {
+        targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
+        if targetDisk, exists := at.disks[targetKey]; exists {
+            availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)
+
+            // Check if we have enough total capacity using the improved unified comparison
+            if !availableCapacity.CanAccommodate(dest.StorageChange) {
+                return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
+                    dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
+            }
+        } else if dest.TargetServer != "" {
+            // Fail fast if destination disk is not found in topology
+            return fmt.Errorf("destination disk %s not found in topology", targetKey)
+        }
+    }
+
+    // Move task to assigned and reserve capacity
+    delete(at.pendingTasks, taskID)
+    task.Status = TaskStatusInProgress
+    at.assignedTasks[taskID] = task
+    at.reassignTaskStates()
+
+    // Log capacity reservation information for all sources and destinations
+    totalSourceImpact := StorageSlotChange{}
+    totalDestImpact := StorageSlotChange{}
+    for _, source := range task.Sources {
+        totalSourceImpact.AddInPlace(source.StorageChange)
+    }
+    for _, dest := range task.Destinations {
+        totalDestImpact.AddInPlace(dest.StorageChange)
+    }
+
+    glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
+        taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
+        len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
+
+    return nil
+}
+
+// CompleteTask moves a task from assigned to recent and releases reserved capacity
+// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
+// should be handled by the master when it receives the task completion notification.
+func (at *ActiveTopology) CompleteTask(taskID string) error {
+    at.mutex.Lock()
+    defer at.mutex.Unlock()
+
+    task, exists := at.assignedTasks[taskID]
+    if !exists {
+        return fmt.Errorf("assigned task %s not found", taskID)
+    }
+
+    // Release reserved capacity by moving task to completed state
+    delete(at.assignedTasks, taskID)
+    task.Status = TaskStatusCompleted
+    task.CompletedAt = time.Now()
+    at.recentTasks[taskID] = task
+    at.reassignTaskStates()
+
+    // Log capacity release information for all sources and destinations
+    totalSourceImpact := StorageSlotChange{}
+    totalDestImpact := StorageSlotChange{}
+    for _, source := range task.Sources {
+        totalSourceImpact.AddInPlace(source.StorageChange)
+    }
+    for _, dest := range task.Destinations {
+        totalDestImpact.AddInPlace(dest.StorageChange)
+    }
+
+    glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
+        taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
+        len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
+
+    // Clean up old recent tasks
+    at.cleanupRecentTasks()
+
+    return nil
+}
+
+// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion
+// This should be called when the master updates the topology with new VolumeCount information
+func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
+    at.mutex.Lock()
+    defer at.mutex.Unlock()
+
+    diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
+    if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
+        oldCount := disk.DiskInfo.DiskInfo.VolumeCount
+        disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange
+
+        glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
+            diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
+    }
+}
+
+// AddPendingTask is the unified function that handles both simple and complex task creation
+func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
+    // Validation
+    if len(spec.Sources) == 0 {
+        return fmt.Errorf("at least one source is required")
+    }
+    if len(spec.Destinations) == 0 {
+        return fmt.Errorf("at least one destination is required")
+    }
+
+    at.mutex.Lock()
+    defer at.mutex.Unlock()
+
+    // Build sources array
+    sources := make([]TaskSource, len(spec.Sources))
+    for i, sourceSpec := range spec.Sources {
+        var storageImpact StorageSlotChange
+        var estimatedSize int64
+
+        if sourceSpec.StorageImpact != nil {
+            // Use manually specified impact
+            storageImpact = *sourceSpec.StorageImpact
+        } else {
+            // Auto-calculate based on task type and cleanup type
+            storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
+        }
+
+        if sourceSpec.EstimatedSize != nil {
+            estimatedSize = *sourceSpec.EstimatedSize
+        } else {
+            estimatedSize = spec.VolumeSize // Default to volume size
+        }
+
+        sources[i] = TaskSource{
+            SourceServer:  sourceSpec.ServerID,
+            SourceDisk:    sourceSpec.DiskID,
+            StorageChange: storageImpact,
+            EstimatedSize: estimatedSize,
+        }
+    }
+
+    // Build destinations array
+    destinations := make([]TaskDestination, len(spec.Destinations))
+    for i, destSpec := range spec.Destinations {
+        var storageImpact StorageSlotChange
+        var estimatedSize int64
+
+        if destSpec.StorageImpact != nil {
+            // Use manually specified impact
+            storageImpact = *destSpec.StorageImpact
+        } else {
+            // Auto-calculate based on task type
+            _, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
+        }
+
+        if destSpec.EstimatedSize != nil {
+            estimatedSize = *destSpec.EstimatedSize
+        } else {
+            estimatedSize = spec.VolumeSize // Default to volume size
+        }
+
+        destinations[i] = TaskDestination{
+            TargetServer:  destSpec.ServerID,
+            TargetDisk:    destSpec.DiskID,
+            StorageChange: storageImpact,
+            EstimatedSize: estimatedSize,
+        }
+    }
+
+    // Create the task
+    task := &taskState{
+        VolumeID:      spec.VolumeID,
+        TaskType:      spec.TaskType,
+        Status:        TaskStatusPending,
+        StartedAt:     time.Now(),
+        EstimatedSize: spec.VolumeSize,
+        Sources:       sources,
+        Destinations:  destinations,
+    }
+
+    at.pendingTasks[spec.TaskID] = task
+    at.assignTaskToDisk(task)
+
+    glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations",
+        spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations))
+
+    return nil
+}
+
+// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
+func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
+    switch taskType {
+    case TaskTypeErasureCoding:
+        switch cleanupType {
+        case CleanupVolumeReplica:
+            impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
+            return impact
+        case CleanupECShards:
+            return CalculateECShardCleanupImpact(volumeSize)
+        default:
+            impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
+            return impact
+        }
+    default:
+        impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
+        return impact
+    }
+}
+
+// SourceCleanupType indicates what type of data needs to be cleaned up from a source
+type SourceCleanupType int
+
+const (
+    CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots)
+    CleanupECShards                               // Clean up existing EC shards (frees shard slots)
+)
+
+// TaskSourceSpec represents a source specification for task creation
+type TaskSourceSpec struct {
+    ServerID      string
+    DiskID        uint32
+    CleanupType   SourceCleanupType  // For EC: volume replica vs existing shards
+    StorageImpact *StorageSlotChange // Optional: manual override
+    EstimatedSize *int64             // Optional: manual override
+}
+
+// TaskDestinationSpec represents a destination specification for task creation
+type TaskDestinationSpec struct {
+    ServerID      string
+    DiskID        uint32
+    StorageImpact *StorageSlotChange // Optional: manual override
+    EstimatedSize *int64             // Optional: manual override
+}
+
+// TaskSpec represents a complete task specification
+type TaskSpec struct {
+    TaskID       string
+    TaskType     TaskType
+    VolumeID     uint32
+    VolumeSize   int64                 // Used for auto-calculation when manual impacts not provided
+    Sources      []TaskSourceSpec      // Can be single or multiple
+    Destinations []TaskDestinationSpec // Can be single or multiple
+}
+
+// TaskSourceLocation represents a source location for task creation (DEPRECATED: use TaskSourceSpec)
+type TaskSourceLocation struct {
+    ServerID    string
+    DiskID      uint32
+    CleanupType SourceCleanupType // What type of cleanup is needed
+}
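
For orientation, here is a minimal usage sketch (not part of the diff) of how the lifecycle added above fits together: AddPendingTask registers the task, AssignTask reserves destination capacity, CompleteTask releases the reservation, and ApplyActualStorageChange folds in the master's eventual VolumeCount update. The import path, task ID, server addresses, disk IDs, volume size, and the -1 volume-count delta are all illustrative assumptions, and the *ActiveTopology is assumed to have been populated from the master topology beforehand.

// Usage sketch only; all literals are illustrative, not taken from this change.
package example

import (
    "log"

    "github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

// runErasureCodingTask drives one EC task through the pending -> assigned ->
// completed lifecycle implemented in task_management.go. It assumes `active`
// was already populated from the master topology before being passed in.
func runErasureCodingTask(active *topology.ActiveTopology) error {
    const taskID = "ec-vol-42" // illustrative task ID

    // 1. Register the pending task. No StorageImpact overrides are given, so
    //    source and destination impacts are auto-calculated from the task type.
    err := active.AddPendingTask(topology.TaskSpec{
        TaskID:     taskID,
        TaskType:   topology.TaskTypeErasureCoding,
        VolumeID:   42,
        VolumeSize: 1 << 30, // assumed 1 GiB volume
        Sources: []topology.TaskSourceSpec{
            {ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: topology.CleanupVolumeReplica},
        },
        Destinations: []topology.TaskDestinationSpec{
            {ServerID: "10.0.0.2:8080", DiskID: 1},
        },
    })
    if err != nil {
        return err
    }

    // 2. Reserve capacity on the destination disk; this fails if
    //    CanAccommodate rejects the reservation.
    if err := active.AssignTask(taskID); err != nil {
        return err
    }

    // ... a worker executes the task here ...

    // 3. Release the reserved capacity once the worker reports completion.
    if err := active.CompleteTask(taskID); err != nil {
        return err
    }

    // 4. When the master later reports the real VolumeCount change, fold it
    //    into the cached topology (here: one volume removed from the source).
    active.ApplyActualStorageChange("10.0.0.1:8080", 0, -1)

    log.Printf("task %s finished", taskID)
    return nil
}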
