path: root/weed/storage/erasure_coding
author     Chris Lu <chrislusf@users.noreply.github.com>  2025-12-02 12:30:15 -0800
committer  GitHub <noreply@github.com>  2025-12-02 12:30:15 -0800
commit     4f038820dc480a7374b928a17cc9121474ba4172 (patch)
tree       d15e1dade3537fbd462e819403b3347433ff37bd /weed/storage/erasure_coding
parent     ebb06a3908990c31dcfb4995a69682d836228179 (diff)
download   seaweedfs-4f038820dc480a7374b928a17cc9121474ba4172.tar.xz
           seaweedfs-4f038820dc480a7374b928a17cc9121474ba4172.zip
Add disk-aware EC rebalancing (#7597)
* Add placement package for EC shard placement logic
  - Consolidate EC shard placement algorithm for reuse across shell and worker tasks
  - Support multi-pass selection: racks, then servers, then disks
  - Include proper spread verification and scoring functions
  - Comprehensive test coverage for various cluster topologies

* Make ec.balance disk-aware for multi-disk servers
  - Add EcDisk struct to track individual disks on volume servers
  - Update EcNode to maintain per-disk shard distribution
  - Parse disk_id from EC shard information during topology collection
  - Implement pickBestDiskOnNode() for selecting best disk per shard
  - Add diskDistributionScore() for tie-breaking node selection
  - Update all move operations to specify target disk in RPC calls
  - Improves shard balance within multi-disk servers, not just across servers

* Use placement package in EC detection for consistent disk-level placement
  - Replace custom EC disk selection logic with shared placement package
  - Convert topology DiskInfo to placement.DiskCandidate format
  - Use SelectDestinations() for multi-rack/server/disk spreading
  - Convert placement results back to topology DiskInfo for task creation
  - Ensures EC detection uses same placement logic as shell commands

* Make volume server evacuation disk-aware
  - Use pickBestDiskOnNode() when selecting evacuation target disk
  - Specify target disk in evacuation RPC requests
  - Maintains balanced disk distribution during server evacuations

* Rename PlacementConfig to PlacementRequest for clarity
  PlacementRequest better reflects that this is a request for placement rather than a configuration object. This improves API semantics.

* Rename DefaultConfig to DefaultPlacementRequest
  Aligns with the PlacementRequest type naming for consistency

* Address review comments from Gemini and CodeRabbit
  Fix HIGH issues:
  - Fix empty disk discovery: Now discovers all disks from VolumeInfos, not just from EC shards. This ensures disks without EC shards are still considered for placement.
  - Fix EC shard count calculation in detection.go: Now correctly filters by DiskId and sums actual shard counts using ShardBits.ShardIdCount() instead of just counting EcShardInfo entries.
  Fix MEDIUM issues:
  - Add disk ID to evacuation log messages for consistency with other logging
  - Remove unused serverToDisks variable in placement.go
  - Fix comment that incorrectly said 'ascending' when sorting is 'descending'

* add ec tests

* Update ec-integration-tests.yml

* Update ec_integration_test.go

* Fix EC integration tests CI: build weed binary and update actions
  - Add 'Build weed binary' step before running tests
  - Update actions/setup-go from v4 to v6 (Node20 compatibility)
  - Update actions/checkout from v2 to v4 (Node20 compatibility)
  - Move working-directory to test step only

* Add disk-aware EC rebalancing integration tests
  - Add TestDiskAwareECRebalancing test with multi-disk cluster setup
  - Test EC encode with disk awareness (shows disk ID in output)
  - Test EC balance with disk-level shard distribution
  - Add helper functions for disk-level verification:
    - startMultiDiskCluster: 3 servers x 4 disks each
    - countShardsPerDisk: track shards per disk per server
    - calculateDiskShardVariance: measure distribution balance
  - Verify no single disk is overloaded with shards
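For orientation, here is a minimal usage sketch of the new placement API, written against only the types and functions introduced in this diff (DiskCandidate, DefaultPlacementRequest, SelectDestinations, VerifySpread). The import path, the topology values, and the main wrapper are illustrative assumptions, not code from the actual shell or worker callers.

package main

import (
	"fmt"
	"log"

	// Assumed import path for the package added in this commit.
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
)

func main() {
	// Hypothetical topology: 3 servers in one rack, 2 disks each.
	var disks []*placement.DiskCandidate
	for _, server := range []string{"server1", "server2", "server3"} {
		for diskID := uint32(0); diskID < 2; diskID++ {
			disks = append(disks, &placement.DiskCandidate{
				NodeID:         server,
				DiskID:         diskID,
				DataCenter:     "dc1",
				Rack:           "rack1",
				MaxVolumeCount: 100,
				FreeSlots:      10,
			})
		}
	}

	// Start from the defaults (14 shards, prefer rack/server spread)
	// and override the shard count for this small example.
	req := placement.DefaultPlacementRequest()
	req.ShardsNeeded = 6

	result, err := placement.SelectDestinations(disks, req)
	if err != nil {
		log.Fatal(err)
	}

	// Expect the shards to land on at least 3 servers in at least 1 rack.
	if err := placement.VerifySpread(result, 3, 1); err != nil {
		log.Fatal(err)
	}

	for _, d := range result.SelectedDisks {
		fmt.Printf("shard -> node %s, disk %d\n", d.NodeID, d.DiskID)
	}
}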
Diffstat (limited to 'weed/storage/erasure_coding')
-rw-r--r--  weed/storage/erasure_coding/placement/placement.go       420
-rw-r--r--  weed/storage/erasure_coding/placement/placement_test.go  517
2 files changed, 937 insertions, 0 deletions
diff --git a/weed/storage/erasure_coding/placement/placement.go b/weed/storage/erasure_coding/placement/placement.go
new file mode 100644
index 000000000..67e21c1f8
--- /dev/null
+++ b/weed/storage/erasure_coding/placement/placement.go
@@ -0,0 +1,420 @@
+// Package placement provides consolidated EC shard placement logic used by
+// both shell commands and worker tasks.
+//
+// This package encapsulates the algorithms for:
+// - Selecting destination nodes/disks for EC shards
+// - Ensuring proper spread across racks, servers, and disks
+// - Balancing shards across the cluster
+package placement
+
+import (
+ "fmt"
+ "sort"
+)
+
+// DiskCandidate represents a disk that can receive EC shards
+type DiskCandidate struct {
+ NodeID string
+ DiskID uint32
+ DataCenter string
+ Rack string
+
+ // Capacity information
+ VolumeCount int64
+ MaxVolumeCount int64
+ ShardCount int // Current number of EC shards on this disk
+ FreeSlots int // Available slots for new shards
+
+ // Load information
+ LoadCount int // Number of active tasks on this disk
+}
+
+// NodeCandidate represents a server node that can receive EC shards
+type NodeCandidate struct {
+ NodeID string
+ DataCenter string
+ Rack string
+ FreeSlots int
+ ShardCount int // Total shards across all disks
+ Disks []*DiskCandidate // All disks on this node
+}
+
+// PlacementRequest configures EC shard placement behavior
+type PlacementRequest struct {
+ // ShardsNeeded is the total number of shards to place
+ ShardsNeeded int
+
+ // MaxShardsPerServer limits how many shards can be placed on a single server
+ // 0 means no limit (but prefer spreading when possible)
+ MaxShardsPerServer int
+
+ // MaxShardsPerRack limits how many shards can be placed in a single rack
+ // 0 means no limit
+ MaxShardsPerRack int
+
+ // MaxTaskLoad is the maximum task load count for a disk to be considered
+ MaxTaskLoad int
+
+ // PreferDifferentServers when true, spreads shards across different servers
+ // before using multiple disks on the same server
+ PreferDifferentServers bool
+
+ // PreferDifferentRacks when true, spreads shards across different racks
+ // before using multiple servers in the same rack
+ PreferDifferentRacks bool
+}
+
+// DefaultPlacementRequest returns the default placement configuration
+func DefaultPlacementRequest() PlacementRequest {
+ return PlacementRequest{
+ ShardsNeeded: 14,
+ MaxShardsPerServer: 0,
+ MaxShardsPerRack: 0,
+ MaxTaskLoad: 5,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+}
+
+// PlacementResult contains the selected destinations for EC shards
+type PlacementResult struct {
+ SelectedDisks []*DiskCandidate
+
+ // Statistics
+ ServersUsed int
+ RacksUsed int
+ DCsUsed int
+
+ // Distribution maps
+ ShardsPerServer map[string]int
+ ShardsPerRack map[string]int
+ ShardsPerDC map[string]int
+}
+
+// SelectDestinations selects the best disks for EC shard placement.
+// This is the main entry point for EC placement logic.
+//
+// The algorithm works in multiple passes:
+// 1. First pass: Select one disk from each rack (maximize rack diversity)
+// 2. Second pass: Select one disk from each unused server in used racks (maximize server diversity)
+// 3. Third pass: Select additional disks from servers already used (maximize disk diversity)
+func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*PlacementResult, error) {
+ if len(disks) == 0 {
+ return nil, fmt.Errorf("no disk candidates provided")
+ }
+ if config.ShardsNeeded <= 0 {
+ return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded)
+ }
+
+ // Filter suitable disks
+ suitable := filterSuitableDisks(disks, config)
+ if len(suitable) == 0 {
+ return nil, fmt.Errorf("no suitable disks found after filtering")
+ }
+
+ // Build indexes for efficient lookup
+ rackToDisks := groupDisksByRack(suitable)
+
+ result := &PlacementResult{
+ SelectedDisks: make([]*DiskCandidate, 0, config.ShardsNeeded),
+ ShardsPerServer: make(map[string]int),
+ ShardsPerRack: make(map[string]int),
+ ShardsPerDC: make(map[string]int),
+ }
+
+ usedDisks := make(map[string]bool) // "nodeID:diskID" -> bool
+ usedServers := make(map[string]bool) // nodeID -> bool
+ usedRacks := make(map[string]bool) // "dc:rack" -> bool
+
+ // Pass 1: Select one disk from each rack (maximize rack diversity)
+ if config.PreferDifferentRacks {
+ // Sort racks by number of available servers (descending) to prioritize racks with more options
+ sortedRacks := sortRacksByServerCount(rackToDisks)
+ for _, rackKey := range sortedRacks {
+ if len(result.SelectedDisks) >= config.ShardsNeeded {
+ break
+ }
+ rackDisks := rackToDisks[rackKey]
+ // Select best disk from this rack, preferring a new server
+ disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config)
+ if disk != nil {
+ addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+ }
+ }
+ }
+
+ // Pass 2: Select disks from unused servers in already-used racks
+ if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded {
+ for _, rackKey := range getSortedRackKeys(rackToDisks) {
+ if len(result.SelectedDisks) >= config.ShardsNeeded {
+ break
+ }
+ rackDisks := rackToDisks[rackKey]
+ for _, disk := range sortDisksByScore(rackDisks) {
+ if len(result.SelectedDisks) >= config.ShardsNeeded {
+ break
+ }
+ diskKey := getDiskKey(disk)
+ if usedDisks[diskKey] {
+ continue
+ }
+ // Skip if server already used (we want different servers in this pass)
+ if usedServers[disk.NodeID] {
+ continue
+ }
+ // Check server limit
+ if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer {
+ continue
+ }
+ // Check rack limit
+ if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
+ continue
+ }
+ addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+ }
+ }
+ }
+
+ // Pass 3: Fill remaining slots from already-used servers (different disks)
+ // Use round-robin across servers to balance shards evenly
+ if len(result.SelectedDisks) < config.ShardsNeeded {
+ // Group remaining disks by server
+ serverToRemainingDisks := make(map[string][]*DiskCandidate)
+ for _, disk := range suitable {
+ if !usedDisks[getDiskKey(disk)] {
+ serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk)
+ }
+ }
+
+ // Sort each server's disks by score
+ for serverID := range serverToRemainingDisks {
+ serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID])
+ }
+
+ // Round-robin: repeatedly select from the server with the fewest shards
+ for len(result.SelectedDisks) < config.ShardsNeeded {
+ // Find server with fewest shards that still has available disks
+ var bestServer string
+ minShards := -1
+ for serverID, disks := range serverToRemainingDisks {
+ if len(disks) == 0 {
+ continue
+ }
+ // Check server limit
+ if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer {
+ continue
+ }
+ shardCount := result.ShardsPerServer[serverID]
+ if minShards == -1 || shardCount < minShards {
+ minShards = shardCount
+ bestServer = serverID
+ } else if shardCount == minShards && serverID < bestServer {
+ // Tie-break by server name for determinism
+ bestServer = serverID
+ }
+ }
+
+ if bestServer == "" {
+ // No more servers with available disks
+ break
+ }
+
+ // Pop the best disk from this server
+ disks := serverToRemainingDisks[bestServer]
+ disk := disks[0]
+ serverToRemainingDisks[bestServer] = disks[1:]
+
+ // Check rack limit
+ if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
+ continue
+ }
+
+ addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+ }
+ }
+
+ // Calculate final statistics
+ result.ServersUsed = len(usedServers)
+ result.RacksUsed = len(usedRacks)
+ dcSet := make(map[string]bool)
+ for _, disk := range result.SelectedDisks {
+ dcSet[disk.DataCenter] = true
+ }
+ result.DCsUsed = len(dcSet)
+
+ return result, nil
+}
+
+// filterSuitableDisks filters disks that are suitable for EC placement
+func filterSuitableDisks(disks []*DiskCandidate, config PlacementRequest) []*DiskCandidate {
+ var suitable []*DiskCandidate
+ for _, disk := range disks {
+ if disk.FreeSlots <= 0 {
+ continue
+ }
+ if config.MaxTaskLoad > 0 && disk.LoadCount > config.MaxTaskLoad {
+ continue
+ }
+ suitable = append(suitable, disk)
+ }
+ return suitable
+}
+
+// groupDisksByRack groups disks by their rack (dc:rack key)
+func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate {
+ result := make(map[string][]*DiskCandidate)
+ for _, disk := range disks {
+ key := getRackKey(disk)
+ result[key] = append(result[key], disk)
+ }
+ return result
+}
+
+// groupDisksByServer groups disks by their server
+func groupDisksByServer(disks []*DiskCandidate) map[string][]*DiskCandidate {
+ result := make(map[string][]*DiskCandidate)
+ for _, disk := range disks {
+ result[disk.NodeID] = append(result[disk.NodeID], disk)
+ }
+ return result
+}
+
+// getRackKey returns the unique key for a rack (dc:rack)
+func getRackKey(disk *DiskCandidate) string {
+ return fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+}
+
+// getDiskKey returns the unique key for a disk (nodeID:diskID)
+func getDiskKey(disk *DiskCandidate) string {
+ return fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
+}
+
+// sortRacksByServerCount returns rack keys sorted by number of servers (descending)
+func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string {
+ // Count unique servers per rack
+ rackServerCount := make(map[string]int)
+ for rackKey, disks := range rackToDisks {
+ servers := make(map[string]bool)
+ for _, disk := range disks {
+ servers[disk.NodeID] = true
+ }
+ rackServerCount[rackKey] = len(servers)
+ }
+
+ keys := getSortedRackKeys(rackToDisks)
+ sort.Slice(keys, func(i, j int) bool {
+ // Sort by server count (descending) to pick from racks with more options first
+ return rackServerCount[keys[i]] > rackServerCount[keys[j]]
+ })
+ return keys
+}
+
+// getSortedRackKeys returns rack keys in a deterministic order
+func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string {
+ keys := make([]string, 0, len(rackToDisks))
+ for k := range rackToDisks {
+ keys = append(keys, k)
+ }
+ sort.Strings(keys)
+ return keys
+}
+
+// selectBestDiskFromRack selects the best disk from a rack for EC placement
+// It prefers servers that haven't been used yet
+func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementRequest) *DiskCandidate {
+ var bestDisk *DiskCandidate
+ bestScore := -1.0
+ bestIsFromUnusedServer := false
+
+ for _, disk := range disks {
+ if usedDisks[getDiskKey(disk)] {
+ continue
+ }
+ isFromUnusedServer := !usedServers[disk.NodeID]
+ score := calculateDiskScore(disk)
+
+ // Prefer unused servers
+ if isFromUnusedServer && !bestIsFromUnusedServer {
+ bestDisk = disk
+ bestScore = score
+ bestIsFromUnusedServer = true
+ } else if isFromUnusedServer == bestIsFromUnusedServer && score > bestScore {
+ bestDisk = disk
+ bestScore = score
+ }
+ }
+
+ return bestDisk
+}
+
+// sortDisksByScore returns disks sorted by score (best first)
+func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate {
+ sorted := make([]*DiskCandidate, len(disks))
+ copy(sorted, disks)
+ sort.Slice(sorted, func(i, j int) bool {
+ return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j])
+ })
+ return sorted
+}
+
+// calculateDiskScore calculates a score for a disk candidate
+// Higher score is better
+func calculateDiskScore(disk *DiskCandidate) float64 {
+ score := 0.0
+
+ // Primary factor: available capacity (lower utilization is better)
+ if disk.MaxVolumeCount > 0 {
+ utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount)
+ score += (1.0 - utilization) * 60.0 // Up to 60 points
+ } else {
+ score += 30.0 // Default if no max count
+ }
+
+ // Secondary factor: fewer shards already on this disk is better
+ score += float64(10-disk.ShardCount) * 2.0 // Up to 20 points
+
+ // Tertiary factor: lower load is better
+ score += float64(10 - disk.LoadCount) // Up to 10 points
+
+ return score
+}
+
+// addDiskToResult adds a disk to the result and updates tracking maps
+func addDiskToResult(result *PlacementResult, disk *DiskCandidate,
+ usedDisks, usedServers, usedRacks map[string]bool) {
+ diskKey := getDiskKey(disk)
+ rackKey := getRackKey(disk)
+
+ result.SelectedDisks = append(result.SelectedDisks, disk)
+ usedDisks[diskKey] = true
+ usedServers[disk.NodeID] = true
+ usedRacks[rackKey] = true
+ result.ShardsPerServer[disk.NodeID]++
+ result.ShardsPerRack[rackKey]++
+ result.ShardsPerDC[disk.DataCenter]++
+}
+
+// VerifySpread checks if the placement result meets diversity requirements
+func VerifySpread(result *PlacementResult, minServers, minRacks int) error {
+ if result.ServersUsed < minServers {
+ return fmt.Errorf("only %d servers used, need at least %d", result.ServersUsed, minServers)
+ }
+ if result.RacksUsed < minRacks {
+ return fmt.Errorf("only %d racks used, need at least %d", result.RacksUsed, minRacks)
+ }
+ return nil
+}
+
+// CalculateIdealDistribution returns the ideal number of shards per server
+// when we have a certain number of shards and servers
+func CalculateIdealDistribution(totalShards, numServers int) (min, max int) {
+ if numServers <= 0 {
+ return 0, totalShards
+ }
+ min = totalShards / numServers
+ max = min
+ if totalShards%numServers != 0 {
+ max = min + 1
+ }
+ return
+}
diff --git a/weed/storage/erasure_coding/placement/placement_test.go b/weed/storage/erasure_coding/placement/placement_test.go
new file mode 100644
index 000000000..6cb94a4da
--- /dev/null
+++ b/weed/storage/erasure_coding/placement/placement_test.go
@@ -0,0 +1,517 @@
+package placement
+
+import (
+"testing"
+)
+
+// Helper function to create disk candidates for testing
+func makeDisk(nodeID string, diskID uint32, dc, rack string, freeSlots int) *DiskCandidate {
+ return &DiskCandidate{
+ NodeID: nodeID,
+ DiskID: diskID,
+ DataCenter: dc,
+ Rack: rack,
+ VolumeCount: 0,
+ MaxVolumeCount: 100,
+ ShardCount: 0,
+ FreeSlots: freeSlots,
+ LoadCount: 0,
+ }
+}
+
+func TestSelectDestinations_SingleRack(t *testing.T) {
+ // Test: 3 servers in same rack, each with 2 disks, need 6 shards
+ // Expected: Should spread across all 6 disks (one per disk)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack1", 10),
+ makeDisk("server3", 1, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 6 {
+ t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Verify all 3 servers are used
+ if result.ServersUsed != 3 {
+ t.Errorf("expected 3 servers used, got %d", result.ServersUsed)
+ }
+
+ // Verify each disk is unique
+ diskSet := make(map[string]bool)
+ for _, disk := range result.SelectedDisks {
+ key := getDiskKey(disk)
+ if diskSet[key] {
+ t.Errorf("disk %s selected multiple times", key)
+ }
+ diskSet[key] = true
+ }
+}
+
+func TestSelectDestinations_MultipleRacks(t *testing.T) {
+ // Test: 2 racks with 2 servers each, each server has 2 disks
+ // Need 8 shards
+ // Expected: Should spread across all 8 disks
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack2", 10),
+ makeDisk("server3", 1, "dc1", "rack2", 10),
+ makeDisk("server4", 0, "dc1", "rack2", 10),
+ makeDisk("server4", 1, "dc1", "rack2", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 8,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 8 {
+ t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Verify all 4 servers are used
+ if result.ServersUsed != 4 {
+ t.Errorf("expected 4 servers used, got %d", result.ServersUsed)
+ }
+
+ // Verify both racks are used
+ if result.RacksUsed != 2 {
+ t.Errorf("expected 2 racks used, got %d", result.RacksUsed)
+ }
+}
+
+func TestSelectDestinations_PrefersDifferentServers(t *testing.T) {
+ // Test: 4 servers with 4 disks each, need 4 shards
+ // Expected: Should use one disk from each server
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server1", 3, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 3, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack1", 10),
+ makeDisk("server3", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 2, "dc1", "rack1", 10),
+ makeDisk("server3", 3, "dc1", "rack1", 10),
+ makeDisk("server4", 0, "dc1", "rack1", 10),
+ makeDisk("server4", 1, "dc1", "rack1", 10),
+ makeDisk("server4", 2, "dc1", "rack1", 10),
+ makeDisk("server4", 3, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 4,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 4 {
+ t.Errorf("expected 4 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Verify all 4 servers are used (one shard per server)
+ if result.ServersUsed != 4 {
+ t.Errorf("expected 4 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 1 shard
+ for server, count := range result.ShardsPerServer {
+ if count != 1 {
+ t.Errorf("server %s has %d shards, expected 1", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_SpilloverToMultipleDisksPerServer(t *testing.T) {
+ // Test: 2 servers with 4 disks each, need 6 shards
+ // Expected: First pick one from each server (2 shards), then one more from each (4 shards),
+ // then fill remaining from any server (6 shards)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server1", 3, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 3, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 6 {
+ t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Both servers should be used
+ if result.ServersUsed != 2 {
+ t.Errorf("expected 2 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 3 shards (balanced)
+ for server, count := range result.ShardsPerServer {
+ if count != 3 {
+ t.Errorf("server %s has %d shards, expected 3", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_MaxShardsPerServer(t *testing.T) {
+ // Test: 2 servers with 4 disks each, need 6 shards, max 2 per server
+ // Expected: Should only select 4 shards (2 per server limit)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server1", 3, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 3, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ MaxShardsPerServer: 2,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Should only get 4 shards due to server limit
+ if len(result.SelectedDisks) != 4 {
+ t.Errorf("expected 4 selected disks (limit 2 per server), got %d", len(result.SelectedDisks))
+ }
+
+ // No server should exceed the limit
+ for server, count := range result.ShardsPerServer {
+ if count > 2 {
+ t.Errorf("server %s has %d shards, exceeds limit of 2", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_14ShardsAcross7Servers(t *testing.T) {
+ // Test: Real-world EC scenario - 14 shards across 7 servers with 2 disks each
+ // Expected: Should spread evenly (2 shards per server)
+ var disks []*DiskCandidate
+ for i := 1; i <= 7; i++ {
+ serverID := "server" + string(rune('0'+i))
+ disks = append(disks, makeDisk(serverID, 0, "dc1", "rack1", 10))
+ disks = append(disks, makeDisk(serverID, 1, "dc1", "rack1", 10))
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 14,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 14 {
+ t.Errorf("expected 14 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // All 7 servers should be used
+ if result.ServersUsed != 7 {
+ t.Errorf("expected 7 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 2 shards
+ for server, count := range result.ShardsPerServer {
+ if count != 2 {
+ t.Errorf("server %s has %d shards, expected 2", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_FewerServersThanShards(t *testing.T) {
+ // Test: Only 3 servers but need 6 shards
+ // Expected: Should distribute evenly (2 per server)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack1", 10),
+ makeDisk("server3", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 2, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 6 {
+ t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // All 3 servers should be used
+ if result.ServersUsed != 3 {
+ t.Errorf("expected 3 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 2 shards
+ for server, count := range result.ShardsPerServer {
+ if count != 2 {
+ t.Errorf("server %s has %d shards, expected 2", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_NoSuitableDisks(t *testing.T) {
+ // Test: All disks have no free slots
+ disks := []*DiskCandidate{
+ {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0},
+ {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0},
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 4,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ _, err := SelectDestinations(disks, config)
+ if err == nil {
+ t.Error("expected error for no suitable disks, got nil")
+ }
+}
+
+func TestSelectDestinations_EmptyInput(t *testing.T) {
+ config := DefaultPlacementRequest()
+ _, err := SelectDestinations([]*DiskCandidate{}, config)
+ if err == nil {
+ t.Error("expected error for empty input, got nil")
+ }
+}
+
+func TestSelectDestinations_FiltersByLoad(t *testing.T) {
+ // Test: Some disks have too high load
+ disks := []*DiskCandidate{
+ {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 10},
+ {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 2},
+ {NodeID: "server3", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 1},
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 2,
+ MaxTaskLoad: 5,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Should only select from server2 and server3 (server1 has too high load)
+ for _, disk := range result.SelectedDisks {
+ if disk.NodeID == "server1" {
+ t.Errorf("disk from server1 should not be selected (load too high)")
+ }
+ }
+}
+
+func TestCalculateDiskScore(t *testing.T) {
+ // Test that score calculation works as expected
+ lowUtilDisk := &DiskCandidate{
+ VolumeCount: 10,
+ MaxVolumeCount: 100,
+ ShardCount: 0,
+ LoadCount: 0,
+ }
+
+ highUtilDisk := &DiskCandidate{
+ VolumeCount: 90,
+ MaxVolumeCount: 100,
+ ShardCount: 5,
+ LoadCount: 5,
+ }
+
+ lowScore := calculateDiskScore(lowUtilDisk)
+ highScore := calculateDiskScore(highUtilDisk)
+
+ if lowScore <= highScore {
+ t.Errorf("low utilization disk should have higher score: low=%f, high=%f", lowScore, highScore)
+ }
+}
+
+func TestCalculateIdealDistribution(t *testing.T) {
+ tests := []struct {
+ totalShards int
+ numServers int
+ expectedMin int
+ expectedMax int
+ }{
+ {14, 7, 2, 2}, // Even distribution
+ {14, 4, 3, 4}, // Uneven: 14/4 = 3 remainder 2
+ {6, 3, 2, 2}, // Even distribution
+ {7, 3, 2, 3}, // Uneven: 7/3 = 2 remainder 1
+ {10, 0, 0, 10}, // Edge case: no servers
+ {0, 5, 0, 0}, // Edge case: no shards
+ }
+
+ for _, tt := range tests {
+ min, max := CalculateIdealDistribution(tt.totalShards, tt.numServers)
+ if min != tt.expectedMin || max != tt.expectedMax {
+ t.Errorf("CalculateIdealDistribution(%d, %d) = (%d, %d), want (%d, %d)",
+ tt.totalShards, tt.numServers, min, max, tt.expectedMin, tt.expectedMax)
+ }
+ }
+}
+
+func TestVerifySpread(t *testing.T) {
+ result := &PlacementResult{
+ ServersUsed: 3,
+ RacksUsed: 2,
+ }
+
+ // Should pass
+ if err := VerifySpread(result, 3, 2); err != nil {
+ t.Errorf("unexpected error: %v", err)
+ }
+
+ // Should fail - not enough servers
+ if err := VerifySpread(result, 4, 2); err == nil {
+ t.Error("expected error for insufficient servers")
+ }
+
+ // Should fail - not enough racks
+ if err := VerifySpread(result, 3, 3); err == nil {
+ t.Error("expected error for insufficient racks")
+ }
+}
+
+func TestSelectDestinations_MultiDC(t *testing.T) {
+ // Test: 2 DCs, each with 2 racks, each rack has 2 servers
+ disks := []*DiskCandidate{
+ // DC1, Rack1
+ makeDisk("dc1-r1-s1", 0, "dc1", "rack1", 10),
+ makeDisk("dc1-r1-s1", 1, "dc1", "rack1", 10),
+ makeDisk("dc1-r1-s2", 0, "dc1", "rack1", 10),
+ makeDisk("dc1-r1-s2", 1, "dc1", "rack1", 10),
+ // DC1, Rack2
+ makeDisk("dc1-r2-s1", 0, "dc1", "rack2", 10),
+ makeDisk("dc1-r2-s1", 1, "dc1", "rack2", 10),
+ makeDisk("dc1-r2-s2", 0, "dc1", "rack2", 10),
+ makeDisk("dc1-r2-s2", 1, "dc1", "rack2", 10),
+ // DC2, Rack1
+ makeDisk("dc2-r1-s1", 0, "dc2", "rack1", 10),
+ makeDisk("dc2-r1-s1", 1, "dc2", "rack1", 10),
+ makeDisk("dc2-r1-s2", 0, "dc2", "rack1", 10),
+ makeDisk("dc2-r1-s2", 1, "dc2", "rack1", 10),
+ // DC2, Rack2
+ makeDisk("dc2-r2-s1", 0, "dc2", "rack2", 10),
+ makeDisk("dc2-r2-s1", 1, "dc2", "rack2", 10),
+ makeDisk("dc2-r2-s2", 0, "dc2", "rack2", 10),
+ makeDisk("dc2-r2-s2", 1, "dc2", "rack2", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 8,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 8 {
+ t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Should use all 4 racks
+ if result.RacksUsed != 4 {
+ t.Errorf("expected 4 racks used, got %d", result.RacksUsed)
+ }
+
+ // Should use both DCs
+ if result.DCsUsed != 2 {
+ t.Errorf("expected 2 DCs used, got %d", result.DCsUsed)
+ }
+}
+
+func TestSelectDestinations_SameRackDifferentDC(t *testing.T) {
+ // Test: Same rack name in different DCs should be treated as different racks
+ disks := []*DiskCandidate{
+ makeDisk("dc1-s1", 0, "dc1", "rack1", 10),
+ makeDisk("dc2-s1", 0, "dc2", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 2,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Should use 2 racks (dc1:rack1 and dc2:rack1 are different)
+ if result.RacksUsed != 2 {
+ t.Errorf("expected 2 racks used (different DCs), got %d", result.RacksUsed)
+ }
+}