-rw-r--r--  .github/workflows/ec-integration-tests.yml               41
-rw-r--r--  test/erasure_coding/ec_integration_test.go               438
-rw-r--r--  weed/shell/command_ec_common.go                          190
-rw-r--r--  weed/shell/command_volume_server_evacuate.go              10
-rw-r--r--  weed/storage/erasure_coding/placement/placement.go       420
-rw-r--r--  weed/storage/erasure_coding/placement/placement_test.go  517
-rw-r--r--  weed/worker/tasks/erasure_coding/detection.go            130
7 files changed, 1673 insertions(+), 73 deletions(-)
diff --git a/.github/workflows/ec-integration-tests.yml b/.github/workflows/ec-integration-tests.yml
new file mode 100644
index 000000000..ea476b77c
--- /dev/null
+++ b/.github/workflows/ec-integration-tests.yml
@@ -0,0 +1,41 @@
+name: "EC Integration Tests"
+
+on:
+ push:
+ branches: [ master ]
+ pull_request:
+ branches: [ master ]
+
+permissions:
+ contents: read
+
+jobs:
+ ec-integration-tests:
+ name: EC Integration Tests
+ runs-on: ubuntu-22.04
+ timeout-minutes: 30
+ steps:
+ - name: Set up Go 1.x
+ uses: actions/setup-go@v6
+ with:
+ go-version: ^1.24
+ id: go
+
+ - name: Check out code into the Go module directory
+ uses: actions/checkout@v4
+
+ - name: Build weed binary
+ run: |
+ cd weed && go build -o weed .
+
+ - name: Run EC Integration Tests
+ working-directory: test/erasure_coding
+ run: |
+ go test -v
+
+ - name: Archive logs
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
+ name: ec-integration-test-logs
+ path: test/erasure_coding
\ No newline at end of file
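
Because the workflow invokes plain `go test -v` (no -short flag), the long multi-disk integration test added below does run in CI. A minimal sketch of the testing.Short guard those tests rely on (the test name here is hypothetical):

    package erasure_coding_test

    import "testing"

    // With `go test -v` (as in the workflow) testing.Short() is false and the body runs;
    // with `go test -short` the test is skipped.
    func TestLongRunningExample(t *testing.T) {
    	if testing.Short() {
    		t.Skip("skipping long-running integration test in short mode")
    	}
    	// ... long-running cluster setup and assertions would go here ...
    }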
diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go
index 87b9b40ba..67f8eed04 100644
--- a/test/erasure_coding/ec_integration_test.go
+++ b/test/erasure_coding/ec_integration_test.go
@@ -5,9 +5,11 @@ import (
"context"
"fmt"
"io"
+ "math"
"os"
"os/exec"
"path/filepath"
+ "strings"
"testing"
"time"
@@ -139,6 +141,15 @@ func TestECEncodingVolumeLocationTimingBug(t *testing.T) {
t.Logf("EC encoding completed successfully")
}
+ // Detailed logging of the EC encoding command and its outcome
+ t.Logf("Debug: EC encoding command executed for volume %d", volumeId)
+ t.Logf("Debug: Command arguments: %v", args)
+ if err != nil {
+ t.Logf("Debug: EC encoding command failed with error: %v", err)
+ } else {
+ t.Logf("Debug: EC encoding command completed successfully")
+ }
+
// The key test: check if the fix prevents the timing issue
if contains(outputStr, "Collecting volume locations") && contains(outputStr, "before EC encoding") {
t.Logf("FIX DETECTED: Volume locations collected BEFORE EC encoding (timing bug prevented)")
@@ -526,7 +537,8 @@ func uploadTestData(data []byte, masterAddress string) (needle.VolumeId, error)
func getVolumeLocations(commandEnv *shell.CommandEnv, volumeId needle.VolumeId) ([]string, error) {
// Retry mechanism to handle timing issues with volume registration
- for i := 0; i < 10; i++ {
+ // Allow more retries to tolerate slow volume registration with the master
+ for i := 0; i < 20; i++ {
locations, ok := commandEnv.MasterClient.GetLocationsClone(uint32(volumeId))
if ok {
var result []string
@@ -646,3 +658,427 @@ func TestECEncodingRegressionPrevention(t *testing.T) {
t.Log("Timing pattern regression test passed")
})
}
+
+// TestDiskAwareECRebalancing tests EC shard placement across multiple disks per server
+// This verifies the disk-aware EC rebalancing feature works correctly
+func TestDiskAwareECRebalancing(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping disk-aware integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_disk_aware_ec_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
+ defer cancel()
+
+ // Start cluster with MULTIPLE DISKS per volume server
+ cluster, err := startMultiDiskCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ // Wait for servers to be ready
+ require.NoError(t, waitForServer("127.0.0.1:9334", 30*time.Second))
+ for i := 0; i < 3; i++ {
+ require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:809%d", i), 30*time.Second))
+ }
+
+ // Wait longer for volume servers to register with master and create volumes
+ t.Log("Waiting for volume servers to register with master...")
+ time.Sleep(10 * time.Second)
+
+ // Create command environment
+ options := &shell.ShellOptions{
+ Masters: stringPtr("127.0.0.1:9334"),
+ GrpcDialOption: grpc.WithInsecure(),
+ FilerGroup: stringPtr("default"),
+ }
+ commandEnv := shell.NewCommandEnv(options)
+
+ // Connect to master with longer timeout
+ ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancel2()
+ go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
+ commandEnv.MasterClient.WaitUntilConnected(ctx2)
+
+ // Wait for master client to fully sync
+ time.Sleep(5 * time.Second)
+
+ // Upload test data to create a volume - retry if volumes not ready
+ var volumeId needle.VolumeId
+ testData := []byte("Disk-aware EC rebalancing test data - this needs to be large enough to create a volume")
+ for retry := 0; retry < 5; retry++ {
+ volumeId, err = uploadTestDataToMaster(testData, "127.0.0.1:9334")
+ if err == nil {
+ break
+ }
+ t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err)
+ time.Sleep(3 * time.Second)
+ }
+ require.NoError(t, err, "Failed to upload test data after retries")
+ t.Logf("Created volume %d for disk-aware EC test", volumeId)
+
+ // Wait for volume to be registered
+ time.Sleep(3 * time.Second)
+
+ t.Run("verify_multi_disk_setup", func(t *testing.T) {
+ // Verify that each server has multiple disk directories
+ for server := 0; server < 3; server++ {
+ diskCount := 0
+ for disk := 0; disk < 4; disk++ {
+ diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk))
+ if _, err := os.Stat(diskDir); err == nil {
+ diskCount++
+ }
+ }
+ assert.Equal(t, 4, diskCount, "Server %d should have 4 disk directories", server)
+ t.Logf("Server %d has %d disk directories", server, diskCount)
+ }
+ })
+
+ t.Run("ec_encode_with_disk_awareness", func(t *testing.T) {
+ // Get lock first
+ lockCmd := shell.Commands[findCommandIndex("lock")]
+ var lockOutput bytes.Buffer
+ err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
+ if err != nil {
+ t.Logf("Lock command failed: %v", err)
+ }
+
+ // Execute EC encoding
+ var output bytes.Buffer
+ ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
+ args := []string{"-volumeId", fmt.Sprintf("%d", volumeId), "-collection", "test", "-force"}
+
+ // Capture output
+ oldStdout := os.Stdout
+ oldStderr := os.Stderr
+ r, w, _ := os.Pipe()
+ os.Stdout = w
+ os.Stderr = w
+
+ err = ecEncodeCmd.Do(args, commandEnv, &output)
+
+ w.Close()
+ os.Stdout = oldStdout
+ os.Stderr = oldStderr
+
+ capturedOutput, _ := io.ReadAll(r)
+ outputStr := string(capturedOutput) + output.String()
+
+ t.Logf("EC encode output:\n%s", outputStr)
+
+ if err != nil {
+ t.Logf("EC encoding completed with error: %v", err)
+ } else {
+ t.Logf("EC encoding completed successfully")
+ }
+ })
+
+ t.Run("verify_disk_level_shard_distribution", func(t *testing.T) {
+ // Wait for shards to be distributed
+ time.Sleep(2 * time.Second)
+
+ // Count shards on each disk of each server
+ diskDistribution := countShardsPerDisk(testDir, uint32(volumeId))
+
+ totalShards := 0
+ disksWithShards := 0
+ maxShardsOnSingleDisk := 0
+
+ t.Logf("Disk-level shard distribution for volume %d:", volumeId)
+ for server, disks := range diskDistribution {
+ for diskId, shardCount := range disks {
+ if shardCount > 0 {
+ t.Logf(" %s disk %d: %d shards", server, diskId, shardCount)
+ totalShards += shardCount
+ disksWithShards++
+ if shardCount > maxShardsOnSingleDisk {
+ maxShardsOnSingleDisk = shardCount
+ }
+ }
+ }
+ }
+
+ t.Logf("Summary: %d total shards across %d disks (max %d on single disk)",
+ totalShards, disksWithShards, maxShardsOnSingleDisk)
+
+ // EC creates 14 shards (10 data + 4 parity), plus .ecx and .ecj files
+ // We should see shards distributed across multiple disks
+ if disksWithShards > 1 {
+ t.Logf("PASS: Shards distributed across %d disks", disksWithShards)
+ } else {
+ t.Logf("INFO: Shards on %d disk(s) - may be expected if volume was on single disk", disksWithShards)
+ }
+ })
+
+ t.Run("test_ec_balance_disk_awareness", func(t *testing.T) {
+ // Calculate initial disk balance variance
+ initialDistribution := countShardsPerDisk(testDir, uint32(volumeId))
+ initialVariance := calculateDiskShardVariance(initialDistribution)
+ t.Logf("Initial disk shard variance: %.2f", initialVariance)
+
+ // Run ec.balance command
+ var output bytes.Buffer
+ ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
+
+ oldStdout := os.Stdout
+ oldStderr := os.Stderr
+ r, w, _ := os.Pipe()
+ os.Stdout = w
+ os.Stderr = w
+
+ err := ecBalanceCmd.Do([]string{"-force"}, commandEnv, &output)
+
+ w.Close()
+ os.Stdout = oldStdout
+ os.Stderr = oldStderr
+
+ capturedOutput, _ := io.ReadAll(r)
+ outputStr := string(capturedOutput) + output.String()
+
+ if err != nil {
+ t.Logf("ec.balance error: %v", err)
+ }
+ t.Logf("ec.balance output:\n%s", outputStr)
+
+ // Wait for balance to complete
+ time.Sleep(2 * time.Second)
+
+ // Calculate final disk balance variance
+ finalDistribution := countShardsPerDisk(testDir, uint32(volumeId))
+ finalVariance := calculateDiskShardVariance(finalDistribution)
+ t.Logf("Final disk shard variance: %.2f", finalVariance)
+
+ t.Logf("Variance change: %.2f -> %.2f", initialVariance, finalVariance)
+ })
+
+ t.Run("verify_no_disk_overload", func(t *testing.T) {
+ // Verify that no single disk has too many shards of the same volume
+ diskDistribution := countShardsPerDisk(testDir, uint32(volumeId))
+
+ for server, disks := range diskDistribution {
+ for diskId, shardCount := range disks {
+ // With 14 EC shards and 12 disks (3 servers x 4 disks), ideally ~1-2 shards per disk
+ // Allow up to 4 shards per disk as a reasonable threshold
+ if shardCount > 4 {
+ t.Logf("WARNING: %s disk %d has %d shards (may indicate imbalance)",
+ server, diskId, shardCount)
+ }
+ }
+ }
+ })
+}
+
+// MultiDiskCluster represents a test cluster with multiple disks per volume server
+type MultiDiskCluster struct {
+ masterCmd *exec.Cmd
+ volumeServers []*exec.Cmd
+ testDir string
+}
+
+func (c *MultiDiskCluster) Stop() {
+ // Stop volume servers first
+ for _, cmd := range c.volumeServers {
+ if cmd != nil && cmd.Process != nil {
+ cmd.Process.Kill()
+ cmd.Wait()
+ }
+ }
+
+ // Stop master server
+ if c.masterCmd != nil && c.masterCmd.Process != nil {
+ c.masterCmd.Process.Kill()
+ c.masterCmd.Wait()
+ }
+}
+
+// startMultiDiskCluster starts a SeaweedFS cluster with multiple disks per volume server
+func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) {
+ weedBinary := findWeedBinary()
+ if weedBinary == "" {
+ return nil, fmt.Errorf("weed binary not found")
+ }
+
+ cluster := &MultiDiskCluster{testDir: dataDir}
+
+ // Create master directory
+ masterDir := filepath.Join(dataDir, "master")
+ os.MkdirAll(masterDir, 0755)
+
+ // Start master server on a different port to avoid conflict
+ masterCmd := exec.CommandContext(ctx, weedBinary, "master",
+ "-port", "9334",
+ "-mdir", masterDir,
+ "-volumeSizeLimitMB", "10",
+ "-ip", "127.0.0.1",
+ )
+
+ masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
+ if err != nil {
+ return nil, fmt.Errorf("failed to create master log file: %v", err)
+ }
+ masterCmd.Stdout = masterLogFile
+ masterCmd.Stderr = masterLogFile
+
+ if err := masterCmd.Start(); err != nil {
+ return nil, fmt.Errorf("failed to start master server: %v", err)
+ }
+ cluster.masterCmd = masterCmd
+
+ // Wait for master to be ready
+ time.Sleep(2 * time.Second)
+
+ // Start 3 volume servers, each with 4 disks
+ const numServers = 3
+ const disksPerServer = 4
+
+ for i := 0; i < numServers; i++ {
+ // Create 4 disk directories per server
+ var diskDirs []string
+ var maxVolumes []string
+
+ for d := 0; d < disksPerServer; d++ {
+ diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_disk%d", i, d))
+ if err := os.MkdirAll(diskDir, 0755); err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to create disk dir: %v", err)
+ }
+ diskDirs = append(diskDirs, diskDir)
+ maxVolumes = append(maxVolumes, "5")
+ }
+
+ port := fmt.Sprintf("809%d", i)
+ rack := fmt.Sprintf("rack%d", i)
+
+ volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
+ "-port", port,
+ "-dir", strings.Join(diskDirs, ","),
+ "-max", strings.Join(maxVolumes, ","),
+ "-mserver", "127.0.0.1:9334",
+ "-ip", "127.0.0.1",
+ "-dataCenter", "dc1",
+ "-rack", rack,
+ )
+
+ // Create log file for this volume server
+ logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i))
+ os.MkdirAll(logDir, 0755)
+ volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log"))
+ if err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to create volume log file: %v", err)
+ }
+ volumeCmd.Stdout = volumeLogFile
+ volumeCmd.Stderr = volumeLogFile
+
+ if err := volumeCmd.Start(); err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to start volume server %d: %v", i, err)
+ }
+ cluster.volumeServers = append(cluster.volumeServers, volumeCmd)
+ }
+
+ // Wait for volume servers to register with master
+ // Multi-disk servers may take longer to initialize
+ time.Sleep(8 * time.Second)
+
+ return cluster, nil
+}
+
+// uploadTestDataToMaster uploads test data to a specific master address
+func uploadTestDataToMaster(data []byte, masterAddress string) (needle.VolumeId, error) {
+ assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress {
+ return pb.ServerAddress(masterAddress)
+ }, grpc.WithInsecure(), &operation.VolumeAssignRequest{
+ Count: 1,
+ Collection: "test",
+ Replication: "000",
+ })
+ if err != nil {
+ return 0, err
+ }
+
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return 0, err
+ }
+
+ uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{
+ UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid,
+ Filename: "testfile.txt",
+ MimeType: "text/plain",
+ })
+ if err != nil {
+ return 0, err
+ }
+
+ if uploadResult.Error != "" {
+ return 0, fmt.Errorf("upload error: %s", uploadResult.Error)
+ }
+
+ fid, err := needle.ParseFileIdFromString(assignResult.Fid)
+ if err != nil {
+ return 0, err
+ }
+
+ return fid.VolumeId, nil
+}
+
+// countShardsPerDisk counts EC shards on each disk of each server
+// Returns map: "serverN" -> map[diskId]shardCount
+func countShardsPerDisk(testDir string, volumeId uint32) map[string]map[int]int {
+ result := make(map[string]map[int]int)
+
+ const numServers = 3
+ const disksPerServer = 4
+
+ for server := 0; server < numServers; server++ {
+ serverKey := fmt.Sprintf("server%d", server)
+ result[serverKey] = make(map[int]int)
+
+ for disk := 0; disk < disksPerServer; disk++ {
+ diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk))
+ count, err := countECShardFiles(diskDir, volumeId)
+ if err == nil && count > 0 {
+ result[serverKey][disk] = count
+ }
+ }
+ }
+
+ return result
+}
+
+// calculateDiskShardVariance measures how evenly shards are distributed across disks.
+// It returns the standard deviation of per-disk shard counts; lower means a more even spread.
+func calculateDiskShardVariance(distribution map[string]map[int]int) float64 {
+ var counts []float64
+
+ for _, disks := range distribution {
+ for _, count := range disks {
+ if count > 0 {
+ counts = append(counts, float64(count))
+ }
+ }
+ }
+
+ if len(counts) == 0 {
+ return 0
+ }
+
+ // Calculate mean
+ mean := 0.0
+ for _, c := range counts {
+ mean += c
+ }
+ mean /= float64(len(counts))
+
+ // Calculate variance
+ variance := 0.0
+ for _, c := range counts {
+ variance += (c - mean) * (c - mean)
+ }
+
+ return math.Sqrt(variance / float64(len(counts)))
+}
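
As an aside on the spread metric used by the test above: the helper returns the population standard deviation of per-disk shard counts, so 0 means a perfectly even spread. A minimal standalone sketch (the sample counts are made up; only the formula mirrors the test helper):

    package main

    import (
    	"fmt"
    	"math"
    )

    // spread mirrors calculateDiskShardVariance: population standard deviation
    // of the non-zero per-disk shard counts.
    func spread(counts []float64) float64 {
    	if len(counts) == 0 {
    		return 0
    	}
    	mean := 0.0
    	for _, c := range counts {
    		mean += c
    	}
    	mean /= float64(len(counts))
    	v := 0.0
    	for _, c := range counts {
    		v += (c - mean) * (c - mean)
    	}
    	return math.Sqrt(v / float64(len(counts)))
    }

    func main() {
    	fmt.Println(spread([]float64{2, 2, 2, 2, 2, 2, 2})) // 0 — 14 shards spread evenly over 7 disks
    	fmt.Println(spread([]float64{8, 4, 1, 1}))          // ≈2.87 — most shards piled onto one disk
    }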
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index f059b4e74..f2cc581da 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -26,12 +26,25 @@ type DataCenterId string
type EcNodeId string
type RackId string
+// EcDisk represents a single disk on a volume server
+type EcDisk struct {
+ diskId uint32
+ diskType string
+ freeEcSlots int
+ ecShardCount int // Total EC shards on this disk
+ // Map of volumeId -> shardBits for shards on this disk
+ ecShards map[needle.VolumeId]erasure_coding.ShardBits
+}
+
type EcNode struct {
info *master_pb.DataNodeInfo
dc DataCenterId
rack RackId
freeEcSlot int
+ // disks maps diskId -> EcDisk for disk-level balancing
+ disks map[uint32]*EcDisk
}
+
type CandidateEcNode struct {
ecNode *EcNode
shardCount int
@@ -229,7 +242,7 @@ func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.Vol
return collections
}
-func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, destDiskId uint32, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
@@ -242,7 +255,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
// ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId)
if err != nil {
return err
}
@@ -259,7 +272,11 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
return err
}
- fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ if destDiskId > 0 {
+ fmt.Printf("moved ec shard %d.%d %s => %s (disk %d)\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ }
}
@@ -272,7 +289,7 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
- volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
+ volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
@@ -289,6 +306,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(existingLocation),
+ DiskId: destDiskId,
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
@@ -410,12 +428,74 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
}
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
- ecNodes = append(ecNodes, &EcNode{
+ ecNode := &EcNode{
info: dn,
dc: dc,
rack: rack,
freeEcSlot: int(freeEcSlots),
- })
+ disks: make(map[uint32]*EcDisk),
+ }
+
+ // Build disk-level information from volumes and EC shards
+ // First, discover all unique disk IDs from VolumeInfos (includes empty disks)
+ allDiskIds := make(map[uint32]string) // diskId -> diskType
+ for diskType, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ // Get all disk IDs from volumes
+ for _, vi := range diskInfo.VolumeInfos {
+ allDiskIds[vi.DiskId] = diskType
+ }
+ // Also get disk IDs from EC shards
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ allDiskIds[ecShardInfo.DiskId] = diskType
+ }
+ }
+
+ // Group EC shards by disk_id
+ diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits)
+ for _, diskInfo := range dn.DiskInfos {
+ if diskInfo == nil {
+ continue
+ }
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ diskId := ecShardInfo.DiskId
+ if diskShards[diskId] == nil {
+ diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ vid := needle.VolumeId(ecShardInfo.Id)
+ diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ }
+ }
+
+ // Create EcDisk for each discovered disk
+ diskCount := len(allDiskIds)
+ if diskCount == 0 {
+ diskCount = 1
+ }
+ freePerDisk := int(freeEcSlots) / diskCount
+
+ for diskId, diskType := range allDiskIds {
+ shards := diskShards[diskId]
+ if shards == nil {
+ shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
+ }
+ totalShardCount := 0
+ for _, shardBits := range shards {
+ totalShardCount += shardBits.ShardIdCount()
+ }
+
+ ecNode.disks[diskId] = &EcDisk{
+ diskId: diskId,
+ diskType: diskType,
+ freeEcSlots: freePerDisk,
+ ecShardCount: totalShardCount,
+ ecShards: shards,
+ }
+ }
+
+ ecNodes = append(ecNodes, ecNode)
totalFreeEcSlots += freeEcSlots
})
return
@@ -884,10 +964,16 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
+ vid := needle.VolumeId(shards.Id)
+ destDiskId := pickBestDiskOnNode(emptyNode, vid)
- fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+ }
- err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
+ err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing)
if err != nil {
return err
}
@@ -957,18 +1043,98 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
if len(targets) == 0 {
return nil, errors.New(details)
}
+
+ // When multiple nodes have the same shard count, prefer nodes with better disk distribution
+ // (i.e., nodes with more disks that have fewer shards of this volume)
+ if len(targets) > 1 {
+ slices.SortFunc(targets, func(a, b *EcNode) int {
+ aScore := diskDistributionScore(a, vid)
+ bScore := diskDistributionScore(b, vid)
+ return aScore - bScore // Lower score is better
+ })
+ return targets[0], nil
+ }
+
return targets[rand.IntN(len(targets))], nil
}
+// diskDistributionScore calculates a score for how well-distributed shards are on the node's disks
+// Lower score is better (means more room for balanced distribution)
+func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
+ if len(ecNode.disks) == 0 {
+ return 0
+ }
+
+ // Sum the existing shard count for this volume on each disk
+ // Lower total means more room for new shards
+ score := 0
+ for _, disk := range ecNode.disks {
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily
+ }
+ score += disk.ecShardCount // Also consider total shards on disk
+ }
+ return score
+}
+
+// pickBestDiskOnNode selects the best disk on a node for placing a new EC shard
+// It prefers disks with fewer shards and more free slots
+func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId) uint32 {
+ if len(ecNode.disks) == 0 {
+ return 0 // No disk info available, let the server decide
+ }
+
+ var bestDiskId uint32
+ bestScore := -1
+
+ for diskId, disk := range ecNode.disks {
+ if disk.freeEcSlots <= 0 {
+ continue
+ }
+
+ // Check if this volume already has shards on this disk
+ existingShards := 0
+ if shardBits, ok := disk.ecShards[vid]; ok {
+ existingShards = shardBits.ShardIdCount()
+ }
+
+ // Score: prefer disks with fewer total shards and fewer shards of this volume
+ // Lower score is better
+ score := disk.ecShardCount*10 + existingShards*100
+
+ if bestScore == -1 || score < bestScore {
+ bestScore = score
+ bestDiskId = diskId
+ }
+ }
+
+ return bestDiskId
+}
+
+// pickEcNodeAndDiskToBalanceShardsInto picks both a destination node and specific disk
+func (ecb *ecBalancer) pickEcNodeAndDiskToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, uint32, error) {
+ node, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinations)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ diskId := pickBestDiskOnNode(node, vid)
+ return node, diskId, nil
+}
+
func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
- destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
+ destNode, destDiskId, err := ecb.pickEcNodeAndDiskToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
if err != nil {
- fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, err.Error())
+ fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
return nil
}
- fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
- return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
+ if destDiskId > 0 {
+ fmt.Printf("%s moves ec shard %d.%d to %s (disk %d)\n", existingLocation.info.Id, vid, shardId, destNode.info.Id, destDiskId)
+ } else {
+ fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
+ }
+ return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, destDiskId, ecb.applyBalancing)
}
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
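
To make the disk-selection weighting above concrete: pickBestDiskOnNode scores each disk as ecShardCount*10 + existingShardsOfThisVolume*100 and keeps the lowest score, so avoiding a second shard of the same volume on a disk outweighs the disk's total shard count. A small standalone illustration (the disk numbers are invented; only the formula comes from the code above):

    package main

    import "fmt"

    // diskScore mirrors the weighting in pickBestDiskOnNode: lower is better.
    func diskScore(totalECShards, shardsOfThisVolume int) int {
    	return totalECShards*10 + shardsOfThisVolume*100
    }

    func main() {
    	// Disk A holds 6 EC shards, none of the volume being balanced.
    	// Disk B holds only 2 EC shards, but one of them belongs to this volume.
    	fmt.Println(diskScore(6, 0)) // 60
    	fmt.Println(diskScore(2, 1)) // 120 -> disk A is preferred despite being busier overall
    }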
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index 5c1805c89..6135eb3eb 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -197,8 +197,14 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
if ecShardInfo.Collection != "" {
collectionPrefix = ecShardInfo.Collection + "_"
}
- fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
- err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
+ vid := needle.VolumeId(ecShardInfo.Id)
+ destDiskId := pickBestDiskOnNode(emptyNode, vid)
+ if destDiskId > 0 {
+ fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
+ } else {
+ fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
+ }
+ err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
if err != nil {
return
} else {
diff --git a/weed/storage/erasure_coding/placement/placement.go b/weed/storage/erasure_coding/placement/placement.go
new file mode 100644
index 000000000..67e21c1f8
--- /dev/null
+++ b/weed/storage/erasure_coding/placement/placement.go
@@ -0,0 +1,420 @@
+// Package placement provides consolidated EC shard placement logic used by
+// both shell commands and worker tasks.
+//
+// This package encapsulates the algorithms for:
+// - Selecting destination nodes/disks for EC shards
+// - Ensuring proper spread across racks, servers, and disks
+// - Balancing shards across the cluster
+package placement
+
+import (
+ "fmt"
+ "sort"
+)
+
+// DiskCandidate represents a disk that can receive EC shards
+type DiskCandidate struct {
+ NodeID string
+ DiskID uint32
+ DataCenter string
+ Rack string
+
+ // Capacity information
+ VolumeCount int64
+ MaxVolumeCount int64
+ ShardCount int // Current number of EC shards on this disk
+ FreeSlots int // Available slots for new shards
+
+ // Load information
+ LoadCount int // Number of active tasks on this disk
+}
+
+// NodeCandidate represents a server node that can receive EC shards
+type NodeCandidate struct {
+ NodeID string
+ DataCenter string
+ Rack string
+ FreeSlots int
+ ShardCount int // Total shards across all disks
+ Disks []*DiskCandidate // All disks on this node
+}
+
+// PlacementRequest configures EC shard placement behavior
+type PlacementRequest struct {
+ // ShardsNeeded is the total number of shards to place
+ ShardsNeeded int
+
+ // MaxShardsPerServer limits how many shards can be placed on a single server
+ // 0 means no limit (but prefer spreading when possible)
+ MaxShardsPerServer int
+
+ // MaxShardsPerRack limits how many shards can be placed in a single rack
+ // 0 means no limit
+ MaxShardsPerRack int
+
+ // MaxTaskLoad is the maximum task load count for a disk to be considered
+ MaxTaskLoad int
+
+ // PreferDifferentServers when true, spreads shards across different servers
+ // before using multiple disks on the same server
+ PreferDifferentServers bool
+
+ // PreferDifferentRacks when true, spreads shards across different racks
+ // before using multiple servers in the same rack
+ PreferDifferentRacks bool
+}
+
+// DefaultPlacementRequest returns the default placement configuration
+func DefaultPlacementRequest() PlacementRequest {
+ return PlacementRequest{
+ ShardsNeeded: 14,
+ MaxShardsPerServer: 0,
+ MaxShardsPerRack: 0,
+ MaxTaskLoad: 5,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+}
+
+// PlacementResult contains the selected destinations for EC shards
+type PlacementResult struct {
+ SelectedDisks []*DiskCandidate
+
+ // Statistics
+ ServersUsed int
+ RacksUsed int
+ DCsUsed int
+
+ // Distribution maps
+ ShardsPerServer map[string]int
+ ShardsPerRack map[string]int
+ ShardsPerDC map[string]int
+}
+
+// SelectDestinations selects the best disks for EC shard placement.
+// This is the main entry point for EC placement logic.
+//
+// The algorithm works in multiple passes:
+// 1. First pass: Select one disk from each rack (maximize rack diversity)
+// 2. Second pass: Select one disk from each unused server in used racks (maximize server diversity)
+// 3. Third pass: Select additional disks from servers already used (maximize disk diversity)
+func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*PlacementResult, error) {
+ if len(disks) == 0 {
+ return nil, fmt.Errorf("no disk candidates provided")
+ }
+ if config.ShardsNeeded <= 0 {
+ return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded)
+ }
+
+ // Filter suitable disks
+ suitable := filterSuitableDisks(disks, config)
+ if len(suitable) == 0 {
+ return nil, fmt.Errorf("no suitable disks found after filtering")
+ }
+
+ // Build indexes for efficient lookup
+ rackToDisks := groupDisksByRack(suitable)
+
+ result := &PlacementResult{
+ SelectedDisks: make([]*DiskCandidate, 0, config.ShardsNeeded),
+ ShardsPerServer: make(map[string]int),
+ ShardsPerRack: make(map[string]int),
+ ShardsPerDC: make(map[string]int),
+ }
+
+ usedDisks := make(map[string]bool) // "nodeID:diskID" -> bool
+ usedServers := make(map[string]bool) // nodeID -> bool
+ usedRacks := make(map[string]bool) // "dc:rack" -> bool
+
+ // Pass 1: Select one disk from each rack (maximize rack diversity)
+ if config.PreferDifferentRacks {
+ // Sort racks by number of available servers (descending) to prioritize racks with more options
+ sortedRacks := sortRacksByServerCount(rackToDisks)
+ for _, rackKey := range sortedRacks {
+ if len(result.SelectedDisks) >= config.ShardsNeeded {
+ break
+ }
+ rackDisks := rackToDisks[rackKey]
+ // Select best disk from this rack, preferring a new server
+ disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config)
+ if disk != nil {
+ addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+ }
+ }
+ }
+
+ // Pass 2: Select disks from unused servers in already-used racks
+ if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded {
+ for _, rackKey := range getSortedRackKeys(rackToDisks) {
+ if len(result.SelectedDisks) >= config.ShardsNeeded {
+ break
+ }
+ rackDisks := rackToDisks[rackKey]
+ for _, disk := range sortDisksByScore(rackDisks) {
+ if len(result.SelectedDisks) >= config.ShardsNeeded {
+ break
+ }
+ diskKey := getDiskKey(disk)
+ if usedDisks[diskKey] {
+ continue
+ }
+ // Skip if server already used (we want different servers in this pass)
+ if usedServers[disk.NodeID] {
+ continue
+ }
+ // Check server limit
+ if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer {
+ continue
+ }
+ // Check rack limit
+ if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
+ continue
+ }
+ addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+ }
+ }
+ }
+
+ // Pass 3: Fill remaining slots from already-used servers (different disks)
+ // Use round-robin across servers to balance shards evenly
+ if len(result.SelectedDisks) < config.ShardsNeeded {
+ // Group remaining disks by server
+ serverToRemainingDisks := make(map[string][]*DiskCandidate)
+ for _, disk := range suitable {
+ if !usedDisks[getDiskKey(disk)] {
+ serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk)
+ }
+ }
+
+ // Sort each server's disks by score
+ for serverID := range serverToRemainingDisks {
+ serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID])
+ }
+
+ // Round-robin: repeatedly select from the server with the fewest shards
+ for len(result.SelectedDisks) < config.ShardsNeeded {
+ // Find server with fewest shards that still has available disks
+ var bestServer string
+ minShards := -1
+ for serverID, disks := range serverToRemainingDisks {
+ if len(disks) == 0 {
+ continue
+ }
+ // Check server limit
+ if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer {
+ continue
+ }
+ shardCount := result.ShardsPerServer[serverID]
+ if minShards == -1 || shardCount < minShards {
+ minShards = shardCount
+ bestServer = serverID
+ } else if shardCount == minShards && serverID < bestServer {
+ // Tie-break by server name for determinism
+ bestServer = serverID
+ }
+ }
+
+ if bestServer == "" {
+ // No more servers with available disks
+ break
+ }
+
+ // Pop the best disk from this server
+ disks := serverToRemainingDisks[bestServer]
+ disk := disks[0]
+ serverToRemainingDisks[bestServer] = disks[1:]
+
+ // Check rack limit
+ if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
+ continue
+ }
+
+ addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
+ }
+ }
+
+ // Calculate final statistics
+ result.ServersUsed = len(usedServers)
+ result.RacksUsed = len(usedRacks)
+ dcSet := make(map[string]bool)
+ for _, disk := range result.SelectedDisks {
+ dcSet[disk.DataCenter] = true
+ }
+ result.DCsUsed = len(dcSet)
+
+ return result, nil
+}
+
+// filterSuitableDisks filters disks that are suitable for EC placement
+func filterSuitableDisks(disks []*DiskCandidate, config PlacementRequest) []*DiskCandidate {
+ var suitable []*DiskCandidate
+ for _, disk := range disks {
+ if disk.FreeSlots <= 0 {
+ continue
+ }
+ if config.MaxTaskLoad > 0 && disk.LoadCount > config.MaxTaskLoad {
+ continue
+ }
+ suitable = append(suitable, disk)
+ }
+ return suitable
+}
+
+// groupDisksByRack groups disks by their rack (dc:rack key)
+func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate {
+ result := make(map[string][]*DiskCandidate)
+ for _, disk := range disks {
+ key := getRackKey(disk)
+ result[key] = append(result[key], disk)
+ }
+ return result
+}
+
+// groupDisksByServer groups disks by their server
+func groupDisksByServer(disks []*DiskCandidate) map[string][]*DiskCandidate {
+ result := make(map[string][]*DiskCandidate)
+ for _, disk := range disks {
+ result[disk.NodeID] = append(result[disk.NodeID], disk)
+ }
+ return result
+}
+
+// getRackKey returns the unique key for a rack (dc:rack)
+func getRackKey(disk *DiskCandidate) string {
+ return fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
+}
+
+// getDiskKey returns the unique key for a disk (nodeID:diskID)
+func getDiskKey(disk *DiskCandidate) string {
+ return fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
+}
+
+// sortRacksByServerCount returns rack keys sorted by number of servers (ascending)
+func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string {
+ // Count unique servers per rack
+ rackServerCount := make(map[string]int)
+ for rackKey, disks := range rackToDisks {
+ servers := make(map[string]bool)
+ for _, disk := range disks {
+ servers[disk.NodeID] = true
+ }
+ rackServerCount[rackKey] = len(servers)
+ }
+
+ keys := getSortedRackKeys(rackToDisks)
+ sort.Slice(keys, func(i, j int) bool {
+ // Sort by server count (descending) to pick from racks with more options first
+ return rackServerCount[keys[i]] > rackServerCount[keys[j]]
+ })
+ return keys
+}
+
+// getSortedRackKeys returns rack keys in a deterministic order
+func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string {
+ keys := make([]string, 0, len(rackToDisks))
+ for k := range rackToDisks {
+ keys = append(keys, k)
+ }
+ sort.Strings(keys)
+ return keys
+}
+
+// selectBestDiskFromRack selects the best disk from a rack for EC placement
+// It prefers servers that haven't been used yet
+func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementRequest) *DiskCandidate {
+ var bestDisk *DiskCandidate
+ bestScore := -1.0
+ bestIsFromUnusedServer := false
+
+ for _, disk := range disks {
+ if usedDisks[getDiskKey(disk)] {
+ continue
+ }
+ isFromUnusedServer := !usedServers[disk.NodeID]
+ score := calculateDiskScore(disk)
+
+ // Prefer unused servers
+ if isFromUnusedServer && !bestIsFromUnusedServer {
+ bestDisk = disk
+ bestScore = score
+ bestIsFromUnusedServer = true
+ } else if isFromUnusedServer == bestIsFromUnusedServer && score > bestScore {
+ bestDisk = disk
+ bestScore = score
+ }
+ }
+
+ return bestDisk
+}
+
+// sortDisksByScore returns disks sorted by score (best first)
+func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate {
+ sorted := make([]*DiskCandidate, len(disks))
+ copy(sorted, disks)
+ sort.Slice(sorted, func(i, j int) bool {
+ return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j])
+ })
+ return sorted
+}
+
+// calculateDiskScore calculates a score for a disk candidate
+// Higher score is better
+func calculateDiskScore(disk *DiskCandidate) float64 {
+ score := 0.0
+
+ // Primary factor: available capacity (lower utilization is better)
+ if disk.MaxVolumeCount > 0 {
+ utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount)
+ score += (1.0 - utilization) * 60.0 // Up to 60 points
+ } else {
+ score += 30.0 // Default if no max count
+ }
+
+ // Secondary factor: fewer shards already on this disk is better
+ score += float64(10-disk.ShardCount) * 2.0 // Up to 20 points
+
+ // Tertiary factor: lower load is better
+ score += float64(10 - disk.LoadCount) // Up to 10 points
+
+ return score
+}
+
+// addDiskToResult adds a disk to the result and updates tracking maps
+func addDiskToResult(result *PlacementResult, disk *DiskCandidate,
+ usedDisks, usedServers, usedRacks map[string]bool) {
+ diskKey := getDiskKey(disk)
+ rackKey := getRackKey(disk)
+
+ result.SelectedDisks = append(result.SelectedDisks, disk)
+ usedDisks[diskKey] = true
+ usedServers[disk.NodeID] = true
+ usedRacks[rackKey] = true
+ result.ShardsPerServer[disk.NodeID]++
+ result.ShardsPerRack[rackKey]++
+ result.ShardsPerDC[disk.DataCenter]++
+}
+
+// VerifySpread checks if the placement result meets diversity requirements
+func VerifySpread(result *PlacementResult, minServers, minRacks int) error {
+ if result.ServersUsed < minServers {
+ return fmt.Errorf("only %d servers used, need at least %d", result.ServersUsed, minServers)
+ }
+ if result.RacksUsed < minRacks {
+ return fmt.Errorf("only %d racks used, need at least %d", result.RacksUsed, minRacks)
+ }
+ return nil
+}
+
+// CalculateIdealDistribution returns the ideal number of shards per server
+// when we have a certain number of shards and servers
+func CalculateIdealDistribution(totalShards, numServers int) (min, max int) {
+ if numServers <= 0 {
+ return 0, totalShards
+ }
+ min = totalShards / numServers
+ max = min
+ if totalShards%numServers != 0 {
+ max = min + 1
+ }
+ return
+}
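
For orientation, a hypothetical caller of this package would look roughly like the sketch below. The cluster shape and numbers are invented; the types and functions are the ones defined above, and the test file that follows exercises the same API in more detail.

    package main

    import (
    	"fmt"
    	"log"

    	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
    )

    func main() {
    	// Made-up cluster: 2 racks, one server per rack, 2 empty disks per server.
    	candidates := []*placement.DiskCandidate{
    		{NodeID: "srv1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", MaxVolumeCount: 100, FreeSlots: 10},
    		{NodeID: "srv1", DiskID: 1, DataCenter: "dc1", Rack: "rack1", MaxVolumeCount: 100, FreeSlots: 10},
    		{NodeID: "srv2", DiskID: 0, DataCenter: "dc1", Rack: "rack2", MaxVolumeCount: 100, FreeSlots: 10},
    		{NodeID: "srv2", DiskID: 1, DataCenter: "dc1", Rack: "rack2", MaxVolumeCount: 100, FreeSlots: 10},
    	}

    	cfg := placement.DefaultPlacementRequest() // 14 shards, prefer different racks and servers
    	cfg.ShardsNeeded = 4

    	result, err := placement.SelectDestinations(candidates, cfg)
    	if err != nil {
    		log.Fatal(err)
    	}
    	// Pass 1 takes one disk per rack, pass 3 round-robins the remainder,
    	// so each server ends up with 2 of the 4 shards.
    	fmt.Printf("servers=%d racks=%d per-server=%v\n",
    		result.ServersUsed, result.RacksUsed, result.ShardsPerServer)

    	if err := placement.VerifySpread(result, 2, 2); err != nil {
    		log.Fatal(err)
    	}
    }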
diff --git a/weed/storage/erasure_coding/placement/placement_test.go b/weed/storage/erasure_coding/placement/placement_test.go
new file mode 100644
index 000000000..6cb94a4da
--- /dev/null
+++ b/weed/storage/erasure_coding/placement/placement_test.go
@@ -0,0 +1,517 @@
+package placement
+
+import (
+"testing"
+)
+
+// Helper function to create disk candidates for testing
+func makeDisk(nodeID string, diskID uint32, dc, rack string, freeSlots int) *DiskCandidate {
+ return &DiskCandidate{
+ NodeID: nodeID,
+ DiskID: diskID,
+ DataCenter: dc,
+ Rack: rack,
+ VolumeCount: 0,
+ MaxVolumeCount: 100,
+ ShardCount: 0,
+ FreeSlots: freeSlots,
+ LoadCount: 0,
+ }
+}
+
+func TestSelectDestinations_SingleRack(t *testing.T) {
+ // Test: 3 servers in same rack, each with 2 disks, need 6 shards
+ // Expected: Should spread across all 6 disks (one per disk)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack1", 10),
+ makeDisk("server3", 1, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 6 {
+ t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Verify all 3 servers are used
+ if result.ServersUsed != 3 {
+ t.Errorf("expected 3 servers used, got %d", result.ServersUsed)
+ }
+
+ // Verify each disk is unique
+ diskSet := make(map[string]bool)
+ for _, disk := range result.SelectedDisks {
+ key := getDiskKey(disk)
+ if diskSet[key] {
+ t.Errorf("disk %s selected multiple times", key)
+ }
+ diskSet[key] = true
+ }
+}
+
+func TestSelectDestinations_MultipleRacks(t *testing.T) {
+ // Test: 2 racks with 2 servers each, each server has 2 disks
+ // Need 8 shards
+ // Expected: Should spread across all 8 disks
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack2", 10),
+ makeDisk("server3", 1, "dc1", "rack2", 10),
+ makeDisk("server4", 0, "dc1", "rack2", 10),
+ makeDisk("server4", 1, "dc1", "rack2", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 8,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 8 {
+ t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Verify all 4 servers are used
+ if result.ServersUsed != 4 {
+ t.Errorf("expected 4 servers used, got %d", result.ServersUsed)
+ }
+
+ // Verify both racks are used
+ if result.RacksUsed != 2 {
+ t.Errorf("expected 2 racks used, got %d", result.RacksUsed)
+ }
+}
+
+func TestSelectDestinations_PrefersDifferentServers(t *testing.T) {
+ // Test: 4 servers with 4 disks each, need 4 shards
+ // Expected: Should use one disk from each server
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server1", 3, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 3, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack1", 10),
+ makeDisk("server3", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 2, "dc1", "rack1", 10),
+ makeDisk("server3", 3, "dc1", "rack1", 10),
+ makeDisk("server4", 0, "dc1", "rack1", 10),
+ makeDisk("server4", 1, "dc1", "rack1", 10),
+ makeDisk("server4", 2, "dc1", "rack1", 10),
+ makeDisk("server4", 3, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 4,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 4 {
+ t.Errorf("expected 4 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Verify all 4 servers are used (one shard per server)
+ if result.ServersUsed != 4 {
+ t.Errorf("expected 4 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 1 shard
+ for server, count := range result.ShardsPerServer {
+ if count != 1 {
+ t.Errorf("server %s has %d shards, expected 1", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_SpilloverToMultipleDisksPerServer(t *testing.T) {
+ // Test: 2 servers with 4 disks each, need 6 shards
+ // Expected: First pick one from each server (2 shards), then one more from each (4 shards),
+ // then fill remaining from any server (6 shards)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server1", 3, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 3, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 6 {
+ t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Both servers should be used
+ if result.ServersUsed != 2 {
+ t.Errorf("expected 2 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 3 shards (balanced)
+ for server, count := range result.ShardsPerServer {
+ if count != 3 {
+ t.Errorf("server %s has %d shards, expected 3", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_MaxShardsPerServer(t *testing.T) {
+ // Test: 2 servers with 4 disks each, need 6 shards, max 2 per server
+ // Expected: Should only select 4 shards (2 per server limit)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server1", 3, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 3, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ MaxShardsPerServer: 2,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Should only get 4 shards due to server limit
+ if len(result.SelectedDisks) != 4 {
+ t.Errorf("expected 4 selected disks (limit 2 per server), got %d", len(result.SelectedDisks))
+ }
+
+ // No server should exceed the limit
+ for server, count := range result.ShardsPerServer {
+ if count > 2 {
+ t.Errorf("server %s has %d shards, exceeds limit of 2", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_14ShardsAcross7Servers(t *testing.T) {
+ // Test: Real-world EC scenario - 14 shards across 7 servers with 2 disks each
+ // Expected: Should spread evenly (2 shards per server)
+ var disks []*DiskCandidate
+ for i := 1; i <= 7; i++ {
+ serverID := "server" + string(rune('0'+i))
+ disks = append(disks, makeDisk(serverID, 0, "dc1", "rack1", 10))
+ disks = append(disks, makeDisk(serverID, 1, "dc1", "rack1", 10))
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 14,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 14 {
+ t.Errorf("expected 14 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // All 7 servers should be used
+ if result.ServersUsed != 7 {
+ t.Errorf("expected 7 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 2 shards
+ for server, count := range result.ShardsPerServer {
+ if count != 2 {
+ t.Errorf("server %s has %d shards, expected 2", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_FewerServersThanShards(t *testing.T) {
+ // Test: Only 3 servers but need 6 shards
+ // Expected: Should distribute evenly (2 per server)
+ disks := []*DiskCandidate{
+ makeDisk("server1", 0, "dc1", "rack1", 10),
+ makeDisk("server1", 1, "dc1", "rack1", 10),
+ makeDisk("server1", 2, "dc1", "rack1", 10),
+ makeDisk("server2", 0, "dc1", "rack1", 10),
+ makeDisk("server2", 1, "dc1", "rack1", 10),
+ makeDisk("server2", 2, "dc1", "rack1", 10),
+ makeDisk("server3", 0, "dc1", "rack1", 10),
+ makeDisk("server3", 1, "dc1", "rack1", 10),
+ makeDisk("server3", 2, "dc1", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 6,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 6 {
+ t.Errorf("expected 6 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // All 3 servers should be used
+ if result.ServersUsed != 3 {
+ t.Errorf("expected 3 servers used, got %d", result.ServersUsed)
+ }
+
+ // Each server should have exactly 2 shards
+ for server, count := range result.ShardsPerServer {
+ if count != 2 {
+ t.Errorf("server %s has %d shards, expected 2", server, count)
+ }
+ }
+}
+
+func TestSelectDestinations_NoSuitableDisks(t *testing.T) {
+ // Test: All disks have no free slots
+ disks := []*DiskCandidate{
+ {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0},
+ {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 0},
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 4,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ _, err := SelectDestinations(disks, config)
+ if err == nil {
+ t.Error("expected error for no suitable disks, got nil")
+ }
+}
+
+func TestSelectDestinations_EmptyInput(t *testing.T) {
+ config := DefaultPlacementRequest()
+ _, err := SelectDestinations([]*DiskCandidate{}, config)
+ if err == nil {
+ t.Error("expected error for empty input, got nil")
+ }
+}
+
+func TestSelectDestinations_FiltersByLoad(t *testing.T) {
+ // Test: Some disks have too high load
+ disks := []*DiskCandidate{
+ {NodeID: "server1", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 10},
+ {NodeID: "server2", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 2},
+ {NodeID: "server3", DiskID: 0, DataCenter: "dc1", Rack: "rack1", FreeSlots: 10, LoadCount: 1},
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 2,
+ MaxTaskLoad: 5,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Should only select from server2 and server3 (server1 has too high load)
+ for _, disk := range result.SelectedDisks {
+ if disk.NodeID == "server1" {
+ t.Errorf("disk from server1 should not be selected (load too high)")
+ }
+ }
+}
+
+func TestCalculateDiskScore(t *testing.T) {
+ // Test that score calculation works as expected
+ lowUtilDisk := &DiskCandidate{
+ VolumeCount: 10,
+ MaxVolumeCount: 100,
+ ShardCount: 0,
+ LoadCount: 0,
+ }
+
+ highUtilDisk := &DiskCandidate{
+ VolumeCount: 90,
+ MaxVolumeCount: 100,
+ ShardCount: 5,
+ LoadCount: 5,
+ }
+
+ lowScore := calculateDiskScore(lowUtilDisk)
+ highScore := calculateDiskScore(highUtilDisk)
+
+ if lowScore <= highScore {
+ t.Errorf("low utilization disk should have higher score: low=%f, high=%f", lowScore, highScore)
+ }
+}
+
+func TestCalculateIdealDistribution(t *testing.T) {
+ tests := []struct {
+ totalShards int
+ numServers int
+ expectedMin int
+ expectedMax int
+ }{
+ {14, 7, 2, 2}, // Even distribution
+ {14, 4, 3, 4}, // Uneven: 14/4 = 3 remainder 2
+ {6, 3, 2, 2}, // Even distribution
+ {7, 3, 2, 3}, // Uneven: 7/3 = 2 remainder 1
+ {10, 0, 0, 10}, // Edge case: no servers
+ {0, 5, 0, 0}, // Edge case: no shards
+ }
+
+ for _, tt := range tests {
+ min, max := CalculateIdealDistribution(tt.totalShards, tt.numServers)
+ if min != tt.expectedMin || max != tt.expectedMax {
+ t.Errorf("CalculateIdealDistribution(%d, %d) = (%d, %d), want (%d, %d)",
+ tt.totalShards, tt.numServers, min, max, tt.expectedMin, tt.expectedMax)
+ }
+ }
+}
+
+func TestVerifySpread(t *testing.T) {
+ result := &PlacementResult{
+ ServersUsed: 3,
+ RacksUsed: 2,
+ }
+
+ // Should pass
+ if err := VerifySpread(result, 3, 2); err != nil {
+ t.Errorf("unexpected error: %v", err)
+ }
+
+ // Should fail - not enough servers
+ if err := VerifySpread(result, 4, 2); err == nil {
+ t.Error("expected error for insufficient servers")
+ }
+
+ // Should fail - not enough racks
+ if err := VerifySpread(result, 3, 3); err == nil {
+ t.Error("expected error for insufficient racks")
+ }
+}
+
+func TestSelectDestinations_MultiDC(t *testing.T) {
+ // Test: 2 DCs, each with 2 racks, each rack has 2 servers
+ disks := []*DiskCandidate{
+ // DC1, Rack1
+ makeDisk("dc1-r1-s1", 0, "dc1", "rack1", 10),
+ makeDisk("dc1-r1-s1", 1, "dc1", "rack1", 10),
+ makeDisk("dc1-r1-s2", 0, "dc1", "rack1", 10),
+ makeDisk("dc1-r1-s2", 1, "dc1", "rack1", 10),
+ // DC1, Rack2
+ makeDisk("dc1-r2-s1", 0, "dc1", "rack2", 10),
+ makeDisk("dc1-r2-s1", 1, "dc1", "rack2", 10),
+ makeDisk("dc1-r2-s2", 0, "dc1", "rack2", 10),
+ makeDisk("dc1-r2-s2", 1, "dc1", "rack2", 10),
+ // DC2, Rack1
+ makeDisk("dc2-r1-s1", 0, "dc2", "rack1", 10),
+ makeDisk("dc2-r1-s1", 1, "dc2", "rack1", 10),
+ makeDisk("dc2-r1-s2", 0, "dc2", "rack1", 10),
+ makeDisk("dc2-r1-s2", 1, "dc2", "rack1", 10),
+ // DC2, Rack2
+ makeDisk("dc2-r2-s1", 0, "dc2", "rack2", 10),
+ makeDisk("dc2-r2-s1", 1, "dc2", "rack2", 10),
+ makeDisk("dc2-r2-s2", 0, "dc2", "rack2", 10),
+ makeDisk("dc2-r2-s2", 1, "dc2", "rack2", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 8,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ if len(result.SelectedDisks) != 8 {
+ t.Errorf("expected 8 selected disks, got %d", len(result.SelectedDisks))
+ }
+
+ // Should use all 4 racks
+ if result.RacksUsed != 4 {
+ t.Errorf("expected 4 racks used, got %d", result.RacksUsed)
+ }
+
+ // Should use both DCs
+ if result.DCsUsed != 2 {
+ t.Errorf("expected 2 DCs used, got %d", result.DCsUsed)
+ }
+}
+
+func TestSelectDestinations_SameRackDifferentDC(t *testing.T) {
+ // Test: Same rack name in different DCs should be treated as different racks
+ disks := []*DiskCandidate{
+ makeDisk("dc1-s1", 0, "dc1", "rack1", 10),
+ makeDisk("dc2-s1", 0, "dc2", "rack1", 10),
+ }
+
+ config := PlacementRequest{
+ ShardsNeeded: 2,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
+
+ result, err := SelectDestinations(disks, config)
+ if err != nil {
+ t.Fatalf("unexpected error: %v", err)
+ }
+
+ // Should use 2 racks (dc1:rack1 and dc2:rack1 are different)
+ if result.RacksUsed != 2 {
+ t.Errorf("expected 2 racks used (different DCs), got %d", result.RacksUsed)
+ }
+}
diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go
index cd74bed33..c5568fe26 100644
--- a/weed/worker/tasks/erasure_coding/detection.go
+++ b/weed/worker/tasks/erasure_coding/detection.go
@@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
@@ -429,85 +430,100 @@ func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.Era
}
// selectBestECDestinations selects multiple disks for EC shard placement with diversity
+// Uses the consolidated placement package for proper rack/server/disk spreading
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
if len(disks) == 0 {
return nil
}
- // Group disks by rack and DC for diversity
- rackGroups := make(map[string][]*topology.DiskInfo)
- for _, disk := range disks {
- rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
- rackGroups[rackKey] = append(rackGroups[rackKey], disk)
+ // Convert topology.DiskInfo to placement.DiskCandidate
+ candidates := diskInfosToCandidates(disks)
+ if len(candidates) == 0 {
+ return nil
}
- var selected []*topology.DiskInfo
- usedRacks := make(map[string]bool)
+ // Configure placement for EC shards
+ config := placement.PlacementRequest{
+ ShardsNeeded: shardsNeeded,
+ MaxShardsPerServer: 0, // No hard limit, but prefer spreading
+ MaxShardsPerRack: 0, // No hard limit, but prefer spreading
+ MaxTaskLoad: topology.MaxTaskLoadForECPlacement,
+ PreferDifferentServers: true,
+ PreferDifferentRacks: true,
+ }
- // First pass: select one disk from each rack for maximum diversity
- for rackKey, rackDisks := range rackGroups {
- if len(selected) >= shardsNeeded {
- break
- }
+ // Use the shared placement algorithm
+ result, err := placement.SelectDestinations(candidates, config)
+ if err != nil {
+ glog.V(2).Infof("EC placement failed: %v", err)
+ return nil
+ }
- // Select best disk from this rack
- bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
- if bestDisk != nil {
- selected = append(selected, bestDisk)
- usedRacks[rackKey] = true
+ // Convert back to topology.DiskInfo
+ return candidatesToDiskInfos(result.SelectedDisks, disks)
+}
+
+// diskInfosToCandidates converts topology.DiskInfo slice to placement.DiskCandidate slice
+func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate {
+ var candidates []*placement.DiskCandidate
+ for _, disk := range disks {
+ if disk.DiskInfo == nil {
+ continue
}
- }
- // Second pass: if we need more disks, select from racks we've already used
- if len(selected) < shardsNeeded {
- for _, disk := range disks {
- if len(selected) >= shardsNeeded {
- break
- }
+ // Free slots derived from the disk's configured max volume count (clamped at zero below)
+ freeSlots := int(disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount)
+ if freeSlots < 0 {
+ freeSlots = 0
+ }
- // Skip if already selected
- alreadySelected := false
- for _, sel := range selected {
- if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
- alreadySelected = true
- break
+ // Calculate EC shard count for this specific disk
+ // EcShardInfos contains all shards, so we need to filter by DiskId and sum actual shard counts
+ ecShardCount := 0
+ if disk.DiskInfo.EcShardInfos != nil {
+ for _, shardInfo := range disk.DiskInfo.EcShardInfos {
+ if shardInfo.DiskId == disk.DiskID {
+ ecShardCount += erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIdCount()
}
}
-
- if !alreadySelected && isDiskSuitableForEC(disk) {
- selected = append(selected, disk)
- }
}
- }
- return selected
+ candidates = append(candidates, &placement.DiskCandidate{
+ NodeID: disk.NodeID,
+ DiskID: disk.DiskID,
+ DataCenter: disk.DataCenter,
+ Rack: disk.Rack,
+ VolumeCount: disk.DiskInfo.VolumeCount,
+ MaxVolumeCount: disk.DiskInfo.MaxVolumeCount,
+ ShardCount: ecShardCount,
+ FreeSlots: freeSlots,
+ LoadCount: disk.LoadCount,
+ })
+ }
+ return candidates
}
-// selectBestFromRack selects the best disk from a rack for EC placement
-func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
- if len(disks) == 0 {
- return nil
+// candidatesToDiskInfos converts placement results back to topology.DiskInfo
+func candidatesToDiskInfos(candidates []*placement.DiskCandidate, originalDisks []*topology.DiskInfo) []*topology.DiskInfo {
+ // Create a map for quick lookup
+ diskMap := make(map[string]*topology.DiskInfo)
+ for _, disk := range originalDisks {
+ key := fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID)
+ diskMap[key] = disk
}
- var bestDisk *topology.DiskInfo
- bestScore := -1.0
-
- for _, disk := range disks {
- if !isDiskSuitableForEC(disk) {
- continue
- }
-
- score := calculateECScore(disk, sourceRack, sourceDC)
- if score > bestScore {
- bestScore = score
- bestDisk = disk
+ var result []*topology.DiskInfo
+ for _, candidate := range candidates {
+ key := fmt.Sprintf("%s:%d", candidate.NodeID, candidate.DiskID)
+ if disk, ok := diskMap[key]; ok {
+ result = append(result, disk)
}
}
-
- return bestDisk
+ return result
}
// calculateECScore calculates placement score for EC operations
+// Used for logging and plan metadata
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
if disk.DiskInfo == nil {
return 0.0
@@ -524,14 +540,12 @@ func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) floa
// Consider current load (secondary factor)
score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load
- // Note: We don't penalize placing shards on the same rack/DC as source
- // since the original volume will be deleted after EC conversion.
- // This allows for better network efficiency and storage utilization.
-
return score
}
// isDiskSuitableForEC checks if a disk is suitable for EC placement
+// Note: This is kept for backward compatibility but the placement package
+// handles filtering internally
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
if disk.DiskInfo == nil {
return false