aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r-- weed/shell/command_volume_server_evacuate.go 45
1 files changed, 37 insertions, 8 deletions
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index 6135eb3eb..fce88d2c4 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -4,7 +4,6 @@ import (
"flag"
"fmt"
"io"
- "os"
"slices"
@@ -158,6 +157,9 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this ec volume server
+ // We collect topology once at the start and track capacity changes ourselves
+ // (via freeEcSlot decrement after each move) rather than repeatedly refreshing,
+ // which would give a false sense of correctness since topology could be stale.
ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
if len(thisNodes) == 0 {
@@ -168,9 +170,9 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv,
for _, thisNode := range thisNodes {
for _, diskInfo := range thisNode.info.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
- hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
+ hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, writer)
if err != nil {
- fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
+ fmt.Fprintf(writer, "move away volume %d from %s: %v\n", ecShardInfo.Id, volumeServer, err)
}
if !hasMoved {
if skipNonMoveable {
@@ -185,14 +187,31 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv,
return nil
}
-func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
+func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, writer io.Writer) (hasMoved bool, err error) {
for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
+ // Sort by: 1) fewest shards of this volume, 2) most free EC slots
+ // This ensures we prefer nodes with capacity and balanced shard distribution
slices.SortFunc(otherNodes, func(a, b *EcNode) int {
- return a.localShardIdCount(ecShardInfo.Id) - b.localShardIdCount(ecShardInfo.Id)
+ aShards := a.localShardIdCount(ecShardInfo.Id)
+ bShards := b.localShardIdCount(ecShardInfo.Id)
+ if aShards != bShards {
+ return aShards - bShards // Prefer fewer shards
+ }
+ return b.freeEcSlot - a.freeEcSlot // Then prefer more free slots
})
+
+ shardMoved := false
+ skippedNodes := 0
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
+
+ // Skip nodes with no free EC slots
+ if emptyNode.freeEcSlot <= 0 {
+ skippedNodes++
+ continue
+ }
+
collectionPrefix := ""
if ecShardInfo.Collection != "" {
collectionPrefix = ecShardInfo.Collection + "_"
@@ -200,19 +219,29 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv
vid := needle.VolumeId(ecShardInfo.Id)
destDiskId := pickBestDiskOnNode(emptyNode, vid)
if destDiskId > 0 {
- fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
+ fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId)
} else {
- fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
+ fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
}
err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange)
if err != nil {
+ hasMoved = false
return
} else {
hasMoved = true
+ shardMoved = true
+ // Update the node's free slot count after successful move
+ emptyNode.freeEcSlot--
break
}
}
- if !hasMoved {
+ if !shardMoved {
+ if skippedNodes > 0 {
+ fmt.Fprintf(writer, "no available destination for ec shard %d.%d: %d nodes have no free slots\n",
+ ecShardInfo.Id, shardId, skippedNodes)
+ }
+ // Ensure partial moves are reported as failures to prevent data loss
+ hasMoved = false
return
}
}