diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2021-02-23 13:41:30 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-02-23 13:41:30 +0800 |
| commit | 620b91f23eaf5718088dc9ddcf91540967d0c8a6 (patch) | |
| tree | 04e92a8f92b548e26080040d009f23a51d9cc521 /weed/shell/command_volume_balance.go | |
| parent | 690d7c10b826b53bf823faef76603cd6ad83aa1d (diff) | |
| parent | 90cdf9dcace5595b31104df3a3b7e4038a7db341 (diff) | |
| download | seaweedfs-620b91f23eaf5718088dc9ddcf91540967d0c8a6.tar.xz seaweedfs-620b91f23eaf5718088dc9ddcf91540967d0c8a6.zip | |
Merge pull request #73 from chrislusf/master
sync
Diffstat (limited to 'weed/shell/command_volume_balance.go')
| -rw-r--r-- | weed/shell/command_volume_balance.go | 125 |
1 files changed, 92 insertions, 33 deletions
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 928dec02a..e0c41f310 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -1,10 +1,10 @@ package shell import ( - "context" "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "os" "sort" @@ -74,17 +74,15 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return nil } - var resp *master_pb.VolumeListResponse - err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { - resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) - return err - }) + // collect topology information + topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv) if err != nil { return err } - volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc) - volumeReplicas, _ := collectVolumeReplicaLocations(resp) + volumeServers := collectVolumeServersByDc(topologyInfo, *dc) + volumeReplicas, _ := collectVolumeReplicaLocations(topologyInfo) + diskTypes := collectVolumeDiskTypes(topologyInfo) if *collection == "EACH_COLLECTION" { collections, err := ListCollectionNames(commandEnv, true, false) @@ -92,16 +90,16 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return err } for _, c := range collections { - if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { return err } } } else if *collection == "ALL_COLLECTIONS" { - if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { return err } } else { - if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, volumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { return err } } @@ -109,7 +107,18 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return nil } -func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { +func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { + + for _, diskType := range diskTypes { + if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, volumeSizeLimit, collection, applyBalancing); err != nil { + return err + } + } + return nil + +} + +func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { // balance writable volumes for _, n := range nodes { @@ -119,10 +128,10 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V return false } } - return !v.ReadOnly && v.Size < volumeSizeLimit + return v.DiskType == string(diskType) && (!v.ReadOnly && v.Size < volumeSizeLimit) }) } - if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortWritableVolumes, applyBalancing); err != nil { return err } @@ -134,10 +143,10 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V return false } } - return v.ReadOnly || v.Size >= volumeSizeLimit + return v.DiskType == string(diskType) && (v.ReadOnly || v.Size >= volumeSizeLimit) }) } - if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortReadOnlyVolumes, applyBalancing); err != nil { return err } @@ -162,6 +171,25 @@ func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter stri return } +func collectVolumeDiskTypes(t *master_pb.TopologyInfo) (diskTypes []types.DiskType) { + knownTypes := make(map[string]bool) + for _, dc := range t.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for diskType, _ := range dn.DiskInfos { + if _, found := knownTypes[diskType]; !found { + knownTypes[diskType] = true + } + } + } + } + } + for diskType, _ := range knownTypes { + diskTypes = append(diskTypes, types.ToDiskType(diskType)) + } + return +} + type Node struct { info *master_pb.DataNodeInfo selectedVolumes map[uint32]*master_pb.VolumeInformationMessage @@ -169,19 +197,43 @@ type Node struct { rack string } -func (n *Node) localVolumeRatio() float64 { - return divide(len(n.selectedVolumes), int(n.info.MaxVolumeCount)) +type CapacityFunc func(*master_pb.DataNodeInfo) int + +func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc { + return func(info *master_pb.DataNodeInfo) int { + diskInfo, found := info.DiskInfos[string(diskType)] + if !found { + return 0 + } + return int(diskInfo.MaxVolumeCount) + } } -func (n *Node) localVolumeNextRatio() float64 { - return divide(len(n.selectedVolumes)+1, int(n.info.MaxVolumeCount)) +func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc { + return func(info *master_pb.DataNodeInfo) int { + diskInfo, found := info.DiskInfos[string(diskType)] + if !found { + return 0 + } + return int(diskInfo.MaxVolumeCount - diskInfo.VolumeCount) + } +} + +func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { + return divide(len(n.selectedVolumes), capacityFunc(n.info)) +} + +func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 { + return divide(len(n.selectedVolumes)+1, capacityFunc(n.info)) } func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) { n.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage) - for _, v := range n.info.VolumeInfos { - if fn(v) { - n.selectedVolumes[v.Id] = v + for _, diskInfo := range n.info.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if fn(v) { + n.selectedVolumes[v.Id] = v + } } } } @@ -198,33 +250,40 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) { }) } -func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) { +func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, capacityFunc CapacityFunc, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) { selectedVolumeCount, volumeMaxCount := 0, 0 + var nodesWithCapacity []*Node for _, dn := range nodes { selectedVolumeCount += len(dn.selectedVolumes) - volumeMaxCount += int(dn.info.MaxVolumeCount) + capacity := capacityFunc(dn.info) + if capacity > 0 { + nodesWithCapacity = append(nodesWithCapacity, dn) + } + volumeMaxCount += capacity } idealVolumeRatio := divide(selectedVolumeCount, volumeMaxCount) hasMoved := true + // fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio) + for hasMoved { hasMoved = false - sort.Slice(nodes, func(i, j int) bool { - return nodes[i].localVolumeRatio() < nodes[j].localVolumeRatio() + sort.Slice(nodesWithCapacity, func(i, j int) bool { + return nodesWithCapacity[i].localVolumeRatio(capacityFunc) < nodesWithCapacity[j].localVolumeRatio(capacityFunc) }) - fullNode := nodes[len(nodes)-1] + fullNode := nodesWithCapacity[len(nodesWithCapacity)-1] var candidateVolumes []*master_pb.VolumeInformationMessage for _, v := range fullNode.selectedVolumes { candidateVolumes = append(candidateVolumes, v) } sortCandidatesFn(candidateVolumes) - for i := 0; i < len(nodes)-1; i++ { - emptyNode := nodes[i] - if !(fullNode.localVolumeRatio() > idealVolumeRatio && emptyNode.localVolumeNextRatio() <= idealVolumeRatio) { + for i := 0; i < len(nodesWithCapacity)-1; i++ { + emptyNode := nodesWithCapacity[i] + if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) { // no more volume servers with empty slots break } @@ -279,9 +338,9 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f if v.Collection == "" { collectionPrefix = "" } - fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) + fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) if applyChange { - return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) + return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType) } return nil } |
