aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fix_replication.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
-rw-r--r--weed/shell/command_volume_fix_replication.go45
1 files changed, 42 insertions, 3 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 21b3ead6b..f03cd550e 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -10,6 +10,8 @@ import (
"io"
"path/filepath"
"sort"
+ "strconv"
+ "time"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -70,6 +72,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
underReplicatedVolumeIdsCount := 1
for underReplicatedVolumeIdsCount > 0 {
+ fixedVolumeReplicas := map[string]int{}
+
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
@@ -106,7 +110,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
if underReplicatedVolumeIdsCount > 0 {
// find the most under populated data nodes
- if err := c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep); err != nil {
+ err, fixedVolumeReplicas = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
+ if err != nil {
return err
}
}
@@ -114,6 +119,36 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
if *skipChange {
break
}
+
+ // check that the topology has been updated
+ if len(fixedVolumeReplicas) > 0 {
+ fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
+ for k, _ := range fixedVolumeReplicas {
+ fixedVolumes = append(fixedVolumes, k)
+ }
+ err, volumeIdLocations := LookupVolumeIds(commandEnv, fixedVolumes)
+ if err != nil {
+ return err
+ }
+ for _, volumeIdLocation := range volumeIdLocations {
+ volumeId := volumeIdLocation.VolumeOrFileId
+ volumeIdLocationCount := len(volumeIdLocation.Locations)
+ i := 0
+ for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
+ fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
+ time.Sleep(time.Duration(i+1) * time.Second * 7)
+ err, volumeLocIds := LookupVolumeIds(commandEnv, []string{volumeId})
+ if err != nil {
+ return err
+ }
+ volumeIdLocationCount = len(volumeLocIds[0].Locations)
+ if *retryCount > i {
+ return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
+ }
+ i += 1
+ }
+ }
+ }
}
return nil
}
@@ -168,18 +203,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
return nil
}
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error) {
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error, fixedVolumes map[string]int) {
+ fixedVolumes = map[string]int{}
if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
}
for _, vid := range underReplicatedVolumeIds {
for i := 0; i < retryCount+1; i++ {
if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {
+ if takeAction {
+ fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
+ }
break
}
}
}
- return
+ return nil, fixedVolumes
}
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {