aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fix_replication.go
diff options
context:
space:
mode:
authorshibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
committershibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
commit40334bc28d3fa694ce59b4e65077efb845264d20 (patch)
treea085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/shell/command_volume_fix_replication.go
parentd892cad15d748327c2b7c649f6398ff35d8dce0b (diff)
parentfbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff)
downloadseaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz
seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
-rw-r--r--weed/shell/command_volume_fix_replication.go151
1 files changed, 127 insertions, 24 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 7a1a77cbe..19da89b67 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -44,15 +44,18 @@ func (c *commandVolumeFixReplication) Help() string {
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
takeAction := true
if len(args) > 0 && args[0] == "-n" {
takeAction = false
}
var resp *master_pb.VolumeListResponse
- ctx := context.Background()
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
@@ -113,12 +116,12 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
break
}
- err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{
+ err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
VolumeId: volumeInfo.Id,
SourceDataNode: sourceNode.dataNode.Id,
})
- return replicateErr
+ return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr)
})
if err != nil {
@@ -146,31 +149,131 @@ func keepDataNodesSorted(dataNodes []location) {
})
}
+/*
+ if on an existing data node {
+ return false
+ }
+ if different from existing dcs {
+ if lack on different dcs {
+ return true
+ }else{
+ return false
+ }
+ }
+ if not on primary dc {
+ return false
+ }
+ if different from existing racks {
+ if lack on different racks {
+ return true
+ }else{
+ return false
+ }
+ }
+ if not on primary rack {
+ return false
+ }
+ if lacks on same rack {
+ return true
+ } else {
+ return false
+ }
+*/
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
- existingDataCenters := make(map[string]bool)
- existingRacks := make(map[string]bool)
- existingDataNodes := make(map[string]bool)
+ existingDataNodes := make(map[string]int)
for _, loc := range existingLocations {
- existingDataCenters[loc.DataCenter()] = true
- existingRacks[loc.Rack()] = true
- existingDataNodes[loc.String()] = true
+ existingDataNodes[loc.String()] += 1
+ }
+ sameDataNodeCount := existingDataNodes[possibleLocation.String()]
+ // avoid duplicated volume on the same data node
+ if sameDataNodeCount > 0 {
+ return false
}
- if replicaPlacement.DiffDataCenterCount >= len(existingDataCenters) {
- // check dc, good if different from any existing data centers
- _, found := existingDataCenters[possibleLocation.DataCenter()]
- return !found
- } else if replicaPlacement.DiffRackCount >= len(existingRacks) {
- // check rack, good if different from any existing racks
- _, found := existingRacks[possibleLocation.Rack()]
- return !found
- } else if replicaPlacement.SameRackCount >= len(existingDataNodes) {
- // check data node, good if different from any existing data nodes
- _, found := existingDataNodes[possibleLocation.String()]
- return !found
+ existingDataCenters := make(map[string]int)
+ for _, loc := range existingLocations {
+ existingDataCenters[loc.DataCenter()] += 1
+ }
+ primaryDataCenters, _ := findTopKeys(existingDataCenters)
+
+ // ensure data center count is within limit
+ if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
+ // different from existing dcs
+ if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
+ // lack on different dcs
+ return true
+ } else {
+ // adding this would go over the different dcs limit
+ return false
+ }
+ }
+ // now this is same as one of the existing data center
+ if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
+ // not on one of the primary dcs
+ return false
}
+ // now this is one of the primary dcs
+ existingRacks := make(map[string]int)
+ for _, loc := range existingLocations {
+ if loc.DataCenter() != possibleLocation.DataCenter() {
+ continue
+ }
+ existingRacks[loc.Rack()] += 1
+ }
+ primaryRacks, _ := findTopKeys(existingRacks)
+ sameRackCount := existingRacks[possibleLocation.Rack()]
+
+ // ensure rack count is within limit
+ if _, found := existingRacks[possibleLocation.Rack()]; !found {
+ // different from existing racks
+ if len(existingRacks) < replicaPlacement.DiffRackCount+1 {
+ // lack on different racks
+ return true
+ } else {
+ // adding this would go over the different racks limit
+ return false
+ }
+ }
+ // now this is same as one of the existing racks
+ if !isAmong(possibleLocation.Rack(), primaryRacks) {
+ // not on the primary rack
+ return false
+ }
+
+ // now this is on the primary rack
+
+ // different from existing data nodes
+ if sameRackCount < replicaPlacement.SameRackCount+1 {
+ // lack on same rack
+ return true
+ } else {
+ // adding this would go over the same data node limit
+ return false
+ }
+
+}
+
+func findTopKeys(m map[string]int) (topKeys []string, max int) {
+ for k, c := range m {
+ if max < c {
+ topKeys = topKeys[:0]
+ topKeys = append(topKeys, k)
+ max = c
+ } else if max == c {
+ topKeys = append(topKeys, k)
+ }
+ }
+ return
+}
+
+func isAmong(key string, keys []string) bool {
+ for _, k := range keys {
+ if k == key {
+ return true
+ }
+ }
return false
}