aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fix_replication.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
-rw-r--r--weed/shell/command_volume_fix_replication.go101
1 files changed, 55 insertions, 46 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 538351fd0..9e6280788 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -58,6 +58,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
+ retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry")
if err = volFixReplicationCommand.Parse(args); err != nil {
return nil
}
@@ -100,7 +101,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
// find the most under populated data nodes
- return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
+ return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount)
}
@@ -154,64 +155,72 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
return nil
}
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) {
for _, vid := range underReplicatedVolumeIds {
- replicas := volumeReplicas[vid]
- replica := pickOneReplicaToCopyFrom(replicas)
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
- foundNewLocation := false
- hasSkippedCollection := false
- keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
- fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
- for _, dst := range allLocations {
- // check whether data nodes satisfy the constraints
- if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
- // check collection name pattern
- if *c.collectionPattern != "" {
- matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
- if err != nil {
- return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
- }
- if !matched {
- hasSkippedCollection = true
- break
- }
- }
-
- // ask the volume server to replicate the volume
- foundNewLocation = true
- fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
+ for i := 0; i < retryCount+1; i++ {
+ if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {
+ continue
+ }
+ }
+ }
+ return
+}
- if !takeAction {
+func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {
+ replicas := volumeReplicas[vid]
+ replica := pickOneReplicaToCopyFrom(replicas)
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+ foundNewLocation := false
+ hasSkippedCollection := false
+ keepDataNodesSorted(allLocations, types.ToDiskType(replica.info.DiskType))
+ fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
+ for _, dst := range allLocations {
+ // check whether data nodes satisfy the constraints
+ if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
+ // check collection name pattern
+ if *c.collectionPattern != "" {
+ matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
+ if err != nil {
+ return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
+ }
+ if !matched {
+ hasSkippedCollection = true
break
}
+ }
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
- VolumeId: replica.info.Id,
- SourceDataNode: replica.location.dataNode.Id,
- })
- if replicateErr != nil {
- return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
- }
- return nil
- })
+ // ask the volume server to replicate the volume
+ foundNewLocation = true
+ fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
- if err != nil {
- return err
+ if !takeAction {
+ break
+ }
+
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
+ VolumeId: replica.info.Id,
+ SourceDataNode: replica.location.dataNode.Id,
+ })
+ if replicateErr != nil {
+ return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
+ return nil
+ })
- // adjust free volume count
- dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
- break
+ if err != nil {
+ return err
}
- }
- if !foundNewLocation && !hasSkippedCollection {
- fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
+ // adjust free volume count
+ dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount--
+ break
}
+ }
+ if !foundNewLocation && !hasSkippedCollection {
+ fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
}
return nil
}