diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-07-12 13:47:21 +0500 |
|---|---|---|
| committer | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-07-12 13:47:21 +0500 |
| commit | 8372721a62f2809ce99aaaf31f4bad5bbf2d99b1 (patch) | |
| tree | df8cbc45150dc6d1da63f4ccf7776d0eb9539042 | |
| parent | ee95d23a22d55a12662c1ebc8e2292b61f505bb0 (diff) | |
| download | seaweedfs-8372721a62f2809ce99aaaf31f4bad5bbf2d99b1.tar.xz seaweedfs-8372721a62f2809ce99aaaf31f4bad5bbf2d99b1.zip | |
update topologyInfo
| -rw-r--r-- | weed/shell/command_volume_server_evacuate.go | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index dad8d8626..195cc2699 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -11,6 +11,7 @@ import ( "golang.org/x/exp/slices" "io" "os" + "time" ) func init() { @@ -18,6 +19,7 @@ func init() { } type commandVolumeServerEvacuate struct { + topologyInfo *master_pb.TopologyInfo targetServer string volumeRack string } @@ -58,12 +60,12 @@ func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, } infoAboutSimulationMode(writer, *applyChange, "-force") - if err = commandEnv.confirmIsLocked(args); err != nil { + if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange { return } - if *volumeServer == "" { - return fmt.Errorf("need to specify volume server by -node=<host>:<port>") + if *volumeServer == "" && *volumeRack == "" { + return fmt.Errorf("need to specify volume server by -node=<host>:<port> or source rack") } if *targetServer != "" { c.targetServer = *targetServer @@ -88,25 +90,33 @@ func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEn // list all the volumes // collect topology information - topologyInfo, _, err := collectTopologyInfo(commandEnv, 0) + c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0) if err != nil { return err } - if err := c.evacuateNormalVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + go func() { + for { + if topologyInfo, _, err := collectTopologyInfo(commandEnv, 5*time.Minute); err != nil { + c.topologyInfo = topologyInfo + } + } + }() + + if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } - if err := c.evacuateEcVolumes(commandEnv, topologyInfo, volumeServer, skipNonMoveable, applyChange, writer); err != nil { + if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil { return err } return nil } -func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { +func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this volume server - volumeServers := collectVolumeServersByDc(topologyInfo, "") + volumeServers := collectVolumeServersByDc(c.topologyInfo, "") thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster", volumeServer) @@ -115,7 +125,7 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE // move away normal volumes for _, thisNode := range thisNodes { for _, diskInfo := range thisNode.info.DiskInfos { - volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo) for _, vol := range diskInfo.VolumeInfos { hasMoved, err := c.moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) if err != nil { @@ -136,9 +146,9 @@ func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandE return nil } -func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { +func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error { // find this ec volume server - ecNodes, _ := collectEcVolumeServersByDc(topologyInfo, "") + ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "") thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer) if len(thisNodes) == 0 { return fmt.Errorf("%s is not found in this cluster\n", volumeServer) |
