| field | value | date |
|---|---|---|
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2021-10-11 16:35:47 +0800 |
| committer | GitHub <noreply@github.com> | 2021-10-11 16:35:47 +0800 |
| commit | 0548ed3a1b5aa15b035b452519568bc017c065a3 (patch) | |
| tree | 4d77f014c56a1f5e1f18fc78413aa86c492ad41c /weed/shell/command_volume_fix_replication.go | |
| parent | 1de733fda507e1da94b2e4741c74ba7e5e2c5f76 (diff) | |
| parent | 84d2e1bdd099550aaba494c88324c8c0dbc08776 (diff) | |
Merge pull request #82 from chrislusf/master
sync
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
| mode | path | changed lines |
|---|---|---|
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 134 |

1 file changed, 94 insertions(+), 40 deletions(-)
```diff
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index efd5ae5de..5d7506c0b 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -4,11 +4,14 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"io"
 	"path/filepath"
 	"sort"
+	"strconv"
+	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -51,58 +54,103 @@ func (c *commandVolumeFixReplication) Help() string {
 
 func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
 
-	if err = commandEnv.confirmIsLocked(); err != nil {
-		return
-	}
-
 	volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
 	c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
 	skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
 	retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry")
+	volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
+
 	if err = volFixReplicationCommand.Parse(args); err != nil {
 		return nil
 	}
 
+	if err = commandEnv.confirmIsLocked(); err != nil {
+		return
+	}
+
 	takeAction := !*skipChange
 
-	// collect topology information
-	topologyInfo, _, err := collectTopologyInfo(commandEnv)
-	if err != nil {
-		return err
-	}
+	underReplicatedVolumeIdsCount := 1
+	for underReplicatedVolumeIdsCount > 0 {
+		fixedVolumeReplicas := map[string]int{}
 
-	// find all volumes that needs replication
-	// collect all data nodes
-	volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
+		// collect topology information
+		topologyInfo, _, err := collectTopologyInfo(commandEnv)
+		if err != nil {
+			return err
+		}
 
-	if len(allLocations) == 0 {
-		return fmt.Errorf("no data nodes at all")
-	}
+		// find all volumes that needs replication
+		// collect all data nodes
+		volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
 
-	// find all under replicated volumes
-	var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
-	for vid, replicas := range volumeReplicas {
-		replica := replicas[0]
-		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
-		if replicaPlacement.GetCopyCount() > len(replicas) {
-			underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
-		} else if replicaPlacement.GetCopyCount() < len(replicas) {
-			overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
-			fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
+		if len(allLocations) == 0 {
+			return fmt.Errorf("no data nodes at all")
 		}
-	}
 
-	if len(overReplicatedVolumeIds) > 0 {
-		return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
-	}
+		// find all under replicated volumes
+		var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
+		for vid, replicas := range volumeReplicas {
+			replica := replicas[0]
+			replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+			if replicaPlacement.GetCopyCount() > len(replicas) {
+				underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
+			} else if replicaPlacement.GetCopyCount() < len(replicas) {
+				overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
+				fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
+			}
+		}
 
-	if len(underReplicatedVolumeIds) == 0 {
-		return nil
-	}
+		if len(overReplicatedVolumeIds) > 0 {
+			if err := c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations); err != nil {
+				return err
+			}
+		}
 
-	// find the most under populated data nodes
-	return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount)
+		underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
+		if underReplicatedVolumeIdsCount > 0 {
+			// find the most under populated data nodes
+			fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
+			if err != nil {
+				return err
+			}
+		}
+
+		if *skipChange {
+			break
+		}
+		// check that the topology has been updated
+		if len(fixedVolumeReplicas) > 0 {
+			fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
+			for k, _ := range fixedVolumeReplicas {
+				fixedVolumes = append(fixedVolumes, k)
+			}
+			volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes)
+			if err != nil {
+				return err
+			}
+			for _, volumeIdLocation := range volumeIdLocations {
+				volumeId := volumeIdLocation.VolumeOrFileId
+				volumeIdLocationCount := len(volumeIdLocation.Locations)
+				i := 0
+				for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
+					fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
+					time.Sleep(time.Duration(i+1) * time.Second * 7)
+					volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId})
+					if err != nil {
+						return err
+					}
+					volumeIdLocationCount = len(volumeLocIds[0].Locations)
+					if *retryCount > i {
+						return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
+					}
+					i += 1
+				}
+			}
+		}
+	}
+	return nil
 }
 
 func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
@@ -147,7 +195,7 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
 			break
 		}
 
-		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
+		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), pb.NewServerAddressFromDataNode(replica.location.dataNode)); err != nil {
 			return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
 		}
```
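The rewritten `Do` above no longer makes a single pass: it keeps re-collecting the topology until no under-replicated volumes remain, and after an acting pass it polls `lookupVolumeIds` until the master actually reports the new replica locations. Below is a minimal standalone sketch of that polling pattern; the `waitForReplica` name and the `lookup` callback are illustrative stand-ins, not part of the commit.

```go
package main

import (
	"fmt"
	"time"
)

// waitForReplica polls a lookup function until the number of known
// locations for volumeId exceeds the count recorded before the fix,
// sleeping with a linearly growing delay between attempts, echoing the
// time.Sleep(time.Duration(i+1) * time.Second * 7) backoff in Do above.
func waitForReplica(volumeId string, countBeforeFix int, maxRetries int,
	lookup func(volumeId string) (locationCount int, err error)) error {
	for i := 0; ; i++ {
		count, err := lookup(volumeId)
		if err != nil {
			return err
		}
		if count > countBeforeFix {
			return nil // the topology has caught up with the new replica
		}
		if i >= maxRetries {
			return fmt.Errorf("replicas of volume %s still missing in topology", volumeId)
		}
		fmt.Printf("locations for volume %s have not increased yet, waiting\n", volumeId)
		time.Sleep(time.Duration(i+1) * 7 * time.Second)
	}
}

func main() {
	// Simulate a master that reports the new replica on the third lookup.
	calls := 0
	lookup := func(string) (int, error) {
		calls++
		if calls >= 3 {
			return 2, nil
		}
		return 1, nil
	}
	fmt.Println(waitForReplica("271", 1, 5, lookup)) // prints <nil>
}
```

The second half of the diff continues with the helpers that the loop calls: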
```diff
@@ -155,16 +203,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
 	return nil
 }
 
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) {
-
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
+	fixedVolumes = map[string]int{}
+	if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
+		underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
+	}
 	for _, vid := range underReplicatedVolumeIds {
 		for i := 0; i < retryCount+1; i++ {
 			if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {
+				if takeAction {
+					fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
+				}
 				break
 			}
 		}
 	}
-	return
+	return fixedVolumes, nil
 }
 
 func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {
@@ -200,10 +254,10 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
 			break
 		}
 
-		err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+		err := operation.WithVolumeServerClient(pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
 			_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
 				VolumeId:       replica.info.Id,
-				SourceDataNode: replica.location.dataNode.Id,
+				SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
 			})
 			if replicateErr != nil {
 				return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
```
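The new `-volumesPerStep` flag caps how many under-replicated volumes each cycle repairs; since the outer loop in `Do` re-collects the topology on every pass, the remainder is picked up on later cycles. A small sketch of that capping logic, with a hypothetical `capPerStep` helper:

```go
package main

import "fmt"

// capPerStep truncates the work list to at most volumesPerStep entries.
// A volumesPerStep of 0 (the flag's default) means no cap, mirroring the
// slice check at the top of fixUnderReplicatedVolumes above.
func capPerStep(volumeIds []uint32, volumesPerStep int) []uint32 {
	if volumesPerStep > 0 && len(volumeIds) > volumesPerStep {
		return volumeIds[:volumesPerStep]
	}
	return volumeIds
}

func main() {
	ids := []uint32{1, 2, 3, 4, 5}
	fmt.Println(capPerStep(ids, 2)) // [1 2]
	fmt.Println(capPerStep(ids, 0)) // [1 2 3 4 5]
}
```

In a `weed shell` session the flag would presumably be combined with the existing options, e.g. `volume.fix.replication -retry=2 -volumesPerStep=100` (illustrative values); `-n` still previews the changes without acting, and with it set the new loop exits after a single pass.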
