aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fix_replication.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
-rw-r--r--weed/shell/command_volume_fix_replication.go201
1 files changed, 140 insertions, 61 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 6b5e4e735..471b24a2a 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -2,9 +2,10 @@ package shell
import (
"context"
+ "flag"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
- "math/rand"
"sort"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -27,16 +28,18 @@ func (c *commandVolumeFixReplication) Name() string {
func (c *commandVolumeFixReplication) Help() string {
return `add replicas to volumes that are missing replicas
- This command file all under-replicated volumes, and find volume servers with free slots.
+ This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
+
+ This command also finds all under-replicated volumes, and finds volume servers with free slots.
If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
volume.fix.replication -n # do not take action
- volume.fix.replication # actually copying the volume files and mount the volume
+ volume.fix.replication # actually deleting or copying the volume files and mount the volume
Note:
* each time this will only add back one replica for one volume id. If there are multiple replicas
are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
- * do not run this too quick within seconds, since the new volume replica may take a few seconds
+ * do not run this too quickly within seconds, since the new volume replica may take a few seconds
to register itself to the master.
`
@@ -48,11 +51,14 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
return
}
- takeAction := true
- if len(args) > 0 && args[0] == "-n" {
- takeAction = false
+ volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
+ if err = volFixReplicationCommand.Parse(args); err != nil {
+ return nil
}
+ takeAction := !*skipChange
+
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
@@ -64,53 +70,89 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find all volumes that needs replication
// collect all data nodes
- replicatedVolumeLocations := make(map[uint32][]location)
- replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
+ volumeReplicas, allLocations := collectVolumeReplicaLocations(resp)
+
+ if len(allLocations) == 0 {
+ return fmt.Errorf("no data nodes at all")
+ }
+
+ // find all under replicated volumes
+ var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
+ for vid, replicas := range volumeReplicas {
+ replica := replicas[0]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+ if replicaPlacement.GetCopyCount() > len(replicas) {
+ underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
+ } else if replicaPlacement.GetCopyCount() < len(replicas) {
+ overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
+ fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
+ }
+ }
+
+ if len(overReplicatedVolumeIds) > 0 {
+ return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
+ }
+
+ if len(underReplicatedVolumeIds) == 0 {
+ return nil
+ }
+
+ // find the most under populated data nodes
+ keepDataNodesSorted(allLocations)
+
+ return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
+
+}
+
+func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) {
+ volumeReplicas := make(map[uint32][]*VolumeReplica)
var allLocations []location
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, v := range dn.VolumeInfos {
- if v.ReplicaPlacement > 0 {
- replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
- replicatedVolumeInfo[v.Id] = v
- }
+ volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
+ location: &loc,
+ info: v,
+ })
}
allLocations = append(allLocations, loc)
})
+ return volumeReplicas, allLocations
+}
- // find all under replicated volumes
- underReplicatedVolumeLocations := make(map[uint32][]location)
- for vid, locations := range replicatedVolumeLocations {
- volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
- if replicaPlacement.GetCopyCount() > len(locations) {
- underReplicatedVolumeLocations[vid] = locations
+func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+ for _, vid := range overReplicatedVolumeIds {
+ replicas := volumeReplicas[vid]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
+
+ replica := pickOneReplicaToDelete(replicas, replicaPlacement)
+
+ fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
+
+ if !takeAction {
+ break
}
- }
- if len(underReplicatedVolumeLocations) == 0 {
- return fmt.Errorf("no under replicated volumes")
- }
+ if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
+ return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
+ }
- if len(allLocations) == 0 {
- return fmt.Errorf("no data nodes at all")
}
+ return nil
+}
- // find the most under populated data nodes
- keepDataNodesSorted(allLocations)
-
- for vid, locations := range underReplicatedVolumeLocations {
- volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+ for _, vid := range underReplicatedVolumeIds {
+ replicas := volumeReplicas[vid]
+ replica := pickOneReplicaToCopyFrom(replicas)
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
- if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
+ if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// ask the volume server to replicate the volume
- sourceNodes := underReplicatedVolumeLocations[vid]
- sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
foundNewLocation = true
- fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
+ fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
if !takeAction {
break
@@ -118,11 +160,11 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
- VolumeId: volumeInfo.Id,
- SourceDataNode: sourceNode.dataNode.Id,
+ VolumeId: replica.info.Id,
+ SourceDataNode: replica.location.dataNode.Id,
})
if replicateErr != nil {
- return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr)
+ return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
return nil
})
@@ -138,11 +180,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
}
if !foundNewLocation {
- fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
+ fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
}
}
-
return nil
}
@@ -182,22 +223,15 @@ func keepDataNodesSorted(dataNodes []location) {
return false
}
*/
-func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
+func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
- existingDataNodes := make(map[string]int)
- for _, loc := range existingLocations {
- existingDataNodes[loc.String()] += 1
- }
- sameDataNodeCount := existingDataNodes[possibleLocation.String()]
- // avoid duplicated volume on the same data node
- if sameDataNodeCount > 0 {
+ existingDataCenters, _, existingDataNodes := countReplicas(replicas)
+
+ if _, found := existingDataNodes[possibleLocation.String()]; found {
+ // avoid duplicated volume on the same data node
return false
}
- existingDataCenters := make(map[string]int)
- for _, loc := range existingLocations {
- existingDataCenters[loc.DataCenter()] += 1
- }
primaryDataCenters, _ := findTopKeys(existingDataCenters)
// ensure data center count is within limit
@@ -218,20 +252,20 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi
}
// now this is one of the primary dcs
- existingRacks := make(map[string]int)
- for _, loc := range existingLocations {
- if loc.DataCenter() != possibleLocation.DataCenter() {
+ primaryDcRacks := make(map[string]int)
+ for _, replica := range replicas {
+ if replica.location.DataCenter() != possibleLocation.DataCenter() {
continue
}
- existingRacks[loc.Rack()] += 1
+ primaryDcRacks[replica.location.Rack()] += 1
}
- primaryRacks, _ := findTopKeys(existingRacks)
- sameRackCount := existingRacks[possibleLocation.Rack()]
+ primaryRacks, _ := findTopKeys(primaryDcRacks)
+ sameRackCount := primaryDcRacks[possibleLocation.Rack()]
// ensure rack count is within limit
- if _, found := existingRacks[possibleLocation.Rack()]; !found {
+ if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
// different from existing racks
- if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
+ if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
// lack on different racks
return true
} else {
@@ -280,6 +314,11 @@ func isAmong(key string, keys []string) bool {
return false
}
+type VolumeReplica struct {
+ location *location
+ info *master_pb.VolumeInformationMessage
+}
+
type location struct {
dc string
rack string
@@ -305,3 +344,43 @@ func (l location) Rack() string {
func (l location) DataCenter() string {
return l.dc
}
+
+func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
+ mostRecent := replicas[0]
+ for _, replica := range replicas {
+ if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
+ mostRecent = replica
+ }
+ }
+ return mostRecent
+}
+
+func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
+ diffDc = make(map[string]int)
+ diffRack = make(map[string]int)
+ diffNode = make(map[string]int)
+ for _, replica := range replicas {
+ diffDc[replica.location.DataCenter()] += 1
+ diffRack[replica.location.Rack()] += 1
+ diffNode[replica.location.String()] += 1
+ }
+ return
+}
+
+func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
+
+ allSame := true
+ oldest := replicas[0]
+ for _, replica := range replicas {
+ if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond {
+ oldest = replica
+ allSame = false
+ }
+ }
+ if !allSame {
+ return oldest
+ }
+
+ // TODO what if all the replicas have the same timestamp?
+ return oldest
+}