diff options
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 227 |
1 files changed, 178 insertions, 49 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index efd5ae5de..4d986d252 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -4,11 +4,14 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" + "golang.org/x/exp/slices" "io" "path/filepath" - "sort" + "strconv" + "time" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -51,58 +54,112 @@ func (c *commandVolumeFixReplication) Help() string { func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { - if err = commandEnv.confirmIsLocked(); err != nil { - return - } - volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry") + volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle") + if err = volFixReplicationCommand.Parse(args); err != nil { return nil } + if err = commandEnv.confirmIsLocked(args); err != nil { + return + } + takeAction := !*skipChange - // collect topology information - topologyInfo, _, err := collectTopologyInfo(commandEnv) - if err != nil { - return err - } + underReplicatedVolumeIdsCount := 1 + for underReplicatedVolumeIdsCount > 0 { + fixedVolumeReplicas := map[string]int{} - // find all volumes that needs replication - // collect all data nodes - volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) + // collect topology information + topologyInfo, _, err := collectTopologyInfo(commandEnv, 15*time.Second) + if err != nil { + return err + } - if len(allLocations) == 0 { - return fmt.Errorf("no data nodes at all") - } + // find all volumes that needs replication + // collect all data nodes + volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo) - // find all under replicated volumes - var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 - for vid, replicas := range volumeReplicas { - replica := replicas[0] - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) - if replicaPlacement.GetCopyCount() > len(replicas) { - underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) - } else if replicaPlacement.GetCopyCount() < len(replicas) { - overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) - fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) + if len(allLocations) == 0 { + return fmt.Errorf("no data nodes at all") } - } - if len(overReplicatedVolumeIds) > 0 { - return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations) - } + // find all under replicated volumes + var underReplicatedVolumeIds, overReplicatedVolumeIds, misplacedVolumeIds []uint32 + for vid, replicas := range volumeReplicas { + replica := replicas[0] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + if replicaPlacement.GetCopyCount() > len(replicas) { + underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) + } else if replicaPlacement.GetCopyCount() < len(replicas) { + overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) + fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) + } else if isMisplaced(replicas, replicaPlacement) { + misplacedVolumeIds = append(misplacedVolumeIds, vid) + fmt.Fprintf(writer, "volume %d replication %s is not well placed %+v\n", replica.info.Id, replicaPlacement, replicas) + } + } - if len(underReplicatedVolumeIds) == 0 { - return nil - } + if len(overReplicatedVolumeIds) > 0 { + if err := c.deleteOneVolume(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil { + return err + } + } - // find the most under populated data nodes - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount) + if len(misplacedVolumeIds) > 0 { + if err := c.deleteOneVolume(commandEnv, writer, takeAction, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume); err != nil { + return err + } + } + + underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds) + if underReplicatedVolumeIdsCount > 0 { + // find the most under populated data nodes + fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep) + if err != nil { + return err + } + } + + if *skipChange { + break + } + // check that the topology has been updated + if len(fixedVolumeReplicas) > 0 { + fixedVolumes := make([]string, 0, len(fixedVolumeReplicas)) + for k, _ := range fixedVolumeReplicas { + fixedVolumes = append(fixedVolumes, k) + } + volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes) + if err != nil { + return err + } + for _, volumeIdLocation := range volumeIdLocations { + volumeId := volumeIdLocation.VolumeOrFileId + volumeIdLocationCount := len(volumeIdLocation.Locations) + i := 0 + for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount { + fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId) + time.Sleep(time.Duration(i+1) * time.Second * 7) + volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId}) + if err != nil { + return err + } + volumeIdLocationCount = len(volumeLocIds[0].Locations) + if *retryCount <= i { + return fmt.Errorf("replicas volume %s mismatch in topology", volumeId) + } + i += 1 + } + } + } + } + return nil } func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) { @@ -123,12 +180,14 @@ func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[ui return volumeReplicas, allLocations } -func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { +type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica + +func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error { for _, vid := range overReplicatedVolumeIds { replicas := volumeReplicas[vid] replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement)) - replica := pickOneReplicaToDelete(replicas, replicaPlacement) + replica := selectOneVolumeFn(replicas, replicaPlacement) // check collection name pattern if *c.collectionPattern != "" { @@ -141,13 +200,24 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma } } + collectionIsMismatch := false + for _, volumeReplica := range replicas { + if volumeReplica.info.Collection != replica.info.Collection { + fmt.Fprintf(writer, "skip delete volume %d as collection %s is mismatch: %s\n", replica.info.Id, replica.info.Collection, volumeReplica.info.Collection) + collectionIsMismatch = true + } + } + if collectionIsMismatch { + continue + } + fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id) if !takeAction { break } - if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil { + if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), pb.NewServerAddressFromDataNode(replica.location.dataNode)); err != nil { return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err) } @@ -155,16 +225,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma return nil } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) { - +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) { + fixedVolumes = map[string]int{} + if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 { + underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep] + } for _, vid := range underReplicatedVolumeIds { for i := 0; i < retryCount+1; i++ { if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil { + if takeAction { + fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid]) + } break } } } - return + return fixedVolumes, nil } func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error { @@ -200,14 +276,28 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co break } - err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ + err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: replica.info.Id, - SourceDataNode: replica.location.dataNode.Id, + SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)), }) if replicateErr != nil { return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) } + for { + resp, recvErr := stream.Recv() + if recvErr != nil { + if recvErr == io.EOF { + break + } else { + return recvErr + } + } + if resp.ProcessedBytes > 0 { + fmt.Fprintf(writer, "volume %d processed %d bytes\n", replica.info.Id, resp.ProcessedBytes) + } + } + return nil }) @@ -229,8 +319,8 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) { fn := capacityByFreeVolumeCount(diskType) - sort.Slice(dataNodes, func(i, j int) bool { - return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode) + slices.SortFunc(dataNodes, func(a, b location) bool { + return fn(a.dataNode) > fn(b.dataNode) }) } @@ -409,9 +499,7 @@ func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[st } func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica { - - sort.Slice(replicas, func(i, j int) bool { - a, b := replicas[i], replicas[j] + slices.SortFunc(replicas, func(a, b *VolumeReplica) bool { if a.info.Size != b.info.Size { return a.info.Size < b.info.Size } @@ -427,3 +515,44 @@ func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_b return replicas[0] } + +// check and fix misplaced volumes + +func isMisplaced(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) bool { + + for i := 0; i < len(replicas); i++ { + others := otherThan(replicas, i) + if satisfyReplicaPlacement(replicaPlacement, others, *replicas[i].location) { + return false + } + } + + return true + +} + +func otherThan(replicas []*VolumeReplica, index int) (others []*VolumeReplica) { + for i := 0; i < len(replicas); i++ { + if index != i { + others = append(others, replicas[i]) + } + } + return +} + +func pickOneMisplacedVolume(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) (toDelete *VolumeReplica) { + + var deletionCandidates []*VolumeReplica + for i := 0; i < len(replicas); i++ { + others := otherThan(replicas, i) + if !isMisplaced(others, replicaPlacement) { + deletionCandidates = append(deletionCandidates, replicas[i]) + } + } + if len(deletionCandidates) > 0 { + return pickOneReplicaToDelete(deletionCandidates, replicaPlacement) + } + + return pickOneReplicaToDelete(replicas, replicaPlacement) + +} |
