about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- weed/shell/command_volume_fix_replication.go 58
1 files changed, 35 insertions, 23 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index a687a0f23..65e212444 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -29,6 +29,7 @@ func init() {
type commandVolumeFixReplication struct {
collectionPattern *string
+ // TODO: move parameter flags here so we don't shuffle them around via function calls.
}
func (c *commandVolumeFixReplication) Name() string {
@@ -67,6 +68,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
applyChanges := volFixReplicationCommand.Bool("force", false, "apply the fix")
doDelete := volFixReplicationCommand.Bool("doDelete", true, "Also delete over-replicated volumes besides fixing under-replication")
doCheck := volFixReplicationCommand.Bool("doCheck", true, "Also check synchronization before deleting")
+ maxParallelization := volFixReplicationCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry")
volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
@@ -81,6 +83,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
return
}
+ ewg := NewErrorWaitGroup(*maxParallelization)
underReplicatedVolumeIdsCount := 1
for underReplicatedVolumeIdsCount > 0 {
fixedVolumeReplicas := map[string]int{}
@@ -107,6 +110,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
switch {
case replicaPlacement.GetCopyCount() > len(replicas) || !satisfyReplicaCurrentLocation(replicaPlacement, replicas):
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
+ fmt.Fprintf(writer, "volume %d replication %s, but under replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
case isMisplaced(replicas, replicaPlacement):
misplacedVolumeIds = append(misplacedVolumeIds, vid)
fmt.Fprintf(writer, "volume %d replication %s is not well placed %s\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id)
@@ -115,30 +119,28 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
}
}
+ underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
- if len(overReplicatedVolumeIds) > 0 && *doDelete {
- if err := c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil {
- return err
- }
- }
-
- if len(misplacedVolumeIds) > 0 && *doDelete {
- if err := c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume); err != nil {
- return err
- }
- }
-
- underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
- if underReplicatedVolumeIdsCount > 0 {
+ ewg.Reset()
+ ewg.Add(func() error {
// find the most underpopulated data nodes
fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, *applyChanges, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
- if err != nil {
- return err
- }
+ return err
+ })
+ if *doDelete {
+ ewg.Add(func() error {
+ return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete)
+ })
+ ewg.Add(func() error {
+ return c.deleteOneVolume(commandEnv, writer, *applyChanges, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume)
+ })
+ }
+ if err := ewg.Wait(); err != nil {
+ return err // propagate the failure from the parallel fix/delete tasks; returning nil here would report success
}
if !*applyChanges {
@@ -219,8 +221,13 @@ func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDi
return
}
-func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error {
- for _, vid := range overReplicatedVolumeIds {
+func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, doCheck bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error {
+ if len(volumeIds) == 0 {
+ // nothing to do
+ return nil
+ }
+
+ for _, vid := range volumeIds {
replicas := volumeReplicas[vid]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
@@ -279,12 +286,17 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr
return nil
}
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, applyChanges bool, volumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
fixedVolumes = map[string]int{}
- if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
- underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
+
+ if len(volumeIds) == 0 {
+ return fixedVolumes, nil
+ }
+
+ if len(volumeIds) > volumesPerStep && volumesPerStep > 0 {
+ volumeIds = volumeIds[0:volumesPerStep]
}
- for _, vid := range underReplicatedVolumeIds {
+ for _, vid := range volumeIds {
for i := 0; i < retryCount+1; i++ {
if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, applyChanges, volumeReplicas, vid, allLocations); err == nil {
if applyChanges {