Diffstat (limited to 'weed/admin/topology/task_management.go')
-rw-r--r--  weed/admin/topology/task_management.go | 264
1 file changed, 264 insertions(+), 0 deletions(-)
diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go
new file mode 100644
index 000000000..b240adcd8
--- /dev/null
+++ b/weed/admin/topology/task_management.go
@@ -0,0 +1,264 @@
+package topology
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// AssignTask moves a task from pending to assigned and reserves capacity
+func (at *ActiveTopology) AssignTask(taskID string) error {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ task, exists := at.pendingTasks[taskID]
+ if !exists {
+ return fmt.Errorf("pending task %s not found", taskID)
+ }
+
+ // Check if all destination disks have sufficient capacity to reserve
+ for _, dest := range task.Destinations {
+ targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
+ if targetDisk, exists := at.disks[targetKey]; exists {
+ availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)
+
+ // Check that the disk's effective available capacity can accommodate this destination's required change
+ if !availableCapacity.CanAccommodate(dest.StorageChange) {
+ return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
+ dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
+ }
+ } else if dest.TargetServer != "" {
+ // Fail fast if a specified destination disk is not present in the topology
+ return fmt.Errorf("destination disk %s not found in topology", targetKey)
+ }
+ }
+
+ // Move task to assigned and reserve capacity
+ delete(at.pendingTasks, taskID)
+ task.Status = TaskStatusInProgress
+ at.assignedTasks[taskID] = task
+ at.reassignTaskStates()
+
+ // Log capacity reservation information for all sources and destinations
+ totalSourceImpact := StorageSlotChange{}
+ totalDestImpact := StorageSlotChange{}
+ for _, source := range task.Sources {
+ totalSourceImpact.AddInPlace(source.StorageChange)
+ }
+ for _, dest := range task.Destinations {
+ totalDestImpact.AddInPlace(dest.StorageChange)
+ }
+
+ glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
+ taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
+ len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
+
+ return nil
+}
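+
+// Usage sketch (illustrative, not part of this change): assuming "at" is an
+// initialized *ActiveTopology with a pending task "task-1" added earlier via
+// AddPendingTask, assignment either reserves capacity or leaves the task pending:
+//
+//    if err := at.AssignTask("task-1"); err != nil {
+//        // Reservation failed; the task remains in pendingTasks.
+//        glog.Warningf("could not assign task: %v", err)
+//    }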
+
+// CompleteTask moves a task from assigned to recent and releases reserved capacity
+// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
+// should be handled by the master when it receives the task completion notification.
+func (at *ActiveTopology) CompleteTask(taskID string) error {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ task, exists := at.assignedTasks[taskID]
+ if !exists {
+ return fmt.Errorf("assigned task %s not found", taskID)
+ }
+
+ // Release reserved capacity by moving task to completed state
+ delete(at.assignedTasks, taskID)
+ task.Status = TaskStatusCompleted
+ task.CompletedAt = time.Now()
+ at.recentTasks[taskID] = task
+ at.reassignTaskStates()
+
+ // Log capacity release information for all sources and destinations
+ totalSourceImpact := StorageSlotChange{}
+ totalDestImpact := StorageSlotChange{}
+ for _, source := range task.Sources {
+ totalSourceImpact.AddInPlace(source.StorageChange)
+ }
+ for _, dest := range task.Destinations {
+ totalDestImpact.AddInPlace(dest.StorageChange)
+ }
+
+ glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
+ taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
+ len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
+
+ // Clean up old recent tasks
+ at.cleanupRecentTasks()
+
+ return nil
+}
+
+// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion
+// This should be called when the master updates the topology with new VolumeCount information
+func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
+ if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
+ oldCount := disk.DiskInfo.DiskInfo.VolumeCount
+ disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange
+
+ glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
+ diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
+ }
+}
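+
+// Usage sketch (illustrative): completing a task only releases the reserved
+// capacity; the real VolumeCount delta is applied separately once the master
+// reports it. The server address, disk ID, and delta below are hypothetical:
+//
+//    if err := at.CompleteTask("task-1"); err == nil {
+//        // Master confirmed one volume was added on the destination disk.
+//        at.ApplyActualStorageChange("10.0.0.2:8080", 0, 1)
+//    }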
+
+// AddPendingTask creates a pending task from a TaskSpec, supporting both simple
+// (single source/destination) and complex (multi-source/multi-destination) tasks
+func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
+ // Validation
+ if len(spec.Sources) == 0 {
+ return fmt.Errorf("at least one source is required")
+ }
+ if len(spec.Destinations) == 0 {
+ return fmt.Errorf("at least one destination is required")
+ }
+
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ // Build sources array
+ sources := make([]TaskSource, len(spec.Sources))
+ for i, sourceSpec := range spec.Sources {
+ var storageImpact StorageSlotChange
+ var estimatedSize int64
+
+ if sourceSpec.StorageImpact != nil {
+ // Use manually specified impact
+ storageImpact = *sourceSpec.StorageImpact
+ } else {
+ // Auto-calculate based on task type and cleanup type
+ storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
+ }
+
+ if sourceSpec.EstimatedSize != nil {
+ estimatedSize = *sourceSpec.EstimatedSize
+ } else {
+ estimatedSize = spec.VolumeSize // Default to volume size
+ }
+
+ sources[i] = TaskSource{
+ SourceServer: sourceSpec.ServerID,
+ SourceDisk: sourceSpec.DiskID,
+ StorageChange: storageImpact,
+ EstimatedSize: estimatedSize,
+ }
+ }
+
+ // Build destinations array
+ destinations := make([]TaskDestination, len(spec.Destinations))
+ for i, destSpec := range spec.Destinations {
+ var storageImpact StorageSlotChange
+ var estimatedSize int64
+
+ if destSpec.StorageImpact != nil {
+ // Use manually specified impact
+ storageImpact = *destSpec.StorageImpact
+ } else {
+ // Auto-calculate based on task type
+ _, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
+ }
+
+ if destSpec.EstimatedSize != nil {
+ estimatedSize = *destSpec.EstimatedSize
+ } else {
+ estimatedSize = spec.VolumeSize // Default to volume size
+ }
+
+ destinations[i] = TaskDestination{
+ TargetServer: destSpec.ServerID,
+ TargetDisk: destSpec.DiskID,
+ StorageChange: storageImpact,
+ EstimatedSize: estimatedSize,
+ }
+ }
+
+ // Create the task
+ task := &taskState{
+ VolumeID: spec.VolumeID,
+ TaskType: spec.TaskType,
+ Status: TaskStatusPending,
+ StartedAt: time.Now(),
+ EstimatedSize: spec.VolumeSize,
+ Sources: sources,
+ Destinations: destinations,
+ }
+
+ at.pendingTasks[spec.TaskID] = task
+ at.assignTaskToDisk(task)
+
+ glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations",
+ spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations))
+
+ return nil
+}
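+
+// Usage sketch (illustrative): a minimal single-source, single-destination
+// erasure-coding task that relies on auto-calculated storage impact. Server
+// addresses, IDs, and the volume size are hypothetical:
+//
+//    err := at.AddPendingTask(TaskSpec{
+//        TaskID:     "ec-42",
+//        TaskType:   TaskTypeErasureCoding,
+//        VolumeID:   42,
+//        VolumeSize: 1 << 30, // used for auto-calculation
+//        Sources: []TaskSourceSpec{
+//            {ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica},
+//        },
+//        Destinations: []TaskDestinationSpec{
+//            {ServerID: "10.0.0.2:8080", DiskID: 1},
+//        },
+//    })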
+
+// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
+func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
+ switch taskType {
+ case TaskTypeErasureCoding:
+ switch cleanupType {
+ case CleanupECShards:
+ return CalculateECShardCleanupImpact(volumeSize)
+ default:
+ // CleanupVolumeReplica and any unrecognized cleanup type use the standard EC source impact
+ impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
+ return impact
+ }
+ default:
+ impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
+ return impact
+ }
+}
+
+// SourceCleanupType indicates what type of data needs to be cleaned up from a source
+type SourceCleanupType int
+
+const (
+ CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots)
+ CleanupECShards // Clean up existing EC shards (frees shard slots)
+)
+
+// TaskSourceSpec represents a source specification for task creation
+type TaskSourceSpec struct {
+ ServerID string
+ DiskID uint32
+ CleanupType SourceCleanupType // For EC: volume replica vs existing shards
+ StorageImpact *StorageSlotChange // Optional: manual override
+ EstimatedSize *int64 // Optional: manual override
+}
+
+// TaskDestinationSpec represents a destination specification for task creation
+type TaskDestinationSpec struct {
+ ServerID string
+ DiskID uint32
+ StorageImpact *StorageSlotChange // Optional: manual override
+ EstimatedSize *int64 // Optional: manual override
+}
+
+// TaskSpec represents a complete task specification
+type TaskSpec struct {
+ TaskID string
+ TaskType TaskType
+ VolumeID uint32
+ VolumeSize int64 // Used for auto-calculation when manual impacts not provided
+ Sources []TaskSourceSpec // Can be single or multiple
+ Destinations []TaskDestinationSpec // Can be single or multiple
+}
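+
+// Usage sketch (illustrative): TaskSpec also expresses multi-source tasks,
+// e.g. an EC task that cleans up a volume replica on one server and stale
+// shards on another, overriding the auto-calculated impact for one source.
+// All values are hypothetical:
+//
+//    override := StorageSlotChange{VolumeSlots: 0, ShardSlots: -3}
+//    spec := TaskSpec{
+//        TaskID:   "ec-7",
+//        TaskType: TaskTypeErasureCoding,
+//        VolumeID: 7,
+//        Sources: []TaskSourceSpec{
+//            {ServerID: "srv-a:8080", DiskID: 0, CleanupType: CleanupVolumeReplica},
+//            {ServerID: "srv-b:8080", DiskID: 2, CleanupType: CleanupECShards, StorageImpact: &override},
+//        },
+//        Destinations: []TaskDestinationSpec{{ServerID: "srv-c:8080", DiskID: 0}},
+//    }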
+
+// TaskSourceLocation represents a source location for task creation.
+//
+// Deprecated: Use TaskSourceSpec instead.
+type TaskSourceLocation struct {
+ ServerID string
+ DiskID uint32
+ CleanupType SourceCleanupType // What type of cleanup is needed
+}