Diffstat (limited to 'weed/shell')
-rw-r--r--   weed/shell/command_ec_balance.go        68
-rw-r--r--   weed/shell/command_ec_common.go        126
-rw-r--r--   weed/shell/command_ec_common_test.go     1
-rw-r--r--   weed/shell/command_ec_decode.go         20
-rw-r--r--   weed/shell/command_ec_encode.go         73
-rw-r--r--   weed/shell/command_ec_rebuild.go         4
6 files changed, 156 insertions, 136 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 81d214fcc..9e27f40ce 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -24,73 +24,7 @@ func (c *commandEcBalance) Help() string {
ec.balance [-c EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>] [-shardReplicaPlacement <replica_placement>]
Algorithm:
-
- func EcBalance() {
- for each collection:
- balanceEcVolumes(collectionName)
- for each rack:
- balanceEcRack(rack)
- }
-
- func balanceEcVolumes(collectionName){
- for each volume:
- doDeduplicateEcShards(volumeId)
-
- tracks rack~shardCount mapping
- for each volume:
- doBalanceEcShardsAcrossRacks(volumeId)
-
- for each volume:
- doBalanceEcShardsWithinRacks(volumeId)
- }
-
- // spread ec shards into more racks
- func doBalanceEcShardsAcrossRacks(volumeId){
- tracks rack~volumeIdShardCount mapping
- averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
- ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
- for each ecShardsToMove {
- destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
- destVolumeServers = volume servers on the destRack
- pickOneEcNodeAndMoveOneShard(destVolumeServers)
- }
- }
-
- func doBalanceEcShardsWithinRacks(volumeId){
- racks = collect all racks that the volume id is on
- for rack, shards := range racks
- doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
- }
-
- // move ec shards
- func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
- tracks volumeServer~volumeIdShardCount mapping
- averageShardCount = len(shards) / numVolumeServers
- volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
- ecShardsToMove = select overflown ec shards from volumeServersOverAverage
- for each ecShardsToMove {
- destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
- pickOneEcNodeAndMoveOneShard(destVolumeServers)
- }
- }
-
- // move ec shards while keeping shard distribution for the same volume unchanged or more even
- func balanceEcRack(rack){
- averageShardCount = total shards / numVolumeServers
- for hasMovedOneEcShard {
- sort all volume servers ordered by the number of local ec shards
- pick the volume server A with the lowest number of ec shards x
- pick the volume server B with the highest number of ec shards y
- if y > averageShardCount and x +1 <= averageShardCount {
- if B has a ec shard with volume id v that A does not have {
- move one ec shard v from B to A
- hasMovedOneEcShard = true
- }
- }
- }
- }
-
-`
+ ` + ecBalanceAlgorithmDescription
}
func (c *commandEcBalance) HasTag(CommandTag) bool {
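The algorithm description removed here (and reused via ecBalanceAlgorithmDescription below) keys the cross-rack pass off averageShardsPerEcRack = totalShardNumber / numRacks, treating anything above that average as overflow to move. A rough, self-contained Go sketch of that overflow test follows; the rack names and counts are invented, and the integer division mirrors the pseudocode rather than the actual balancer:

    package main

    import "fmt"

    func main() {
        const totalShards = 14 // 10 data + 4 parity shards per volume
        rackShardCount := map[string]int{"rack1": 8, "rack2": 4, "rack3": 2}

        // averageShardsPerEcRack from the description; integer division, as written.
        average := totalShards / len(rackShardCount) // 14 / 3 = 4

        for rack, n := range rackShardCount {
            if n > average {
                fmt.Printf("%s holds %d shards, %d over the average of %d: overflow candidates to move\n",
                    rack, n, n-average, average)
            }
        }
    }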
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 625674bfd..4b672b2ef 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand/v2"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
@@ -40,6 +41,72 @@ type EcRack struct {
}
var (
+ ecBalanceAlgorithmDescription = `
+ func EcBalance() {
+ for each collection:
+ balanceEcVolumes(collectionName)
+ for each rack:
+ balanceEcRack(rack)
+ }
+
+ func balanceEcVolumes(collectionName){
+ for each volume:
+ doDeduplicateEcShards(volumeId)
+
+ tracks rack~shardCount mapping
+ for each volume:
+ doBalanceEcShardsAcrossRacks(volumeId)
+
+ for each volume:
+ doBalanceEcShardsWithinRacks(volumeId)
+ }
+
+ // spread ec shards into more racks
+ func doBalanceEcShardsAcrossRacks(volumeId){
+ tracks rack~volumeIdShardCount mapping
+ averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
+ ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
+ for each ecShardsToMove {
+ destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
+ destVolumeServers = volume servers on the destRack
+ pickOneEcNodeAndMoveOneShard(destVolumeServers)
+ }
+ }
+
+ func doBalanceEcShardsWithinRacks(volumeId){
+ racks = collect all racks that the volume id is on
+ for rack, shards := range racks
+ doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
+ }
+
+ // move ec shards
+ func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
+ tracks volumeServer~volumeIdShardCount mapping
+ averageShardCount = len(shards) / numVolumeServers
+ volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
+ ecShardsToMove = select overflown ec shards from volumeServersOverAverage
+ for each ecShardsToMove {
+ destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
+ pickOneEcNodeAndMoveOneShard(destVolumeServers)
+ }
+ }
+
+ // move ec shards while keeping shard distribution for the same volume unchanged or more even
+ func balanceEcRack(rack){
+ averageShardCount = total shards / numVolumeServers
+ for hasMovedOneEcShard {
+ sort all volume servers ordered by the number of local ec shards
+ pick the volume server A with the lowest number of ec shards x
+ pick the volume server B with the highest number of ec shards y
+ if y > averageShardCount and x +1 <= averageShardCount {
+ if B has a ec shard with volume id v that A does not have {
+ move one ec shard v from B to A
+ hasMovedOneEcShard = true
+ }
+ }
+ }
+ }
+ `
// Overridable functions for testing.
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
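A minimal sketch of the balanceEcRack loop described just above: repeatedly move one shard from the fullest volume server to the emptiest one while that narrows the gap toward the average, skipping any move that would give the receiver a second shard of the same volume. The types and seed data below are invented for illustration and are not SeaweedFS's EcNode structures:

    package main

    import (
        "fmt"
        "sort"
    )

    type server struct {
        name   string
        shards map[uint32]int // volume id -> number of that volume's ec shards held here
    }

    func total(s server) int {
        n := 0
        for _, c := range s.shards {
            n += c
        }
        return n
    }

    func main() {
        servers := []server{
            {"A", map[uint32]int{}},
            {"B", map[uint32]int{1: 3, 2: 3}},
            {"C", map[uint32]int{1: 2}},
        }

        totalShards := 0
        for _, s := range servers {
            totalShards += total(s)
        }
        average := totalShards / len(servers)

        for moved := true; moved; {
            moved = false
            // sort all volume servers by the number of local ec shards
            sort.Slice(servers, func(i, j int) bool { return total(servers[i]) < total(servers[j]) })
            low, high := &servers[0], &servers[len(servers)-1]

            if total(*high) > average && total(*low)+1 <= average {
                // move one shard of a volume that the receiver does not hold yet
                for vid := range high.shards {
                    if low.shards[vid] == 0 {
                        high.shards[vid]--
                        if high.shards[vid] == 0 {
                            delete(high.shards, vid)
                        }
                        low.shards[vid]++
                        moved = true
                        break
                    }
                }
            }
        }

        fmt.Println(servers)
    }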
@@ -58,6 +125,7 @@ func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPl
return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}
+
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
if replicaStr != "" {
rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
@@ -75,6 +143,45 @@ func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super
return rp, err
}
+func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
+
+ if delayBeforeCollecting > 0 {
+ time.Sleep(delayBeforeCollecting)
+ }
+
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return
+ }
+
+ return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
+
+}
+
+func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+ // list all possible locations
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
+ if err != nil {
+ return
+ }
+
+ // find out all volume servers with one slot left.
+ ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
+
+ sortEcNodesByFreeslotsDescending(ecNodes)
+
+ return
+}
+
+func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+ return collectEcNodesForDC(commandEnv, "")
+}
+
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
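The relocated collectTopologyInfo plus the new collectEcNodesForDC / collectEcNodes pair end by sorting candidate servers so those with the most free EC slots come first. A tiny standalone illustration of that ordering, using a stand-in struct rather than the shell's actual EcNode:

    package main

    import (
        "fmt"
        "sort"
    )

    // Stand-in for the shell's EcNode bookkeeping, which wraps master_pb.DataNodeInfo.
    type node struct {
        addr      string
        freeSlots int
    }

    func main() {
        nodes := []node{{"vs1:8080", 3}, {"vs2:8080", 9}, {"vs3:8080", 6}}

        // sortEcNodesByFreeslotsDescending: servers with the most headroom first,
        // so shard placement prefers the emptiest volume servers.
        sort.Slice(nodes, func(i, j int) bool { return nodes[i].freeSlots > nodes[j].freeSlots })

        fmt.Println(nodes) // [{vs2:8080 9} {vs3:8080 6} {vs1:8080 3}]
    }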
@@ -243,22 +350,6 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
return 0
}
-func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
- // list all possible locations
- // collect topology information
- topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
- if err != nil {
- return
- }
-
- // find out all volume servers with one slot left.
- ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
-
- sortEcNodesByFreeslotsDescending(ecNodes)
-
- return
-}
-
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != string(dc) {
@@ -523,7 +614,6 @@ func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int
})
}
-// TODO: Maybe remove averages constraints? We don't need those anymore now that we're properly balancing shards.
func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
racks := ecb.racks()
@@ -876,7 +966,7 @@ func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplic
}
// collect all ec nodes
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, dc)
+ allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
if err != nil {
return err
}
diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go
index 963706894..e9c9b1d99 100644
--- a/weed/shell/command_ec_common_test.go
+++ b/weed/shell/command_ec_common_test.go
@@ -32,6 +32,7 @@ func errorCheck(got error, want string) error {
}
return nil
}
+
func TestParseReplicaPlacementArg(t *testing.T) {
getDefaultReplicaPlacementOrig := getDefaultReplicaPlacement
getDefaultReplicaPlacement = func(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index d3e985a2f..673a9a4f2 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -5,7 +5,6 @@ import (
"flag"
"fmt"
"io"
- "time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
@@ -241,25 +240,6 @@ func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocati
return resp.VolumeIdLocations, nil
}
-func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
-
- if delayBeforeCollecting > 0 {
- time.Sleep(delayBeforeCollecting)
- }
-
- var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- return err
- })
- if err != nil {
- return
- }
-
- return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
-
-}
-
func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
vidMap := make(map[uint32]bool)
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 0b60694fc..a4f1eec54 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -42,7 +42,7 @@ func (c *commandEcEncode) Help() string {
This command will:
1. freeze one volume
2. apply erasure coding to the volume
- 3. move the encoded shards to multiple volume servers
+ 3. (optionally) re-balance encoded shards across multiple volume servers
The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
to lose 4 volume servers.
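On the "10.4" point above: with 10 data shards plus 4 parity shards, any 10 of the 14 shards are enough to rebuild the volume, which is why losing up to 4 servers (or 4 shard files) is survivable. The standalone sketch below uses github.com/klauspost/reedsolomon, the library SeaweedFS's erasure coding builds on, purely to demonstrate that arithmetic; it is not the weed encode path itself:

    package main

    import (
        "bytes"
        "fmt"

        "github.com/klauspost/reedsolomon"
    )

    func main() {
        // 10 data shards + 4 parity shards, matching the 10.4 scheme.
        enc, err := reedsolomon.New(10, 4)
        if err != nil {
            panic(err)
        }

        data := bytes.Repeat([]byte("seaweedfs "), 1000)
        shards, err := enc.Split(data)
        if err != nil {
            panic(err)
        }
        if err := enc.Encode(shards); err != nil {
            panic(err)
        }

        // Simulate losing any 4 shards, e.g. 4 volume servers going away.
        shards[0], shards[3], shards[7], shards[12] = nil, nil, nil, nil

        // The remaining 10 shards are enough to rebuild everything.
        if err := enc.Reconstruct(shards); err != nil {
            panic(err)
        }
        ok, err := enc.Verify(shards)
        fmt.Println("recovered:", ok, err)
    }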
@@ -53,7 +53,8 @@ func (c *commandEcEncode) Help() string {
If you only have less than 4 volume servers, with erasure coding, at least you can afford to
have 4 corrupted shard files.
-`
+ Re-balancing algorithm:
+ ` + ecBalanceAlgorithmDescription
}
func (c *commandEcEncode) HasTag(CommandTag) bool {
@@ -67,15 +68,22 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
collection := encodeCommand.String("collection", "", "the collection name")
fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period")
- parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel")
+ // TODO: Add concurrency support to EcBalance and reenable this switch?
+ //parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel")
forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
+ shardReplicaPlacement := encodeCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty")
+ applyBalancing := encodeCommand.Bool("rebalance", false, "re-balance EC shards after creation")
+
if err = encodeCommand.Parse(args); err != nil {
return nil
}
-
if err = commandEnv.confirmIsLocked(args); err != nil {
return
}
+ rp, err := parseReplicaPlacementArg(commandEnv, *shardReplicaPlacement)
+ if err != nil {
+ return err
+ }
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
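The new -shardReplicaPlacement flag takes the usual SeaweedFS xyz placement string (other data centers / other racks / same-rack copies) and falls back to the master default when empty. A small parsing sketch follows; the exported field names on super_block.ReplicaPlacement are an assumption here, not a documented API:

    package main

    import (
        "fmt"

        "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
    )

    func main() {
        // "020": no extra data centers, spread across 2 other racks,
        // no extra copies on the same rack.
        rp, err := super_block.NewReplicaPlacementFromString("020")
        if err != nil {
            panic(err)
        }
        fmt.Printf("diffDC=%d diffRack=%d sameRack=%d\n",
            rp.DiffDataCenterCount, rp.DiffRackCount, rp.SameRackCount)
    }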
@@ -94,29 +102,44 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
}
}
- vid := needle.VolumeId(*volumeId)
-
- // volumeId is provided
- if vid != 0 {
- return doEcEncode(commandEnv, *collection, vid, *parallelCopy)
+ var volumeIds []needle.VolumeId
+ if vid := needle.VolumeId(*volumeId); vid != 0 {
+ // volumeId is provided
+ volumeIds = append(volumeIds, vid)
+ } else {
+ // apply to all volumes in the collection
+ volumeIds, err = collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
+ if err != nil {
+ return err
+ }
}
- // apply to all volumes in the collection
- volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
- if err != nil {
- return err
+ var collections []string
+ if *collection != "" {
+ collections = []string{*collection}
+ } else {
+ // TODO: should we limit this to collections associated with the provided volume ID?
+ collections, err = ListCollectionNames(commandEnv, false, true)
+ if err != nil {
+ return err
+ }
}
- fmt.Printf("ec encode volumes: %v\n", volumeIds)
+
+ // encode all requested volumes...
for _, vid := range volumeIds {
- if err = doEcEncode(commandEnv, *collection, vid, *parallelCopy); err != nil {
- return err
+ if err = doEcEncode(commandEnv, *collection, vid); err != nil {
+ return fmt.Errorf("ec encode for volume %d: %v", vid, err)
}
}
+ // ...then re-balance ec shards.
+ if err := EcBalance(commandEnv, collections, "", rp, *applyBalancing); err != nil {
+ return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", collections, err)
+ }
return nil
}
-func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
+func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) error {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
@@ -130,23 +153,15 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
// fmt.Printf("found ec %d shards on %v\n", vid, locations)
// mark the volume as readonly
- err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false)
- if err != nil {
+ if err := markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil {
return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
}
// generate ec shards
- err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].ServerAddress())
- if err != nil {
+ if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].ServerAddress()); err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
- // balance the ec shards to current cluster
- err = spreadEcShards(commandEnv, vid, collection, locations, parallelCopy)
- if err != nil {
- return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
- }
-
return nil
}
@@ -166,9 +181,10 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId,
}
+// TODO: delete this (now unused) shard spread logic.
func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location, parallelCopy bool) (err error) {
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
+ allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv)
if err != nil {
return err
}
@@ -296,7 +312,6 @@ func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
}
func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
-
// collect topology information
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
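collectVolumeIdsForEcEncode is what turns the -fullPercent and -quietFor flags into a candidate list: volumes that have grown close to the volume size limit and have seen no recent writes. A hypothetical, self-contained version of that selection rule is sketched below; the field names and threshold formula are illustrative and not the real topology walk:

    package main

    import (
        "fmt"
        "time"
    )

    // Illustrative only: the real collectVolumeIdsForEcEncode reads these values
    // from the master topology; the struct and threshold formula here are made up.
    type volumeInfo struct {
        id           uint32
        sizeBytes    uint64
        lastModified time.Time
    }

    func selectForEncode(vols []volumeInfo, volumeSizeLimitMb uint64, fullPercent float64, quietFor time.Duration, now time.Time) (vids []uint32) {
        threshold := uint64(fullPercent / 100 * float64(volumeSizeLimitMb) * 1024 * 1024)
        for _, v := range vols {
            if v.sizeBytes >= threshold && now.Sub(v.lastModified) >= quietFor {
                vids = append(vids, v.id)
            }
        }
        return
    }

    func main() {
        now := time.Now()
        vols := []volumeInfo{
            {1, 29 << 30, now.Add(-2 * time.Hour)},   // nearly full and quiet -> selected
            {2, 29 << 30, now.Add(-5 * time.Minute)}, // nearly full but recently written
            {3, 1 << 30, now.Add(-3 * time.Hour)},    // quiet but far from full
        }
        fmt.Println(selectForEncode(vols, 30*1024, 95, time.Hour, now)) // [1]
    }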
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index b761ea676..8cae77434 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -4,10 +4,10 @@ import (
"context"
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@@ -74,7 +74,7 @@ func (c *commandEcRebuild) Do(args []string, commandEnv *CommandEnv, writer io.W
}
// collect all ec nodes
- allEcNodes, _, err := collectEcNodes(commandEnv, "")
+ allEcNodes, _, err := collectEcNodes(commandEnv)
if err != nil {
return err
}