aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2024-12-02 17:44:07 +0100
committerGitHub <noreply@github.com>2024-12-02 08:44:07 -0800
commitb2ba7d7408500bb450884a288327a1fb53e67fae (patch)
treeb16eac42446ecd69bf5c2a8b3a57afb49b0d644b /weed
parent9a741a61b195ccdf9815ab64dc9f0725f620f836 (diff)
downloadseaweedfs-b2ba7d7408500bb450884a288327a1fb53e67fae.tar.xz
seaweedfs-b2ba7d7408500bb450884a288327a1fb53e67fae.zip
Resolve replica placement for EC volumes from master server defaults. (#6303)
Diffstat (limited to 'weed')
-rw-r--r--weed/shell/command_ec_balance.go11
-rw-r--r--weed/shell/command_ec_common.go67
-rw-r--r--weed/shell/command_ec_common_test.go53
3 files changed, 103 insertions, 28 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 6004ad7e4..24dea3e2d 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -17,10 +17,11 @@ func (c *commandEcBalance) Name() string {
return "ec.balance"
}
+// TODO: Update help string and move to command_ec_common.go once shard replica placement logic is enabled.
func (c *commandEcBalance) Help() string {
return `balance all ec shards among all racks and volume servers
- ec.balance [-c EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>]
+ ec.balance [-c EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>] [-shardReplicaPlacement <replica_placement>]
Algorithm:
@@ -100,6 +101,7 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
+ shardReplicaPlacement := balanceCommand.String("shardReplicaPlacement", "", "replica placement for EC shards, or master default if empty (currently unused)")
applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan")
if err = balanceCommand.Parse(args); err != nil {
return nil
@@ -121,5 +123,10 @@ func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.W
}
fmt.Printf("balanceEcVolumes collections %+v\n", len(collections))
- return EcBalance(commandEnv, collections, *dc, *applyBalancing)
+ rp, err := parseReplicaPlacementArg(commandEnv, *shardReplicaPlacement)
+ if err != nil {
+ return err
+ }
+
+ return EcBalance(commandEnv, collections, *dc, rp, *applyBalancing)
}
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 3b4a0ff25..906e2e0dc 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -39,6 +39,42 @@ type EcRack struct {
freeEcSlot int
}
+var (
+ // Overridable functions for testing.
+ getDefaultReplicaPlacement = _getDefaultReplicaPlacement
+)
+
+func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
+ var resp *master_pb.GetMasterConfigurationResponse
+ var err error
+
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ return err
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
+}
+func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
+ if replicaStr != "" {
+ rp, err := super_block.NewReplicaPlacementFromString(replicaStr)
+ if err == nil {
+ fmt.Printf("using replica placement %q for EC volumes\n", rp.String())
+ }
+ return rp, err
+ }
+
+ // No replica placement argument provided, resolve from master default settings.
+ rp, err := getDefaultReplicaPlacement(commandEnv)
+ if err == nil {
+ fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String())
+ }
+ return rp, err
+}
+
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
@@ -840,15 +876,19 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
return vidLocations
}
-// TODO: EC volumes have no replica placement info :( Maybe rely on the master's default?
-func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
+// TODO: EC volumes have no topology replica placement info :( We need a better solution to resolve topology, and balancing, for those.
+func volumeIdToReplicaPlacement(commandEnv *CommandEnv, vid needle.VolumeId, nodes []*EcNode, ecReplicaPlacement *super_block.ReplicaPlacement) (*super_block.ReplicaPlacement, error) {
for _, ecNode := range nodes {
for _, diskInfo := range ecNode.info.DiskInfos {
for _, volumeInfo := range diskInfo.VolumeInfos {
- if needle.VolumeId(volumeInfo.Id) != vid {
- continue
+ if needle.VolumeId(volumeInfo.Id) == vid {
+ return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+ }
+ }
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ if needle.VolumeId(ecShardInfo.Id) == vid {
+ return ecReplicaPlacement, nil
}
- return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
}
}
}
@@ -856,22 +896,7 @@ func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_bl
return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
}
-func getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
- var resp *master_pb.GetMasterConfigurationResponse
- var err error
-
- err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
- resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
- return err
- })
- if err != nil {
- return nil, err
- }
-
- return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
-}
-
-func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) {
+func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, applyBalancing bool) (err error) {
if len(collections) == 0 {
return fmt.Errorf("no collections to balance")
}
diff --git a/weed/shell/command_ec_common_test.go b/weed/shell/command_ec_common_test.go
index 08e4a41c7..76609c89d 100644
--- a/weed/shell/command_ec_common_test.go
+++ b/weed/shell/command_ec_common_test.go
@@ -32,6 +32,40 @@ func errorCheck(got error, want string) error {
}
return nil
}
+func TestParseReplicaPlacementArg(t *testing.T) {
+ getDefaultReplicaPlacementOrig := getDefaultReplicaPlacement
+ getDefaultReplicaPlacement = func(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
+ return super_block.NewReplicaPlacementFromString("123")
+ }
+ defer func() {
+ getDefaultReplicaPlacement = getDefaultReplicaPlacementOrig
+ }()
+
+ testCases := []struct {
+ argument string
+ want string
+ wantErr string
+ }{
+ {"lalala", "lal", "unexpected replication type"},
+ {"", "123", ""},
+ {"021", "021", ""},
+ }
+
+ for _, tc := range testCases {
+ commandEnv := &CommandEnv{}
+ got, gotErr := parseReplicaPlacementArg(commandEnv, tc.argument)
+
+ if err := errorCheck(gotErr, tc.wantErr); err != nil {
+ t.Errorf("argument %q: %s", tc.argument, err.Error())
+ continue
+ }
+
+ want, _ := super_block.NewReplicaPlacementFromString(tc.want)
+ if !got.Equals(want) {
+ t.Errorf("got replica placement %q, want %q", got.String(), want.String())
+ }
+ }
+}
func TestEcDistribution(t *testing.T) {
@@ -55,26 +89,35 @@ func TestEcDistribution(t *testing.T) {
}
func TestVolumeIdToReplicaPlacement(t *testing.T) {
+ ecReplicaPlacement, _ := super_block.NewReplicaPlacementFromString("123")
+
testCases := []struct {
topology *master_pb.TopologyInfo
vid string
want string
wantErr string
}{
- {topology1, "", "", "failed to resolve replica placement for volume ID 0"},
- {topology1, "0", "", "failed to resolve replica placement for volume ID 0"},
+ {topology1, "", "", "failed to resolve replica placement"},
+ {topology1, "0", "", "failed to resolve replica placement"},
{topology1, "1", "100", ""},
{topology1, "296", "100", ""},
- {topology2, "", "", "failed to resolve replica placement for volume ID 0"},
- {topology2, "19012", "", "failed to resolve replica placement for volume ID 19012"},
+ {topology2, "", "", "failed to resolve replica placement"},
+ {topology2, "19012", "", "failed to resolve replica placement"},
{topology2, "6271", "002", ""},
{topology2, "17932", "002", ""},
+ {topologyEc, "", "", "failed to resolve replica placement"},
+ {topologyEc, "0", "", "failed to resolve replica placement"},
+ {topologyEc, "6225", "002", ""},
+ {topologyEc, "6241", "002", ""},
+ {topologyEc, "9577", "123", ""}, // EC volume
+ {topologyEc, "12737", "123", ""}, // EC volume
}
for _, tc := range testCases {
+ commandEnv := &CommandEnv{}
vid, _ := needle.NewVolumeId(tc.vid)
ecNodes, _ := collectEcVolumeServersByDc(tc.topology, "")
- got, gotErr := volumeIdToReplicaPlacement(vid, ecNodes)
+ got, gotErr := volumeIdToReplicaPlacement(commandEnv, vid, ecNodes, ecReplicaPlacement)
if err := errorCheck(gotErr, tc.wantErr); err != nil {
t.Errorf("volume %q: %s", tc.vid, err.Error())