diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/shell/command_volume_server_evacuate.go | 45 |
1 files changed, 37 insertions, 8 deletions
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 6135eb3eb..fce88d2c4 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -4,7 +4,6 @@ import ( "flag" "fmt" "io" - "os" "slices" @@ -158,6 +157,9 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this ec volume server + // We collect topology once at the start and track capacity changes ourselves + // (via freeEcSlot decrement after each move) rather than repeatedly refreshing, + // which would give a false sense of correctness since topology could be stale. ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "") thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) if len(thisNodes) == 0 { @@ -168,9 +170,9 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { for _, ecShardInfo := range diskInfo.EcShardInfos { - hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) + hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange, writer) if err != nil { - fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) + fmt.Fprintf(writer, "move away volume %d from %s: %v\n", ecShardInfo.Id, volumeServer, err) } if !hasMoved { if skipNonMoveable { @@ -185,14 +187,31 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, return nil } -func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) { +func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, writer io.Writer) (hasMoved bool, err error) { for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() { + // Sort by: 1) fewest shards of this volume, 2) most free EC slots + // This ensures we prefer nodes with capacity and balanced shard distribution slices.SortFunc(otherNodes, func(a, b *EcNode) int { - return a.localShardIdCount(ecShardInfo.Id) - b.localShardIdCount(ecShardInfo.Id) + aShards := a.localShardIdCount(ecShardInfo.Id) + bShards := b.localShardIdCount(ecShardInfo.Id) + if aShards != bShards { + return aShards - bShards // Prefer fewer shards + } + return b.freeEcSlot - a.freeEcSlot // Then prefer more free slots }) + + shardMoved := false + skippedNodes := 0 for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] + + // Skip nodes with no free EC slots + if emptyNode.freeEcSlot <= 0 { + skippedNodes++ + continue + } + collectionPrefix := "" if ecShardInfo.Collection != "" { collectionPrefix = ecShardInfo.Collection + "_" @@ -200,19 +219,29 @@ func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv vid := needle.VolumeId(ecShardInfo.Id) destDiskId := pickBestDiskOnNode(emptyNode, vid) if destDiskId > 0 { - fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId) + fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s (disk %d)\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id, destDiskId) } else { - fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id) + fmt.Fprintf(writer, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id) } err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, vid, shardId, emptyNode, destDiskId, applyChange) if err != nil { + hasMoved = false return } else { hasMoved = true + shardMoved = true + // Update the node's free slot count after successful move + emptyNode.freeEcSlot-- break } } - if !hasMoved { + if !shardMoved { + if skippedNodes > 0 { + fmt.Fprintf(writer, "no available destination for ec shard %d.%d: %d nodes have no free slots\n", + ecShardInfo.Id, shardId, skippedNodes) + } + // Ensure partial moves are reported as failures to prevent data loss + hasMoved = false return } } |
