aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fix_replication.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
-rw-r--r--weed/shell/command_volume_fix_replication.go227
1 files changed, 178 insertions, 49 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index efd5ae5de..4d986d252 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -4,11 +4,14 @@ import (
"context"
"flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "golang.org/x/exp/slices"
"io"
"path/filepath"
- "sort"
+ "strconv"
+ "time"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -51,58 +54,112 @@ func (c *commandVolumeFixReplication) Help() string {
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- if err = commandEnv.confirmIsLocked(); err != nil {
- return
- }
-
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
retryCount := volFixReplicationCommand.Int("retry", 0, "how many times to retry")
+ volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle")
+
if err = volFixReplicationCommand.Parse(args); err != nil {
return nil
}
+ if err = commandEnv.confirmIsLocked(args); err != nil {
+ return
+ }
+
takeAction := !*skipChange
- // collect topology information
- topologyInfo, _, err := collectTopologyInfo(commandEnv)
- if err != nil {
- return err
- }
+ underReplicatedVolumeIdsCount := 1
+ for underReplicatedVolumeIdsCount > 0 {
+ fixedVolumeReplicas := map[string]int{}
- // find all volumes that needs replication
- // collect all data nodes
- volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv, 15*time.Second)
+ if err != nil {
+ return err
+ }
- if len(allLocations) == 0 {
- return fmt.Errorf("no data nodes at all")
- }
+ // find all volumes that needs replication
+ // collect all data nodes
+ volumeReplicas, allLocations := collectVolumeReplicaLocations(topologyInfo)
- // find all under replicated volumes
- var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
- for vid, replicas := range volumeReplicas {
- replica := replicas[0]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
- if replicaPlacement.GetCopyCount() > len(replicas) {
- underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
- } else if replicaPlacement.GetCopyCount() < len(replicas) {
- overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
- fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
+ if len(allLocations) == 0 {
+ return fmt.Errorf("no data nodes at all")
}
- }
- if len(overReplicatedVolumeIds) > 0 {
- return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
- }
+ // find all under replicated volumes
+ var underReplicatedVolumeIds, overReplicatedVolumeIds, misplacedVolumeIds []uint32
+ for vid, replicas := range volumeReplicas {
+ replica := replicas[0]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+ if replicaPlacement.GetCopyCount() > len(replicas) {
+ underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
+ } else if replicaPlacement.GetCopyCount() < len(replicas) {
+ overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
+ fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
+ } else if isMisplaced(replicas, replicaPlacement) {
+ misplacedVolumeIds = append(misplacedVolumeIds, vid)
+ fmt.Fprintf(writer, "volume %d replication %s is not well placed %+v\n", replica.info.Id, replicaPlacement, replicas)
+ }
+ }
- if len(underReplicatedVolumeIds) == 0 {
- return nil
- }
+ if len(overReplicatedVolumeIds) > 0 {
+ if err := c.deleteOneVolume(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil {
+ return err
+ }
+ }
- // find the most under populated data nodes
- return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount)
+ if len(misplacedVolumeIds) > 0 {
+ if err := c.deleteOneVolume(commandEnv, writer, takeAction, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume); err != nil {
+ return err
+ }
+ }
+
+ underReplicatedVolumeIdsCount = len(underReplicatedVolumeIds)
+ if underReplicatedVolumeIdsCount > 0 {
+ // find the most under populated data nodes
+ fixedVolumeReplicas, err = c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations, *retryCount, *volumesPerStep)
+ if err != nil {
+ return err
+ }
+ }
+
+ if *skipChange {
+ break
+ }
+ // check that the topology has been updated
+ if len(fixedVolumeReplicas) > 0 {
+ fixedVolumes := make([]string, 0, len(fixedVolumeReplicas))
+ for k, _ := range fixedVolumeReplicas {
+ fixedVolumes = append(fixedVolumes, k)
+ }
+ volumeIdLocations, err := lookupVolumeIds(commandEnv, fixedVolumes)
+ if err != nil {
+ return err
+ }
+ for _, volumeIdLocation := range volumeIdLocations {
+ volumeId := volumeIdLocation.VolumeOrFileId
+ volumeIdLocationCount := len(volumeIdLocation.Locations)
+ i := 0
+ for fixedVolumeReplicas[volumeId] >= volumeIdLocationCount {
+ fmt.Fprintf(writer, "the number of locations for volume %s has not increased yet, let's wait\n", volumeId)
+ time.Sleep(time.Duration(i+1) * time.Second * 7)
+ volumeLocIds, err := lookupVolumeIds(commandEnv, []string{volumeId})
+ if err != nil {
+ return err
+ }
+ volumeIdLocationCount = len(volumeLocIds[0].Locations)
+ if *retryCount <= i {
+ return fmt.Errorf("replicas volume %s mismatch in topology", volumeId)
+ }
+ i += 1
+ }
+ }
+ }
+ }
+ return nil
}
func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[uint32][]*VolumeReplica, []location) {
@@ -123,12 +180,14 @@ func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[ui
return volumeReplicas, allLocations
}
-func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica
+
+func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error {
for _, vid := range overReplicatedVolumeIds {
replicas := volumeReplicas[vid]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
- replica := pickOneReplicaToDelete(replicas, replicaPlacement)
+ replica := selectOneVolumeFn(replicas, replicaPlacement)
// check collection name pattern
if *c.collectionPattern != "" {
@@ -141,13 +200,24 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
}
}
+ collectionIsMismatch := false
+ for _, volumeReplica := range replicas {
+ if volumeReplica.info.Collection != replica.info.Collection {
+ fmt.Fprintf(writer, "skip delete volume %d as collection %s is mismatch: %s\n", replica.info.Id, replica.info.Collection, volumeReplica.info.Collection)
+ collectionIsMismatch = true
+ }
+ }
+ if collectionIsMismatch {
+ continue
+ }
+
fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
if !takeAction {
break
}
- if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
+ if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), pb.NewServerAddressFromDataNode(replica.location.dataNode)); err != nil {
return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
}
@@ -155,16 +225,22 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
return nil
}
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int) (err error) {
-
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, retryCount int, volumesPerStep int) (fixedVolumes map[string]int, err error) {
+ fixedVolumes = map[string]int{}
+ if len(underReplicatedVolumeIds) > volumesPerStep && volumesPerStep > 0 {
+ underReplicatedVolumeIds = underReplicatedVolumeIds[0:volumesPerStep]
+ }
for _, vid := range underReplicatedVolumeIds {
for i := 0; i < retryCount+1; i++ {
if err = c.fixOneUnderReplicatedVolume(commandEnv, writer, takeAction, volumeReplicas, vid, allLocations); err == nil {
+ if takeAction {
+ fixedVolumes[strconv.FormatUint(uint64(vid), 10)] = len(volumeReplicas[vid])
+ }
break
}
}
}
- return
+ return fixedVolumes, nil
}
func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, volumeReplicas map[uint32][]*VolumeReplica, vid uint32, allLocations []location) error {
@@ -200,14 +276,28 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
break
}
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
+ err := operation.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dst.dataNode), commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: replica.info.Id,
- SourceDataNode: replica.location.dataNode.Id,
+ SourceDataNode: string(pb.NewServerAddressFromDataNode(replica.location.dataNode)),
})
if replicateErr != nil {
return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ break
+ } else {
+ return recvErr
+ }
+ }
+ if resp.ProcessedBytes > 0 {
+ fmt.Fprintf(writer, "volume %d processed %d bytes\n", replica.info.Id, resp.ProcessedBytes)
+ }
+ }
+
return nil
})
@@ -229,8 +319,8 @@ func (c *commandVolumeFixReplication) fixOneUnderReplicatedVolume(commandEnv *Co
func keepDataNodesSorted(dataNodes []location, diskType types.DiskType) {
fn := capacityByFreeVolumeCount(diskType)
- sort.Slice(dataNodes, func(i, j int) bool {
- return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode)
+ slices.SortFunc(dataNodes, func(a, b location) bool {
+ return fn(a.dataNode) > fn(b.dataNode)
})
}
@@ -409,9 +499,7 @@ func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[st
}
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
-
- sort.Slice(replicas, func(i, j int) bool {
- a, b := replicas[i], replicas[j]
+ slices.SortFunc(replicas, func(a, b *VolumeReplica) bool {
if a.info.Size != b.info.Size {
return a.info.Size < b.info.Size
}
@@ -427,3 +515,44 @@ func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_b
return replicas[0]
}
+
+// check and fix misplaced volumes
+
+func isMisplaced(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) bool {
+
+ for i := 0; i < len(replicas); i++ {
+ others := otherThan(replicas, i)
+ if satisfyReplicaPlacement(replicaPlacement, others, *replicas[i].location) {
+ return false
+ }
+ }
+
+ return true
+
+}
+
+func otherThan(replicas []*VolumeReplica, index int) (others []*VolumeReplica) {
+ for i := 0; i < len(replicas); i++ {
+ if index != i {
+ others = append(others, replicas[i])
+ }
+ }
+ return
+}
+
+func pickOneMisplacedVolume(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) (toDelete *VolumeReplica) {
+
+ var deletionCandidates []*VolumeReplica
+ for i := 0; i < len(replicas); i++ {
+ others := otherThan(replicas, i)
+ if !isMisplaced(others, replicaPlacement) {
+ deletionCandidates = append(deletionCandidates, replicas[i])
+ }
+ }
+ if len(deletionCandidates) > 0 {
+ return pickOneReplicaToDelete(deletionCandidates, replicaPlacement)
+ }
+
+ return pickOneReplicaToDelete(replicas, replicaPlacement)
+
+}