diff options
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 21 |
1 files changed, 9 insertions, 12 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 7b2eb6769..538351fd0 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -64,18 +64,15 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, takeAction := !*skipChange - var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { return err } // find all volumes that needs replication // collect all data nodes - volumeReplicas, allLocations := collectVolumeReplicaLocations(resp) + volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) if len(allLocations) == 0 { return fmt.Errorf("no data nodes at all") @@ -107,10 +104,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } -func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) { +func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { volumeReplicas := make(map[uint32][]*VolumeReplica) var allLocations []location - eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { @@ -165,10 +162,10 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false hasSkippedCollection := false - keepDataNodesSorted(allLocations, replica.info.DiskType) + keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType)) + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) for _, dst := range allLocations { // check whether data nodes satisfy the constraints - fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // check collection name pattern if *c.collectionPattern != "" { @@ -219,8 +216,8 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm return nil } -func keepDataNodesSorted(dataNodes []location, diskType string) { - fn := capacityByFreeVolumeCount(types.ToDiskType(diskType)) +func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) { + fn := capacityByFreeVolumeCount(diskType) sort.Slice(dataNodes, func(i, j int) bool { return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode) }) |
