diff options
Diffstat (limited to 'weed/shell/command_volume_balance.go')
| -rw-r--r-- | weed/shell/command_volume_balance.go | 63 |
1 files changed, 51 insertions, 12 deletions
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 005236806..4f96c3e08 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "io" "os" "sort" @@ -83,6 +84,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer } typeToNodes := collectVolumeServersByType(resp.TopologyInfo, *dc) + volumeReplicas, _ := collectVolumeReplicaLocations(resp) for maxVolumeCount, volumeServers := range typeToNodes { if len(volumeServers) < 2 { @@ -95,16 +97,16 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return err } for _, c := range collections { - if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { return err } } } else if *collection == "ALL_COLLECTIONS" { - if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { return err } } else { - if err = balanceVolumeServers(commandEnv, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { return err } } @@ -113,7 +115,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return nil } -func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { +func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { // balance writable volumes for _, n := range nodes { @@ -126,7 +128,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit return !v.ReadOnly && v.Size < volumeSizeLimit }) } - if err := balanceSelectedVolume(commandEnv, nodes, sortWritableVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil { return err } @@ -141,7 +143,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, nodes []*Node, volumeSizeLimit return v.ReadOnly || v.Size >= volumeSizeLimit }) } - if err := balanceSelectedVolume(commandEnv, nodes, sortReadOnlyVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil { return err } @@ -186,7 +188,7 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) { }) } -func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) { +func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) { selectedVolumeCount := 0 for _, dn := range nodes { selectedVolumeCount += len(dn.selectedVolumes) @@ -216,7 +218,7 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates // no more volume servers with empty slots break } - hasMoved, err = attemptToMoveOneVolume(commandEnv, fullNode, candidateVolumes, emptyNode, applyBalancing) + hasMoved, err = attemptToMoveOneVolume(commandEnv, volumeReplicas, fullNode, candidateVolumes, emptyNode, applyBalancing) if err != nil { return } @@ -229,13 +231,12 @@ func balanceSelectedVolume(commandEnv *CommandEnv, nodes []*Node, sortCandidates return nil } -func attemptToMoveOneVolume(commandEnv *CommandEnv, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) { +func attemptToMoveOneVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, fullNode *Node, candidateVolumes []*master_pb.VolumeInformationMessage, emptyNode *Node, applyBalancing bool) (hasMoved bool, err error) { for _, v := range candidateVolumes { if v.ReplicaPlacement > 0 { - if fullNode.dc != emptyNode.dc && fullNode.rack != emptyNode.rack { - // TODO this logic is too simple, but should work most of the time - // Need a correct algorithm to handle all different cases + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(v.ReplicaPlacement)) + if !isGoodMove(replicaPlacement, volumeReplicas[v.Id], fullNode, emptyNode) { continue } } @@ -273,3 +274,41 @@ func (node *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) b } } } + +func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*VolumeReplica, sourceNode, targetNode *Node) bool { + for _, replica := range existingReplicas { + if replica.location.dataNode.Id == targetNode.info.Id && + replica.location.rack == targetNode.rack && + replica.location.dc == targetNode.dc { + // never move to existing nodes + return false + } + } + dcs, racks := make(map[string]bool), make(map[string]int) + for _, replica := range existingReplicas { + if replica.location.dataNode.Id != sourceNode.info.Id { + dcs[replica.location.DataCenter()] = true + racks[replica.location.Rack()]++ + } + } + + dcs[targetNode.dc] = true + racks[fmt.Sprintf("%s %s", targetNode.dc, targetNode.rack)]++ + + if len(dcs) > placement.DiffDataCenterCount+1 { + return false + } + + if len(racks) > placement.DiffRackCount+1 { + return false + } + + for _, sameRackCount := range racks { + if sameRackCount > placement.SameRackCount+1 { + return false + } + } + + return true + +} |
