aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-09-07 16:00:10 -0700
committerChris Lu <chris.lu@gmail.com>2020-09-07 16:00:10 -0700
commit64a621bcc8e8acfd3eef14e0a08967c759bd84f0 (patch)
tree1839ee1b7770b31dcca4953e6ce2da66cff5e6da
parentc18ea21f7a4ed2d96798032f6e1dad07860fd914 (diff)
downloadseaweedfs-64a621bcc8e8acfd3eef14e0a08967c759bd84f0.tar.xz
seaweedfs-64a621bcc8e8acfd3eef14e0a08967c759bd84f0.zip
shell: volume.fix.replication also purge over replicated volumes
-rw-r--r--weed/shell/command_volume_fix_replication.go114
1 files changed, 86 insertions, 28 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 4a1d2e056..d94e7ded8 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -3,8 +3,8 @@ package shell
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
- "math/rand"
"sort"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -27,11 +27,13 @@ func (c *commandVolumeFixReplication) Name() string {
func (c *commandVolumeFixReplication) Help() string {
return `add replicas to volumes that are missing replicas
- This command finds all under-replicated volumes, and finds volume servers with free slots.
+ This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.
+
+ This command also finds all under-replicated volumes, and finds volume servers with free slots.
If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
volume.fix.replication -n # do not take action
- volume.fix.replication # actually copying the volume files and mount the volume
+ volume.fix.replication # actually deleting or copying the volume files and mount the volume
Note:
* each time this will only add back one replica for one volume id. If there are multiple replicas
@@ -69,12 +71,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, v := range dn.VolumeInfos {
- if v.ReplicaPlacement > 0 {
- volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
- location: &loc,
- info: v,
- })
- }
+ volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
+ location: &loc,
+ info: v,
+ })
}
allLocations = append(allLocations, loc)
})
@@ -82,7 +82,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find all under replicated and over replicated volumes
var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
for vid, replicas := range volumeReplicas {
- replica := replicas[rand.Intn(len(replicas))]
+ replica := replicas[0]
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
if replicaPlacement.GetCopyCount() > len(replicas) {
underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
@@ -92,6 +92,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
}
+ if len(overReplicatedVolumeIds) > 0 {
+ return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
+ }
+
if len(underReplicatedVolumeIds) == 0 {
return fmt.Errorf("no under replicated volumes")
}
@@ -107,10 +111,31 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
+// fixOverReplicatedVolumes removes one replica of every volume id listed in
+// overReplicatedVolumeIds. For each id the replica to drop is chosen by
+// pickOneReplicaToDelete, and the planned deletion is reported on writer.
+//
+// When takeAction is false nothing is deleted; the loop only reports what
+// would be deleted for each volume id. When takeAction is true the first
+// failed deletion aborts the loop and is returned as an error.
+//
+// allLocations mirrors the parameter list of fixUnderReplicatedVolumes and is
+// not used here.
+func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+	for _, vid := range overReplicatedVolumeIds {
+		replicas := volumeReplicas[vid]
+		// The placement is decoded from the first replica of the volume; a decoding error is ignored.
+		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))
+
+		replica := pickOneReplicaToDelete(replicas, replicaPlacement)
+
+		fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
+
+		if !takeAction {
+			// Dry run: move on to the next volume id so that every planned
+			// deletion is reported, not only the first one.
+			continue
+		}
+
+		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
+			return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
+		}
+	}
+	return nil
+}
+
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
for _, vid := range underReplicatedVolumeIds {
replicas := volumeReplicas[vid]
- replica := replicas[rand.Intn(len(replicas))]
+ replica := pickOneReplicaToCopyFrom(replicas)
replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false
for _, dst := range allLocations {
@@ -191,20 +216,13 @@ func keepDataNodesSorted(dataNodes []location) {
*/
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
- existingDataNodes := make(map[string]int)
- for _, replica := range replicas {
- existingDataNodes[replica.location.String()] += 1
- }
- sameDataNodeCount := existingDataNodes[possibleLocation.String()]
- // avoid duplicated volume on the same data node
- if sameDataNodeCount > 0 {
+ existingDataCenters, _, existingDataNodes := countReplicas(replicas)
+
+ if _, found := existingDataNodes[possibleLocation.String()]; found {
+ // avoid duplicated volume on the same data node
return false
}
- existingDataCenters := make(map[string]int)
- for _, replica := range replicas {
- existingDataCenters[replica.location.DataCenter()] += 1
- }
primaryDataCenters, _ := findTopKeys(existingDataCenters)
// ensure data center count is within limit
@@ -225,20 +243,20 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, rep
}
// now this is one of the primary dcs
- existingRacks := make(map[string]int)
+ primaryDcRacks := make(map[string]int)
for _, replica := range replicas {
if replica.location.DataCenter() != possibleLocation.DataCenter() {
continue
}
- existingRacks[replica.location.Rack()] += 1
+ primaryDcRacks[replica.location.Rack()] += 1
}
- primaryRacks, _ := findTopKeys(existingRacks)
- sameRackCount := existingRacks[possibleLocation.Rack()]
+ primaryRacks, _ := findTopKeys(primaryDcRacks)
+ sameRackCount := primaryDcRacks[possibleLocation.Rack()]
// ensure rack count is within limit
- if _, found := existingRacks[possibleLocation.Rack()]; !found {
+ if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
// different from existing racks
- if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
+ if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
// lack on different racks
return true
} else {
@@ -317,3 +335,43 @@ func (l location) Rack() string {
// DataCenter returns the dc field of the location.
func (l location) DataCenter() string {
return l.dc
}
+
+// pickOneReplicaToCopyFrom selects the replica to use as the copy source:
+// the one with the largest info.ModifiedAtSecond. When several replicas share
+// that largest value, the first of them in slice order is returned.
+// replicas must contain at least one entry.
+func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
+	newest := 0
+	for i := range replicas {
+		// strict comparison keeps the earliest entry on equal timestamps
+		if replicas[i].info.ModifiedAtSecond > replicas[newest].info.ModifiedAtSecond {
+			newest = i
+		}
+	}
+	return replicas[newest]
+}
+
+// countReplicas tallies how many of the given replicas fall into each
+// data center, rack and data node. The three returned maps are keyed by
+// location.DataCenter(), location.Rack() and location.String() respectively,
+// and each value is the number of replicas that produced that key.
+// All three maps are non-nil even when replicas is empty.
+func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
+	diffDc, diffRack, diffNode = map[string]int{}, map[string]int{}, map[string]int{}
+	for i := range replicas {
+		loc := replicas[i].location
+		diffDc[loc.DataCenter()]++
+		diffRack[loc.Rack()]++
+		diffNode[loc.String()]++
+	}
+	return diffDc, diffRack, diffNode
+}
+
+// pickOneReplicaToDelete returns the replica with the smallest
+// info.ModifiedAtSecond, i.e. the copy that was modified least recently.
+// When several replicas share that smallest value, the first of them in
+// slice order is returned. replicas must contain at least one entry.
+// replicaPlacement is currently not consulted.
+func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {
+	oldest := replicas[0]
+	for _, replica := range replicas[1:] {
+		if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond {
+			oldest = replica
+		}
+	}
+
+	// TODO what if all the replicas have the same timestamp?
+	// For now the first replica in the slice is the one returned.
+	return oldest
+}