diff options
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 48 |
1 files changed, 41 insertions, 7 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index cee45ee1d..23d6e7156 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -6,8 +6,10 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" "golang.org/x/exp/slices" + "google.golang.org/grpc" "io" "path/filepath" "strconv" @@ -57,7 +59,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") - noDelete := volFixReplicationCommand.Bool("noDelete", false, "Do not delete over-replicated volumes, only fix under-replication") + doDelete := volFixReplicationCommand.Bool("doDelete", true, "Also delete over-replicated volumes besides fixing under-replication") + doCheck := volFixReplicationCommand.Bool("doCheck", true, "Also check synchronization before deleting") retryCount := volFixReplicationCommand.Int("retry", 5, "how many times to retry") volumesPerStep := volFixReplicationCommand.Int("volumesPerStep", 0, "how many volumes to fix in one cycle") @@ -70,7 +73,6 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } takeAction := !*skipChange - doDeletes := !*noDelete underReplicatedVolumeIdsCount := 1 for underReplicatedVolumeIdsCount > 0 { @@ -111,14 +113,14 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, return fmt.Errorf("lock is lost") } - if len(overReplicatedVolumeIds) > 0 && doDeletes { - if err := c.deleteOneVolume(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil { + if len(overReplicatedVolumeIds) > 0 && *doDelete { + if err := c.deleteOneVolume(commandEnv, writer, takeAction, *doCheck, overReplicatedVolumeIds, volumeReplicas, allLocations, pickOneReplicaToDelete); err != nil { return err } } - if len(misplacedVolumeIds) > 0 && doDeletes { - if err := c.deleteOneVolume(commandEnv, writer, takeAction, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume); err != nil { + if len(misplacedVolumeIds) > 0 && *doDelete { + if err := c.deleteOneVolume(commandEnv, writer, takeAction, *doCheck, misplacedVolumeIds, volumeReplicas, allLocations, pickOneMisplacedVolume); err != nil { return err } } @@ -189,7 +191,28 @@ func collectVolumeReplicaLocations(topologyInfo *master_pb.TopologyInfo) (map[ui type SelectOneVolumeFunc func(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica -func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error { +func checkOneVolume(a *VolumeReplica, b *VolumeReplica, writer io.Writer, grpcDialOption grpc.DialOption) (err error) { + aDB, bDB := needle_map.NewMemDb(), needle_map.NewMemDb() + defer func() { + aDB.Close() + bDB.Close() + }() + + // read index db + readIndexDbCutoffFrom := uint64(time.Now().UnixNano()) + if err = readIndexDatabase(aDB, a.info.Collection, a.info.Id, pb.NewServerAddressFromDataNode(a.location.dataNode), false, writer, grpcDialOption); err != nil { + return fmt.Errorf("readIndexDatabase %s volume %d: %v", a.location.dataNode, a.info.Id, err) + } + if err := readIndexDatabase(bDB, b.info.Collection, b.info.Id, pb.NewServerAddressFromDataNode(b.location.dataNode), false, writer, grpcDialOption); err != nil { + return fmt.Errorf("readIndexDatabase %s volume %d: %v", b.location.dataNode, b.info.Id, err) + } + if _, err = doVolumeCheckDisk(aDB, bDB, a, b, false, writer, true, false, float64(1), readIndexDbCutoffFrom, grpcDialOption); err != nil { + return fmt.Errorf("doVolumeCheckDisk source:%s target:%s volume %d: %v", a.location.dataNode.Id, b.location.dataNode.Id, a.info.Id, err) + } + return +} + +func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, writer io.Writer, takeAction bool, doCheck bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location, selectOneVolumeFn SelectOneVolumeFunc) error { for _, vid := range overReplicatedVolumeIds { replicas := volumeReplicas[vid] replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement)) @@ -224,6 +247,17 @@ func (c *commandVolumeFixReplication) deleteOneVolume(commandEnv *CommandEnv, wr break } + if doCheck { + for _, replicaB := range replicas { + if replicaB.location.dataNode == replica.location.dataNode { + continue + } + if err := checkOneVolume(replica, replicaB, writer, commandEnv.option.GrpcDialOption); err != nil { + return fmt.Errorf("sync volume %d on %s and %s: %v\n", replica.info.Id, replica.location.dataNode.Id, replicaB.location.dataNode.Id, err) + } + } + } + if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), pb.NewServerAddressFromDataNode(replica.location.dataNode), false); err != nil { return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err) |
