Diffstat (limited to 'weed/shell')
 -rw-r--r--   weed/shell/command_fs_configure.go             |  2
 -rw-r--r--   weed/shell/command_volume_balance.go           | 89
 -rw-r--r--   weed/shell/command_volume_server_evacuate.go   |  2
3 files changed, 73 insertions, 20 deletions
diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go
index 06ae15c9e..a891c1b47 100644
--- a/weed/shell/command_fs_configure.go
+++ b/weed/shell/command_fs_configure.go
@@ -52,6 +52,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
 	collection := fsConfigureCommand.String("collection", "", "assign writes to this collection")
 	replication := fsConfigureCommand.String("replication", "", "assign writes with this replication")
 	ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl")
+	diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd] choose between hard drive or solid state drive")
 	fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes")
 	volumeGrowthCount := fsConfigureCommand.Int("volumeGrowthCount", 0, "the number of physical volumes to add if no writable volumes")
 	isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix")
@@ -81,6 +82,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io
 		Replication:       *replication,
 		Ttl:               *ttl,
 		Fsync:             *fsync,
+		DiskType:          *diskType,
 		VolumeGrowthCount: uint32(*volumeGrowthCount),
 	}
diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go
index 928dec02a..a57e07827 100644
--- a/weed/shell/command_volume_balance.go
+++ b/weed/shell/command_volume_balance.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/storage"
 	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
 	"io"
 	"os"
@@ -111,7 +112,8 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer
 
 func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error {
 
-	// balance writable volumes
+	// balance writable hdd volumes
+	// fmt.Fprintf(os.Stdout, "\nbalance collection %s writable hdd volumes\n", collection)
 	for _, n := range nodes {
 		n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
 			if collection != "ALL_COLLECTIONS" {
@@ -119,14 +121,15 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V
 					return false
 				}
 			}
-			return !v.ReadOnly && v.Size < volumeSizeLimit
+			return v.DiskType == string(storage.HardDriveType) && (!v.ReadOnly && v.Size < volumeSizeLimit)
 		})
 	}
-	if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil {
+	if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount, sortWritableVolumes, applyBalancing); err != nil {
 		return err
 	}
 
-	// balance readable volumes
+	// balance readable hdd volumes
+	// fmt.Fprintf(os.Stdout, "\nbalance collection %s readable hdd volumes\n", collection)
 	for _, n := range nodes {
 		n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
 			if collection != "ALL_COLLECTIONS" {
@@ -134,10 +137,42 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V
 					return false
 				}
 			}
-			return v.ReadOnly || v.Size >= volumeSizeLimit
+			return v.DiskType == string(storage.HardDriveType) && (v.ReadOnly || v.Size >= volumeSizeLimit)
 		})
 	}
-	if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil {
+	if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount, sortReadOnlyVolumes, applyBalancing); err != nil {
+		return err
+	}
+
+	// balance writable ssd volumes
+	// fmt.Fprintf(os.Stdout, "\nbalance collection %s writable ssd volumes\n", collection)
+	for _, n := range nodes {
+		n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
+			if collection != "ALL_COLLECTIONS" {
+				if v.Collection != collection {
+					return false
+				}
+			}
+			return v.DiskType == string(storage.SsdType) && (!v.ReadOnly && v.Size < volumeSizeLimit)
+		})
+	}
+	if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxSsdVolumeCount, sortWritableVolumes, applyBalancing); err != nil {
+		return err
+	}
+
+	// balance readable ssd volumes
+	// fmt.Fprintf(os.Stdout, "\nbalance collection %s readable ssd volumes\n", collection)
+	for _, n := range nodes {
+		n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
+			if collection != "ALL_COLLECTIONS" {
+				if v.Collection != collection {
+					return false
+				}
+			}
+			return v.DiskType == string(storage.SsdType) && (v.ReadOnly || v.Size >= volumeSizeLimit)
+		})
+	}
+	if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxSsdVolumeCount, sortReadOnlyVolumes, applyBalancing); err != nil {
 		return err
 	}
 
@@ -169,12 +204,21 @@ type Node struct {
 	rack string
 }
 
-func (n *Node) localVolumeRatio() float64 {
-	return divide(len(n.selectedVolumes), int(n.info.MaxVolumeCount))
+type CapacityFunc func(*master_pb.DataNodeInfo) int
+
+func capacityByMaxSsdVolumeCount(info *master_pb.DataNodeInfo) int {
+	return int(info.MaxSsdVolumeCount)
+}
+func capacityByMaxVolumeCount(info *master_pb.DataNodeInfo) int {
+	return int(info.MaxVolumeCount)
 }
 
-func (n *Node) localVolumeNextRatio() float64 {
-	return divide(len(n.selectedVolumes)+1, int(n.info.MaxVolumeCount))
+func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 {
+	return divide(len(n.selectedVolumes), capacityFunc(n.info))
+}
+
+func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 {
+	return divide(len(n.selectedVolumes)+1, capacityFunc(n.info))
 }
 
 func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) {
@@ -198,33 +242,40 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) {
 	})
 }
 
-func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
+func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, capacityFunc CapacityFunc, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) {
 	selectedVolumeCount, volumeMaxCount := 0, 0
+	var nodesWithCapacity []*Node
 	for _, dn := range nodes {
 		selectedVolumeCount += len(dn.selectedVolumes)
-		volumeMaxCount += int(dn.info.MaxVolumeCount)
+		capacity := capacityFunc(dn.info)
+		if capacity > 0 {
+			nodesWithCapacity = append(nodesWithCapacity, dn)
+		}
+		volumeMaxCount += capacity
 	}
 
 	idealVolumeRatio := divide(selectedVolumeCount, volumeMaxCount)
 
 	hasMoved := true
 
+	// fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio)
+
 	for hasMoved {
 		hasMoved = false
-		sort.Slice(nodes, func(i, j int) bool {
-			return nodes[i].localVolumeRatio() < nodes[j].localVolumeRatio()
+		sort.Slice(nodesWithCapacity, func(i, j int) bool {
+			return nodesWithCapacity[i].localVolumeRatio(capacityFunc) < nodesWithCapacity[j].localVolumeRatio(capacityFunc)
 		})
 
-		fullNode := nodes[len(nodes)-1]
+		fullNode := nodesWithCapacity[len(nodesWithCapacity)-1]
 		var candidateVolumes []*master_pb.VolumeInformationMessage
 		for _, v := range fullNode.selectedVolumes {
 			candidateVolumes = append(candidateVolumes, v)
 		}
 		sortCandidatesFn(candidateVolumes)
 
-		for i := 0; i < len(nodes)-1; i++ {
-			emptyNode := nodes[i]
-			if !(fullNode.localVolumeRatio() > idealVolumeRatio && emptyNode.localVolumeNextRatio() <= idealVolumeRatio) {
+		for i := 0; i < len(nodesWithCapacity)-1; i++ {
+			emptyNode := nodesWithCapacity[i]
+			if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) {
 				// no more volume servers with empty slots
 				break
 			}
@@ -279,7 +330,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f
 	if v.Collection == "" {
 		collectionPrefix = ""
 	}
-	fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
+	fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id)
 	if applyChange {
 		return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second)
 	}
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index a82454cd3..1539ccb19 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -175,7 +175,7 @@ func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEc
 
 func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
 	sort.Slice(otherNodes, func(i, j int) bool {
-		return otherNodes[i].localVolumeRatio() < otherNodes[j].localVolumeRatio()
+		return otherNodes[i].localVolumeRatio(capacityByMaxVolumeCount)+otherNodes[i].localVolumeRatio(capacityByMaxSsdVolumeCount) < otherNodes[j].localVolumeRatio(capacityByMaxVolumeCount)+otherNodes[j].localVolumeRatio(capacityByMaxSsdVolumeCount)
 	})
 	for i := 0; i < len(otherNodes); i++ {
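
Note on the command_volume_balance.go change above: balanceSelectedVolume now receives a CapacityFunc (capacityByMaxVolumeCount for hdd, capacityByMaxSsdVolumeCount for ssd), skips nodes whose capacity for the selected disk type is zero, and keeps moving a volume from the fullest node to an emptier one only while that keeps both nodes on the right side of the ideal ratio. The sketch below is an illustration of that ratio check only, not code from this commit; the node type, divide helper, and sample capacities are simplified stand-ins for the real weed/shell types.

// Illustrative sketch (not part of the commit): the capacity-aware move decision.
package main

import (
	"fmt"
	"sort"
)

type node struct {
	id         string
	selected   int // volumes of the disk type currently being balanced
	maxVolumes int // capacity for that disk type (stand-in for MaxVolumeCount or MaxSsdVolumeCount)
}

// divide mirrors the ratio helper: volumes held over capacity.
func divide(total, max int) float64 {
	if max == 0 {
		return 0
	}
	return float64(total) / float64(max)
}

func (n node) ratio() float64     { return divide(n.selected, n.maxVolumes) }
func (n node) nextRatio() float64 { return divide(n.selected+1, n.maxVolumes) }

func main() {
	nodes := []node{
		{"vs1", 9, 10},
		{"vs2", 2, 10},
		{"vs3", 0, 0}, // no capacity for this disk type: excluded from balancing
	}

	// keep only nodes that can hold volumes of this disk type, and compute the ideal ratio
	var withCapacity []node
	selected, maxCount := 0, 0
	for _, n := range nodes {
		selected += n.selected
		if n.maxVolumes > 0 {
			withCapacity = append(withCapacity, n)
		}
		maxCount += n.maxVolumes
	}
	ideal := divide(selected, maxCount)

	// fullest node is the move source, emptiest is the first candidate target
	sort.Slice(withCapacity, func(i, j int) bool { return withCapacity[i].ratio() < withCapacity[j].ratio() })
	full := withCapacity[len(withCapacity)-1]
	empty := withCapacity[0]

	// a move is allowed only if the source is above the ideal ratio and the
	// target would still be at or below it after receiving one more volume
	if full.ratio() > ideal && empty.nextRatio() <= ideal {
		fmt.Printf("would move one volume %s => %s (ideal ratio %.2f)\n", full.id, empty.id, ideal)
	}
}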
