-rw-r--r--  weed/admin/topology/active_topology.go          425
-rw-r--r--  weed/admin/topology/active_topology_test.go      154
-rw-r--r--  weed/admin/topology/capacity.go                  300
-rw-r--r--  weed/admin/topology/internal.go                  114
-rw-r--r--  weed/admin/topology/storage_impact.go             50
-rw-r--r--  weed/admin/topology/storage_slot_test.go        1004
-rw-r--r--  weed/admin/topology/structs.go                   120
-rw-r--r--  weed/admin/topology/task_management.go           264
-rw-r--r--  weed/admin/topology/topology_management.go       253
-rw-r--r--  weed/admin/topology/types.go                      97
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store.go      5
-rw-r--r--  weed/pb/worker.proto                               1
-rw-r--r--  weed/pb/worker_pb/worker.pb.go                    13
-rw-r--r--  weed/worker/tasks/balance/detection.go            34
-rw-r--r--  weed/worker/tasks/base/volume_utils.go            36
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go    169
-rw-r--r--  weed/worker/tasks/vacuum/detection.go              6
-rw-r--r--  weed/worker/types/task_types.go                    1
18 files changed, 2579 insertions(+), 467 deletions(-)
diff --git a/weed/admin/topology/active_topology.go b/weed/admin/topology/active_topology.go
index bfa03a72f..e4ef5c5d0 100644
--- a/weed/admin/topology/active_topology.go
+++ b/weed/admin/topology/active_topology.go
@@ -1,98 +1,5 @@
package topology
-import (
- "fmt"
- "sync"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
-)
-
-// TaskType represents different types of maintenance operations
-type TaskType string
-
-// TaskStatus represents the current status of a task
-type TaskStatus string
-
-// Common task type constants
-const (
- TaskTypeVacuum TaskType = "vacuum"
- TaskTypeBalance TaskType = "balance"
- TaskTypeErasureCoding TaskType = "erasure_coding"
- TaskTypeReplication TaskType = "replication"
-)
-
-// Common task status constants
-const (
- TaskStatusPending TaskStatus = "pending"
- TaskStatusInProgress TaskStatus = "in_progress"
- TaskStatusCompleted TaskStatus = "completed"
-)
-
-// taskState represents the current state of tasks affecting the topology (internal)
-type taskState struct {
- VolumeID uint32 `json:"volume_id"`
- TaskType TaskType `json:"task_type"`
- SourceServer string `json:"source_server"`
- SourceDisk uint32 `json:"source_disk"`
- TargetServer string `json:"target_server,omitempty"`
- TargetDisk uint32 `json:"target_disk,omitempty"`
- Status TaskStatus `json:"status"`
- StartedAt time.Time `json:"started_at"`
- CompletedAt time.Time `json:"completed_at,omitempty"`
-}
-
-// DiskInfo represents a disk with its current state and ongoing tasks (public for external access)
-type DiskInfo struct {
- NodeID string `json:"node_id"`
- DiskID uint32 `json:"disk_id"`
- DiskType string `json:"disk_type"`
- DataCenter string `json:"data_center"`
- Rack string `json:"rack"`
- DiskInfo *master_pb.DiskInfo `json:"disk_info"`
- LoadCount int `json:"load_count"` // Number of active tasks
-}
-
-// activeDisk represents internal disk state (private)
-type activeDisk struct {
- *DiskInfo
- pendingTasks []*taskState
- assignedTasks []*taskState
- recentTasks []*taskState // Completed in last N seconds
-}
-
-// activeNode represents a node with its disks (private)
-type activeNode struct {
- nodeID string
- dataCenter string
- rack string
- nodeInfo *master_pb.DataNodeInfo
- disks map[uint32]*activeDisk // DiskID -> activeDisk
-}
-
-// ActiveTopology provides a real-time view of cluster state with task awareness
-type ActiveTopology struct {
- // Core topology from master
- topologyInfo *master_pb.TopologyInfo
- lastUpdated time.Time
-
- // Structured topology for easy access (private)
- nodes map[string]*activeNode // NodeID -> activeNode
- disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk
-
- // Task states affecting the topology (private)
- pendingTasks map[string]*taskState
- assignedTasks map[string]*taskState
- recentTasks map[string]*taskState
-
- // Configuration
- recentTaskWindowSeconds int
-
- // Synchronization
- mutex sync.RWMutex
-}
-
// NewActiveTopology creates a new ActiveTopology instance
func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology {
if recentTaskWindowSeconds <= 0 {
@@ -102,339 +9,11 @@ func NewActiveTopology(recentTaskWindowSeconds int) *ActiveTopology {
return &ActiveTopology{
nodes: make(map[string]*activeNode),
disks: make(map[string]*activeDisk),
+ volumeIndex: make(map[uint32][]string),
+ ecShardIndex: make(map[uint32][]string),
pendingTasks: make(map[string]*taskState),
assignedTasks: make(map[string]*taskState),
recentTasks: make(map[string]*taskState),
recentTaskWindowSeconds: recentTaskWindowSeconds,
}
}
-
-// UpdateTopology updates the topology information from master
-func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
- at.mutex.Lock()
- defer at.mutex.Unlock()
-
- at.topologyInfo = topologyInfo
- at.lastUpdated = time.Now()
-
- // Rebuild structured topology
- at.nodes = make(map[string]*activeNode)
- at.disks = make(map[string]*activeDisk)
-
- for _, dc := range topologyInfo.DataCenterInfos {
- for _, rack := range dc.RackInfos {
- for _, nodeInfo := range rack.DataNodeInfos {
- node := &activeNode{
- nodeID: nodeInfo.Id,
- dataCenter: dc.Id,
- rack: rack.Id,
- nodeInfo: nodeInfo,
- disks: make(map[uint32]*activeDisk),
- }
-
- // Add disks for this node
- for diskType, diskInfo := range nodeInfo.DiskInfos {
- disk := &activeDisk{
- DiskInfo: &DiskInfo{
- NodeID: nodeInfo.Id,
- DiskID: diskInfo.DiskId,
- DiskType: diskType,
- DataCenter: dc.Id,
- Rack: rack.Id,
- DiskInfo: diskInfo,
- },
- }
-
- diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
- node.disks[diskInfo.DiskId] = disk
- at.disks[diskKey] = disk
- }
-
- at.nodes[nodeInfo.Id] = node
- }
- }
- }
-
- // Reassign task states to updated topology
- at.reassignTaskStates()
-
- glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks", len(at.nodes), len(at.disks))
- return nil
-}
-
-// AddPendingTask adds a pending task to the topology
-func (at *ActiveTopology) AddPendingTask(taskID string, taskType TaskType, volumeID uint32,
- sourceServer string, sourceDisk uint32, targetServer string, targetDisk uint32) {
- at.mutex.Lock()
- defer at.mutex.Unlock()
-
- task := &taskState{
- VolumeID: volumeID,
- TaskType: taskType,
- SourceServer: sourceServer,
- SourceDisk: sourceDisk,
- TargetServer: targetServer,
- TargetDisk: targetDisk,
- Status: TaskStatusPending,
- StartedAt: time.Now(),
- }
-
- at.pendingTasks[taskID] = task
- at.assignTaskToDisk(task)
-}
-
-// AssignTask moves a task from pending to assigned
-func (at *ActiveTopology) AssignTask(taskID string) error {
- at.mutex.Lock()
- defer at.mutex.Unlock()
-
- task, exists := at.pendingTasks[taskID]
- if !exists {
- return fmt.Errorf("pending task %s not found", taskID)
- }
-
- delete(at.pendingTasks, taskID)
- task.Status = TaskStatusInProgress
- at.assignedTasks[taskID] = task
- at.reassignTaskStates()
-
- return nil
-}
-
-// CompleteTask moves a task from assigned to recent
-func (at *ActiveTopology) CompleteTask(taskID string) error {
- at.mutex.Lock()
- defer at.mutex.Unlock()
-
- task, exists := at.assignedTasks[taskID]
- if !exists {
- return fmt.Errorf("assigned task %s not found", taskID)
- }
-
- delete(at.assignedTasks, taskID)
- task.Status = TaskStatusCompleted
- task.CompletedAt = time.Now()
- at.recentTasks[taskID] = task
- at.reassignTaskStates()
-
- // Clean up old recent tasks
- at.cleanupRecentTasks()
-
- return nil
-}
-
-// GetAvailableDisks returns disks that can accept new tasks of the given type
-func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- var available []*DiskInfo
-
- for _, disk := range at.disks {
- if disk.NodeID == excludeNodeID {
- continue // Skip excluded node
- }
-
- if at.isDiskAvailable(disk, taskType) {
- // Create a copy with current load count
- diskCopy := *disk.DiskInfo
- diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
- available = append(available, &diskCopy)
- }
- }
-
- return available
-}
-
-// GetDiskLoad returns the current load on a disk (number of active tasks)
-func (at *ActiveTopology) GetDiskLoad(nodeID string, diskID uint32) int {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
- disk, exists := at.disks[diskKey]
- if !exists {
- return 0
- }
-
- return len(disk.pendingTasks) + len(disk.assignedTasks)
-}
-
-// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
-func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- for _, task := range at.recentTasks {
- if task.VolumeID == volumeID && task.TaskType == taskType {
- return true
- }
- }
-
- return false
-}
-
-// GetAllNodes returns information about all nodes (public interface)
-func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- result := make(map[string]*master_pb.DataNodeInfo)
- for nodeID, node := range at.nodes {
- result[nodeID] = node.nodeInfo
- }
- return result
-}
-
-// GetTopologyInfo returns the current topology information (read-only access)
-func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
- return at.topologyInfo
-}
-
-// GetNodeDisks returns all disks for a specific node
-func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
- at.mutex.RLock()
- defer at.mutex.RUnlock()
-
- node, exists := at.nodes[nodeID]
- if !exists {
- return nil
- }
-
- var disks []*DiskInfo
- for _, disk := range node.disks {
- diskCopy := *disk.DiskInfo
- diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
- disks = append(disks, &diskCopy)
- }
-
- return disks
-}
-
-// DestinationPlan represents a planned destination for a volume/shard operation
-type DestinationPlan struct {
- TargetNode string `json:"target_node"`
- TargetDisk uint32 `json:"target_disk"`
- TargetRack string `json:"target_rack"`
- TargetDC string `json:"target_dc"`
- ExpectedSize uint64 `json:"expected_size"`
- PlacementScore float64 `json:"placement_score"`
- Conflicts []string `json:"conflicts"`
-}
-
-// MultiDestinationPlan represents multiple planned destinations for operations like EC
-type MultiDestinationPlan struct {
- Plans []*DestinationPlan `json:"plans"`
- TotalShards int `json:"total_shards"`
- SuccessfulRack int `json:"successful_racks"`
- SuccessfulDCs int `json:"successful_dcs"`
-}
-
-// Private methods
-
-// reassignTaskStates assigns tasks to the appropriate disks
-func (at *ActiveTopology) reassignTaskStates() {
- // Clear existing task assignments
- for _, disk := range at.disks {
- disk.pendingTasks = nil
- disk.assignedTasks = nil
- disk.recentTasks = nil
- }
-
- // Reassign pending tasks
- for _, task := range at.pendingTasks {
- at.assignTaskToDisk(task)
- }
-
- // Reassign assigned tasks
- for _, task := range at.assignedTasks {
- at.assignTaskToDisk(task)
- }
-
- // Reassign recent tasks
- for _, task := range at.recentTasks {
- at.assignTaskToDisk(task)
- }
-}
-
-// assignTaskToDisk assigns a task to the appropriate disk(s)
-func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
- // Assign to source disk
- sourceKey := fmt.Sprintf("%s:%d", task.SourceServer, task.SourceDisk)
- if sourceDisk, exists := at.disks[sourceKey]; exists {
- switch task.Status {
- case TaskStatusPending:
- sourceDisk.pendingTasks = append(sourceDisk.pendingTasks, task)
- case TaskStatusInProgress:
- sourceDisk.assignedTasks = append(sourceDisk.assignedTasks, task)
- case TaskStatusCompleted:
- sourceDisk.recentTasks = append(sourceDisk.recentTasks, task)
- }
- }
-
- // Assign to target disk if it exists and is different from source
- if task.TargetServer != "" && (task.TargetServer != task.SourceServer || task.TargetDisk != task.SourceDisk) {
- targetKey := fmt.Sprintf("%s:%d", task.TargetServer, task.TargetDisk)
- if targetDisk, exists := at.disks[targetKey]; exists {
- switch task.Status {
- case TaskStatusPending:
- targetDisk.pendingTasks = append(targetDisk.pendingTasks, task)
- case TaskStatusInProgress:
- targetDisk.assignedTasks = append(targetDisk.assignedTasks, task)
- case TaskStatusCompleted:
- targetDisk.recentTasks = append(targetDisk.recentTasks, task)
- }
- }
- }
-}
-
-// isDiskAvailable checks if a disk can accept new tasks
-func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
- // Check if disk has too many active tasks
- activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
- if activeLoad >= 2 { // Max 2 concurrent tasks per disk
- return false
- }
-
- // Check for conflicting task types
- for _, task := range disk.assignedTasks {
- if at.areTaskTypesConflicting(task.TaskType, taskType) {
- return false
- }
- }
-
- return true
-}
-
-// areTaskTypesConflicting checks if two task types conflict
-func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
- // Examples of conflicting task types
- conflictMap := map[TaskType][]TaskType{
- TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding},
- TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding},
- TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
- }
-
- if conflicts, exists := conflictMap[existing]; exists {
- for _, conflictType := range conflicts {
- if conflictType == new {
- return true
- }
- }
- }
-
- return false
-}
-
-// cleanupRecentTasks removes old recent tasks
-func (at *ActiveTopology) cleanupRecentTasks() {
- cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
-
- for taskID, task := range at.recentTasks {
- if task.CompletedAt.Before(cutoff) {
- delete(at.recentTasks, taskID)
- }
- }
-}
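
Note: the hunk above removes the positional AddPendingTask signature together with the type definitions, which move into the new structs.go, types.go, and task_management.go files listed in the diffstat. Below is a minimal lifecycle sketch of the refactored API, assembled from the tests later in this diff; the import path and the exact TaskSpec field set are assumptions based on what those tests exercise, not a definitive listing of the new files.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

func main() {
	// Keep completed tasks visible for 10 seconds, matching the tests in this diff.
	at := topology.NewActiveTopology(10)

	// Two nodes with one disk each; field names follow the master_pb usage in storage_slot_test.go.
	at.UpdateTopology(&master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{{
			Id: "dc1",
			RackInfos: []*master_pb.RackInfo{{
				Id: "rack1",
				DataNodeInfos: []*master_pb.DataNodeInfo{
					{Id: "10.0.0.1:8080", DiskInfos: map[string]*master_pb.DiskInfo{
						"hdd": {DiskId: 0, Type: "hdd", VolumeCount: 5, MaxVolumeCount: 20},
					}},
					{Id: "10.0.0.2:8080", DiskInfos: map[string]*master_pb.DiskInfo{
						"hdd": {DiskId: 0, Type: "hdd", VolumeCount: 8, MaxVolumeCount: 15},
					}},
				},
			}},
		}},
	})

	// The old positional AddPendingTask(taskID, taskType, volumeID, ...) call is
	// replaced by a TaskSpec carrying explicit source and destination lists.
	err := at.AddPendingTask(topology.TaskSpec{
		TaskID:       "balance-001",
		TaskType:     topology.TaskTypeBalance,
		VolumeID:     1001,
		VolumeSize:   1024 * 1024 * 1024,
		Sources:      []topology.TaskSourceSpec{{ServerID: "10.0.0.1:8080", DiskID: 0}},
		Destinations: []topology.TaskDestinationSpec{{ServerID: "10.0.0.2:8080", DiskID: 0}},
	})
	if err != nil {
		fmt.Println("add pending task:", err)
		return
	}

	// Lifecycle is unchanged: pending -> assigned -> recent.
	_ = at.AssignTask("balance-001")
	_ = at.CompleteTask("balance-001")
	fmt.Println("available on source:", at.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0))
}
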
diff --git a/weed/admin/topology/active_topology_test.go b/weed/admin/topology/active_topology_test.go
index 4e8b0b3a8..9b0990f21 100644
--- a/weed/admin/topology/active_topology_test.go
+++ b/weed/admin/topology/active_topology_test.go
@@ -1,6 +1,7 @@
package topology
import (
+ "fmt"
"testing"
"time"
@@ -9,6 +10,16 @@ import (
"github.com/stretchr/testify/require"
)
+// Helper function to find a disk by ID for testing - reduces code duplication
+func findDiskByID(disks []*DiskInfo, diskID uint32) *DiskInfo {
+ for _, disk := range disks {
+ if disk.DiskID == diskID {
+ return disk
+ }
+ }
+ return nil
+}
+
// TestActiveTopologyBasicOperations tests basic topology management
func TestActiveTopologyBasicOperations(t *testing.T) {
topology := NewActiveTopology(10)
@@ -58,8 +69,19 @@ func TestTaskLifecycle(t *testing.T) {
taskID := "balance-001"
// 1. Add pending task
- topology.AddPendingTask(taskID, TaskTypeBalance, 1001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 1)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: taskID,
+ TaskType: TaskTypeBalance,
+ VolumeID: 1001,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 1},
+ },
+ })
+ assert.NoError(t, err, "Should add pending task successfully")
// Verify pending state
assert.Equal(t, 1, len(topology.pendingTasks))
@@ -77,7 +99,7 @@ func TestTaskLifecycle(t *testing.T) {
assert.Equal(t, 1, len(targetDisk.pendingTasks))
// 2. Assign task
- err := topology.AssignTask(taskID)
+ err = topology.AssignTask(taskID)
require.NoError(t, err)
// Verify assigned state
@@ -258,8 +280,7 @@ func TestTargetSelectionScenarios(t *testing.T) {
assert.NotEqual(t, tt.excludeNode, disk.NodeID,
"Available disk should not be on excluded node")
- load := tt.topology.GetDiskLoad(disk.NodeID, disk.DiskID)
- assert.Less(t, load, 2, "Disk load should be less than 2")
+ assert.Less(t, disk.LoadCount, 2, "Disk load should be less than 2")
}
})
}
@@ -271,37 +292,65 @@ func TestDiskLoadCalculation(t *testing.T) {
topology.UpdateTopology(createSampleTopology())
// Initially no load
- load := topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 0, load)
+ disks := topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk := findDiskByID(disks, 0)
+ require.NotNil(t, targetDisk, "Should find disk with ID 0")
+ assert.Equal(t, 0, targetDisk.LoadCount)
// Add pending task
- topology.AddPendingTask("task1", TaskTypeBalance, 1001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 1)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: "task1",
+ TaskType: TaskTypeBalance,
+ VolumeID: 1001,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 1},
+ },
+ })
+ assert.NoError(t, err, "Should add pending task successfully")
// Check load increased
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 1, load)
+ disks = topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk = findDiskByID(disks, 0)
+ assert.Equal(t, 1, targetDisk.LoadCount)
// Add another task to same disk
- topology.AddPendingTask("task2", TaskTypeVacuum, 1002,
- "10.0.0.1:8080", 0, "", 0)
+ err = topology.AddPendingTask(TaskSpec{
+ TaskID: "task2",
+ TaskType: TaskTypeVacuum,
+ VolumeID: 1002,
+ VolumeSize: 0,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
+ },
+ })
+ assert.NoError(t, err, "Should add vacuum task successfully")
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 2, load)
+ disks = topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk = findDiskByID(disks, 0)
+ assert.Equal(t, 2, targetDisk.LoadCount)
// Move one task to assigned
topology.AssignTask("task1")
// Load should still be 2 (1 pending + 1 assigned)
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 2, load)
+ disks = topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk = findDiskByID(disks, 0)
+ assert.Equal(t, 2, targetDisk.LoadCount)
// Complete one task
topology.CompleteTask("task1")
// Load should decrease to 1
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 1, load)
+ disks = topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk = findDiskByID(disks, 0)
+ assert.Equal(t, 1, targetDisk.LoadCount)
}
// TestTaskConflictDetection tests task conflict detection
@@ -310,8 +359,19 @@ func TestTaskConflictDetection(t *testing.T) {
topology.UpdateTopology(createSampleTopology())
// Add a balance task
- topology.AddPendingTask("balance1", TaskTypeBalance, 1001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 1)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: "balance1",
+ TaskType: TaskTypeBalance,
+ VolumeID: 1001,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 1},
+ },
+ })
+ assert.NoError(t, err, "Should add balance task successfully")
topology.AssignTask("balance1")
// Try to get available disks for vacuum (conflicts with balance)
@@ -448,8 +508,22 @@ func createTopologyWithLoad() *ActiveTopology {
topology.UpdateTopology(createSampleTopology())
// Add some existing tasks to create load
- topology.AddPendingTask("existing1", TaskTypeVacuum, 2001,
- "10.0.0.1:8080", 0, "", 0)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: "existing1",
+ TaskType: TaskTypeVacuum,
+ VolumeID: 2001,
+ VolumeSize: 0,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
+ },
+ })
+ if err != nil {
+ // In test helper function, just log error instead of failing
+ fmt.Printf("Warning: Failed to add existing task: %v\n", err)
+ }
topology.AssignTask("existing1")
return topology
@@ -466,12 +540,38 @@ func createTopologyWithConflicts() *ActiveTopology {
topology.UpdateTopology(createSampleTopology())
// Add conflicting tasks
- topology.AddPendingTask("balance1", TaskTypeBalance, 3001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 0)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: "balance1",
+ TaskType: TaskTypeBalance,
+ VolumeID: 3001,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 0},
+ },
+ })
+ if err != nil {
+ fmt.Printf("Warning: Failed to add balance task: %v\n", err)
+ }
topology.AssignTask("balance1")
- topology.AddPendingTask("ec1", TaskTypeErasureCoding, 3002,
- "10.0.0.1:8080", 1, "", 0)
+ err = topology.AddPendingTask(TaskSpec{
+ TaskID: "ec1",
+ TaskType: TaskTypeErasureCoding,
+ VolumeID: 3002,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 1},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "", DiskID: 0}, // EC doesn't have single destination
+ },
+ })
+ if err != nil {
+ fmt.Printf("Warning: Failed to add EC task: %v\n", err)
+ }
topology.AssignTask("ec1")
return topology
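
As the test changes above show, GetDiskLoad is removed; callers now read the LoadCount field from the DiskInfo snapshots returned by GetNodeDisks. A small sketch of that pattern, mirroring the findDiskByID helper added to the tests; diskLoad is a hypothetical helper, not part of the package API.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

// diskLoad reports the number of pending plus assigned tasks on one disk,
// read from the LoadCount field that GetNodeDisks fills in.
func diskLoad(at *topology.ActiveTopology, nodeID string, diskID uint32) int {
	for _, d := range at.GetNodeDisks(nodeID) {
		if d.DiskID == diskID {
			return d.LoadCount
		}
	}
	return 0 // unknown disk: report zero load, as the removed GetDiskLoad did
}

func main() {
	at := topology.NewActiveTopology(10) // empty topology; every disk reports zero load
	fmt.Println(diskLoad(at, "10.0.0.1:8080", 0))
}
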
diff --git a/weed/admin/topology/capacity.go b/weed/admin/topology/capacity.go
new file mode 100644
index 000000000..a595ed369
--- /dev/null
+++ b/weed/admin/topology/capacity.go
@@ -0,0 +1,300 @@
+package topology
+
+import (
+ "fmt"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+)
+
+// GetEffectiveAvailableCapacity returns the effective available capacity for a disk
+// This considers BOTH pending and assigned tasks for capacity reservation.
+//
+// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks
+//
+// The calculation includes:
+// - Pending tasks: Reserve capacity immediately when added
+// - Assigned tasks: Continue to reserve capacity during execution
+// - Recently completed tasks are NOT counted against capacity
+func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
+ disk, exists := at.disks[diskKey]
+ if !exists {
+ return 0
+ }
+
+ if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
+ return 0
+ }
+
+ // Use the same logic as getEffectiveAvailableCapacityUnsafe but with locking
+ capacity := at.getEffectiveAvailableCapacityUnsafe(disk)
+ return int64(capacity.VolumeSlots)
+}
+
+// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as StorageSlotChange
+// This provides granular information about available volume slots and shard slots
+func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
+ disk, exists := at.disks[diskKey]
+ if !exists {
+ return StorageSlotChange{}
+ }
+
+ if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
+ return StorageSlotChange{}
+ }
+
+ return at.getEffectiveAvailableCapacityUnsafe(disk)
+}
+
+// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk
+// This shows the net impact from all pending and assigned tasks
+func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
+ disk, exists := at.disks[diskKey]
+ if !exists {
+ return StorageSlotChange{}
+ }
+
+ return at.getEffectiveCapacityUnsafe(disk)
+}
+
+// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity
+// This method considers BOTH pending and assigned tasks for capacity reservation using StorageSlotChange.
+//
+// Parameters:
+// - taskType: type of task to check compatibility for
+// - excludeNodeID: node to exclude from results
+// - minCapacity: minimum effective capacity required (in volume slots)
+//
+// Returns: DiskInfo objects where VolumeCount reflects capacity reserved by all tasks
+func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ var available []*DiskInfo
+
+ for _, disk := range at.disks {
+ if disk.NodeID == excludeNodeID {
+ continue // Skip excluded node
+ }
+
+ if at.isDiskAvailable(disk, taskType) {
+ effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
+
+ // Only include disks that meet minimum capacity requirement
+ if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
+ // Create a new DiskInfo with current capacity information
+ diskCopy := DiskInfo{
+ NodeID: disk.DiskInfo.NodeID,
+ DiskID: disk.DiskInfo.DiskID,
+ DiskType: disk.DiskInfo.DiskType,
+ DataCenter: disk.DiskInfo.DataCenter,
+ Rack: disk.DiskInfo.Rack,
+ LoadCount: len(disk.pendingTasks) + len(disk.assignedTasks), // Count all tasks
+ }
+
+ // Create a new protobuf DiskInfo to avoid modifying the original
+ diskInfoCopy := &master_pb.DiskInfo{
+ DiskId: disk.DiskInfo.DiskInfo.DiskId,
+ MaxVolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount,
+ VolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
+ VolumeInfos: disk.DiskInfo.DiskInfo.VolumeInfos,
+ EcShardInfos: disk.DiskInfo.DiskInfo.EcShardInfos,
+ RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
+ ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
+ FreeVolumeCount: disk.DiskInfo.DiskInfo.FreeVolumeCount,
+ }
+ diskCopy.DiskInfo = diskInfoCopy
+
+ available = append(available, &diskCopy)
+ }
+ }
+ }
+
+ return available
+}
+
+// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions
+// This helps avoid over-scheduling tasks to the same disk
+func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ var available []*DiskInfo
+
+ for _, disk := range at.disks {
+ if disk.NodeID == excludeNodeID {
+ continue // Skip excluded node
+ }
+
+ // Consider both pending and active tasks for scheduling decisions
+ if at.isDiskAvailableForPlanning(disk, taskType) {
+ // Check if disk can accommodate new task considering pending tasks
+ planningCapacity := at.getPlanningCapacityUnsafe(disk)
+
+ if int64(planningCapacity.VolumeSlots) >= minCapacity {
+ // Create a new DiskInfo with planning information
+ diskCopy := DiskInfo{
+ NodeID: disk.DiskInfo.NodeID,
+ DiskID: disk.DiskInfo.DiskID,
+ DiskType: disk.DiskInfo.DiskType,
+ DataCenter: disk.DiskInfo.DataCenter,
+ Rack: disk.DiskInfo.Rack,
+ LoadCount: len(disk.pendingTasks) + len(disk.assignedTasks),
+ }
+
+ // Create a new protobuf DiskInfo to avoid modifying the original
+ diskInfoCopy := &master_pb.DiskInfo{
+ DiskId: disk.DiskInfo.DiskInfo.DiskId,
+ MaxVolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount,
+ VolumeCount: disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots),
+ VolumeInfos: disk.DiskInfo.DiskInfo.VolumeInfos,
+ EcShardInfos: disk.DiskInfo.DiskInfo.EcShardInfos,
+ RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
+ ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
+ FreeVolumeCount: disk.DiskInfo.DiskInfo.FreeVolumeCount,
+ }
+ diskCopy.DiskInfo = diskInfoCopy
+
+ available = append(available, &diskCopy)
+ }
+ }
+ }
+
+ return available
+}
+
+// CanAccommodateTask checks if a disk can accommodate a new task considering all constraints
+func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
+ disk, exists := at.disks[diskKey]
+ if !exists {
+ return false
+ }
+
+ // Check basic availability
+ if !at.isDiskAvailable(disk, taskType) {
+ return false
+ }
+
+ // Check effective capacity
+ effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
+ return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded
+}
+
+// getPlanningCapacityUnsafe considers both pending and active tasks for planning
+func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange {
+ if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
+ return StorageSlotChange{}
+ }
+
+ baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
+
+ // Use the centralized helper function to calculate task storage impact
+ totalImpact := at.calculateTaskStorageImpact(disk)
+
+ // Calculate available capacity considering impact (negative impact reduces availability)
+ availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots()
+ if availableVolumeSlots < 0 {
+ availableVolumeSlots = 0
+ }
+
+ // Return detailed capacity information
+ return StorageSlotChange{
+ VolumeSlots: int32(availableVolumeSlots),
+ ShardSlots: -totalImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
+ }
+}
+
+// isDiskAvailableForPlanning checks if disk can accept new tasks considering pending load
+func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool {
+ // Check total load including pending tasks
+ totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
+ if totalLoad >= MaxTotalTaskLoadPerDisk {
+ return false
+ }
+
+ // Check for conflicting task types in active tasks only
+ for _, task := range disk.assignedTasks {
+ if at.areTaskTypesConflicting(task.TaskType, taskType) {
+ return false
+ }
+ }
+
+ return true
+}
+
+// calculateTaskStorageImpact is a helper function that calculates the total storage impact
+// from all tasks (pending and assigned) on a given disk. This eliminates code duplication
+// between multiple capacity calculation functions.
+func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange {
+ if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
+ return StorageSlotChange{}
+ }
+
+ totalImpact := StorageSlotChange{}
+
+ // Process both pending and assigned tasks with identical logic
+ taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks}
+
+ for _, taskList := range taskLists {
+ for _, task := range taskList {
+ // Calculate impact for all source locations
+ for _, source := range task.Sources {
+ if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
+ totalImpact.AddInPlace(source.StorageChange)
+ }
+ }
+
+ // Calculate impact for all destination locations
+ for _, dest := range task.Destinations {
+ if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
+ totalImpact.AddInPlace(dest.StorageChange)
+ }
+ }
+ }
+ }
+
+ return totalImpact
+}
+
+// getEffectiveCapacityUnsafe returns effective capacity impact without locking (for internal use)
+// Returns StorageSlotChange representing the net impact from all tasks
+func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange {
+ return at.calculateTaskStorageImpact(disk)
+}
+
+// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as StorageSlotChange
+func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange {
+ if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
+ return StorageSlotChange{}
+ }
+
+ baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
+ netImpact := at.getEffectiveCapacityUnsafe(disk)
+
+ // Calculate available volume slots (negative impact reduces availability)
+ availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots()
+ if availableVolumeSlots < 0 {
+ availableVolumeSlots = 0
+ }
+
+ // Return detailed capacity information
+ return StorageSlotChange{
+ VolumeSlots: int32(availableVolumeSlots),
+ ShardSlots: -netImpact.ShardSlots, // Available shard capacity (negative impact becomes positive availability)
+ }
+}
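
A worked example of the capacity formula documented at the top of capacity.go, using the numbers exercised by TestStorageSlotChangeCapacityCalculation later in this diff; the 15-shards-to-1-volume-slot conversion assumes the default erasure_coding.DataShardsCount of 10.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

func main() {
	// Disk with MaxVolumeCount=100 and VolumeCount=10, carrying pending tasks
	// worth 1 volume slot plus 15 shard slots (as in the test below).
	base := int64(100 - 10) // 90 volume slots free before any tasks

	impact := topology.StorageSlotChange{VolumeSlots: 1, ShardSlots: 15}

	// ToVolumeSlots folds shard slots into volume-slot equivalents using integer
	// division by erasure_coding.DataShardsCount, so 15 shard slots count as 1
	// extra volume slot here: 1 + 15/10 = 2.
	available := base - impact.ToVolumeSlots()

	fmt.Println(available) // 88, matching the assertion in the test
}
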
diff --git a/weed/admin/topology/internal.go b/weed/admin/topology/internal.go
new file mode 100644
index 000000000..72e37f6c1
--- /dev/null
+++ b/weed/admin/topology/internal.go
@@ -0,0 +1,114 @@
+package topology
+
+import (
+ "fmt"
+ "time"
+)
+
+// reassignTaskStates assigns tasks to the appropriate disks
+func (at *ActiveTopology) reassignTaskStates() {
+ // Clear existing task assignments
+ for _, disk := range at.disks {
+ disk.pendingTasks = nil
+ disk.assignedTasks = nil
+ disk.recentTasks = nil
+ }
+
+ // Reassign pending tasks
+ for _, task := range at.pendingTasks {
+ at.assignTaskToDisk(task)
+ }
+
+ // Reassign assigned tasks
+ for _, task := range at.assignedTasks {
+ at.assignTaskToDisk(task)
+ }
+
+ // Reassign recent tasks
+ for _, task := range at.recentTasks {
+ at.assignTaskToDisk(task)
+ }
+}
+
+// assignTaskToDisk assigns a task to the appropriate disk(s)
+func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
+ addedDisks := make(map[string]bool)
+
+ // Local helper function to assign task to a disk and avoid code duplication
+ assign := func(server string, diskID uint32) {
+ key := fmt.Sprintf("%s:%d", server, diskID)
+ if server == "" || addedDisks[key] {
+ return
+ }
+ if disk, exists := at.disks[key]; exists {
+ switch task.Status {
+ case TaskStatusPending:
+ disk.pendingTasks = append(disk.pendingTasks, task)
+ case TaskStatusInProgress:
+ disk.assignedTasks = append(disk.assignedTasks, task)
+ case TaskStatusCompleted:
+ disk.recentTasks = append(disk.recentTasks, task)
+ }
+ addedDisks[key] = true
+ }
+ }
+
+ // Assign to all source disks
+ for _, source := range task.Sources {
+ assign(source.SourceServer, source.SourceDisk)
+ }
+
+ // Assign to all destination disks (duplicates automatically avoided by helper)
+ for _, dest := range task.Destinations {
+ assign(dest.TargetServer, dest.TargetDisk)
+ }
+}
+
+// isDiskAvailable checks if a disk can accept new tasks
+func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
+ // Check if disk has too many pending and active tasks
+ activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
+ if activeLoad >= MaxConcurrentTasksPerDisk {
+ return false
+ }
+
+ // Check for conflicting task types
+ for _, task := range disk.assignedTasks {
+ if at.areTaskTypesConflicting(task.TaskType, taskType) {
+ return false
+ }
+ }
+
+ return true
+}
+
+// areTaskTypesConflicting checks if two task types conflict
+func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
+ // Examples of conflicting task types
+ conflictMap := map[TaskType][]TaskType{
+ TaskTypeVacuum: {TaskTypeBalance, TaskTypeErasureCoding},
+ TaskTypeBalance: {TaskTypeVacuum, TaskTypeErasureCoding},
+ TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
+ }
+
+ if conflicts, exists := conflictMap[existing]; exists {
+ for _, conflictType := range conflicts {
+ if conflictType == new {
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
+// cleanupRecentTasks removes old recent tasks
+func (at *ActiveTopology) cleanupRecentTasks() {
+ cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)
+
+ for taskID, task := range at.recentTasks {
+ if task.CompletedAt.Before(cutoff) {
+ delete(at.recentTasks, taskID)
+ }
+ }
+}
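
isDiskAvailable and areTaskTypesConflicting above are unexported, so from outside the package the conflict rules surface only through exported calls such as CanAccommodateTask and GetAvailableDisks. A sketch of how that plays out for the vacuum/balance pairing in the conflict map; conflictDemo is a hypothetical helper that reuses the populated *ActiveTopology and imports from the earlier lifecycle sketch.

// conflictDemo assumes at already holds nodes 10.0.0.1:8080 and 10.0.0.2:8080.
func conflictDemo(at *topology.ActiveTopology) {
	// With nothing running on the disk, a vacuum needing one free volume slot fits.
	fmt.Println(at.CanAccommodateTask("10.0.0.1:8080", 0, topology.TaskTypeVacuum, 1)) // true

	// Assign a balance task whose source is that disk. Vacuum conflicts with
	// balance in the conflict map above, so the same check now fails.
	_ = at.AddPendingTask(topology.TaskSpec{
		TaskID:       "balance-002",
		TaskType:     topology.TaskTypeBalance,
		VolumeID:     1002,
		VolumeSize:   1024 * 1024 * 1024,
		Sources:      []topology.TaskSourceSpec{{ServerID: "10.0.0.1:8080", DiskID: 0}},
		Destinations: []topology.TaskDestinationSpec{{ServerID: "10.0.0.2:8080", DiskID: 0}},
	})
	_ = at.AssignTask("balance-002")
	fmt.Println(at.CanAccommodateTask("10.0.0.1:8080", 0, topology.TaskTypeVacuum, 1)) // false
}
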
diff --git a/weed/admin/topology/storage_impact.go b/weed/admin/topology/storage_impact.go
new file mode 100644
index 000000000..e325fc9cf
--- /dev/null
+++ b/weed/admin/topology/storage_impact.go
@@ -0,0 +1,50 @@
+package topology
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+)
+
+// CalculateTaskStorageImpact calculates storage impact for different task types
+func CalculateTaskStorageImpact(taskType TaskType, volumeSize int64) (sourceChange, targetChange StorageSlotChange) {
+ switch taskType {
+ case TaskTypeErasureCoding:
+ // EC task: distributes shards to MULTIPLE targets, source reserves with zero impact
+ // Source reserves capacity but with zero StorageSlotChange (no actual capacity consumption during planning)
+ // WARNING: EC has multiple targets! Use AddPendingTask with multiple destinations for proper multi-target handling
+ // This simplified function returns zero impact; real EC requires specialized multi-destination calculation
+ return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}
+
+ case TaskTypeBalance:
+ // Balance task: moves volume from source to target
+ // Source loses 1 volume, target gains 1 volume
+ return StorageSlotChange{VolumeSlots: -1, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
+
+ case TaskTypeVacuum:
+ // Vacuum task: frees space by removing deleted entries, no slot change
+ return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}
+
+ case TaskTypeReplication:
+ // Replication task: creates new replica on target
+ return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
+
+ default:
+ // Unknown task type, assume minimal impact
+ glog.Warningf("unhandled task type %s in CalculateTaskStorageImpact, assuming default impact", taskType)
+ return StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
+ }
+}
+
+// CalculateECShardStorageImpact calculates storage impact for EC shards specifically
+func CalculateECShardStorageImpact(shardCount int32, expectedShardSize int64) StorageSlotChange {
+ // EC shards are typically much smaller than full volumes
+ // Use shard-level tracking for granular capacity planning
+ return StorageSlotChange{VolumeSlots: 0, ShardSlots: shardCount}
+}
+
+// CalculateECShardCleanupImpact calculates storage impact for cleaning up existing EC shards
+func CalculateECShardCleanupImpact(originalVolumeSize int64) StorageSlotChange {
+ // Cleaning up existing EC shards frees shard slots
+ // Use the actual EC configuration constants for accurate shard count
+ return StorageSlotChange{VolumeSlots: 0, ShardSlots: -int32(erasure_coding.TotalShardsCount)} // Negative = freed capacity
+}
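
A short sketch of the three helpers above, printing the impacts that the storage_slot_test.go assertions below expect; the -14 cleanup figure assumes the default 10+4 erasure coding layout behind erasure_coding.TotalShardsCount.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

func main() {
	volumeSize := int64(1024 * 1024 * 1024)

	// Balance: the source frees one volume slot, the target consumes one.
	src, dst := topology.CalculateTaskStorageImpact(topology.TaskTypeBalance, volumeSize)
	fmt.Println(src.VolumeSlots, dst.VolumeSlots) // -1 1

	// EC shards are tracked at shard granularity: three shards consume three
	// shard slots and no volume slots on the receiving disk.
	shard := topology.CalculateECShardStorageImpact(3, 100*1024*1024)
	fmt.Println(shard.VolumeSlots, shard.ShardSlots) // 0 3

	// Cleaning up an existing EC copy frees a full volume's worth of shards,
	// reported as a negative shard impact.
	cleanup := topology.CalculateECShardCleanupImpact(volumeSize)
	fmt.Println(cleanup.ShardSlots) // -14 with the default 10+4 layout
}
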
diff --git a/weed/admin/topology/storage_slot_test.go b/weed/admin/topology/storage_slot_test.go
new file mode 100644
index 000000000..5a0ed3ce5
--- /dev/null
+++ b/weed/admin/topology/storage_slot_test.go
@@ -0,0 +1,1004 @@
+package topology
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/stretchr/testify/assert"
+)
+
+// NOTE: These tests are designed to work with any value of erasure_coding.DataShardsCount.
+// This ensures compatibility with custom erasure coding configurations where DataShardsCount
+// might be changed from the default value of 10. All shard-to-volume conversion calculations
+// are done dynamically using the actual constant value.
+
+// testGetDiskStorageImpact is a test helper that provides the same interface as the removed
+// GetDiskStorageImpact method. For simplicity, it returns the total impact as "planned"
+// and zeros for "reserved" since the distinction is not critical for most test scenarios.
+func testGetDiskStorageImpact(at *ActiveTopology, nodeID string, diskID uint32) (plannedVolumeSlots, reservedVolumeSlots int64, plannedShardSlots, reservedShardSlots int32, estimatedSize int64) {
+ impact := at.GetEffectiveCapacityImpact(nodeID, diskID)
+ // Return total impact as "planned" for test compatibility
+ return int64(impact.VolumeSlots), 0, impact.ShardSlots, 0, 0
+}
+
+// TestStorageSlotChangeArithmetic tests the arithmetic operations on StorageSlotChange
+func TestStorageSlotChangeArithmetic(t *testing.T) {
+ // Test basic arithmetic operations
+ a := StorageSlotChange{VolumeSlots: 5, ShardSlots: 10}
+ b := StorageSlotChange{VolumeSlots: 3, ShardSlots: 8}
+
+ // Test Add
+ sum := a.Add(b)
+ assert.Equal(t, StorageSlotChange{VolumeSlots: 8, ShardSlots: 18}, sum, "Add should work correctly")
+
+ // Test Subtract
+ diff := a.Subtract(b)
+ assert.Equal(t, StorageSlotChange{VolumeSlots: 2, ShardSlots: 2}, diff, "Subtract should work correctly")
+
+ // Test AddInPlace
+ c := StorageSlotChange{VolumeSlots: 1, ShardSlots: 2}
+ c.AddInPlace(b)
+ assert.Equal(t, StorageSlotChange{VolumeSlots: 4, ShardSlots: 10}, c, "AddInPlace should modify in place")
+
+ // Test SubtractInPlace
+ d := StorageSlotChange{VolumeSlots: 10, ShardSlots: 20}
+ d.SubtractInPlace(b)
+ assert.Equal(t, StorageSlotChange{VolumeSlots: 7, ShardSlots: 12}, d, "SubtractInPlace should modify in place")
+
+ // Test IsZero
+ zero := StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}
+ nonZero := StorageSlotChange{VolumeSlots: 1, ShardSlots: 0}
+ assert.True(t, zero.IsZero(), "Zero struct should return true for IsZero")
+ assert.False(t, nonZero.IsZero(), "Non-zero struct should return false for IsZero")
+
+ // Test ToVolumeSlots conversion
+ impact1 := StorageSlotChange{VolumeSlots: 5, ShardSlots: 10}
+ assert.Equal(t, int64(6), impact1.ToVolumeSlots(), fmt.Sprintf("ToVolumeSlots should be 5 + 10/%d = 6", erasure_coding.DataShardsCount))
+
+ impact2 := StorageSlotChange{VolumeSlots: -2, ShardSlots: 25}
+ assert.Equal(t, int64(0), impact2.ToVolumeSlots(), fmt.Sprintf("ToVolumeSlots should be -2 + 25/%d = 0", erasure_coding.DataShardsCount))
+
+ impact3 := StorageSlotChange{VolumeSlots: 3, ShardSlots: 7}
+ assert.Equal(t, int64(3), impact3.ToVolumeSlots(), fmt.Sprintf("ToVolumeSlots should be 3 + 7/%d = 3 (integer division)", erasure_coding.DataShardsCount))
+}
+
+// TestStorageSlotChange tests the new dual-level storage slot tracking
+func TestStorageSlotChange(t *testing.T) {
+ activeTopology := NewActiveTopology(10)
+
+ // Create test topology
+ topologyInfo := &master_pb.TopologyInfo{
+ DataCenterInfos: []*master_pb.DataCenterInfo{
+ {
+ Id: "dc1",
+ RackInfos: []*master_pb.RackInfo{
+ {
+ Id: "rack1",
+ DataNodeInfos: []*master_pb.DataNodeInfo{
+ {
+ Id: "10.0.0.1:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "hdd": {
+ DiskId: 0,
+ Type: "hdd",
+ VolumeCount: 5,
+ MaxVolumeCount: 20,
+ },
+ },
+ },
+ {
+ Id: "10.0.0.2:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "hdd": {
+ DiskId: 0,
+ Type: "hdd",
+ VolumeCount: 8,
+ MaxVolumeCount: 15,
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+
+ activeTopology.UpdateTopology(topologyInfo)
+
+ // Test 1: Basic storage slot calculation
+ ecSourceChange, ecTargetChange := CalculateTaskStorageImpact(TaskTypeErasureCoding, 1024*1024*1024)
+ assert.Equal(t, int32(0), ecSourceChange.VolumeSlots, "EC source reserves with zero StorageSlotChange impact")
+ assert.Equal(t, int32(0), ecSourceChange.ShardSlots, "EC source should have zero shard impact")
+ assert.Equal(t, int32(0), ecTargetChange.VolumeSlots, "EC should not directly impact target volume slots")
+ assert.Equal(t, int32(0), ecTargetChange.ShardSlots, "EC target should have zero shard impact from this simplified function")
+
+ balSourceChange, balTargetChange := CalculateTaskStorageImpact(TaskTypeBalance, 1024*1024*1024)
+ assert.Equal(t, int32(-1), balSourceChange.VolumeSlots, "Balance should free 1 volume slot on source")
+ assert.Equal(t, int32(1), balTargetChange.VolumeSlots, "Balance should consume 1 volume slot on target")
+
+ // Test 2: EC shard impact calculation
+ shardImpact := CalculateECShardStorageImpact(3, 100*1024*1024) // 3 shards, 100MB each
+ assert.Equal(t, int32(0), shardImpact.VolumeSlots, "EC shards should not impact volume slots")
+ assert.Equal(t, int32(3), shardImpact.ShardSlots, "EC should impact 3 shard slots")
+
+ // Test 3: Add EC task with shard-level tracking
+ sourceServer := "10.0.0.1:8080"
+ sourceDisk := uint32(0)
+ shardDestinations := []string{"10.0.0.2:8080", "10.0.0.2:8080"}
+ shardDiskIDs := []uint32{0, 0}
+
+ expectedShardSize := int64(50 * 1024 * 1024) // 50MB per shard
+ originalVolumeSize := int64(1024 * 1024 * 1024) // 1GB original
+
+ // Create source specs (single replica in this test)
+ sources := []TaskSourceSpec{
+ {ServerID: sourceServer, DiskID: sourceDisk, CleanupType: CleanupVolumeReplica},
+ }
+
+ // Create destination specs
+ destinations := make([]TaskDestinationSpec, len(shardDestinations))
+ shardImpact = CalculateECShardStorageImpact(1, expectedShardSize)
+ for i, dest := range shardDestinations {
+ destinations[i] = TaskDestinationSpec{
+ ServerID: dest,
+ DiskID: shardDiskIDs[i],
+ StorageImpact: &shardImpact,
+ EstimatedSize: &expectedShardSize,
+ }
+ }
+
+ err := activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "ec_test",
+ TaskType: TaskTypeErasureCoding,
+ VolumeID: 100,
+ VolumeSize: originalVolumeSize,
+ Sources: sources,
+ Destinations: destinations,
+ })
+ assert.NoError(t, err, "Should add EC shard task successfully")
+
+ // Test 4: Check storage impact on source (EC reserves with zero impact)
+ sourceImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.1:8080", 0)
+ assert.Equal(t, int32(0), sourceImpact.VolumeSlots, "Source should show 0 volume slot impact (EC reserves with zero impact)")
+ assert.Equal(t, int32(0), sourceImpact.ShardSlots, "Source should show 0 shard slot impact")
+
+ // Test 5: Check storage impact on target (should gain shards)
+ targetImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.2:8080", 0)
+ assert.Equal(t, int32(0), targetImpact.VolumeSlots, "Target should show 0 volume slot impact (EC shards don't use volume slots)")
+ assert.Equal(t, int32(2), targetImpact.ShardSlots, "Target should show 2 shard slot impact")
+
+ // Test 6: Check effective capacity calculation (EC source reserves with zero StorageSlotChange)
+ sourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ targetCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
+
+ // Source: 15 original available (EC source reserves with zero StorageSlotChange impact)
+ assert.Equal(t, int64(15), sourceCapacity, "Source should have 15 available slots (EC source has zero StorageSlotChange impact)")
+
+ // Target: 7 original available - (2 shards / 10) = 7 (since 2/10 rounds down to 0)
+ assert.Equal(t, int64(7), targetCapacity, "Target should have 7 available slots (minimal shard impact)")
+
+ // Test 7: Add traditional balance task for comparison
+ err = activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "balance_test",
+ TaskType: TaskTypeBalance,
+ VolumeID: 101,
+ VolumeSize: 512 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 0},
+ },
+ })
+ assert.NoError(t, err, "Should add balance task successfully")
+
+ // Check updated impacts after adding balance task
+ finalSourceImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.1:8080", 0)
+ finalTargetImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.2:8080", 0)
+
+ assert.Equal(t, int32(-1), finalSourceImpact.VolumeSlots, "Source should show -1 volume slot impact (EC: 0, Balance: -1)")
+ assert.Equal(t, int32(1), finalTargetImpact.VolumeSlots, "Target should show 1 volume slot impact (Balance: +1)")
+ assert.Equal(t, int32(2), finalTargetImpact.ShardSlots, "Target should still show 2 shard slot impact (EC shards)")
+}
+
+// TestStorageSlotChangeCapacityCalculation tests the capacity calculation with mixed slot types
+func TestStorageSlotChangeCapacityCalculation(t *testing.T) {
+ activeTopology := NewActiveTopology(10)
+
+ // Create simple topology
+ topologyInfo := &master_pb.TopologyInfo{
+ DataCenterInfos: []*master_pb.DataCenterInfo{
+ {
+ Id: "dc1",
+ RackInfos: []*master_pb.RackInfo{
+ {
+ Id: "rack1",
+ DataNodeInfos: []*master_pb.DataNodeInfo{
+ {
+ Id: "10.0.0.1:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "hdd": {
+ DiskId: 0,
+ Type: "hdd",
+ VolumeCount: 10,
+ MaxVolumeCount: 100, // Large capacity for testing
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+
+ activeTopology.UpdateTopology(topologyInfo)
+
+ // Initial capacity
+ initialCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ assert.Equal(t, int64(90), initialCapacity, "Should start with 90 available slots")
+
+ // Add tasks with different shard slot impacts
+ targetImpact1 := StorageSlotChange{VolumeSlots: 0, ShardSlots: 5} // Target gains 5 shards
+ estimatedSize1 := int64(100 * 1024 * 1024)
+ err := activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "shard_test_1",
+ TaskType: TaskTypeErasureCoding,
+ VolumeID: 100,
+ VolumeSize: estimatedSize1,
+ Sources: []TaskSourceSpec{
+ {ServerID: "", DiskID: 0}, // Source not applicable here
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0, StorageImpact: &targetImpact1, EstimatedSize: &estimatedSize1},
+ },
+ })
+ assert.NoError(t, err, "Should add shard test 1 successfully")
+
+ // Capacity should be reduced by pending tasks via StorageSlotChange
+ capacityAfterShards := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ // Dynamic calculation: 5 shards < DataShardsCount, so no volume impact
+ expectedImpact5 := int64(5 / erasure_coding.DataShardsCount) // Should be 0 for any reasonable DataShardsCount
+ assert.Equal(t, int64(90-expectedImpact5), capacityAfterShards, fmt.Sprintf("5 shard slots should consume %d volume slot equivalent (5/%d = %d)", expectedImpact5, erasure_coding.DataShardsCount, expectedImpact5))
+
+ // Add more shards to reach threshold
+ additionalShards := int32(erasure_coding.DataShardsCount) // Add exactly one volume worth of shards
+ targetImpact2 := StorageSlotChange{VolumeSlots: 0, ShardSlots: additionalShards} // Target gains additional shards
+ estimatedSize2 := int64(100 * 1024 * 1024)
+ err = activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "shard_test_2",
+ TaskType: TaskTypeErasureCoding,
+ VolumeID: 101,
+ VolumeSize: estimatedSize2,
+ Sources: []TaskSourceSpec{
+ {ServerID: "", DiskID: 0}, // Source not applicable here
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0, StorageImpact: &targetImpact2, EstimatedSize: &estimatedSize2},
+ },
+ })
+ assert.NoError(t, err, "Should add shard test 2 successfully")
+
+ // Dynamic calculation: (5 + DataShardsCount) shards should consume 1 volume slot
+ totalShards := 5 + erasure_coding.DataShardsCount
+ expectedImpact15 := int64(totalShards / erasure_coding.DataShardsCount) // Should be 1
+ capacityAfterMoreShards := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ assert.Equal(t, int64(90-expectedImpact15), capacityAfterMoreShards, fmt.Sprintf("%d shard slots should consume %d volume slot equivalent (%d/%d = %d)", totalShards, expectedImpact15, totalShards, erasure_coding.DataShardsCount, expectedImpact15))
+
+ // Add a full volume task
+ targetImpact3 := StorageSlotChange{VolumeSlots: 1, ShardSlots: 0} // Target gains 1 volume
+ estimatedSize3 := int64(1024 * 1024 * 1024)
+ err = activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "volume_test",
+ TaskType: TaskTypeBalance,
+ VolumeID: 102,
+ VolumeSize: estimatedSize3,
+ Sources: []TaskSourceSpec{
+ {ServerID: "", DiskID: 0}, // Source not applicable here
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0, StorageImpact: &targetImpact3, EstimatedSize: &estimatedSize3},
+ },
+ })
+ assert.NoError(t, err, "Should add volume test successfully")
+
+ // Capacity should be reduced by 1 more volume slot
+ finalCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ assert.Equal(t, int64(88), finalCapacity, "1 volume + 15 shard slots should consume 2 volume slots total")
+
+ // Verify the detailed storage impact
+ plannedVol, reservedVol, plannedShard, reservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.1:8080", 0)
+ assert.Equal(t, int64(1), plannedVol, "Should show 1 planned volume slot")
+ assert.Equal(t, int64(0), reservedVol, "Should show 0 reserved volume slots")
+ assert.Equal(t, int32(15), plannedShard, "Should show 15 planned shard slots")
+ assert.Equal(t, int32(0), reservedShard, "Should show 0 reserved shard slots")
+}
+
+// TestECMultipleTargets demonstrates proper handling of EC operations with multiple targets
+func TestECMultipleTargets(t *testing.T) {
+ activeTopology := NewActiveTopology(10)
+
+ // Create test topology with multiple target nodes
+ topologyInfo := &master_pb.TopologyInfo{
+ DataCenterInfos: []*master_pb.DataCenterInfo{
+ {
+ Id: "dc1",
+ RackInfos: []*master_pb.RackInfo{
+ {
+ Id: "rack1",
+ DataNodeInfos: []*master_pb.DataNodeInfo{
+ {
+ Id: "10.0.0.1:8080", // Source
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 10, MaxVolumeCount: 50},
+ },
+ },
+ {
+ Id: "10.0.0.2:8080", // Target 1
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 5, MaxVolumeCount: 30},
+ },
+ },
+ {
+ Id: "10.0.0.3:8080", // Target 2
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 8, MaxVolumeCount: 40},
+ },
+ },
+ {
+ Id: "10.0.0.4:8080", // Target 3
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 12, MaxVolumeCount: 35},
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+
+ activeTopology.UpdateTopology(topologyInfo)
+
+ // Demonstrate why CalculateTaskStorageImpact is insufficient for EC
+ sourceChange, targetChange := CalculateTaskStorageImpact(TaskTypeErasureCoding, 1*1024*1024*1024)
+ assert.Equal(t, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, sourceChange, "Source reserves with zero StorageSlotChange")
+ assert.Equal(t, StorageSlotChange{VolumeSlots: 0, ShardSlots: 0}, targetChange, "Target has zero impact from simplified function - insufficient for multi-target EC")
+
+ // Proper way: Use AddPendingTask for multiple targets
+ sourceServer := "10.0.0.1:8080"
+ sourceDisk := uint32(0)
+
+ // EC typically distributes shards across multiple targets
+ shardDestinations := []string{
+ "10.0.0.2:8080", "10.0.0.2:8080", "10.0.0.2:8080", "10.0.0.2:8080", "10.0.0.2:8080", // 5 shards to target 1
+ "10.0.0.3:8080", "10.0.0.3:8080", "10.0.0.3:8080", "10.0.0.3:8080", "10.0.0.3:8080", // 5 shards to target 2
+ "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", // 4 shards to target 3
+ }
+ shardDiskIDs := make([]uint32, len(shardDestinations))
+ for i := range shardDiskIDs {
+ shardDiskIDs[i] = 0
+ }
+
+ // Create source specs (single replica in this test)
+ sources := []TaskSourceSpec{
+ {ServerID: sourceServer, DiskID: sourceDisk, CleanupType: CleanupVolumeReplica},
+ }
+
+ // Create destination specs
+ destinations := make([]TaskDestinationSpec, len(shardDestinations))
+ expectedShardSize := int64(50 * 1024 * 1024)
+ shardImpact := CalculateECShardStorageImpact(1, expectedShardSize)
+ for i, dest := range shardDestinations {
+ destinations[i] = TaskDestinationSpec{
+ ServerID: dest,
+ DiskID: shardDiskIDs[i],
+ StorageImpact: &shardImpact,
+ EstimatedSize: &expectedShardSize,
+ }
+ }
+
+ err := activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "ec_multi_target",
+ TaskType: TaskTypeErasureCoding,
+ VolumeID: 200,
+ VolumeSize: 1 * 1024 * 1024 * 1024,
+ Sources: sources,
+ Destinations: destinations,
+ })
+ assert.NoError(t, err, "Should add multi-target EC task successfully")
+
+ // Verify source impact (EC reserves with zero StorageSlotChange)
+ sourcePlannedVol, sourceReservedVol, sourcePlannedShard, sourceReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.1:8080", 0)
+ assert.Equal(t, int64(0), sourcePlannedVol, "Source should reserve with zero volume slot impact")
+ assert.Equal(t, int64(0), sourceReservedVol, "Source should not have reserved capacity yet")
+ assert.Equal(t, int32(0), sourcePlannedShard, "Source should not have planned shard impact")
+ assert.Equal(t, int32(0), sourceReservedShard, "Source should not have reserved shard impact")
+ // Note: EstimatedSize tracking is no longer exposed via public API
+
+ // Verify target impacts (planned, not yet reserved)
+ target1PlannedVol, target1ReservedVol, target1PlannedShard, target1ReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.2:8080", 0)
+ target2PlannedVol, target2ReservedVol, target2PlannedShard, target2ReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.3:8080", 0)
+ target3PlannedVol, target3ReservedVol, target3PlannedShard, target3ReservedShard, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.4:8080", 0)
+
+ assert.Equal(t, int64(0), target1PlannedVol, "Target 1 should not have planned volume impact")
+ assert.Equal(t, int32(5), target1PlannedShard, "Target 1 should plan to receive 5 shards")
+ assert.Equal(t, int64(0), target1ReservedVol, "Target 1 should not have reserved capacity yet")
+ assert.Equal(t, int32(0), target1ReservedShard, "Target 1 should not have reserved shards yet")
+
+ assert.Equal(t, int64(0), target2PlannedVol, "Target 2 should not have planned volume impact")
+ assert.Equal(t, int32(5), target2PlannedShard, "Target 2 should plan to receive 5 shards")
+ assert.Equal(t, int64(0), target2ReservedVol, "Target 2 should not have reserved capacity yet")
+ assert.Equal(t, int32(0), target2ReservedShard, "Target 2 should not have reserved shards yet")
+
+ assert.Equal(t, int64(0), target3PlannedVol, "Target 3 should not have planned volume impact")
+ assert.Equal(t, int32(4), target3PlannedShard, "Target 3 should plan to receive 4 shards")
+ assert.Equal(t, int64(0), target3ReservedVol, "Target 3 should not have reserved capacity yet")
+ assert.Equal(t, int32(0), target3ReservedShard, "Target 3 should not have reserved shards yet")
+
+ // Verify effective capacity (considers both pending and active tasks via StorageSlotChange)
+ sourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ target1Capacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
+ target2Capacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.3:8080", 0)
+ target3Capacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.4:8080", 0)
+
+	// Dynamic capacity calculations based on the actual DataShardsCount
+	expectedTarget1Impact := int64(5 / erasure_coding.DataShardsCount) // volume-slot impact of 5 shards (integer division)
+	expectedTarget2Impact := int64(5 / erasure_coding.DataShardsCount) // volume-slot impact of 5 shards (integer division)
+	expectedTarget3Impact := int64(4 / erasure_coding.DataShardsCount) // volume-slot impact of 4 shards (integer division)
+
+ assert.Equal(t, int64(40), sourceCapacity, "Source: 40 (EC source reserves with zero StorageSlotChange impact)")
+ assert.Equal(t, int64(25-expectedTarget1Impact), target1Capacity, fmt.Sprintf("Target 1: 25 - %d (5 shards/%d = %d impact) = %d", expectedTarget1Impact, erasure_coding.DataShardsCount, expectedTarget1Impact, 25-expectedTarget1Impact))
+ assert.Equal(t, int64(32-expectedTarget2Impact), target2Capacity, fmt.Sprintf("Target 2: 32 - %d (5 shards/%d = %d impact) = %d", expectedTarget2Impact, erasure_coding.DataShardsCount, expectedTarget2Impact, 32-expectedTarget2Impact))
+ assert.Equal(t, int64(23-expectedTarget3Impact), target3Capacity, fmt.Sprintf("Target 3: 23 - %d (4 shards/%d = %d impact) = %d", expectedTarget3Impact, erasure_coding.DataShardsCount, expectedTarget3Impact, 23-expectedTarget3Impact))
+
+ t.Logf("EC operation distributed %d shards across %d targets", len(shardDestinations), 3)
+ t.Logf("Capacity impacts: EC source reserves with zero impact, Targets minimal (shards < %d)", erasure_coding.DataShardsCount)
+}
+
+// TestCapacityReservationCycle demonstrates the complete task lifecycle and capacity management
+func TestCapacityReservationCycle(t *testing.T) {
+ activeTopology := NewActiveTopology(10)
+
+ // Create test topology
+ topologyInfo := &master_pb.TopologyInfo{
+ DataCenterInfos: []*master_pb.DataCenterInfo{
+ {
+ Id: "dc1",
+ RackInfos: []*master_pb.RackInfo{
+ {
+ Id: "rack1",
+ DataNodeInfos: []*master_pb.DataNodeInfo{
+ {
+ Id: "10.0.0.1:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 10, MaxVolumeCount: 20},
+ },
+ },
+ {
+ Id: "10.0.0.2:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "hdd": {DiskId: 0, Type: "hdd", VolumeCount: 5, MaxVolumeCount: 15},
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+ activeTopology.UpdateTopology(topologyInfo)
+
+ // Initial capacity
+ sourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ targetCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
+ assert.Equal(t, int64(10), sourceCapacity, "Source initial capacity")
+ assert.Equal(t, int64(10), targetCapacity, "Target initial capacity")
+
+ // Step 1: Add pending task (should reserve capacity via StorageSlotChange)
+ err := activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "balance_test",
+ TaskType: TaskTypeBalance,
+ VolumeID: 123,
+ VolumeSize: 1 * 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 0},
+ },
+ })
+	assert.NoError(t, err, "Should add balance task successfully")
+
+ sourceCapacityAfterPending := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ targetCapacityAfterPending := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
+ assert.Equal(t, int64(11), sourceCapacityAfterPending, "Source should gain capacity from pending balance task (balance source frees 1 slot)")
+ assert.Equal(t, int64(9), targetCapacityAfterPending, "Target should consume capacity from pending task (balance reserves 1 slot)")
+
+ // Verify planning capacity considers the same pending tasks
+ planningDisks := activeTopology.GetDisksForPlanning(TaskTypeBalance, "", 1)
+ assert.Len(t, planningDisks, 2, "Both disks should be available for planning")
+
+ // Step 2: Assign task (capacity already reserved by pending task)
+ err = activeTopology.AssignTask("balance_test")
+ assert.NoError(t, err, "Should assign task successfully")
+
+ sourceCapacityAfterAssign := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ targetCapacityAfterAssign := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
+
+	assert.Equal(t, int64(11), sourceCapacityAfterAssign, "Source capacity should remain the same (already accounted for by the pending task)")
+	assert.Equal(t, int64(9), targetCapacityAfterAssign, "Target capacity should remain the same (already accounted for by the pending task)")
+
+ // Note: Detailed task state tracking (planned vs reserved) is no longer exposed via public API
+ // The important functionality is that capacity calculations remain consistent
+
+ // Step 3: Complete task (should release reserved capacity)
+ err = activeTopology.CompleteTask("balance_test")
+ assert.NoError(t, err, "Should complete task successfully")
+
+ sourceCapacityAfterComplete := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ targetCapacityAfterComplete := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
+ assert.Equal(t, int64(10), sourceCapacityAfterComplete, "Source should return to original capacity")
+ assert.Equal(t, int64(10), targetCapacityAfterComplete, "Target should return to original capacity")
+
+ // Step 4: Apply actual storage change (simulates master topology update)
+ activeTopology.ApplyActualStorageChange("10.0.0.1:8080", 0, -1) // Source loses 1 volume
+ activeTopology.ApplyActualStorageChange("10.0.0.2:8080", 0, 1) // Target gains 1 volume
+
+ // Final capacity should reflect actual topology changes
+ finalSourceCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ finalTargetCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
+ assert.Equal(t, int64(11), finalSourceCapacity, "Source: (20-9) = 11 after losing 1 volume")
+ assert.Equal(t, int64(9), finalTargetCapacity, "Target: (15-6) = 9 after gaining 1 volume")
+
+ t.Logf("Capacity lifecycle with StorageSlotChange: Pending -> Assigned -> Released -> Applied")
+ t.Logf("Source: 10 -> 11 -> 11 -> 10 -> 11 (freed by pending balance, then applied)")
+ t.Logf("Target: 10 -> 9 -> 9 -> 10 -> 9 (reserved by pending, then applied)")
+}
+
+// TestReplicatedVolumeECOperations tests EC operations on replicated volumes
+func TestReplicatedVolumeECOperations(t *testing.T) {
+ activeTopology := NewActiveTopology(10)
+
+ // Setup cluster with multiple servers for replicated volumes
+ activeTopology.UpdateTopology(&master_pb.TopologyInfo{
+ DataCenterInfos: []*master_pb.DataCenterInfo{
+ {
+ Id: "dc1",
+ RackInfos: []*master_pb.RackInfo{
+ {
+ Id: "rack1",
+ DataNodeInfos: []*master_pb.DataNodeInfo{
+ {
+ Id: "10.0.0.1:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 10},
+ },
+ },
+ {
+ Id: "10.0.0.2:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 5},
+ },
+ },
+ {
+ Id: "10.0.0.3:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 3},
+ },
+ },
+ {
+ Id: "10.0.0.4:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 15},
+ },
+ },
+ {
+ Id: "10.0.0.5:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 20},
+ },
+ },
+ {
+ Id: "10.0.0.6:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 25},
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ })
+
+ // Test: EC operation on replicated volume (3 replicas)
+ volumeID := uint32(300)
+ originalVolumeSize := int64(1024 * 1024 * 1024) // 1GB
+
+ // Create source specs for replicated volume (3 replicas)
+ sources := []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Replica 1
+ {ServerID: "10.0.0.2:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Replica 2
+ {ServerID: "10.0.0.3:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Replica 3
+ }
+
+ // EC destinations (shards distributed across different servers than sources)
+ shardDestinations := []string{
+ "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", "10.0.0.4:8080", // 5 shards
+ "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", // 5 shards
+ "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", // 4 shards
+ }
+ shardDiskIDs := make([]uint32, len(shardDestinations))
+ for i := range shardDiskIDs {
+ shardDiskIDs[i] = 0
+ }
+
+ expectedShardSize := int64(50 * 1024 * 1024) // 50MB per shard
+
+ // Create destination specs
+ destinations := make([]TaskDestinationSpec, len(shardDestinations))
+ shardImpact := CalculateECShardStorageImpact(1, expectedShardSize)
+ for i, dest := range shardDestinations {
+ destinations[i] = TaskDestinationSpec{
+ ServerID: dest,
+ DiskID: shardDiskIDs[i],
+ StorageImpact: &shardImpact,
+ EstimatedSize: &expectedShardSize,
+ }
+ }
+
+ // Create EC task for replicated volume
+ err := activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "ec_replicated",
+ TaskType: TaskTypeErasureCoding,
+ VolumeID: volumeID,
+ VolumeSize: originalVolumeSize,
+ Sources: sources,
+ Destinations: destinations,
+ })
+ assert.NoError(t, err, "Should successfully create EC task for replicated volume")
+
+ // Verify capacity impact on all source replicas (each should reserve with zero impact)
+ for i, source := range sources {
+ plannedVol, reservedVol, plannedShard, reservedShard, _ := testGetDiskStorageImpact(activeTopology, source.ServerID, source.DiskID)
+ assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("Source replica %d should reserve with zero volume slot impact", i+1))
+ assert.Equal(t, int64(0), reservedVol, fmt.Sprintf("Source replica %d should have no active volume slots", i+1))
+ assert.Equal(t, int32(0), plannedShard, fmt.Sprintf("Source replica %d should have no planned shard slots", i+1))
+ assert.Equal(t, int32(0), reservedShard, fmt.Sprintf("Source replica %d should have no active shard slots", i+1))
+ // Note: EstimatedSize tracking is no longer exposed via public API
+ }
+
+ // Verify capacity impact on EC destinations
+ destinationCounts := make(map[string]int)
+ for _, dest := range shardDestinations {
+ destinationCounts[dest]++
+ }
+
+ for serverID, expectedShards := range destinationCounts {
+ plannedVol, _, plannedShard, _, _ := testGetDiskStorageImpact(activeTopology, serverID, 0)
+ assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("Destination %s should have no planned volume slots", serverID))
+ assert.Equal(t, int32(expectedShards), plannedShard, fmt.Sprintf("Destination %s should plan to receive %d shards", serverID, expectedShards))
+ }
+
+ // Verify effective capacity calculation for sources (should have zero EC impact)
+ sourceCapacity1 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+ sourceCapacity2 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.2:8080", 0)
+ sourceCapacity3 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.3:8080", 0)
+
+	// All sources should have the same capacity as the baseline (EC source reserves with zero impact)
+ assert.Equal(t, int64(90), sourceCapacity1, "Source 1: 100 - 10 (current) - 0 (EC source impact) = 90")
+ assert.Equal(t, int64(95), sourceCapacity2, "Source 2: 100 - 5 (current) - 0 (EC source impact) = 95")
+ assert.Equal(t, int64(97), sourceCapacity3, "Source 3: 100 - 3 (current) - 0 (EC source impact) = 97")
+
+ // Verify effective capacity calculation for destinations (should be reduced by shard slots)
+ destCapacity4 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.4:8080", 0)
+ destCapacity5 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.5:8080", 0)
+ destCapacity6 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.6:8080", 0)
+
+	// Dynamic shard impact calculations (integer division by DataShardsCount)
+	dest4ShardImpact := int64(5 / erasure_coding.DataShardsCount) // volume-slot impact of 5 shards
+	dest5ShardImpact := int64(5 / erasure_coding.DataShardsCount) // volume-slot impact of 5 shards
+	dest6ShardImpact := int64(4 / erasure_coding.DataShardsCount) // volume-slot impact of 4 shards
+
+ // Destinations should be reduced by shard impact
+ assert.Equal(t, int64(85-dest4ShardImpact), destCapacity4, fmt.Sprintf("Dest 4: 100 - 15 (current) - %d (5 shards/%d = %d impact) = %d", dest4ShardImpact, erasure_coding.DataShardsCount, dest4ShardImpact, 85-dest4ShardImpact))
+ assert.Equal(t, int64(80-dest5ShardImpact), destCapacity5, fmt.Sprintf("Dest 5: 100 - 20 (current) - %d (5 shards/%d = %d impact) = %d", dest5ShardImpact, erasure_coding.DataShardsCount, dest5ShardImpact, 80-dest5ShardImpact))
+ assert.Equal(t, int64(75-dest6ShardImpact), destCapacity6, fmt.Sprintf("Dest 6: 100 - 25 (current) - %d (4 shards/%d = %d impact) = %d", dest6ShardImpact, erasure_coding.DataShardsCount, dest6ShardImpact, 75-dest6ShardImpact))
+
+ t.Logf("Replicated volume EC operation: %d source replicas, %d EC shards distributed across %d destinations",
+ len(sources), len(shardDestinations), len(destinationCounts))
+ t.Logf("Each source replica reserves with zero capacity impact, destinations receive EC shards")
+}
+
+// TestECWithOldShardCleanup tests EC operations that need to clean up old shards from previous failed attempts
+func TestECWithOldShardCleanup(t *testing.T) {
+ activeTopology := NewActiveTopology(10)
+
+ // Setup cluster with servers
+ activeTopology.UpdateTopology(&master_pb.TopologyInfo{
+ DataCenterInfos: []*master_pb.DataCenterInfo{
+ {
+ Id: "dc1",
+ RackInfos: []*master_pb.RackInfo{
+ {
+ Id: "rack1",
+ DataNodeInfos: []*master_pb.DataNodeInfo{
+ {
+ Id: "10.0.0.1:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 10},
+ },
+ },
+ {
+ Id: "10.0.0.2:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 5},
+ },
+ },
+ {
+ Id: "10.0.0.3:8080", // Had old EC shards from previous failed attempt
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 3},
+ },
+ },
+ {
+ Id: "10.0.0.4:8080", // Had old EC shards from previous failed attempt
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 7},
+ },
+ },
+ {
+ Id: "10.0.0.5:8080", // New EC destination
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 20},
+ },
+ },
+ {
+ Id: "10.0.0.6:8080", // New EC destination
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 25},
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ })
+
+ // Test: EC operation that needs to clean up both volume replicas AND old EC shards
+ volumeID := uint32(400)
+ originalVolumeSize := int64(1024 * 1024 * 1024) // 1GB
+
+ // Create source specs: volume replicas + old EC shard locations
+ sources := []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Volume replica 1
+ {ServerID: "10.0.0.2:8080", DiskID: 0, CleanupType: CleanupVolumeReplica}, // Volume replica 2
+ {ServerID: "10.0.0.3:8080", DiskID: 0, CleanupType: CleanupECShards}, // Old EC shards from failed attempt
+ {ServerID: "10.0.0.4:8080", DiskID: 0, CleanupType: CleanupECShards}, // Old EC shards from failed attempt
+ }
+
+ // EC destinations (new complete set of shards)
+ shardDestinations := []string{
+ "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", // 5 shards
+ "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", "10.0.0.5:8080", // 4 more shards (9 total)
+ "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", "10.0.0.6:8080", // 5 shards
+ }
+ shardDiskIDs := make([]uint32, len(shardDestinations))
+ for i := range shardDiskIDs {
+ shardDiskIDs[i] = 0
+ }
+
+ expectedShardSize := int64(50 * 1024 * 1024) // 50MB per shard
+
+ // Create destination specs
+ destinations := make([]TaskDestinationSpec, len(shardDestinations))
+ shardImpact := CalculateECShardStorageImpact(1, expectedShardSize)
+ for i, dest := range shardDestinations {
+ destinations[i] = TaskDestinationSpec{
+ ServerID: dest,
+ DiskID: shardDiskIDs[i],
+ StorageImpact: &shardImpact,
+ EstimatedSize: &expectedShardSize,
+ }
+ }
+
+ // Create EC task that cleans up both volume replicas and old EC shards
+ err := activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "ec_cleanup",
+ TaskType: TaskTypeErasureCoding,
+ VolumeID: volumeID,
+ VolumeSize: originalVolumeSize,
+ Sources: sources,
+ Destinations: destinations,
+ })
+ assert.NoError(t, err, "Should successfully create EC task with mixed cleanup types")
+
+ // Verify capacity impact on volume replica sources (zero impact for EC)
+ for i := 0; i < 2; i++ {
+ source := sources[i]
+ plannedVol, _, plannedShard, _, _ := testGetDiskStorageImpact(activeTopology, source.ServerID, source.DiskID)
+ assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("Volume replica source %d should have zero volume slot impact", i+1))
+ assert.Equal(t, int32(0), plannedShard, fmt.Sprintf("Volume replica source %d should have zero shard slot impact", i+1))
+ // Note: EstimatedSize tracking is no longer exposed via public API
+ }
+
+ // Verify capacity impact on old EC shard sources (should free shard slots)
+ for i := 2; i < 4; i++ {
+ source := sources[i]
+ plannedVol, _, plannedShard, _, _ := testGetDiskStorageImpact(activeTopology, source.ServerID, source.DiskID)
+ assert.Equal(t, int64(0), plannedVol, fmt.Sprintf("EC shard source %d should have zero volume slot impact", i+1))
+ assert.Equal(t, int32(-erasure_coding.TotalShardsCount), plannedShard, fmt.Sprintf("EC shard source %d should free %d shard slots", i+1, erasure_coding.TotalShardsCount))
+ // Note: EstimatedSize tracking is no longer exposed via public API
+ }
+
+ // Verify capacity impact on new EC destinations
+ destPlan5, _, destShard5, _, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.5:8080", 0)
+ destPlan6, _, destShard6, _, _ := testGetDiskStorageImpact(activeTopology, "10.0.0.6:8080", 0)
+
+ assert.Equal(t, int64(0), destPlan5, "New EC destination 5 should have no planned volume slots")
+ assert.Equal(t, int32(9), destShard5, "New EC destination 5 should plan to receive 9 shards")
+ assert.Equal(t, int64(0), destPlan6, "New EC destination 6 should have no planned volume slots")
+ assert.Equal(t, int32(5), destShard6, "New EC destination 6 should plan to receive 5 shards")
+
+ // Verify effective capacity calculation shows proper impact
+ capacity3 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.3:8080", 0) // Freeing old EC shards
+ capacity4 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.4:8080", 0) // Freeing old EC shards
+ capacity5 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.5:8080", 0) // Receiving new EC shards
+ capacity6 := activeTopology.GetEffectiveAvailableCapacity("10.0.0.6:8080", 0) // Receiving new EC shards
+
+ // Servers freeing old EC shards should have INCREASED capacity (freed shard slots provide capacity)
+ assert.Equal(t, int64(98), capacity3, fmt.Sprintf("Server 3: 100 - 3 (current) + 1 (freeing %d shards) = 98", erasure_coding.TotalShardsCount))
+ assert.Equal(t, int64(94), capacity4, fmt.Sprintf("Server 4: 100 - 7 (current) + 1 (freeing %d shards) = 94", erasure_coding.TotalShardsCount))
+
+ // Servers receiving new EC shards should have slightly reduced capacity
+	server5ShardImpact := int64(9 / erasure_coding.DataShardsCount) // volume-slot impact of 9 shards (integer division)
+	server6ShardImpact := int64(5 / erasure_coding.DataShardsCount) // volume-slot impact of 5 shards (integer division)
+
+ assert.Equal(t, int64(80-server5ShardImpact), capacity5, fmt.Sprintf("Server 5: 100 - 20 (current) - %d (9 shards/%d = %d impact) = %d", server5ShardImpact, erasure_coding.DataShardsCount, server5ShardImpact, 80-server5ShardImpact))
+ assert.Equal(t, int64(75-server6ShardImpact), capacity6, fmt.Sprintf("Server 6: 100 - 25 (current) - %d (5 shards/%d = %d impact) = %d", server6ShardImpact, erasure_coding.DataShardsCount, server6ShardImpact, 75-server6ShardImpact))
+
+ t.Logf("EC operation with cleanup: %d volume replicas + %d old EC shard locations → %d new EC shards",
+ 2, 2, len(shardDestinations))
+ t.Logf("Volume sources have zero impact, old EC shard sources free capacity, new destinations consume shard slots")
+}
+
+// TestDetailedCapacityCalculations tests the new StorageSlotChange-based capacity calculation functions
+func TestDetailedCapacityCalculations(t *testing.T) {
+ activeTopology := NewActiveTopology(10)
+
+ // Setup cluster
+ activeTopology.UpdateTopology(&master_pb.TopologyInfo{
+ DataCenterInfos: []*master_pb.DataCenterInfo{
+ {
+ Id: "dc1",
+ RackInfos: []*master_pb.RackInfo{
+ {
+ Id: "rack1",
+ DataNodeInfos: []*master_pb.DataNodeInfo{
+ {
+ Id: "10.0.0.1:8080",
+ DiskInfos: map[string]*master_pb.DiskInfo{
+ "0": {DiskId: 0, Type: "hdd", MaxVolumeCount: 100, VolumeCount: 20},
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ })
+
+ // Test: Add an EC task and check detailed capacity
+ sources := []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0, CleanupType: CleanupVolumeReplica},
+ }
+
+ shardDestinations := []string{"10.0.0.1:8080", "10.0.0.1:8080", "10.0.0.1:8080", "10.0.0.1:8080", "10.0.0.1:8080"}
+ shardDiskIDs := []uint32{0, 0, 0, 0, 0}
+
+ // Create destination specs
+ destinations := make([]TaskDestinationSpec, len(shardDestinations))
+ expectedShardSize := int64(50 * 1024 * 1024)
+ shardImpact := CalculateECShardStorageImpact(1, expectedShardSize)
+ for i, dest := range shardDestinations {
+ destinations[i] = TaskDestinationSpec{
+ ServerID: dest,
+ DiskID: shardDiskIDs[i],
+ StorageImpact: &shardImpact,
+ EstimatedSize: &expectedShardSize,
+ }
+ }
+
+ err := activeTopology.AddPendingTask(TaskSpec{
+ TaskID: "detailed_test",
+ TaskType: TaskTypeErasureCoding,
+ VolumeID: 500,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: sources,
+ Destinations: destinations,
+ })
+ assert.NoError(t, err, "Should add EC task successfully")
+
+ // Test the new detailed capacity function
+ detailedCapacity := activeTopology.GetEffectiveAvailableCapacityDetailed("10.0.0.1:8080", 0)
+ simpleCapacity := activeTopology.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0)
+
+ // The simple capacity should match the volume slots from detailed capacity
+ assert.Equal(t, int64(detailedCapacity.VolumeSlots), simpleCapacity, "Simple capacity should match detailed volume slots")
+
+ // Verify detailed capacity has both volume and shard information
+ assert.Equal(t, int32(80), detailedCapacity.VolumeSlots, "Should have 80 available volume slots (100 - 20 current, no volume impact from EC)")
+ assert.Equal(t, int32(-5), detailedCapacity.ShardSlots, "Should show -5 available shard slots (5 destination shards)")
+
+ // Verify capacity impact
+ capacityImpact := activeTopology.GetEffectiveCapacityImpact("10.0.0.1:8080", 0)
+ assert.Equal(t, int32(0), capacityImpact.VolumeSlots, "EC source should have zero volume slot impact")
+ assert.Equal(t, int32(5), capacityImpact.ShardSlots, "Should have positive shard slot impact (consuming 5 shards)")
+
+ t.Logf("Detailed capacity calculation: VolumeSlots=%d, ShardSlots=%d",
+ detailedCapacity.VolumeSlots, detailedCapacity.ShardSlots)
+ t.Logf("Capacity impact: VolumeSlots=%d, ShardSlots=%d",
+ capacityImpact.VolumeSlots, capacityImpact.ShardSlots)
+ t.Logf("Simple capacity (backward compatible): %d", simpleCapacity)
+}
+
+// TestStorageSlotChangeConversions tests the conversion and accommodation methods for StorageSlotChange
+// This test is designed to work with any value of erasure_coding.DataShardsCount, making it
+// compatible with custom erasure coding configurations.
+func TestStorageSlotChangeConversions(t *testing.T) {
+ // Get the actual erasure coding constants for dynamic testing
+ dataShards := int32(erasure_coding.DataShardsCount)
+
+ // Test conversion constants
+ assert.Equal(t, int(dataShards), ShardsPerVolumeSlot, fmt.Sprintf("Should use erasure_coding.DataShardsCount (%d) shards per volume slot", dataShards))
+
+ // Test basic conversions using dynamic values
+ volumeOnly := StorageSlotChange{VolumeSlots: 5, ShardSlots: 0}
+ shardOnly := StorageSlotChange{VolumeSlots: 0, ShardSlots: 2 * dataShards} // 2 volume equivalents in shards
+ mixed := StorageSlotChange{VolumeSlots: 2, ShardSlots: dataShards + 5} // 2 volumes + 1.5 volume equivalent in shards
+
+ // Test ToVolumeSlots conversion - these should work regardless of DataShardsCount value
+ assert.Equal(t, int64(5), volumeOnly.ToVolumeSlots(), "5 volume slots = 5 volume slots")
+ assert.Equal(t, int64(2), shardOnly.ToVolumeSlots(), fmt.Sprintf("%d shard slots = 2 volume slots", 2*dataShards))
+ expectedMixedVolumes := int64(2 + (dataShards+5)/dataShards) // 2 + floor((DataShardsCount+5)/DataShardsCount)
+ assert.Equal(t, expectedMixedVolumes, mixed.ToVolumeSlots(), fmt.Sprintf("2 volume + %d shards = %d volume slots", dataShards+5, expectedMixedVolumes))
+
+ // Test ToShardSlots conversion
+ expectedVolumeShards := int32(5 * dataShards)
+ assert.Equal(t, expectedVolumeShards, volumeOnly.ToShardSlots(), fmt.Sprintf("5 volume slots = %d shard slots", expectedVolumeShards))
+ assert.Equal(t, 2*dataShards, shardOnly.ToShardSlots(), fmt.Sprintf("%d shard slots = %d shard slots", 2*dataShards, 2*dataShards))
+ expectedMixedShards := int32(2*dataShards + dataShards + 5)
+ assert.Equal(t, expectedMixedShards, mixed.ToShardSlots(), fmt.Sprintf("2 volume + %d shards = %d shard slots", dataShards+5, expectedMixedShards))
+
+ // Test capacity accommodation checks using shard-based comparison
+ availableVolumes := int32(10)
+ available := StorageSlotChange{VolumeSlots: availableVolumes, ShardSlots: 0} // availableVolumes * dataShards shard slots available
+
+ smallVolumeRequest := StorageSlotChange{VolumeSlots: 3, ShardSlots: 0} // Needs 3 * dataShards shard slots
+ largeVolumeRequest := StorageSlotChange{VolumeSlots: availableVolumes + 5, ShardSlots: 0} // Needs more than available
+ shardRequest := StorageSlotChange{VolumeSlots: 0, ShardSlots: 5 * dataShards} // Needs 5 volume equivalents in shards
+ mixedRequest := StorageSlotChange{VolumeSlots: 8, ShardSlots: 3 * dataShards} // Needs 11 volume equivalents total
+
+ smallShardsNeeded := 3 * dataShards
+ availableShards := availableVolumes * dataShards
+ largeShardsNeeded := (availableVolumes + 5) * dataShards
+ shardShardsNeeded := 5 * dataShards
+ mixedShardsNeeded := 8*dataShards + 3*dataShards
+
+ assert.True(t, available.CanAccommodate(smallVolumeRequest), fmt.Sprintf("Should accommodate small volume request (%d <= %d shards)", smallShardsNeeded, availableShards))
+ assert.False(t, available.CanAccommodate(largeVolumeRequest), fmt.Sprintf("Should NOT accommodate large volume request (%d > %d shards)", largeShardsNeeded, availableShards))
+ assert.True(t, available.CanAccommodate(shardRequest), fmt.Sprintf("Should accommodate shard request (%d <= %d shards)", shardShardsNeeded, availableShards))
+ assert.False(t, available.CanAccommodate(mixedRequest), fmt.Sprintf("Should NOT accommodate mixed request (%d > %d shards)", mixedShardsNeeded, availableShards))
+
+ t.Logf("Conversion tests passed: %d shards = 1 volume slot", ShardsPerVolumeSlot)
+ t.Logf("Mixed capacity (%d volumes + %d shards) = %d equivalent volume slots",
+ mixed.VolumeSlots, mixed.ShardSlots, mixed.ToVolumeSlots())
+ t.Logf("Available capacity (%d volumes) = %d total shard slots",
+ available.VolumeSlots, available.ToShardSlots())
+ t.Logf("NOTE: This test adapts automatically to erasure_coding.DataShardsCount = %d", erasure_coding.DataShardsCount)
+}
diff --git a/weed/admin/topology/structs.go b/weed/admin/topology/structs.go
new file mode 100644
index 000000000..f2d29eb5f
--- /dev/null
+++ b/weed/admin/topology/structs.go
@@ -0,0 +1,120 @@
+package topology
+
+import (
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+)
+
+// TaskSource represents a single source in a multi-source task (for replicated volume cleanup)
+type TaskSource struct {
+ SourceServer string `json:"source_server"`
+ SourceDisk uint32 `json:"source_disk"`
+ StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this source
+ EstimatedSize int64 `json:"estimated_size"` // Estimated size for this source
+}
+
+// TaskDestination represents a single destination in a multi-destination task
+type TaskDestination struct {
+ TargetServer string `json:"target_server"`
+ TargetDisk uint32 `json:"target_disk"`
+ StorageChange StorageSlotChange `json:"storage_change"` // Storage impact on this destination
+ EstimatedSize int64 `json:"estimated_size"` // Estimated size for this destination
+}
+
+// taskState represents the current state of tasks affecting the topology (internal)
+// Uses unified multi-source/multi-destination design:
+// - Single-source tasks (balance, vacuum, replication): 1 source, 1 destination
+// - Multi-source EC tasks (replicated volumes): N sources, M destinations
+type taskState struct {
+ VolumeID uint32 `json:"volume_id"`
+ TaskType TaskType `json:"task_type"`
+ Status TaskStatus `json:"status"`
+ StartedAt time.Time `json:"started_at"`
+ CompletedAt time.Time `json:"completed_at,omitempty"`
+ EstimatedSize int64 `json:"estimated_size"` // Total estimated size of task
+
+ // Unified source and destination arrays (always used)
+ Sources []TaskSource `json:"sources"` // Source locations (1+ for all task types)
+ Destinations []TaskDestination `json:"destinations"` // Destination locations (1+ for all task types)
+}
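+
+// For illustration, a balance move is recorded with a single source and a single
+// destination (a sketch; the addresses and IDs below are placeholders):
+//
+//	taskState{
+//		VolumeID: 123, TaskType: TaskTypeBalance, Status: TaskStatusPending,
+//		Sources:      []TaskSource{{SourceServer: "10.0.0.1:8080", SourceDisk: 0}},
+//		Destinations: []TaskDestination{{TargetServer: "10.0.0.2:8080", TargetDisk: 0}},
+//	}
+//
+// An EC task on a replicated volume instead carries one TaskSource per replica or
+// old shard location to clean up, and one TaskDestination per planned shard placement.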
+
+// DiskInfo represents a disk with its current state and ongoing tasks (public for external access)
+type DiskInfo struct {
+ NodeID string `json:"node_id"`
+ DiskID uint32 `json:"disk_id"`
+ DiskType string `json:"disk_type"`
+ DataCenter string `json:"data_center"`
+ Rack string `json:"rack"`
+ DiskInfo *master_pb.DiskInfo `json:"disk_info"`
+ LoadCount int `json:"load_count"` // Number of active tasks
+}
+
+// activeDisk represents internal disk state (private)
+type activeDisk struct {
+ *DiskInfo
+ pendingTasks []*taskState
+ assignedTasks []*taskState
+ recentTasks []*taskState // Completed in last N seconds
+}
+
+// activeNode represents a node with its disks (private)
+type activeNode struct {
+ nodeID string
+ dataCenter string
+ rack string
+ nodeInfo *master_pb.DataNodeInfo
+ disks map[uint32]*activeDisk // DiskID -> activeDisk
+}
+
+// ActiveTopology provides a real-time view of cluster state with task awareness
+type ActiveTopology struct {
+ // Core topology from master
+ topologyInfo *master_pb.TopologyInfo
+ lastUpdated time.Time
+
+ // Structured topology for easy access (private)
+ nodes map[string]*activeNode // NodeID -> activeNode
+ disks map[string]*activeDisk // "NodeID:DiskID" -> activeDisk
+
+ // Performance indexes for O(1) lookups (private)
+ volumeIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where volume replicas exist
+ ecShardIndex map[uint32][]string // VolumeID -> list of "NodeID:DiskID" where EC shards exist
+
+ // Task states affecting the topology (private)
+ pendingTasks map[string]*taskState
+ assignedTasks map[string]*taskState
+ recentTasks map[string]*taskState
+
+ // Configuration
+ recentTaskWindowSeconds int
+
+ // Synchronization
+ mutex sync.RWMutex
+}
+
+// DestinationPlan represents a planned destination for a volume/shard operation
+type DestinationPlan struct {
+ TargetNode string `json:"target_node"`
+ TargetDisk uint32 `json:"target_disk"`
+ TargetRack string `json:"target_rack"`
+ TargetDC string `json:"target_dc"`
+ ExpectedSize uint64 `json:"expected_size"`
+ PlacementScore float64 `json:"placement_score"`
+ Conflicts []string `json:"conflicts"`
+}
+
+// MultiDestinationPlan represents multiple planned destinations for operations like EC
+type MultiDestinationPlan struct {
+ Plans []*DestinationPlan `json:"plans"`
+ TotalShards int `json:"total_shards"`
+ SuccessfulRack int `json:"successful_racks"`
+ SuccessfulDCs int `json:"successful_dcs"`
+}
+
+// VolumeReplica represents a replica location with server and disk information
+type VolumeReplica struct {
+ ServerID string `json:"server_id"`
+ DiskID uint32 `json:"disk_id"`
+}
diff --git a/weed/admin/topology/task_management.go b/weed/admin/topology/task_management.go
new file mode 100644
index 000000000..b240adcd8
--- /dev/null
+++ b/weed/admin/topology/task_management.go
@@ -0,0 +1,264 @@
+package topology
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// AssignTask moves a task from pending to assigned and reserves capacity
+func (at *ActiveTopology) AssignTask(taskID string) error {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ task, exists := at.pendingTasks[taskID]
+ if !exists {
+ return fmt.Errorf("pending task %s not found", taskID)
+ }
+
+ // Check if all destination disks have sufficient capacity to reserve
+ for _, dest := range task.Destinations {
+ targetKey := fmt.Sprintf("%s:%d", dest.TargetServer, dest.TargetDisk)
+ if targetDisk, exists := at.disks[targetKey]; exists {
+ availableCapacity := at.getEffectiveAvailableCapacityUnsafe(targetDisk)
+
+ // Check if we have enough total capacity using the improved unified comparison
+ if !availableCapacity.CanAccommodate(dest.StorageChange) {
+ return fmt.Errorf("insufficient capacity on target disk %s:%d. Available: %+v, Required: %+v",
+ dest.TargetServer, dest.TargetDisk, availableCapacity, dest.StorageChange)
+ }
+ } else if dest.TargetServer != "" {
+ // Fail fast if destination disk is not found in topology
+ return fmt.Errorf("destination disk %s not found in topology", targetKey)
+ }
+ }
+
+ // Move task to assigned and reserve capacity
+ delete(at.pendingTasks, taskID)
+ task.Status = TaskStatusInProgress
+ at.assignedTasks[taskID] = task
+ at.reassignTaskStates()
+
+ // Log capacity reservation information for all sources and destinations
+ totalSourceImpact := StorageSlotChange{}
+ totalDestImpact := StorageSlotChange{}
+ for _, source := range task.Sources {
+ totalSourceImpact.AddInPlace(source.StorageChange)
+ }
+ for _, dest := range task.Destinations {
+ totalDestImpact.AddInPlace(dest.StorageChange)
+ }
+
+ glog.V(2).Infof("Task %s assigned and capacity reserved: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
+ taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
+ len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
+
+ return nil
+}
+
+// CompleteTask moves a task from assigned to recent and releases reserved capacity
+// NOTE: This only releases the reserved capacity. The actual topology update (VolumeCount changes)
+// should be handled by the master when it receives the task completion notification.
+func (at *ActiveTopology) CompleteTask(taskID string) error {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ task, exists := at.assignedTasks[taskID]
+ if !exists {
+ return fmt.Errorf("assigned task %s not found", taskID)
+ }
+
+ // Release reserved capacity by moving task to completed state
+ delete(at.assignedTasks, taskID)
+ task.Status = TaskStatusCompleted
+ task.CompletedAt = time.Now()
+ at.recentTasks[taskID] = task
+ at.reassignTaskStates()
+
+ // Log capacity release information for all sources and destinations
+ totalSourceImpact := StorageSlotChange{}
+ totalDestImpact := StorageSlotChange{}
+ for _, source := range task.Sources {
+ totalSourceImpact.AddInPlace(source.StorageChange)
+ }
+ for _, dest := range task.Destinations {
+ totalDestImpact.AddInPlace(dest.StorageChange)
+ }
+
+ glog.V(2).Infof("Task %s completed and capacity released: %d sources (VolumeSlots:%d, ShardSlots:%d), %d destinations (VolumeSlots:%d, ShardSlots:%d)",
+ taskID, len(task.Sources), totalSourceImpact.VolumeSlots, totalSourceImpact.ShardSlots,
+ len(task.Destinations), totalDestImpact.VolumeSlots, totalDestImpact.ShardSlots)
+
+ // Clean up old recent tasks
+ at.cleanupRecentTasks()
+
+ return nil
+}
+
+// ApplyActualStorageChange updates the topology to reflect actual storage changes after task completion
+// This should be called when the master updates the topology with new VolumeCount information
+func (at *ActiveTopology) ApplyActualStorageChange(nodeID string, diskID uint32, volumeCountChange int64) {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
+ if disk, exists := at.disks[diskKey]; exists && disk.DiskInfo != nil && disk.DiskInfo.DiskInfo != nil {
+ oldCount := disk.DiskInfo.DiskInfo.VolumeCount
+ disk.DiskInfo.DiskInfo.VolumeCount += volumeCountChange
+
+ glog.V(2).Infof("Applied actual storage change on disk %s: volume_count %d -> %d (change: %+d)",
+ diskKey, oldCount, disk.DiskInfo.DiskInfo.VolumeCount, volumeCountChange)
+ }
+}
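+
+// For example, after a balance move completes and the master's updated topology
+// reflects the new volume counts, callers might apply (a sketch; the addresses
+// below are placeholders):
+//
+//	at.ApplyActualStorageChange("10.0.0.1:8080", 0, -1) // source lost one volume
+//	at.ApplyActualStorageChange("10.0.0.2:8080", 0, +1) // target gained one volume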
+
+// AddPendingTask is the unified function that handles both simple and complex task creation
+func (at *ActiveTopology) AddPendingTask(spec TaskSpec) error {
+ // Validation
+ if len(spec.Sources) == 0 {
+ return fmt.Errorf("at least one source is required")
+ }
+ if len(spec.Destinations) == 0 {
+ return fmt.Errorf("at least one destination is required")
+ }
+
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ // Build sources array
+ sources := make([]TaskSource, len(spec.Sources))
+ for i, sourceSpec := range spec.Sources {
+ var storageImpact StorageSlotChange
+ var estimatedSize int64
+
+ if sourceSpec.StorageImpact != nil {
+ // Use manually specified impact
+ storageImpact = *sourceSpec.StorageImpact
+ } else {
+ // Auto-calculate based on task type and cleanup type
+ storageImpact = at.calculateSourceStorageImpact(spec.TaskType, sourceSpec.CleanupType, spec.VolumeSize)
+ }
+
+ if sourceSpec.EstimatedSize != nil {
+ estimatedSize = *sourceSpec.EstimatedSize
+ } else {
+ estimatedSize = spec.VolumeSize // Default to volume size
+ }
+
+ sources[i] = TaskSource{
+ SourceServer: sourceSpec.ServerID,
+ SourceDisk: sourceSpec.DiskID,
+ StorageChange: storageImpact,
+ EstimatedSize: estimatedSize,
+ }
+ }
+
+ // Build destinations array
+ destinations := make([]TaskDestination, len(spec.Destinations))
+ for i, destSpec := range spec.Destinations {
+ var storageImpact StorageSlotChange
+ var estimatedSize int64
+
+ if destSpec.StorageImpact != nil {
+ // Use manually specified impact
+ storageImpact = *destSpec.StorageImpact
+ } else {
+ // Auto-calculate based on task type
+ _, storageImpact = CalculateTaskStorageImpact(spec.TaskType, spec.VolumeSize)
+ }
+
+ if destSpec.EstimatedSize != nil {
+ estimatedSize = *destSpec.EstimatedSize
+ } else {
+ estimatedSize = spec.VolumeSize // Default to volume size
+ }
+
+ destinations[i] = TaskDestination{
+ TargetServer: destSpec.ServerID,
+ TargetDisk: destSpec.DiskID,
+ StorageChange: storageImpact,
+ EstimatedSize: estimatedSize,
+ }
+ }
+
+ // Create the task
+ task := &taskState{
+ VolumeID: spec.VolumeID,
+ TaskType: spec.TaskType,
+ Status: TaskStatusPending,
+ StartedAt: time.Now(),
+ EstimatedSize: spec.VolumeSize,
+ Sources: sources,
+ Destinations: destinations,
+ }
+
+ at.pendingTasks[spec.TaskID] = task
+ at.assignTaskToDisk(task)
+
+ glog.V(2).Infof("Added pending %s task %s: volume %d, %d sources, %d destinations",
+ spec.TaskType, spec.TaskID, spec.VolumeID, len(sources), len(destinations))
+
+ return nil
+}
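+
+// Illustrative usage (a minimal sketch; the addresses and IDs are placeholders, and
+// the storage impacts are auto-calculated because no manual overrides are supplied):
+//
+//	err := at.AddPendingTask(TaskSpec{
+//		TaskID:       "balance_vol_123",
+//		TaskType:     TaskTypeBalance,
+//		VolumeID:     123,
+//		VolumeSize:   1 << 30,
+//		Sources:      []TaskSourceSpec{{ServerID: "10.0.0.1:8080", DiskID: 0}},
+//		Destinations: []TaskDestinationSpec{{ServerID: "10.0.0.2:8080", DiskID: 0}},
+//	})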
+
+// calculateSourceStorageImpact calculates storage impact for sources based on task type and cleanup type
+func (at *ActiveTopology) calculateSourceStorageImpact(taskType TaskType, cleanupType SourceCleanupType, volumeSize int64) StorageSlotChange {
+ switch taskType {
+ case TaskTypeErasureCoding:
+ switch cleanupType {
+ case CleanupVolumeReplica:
+ impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
+ return impact
+ case CleanupECShards:
+ return CalculateECShardCleanupImpact(volumeSize)
+ default:
+ impact, _ := CalculateTaskStorageImpact(TaskTypeErasureCoding, volumeSize)
+ return impact
+ }
+ default:
+ impact, _ := CalculateTaskStorageImpact(taskType, volumeSize)
+ return impact
+ }
+}
+
+// SourceCleanupType indicates what type of data needs to be cleaned up from a source
+type SourceCleanupType int
+
+const (
+ CleanupVolumeReplica SourceCleanupType = iota // Clean up volume replica (frees volume slots)
+ CleanupECShards // Clean up existing EC shards (frees shard slots)
+)
+
+// TaskSourceSpec represents a source specification for task creation
+type TaskSourceSpec struct {
+ ServerID string
+ DiskID uint32
+ CleanupType SourceCleanupType // For EC: volume replica vs existing shards
+ StorageImpact *StorageSlotChange // Optional: manual override
+ EstimatedSize *int64 // Optional: manual override
+}
+
+// TaskDestinationSpec represents a destination specification for task creation
+type TaskDestinationSpec struct {
+ ServerID string
+ DiskID uint32
+ StorageImpact *StorageSlotChange // Optional: manual override
+ EstimatedSize *int64 // Optional: manual override
+}
+
+// TaskSpec represents a complete task specification
+type TaskSpec struct {
+ TaskID string
+ TaskType TaskType
+ VolumeID uint32
+ VolumeSize int64 // Used for auto-calculation when manual impacts not provided
+ Sources []TaskSourceSpec // Can be single or multiple
+ Destinations []TaskDestinationSpec // Can be single or multiple
+}
+
+// TaskSourceLocation represents a source location for task creation (DEPRECATED: use TaskSourceSpec)
+type TaskSourceLocation struct {
+ ServerID string
+ DiskID uint32
+ CleanupType SourceCleanupType // What type of cleanup is needed
+}
diff --git a/weed/admin/topology/topology_management.go b/weed/admin/topology/topology_management.go
new file mode 100644
index 000000000..e12839484
--- /dev/null
+++ b/weed/admin/topology/topology_management.go
@@ -0,0 +1,253 @@
+package topology
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+)
+
+// UpdateTopology updates the topology information from master
+func (at *ActiveTopology) UpdateTopology(topologyInfo *master_pb.TopologyInfo) error {
+ at.mutex.Lock()
+ defer at.mutex.Unlock()
+
+ at.topologyInfo = topologyInfo
+ at.lastUpdated = time.Now()
+
+ // Rebuild structured topology
+ at.nodes = make(map[string]*activeNode)
+ at.disks = make(map[string]*activeDisk)
+
+ for _, dc := range topologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, nodeInfo := range rack.DataNodeInfos {
+ node := &activeNode{
+ nodeID: nodeInfo.Id,
+ dataCenter: dc.Id,
+ rack: rack.Id,
+ nodeInfo: nodeInfo,
+ disks: make(map[uint32]*activeDisk),
+ }
+
+ // Add disks for this node
+ for diskType, diskInfo := range nodeInfo.DiskInfos {
+ disk := &activeDisk{
+ DiskInfo: &DiskInfo{
+ NodeID: nodeInfo.Id,
+ DiskID: diskInfo.DiskId,
+ DiskType: diskType,
+ DataCenter: dc.Id,
+ Rack: rack.Id,
+ DiskInfo: diskInfo,
+ },
+ }
+
+ diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
+ node.disks[diskInfo.DiskId] = disk
+ at.disks[diskKey] = disk
+ }
+
+ at.nodes[nodeInfo.Id] = node
+ }
+ }
+ }
+
+ // Rebuild performance indexes for O(1) lookups
+ at.rebuildIndexes()
+
+ // Reassign task states to updated topology
+ at.reassignTaskStates()
+
+ glog.V(1).Infof("ActiveTopology updated: %d nodes, %d disks, %d volume entries, %d EC shard entries",
+ len(at.nodes), len(at.disks), len(at.volumeIndex), len(at.ecShardIndex))
+ return nil
+}
+
+// GetAvailableDisks returns disks that can accept new tasks of the given type
+// NOTE: For capacity-aware operations, prefer GetDisksWithEffectiveCapacity
+func (at *ActiveTopology) GetAvailableDisks(taskType TaskType, excludeNodeID string) []*DiskInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ var available []*DiskInfo
+
+ for _, disk := range at.disks {
+ if disk.NodeID == excludeNodeID {
+ continue // Skip excluded node
+ }
+
+ if at.isDiskAvailable(disk, taskType) {
+			// Create a copy annotated with the current load count
+ diskCopy := *disk.DiskInfo
+ diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
+ available = append(available, &diskCopy)
+ }
+ }
+
+ return available
+}
+
+// HasRecentTaskForVolume checks if a volume had a recent task (to avoid immediate re-detection)
+func (at *ActiveTopology) HasRecentTaskForVolume(volumeID uint32, taskType TaskType) bool {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ for _, task := range at.recentTasks {
+ if task.VolumeID == volumeID && task.TaskType == taskType {
+ return true
+ }
+ }
+
+ return false
+}
+
+// GetAllNodes returns information about all nodes (public interface)
+func (at *ActiveTopology) GetAllNodes() map[string]*master_pb.DataNodeInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ result := make(map[string]*master_pb.DataNodeInfo)
+ for nodeID, node := range at.nodes {
+ result[nodeID] = node.nodeInfo
+ }
+ return result
+}
+
+// GetTopologyInfo returns the current topology information (read-only access)
+func (at *ActiveTopology) GetTopologyInfo() *master_pb.TopologyInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+ return at.topologyInfo
+}
+
+// GetNodeDisks returns all disks for a specific node
+func (at *ActiveTopology) GetNodeDisks(nodeID string) []*DiskInfo {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ node, exists := at.nodes[nodeID]
+ if !exists {
+ return nil
+ }
+
+ var disks []*DiskInfo
+ for _, disk := range node.disks {
+ diskCopy := *disk.DiskInfo
+ diskCopy.LoadCount = len(disk.pendingTasks) + len(disk.assignedTasks)
+ disks = append(disks, &diskCopy)
+ }
+
+ return disks
+}
+
+// rebuildIndexes rebuilds the volume and EC shard indexes for O(1) lookups
+func (at *ActiveTopology) rebuildIndexes() {
+ // Clear existing indexes
+ at.volumeIndex = make(map[uint32][]string)
+ at.ecShardIndex = make(map[uint32][]string)
+
+ // Rebuild indexes from current topology
+ for _, dc := range at.topologyInfo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, nodeInfo := range rack.DataNodeInfos {
+ for _, diskInfo := range nodeInfo.DiskInfos {
+ diskKey := fmt.Sprintf("%s:%d", nodeInfo.Id, diskInfo.DiskId)
+
+ // Index volumes
+ for _, volumeInfo := range diskInfo.VolumeInfos {
+ volumeID := volumeInfo.Id
+ at.volumeIndex[volumeID] = append(at.volumeIndex[volumeID], diskKey)
+ }
+
+ // Index EC shards
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ volumeID := ecShardInfo.Id
+ at.ecShardIndex[volumeID] = append(at.ecShardIndex[volumeID], diskKey)
+ }
+ }
+ }
+ }
+ }
+}
+
+// GetVolumeLocations returns the disk locations for a volume using O(1) lookup
+func (at *ActiveTopology) GetVolumeLocations(volumeID uint32, collection string) []VolumeReplica {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ diskKeys, exists := at.volumeIndex[volumeID]
+ if !exists {
+ return []VolumeReplica{}
+ }
+
+ var replicas []VolumeReplica
+ for _, diskKey := range diskKeys {
+ if disk, diskExists := at.disks[diskKey]; diskExists {
+ // Verify collection matches (since index doesn't include collection)
+ if at.volumeMatchesCollection(disk, volumeID, collection) {
+ replicas = append(replicas, VolumeReplica{
+ ServerID: disk.NodeID,
+ DiskID: disk.DiskID,
+ })
+ }
+ }
+ }
+
+ return replicas
+}
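+
+// Example (a sketch; volume 123 in collection "pics" is a placeholder):
+//
+//	for _, replica := range at.GetVolumeLocations(123, "pics") {
+//		glog.V(2).Infof("volume 123 replica on %s disk %d", replica.ServerID, replica.DiskID)
+//	}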
+
+// GetECShardLocations returns the disk locations for EC shards using O(1) lookup
+func (at *ActiveTopology) GetECShardLocations(volumeID uint32, collection string) []VolumeReplica {
+ at.mutex.RLock()
+ defer at.mutex.RUnlock()
+
+ diskKeys, exists := at.ecShardIndex[volumeID]
+ if !exists {
+ return []VolumeReplica{}
+ }
+
+ var ecShards []VolumeReplica
+ for _, diskKey := range diskKeys {
+ if disk, diskExists := at.disks[diskKey]; diskExists {
+ // Verify collection matches (since index doesn't include collection)
+ if at.ecShardMatchesCollection(disk, volumeID, collection) {
+ ecShards = append(ecShards, VolumeReplica{
+ ServerID: disk.NodeID,
+ DiskID: disk.DiskID,
+ })
+ }
+ }
+ }
+
+ return ecShards
+}
+
+// volumeMatchesCollection checks if a volume on a disk matches the given collection
+func (at *ActiveTopology) volumeMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
+ if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
+ return false
+ }
+
+ for _, volumeInfo := range disk.DiskInfo.DiskInfo.VolumeInfos {
+ if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
+ return true
+ }
+ }
+ return false
+}
+
+// ecShardMatchesCollection checks if EC shards on a disk match the given collection
+func (at *ActiveTopology) ecShardMatchesCollection(disk *activeDisk, volumeID uint32, collection string) bool {
+ if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
+ return false
+ }
+
+ for _, ecShardInfo := range disk.DiskInfo.DiskInfo.EcShardInfos {
+ if ecShardInfo.Id == volumeID && ecShardInfo.Collection == collection {
+ return true
+ }
+ }
+ return false
+}
diff --git a/weed/admin/topology/types.go b/weed/admin/topology/types.go
new file mode 100644
index 000000000..df0103529
--- /dev/null
+++ b/weed/admin/topology/types.go
@@ -0,0 +1,97 @@
+package topology
+
+import "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+
+// TaskType represents different types of maintenance operations
+type TaskType string
+
+// TaskStatus represents the current status of a task
+type TaskStatus string
+
+// Common task type constants
+const (
+ TaskTypeVacuum TaskType = "vacuum"
+ TaskTypeBalance TaskType = "balance"
+ TaskTypeErasureCoding TaskType = "erasure_coding"
+ TaskTypeReplication TaskType = "replication"
+)
+
+// Common task status constants
+const (
+ TaskStatusPending TaskStatus = "pending"
+ TaskStatusInProgress TaskStatus = "in_progress"
+ TaskStatusCompleted TaskStatus = "completed"
+)
+
+// Task and capacity management configuration constants
+const (
+ // MaxConcurrentTasksPerDisk defines the maximum number of concurrent tasks per disk
+ // This prevents overloading a single disk with too many simultaneous operations
+ MaxConcurrentTasksPerDisk = 2
+
+ // MaxTotalTaskLoadPerDisk defines the maximum total task load (pending + active) per disk
+ // This allows more tasks to be queued but limits the total pipeline depth
+ MaxTotalTaskLoadPerDisk = 3
+
+ // MaxTaskLoadForECPlacement defines the maximum task load to consider a disk for EC placement
+ // This threshold ensures disks aren't overloaded when planning EC operations
+ MaxTaskLoadForECPlacement = 10
+)
+
+// StorageSlotChange represents storage impact at both volume and shard levels
+type StorageSlotChange struct {
+ VolumeSlots int32 `json:"volume_slots"` // Volume-level slot changes (full volumes)
+ ShardSlots int32 `json:"shard_slots"` // Shard-level slot changes (EC shards, fractional capacity)
+}
+
+// Add returns a new StorageSlotChange with the sum of this and other
+func (s StorageSlotChange) Add(other StorageSlotChange) StorageSlotChange {
+ return StorageSlotChange{
+ VolumeSlots: s.VolumeSlots + other.VolumeSlots,
+ ShardSlots: s.ShardSlots + other.ShardSlots,
+ }
+}
+
+// Subtract returns a new StorageSlotChange with other subtracted from this
+func (s StorageSlotChange) Subtract(other StorageSlotChange) StorageSlotChange {
+ return StorageSlotChange{
+ VolumeSlots: s.VolumeSlots - other.VolumeSlots,
+ ShardSlots: s.ShardSlots - other.ShardSlots,
+ }
+}
+
+// AddInPlace adds other to this StorageSlotChange in-place
+func (s *StorageSlotChange) AddInPlace(other StorageSlotChange) {
+ s.VolumeSlots += other.VolumeSlots
+ s.ShardSlots += other.ShardSlots
+}
+
+// SubtractInPlace subtracts other from this StorageSlotChange in-place
+func (s *StorageSlotChange) SubtractInPlace(other StorageSlotChange) {
+ s.VolumeSlots -= other.VolumeSlots
+ s.ShardSlots -= other.ShardSlots
+}
+
+// IsZero returns true if both VolumeSlots and ShardSlots are zero
+func (s StorageSlotChange) IsZero() bool {
+ return s.VolumeSlots == 0 && s.ShardSlots == 0
+}
+
+// ShardsPerVolumeSlot defines how many EC shards are equivalent to one volume slot
+const ShardsPerVolumeSlot = erasure_coding.DataShardsCount
+
+// ToVolumeSlots converts the entire StorageSlotChange to equivalent volume slots
+func (s StorageSlotChange) ToVolumeSlots() int64 {
+ return int64(s.VolumeSlots) + int64(s.ShardSlots)/ShardsPerVolumeSlot
+}
+
+// ToShardSlots converts the entire StorageSlotChange to equivalent shard slots
+func (s StorageSlotChange) ToShardSlots() int32 {
+ return s.ShardSlots + s.VolumeSlots*ShardsPerVolumeSlot
+}
+
+// CanAccommodate checks if this StorageSlotChange can accommodate the required StorageSlotChange
+// Both are converted to shard slots for a more precise comparison
+func (s StorageSlotChange) CanAccommodate(required StorageSlotChange) bool {
+ return s.ToShardSlots() >= required.ToShardSlots()
+}
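+
+// Example (an illustrative sketch assuming the default erasure_coding.DataShardsCount of 10):
+//
+//	available := StorageSlotChange{VolumeSlots: 2, ShardSlots: 5}  // 2*10 + 5 = 25 shard slots
+//	required := StorageSlotChange{VolumeSlots: 1, ShardSlots: 14}  // 1*10 + 14 = 24 shard slots
+//	available.CanAccommodate(required)                             // true, since 25 >= 24
+//	available.ToVolumeSlots()                                      // 2 + 5/10 = 2 (integer division truncates)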
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 23fbbd546..a83b33341 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -4,13 +4,14 @@ import (
"context"
"database/sql"
"fmt"
+ "strings"
+ "sync"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket"
"github.com/seaweedfs/seaweedfs/weed/util"
- "strings"
- "sync"
)
type SqlGenerator interface {
diff --git a/weed/pb/worker.proto b/weed/pb/worker.proto
index 0ab115bb2..811f94591 100644
--- a/weed/pb/worker.proto
+++ b/weed/pb/worker.proto
@@ -94,6 +94,7 @@ message TaskAssignment {
// TaskParams contains task-specific parameters with typed variants
message TaskParams {
+ string task_id = 12; // ActiveTopology task ID for lifecycle management
uint32 volume_id = 1;
string server = 2;
string collection = 3;
diff --git a/weed/pb/worker_pb/worker.pb.go b/weed/pb/worker_pb/worker.pb.go
index f6b3e9fb1..ff7d60545 100644
--- a/weed/pb/worker_pb/worker.pb.go
+++ b/weed/pb/worker_pb/worker.pb.go
@@ -804,6 +804,7 @@ func (x *TaskAssignment) GetMetadata() map[string]string {
// TaskParams contains task-specific parameters with typed variants
type TaskParams struct {
state protoimpl.MessageState `protogen:"open.v1"`
+ TaskId string `protobuf:"bytes,12,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` // ActiveTopology task ID for lifecycle management
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId,proto3" json:"volume_id,omitempty"`
Server string `protobuf:"bytes,2,opt,name=server,proto3" json:"server,omitempty"`
Collection string `protobuf:"bytes,3,opt,name=collection,proto3" json:"collection,omitempty"`
@@ -854,6 +855,13 @@ func (*TaskParams) Descriptor() ([]byte, []int) {
return file_worker_proto_rawDescGZIP(), []int{8}
}
+func (x *TaskParams) GetTaskId() string {
+ if x != nil {
+ return x.TaskId
+ }
+ return ""
+}
+
func (x *TaskParams) GetVolumeId() uint32 {
if x != nil {
return x.VolumeId
@@ -2869,9 +2877,10 @@ const file_worker_proto_rawDesc = "" +
"\bmetadata\x18\x06 \x03(\v2'.worker_pb.TaskAssignment.MetadataEntryR\bmetadata\x1a;\n" +
"\rMetadataEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
- "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x9a\x04\n" +
+ "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xb3\x04\n" +
"\n" +
- "TaskParams\x12\x1b\n" +
+ "TaskParams\x12\x17\n" +
+ "\atask_id\x18\f \x01(\tR\x06taskId\x12\x1b\n" +
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\x12\x16\n" +
"\x06server\x18\x02 \x01(\tR\x06server\x12\x1e\n" +
"\n" +
diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index 102f532a8..be03fb92f 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -83,7 +83,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
+ // Generate task ID for ActiveTopology integration
+ taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix())
+
task := &types.TaskDetectionResult{
+ TaskID: taskID, // Link to ActiveTopology pending task
TaskType: types.TaskTypeBalance,
VolumeID: selectedVolume.VolumeID,
Server: selectedVolume.Server,
@@ -103,6 +107,7 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Create typed parameters with destination information
task.TypedParams = &worker_pb.TaskParams{
+ TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: selectedVolume.VolumeID,
Server: selectedVolume.Server,
Collection: selectedVolume.Collection,
@@ -121,6 +126,35 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s (score: %.2f)",
selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode, destinationPlan.PlacementScore)
+
+ // Add pending balance task to ActiveTopology for capacity management
+
+ // Find the actual disk containing the volume on the source server
+ sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+ if !found {
+ return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
+ selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
+ }
+ targetDisk := destinationPlan.TargetDisk
+
+ err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
+ TaskID: taskID,
+ TaskType: topology.TaskTypeBalance,
+ VolumeID: selectedVolume.VolumeID,
+ VolumeSize: int64(selectedVolume.Size),
+ Sources: []topology.TaskSourceSpec{
+ {ServerID: selectedVolume.Server, DiskID: sourceDisk},
+ },
+ Destinations: []topology.TaskDestinationSpec{
+ {ServerID: destinationPlan.TargetNode, DiskID: targetDisk},
+ },
+ })
+ if err != nil {
+ return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err)
+ }
+
+ glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
+ taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
} else {
glog.Warningf("No ActiveTopology available for destination planning in balance detection")
return nil, nil
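
Condensed from the detection code above, the reservation pattern for a balance move looks roughly like this; it assumes an already-populated *topology.ActiveTopology, and the server/disk arguments are placeholders.

package example

import (
    "fmt"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/admin/topology"
)

// reserveBalanceMove registers a pending balance task so the source and target
// disks' effective capacity reflects the move before a worker picks it up.
func reserveBalanceMove(at *topology.ActiveTopology, volumeID uint32, volumeSize int64,
    srcServer string, srcDisk uint32, dstServer string, dstDisk uint32) (string, error) {
    taskID := fmt.Sprintf("balance_vol_%d_%d", volumeID, time.Now().Unix())
    err := at.AddPendingTask(topology.TaskSpec{
        TaskID:       taskID,
        TaskType:     topology.TaskTypeBalance,
        VolumeID:     volumeID,
        VolumeSize:   volumeSize,
        Sources:      []topology.TaskSourceSpec{{ServerID: srcServer, DiskID: srcDisk}},
        Destinations: []topology.TaskDestinationSpec{{ServerID: dstServer, DiskID: dstDisk}},
    })
    return taskID, err
}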
diff --git a/weed/worker/tasks/base/volume_utils.go b/weed/worker/tasks/base/volume_utils.go
new file mode 100644
index 000000000..2aaf795b2
--- /dev/null
+++ b/weed/worker/tasks/base/volume_utils.go
@@ -0,0 +1,36 @@
+package base
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/admin/topology"
+)
+
+// FindVolumeDisk finds the disk ID where a specific volume is located on a given server.
+// Returns the disk ID and a boolean indicating whether the volume was found.
+// Uses O(1) indexed lookup for optimal performance on large clusters.
+//
+// This is a shared utility function used by multiple task detection algorithms
+// (balance, vacuum, etc.) to locate volumes efficiently.
+//
+// Example usage:
+//
+// // In balance task: find source disk for a volume that needs to be moved
+// sourceDisk, found := base.FindVolumeDisk(topology, volumeID, collection, sourceServer)
+//
+// // In vacuum task: find disk containing volume that needs cleanup
+// diskID, exists := base.FindVolumeDisk(topology, volumeID, collection, serverID)
+func FindVolumeDisk(activeTopology *topology.ActiveTopology, volumeID uint32, collection string, serverID string) (uint32, bool) {
+ if activeTopology == nil {
+ return 0, false
+ }
+
+ // Use the new O(1) indexed lookup for better performance
+ locations := activeTopology.GetVolumeLocations(volumeID, collection)
+ for _, loc := range locations {
+ if loc.ServerID == serverID {
+ return loc.DiskID, true
+ }
+ }
+
+ // Volume not found on this server
+ return 0, false
+}
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index 9cf87cdf6..ec632436f 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -61,7 +61,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// Check quiet duration and fullness criteria
if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
+ // Generate task ID for ActiveTopology integration
+ taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())
+
result := &types.TaskDetectionResult{
+ TaskID: taskID, // Link to ActiveTopology pending task
TaskType: types.TaskTypeErasureCoding,
VolumeID: metric.VolumeID,
Server: metric.Server,
@@ -81,12 +85,117 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
continue // Skip this volume if destination planning fails
}
- // Find all volume replicas from topology
- replicas := findVolumeReplicas(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+ // Calculate expected shard size for EC operation
+ // Each data shard will be approximately volumeSize / dataShards
+ expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)
+
+ // Add pending EC shard task to ActiveTopology for capacity management
+
+ // Extract shard destinations from multiPlan
+ var shardDestinations []string
+ var shardDiskIDs []uint32
+ for _, plan := range multiPlan.Plans {
+ shardDestinations = append(shardDestinations, plan.TargetNode)
+ shardDiskIDs = append(shardDiskIDs, plan.TargetDisk)
+ }
+
+ // Find all volume replica locations (server + disk) from topology
+ replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+ if len(replicaLocations) == 0 {
+ glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
+ continue
+ }
+
+ // Find existing EC shards from previous failed attempts
+ existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
+
+ // Combine volume replicas and existing EC shards for cleanup
+ var allSourceLocations []topology.TaskSourceLocation
+
+ // Add volume replicas (will free volume slots)
+ for _, replica := range replicaLocations {
+ allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+ ServerID: replica.ServerID,
+ DiskID: replica.DiskID,
+ CleanupType: topology.CleanupVolumeReplica,
+ })
+ }
+
+ // Add existing EC shards (will free shard slots)
+ duplicateCheck := make(map[string]bool)
+ for _, replica := range replicaLocations {
+ key := fmt.Sprintf("%s:%d", replica.ServerID, replica.DiskID)
+ duplicateCheck[key] = true
+ }
+
+ for _, shard := range existingECShards {
+ key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
+ if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
+ allSourceLocations = append(allSourceLocations, topology.TaskSourceLocation{
+ ServerID: shard.ServerID,
+ DiskID: shard.DiskID,
+ CleanupType: topology.CleanupECShards,
+ })
+ duplicateCheck[key] = true
+ }
+ }
+
+ glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
+ len(replicaLocations), len(existingECShards), metric.VolumeID, len(allSourceLocations))
+
+ // Convert TaskSourceLocation to TaskSourceSpec
+ sources := make([]topology.TaskSourceSpec, len(allSourceLocations))
+ for i, srcLoc := range allSourceLocations {
+ sources[i] = topology.TaskSourceSpec{
+ ServerID: srcLoc.ServerID,
+ DiskID: srcLoc.DiskID,
+ CleanupType: srcLoc.CleanupType,
+ }
+ }
+
+ // Convert shard destinations to TaskDestinationSpec
+ destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
+ shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination
+ shardSize := int64(expectedShardSize)
+ for i, dest := range shardDestinations {
+ destinations[i] = topology.TaskDestinationSpec{
+ ServerID: dest,
+ DiskID: shardDiskIDs[i],
+ StorageImpact: &shardImpact,
+ EstimatedSize: &shardSize,
+ }
+ }
+
+ err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
+ TaskID: taskID,
+ TaskType: topology.TaskTypeErasureCoding,
+ VolumeID: metric.VolumeID,
+ VolumeSize: int64(metric.Size),
+ Sources: sources,
+ Destinations: destinations,
+ })
+ if err != nil {
+ glog.Warningf("Failed to add pending EC shard task to ActiveTopology for volume %d: %v", metric.VolumeID, err)
+ continue // Skip this volume if topology task addition fails
+ }
+
+ glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
+ taskID, metric.VolumeID, len(allSourceLocations), len(multiPlan.Plans))
+
+ // Find all volume replicas from topology (for legacy worker compatibility)
+ var replicas []string
+ serverSet := make(map[string]struct{})
+ for _, loc := range replicaLocations {
+ if _, found := serverSet[loc.ServerID]; !found {
+ replicas = append(replicas, loc.ServerID)
+ serverSet[loc.ServerID] = struct{}{}
+ }
+ }
glog.V(1).Infof("Found %d replicas for volume %d: %v", len(replicas), metric.VolumeID, replicas)
// Create typed parameters with EC destination information and replicas
result.TypedParams = &worker_pb.TaskParams{
+ TaskId: taskID, // Link to ActiveTopology pending task
VolumeId: metric.VolumeID,
Server: metric.Server,
Collection: metric.Collection,
@@ -143,6 +252,9 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
// planECDestinations plans the destinations for erasure coding operation
// This function implements EC destination planning logic directly in the detection phase
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
+ // Calculate expected shard size for EC operation
+ expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)
+
// Get source node information from topology
var sourceRack, sourceDC string
@@ -168,10 +280,12 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
}
}
- // Get available disks for EC placement (include source node for EC)
- availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeErasureCoding, "")
+	// Get available disks for EC placement, using effective capacity that accounts for pending and active tasks
+	// Each selected disk will receive one or more shards, so require at least 1 available volume slot
+	// before considering a disk for placement.
+ availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, metric.Server, 1)
if len(availableDisks) < erasure_coding.MinTotalDisks {
- return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d", erasure_coding.MinTotalDisks, len(availableDisks))
+ return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
}
// Select best disks for EC placement with rack/DC diversity
@@ -190,7 +304,7 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
TargetDisk: disk.DiskID,
TargetRack: disk.Rack,
TargetDC: disk.DataCenter,
- ExpectedSize: 0, // EC shards don't have predetermined size
+ ExpectedSize: expectedShardSize, // Set calculated EC shard size
PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
Conflicts: checkECPlacementConflicts(disk, sourceRack, sourceDC),
}
@@ -202,6 +316,22 @@ func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.V
dcCount[disk.DataCenter]++
}
+ // Log capacity utilization information using ActiveTopology's encapsulated logic
+ totalEffectiveCapacity := int64(0)
+ for _, plan := range plans {
+ effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk)
+ totalEffectiveCapacity += effectiveCapacity
+ }
+
+ glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots",
+ metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)
+
+ // Log storage impact for EC task (source only - EC has multiple targets handled individually)
+ sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
+	glog.V(2).Infof("EC task capacity management: source reserves with zero impact {VolumeSlots:%d, ShardSlots:%d}, %d targets will receive shards, estimated size=%d bytes",
+ sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
+ glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact")
+
return &topology.MultiDestinationPlan{
Plans: plans,
TotalShards: len(plans),
@@ -354,13 +484,8 @@ func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
return false
}
- // Check if disk has capacity
- if disk.DiskInfo.VolumeCount >= disk.DiskInfo.MaxVolumeCount {
- return false
- }
-
- // Check if disk is not overloaded
- if disk.LoadCount > 10 { // Arbitrary threshold
+ // Check if disk is not overloaded with tasks
+ if disk.LoadCount > topology.MaxTaskLoadForECPlacement {
return false
}
@@ -380,6 +505,24 @@ func checkECPlacementConflicts(disk *topology.DiskInfo, sourceRack, sourceDC str
return conflicts
}
+// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
+// Uses O(1) indexed lookup for optimal performance on large clusters.
+func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
+ if activeTopology == nil {
+ return nil
+ }
+ return activeTopology.GetVolumeLocations(volumeID, collection)
+}
+
+// findExistingECShards finds existing EC shards for a volume (from previous failed EC attempts)
+// Uses O(1) indexed lookup for optimal performance on large clusters.
+func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
+ if activeTopology == nil {
+ return nil
+ }
+ return activeTopology.GetECShardLocations(volumeID, collection)
+}
+
// findVolumeReplicas finds all servers that have replicas of the specified volume
func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
if activeTopology == nil {
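
A quick worked example of the shard-size and per-destination capacity arithmetic used in this detection pass; the 30 GiB volume size is illustrative, and the 10 data shards correspond to the default 10+4 erasure coding layout.

package main

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/admin/topology"
    "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)

func main() {
    // A 30 GiB volume is split across the data shards, so with 10 data shards
    // each destination disk should expect roughly a 3 GiB shard.
    volumeSize := uint64(30) << 30
    expectedShardSize := volumeSize / uint64(erasure_coding.DataShardsCount)

    // Each destination reserves capacity for exactly one shard of that size.
    perDestination := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize))
    fmt.Printf("expected shard size=%d bytes, per-destination impact=%+v\n", expectedShardSize, perDestination)
}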
diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go
index 23f82ad34..0c14bb956 100644
--- a/weed/worker/tasks/vacuum/detection.go
+++ b/weed/worker/tasks/vacuum/detection.go
@@ -1,6 +1,7 @@
package vacuum
import (
+ "fmt"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -31,7 +32,11 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI
priority = types.TaskPriorityHigh
}
+ // Generate task ID for future ActiveTopology integration
+ taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix())
+
result := &types.TaskDetectionResult{
+ TaskID: taskID, // For future ActiveTopology integration
TaskType: types.TaskTypeVacuum,
VolumeID: metric.VolumeID,
Server: metric.Server,
@@ -96,6 +101,7 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum
// Create typed protobuf parameters
return &worker_pb.TaskParams{
+ TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated)
VolumeId: task.VolumeID,
Server: task.Server,
Collection: task.Collection,
diff --git a/weed/worker/types/task_types.go b/weed/worker/types/task_types.go
index ed7fc8f07..d5dbc4008 100644
--- a/weed/worker/types/task_types.go
+++ b/weed/worker/types/task_types.go
@@ -73,6 +73,7 @@ type TaskParams struct {
// TaskDetectionResult represents the result of scanning for maintenance needs
type TaskDetectionResult struct {
+ TaskID string `json:"task_id"` // ActiveTopology task ID for lifecycle management
TaskType TaskType `json:"task_type"`
VolumeID uint32 `json:"volume_id,omitempty"`
Server string `json:"server,omitempty"`
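
To make the linkage concrete, a minimal sketch of how a detection routine propagates the same ID into both the detection result and its typed parameters; the helper name and field values are illustrative, only the types and fields shown in this diff are assumed to exist.

package example

import (
    "fmt"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
    "github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// newBalanceDetection is a hypothetical helper showing the invariant the TaskID
// field enables: the detection result and its typed parameters share one ID, so
// the scheduler can tie the eventual assignment back to the pending
// ActiveTopology task created during detection.
func newBalanceDetection(volumeID uint32, server string) *types.TaskDetectionResult {
    taskID := fmt.Sprintf("balance_vol_%d_%d", volumeID, time.Now().Unix())
    return &types.TaskDetectionResult{
        TaskID:   taskID,
        TaskType: types.TaskTypeBalance,
        VolumeID: volumeID,
        Server:   server,
        TypedParams: &worker_pb.TaskParams{
            TaskId:   taskID,
            VolumeId: volumeID,
            Server:   server,
        },
    }
}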