diff options
Diffstat (limited to 'weed/admin/topology/active_topology_test.go')
| -rw-r--r-- | weed/admin/topology/active_topology_test.go | 154 |
1 files changed, 127 insertions, 27 deletions
diff --git a/weed/admin/topology/active_topology_test.go b/weed/admin/topology/active_topology_test.go index 4e8b0b3a8..9b0990f21 100644 --- a/weed/admin/topology/active_topology_test.go +++ b/weed/admin/topology/active_topology_test.go @@ -1,6 +1,7 @@ package topology import ( + "fmt" "testing" "time" @@ -9,6 +10,16 @@ import ( "github.com/stretchr/testify/require" ) +// Helper function to find a disk by ID for testing - reduces code duplication +func findDiskByID(disks []*DiskInfo, diskID uint32) *DiskInfo { + for _, disk := range disks { + if disk.DiskID == diskID { + return disk + } + } + return nil +} + // TestActiveTopologyBasicOperations tests basic topology management func TestActiveTopologyBasicOperations(t *testing.T) { topology := NewActiveTopology(10) @@ -58,8 +69,19 @@ func TestTaskLifecycle(t *testing.T) { taskID := "balance-001" // 1. Add pending task - topology.AddPendingTask(taskID, TaskTypeBalance, 1001, - "10.0.0.1:8080", 0, "10.0.0.2:8080", 1) + err := topology.AddPendingTask(TaskSpec{ + TaskID: taskID, + TaskType: TaskTypeBalance, + VolumeID: 1001, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 1}, + }, + }) + assert.NoError(t, err, "Should add pending task successfully") // Verify pending state assert.Equal(t, 1, len(topology.pendingTasks)) @@ -77,7 +99,7 @@ func TestTaskLifecycle(t *testing.T) { assert.Equal(t, 1, len(targetDisk.pendingTasks)) // 2. Assign task - err := topology.AssignTask(taskID) + err = topology.AssignTask(taskID) require.NoError(t, err) // Verify assigned state @@ -258,8 +280,7 @@ func TestTargetSelectionScenarios(t *testing.T) { assert.NotEqual(t, tt.excludeNode, disk.NodeID, "Available disk should not be on excluded node") - load := tt.topology.GetDiskLoad(disk.NodeID, disk.DiskID) - assert.Less(t, load, 2, "Disk load should be less than 2") + assert.Less(t, disk.LoadCount, 2, "Disk load should be less than 2") } }) } @@ -271,37 +292,65 @@ func TestDiskLoadCalculation(t *testing.T) { topology.UpdateTopology(createSampleTopology()) // Initially no load - load := topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 0, load) + disks := topology.GetNodeDisks("10.0.0.1:8080") + targetDisk := findDiskByID(disks, 0) + require.NotNil(t, targetDisk, "Should find disk with ID 0") + assert.Equal(t, 0, targetDisk.LoadCount) // Add pending task - topology.AddPendingTask("task1", TaskTypeBalance, 1001, - "10.0.0.1:8080", 0, "10.0.0.2:8080", 1) + err := topology.AddPendingTask(TaskSpec{ + TaskID: "task1", + TaskType: TaskTypeBalance, + VolumeID: 1001, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 1}, + }, + }) + assert.NoError(t, err, "Should add pending task successfully") // Check load increased - load = topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 1, load) + disks = topology.GetNodeDisks("10.0.0.1:8080") + targetDisk = findDiskByID(disks, 0) + assert.Equal(t, 1, targetDisk.LoadCount) // Add another task to same disk - topology.AddPendingTask("task2", TaskTypeVacuum, 1002, - "10.0.0.1:8080", 0, "", 0) + err = topology.AddPendingTask(TaskSpec{ + TaskID: "task2", + TaskType: TaskTypeVacuum, + VolumeID: 1002, + VolumeSize: 0, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination + }, + }) + assert.NoError(t, err, "Should add vacuum task successfully") - load = topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 2, load) + disks = topology.GetNodeDisks("10.0.0.1:8080") + targetDisk = findDiskByID(disks, 0) + assert.Equal(t, 2, targetDisk.LoadCount) // Move one task to assigned topology.AssignTask("task1") // Load should still be 2 (1 pending + 1 assigned) - load = topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 2, load) + disks = topology.GetNodeDisks("10.0.0.1:8080") + targetDisk = findDiskByID(disks, 0) + assert.Equal(t, 2, targetDisk.LoadCount) // Complete one task topology.CompleteTask("task1") // Load should decrease to 1 - load = topology.GetDiskLoad("10.0.0.1:8080", 0) - assert.Equal(t, 1, load) + disks = topology.GetNodeDisks("10.0.0.1:8080") + targetDisk = findDiskByID(disks, 0) + assert.Equal(t, 1, targetDisk.LoadCount) } // TestTaskConflictDetection tests task conflict detection @@ -310,8 +359,19 @@ func TestTaskConflictDetection(t *testing.T) { topology.UpdateTopology(createSampleTopology()) // Add a balance task - topology.AddPendingTask("balance1", TaskTypeBalance, 1001, - "10.0.0.1:8080", 0, "10.0.0.2:8080", 1) + err := topology.AddPendingTask(TaskSpec{ + TaskID: "balance1", + TaskType: TaskTypeBalance, + VolumeID: 1001, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 1}, + }, + }) + assert.NoError(t, err, "Should add balance task successfully") topology.AssignTask("balance1") // Try to get available disks for vacuum (conflicts with balance) @@ -448,8 +508,22 @@ func createTopologyWithLoad() *ActiveTopology { topology.UpdateTopology(createSampleTopology()) // Add some existing tasks to create load - topology.AddPendingTask("existing1", TaskTypeVacuum, 2001, - "10.0.0.1:8080", 0, "", 0) + err := topology.AddPendingTask(TaskSpec{ + TaskID: "existing1", + TaskType: TaskTypeVacuum, + VolumeID: 2001, + VolumeSize: 0, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination + }, + }) + if err != nil { + // In test helper function, just log error instead of failing + fmt.Printf("Warning: Failed to add existing task: %v\n", err) + } topology.AssignTask("existing1") return topology @@ -466,12 +540,38 @@ func createTopologyWithConflicts() *ActiveTopology { topology.UpdateTopology(createSampleTopology()) // Add conflicting tasks - topology.AddPendingTask("balance1", TaskTypeBalance, 3001, - "10.0.0.1:8080", 0, "10.0.0.2:8080", 0) + err := topology.AddPendingTask(TaskSpec{ + TaskID: "balance1", + TaskType: TaskTypeBalance, + VolumeID: 3001, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 0}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "10.0.0.2:8080", DiskID: 0}, + }, + }) + if err != nil { + fmt.Printf("Warning: Failed to add balance task: %v\n", err) + } topology.AssignTask("balance1") - topology.AddPendingTask("ec1", TaskTypeErasureCoding, 3002, - "10.0.0.1:8080", 1, "", 0) + err = topology.AddPendingTask(TaskSpec{ + TaskID: "ec1", + TaskType: TaskTypeErasureCoding, + VolumeID: 3002, + VolumeSize: 1024 * 1024 * 1024, + Sources: []TaskSourceSpec{ + {ServerID: "10.0.0.1:8080", DiskID: 1}, + }, + Destinations: []TaskDestinationSpec{ + {ServerID: "", DiskID: 0}, // EC doesn't have single destination + }, + }) + if err != nil { + fmt.Printf("Warning: Failed to add EC task: %v\n", err) + } topology.AssignTask("ec1") return topology |
