aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-09-07 12:35:02 -0700
committerChris Lu <chris.lu@gmail.com>2020-09-07 12:35:02 -0700
commitd80538a18744d20943fc42f315a14cb76b76961a (patch)
tree2337972bdd397ff63ab8febb3ca7654abf58a80f
parent1a09bc43d119b564ee2a9a950ad3a795c838d54a (diff)
downloadseaweedfs-d80538a18744d20943fc42f315a14cb76b76961a.tar.xz
seaweedfs-d80538a18744d20943fc42f315a14cb76b76961a.zip
refactoring
-rw-r--r--weed/shell/command_volume_fix_replication.go76
1 files changed, 40 insertions, 36 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 8f9700292..4a1d2e056 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -64,35 +64,35 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find all volumes that needs replication
// collect all data nodes
- replicatedVolumeLocations := make(map[uint32][]location)
- replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
+ volumeReplicas := make(map[uint32][]*VolumeReplica)
var allLocations []location
eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
loc := newLocation(dc, string(rack), dn)
for _, v := range dn.VolumeInfos {
if v.ReplicaPlacement > 0 {
- replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
- replicatedVolumeInfo[v.Id] = v
+ volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
+ location: &loc,
+ info: v,
+ })
}
}
allLocations = append(allLocations, loc)
})
// find all under replicated volumes
- underReplicatedVolumeLocations := make(map[uint32][]location)
- overReplicatedVolumeLocations := make(map[uint32][]location)
- for vid, locations := range replicatedVolumeLocations {
- volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
- if replicaPlacement.GetCopyCount() > len(locations) {
- underReplicatedVolumeLocations[vid] = locations
- } else if replicaPlacement.GetCopyCount() < len(locations) {
- overReplicatedVolumeLocations[vid] = locations
- fmt.Fprintf(writer, "volume %d replication %s, but over replicated:%+v\n", volumeInfo.Id, replicaPlacement, locations)
+ var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
+ for vid, replicas := range volumeReplicas {
+ replica := replicas[rand.Intn(len(replicas))]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
+ if replicaPlacement.GetCopyCount() > len(replicas) {
+ underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
+ } else if replicaPlacement.GetCopyCount() < len(replicas) {
+ overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
+ fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
}
}
- if len(underReplicatedVolumeLocations) == 0 {
+ if len(underReplicatedVolumeIds) == 0 {
return fmt.Errorf("no under replicated volumes")
}
@@ -103,23 +103,22 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
// find the most under populated data nodes
keepDataNodesSorted(allLocations)
- return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeLocations, replicatedVolumeInfo, allLocations)
+ return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)
}
-func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeLocations map[uint32][]location, replicatedVolumeInfo map[uint32]*master_pb.VolumeInformationMessage, allLocations []location) error {
- for vid, locations := range underReplicatedVolumeLocations {
- volumeInfo := replicatedVolumeInfo[vid]
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
+ for _, vid := range underReplicatedVolumeIds {
+ replicas := volumeReplicas[vid]
+ replica := replicas[rand.Intn(len(replicas))]
+ replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
foundNewLocation := false
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
- if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) {
+ if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// ask the volume server to replicate the volume
- sourceNodes := underReplicatedVolumeLocations[vid]
- sourceNode := sourceNodes[rand.Intn(len(sourceNodes))]
foundNewLocation = true
- fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
+ fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)
if !takeAction {
break
@@ -127,11 +126,11 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
- VolumeId: volumeInfo.Id,
- SourceDataNode: sourceNode.dataNode.Id,
+ VolumeId: replica.info.Id,
+ SourceDataNode: replica.location.dataNode.Id,
})
if replicateErr != nil {
- return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr)
+ return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
}
return nil
})
@@ -147,7 +146,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
}
}
if !foundNewLocation {
- fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
+ fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
}
}
@@ -190,11 +189,11 @@ func keepDataNodesSorted(dataNodes []location) {
return false
}
*/
-func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
+func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {
existingDataNodes := make(map[string]int)
- for _, loc := range existingLocations {
- existingDataNodes[loc.String()] += 1
+ for _, replica := range replicas {
+ existingDataNodes[replica.location.String()] += 1
}
sameDataNodeCount := existingDataNodes[possibleLocation.String()]
// avoid duplicated volume on the same data node
@@ -203,8 +202,8 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi
}
existingDataCenters := make(map[string]int)
- for _, loc := range existingLocations {
- existingDataCenters[loc.DataCenter()] += 1
+ for _, replica := range replicas {
+ existingDataCenters[replica.location.DataCenter()] += 1
}
primaryDataCenters, _ := findTopKeys(existingDataCenters)
@@ -227,11 +226,11 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi
// now this is one of the primary dcs
existingRacks := make(map[string]int)
- for _, loc := range existingLocations {
- if loc.DataCenter() != possibleLocation.DataCenter() {
+ for _, replica := range replicas {
+ if replica.location.DataCenter() != possibleLocation.DataCenter() {
continue
}
- existingRacks[loc.Rack()] += 1
+ existingRacks[replica.location.Rack()] += 1
}
primaryRacks, _ := findTopKeys(existingRacks)
sameRackCount := existingRacks[possibleLocation.Rack()]
@@ -288,6 +287,11 @@ func isAmong(key string, keys []string) bool {
return false
}
+type VolumeReplica struct {
+ location *location
+ info *master_pb.VolumeInformationMessage
+}
+
type location struct {
dc string
rack string