Diffstat (limited to 'weed/admin/topology/active_topology_test.go')
-rw-r--r--  weed/admin/topology/active_topology_test.go  154
1 file changed, 127 insertions(+), 27 deletions(-)
diff --git a/weed/admin/topology/active_topology_test.go b/weed/admin/topology/active_topology_test.go
index 4e8b0b3a8..9b0990f21 100644
--- a/weed/admin/topology/active_topology_test.go
+++ b/weed/admin/topology/active_topology_test.go
@@ -1,6 +1,7 @@
package topology
import (
+ "fmt"
"testing"
"time"
@@ -9,6 +10,16 @@ import (
"github.com/stretchr/testify/require"
)
+// Helper function to find a disk by ID for testing - reduces code duplication
+func findDiskByID(disks []*DiskInfo, diskID uint32) *DiskInfo {
+ for _, disk := range disks {
+ if disk.DiskID == diskID {
+ return disk
+ }
+ }
+ return nil
+}
+
// TestActiveTopologyBasicOperations tests basic topology management
func TestActiveTopologyBasicOperations(t *testing.T) {
topology := NewActiveTopology(10)
@@ -58,8 +69,19 @@ func TestTaskLifecycle(t *testing.T) {
taskID := "balance-001"
// 1. Add pending task
- topology.AddPendingTask(taskID, TaskTypeBalance, 1001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 1)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: taskID,
+ TaskType: TaskTypeBalance,
+ VolumeID: 1001,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 1},
+ },
+ })
+ assert.NoError(t, err, "Should add pending task successfully")
// Verify pending state
assert.Equal(t, 1, len(topology.pendingTasks))
@@ -77,7 +99,7 @@ func TestTaskLifecycle(t *testing.T) {
assert.Equal(t, 1, len(targetDisk.pendingTasks))
// 2. Assign task
- err := topology.AssignTask(taskID)
+ err = topology.AssignTask(taskID)
require.NoError(t, err)
// Verify assigned state
@@ -258,8 +280,7 @@ func TestTargetSelectionScenarios(t *testing.T) {
assert.NotEqual(t, tt.excludeNode, disk.NodeID,
"Available disk should not be on excluded node")
- load := tt.topology.GetDiskLoad(disk.NodeID, disk.DiskID)
- assert.Less(t, load, 2, "Disk load should be less than 2")
+ assert.Less(t, disk.LoadCount, 2, "Disk load should be less than 2")
}
})
}
@@ -271,37 +292,65 @@ func TestDiskLoadCalculation(t *testing.T) {
topology.UpdateTopology(createSampleTopology())
// Initially no load
- load := topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 0, load)
+ disks := topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk := findDiskByID(disks, 0)
+ require.NotNil(t, targetDisk, "Should find disk with ID 0")
+ assert.Equal(t, 0, targetDisk.LoadCount)
// Add pending task
- topology.AddPendingTask("task1", TaskTypeBalance, 1001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 1)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: "task1",
+ TaskType: TaskTypeBalance,
+ VolumeID: 1001,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 1},
+ },
+ })
+ assert.NoError(t, err, "Should add pending task successfully")
// Check load increased
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 1, load)
+ disks = topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk = findDiskByID(disks, 0)
+ assert.Equal(t, 1, targetDisk.LoadCount)
// Add another task to same disk
- topology.AddPendingTask("task2", TaskTypeVacuum, 1002,
- "10.0.0.1:8080", 0, "", 0)
+ err = topology.AddPendingTask(TaskSpec{
+ TaskID: "task2",
+ TaskType: TaskTypeVacuum,
+ VolumeID: 1002,
+ VolumeSize: 0,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
+ },
+ })
+ assert.NoError(t, err, "Should add vacuum task successfully")
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 2, load)
+ disks = topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk = findDiskByID(disks, 0)
+ assert.Equal(t, 2, targetDisk.LoadCount)
// Move one task to assigned
topology.AssignTask("task1")
// Load should still be 2 (1 pending + 1 assigned)
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 2, load)
+ disks = topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk = findDiskByID(disks, 0)
+ assert.Equal(t, 2, targetDisk.LoadCount)
// Complete one task
topology.CompleteTask("task1")
// Load should decrease to 1
- load = topology.GetDiskLoad("10.0.0.1:8080", 0)
- assert.Equal(t, 1, load)
+ disks = topology.GetNodeDisks("10.0.0.1:8080")
+ targetDisk = findDiskByID(disks, 0)
+ assert.Equal(t, 1, targetDisk.LoadCount)
}
// TestTaskConflictDetection tests task conflict detection
@@ -310,8 +359,19 @@ func TestTaskConflictDetection(t *testing.T) {
topology.UpdateTopology(createSampleTopology())
// Add a balance task
- topology.AddPendingTask("balance1", TaskTypeBalance, 1001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 1)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: "balance1",
+ TaskType: TaskTypeBalance,
+ VolumeID: 1001,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 1},
+ },
+ })
+ assert.NoError(t, err, "Should add balance task successfully")
topology.AssignTask("balance1")
// Try to get available disks for vacuum (conflicts with balance)
@@ -448,8 +508,22 @@ func createTopologyWithLoad() *ActiveTopology {
topology.UpdateTopology(createSampleTopology())
// Add some existing tasks to create load
- topology.AddPendingTask("existing1", TaskTypeVacuum, 2001,
- "10.0.0.1:8080", 0, "", 0)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: "existing1",
+ TaskType: TaskTypeVacuum,
+ VolumeID: 2001,
+ VolumeSize: 0,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "", DiskID: 0}, // Vacuum doesn't have a destination
+ },
+ })
+ if err != nil {
+ // In test helper function, just log error instead of failing
+ fmt.Printf("Warning: Failed to add existing task: %v\n", err)
+ }
topology.AssignTask("existing1")
return topology
@@ -466,12 +540,38 @@ func createTopologyWithConflicts() *ActiveTopology {
topology.UpdateTopology(createSampleTopology())
// Add conflicting tasks
- topology.AddPendingTask("balance1", TaskTypeBalance, 3001,
- "10.0.0.1:8080", 0, "10.0.0.2:8080", 0)
+ err := topology.AddPendingTask(TaskSpec{
+ TaskID: "balance1",
+ TaskType: TaskTypeBalance,
+ VolumeID: 3001,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 0},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "10.0.0.2:8080", DiskID: 0},
+ },
+ })
+ if err != nil {
+ fmt.Printf("Warning: Failed to add balance task: %v\n", err)
+ }
topology.AssignTask("balance1")
- topology.AddPendingTask("ec1", TaskTypeErasureCoding, 3002,
- "10.0.0.1:8080", 1, "", 0)
+ err = topology.AddPendingTask(TaskSpec{
+ TaskID: "ec1",
+ TaskType: TaskTypeErasureCoding,
+ VolumeID: 3002,
+ VolumeSize: 1024 * 1024 * 1024,
+ Sources: []TaskSourceSpec{
+ {ServerID: "10.0.0.1:8080", DiskID: 1},
+ },
+ Destinations: []TaskDestinationSpec{
+ {ServerID: "", DiskID: 0}, // EC doesn't have single destination
+ },
+ })
+ if err != nil {
+ fmt.Printf("Warning: Failed to add EC task: %v\n", err)
+ }
topology.AssignTask("ec1")
return topology
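
The TaskSpec, TaskSourceSpec, and TaskDestinationSpec types that the new AddPendingTask signature takes are defined in the package source, not in this diff. As a reading aid, here is a minimal sketch of what the call sites above imply; every field name and type below is inferred from the test code in this diff rather than copied from active_topology.go, so treat the whole block as an assumption.

// Sketch only: inferred from the call sites in the test diff above, not
// taken from weed/admin/topology/active_topology.go. Types, field names,
// and constant values are assumptions for illustration.
package topology

// TaskType is assumed to be a string-backed enum; the tests reference
// TaskTypeBalance, TaskTypeVacuum, and TaskTypeErasureCoding.
type TaskType string

const (
	TaskTypeBalance       TaskType = "balance"
	TaskTypeVacuum        TaskType = "vacuum"
	TaskTypeErasureCoding TaskType = "erasure_coding"
)

// TaskSourceSpec names a source server/disk pair for a task.
type TaskSourceSpec struct {
	ServerID string // e.g. "10.0.0.1:8080"
	DiskID   uint32
}

// TaskDestinationSpec names a destination server/disk pair; tasks without
// a destination (e.g. vacuum) pass an empty ServerID.
type TaskDestinationSpec struct {
	ServerID string
	DiskID   uint32
}

// TaskSpec bundles the arguments that the removed positional
// AddPendingTask signature took individually, and adds VolumeSize plus
// multi-source/multi-destination support.
type TaskSpec struct {
	TaskID       string
	TaskType     TaskType
	VolumeID     uint32
	VolumeSize   uint64 // bytes; the tests pass 0 when size is irrelevant
	Sources      []TaskSourceSpec
	Destinations []TaskDestinationSpec
}

// The tests also read three DiskInfo fields via GetNodeDisks and the
// findDiskByID helper: DiskID (uint32), NodeID (string), and LoadCount
// (int), where LoadCount replaces the removed GetDiskLoad(node, disk)
// accessor. The full DiskInfo definition is not shown in this diff.

The spec-struct form lets call sites name their arguments and, per the comments in the diff, accommodates tasks with no destination (vacuum) or more than one destination (erasure coding) without further signature churn.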