aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-07-12 13:47:21 +0500
committerKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-07-12 13:47:21 +0500
commit8372721a62f2809ce99aaaf31f4bad5bbf2d99b1 (patch)
treedf8cbc45150dc6d1da63f4ccf7776d0eb9539042
parentee95d23a22d55a12662c1ebc8e2292b61f505bb0 (diff)
downloadseaweedfs-8372721a62f2809ce99aaaf31f4bad5bbf2d99b1.tar.xz
seaweedfs-8372721a62f2809ce99aaaf31f4bad5bbf2d99b1.zip
update topologyInfo
-rw-r--r--weed/shell/command_volume_server_evacuate.go32
1 files changed, 21 insertions, 11 deletions
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index dad8d8626..195cc2699 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -11,6 +11,7 @@ import (
"golang.org/x/exp/slices"
"io"
"os"
+ "time"
)
func init() {
@@ -18,6 +19,7 @@ func init() {
}
type commandVolumeServerEvacuate struct {
+ topologyInfo *master_pb.TopologyInfo
targetServer string
volumeRack string
}
@@ -58,12 +60,12 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv,
}
infoAboutSimulationMode(writer, *applyChange, "-force")
- if err = commandEnv.confirmIsLocked(args); err != nil {
+ if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
return
}
- if *volumeServer == "" {
- return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
+ if *volumeServer == "" && *volumeRack == "" {
+ return fmt.Errorf("need to specify volume server by -node=<host>:<port> or source rack")
}
if *targetServer != "" {
c.targetServer = *targetServer
@@ -88,25 +90,33 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn
// list all the volumes
// collect topology information
- topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
+ c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0)
if err != nil {
return err
}
- if err := c.evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
+ go func() {
+ for {
+ if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil {
+ c.topologyInfo = topologyInfo
+ }
+ }
+ }()
+
+ if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
- if err := c.evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
+ if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
return err
}
return nil
}
-func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
+func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this volume server
- volumeServers := collectVolumeServersByDc(topologyInfo, "")
+ volumeServers := collectVolumeServersByDc(c.topologyInfo, "")
thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster", volumeServer)
@@ -115,7 +125,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
// move away normal volumes
for _, thisNode := range thisNodes {
for _, diskInfo := range thisNode.info.DiskInfos {
- volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo)
+ volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
for _, vol := range diskInfo.VolumeInfos {
hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
if err != nil {
@@ -136,9 +146,9 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE
return nil
}
-func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
+func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
// find this ec volume server
- ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "")
+ ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
if len(thisNodes) == 0 {
return fmt.Errorf("%s is not found in this cluster\n", volumeServer)