aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2021-10-01 12:05:51 -0700
committerGitHub <noreply@github.com>2021-10-01 12:05:51 -0700
commit88e006bc6d7a3019067ea94e47196a0b0bf0b3c7 (patch)
treea62042d28b5ff687d4dd4cfa8e4e7ea7a3e517df
parentf58ea6a2ee0430828474ef4a36beaffbf9edfbd8 (diff)
parent5e64b22b45dc272254cd9d5eeb1e51814035d7fd (diff)
downloadseaweedfs-88e006bc6d7a3019067ea94e47196a0b0bf0b3c7.tar.xz
seaweedfs-88e006bc6d7a3019067ea94e47196a0b0bf0b3c7.zip
Merge pull request #2349 from kmlebedev/fix_repl_volumes_per_step
Topology update for every Nth id volume
-rw-r--r--weed/shell/command_ec_decode.go12
-rw-r--r--weed/shell/command_volume_fix_replication.go119
2 files changed, 98 insertions, 33 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index 18cea0504..3483156cb 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -208,6 +208,18 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
}
+func LookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (err error, volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation) {
+ var resp *master_pb.LookupVolumeResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{VolumeOrFileIds: volumeIds})
+ return err
+ })
+ if err != nil {
+ return err, nil
+ }
+ return nil, resp.VolumeIdLocations
+}
+
func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
var resp *master_pb.VolumeListResponse
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 76a582b31..f03cd550e 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -10,6 +10,8 @@ import (
"io"
"path/filepath"
"sort"
+ "strconv"
+ "time"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -56,6 +58,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry")
+ volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
+
if err = volFixReplicationCommand.Parse(args); err != nil {
return nil
}
@@ -66,44 +70,87 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
takeAction := !*skipChange
- // collect topology information
- topologyInfo, _, err := collectTopologyInfo(commandEnv)
- if err != nil {
- return err
- }
+ underReplicatedVolumeIdsCount := 1
+ for underReplicatedVolumeIdsCount > 0 {
+ fixedVolumeReplicas := map[string]int{}
- // find all volumes that needs replication
- // collect all data nodes
- volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv)
+ if err != nil {
+ return err
+ }
- if len(allLocations) == 0 {
- return fmt.Errorf("no data nodes at all")
- }
+ // find all volumes that needs replication
+ // collect all data nodes
+ volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
- // find all under replicated volumes
- var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
- for vid, replicas := range volumeReplicas {
- replica := replicas[0]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
- if replicaPlacement.GetCopyCount() > len(replicas) {
- underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
- } else if replicaPlacement.GetCopyCount() < len(replicas) {
- overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
- fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
+ if len(allLocations) == 0 {
+ return fmt.Errorf("no data nodes at all")
}
- }
- if len(overReplicatedVolumeIds) > 0 {
- return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
- }
+ // find all under replicated volumes
+ var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
+ for vid, replicas := range volumeReplicas {
+ replica := replicas[0]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+ if replicaPlacement.GetCopyCount() > len(replicas) {
+ underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
+ } else if replicaPlacement.GetCopyCount() < len(replicas) {
+ overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
+ fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
+ }
+ }
- if len(underReplicatedVolumeIds) == 0 {
- return nil
- }
+ if len(overReplicatedVolumeIds) > 0 {
+ if err := c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations); err != nil {
+ return err
+ }
+ }
- // find the most under populated data nodes
- return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount)
+ underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
+ if underReplicatedVolumeIdsCount > 0 {
+ // find the most under populated data nodes
+ err, fixedVolumeReplicas = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
+ if err != nil {
+ return err
+ }
+ }
+ if *skipChange {
+ break
+ }
+
+ // check that the topology has been updated
+ if len(fixedVolumeReplicas) > 0 {
+ fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
+ for k, _ := range fixedVolumeReplicas {
+ fixedVolumes = append(fixedVolumes, k)
+ }
+ err, volumeIdLocations := LookupVolumeIds(commandEnv, fixedVolumes)
+ if err != nil {
+ return err
+ }
+ for _, volumeIdLocation := range volumeIdLocations {
+ volumeId := volumeIdLocation.VolumeOrFileId
+ volumeIdLocationCount := len(volumeIdLocation.Locations)
+ i := 0
+ for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
+ fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
+ time.Sleep(time.Duration(i+1) * time.Second * 7)
+ err, volumeLocIds := LookupVolumeIds(commandEnv, []string{volumeId})
+ if err != nil {
+ return err
+ }
+ volumeIdLocationCount = len(volumeLocIds[0].Locations)
+ if *retryCount > i {
+ return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
+ }
+ i += 1
+ }
+ }
+ }
+ }
+ return nil
}
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
@@ -156,16 +203,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
return nil
}
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) {
-
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error, fixedVolumes map[string]int) {
+ fixedVolumes = map[string]int{}
+ if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
+ underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
+ }
for _, vid := range underReplicatedVolumeIds {
for i := 0; i < retryCount+1; i++ {
if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {
+ if takeAction {
+ fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
+ }
break
}
}
}
- return
+ return nil, fixedVolumes
}
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {