Diffstat (limited to 'weed')
-rw-r--r--  weed/shell/command_ec_common.go                          | 190
-rw-r--r--  weed/shell/command_volume_server_evacuate.go             |  10
-rw-r--r--  weed/storage/erasure_coding/placement/placement.go       | 420
-rw-r--r--  weed/storage/erasure_coding/placement/placement_test.go  | 517
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go            | 130
5 files changed, 1195 insertions, 72 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index f059b4e74..f2cc581da 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -26,12 +26,25 @@ type DataCenterId string
type EcNodeId string
type RackId string
+// EcDisk represents a single disk on a volume server
+type EcDisk struct {
+ diskId uint32
+ diskType string
+ freeEcSlots int
+ ecShardCount int // Total EC shards on this disk
+ // Map of volumeId -> shardBits for shards on this disk
+ ecShards map[needle.VolumeId]erasure_coding.ShardBits
+}
+
type EcNode struct {
info *master_pb.DataNodeInfo
dc DataCenterId
rack RackId
freeEcSlot int
+ // disks maps diskId -> EcDisk for disk-level balancing
+ disks map[uint32]*EcDisk
}
+
type CandidateEcNode struct {
ecNode *EcNode
shardCount int
@@ -229,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
return collections
}
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -242,7 +255,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
// ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId)
if err != nil {
return err
}
@@ -259,7 +272,11 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
return err
}
- fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ if destDiskId > 0 {
+ fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ }
}
@@ -272,7 +289,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
- volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
+ volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
@@ -289,6 +306,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(existingLocation),
+ DiskId: destDiskId,
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
@@ -410,12 +428,74 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
}
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
- ecNodes = append(ecNodes, &EcNode{
+ ecNode := &EcNode{
info: dn,
dc: dc,
rack: rack,
freeEcSlot: int(freeEcSlots),
- })
+ disks: make(map[uint32]*EcDisk),
+ }
+
+ // Build disk-level information from volumes and EC shards
+ // First, discover all unique disk IDs from VolumeInfos (includes empty disks)
+ allDiskIds := make(map[uint32]string) // diskId -> diskType
+ for diskType, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ // Get all disk IDs from volumes
+ for _, vi := range diskInfo.VolumeInfos {
+ allDiskIds[vi.DiskId] = diskType
+ }
+ // Also get disk IDs from EC shards
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ allDiskIds[ecShardInfo.DiskId] = diskType
+ }
+ }
+
+ // Group EC shards by disk_id
+ diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits)
+ for _, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ diskId := ecShardInfo.DiskId
+ if diskShards[diskId] == nil {
+ diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ vid := needle.VolumeId(ecShardInfo.Id)
+ diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ }
+ }
+
+ // Create EcDisk for each discovered disk
+ diskCount := len(allDiskIds)
+ if diskCount == 0 {
+ diskCount = 1
+ }
+ freePerDisk := int(freeEcSlots) / diskCount
+
+ for diskId, diskType := range allDiskIds {
+ shards := diskShards[diskId]
+ if shards == nil {
+ shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ totalShardCount := 0
+ for _, shardBits := range shards {
+ totalShardCount += shardBits.ShardIdCount()
+ }
+
+ ecNode.disks[diskId] = &EcDisk{
+ diskId: diskId,
+ diskType: diskType,
+ freeEcSlots: freePerDisk,
+ ecShardCount: totalShardCount,
+ ecShards: shards,
+ }
+ }
+
+ ecNodes = append(ecNodes, ecNode)
totalFreeEcSlots += freeEcSlots
})
return
@@ -884,10 +964,16 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
+ vid := needle.VolumeId(shards.Id)
+ destDiskId := pickBestDiskOnNode(emptyNode, vid)
- fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+ }
- err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
+ err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
if err != nil {
return err
}
@@ -957,18 +1043,98 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
if len(targets) == 0 {
return nil, errors.New(details)
}
+
+ // When multiple nodes have the same shard count, prefer nodes with better disk distribution
+ // (i.e., nodes with more disks that have fewer shards of this volume)
+ if len(targets) > 1 {
+ slices.SortFunc(targets, func(a, b *EcNode) int {
+ aScore := diskDistributionScore(a, vid)
+ bScore := diskDistributionScore(b, vid)
+ return aScore - bScore // Lower score is better
+ })
+ return targets[0], nil
+ }
+
return targets[rand.IntN(len(targets))], nil
}
+// diskDistributionScore calculates a score for how well-distributed shards are on the node's disks
+// Lower score is better (means more room for balanced distribution)
+func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
+ if len(ecNode.disks) == 0 {
+ return 0
+ }
+
+ // Sum the existing shard count for this volume on each disk
+ // Lower total means more room for new shards
+ score := 0
+ for _, disk := range ecNode.disks {
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily
+ }
+ score += disk.ecShardCount // Also consider total shards on disk
+ }
+ return score
+}
+
+// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
+// It prefers disks with fewer shards and more free slots
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+ if len(ecNode.disks) == 0 {
+ return 0 // No disk info available, let the server decide
+ }
+
+ var bestDiskId uint32
+ bestScore := -1
+
+ for diskId, disk := range ecNode.disks {
+ if disk.freeEcSlots <= 0 {
+ continue
+ }
+
+ // Check if this volume already has shards on this disk
+ existingShards := 0
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ existingShards = shardBits.ShardIdCount()
+ }
+
+ // Score: prefer disks with fewer total shards and fewer shards of this volume
+ // Lower score is better
+ score := disk.ecShardCount*10 + existingShards*100
+
+ if bestScore == -1 || score < bestScore {
+ bestScore = score
+ bestDiskId = diskId
+ }
+ }
+
+ return bestDiskId
+}
+
+// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
+func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) {
+ node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ diskId := pickBestDiskOnNode(node, vid)
+ return node, diskId, nil
+}
+
func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
- destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
+ destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
if err != nil {
- fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
+ fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
return nil
}
- fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
- return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", existingLocation.info.Id, vid, shardId, destNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+ }
+ return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
}
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
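The disk-picking heuristic introduced in this file reduces to a small scoring rule: skip disks with no free EC slots, then prefer the disk with the fewest total shards, with a much heavier penalty for disks that already hold shards of the volume being placed. Below is a minimal standalone sketch of that rule, using simplified stand-ins for EcDisk and needle.VolumeId rather than the real types (the 10x and 100x weights mirror pickBestDiskOnNode above); it is not part of the patch.

package main

import "fmt"

// disk is a simplified stand-in for the EcDisk bookkeeping added above.
type disk struct {
	id          uint32
	freeEcSlots int
	ecShards    int            // total EC shards currently on this disk
	perVolume   map[uint32]int // volume id -> shards of that volume on this disk
}

// pickDisk returns the id of the lowest-scoring usable disk; 0 means
// "no disk information, let the volume server decide".
func pickDisk(disks []disk, vid uint32) uint32 {
	bestID, bestScore := uint32(0), -1
	for _, d := range disks {
		if d.freeEcSlots <= 0 {
			continue // full disks are never candidates
		}
		score := d.ecShards*10 + d.perVolume[vid]*100
		if bestScore == -1 || score < bestScore {
			bestID, bestScore = d.id, score
		}
	}
	return bestID
}

func main() {
	disks := []disk{
		{id: 1, freeEcSlots: 4, ecShards: 8, perVolume: map[uint32]int{42: 1}},
		{id: 2, freeEcSlots: 4, ecShards: 6, perVolume: map[uint32]int{}},
		{id: 3, freeEcSlots: 0, ecShards: 0, perVolume: map[uint32]int{}},
	}
	fmt.Println(pickDisk(disks, 42)) // 2: fewer shards overall and none of volume 42
}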
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index 5c1805c89..6135eb3eb 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -197,8 +197,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
if ecShardInfo.Collection != "" {
collectionPrefix = ecShardInfo.Collection + "_"
}
- fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
- err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
+ vid := needle.VolumeId(ecShardInfo.Id)
+ destDiskId := pickBestDiskOnNode(emptyNode, vid)
+ if destDiskId > 0 {
+ fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
+ } else {
+ fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
+ }
+ err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
if err != nil {
return
} else {
diff --git a/weed/storage/erasure_coding/placement/placement.go b/weed/storage/erasure_coding/placement/placement.go
new file mode 100644
index 000000000..67e21c1f8
--- /dev/null
+++ b/weed/storage/erasure_coding/placement/placement.go
@@ -0,0 +1,420 @@
+// Package placement provides consolidated EC shard placement logic used by
+// both shell commands and worker tasks.
+//
+// This package encapsulates the algorithms for:
+// - Selecting destination nodes/disks for EC shards
+// - Ensuring proper spread across racks, servers, and disks
+// - Balancing shards across the cluster
+package placement
+
+import (
+ "fmt"
+ "sort"
+)
+
+// DiskCandidate represents a disk that can receive EC shards
+type DiskCandidate struct {
+ NodeID string
+ DiskID uint32
+ DataCenter string
+ Rack string
+
+ // Capacity information
+ VolumeCount int64
+ MaxVolumeCount int64
+ ShardCount int // Current number of EC shards on this disk
+ FreeSlots int // Available slots for new shards
+
+ // Load information
+ LoadCount int // Number of active tasks on this disk
+}
+
+// NodeCandidate represents a server node that can receive EC shards
+type NodeCandidate struct {
+ NodeID string
+ DataCenter string
+ Rack string
+ FreeSlots int
+ ShardCount int // Total shards across all disks
+ Disks []*DiskCandidate // All disks on this node
+}
+
+// PlacementRequest configures EC shard placement behavior
+type PlacementRequest struct {
+ // ShardsNeeded is the total number of shards to place
+ ShardsNeeded int
+
+ // MaxShardsPerServer limits how many shards can be placed on a single server
+ // 0 means no limit (but prefer spreading when possible)
+ MaxShardsPerServer int
+
+ // MaxShardsPerRack limits how many shards can be placed in a single rack
+ // 0 means no limit
+ MaxShardsPerRack int
+
+ // MaxTaskLoad is the maximum task load count for a disk to be considered
+ MaxTaskLoad int
+
+ // PreferDifferentServers when true, spreads shards across different servers
+ // before using multiple disks on the same server
+ PreferDifferentServers bool
+
+ // PreferDifferentRacks when true, spreads shards across different racks
+ // before using multiple servers in the same rack
+ PreferDifferentRacks bool
+}
+
+// DefaultPlacementRequest returns the default placement configuration
+func DefaultPlacementRequest() PlacementRequest {
+ return PlacementRequest{
+ ShardsNeeded: 14,
+ MaxShardsPerServer: 0,
+ MaxShardsPerRack: 0,
+ MaxTaskLoad: 5,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+}
+
+// PlacementResult contains the selected destinations for EC shards
+type PlacementResult struct {
+ SelectedDisks []*DiskCandidate
+
+ // Statistics
+ ServersUsed int
+ RacksUsed int
+ DCsUsed int
+
+ // Distribution maps
+ ShardsPerServer map[string]int
+ ShardsPerRack map[string]int
+ ShardsPerDC map[string]int
+}
+
+// SelectDestinations selects the best disks for EC shard placement.
+// This is the main entry point for EC placement logic.
+//
+// The algorithm works in multiple passes:
+// 1. First pass: Select one disk from each rack (maximize rack diversity)
+// 2. Second pass: Select one disk from each unused server in used racks (maximize server diversity)
+// 3. Third pass: Select additional disks from servers already used (maximize disk diversity)
+func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*PlacementResult, error) {
+ if len(disks) == 0 {
+ return nil, fmt.Errorf("no disk candidates provided")
+ }
+ if config.ShardsNeeded <= 0 {
+ return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded)
+ }
+
+ // Filter suitable disks
+ suitable := filterSuitableDisks(disks, config)
+ if len(suitable) == 0 {
+ return nil, fmt.Errorf("no suitable disks found after filtering")
+ }
+
+ // Build indexes for efficient lookup
+ rackToDisks := groupDisksByRack(suitable)
+
+ result := &PlacementResult{
+ SelectedDisks: make([]*DiskCandidate, 0, config.ShardsNeeded),
+ ShardsPerServer: make(map[string]int),
+ ShardsPerRack: make(map[string]int),
+ ShardsPerDC: make(map[string]int),
+ }
+
+ usedDisks := make(map[string]bool) // "nodeID:diskID" -> bool
+ usedServers := make(map[string]bool) // nodeID -> bool
+ usedRacks := make(map[string]bool) // "dc:rack" -> bool
+
+ // Pass 1: Select one disk from each rack (maximize rack diversity)
+ if config.PreferDifferentRacks {
+ // Sort racks by number of available servers (descending) to prioritize racks with more options
+ sortedRacks := sortRacksByServerCount(rackToDisks)
+ for _, rackKey := range sortedRacks {
+ if len(result.SelectedDisks) >= config.ShardsNeeded {
+ break
+ }
+ rackDisks := rackToDisks[rackKey]
+ // Select best disk from this rack, preferring a new server
+ disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config)
+ if disk != nil {
+ addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+ }
+ }
+ }
+
+ // Pass 2: Select disks from unused servers in already-used racks
+ if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded {
+ for _, rackKey := range getSortedRackKeys(rackToDisks) {
+ if len(result.SelectedDisks) >= config.ShardsNeeded {
+ break
+ }
+ rackDisks := rackToDisks[rackKey]
+ for _, disk := range sortDisksByScore(rackDisks) {
+ if len(result.SelectedDisks) >= config.ShardsNeeded {
+ break
+ }
+ diskKey := getDiskKey(disk)
+ if usedDisks[diskKey] {
+ continue
+ }
+ // Skip if server already used (we want different servers in this pass)
+ if usedServers[disk.NodeID] {
+ continue
+ }
+ // Check server limit
+ if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer {
+ continue
+ }
+ // Check rack limit
+ if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
+ continue
+ }
+ addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+ }
+ }
+ }
+
+ // Pass 3: Fill remaining slots from already-used servers (different disks)
+ // Use round-robin across servers to balance shards evenly
+ if len(result.SelectedDisks) < config.ShardsNeeded {
+ // Group remaining disks by server
+ serverToRemainingDisks := make(map[string][]*DiskCandidate)
+ for _, disk := range suitable {
+ if !usedDisks[getDiskKey(disk)] {
+ serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk)
+ }
+ }
+
+ // Sort each server's disks by score
+ for serverID := range serverToRemainingDisks {
+ serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID])
+ }
+
+ // Round-robin: repeatedly select from the server with the fewest shards
+ for len(result.SelectedDisks) < config.ShardsNeeded {
+ // Find server with fewest shards that still has available disks
+ var bestServer string
+ minShards := -1
+ for serverID, disks := range serverToRemainingDisks {
+ if len(disks) == 0 {
+ continue
+ }
+ // Check server limit
+ if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer {
+ continue
+ }
+ shardCount := result.ShardsPerServer[serverID]
+ if minShards == -1 || shardCount < minShards {
+ minShards = shardCount
+ bestServer = serverID
+ } else if shardCount == minShards && serverID < bestServer {
+ // Tie-break by server name for determinism
+ bestServer = serverID
+ }
+ }
+
+ if bestServer == "" {
+ // No more servers with available disks
+ break
+ }
+
+ // Pop the best disk from this server
+ disks := serverToRemainingDisks[bestServer]
+ disk := disks[0]
+ serverToRemainingDisks[bestServer] = disks[1:]
+
+ // Check rack limit
+ if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
+ continue
+ }
+
+ addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+ }
+ }
+
+ // Calculate final statistics
+ result.ServersUsed = len(usedServers)
+ result.RacksUsed = len(usedRacks)
+ dcSet := make(map[string]bool)
+ for _, disk := range result.SelectedDisks {
+ dcSet[disk.DataCenter] = true
+ }
+ result.DCsUsed = len(dcSet)
+
+ return result, nil
+}
+
+// filterSuitableDisks filters disks that are suitable for EC placement
+func filterSuitableDisks(disks []*DiskCandidate, config PlacementRequest) []*DiskCandidate {
+ var suitable []*DiskCandidate
+ for _, disk := range disks {
+ if disk.FreeSlots <= 0 {
+ continue
+ }
+ if config.MaxTaskLoad > 0 && disk.LoadCount > config.MaxTaskLoad {
+ continue
+ }
+ suitable = append(suitable, disk)
+ }
+ return suitable
+}
+
+// groupDisksByRack groups disks by their rack (dc:rack key)
+func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate {
+ result := make(map[string][]*DiskCandidate)
+ for _, disk := range disks {
+ key := getRackKey(disk)
+ result[key] = append(result[key], disk)
+ }
+ return result
+}
+
+// groupDisksByServer groups disks by their server
+func groupDisksByServer(disks []*DiskCandidate) map[string][]*DiskCandidate {
+ result := make(map[string][]*DiskCandidate)
+ for _, disk := range disks {
+ result[disk.NodeID] = append(result[disk.NodeID], disk)
+ }
+ return result
+}
+
+// getRackKey returns the unique key for a rack (dc:rack)
+func getRackKey(disk *DiskCandidate) string {
+ return fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+}
+
+// getDiskKey returns the unique key for a disk (nodeID:diskID)
+func getDiskKey(disk *DiskCandidate) string {
+ return fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
+}
+
+// sortRacksByServerCount returns rack keys sorted by number of servers (descending)
+func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string {
+ // Count unique servers per rack
+ rackServerCount := make(map[string]int)
+ for rackKey, disks := range rackToDisks {
+ servers := make(map[string]bool)
+ for _, disk := range disks {
+ servers[disk.NodeID] = true
+ }
+ rackServerCount[rackKey] = len(servers)
+ }
+
+ keys := getSortedRackKeys(rackToDisks)
+ sort.Slice(keys, func(i, j int) bool {
+ // Sort by server count (descending) to pick from racks with more options first
+ return rackServerCount[keys[i]] > rackServerCount[keys[j]]
+ })
+ return keys
+}
+
+// getSortedRackKeys returns rack keys in a deterministic order
+func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string {
+ keys := make([]string, 0, len(rackToDisks))
+ for k := range rackToDisks {
+ keys = append(keys, k)
+ }
+ sort.Strings(keys)
+ return keys
+}
+
+// selectBestDiskFromRack selects the best disk from a rack for EC placement
+// It prefers servers that haven't been used yet
+func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementRequest) *DiskCandidate {
+ var bestDisk *DiskCandidate
+ bestScore := -1.0
+ bestIsFromUnusedServer := false
+
+ for _, disk := range disks {
+ if usedDisks[getDiskKey(disk)] {
+ continue
+ }
+ isFromUnusedServer := !usedServers[disk.NodeID]
+ score := calculateDiskScore(disk)
+
+ // Prefer unused servers
+ if isFromUnusedServer && !bestIsFromUnusedServer {
+ bestDisk = disk
+ bestScore = score
+ bestIsFromUnusedServer = true
+ } else if isFromUnusedServer == bestIsFromUnusedServer && score > bestScore {
+ bestDisk = disk
+ bestScore = score
+ }
+ }
+
+ return bestDisk
+}
+
+// sortDisksByScore returns disks sorted by score (best first)
+func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate {
+ sorted := make([]*DiskCandidate, len(disks))
+ copy(sorted, disks)
+ sort.Slice(sorted, func(i, j int) bool {
+ return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j])
+ })
+ return sorted
+}
+
+// calculateDiskScore calculates a score for a disk candidate
+// Higher score is better
+func calculateDiskScore(disk *DiskCandidate) float64 {
+ score := 0.0
+
+ // Primary factor: available capacity (lower utilization is better)
+ if disk.MaxVolumeCount > 0 {
+ utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount)
+ score += (1.0 - utilization) * 60.0 // Up to 60 points
+ } else {
+ score += 30.0 // Default if no max count
+ }
+
+ // Secondary factor: fewer shards already on this disk is better
+ score += float64(10-disk.ShardCount) * 2.0 // Up to 20 points
+
+ // Tertiary factor: lower load is better
+ score += float64(10 - disk.LoadCount) // Up to 10 points
+
+ return score
+}
+
+// addDiskToResult adds a disk to the result and updates tracking maps
+func addDiskToResult(result *PlacementResult, disk *DiskCandidate,
+ usedDisks, usedServers, usedRacks map[string]bool) {
+ diskKey := getDiskKey(disk)
+ rackKey := getRackKey(disk)
+
+ result.SelectedDisks = append(result.SelectedDisks, disk)
+ usedDisks[diskKey] = true
+ usedServers[disk.NodeID] = true
+ usedRacks[rackKey] = true
+ result.ShardsPerServer[disk.NodeID]++
+ result.ShardsPerRack[rackKey]++
+ result.ShardsPerDC[disk.DataCenter]++
+}
+
+// VerifySpread checks if the placement result meets diversity requirements
+func VerifySpread(result *PlacementResult, minServers, minRacks int) error {
+ if result.ServersUsed < minServers {
+ return fmt.Errorf("only %d servers used, need at least %d", result.ServersUsed, minServers)
+ }
+ if result.RacksUsed < minRacks {
+ return fmt.Errorf("only %d racks used, need at least %d", result.RacksUsed, minRacks)
+ }
+ return nil
+}
+
+// CalculateIdealDistribution returns the ideal number of shards per server
+// when we have a certain number of shards and servers
+func CalculateIdealDistribution(totalShards, numServers int) (min, max int) {
+ if numServers <= 0 {
+ return 0, totalShards
+ }
+ min = totalShards / numServers
+ max = min
+ if totalShards%numServers != 0 {
+ max = min + 1
+ }
+ return
+}
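Taken together, the new package exposes a small surface: build a slice of DiskCandidate, tune a PlacementRequest, and let SelectDestinations run the three passes. Below is a minimal caller sketch using the import path from this diff; the cluster layout is invented for illustration and the snippet is not part of the patch.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
)

func main() {
	// Two racks with two single-disk servers each.
	disks := []*placement.DiskCandidate{
		{NodeID: "s1", DiskID: 0, DataCenter: "dc1", Rack: "r1", MaxVolumeCount: 100, FreeSlots: 10},
		{NodeID: "s2", DiskID: 0, DataCenter: "dc1", Rack: "r1", MaxVolumeCount: 100, FreeSlots: 10},
		{NodeID: "s3", DiskID: 0, DataCenter: "dc1", Rack: "r2", MaxVolumeCount: 100, FreeSlots: 10},
		{NodeID: "s4", DiskID: 0, DataCenter: "dc1", Rack: "r2", MaxVolumeCount: 100, FreeSlots: 10},
	}

	config := placement.DefaultPlacementRequest()
	config.ShardsNeeded = 4

	result, err := placement.SelectDestinations(disks, config)
	if err != nil {
		panic(err)
	}
	fmt.Printf("servers=%d racks=%d\n", result.ServersUsed, result.RacksUsed) // servers=4 racks=2

	// Optionally reject plans that do not spread widely enough.
	if err := placement.VerifySpread(result, 4, 2); err != nil {
		fmt.Println("spread check failed:", err)
	}
}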
diff --git a/weed/storage/erasure_coding/placement/placement_test.go b/weed/storage/erasure_coding/placement/placement_test.go
new file mode 100644
index 000000000..6cb94a4da
--- /dev/null
+++ b/weed/storage/erasure_coding/placement/placement_test.go
@@ -0,0 +1,517 @@
+package placement
+
+import (
+"testing"
+)
+
+// Helper function to create disk candidates for testing
+func makeDisk(nodeID string, diskID uint32, dc, rack string, freeSlots int) *DiskCandidate {
+ return &DiskCandidate{
+ NodeID: nodeID,
+ DiskID: diskID,
+ DataCenter: dc,
+ Rack: rack,
+ VolumeCount: 0,
+ MaxVolumeCount: 100,
+ ShardCount: 0,
+ FreeSlots: freeSlots,
+ LoadCount: 0,
+ }
+}
+
+func TestSelectDestinations_SingleRack(t *testing.T) {
+ // Test: 3 servers in same rack, each with 2 disks, need 6 shards
+ // Expected: Should spread across all 6 disks (one per disk)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack1", 10),
+ makeDisk("server3", 1, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 6 {
+ t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Verify all 3 servers are used
+ if result.ServersUsed != 3 {
+ t.Errorf("expected 3 servers used, got %d", result.ServersUsed)
+ }
+
+ // Verify each disk is unique
+ diskSet := make(map[string]bool)
+ for _, disk := range result.SelectedDisks {
+ key := getDiskKey(disk)
+ if diskSet[key] {
+ t.Errorf("disk %s selected multiple times", key)
+ }
+ diskSet[key] = true
+ }
+}
+
+func TestSelectDestinations_MultipleRacks(t *testing.T) {
+ // Test: 2 racks with 2 servers each, each server has 2 disks
+ // Need 8 shards
+ // Expected: Should spread across all 8 disks
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack2", 10),
+ makeDisk("server3", 1, "dc1", "rack2", 10),
+ makeDisk("server4", 0, "dc1", "rack2", 10),
+ makeDisk("server4", 1, "dc1", "rack2", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 8,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 8 {
+ t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Verify all 4 servers are used
+ if result.ServersUsed != 4 {
+ t.Errorf("expected 4 servers used, got %d", result.ServersUsed)
+ }
+
+ // Verify both racks are used
+ if result.RacksUsed != 2 {
+ t.Errorf("expected 2 racks used, got %d", result.RacksUsed)
+ }
+}
+
+func TestSelectDestinations_PrefersDifferentServers(t *testing.T) {
+ // Test: 4 servers with 4 disks each, need 4 shards
+ // Expected: Should use one disk from each server
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server1", 3, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 3, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack1", 10),
+ makeDisk("server3", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 2, "dc1", "rack1", 10),
+ makeDisk("server3", 3, "dc1", "rack1", 10),
+ makeDisk("server4", 0, "dc1", "rack1", 10),
+ makeDisk("server4", 1, "dc1", "rack1", 10),
+ makeDisk("server4", 2, "dc1", "rack1", 10),
+ makeDisk("server4", 3, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 4,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 4 {
+ t.Errorf("expected 4 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Verify all 4 servers are used (one shard per server)
+ if result.ServersUsed != 4 {
+ t.Errorf("expected 4 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 1 shard
+ for server, count := range result.ShardsPerServer {
+ if count != 1 {
+ t.Errorf("server %s has %d shards, expected 1", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_SpilloverToMultipleDisksPerServer(t *testing.T) {
+ // Test: 2 servers with 4 disks each, need 6 shards
+ // Expected: First pick one from each server (2 shards), then one more from each (4 shards),
+ // then fill remaining from any server (6 shards)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server1", 3, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 3, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 6 {
+ t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Both servers should be used
+ if result.ServersUsed != 2 {
+ t.Errorf("expected 2 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 3 shards (balanced)
+ for server, count := range result.ShardsPerServer {
+ if count != 3 {
+ t.Errorf("server %s has %d shards, expected 3", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_MaxShardsPerServer(t *testing.T) {
+ // Test: 2 servers with 4 disks each, need 6 shards, max 2 per server
+ // Expected: Should only select 4 shards (2 per server limit)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server1", 3, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 3, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ MaxShardsPerServer: 2,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Should only get 4 shards due to server limit
+ if len(result.SelectedDisks) != 4 {
+ t.Errorf("expected 4 selected disks (limit 2 per server), got %d", len(result.SelectedDisks))
+ }
+
+ // No server should exceed the limit
+ for server, count := range result.ShardsPerServer {
+ if count > 2 {
+ t.Errorf("server %s has %d shards, exceeds limit of 2", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_14ShardsAcross7Servers(t *testing.T) {
+ // Test: Real-world EC scenario - 14 shards across 7 servers with 2 disks each
+ // Expected: Should spread evenly (2 shards per server)
+ var disks []*DiskCandidate
+ for i := 1; i <= 7; i++ {
+ serverID := "server" + string(rune('0'+i))
+ disks = append(disks, makeDisk(serverID, 0, "dc1", "rack1", 10))
+ disks = append(disks, makeDisk(serverID, 1, "dc1", "rack1", 10))
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 14,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 14 {
+ t.Errorf("expected 14 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // All 7 servers should be used
+ if result.ServersUsed != 7 {
+ t.Errorf("expected 7 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 2 shards
+ for server, count := range result.ShardsPerServer {
+ if count != 2 {
+ t.Errorf("server %s has %d shards, expected 2", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_FewerServersThanShards(t *testing.T) {
+ // Test: Only 3 servers but need 6 shards
+ // Expected: Should distribute evenly (2 per server)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack1", 10),
+ makeDisk("server3", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 2, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 6 {
+ t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // All 3 servers should be used
+ if result.ServersUsed != 3 {
+ t.Errorf("expected 3 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 2 shards
+ for server, count := range result.ShardsPerServer {
+ if count != 2 {
+ t.Errorf("server %s has %d shards, expected 2", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_NoSuitableDisks(t *testing.T) {
+ // Test: All disks have no free slots
+ disks := []*DiskCandidate{
+ {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0},
+ {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0},
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 4,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ _, err := SelectDestinations(disks, config)
+ if err == nil {
+ t.Error("expected error for no suitable disks, got nil")
+ }
+}
+
+func TestSelectDestinations_EmptyInput(t *testing.T) {
+ config := DefaultPlacementRequest()
+ _, err := SelectDestinations([]*DiskCandidate{}, config)
+ if err == nil {
+ t.Error("expected error for empty input, got nil")
+ }
+}
+
+func TestSelectDestinations_FiltersByLoad(t *testing.T) {
+ // Test: Some disks have too high load
+ disks := []*DiskCandidate{
+ {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 10},
+ {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 2},
+ {NodeID: "server3", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 1},
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 2,
+ MaxTaskLoad: 5,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Should only select from server2 and server3 (server1 has too high load)
+ for _, disk := range result.SelectedDisks {
+ if disk.NodeID == "server1" {
+ t.Errorf("disk from server1 should not be selected (load too high)")
+ }
+ }
+}
+
+func TestCalculateDiskScore(t *testing.T) {
+ // Test that score calculation works as expected
+ lowUtilDisk := &DiskCandidate{
+ VolumeCount: 10,
+ MaxVolumeCount: 100,
+ ShardCount: 0,
+ LoadCount: 0,
+ }
+
+ highUtilDisk := &DiskCandidate{
+ VolumeCount: 90,
+ MaxVolumeCount: 100,
+ ShardCount: 5,
+ LoadCount: 5,
+ }
+
+ lowScore := calculateDiskScore(lowUtilDisk)
+ highScore := calculateDiskScore(highUtilDisk)
+
+ if lowScore <= highScore {
+ t.Errorf("low utilization disk should have higher score: low=%f, high=%f", lowScore, highScore)
+ }
+}
+
+func TestCalculateIdealDistribution(t *testing.T) {
+ tests := []struct {
+ totalShards int
+ numServers int
+ expectedMin int
+ expectedMax int
+ }{
+ {14, 7, 2, 2}, // Even distribution
+ {14, 4, 3, 4}, // Uneven: 14/4 = 3 remainder 2
+ {6, 3, 2, 2}, // Even distribution
+ {7, 3, 2, 3}, // Uneven: 7/3 = 2 remainder 1
+ {10, 0, 0, 10}, // Edge case: no servers
+ {0, 5, 0, 0}, // Edge case: no shards
+ }
+
+ for _, tt := range tests {
+ min, max := CalculateIdealDistribution(tt.totalShards, tt.numServers)
+ if min != tt.expectedMin || max != tt.expectedMax {
+ t.Errorf("CalculateIdealDistribution(%d, %d) = (%d, %d), want (%d, %d)",
+ tt.totalShards, tt.numServers, min, max, tt.expectedMin, tt.expectedMax)
+ }
+ }
+}
+
+func TestVerifySpread(t *testing.T) {
+ result := &PlacementResult{
+ ServersUsed: 3,
+ RacksUsed: 2,
+ }
+
+ // Should pass
+ if err := VerifySpread(result, 3, 2); err != nil {
+ t.Errorf("unexpected error: %v", err)
+ }
+
+ // Should fail - not enough servers
+ if err := VerifySpread(result, 4, 2); err == nil {
+ t.Error("expected error for insufficient servers")
+ }
+
+ // Should fail - not enough racks
+ if err := VerifySpread(result, 3, 3); err == nil {
+ t.Error("expected error for insufficient racks")
+ }
+}
+
+func TestSelectDestinations_MultiDC(t *testing.T) {
+ // Test: 2 DCs, each with 2 racks, each rack has 2 servers
+ disks := []*DiskCandidate{
+ // DC1, Rack1
+ makeDisk("dc1-r1-s1", 0, "dc1", "rack1", 10),
+ makeDisk("dc1-r1-s1", 1, "dc1", "rack1", 10),
+ makeDisk("dc1-r1-s2", 0, "dc1", "rack1", 10),
+ makeDisk("dc1-r1-s2", 1, "dc1", "rack1", 10),
+ // DC1, Rack2
+ makeDisk("dc1-r2-s1", 0, "dc1", "rack2", 10),
+ makeDisk("dc1-r2-s1", 1, "dc1", "rack2", 10),
+ makeDisk("dc1-r2-s2", 0, "dc1", "rack2", 10),
+ makeDisk("dc1-r2-s2", 1, "dc1", "rack2", 10),
+ // DC2, Rack1
+ makeDisk("dc2-r1-s1", 0, "dc2", "rack1", 10),
+ makeDisk("dc2-r1-s1", 1, "dc2", "rack1", 10),
+ makeDisk("dc2-r1-s2", 0, "dc2", "rack1", 10),
+ makeDisk("dc2-r1-s2", 1, "dc2", "rack1", 10),
+ // DC2, Rack2
+ makeDisk("dc2-r2-s1", 0, "dc2", "rack2", 10),
+ makeDisk("dc2-r2-s1", 1, "dc2", "rack2", 10),
+ makeDisk("dc2-r2-s2", 0, "dc2", "rack2", 10),
+ makeDisk("dc2-r2-s2", 1, "dc2", "rack2", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 8,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 8 {
+ t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Should use all 4 racks
+ if result.RacksUsed != 4 {
+ t.Errorf("expected 4 racks used, got %d", result.RacksUsed)
+ }
+
+ // Should use both DCs
+ if result.DCsUsed != 2 {
+ t.Errorf("expected 2 DCs used, got %d", result.DCsUsed)
+ }
+}
+
+func TestSelectDestinations_SameRackDifferentDC(t *testing.T) {
+ // Test: Same rack name in different DCs should be treated as different racks
+ disks := []*DiskCandidate{
+ makeDisk("dc1-s1", 0, "dc1", "rack1", 10),
+ makeDisk("dc2-s1", 0, "dc2", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 2,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Should use 2 racks (dc1:rack1 and dc2:rack1 are different)
+ if result.RacksUsed != 2 {
+ t.Errorf("expected 2 racks used (different DCs), got %d", result.RacksUsed)
+ }
+}
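The even-split expectations asserted above (for example, two shards per server when placing 14 shards on 7 servers) come straight from CalculateIdealDistribution. A small sketch of that arithmetic, using the import path from this diff; not part of the patch.

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
)

func main() {
	lo, hi := placement.CalculateIdealDistribution(14, 7)
	fmt.Println(lo, hi) // 2 2: every server should receive exactly two shards
	lo, hi = placement.CalculateIdealDistribution(14, 4)
	fmt.Println(lo, hi) // 3 4: with 4 servers the split is necessarily uneven
}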
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index cd74bed33..c5568fe26 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -429,85 +430,100 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era
}
// selectBestECDestinations selects multiple disks for EC shard placement with diversity
+// Uses the consolidated placement package for proper rack/server/disk spreading
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
if len(disks) == 0 {
return nil
}
- // Group disks by rack and DC for diversity
- rackGroups := make(map[string][]*topology.DiskInfo)
- for _, disk := range disks {
- rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
- rackGroups[rackKey] = append(rackGroups[rackKey], disk)
+ // Convert topology.DiskInfo to placement.DiskCandidate
+ candidates := diskInfosToCandidates(disks)
+ if len(candidates) == 0 {
+ return nil
}
- var selected []*topology.DiskInfo
- usedRacks := make(map[string]bool)
+ // Configure placement for EC shards
+ config := placement.PlacementRequest{
+ ShardsNeeded: shardsNeeded,
+ MaxShardsPerServer: 0, // No hard limit, but prefer spreading
+ MaxShardsPerRack: 0, // No hard limit, but prefer spreading
+ MaxTaskLoad: topology.MaxTaskLoadForECPlacement,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
- // First pass: select one disk from each rack for maximum diversity
- for rackKey, rackDisks := range rackGroups {
- if len(selected) >= shardsNeeded {
- break
- }
+ // Use the shared placement algorithm
+ result, err := placement.SelectDestinations(candidates, config)
+ if err != nil {
+ glog.V(2).Infof("EC placement failed: %v", err)
+ return nil
+ }
- // Select best disk from this rack
- bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
- if bestDisk != nil {
- selected = append(selected, bestDisk)
- usedRacks[rackKey] = true
+ // Convert back to topology.DiskInfo
+ return candidatesToDiskInfos(result.SelectedDisks, disks)
+}
+
+// diskInfosToCandidates converts topology.DiskInfo slice to placement.DiskCandidate slice
+func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate {
+ var candidates []*placement.DiskCandidate
+ for _, disk := range disks {
+ if disk.DiskInfo == nil {
+ continue
}
- }
- // Second pass: if we need more disks, select from racks we've already used
- if len(selected) < shardsNeeded {
- for _, disk := range disks {
- if len(selected) >= shardsNeeded {
- break
- }
+ // Calculate free slots (using default max if not set)
+ freeSlots := int(disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount)
+ if freeSlots < 0 {
+ freeSlots = 0
+ }
- // Skip if already selected
- alreadySelected := false
- for _, sel := range selected {
- if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
- alreadySelected = true
- break
+ // Calculate EC shard count for this specific disk
+ // EcShardInfos contains all shards, so we need to filter by DiskId and sum actual shard counts
+ ecShardCount := 0
+ if disk.DiskInfo.EcShardInfos != nil {
+ for _, shardInfo := range disk.DiskInfo.EcShardInfos {
+ if shardInfo.DiskId == disk.DiskID {
+ ecShardCount += erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIdCount()
}
}
-
- if !alreadySelected && isDiskSuitableForEC(disk) {
- selected = append(selected, disk)
- }
}
- }
- return selected
+ candidates = append(candidates, &placement.DiskCandidate{
+ NodeID: disk.NodeID,
+ DiskID: disk.DiskID,
+ DataCenter: disk.DataCenter,
+ Rack: disk.Rack,
+ VolumeCount: disk.DiskInfo.VolumeCount,
+ MaxVolumeCount: disk.DiskInfo.MaxVolumeCount,
+ ShardCount: ecShardCount,
+ FreeSlots: freeSlots,
+ LoadCount: disk.LoadCount,
+ })
+ }
+ return candidates
}
-// selectBestFromRack selects the best disk from a rack for EC placement
-func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
- if len(disks) == 0 {
- return nil
+// candidatesToDiskInfos converts placement results back to topology.DiskInfo
+func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks []*topology.DiskInfo) []*topology.DiskInfo {
+ // Create a map for quick lookup
+ diskMap := make(map[string]*topology.DiskInfo)
+ for _, disk := range originalDisks {
+ key := fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
+ diskMap[key] = disk
}
- var bestDisk *topology.DiskInfo
- bestScore := -1.0
-
- for _, disk := range disks {
- if !isDiskSuitableForEC(disk) {
- continue
- }
-
- score := calculateECScore(disk, sourceRack, sourceDC)
- if score > bestScore {
- bestScore = score
- bestDisk = disk
+ var result []*topology.DiskInfo
+ for _, candidate := range candidates {
+ key := fmt.Sprintf("%s:%d", candidate.NodeID, candidate.DiskID)
+ if disk, ok := diskMap[key]; ok {
+ result = append(result, disk)
}
}
-
- return bestDisk
+ return result
}
// calculateECScore calculates placement score for EC operations
+// Used for logging and plan metadata
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
if disk.DiskInfo == nil {
return 0.0
@@ -524,14 +540,12 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
// Consider current load (secondary factor)
score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
- // Note: We don't penalize placing shards on the same rack/DC as source
- // since the original volume will be deleted after EC conversion.
- // This allows for better network efficiency and storage utilization.
-
return score
}
// isDiskSuitableForEC checks if a disk is suitable for EC placement
+// Note: This is kept for backward compatibility but the placement package
+// handles filtering internally
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
if disk.DiskInfo == nil {
return false