Diffstat (limited to 'weed/shell/command_volume_balance.go')
-rw-r--r--  weed/shell/command_volume_balance.go  113
1 file changed, 88 insertions(+), 25 deletions(-)
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 928dec02a..a1e3c1ab6 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"os"
"sort"
@@ -85,6 +86,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc)
volumeReplicas, _ := collectVolumeReplicaLocations(resp)
+ diskTypes := collectVolumeDiskTypes(resp.TopologyInfo)
if *collection == "EACH_COLLECTION" {
collections, err := ListCollectionNames(commandEnv, true, false)
@@ -92,16 +94,16 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return err
}
for _, c := range collections {
- if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
+ if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil {
return err
}
}
} else if *collection == "ALL_COLLECTIONS" {
- if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
+ if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil {
return err
}
} else {
- if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
+ if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil {
return err
}
}
@@ -109,7 +111,18 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
-func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
+func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
+
+ for _, diskType := range diskTypes {
+ if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, volumeSizeLimit, collection, applyBalancing); err != nil {
+ return err
+ }
+ }
+ return nil
+
+}
+
+func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
// balance writable volumes
for _, n := range nodes {
@@ -119,10 +132,10 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V
return false
}
}
- return !v.ReadOnly && v.Size < volumeSizeLimit
+ return v.DiskType == string(diskType) && (!v.ReadOnly && v.Size < volumeSizeLimit)
})
}
- if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
+ if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortWritableVolumes, applyBalancing); err != nil {
return err
}
@@ -134,10 +147,10 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V
return false
}
}
- return v.ReadOnly || v.Size >= volumeSizeLimit
+ return v.DiskType == string(diskType) && (v.ReadOnly || v.Size >= volumeSizeLimit)
})
}
- if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
+ if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortReadOnlyVolumes, applyBalancing); err != nil {
return err
}
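
The selectors above now gate on the volume's disk type, so each per-type pass only ever considers its own volumes. A minimal sketch (values invented, not from the commit) of how one volume is classified during an ssd pass:

    // Sketch: a read-only ssd volume is picked up by the read-only pass
    // of the ssd iteration, and ignored entirely during the hdd pass.
    v := &master_pb.VolumeInformationMessage{
        Id:       7,
        Size:     20 * 1024 * 1024 * 1024, // 20GB, invented
        ReadOnly: true,
        DiskType: "ssd",
    }
    volumeSizeLimit := uint64(30 * 1024 * 1024 * 1024) // invented limit
    matchesPass := v.DiskType == string(types.ToDiskType("ssd")) // true
    isReadOnlyCandidate := matchesPass && (v.ReadOnly || v.Size >= volumeSizeLimit) // true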
@@ -162,6 +175,25 @@ func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter stri
return
}
+func collectVolumeDiskTypes(t *master_pb.TopologyInfo) (diskTypes []types.DiskType) {
+ knownTypes := make(map[string]bool)
+ for _, dc := range t.DataCenterInfos {
+ for _, r := range dc.RackInfos {
+ for _, dn := range r.DataNodeInfos {
+ for diskType := range dn.DiskInfos {
+ if _, found := knownTypes[diskType]; !found {
+ knownTypes[diskType] = true
+ }
+ }
+ }
+ }
+ }
+ for diskType := range knownTypes {
+ diskTypes = append(diskTypes, types.ToDiskType(diskType))
+ }
+ return
+}
+
type Node struct {
info *master_pb.DataNodeInfo
selectedVolumes map[uint32]*master_pb.VolumeInformationMessage
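
Since collectVolumeDiskTypes walks every data node, a cluster mixing default and ssd disks yields one entry per distinct type. A small sketch (topology invented; assuming the empty-string key is the implicit hdd disk that types.ToDiskType normalizes):

    topo := &master_pb.TopologyInfo{
        DataCenterInfos: []*master_pb.DataCenterInfo{{
            RackInfos: []*master_pb.RackInfo{{
                DataNodeInfos: []*master_pb.DataNodeInfo{
                    {DiskInfos: map[string]*master_pb.DiskInfo{"": {}}},    // default (hdd) disk, assumed key
                    {DiskInfos: map[string]*master_pb.DiskInfo{"ssd": {}}}, // ssd disk
                },
            }},
        }},
    }
    diskTypes := collectVolumeDiskTypes(topo)
    // two entries (hdd and ssd), in map-iteration order, i.e. nondeterministic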
@@ -169,19 +201,43 @@ type Node struct {
rack string
}
-func (n *Node) localVolumeRatio() float64 {
- return divide(len(n.selectedVolumes), int(n.info.MaxVolumeCount))
+type CapacityFunc func(*master_pb.DataNodeInfo) int
+
+func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc {
+ return func(info *master_pb.DataNodeInfo) int {
+ diskInfo, found := info.DiskInfos[string(diskType)]
+ if !found {
+ return 0
+ }
+ return int(diskInfo.MaxVolumeCount)
+ }
+}
+
+func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc {
+ return func(info *master_pb.DataNodeInfo) int {
+ diskInfo, found := info.DiskInfos[string(diskType)]
+ if !found {
+ return 0
+ }
+ return int(diskInfo.MaxVolumeCount - diskInfo.VolumeCount)
+ }
+}
+
+func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 {
+ return divide(len(n.selectedVolumes), capacityFunc(n.info))
}
-func (n *Node) localVolumeNextRatio() float64 {
- return divide(len(n.selectedVolumes)+1, int(n.info.MaxVolumeCount))
+func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 {
+ return divide(len(n.selectedVolumes)+1, capacityFunc(n.info))
}
func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
n.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage)
- for _, v := range n.info.VolumeInfos {
- if fn(v) {
- n.selectedVolumes[v.Id] = v
+ for _, diskInfo := range n.info.DiskInfos {
+ for _, v := range diskInfo.VolumeInfos {
+ if fn(v) {
+ n.selectedVolumes[v.Id] = v
+ }
}
}
}
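
Putting the pieces together: the closures let the same ratio helpers work per disk type, and a node without the requested type reports capacity 0, which balanceSelectedVolume later uses to drop it from the candidate list. A sketch, assuming the []*Node slice from the surrounding code:

    ssdCapacity := capacityByMaxVolumeCount(types.ToDiskType("ssd"))
    for _, n := range nodes {
        if ssdCapacity(n.info) == 0 {
            continue // no ssd slots on this node; skip to avoid a zero denominator
        }
        fmt.Printf("%s: ssd volume ratio %.2f\n", n.info.Id, n.localVolumeRatio(ssdCapacity))
    }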
@@ -198,33 +254,40 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
})
}
-func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
+func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, capacityFunc CapacityFunc, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
selectedVolumeCount, volumeMaxCount := 0, 0
+ var nodesWithCapacity []*Node
for _, dn := range nodes {
selectedVolumeCount += len(dn.selectedVolumes)
- volumeMaxCount += int(dn.info.MaxVolumeCount)
+ capacity := capacityFunc(dn.info)
+ if capacity > 0 {
+ nodesWithCapacity = append(nodesWithCapacity, dn)
+ }
+ volumeMaxCount += capacity
}
idealVolumeRatio := divide(selectedVolumeCount, volumeMaxCount)
hasMoved := true
+ // fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
+
for hasMoved {
hasMoved = false
- sort.Slice(nodes, func(i, j int) bool {
- return nodes[i].localVolumeRatio() < nodes[j].localVolumeRatio()
+ sort.Slice(nodesWithCapacity, func(i, j int) bool {
+ return nodesWithCapacity[i].localVolumeRatio(capacityFunc) < nodesWithCapacity[j].localVolumeRatio(capacityFunc)
})
- fullNode := nodes[len(nodes)-1]
+ fullNode := nodesWithCapacity[len(nodesWithCapacity)-1]
var candidateVolumes []*master_pb.VolumeInformationMessage
for _, v := range fullNode.selectedVolumes {
candidateVolumes = append(candidateVolumes, v)
}
sortCandidatesFn(candidateVolumes)
- for i := 0; i < len(nodes)-1; i++ {
- emptyNode := nodes[i]
- if !(fullNode.localVolumeRatio() > idealVolumeRatio && emptyNode.localVolumeNextRatio() <= idealVolumeRatio) {
+ for i := 0; i < len(nodesWithCapacity)-1; i++ {
+ emptyNode := nodesWithCapacity[i]
+ if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) {
// no more volume servers with empty slots
break
}
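
The stopping condition is easiest to see with concrete numbers (invented for illustration): with 9 selected volumes across 30 slots of one disk type, the loop keeps moving volumes from the fullest node while some receiver can accept one without overshooting the ideal ratio.

    ideal := divide(9, 30)       // 0.30 across the whole pool
    fullRatio := divide(5, 10)   // donor node at 0.50, above the ideal
    nextRatio := divide(1+1, 10) // receiver would land at 0.20, still <= ideal
    canMove := fullRatio > ideal && nextRatio <= ideal
    fmt.Println(canMove) // true: move one volume and re-sort

Once no donor exceeds the ideal, or no receiver can take a volume while staying at or below it, hasMoved remains false and the loop terminates.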
@@ -279,9 +342,9 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
if v.Collection == "" {
collectionPrefix = ""
}
- fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
+ fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
if applyChange {
- return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
+ return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType)
}
return nil
}
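
Usage note (not part of the diff; flag names taken from the command's own help, which this excerpt does not show): a dry run of volume.balance now reports per-disk-type plans in the new output format, e.g. " moving ssd volume mycol_7 host1:8080 => host2:8080" with invented hosts and collection, and the LiveMoveVolume calls, including the new diskType argument, are only issued when -force is given.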