diff options
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 45 |
1 files changed, 42 insertions, 3 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 21b3ead6b..f03cd550e 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -10,6 +10,8 @@ import ( "io" "path/filepath" "sort" + "strconv" + "time" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -70,6 +72,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, underReplicatedVolumeIdsCount := 1 for underReplicatedVolumeIdsCount > 0 { + fixedVolumeReplicas := map[string]int{} + // collect topology information topologyInfo, _, err := collectTopologyInfo(commandEnv) if err != nil { @@ -106,7 +110,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) if underReplicatedVolumeIdsCount > 0 { // find the most under populated data nodes - if err := c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep); err != nil { + err, fixedVolumeReplicas = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep) + if err != nil { return err } } @@ -114,6 +119,36 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, if *skipChange { break } + + // check that the topology has been updated + if len(fixedVolumeReplicas) > 0 { + fixedVolumes := make([]string, 0, len(fixedVolumeReplicas)) + for k, _ := range fixedVolumeReplicas { + fixedVolumes = append(fixedVolumes, k) + } + err, volumeIdLocations := LookupVolumeIds(commandEnv, fixedVolumes) + if err != nil { + return err + } + for _, volumeIdLocation := range volumeIdLocations { + volumeId := volumeIdLocation.VolumeOrFileId + volumeIdLocationCount := len(volumeIdLocation.Locations) + i := 0 + for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount { + fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId) + time.Sleep(time.Duration(i+1) * time.Second * 7) + err, volumeLocIds := LookupVolumeIds(commandEnv, []string{volumeId}) + if err != nil { + return err + } + volumeIdLocationCount = len(volumeLocIds[0].Locations) + if *retryCount > i { + return fmt.Errorf("replicas volume %s mismatch in topology", volumeId) + } + i += 1 + } + } + } } return nil } @@ -168,18 +203,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error) { +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (err error, fixedVolumes map[string]int) { + fixedVolumes = map[string]int{} if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 { underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep] } for _, vid := range underReplicatedVolumeIds { for i := 0; i < retryCount+1; i++ { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { + if takeAction { + fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid]) + } break } } } - return + return nil, fixedVolumes } func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error { |
