diff options
Diffstat (limited to 'weed/shell/command_volume_balance.go')
| -rw-r--r-- | weed/shell/command_volume_balance.go | 49 |
1 files changed, 28 insertions, 21 deletions
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index d7ef0d005..349f52f1c 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -27,7 +27,7 @@ func (c *commandVolumeBalance) Name() string { func (c *commandVolumeBalance) Help() string { return `balance all volumes among volume servers - volume.balance [-c ALL|EACH_COLLECTION|<collection_name>] [-force] [-dataCenter=<data_center_name>] + volume.balance [-collection ALL|EACH_COLLECTION|<collection_name>] [-force] [-dataCenter=<data_center_name>] Algorithm: @@ -69,9 +69,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer } var resp *master_pb.VolumeListResponse - ctx := context.Background() - err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) return err }) if err != nil { @@ -79,8 +78,10 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer } typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc) - for _, volumeServers := range typeToNodes { + + for maxVolumeCount, volumeServers := range typeToNodes { if len(volumeServers) < 2 { + fmt.Printf("only 1 node is configured max %d volumes, skipping balancing\n", maxVolumeCount) continue } if *collection == "EACH_COLLECTION" { @@ -93,8 +94,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return err } } - } else if *collection == "ALL" { - if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL", *applyBalancing); err != nil { + } else if *collection == "ALL_COLLECTIONS" { + if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { return err } } else { @@ -107,18 +108,12 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return nil } -func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.DataNodeInfo, volumeSizeLimit uint64, collection string, applyBalancing bool) error { - var nodes []*Node - for _, dn := range dataNodeInfos { - nodes = append(nodes, &Node{ - info: dn, - }) - } +func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { // balance writable volumes for _, n := range nodes { n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { - if collection != "ALL" { + if collection != "ALL_COLLECTIONS" { if v.Collection != collection { return false } @@ -133,7 +128,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.Dat // balance readable volumes for _, n := range nodes { n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool { - if collection != "ALL" { + if collection != "ALL_COLLECTIONS" { if v.Collection != collection { return false } @@ -148,15 +143,19 @@ func balanceVolumeServers(commandEnv *CommandEnv, dataNodeInfos []*master_pb.Dat return nil } -func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*master_pb.DataNodeInfo) { - typeToNodes = make(map[uint64][]*master_pb.DataNodeInfo) +func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter string) (typeToNodes map[uint64][]*Node) { + typeToNodes = make(map[uint64][]*Node) for _, dc := range t.DataCenterInfos { if selectedDataCenter != "" && dc.Id != selectedDataCenter { continue } for _, r := range dc.RackInfos { for _, dn := range r.DataNodeInfos { - typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], dn) + typeToNodes[dn.MaxVolumeCount] = append(typeToNodes[dn.MaxVolumeCount], &Node{ + info: dn, + dc: dc.Id, + rack: r.Id, + }) } } } @@ -166,6 +165,8 @@ func collectVolumeServersByType(t *master_pb.TopologyInfo, selectedDataCenter st type Node struct { info *master_pb.DataNodeInfo selectedVolumes map[uint32]*master_pb.VolumeInformationMessage + dc string + rack string } func sortWritableVolumes(volumes []*master_pb.VolumeInformationMessage) { @@ -207,6 +208,13 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates sortCandidatesFn(candidateVolumes) for _, v := range candidateVolumes { + if v.ReplicaPlacement > 0 { + if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack { + // TODO this logic is too simple, but should work most of the time + // Need a correct algorithm to handle all different cases + continue + } + } if _, found := emptyNode.selectedVolumes[v.Id]; !found { if err := moveVolume(commandEnv, v, fullNode, emptyNode, applyBalancing); err == nil { delete(fullNode.selectedVolumes, v.Id) @@ -230,8 +238,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f } fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) if applyBalancing { - ctx := context.Background() - return LiveMoveVolume(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) + return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) } return nil } |
