From 05034aade5da8525b6067dece4a738cc664dabc4 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 6 Sep 2020 12:44:02 -0700 Subject: printout over replicated locations --- weed/shell/command_volume_fix_replication.go | 2 ++ 1 file changed, 2 insertions(+) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index e17f35c67..59f4bff10 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -85,6 +85,8 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) if replicaPlacement.GetCopyCount() > len(locations) { underReplicatedVolumeLocations[vid] = locations + } else if replicaPlacement.GetCopyCount() < len(locations) { + fmt.Fprintf(writer, "volume %d replication %s, but over repliacated:%+v\n", volumeInfo.Id, replicaPlacement, locations) } } -- cgit v1.2.3 From 2b643f477dd3ff0b50afc8f8052732e95e3f014f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 6 Sep 2020 12:47:55 -0700 Subject: typo --- weed/shell/command_volume_fix_replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 59f4bff10..53f88cab2 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -86,7 +86,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, if replicaPlacement.GetCopyCount() > len(locations) { underReplicatedVolumeLocations[vid] = locations } else if replicaPlacement.GetCopyCount() < len(locations) { - fmt.Fprintf(writer, "volume %d replication %s, but over repliacated:%+v\n", volumeInfo.Id, replicaPlacement, locations) + fmt.Fprintf(writer, "volume %d replication %s, but over replicated:%+v\n", volumeInfo.Id, replicaPlacement, locations) } } -- cgit v1.2.3 From 1a09bc43d119b564ee2a9a950ad3a795c838d54a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 7 Sep 2020 11:31:33 -0700 Subject: refactor --- weed/shell/command_volume_fix_replication.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 53f88cab2..8f9700292 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -36,7 +36,7 @@ func (c *commandVolumeFixReplication) Help() string { Note: * each time this will only add back one replica for one volume id. If there are multiple replicas are missing, e.g. multiple volume servers are new, you may need to run this multiple times. - * do not run this too quick within seconds, since the new volume replica may take a few seconds + * do not run this too quickly within seconds, since the new volume replica may take a few seconds to register itself to the master. ` @@ -80,12 +80,14 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find all under replicated volumes underReplicatedVolumeLocations := make(map[uint32][]location) + overReplicatedVolumeLocations := make(map[uint32][]location) for vid, locations := range replicatedVolumeLocations { volumeInfo := replicatedVolumeInfo[vid] replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) if replicaPlacement.GetCopyCount() > len(locations) { underReplicatedVolumeLocations[vid] = locations } else if replicaPlacement.GetCopyCount() < len(locations) { + overReplicatedVolumeLocations[vid] = locations fmt.Fprintf(writer, "volume %d replication %s, but over replicated:%+v\n", volumeInfo.Id, replicaPlacement, locations) } } @@ -101,6 +103,11 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find the most under populated data nodes keepDataNodesSorted(allLocations) + return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeLocations, replicatedVolumeInfo, allLocations) + +} + +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeLocations map[uint32][]location, replicatedVolumeInfo map[uint32]*master_pb.VolumeInformationMessage, allLocations []location) error { for vid, locations := range underReplicatedVolumeLocations { volumeInfo := replicatedVolumeInfo[vid] replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) @@ -144,7 +151,6 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } } - return nil } -- cgit v1.2.3 From d80538a18744d20943fc42f315a14cb76b76961a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 7 Sep 2020 12:35:02 -0700 Subject: refactoring --- weed/shell/command_volume_fix_replication.go | 76 +++++++++++++++------------- 1 file changed, 40 insertions(+), 36 deletions(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 8f9700292..4a1d2e056 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -64,35 +64,35 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find all volumes that needs replication // collect all data nodes - replicatedVolumeLocations := make(map[uint32][]location) - replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage) + volumeReplicas := make(map[uint32][]*VolumeReplica) var allLocations []location eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) for _, v := range dn.VolumeInfos { if v.ReplicaPlacement > 0 { - replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) - replicatedVolumeInfo[v.Id] = v + volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ + location: &loc, + info: v, + }) } } allLocations = append(allLocations, loc) }) // find all under replicated volumes - underReplicatedVolumeLocations := make(map[uint32][]location) - overReplicatedVolumeLocations := make(map[uint32][]location) - for vid, locations := range replicatedVolumeLocations { - volumeInfo := replicatedVolumeInfo[vid] - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) - if replicaPlacement.GetCopyCount() > len(locations) { - underReplicatedVolumeLocations[vid] = locations - } else if replicaPlacement.GetCopyCount() < len(locations) { - overReplicatedVolumeLocations[vid] = locations - fmt.Fprintf(writer, "volume %d replication %s, but over replicated:%+v\n", volumeInfo.Id, replicaPlacement, locations) + var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 + for vid, replicas := range volumeReplicas { + replica := replicas[rand.Intn(len(replicas))] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) + if replicaPlacement.GetCopyCount() > len(replicas) { + underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) + } else if replicaPlacement.GetCopyCount() < len(replicas) { + overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid) + fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas)) } } - if len(underReplicatedVolumeLocations) == 0 { + if len(underReplicatedVolumeIds) == 0 { return fmt.Errorf("no under replicated volumes") } @@ -103,23 +103,22 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find the most under populated data nodes keepDataNodesSorted(allLocations) - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeLocations, replicatedVolumeInfo, allLocations) + return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations) } -func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeLocations map[uint32][]location, replicatedVolumeInfo map[uint32]*master_pb.VolumeInformationMessage, allLocations []location) error { - for vid, locations := range underReplicatedVolumeLocations { - volumeInfo := replicatedVolumeInfo[vid] - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) +func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { + for _, vid := range underReplicatedVolumeIds { + replicas := volumeReplicas[vid] + replica := replicas[rand.Intn(len(replicas))] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false for _, dst := range allLocations { // check whether data nodes satisfy the constraints - if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) { + if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // ask the volume server to replicate the volume - sourceNodes := underReplicatedVolumeLocations[vid] - sourceNode := sourceNodes[rand.Intn(len(sourceNodes))] foundNewLocation = true - fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id) + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) if !takeAction { break @@ -127,11 +126,11 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ - VolumeId: volumeInfo.Id, - SourceDataNode: sourceNode.dataNode.Id, + VolumeId: replica.info.Id, + SourceDataNode: replica.location.dataNode.Id, }) if replicateErr != nil { - return fmt.Errorf("copying from %s => %s : %v", sourceNode.dataNode.Id, dst.dataNode.Id, replicateErr) + return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr) } return nil }) @@ -147,7 +146,7 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm } } if !foundNewLocation { - fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations) + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } } @@ -190,11 +189,11 @@ func keepDataNodesSorted(dataNodes []location) { return false } */ -func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, existingLocations []location, possibleLocation location) bool { +func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool { existingDataNodes := make(map[string]int) - for _, loc := range existingLocations { - existingDataNodes[loc.String()] += 1 + for _, replica := range replicas { + existingDataNodes[replica.location.String()] += 1 } sameDataNodeCount := existingDataNodes[possibleLocation.String()] // avoid duplicated volume on the same data node @@ -203,8 +202,8 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi } existingDataCenters := make(map[string]int) - for _, loc := range existingLocations { - existingDataCenters[loc.DataCenter()] += 1 + for _, replica := range replicas { + existingDataCenters[replica.location.DataCenter()] += 1 } primaryDataCenters, _ := findTopKeys(existingDataCenters) @@ -227,11 +226,11 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, exi // now this is one of the primary dcs existingRacks := make(map[string]int) - for _, loc := range existingLocations { - if loc.DataCenter() != possibleLocation.DataCenter() { + for _, replica := range replicas { + if replica.location.DataCenter() != possibleLocation.DataCenter() { continue } - existingRacks[loc.Rack()] += 1 + existingRacks[replica.location.Rack()] += 1 } primaryRacks, _ := findTopKeys(existingRacks) sameRackCount := existingRacks[possibleLocation.Rack()] @@ -288,6 +287,11 @@ func isAmong(key string, keys []string) bool { return false } +type VolumeReplica struct { + location *location + info *master_pb.VolumeInformationMessage +} + type location struct { dc string rack string -- cgit v1.2.3 From 64a621bcc8e8acfd3eef14e0a08967c759bd84f0 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 7 Sep 2020 16:00:10 -0700 Subject: shell: volume.fix.replication also purge over replicated volumes --- weed/shell/command_volume_fix_replication.go | 114 ++++++++++++++++++++------- 1 file changed, 86 insertions(+), 28 deletions(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 4a1d2e056..d94e7ded8 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -3,8 +3,8 @@ package shell import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "io" - "math/rand" "sort" "github.com/chrislusf/seaweedfs/weed/operation" @@ -27,11 +27,13 @@ func (c *commandVolumeFixReplication) Name() string { func (c *commandVolumeFixReplication) Help() string { return `add replicas to volumes that are missing replicas - This command finds all under-replicated volumes, and finds volume servers with free slots. + This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop. + + This command also finds all under-replicated volumes, and finds volume servers with free slots. If the free slots satisfy the replication requirement, the volume content is copied over and mounted. volume.fix.replication -n # do not take action - volume.fix.replication # actually copying the volume files and mount the volume + volume.fix.replication # actually deleting or copying the volume files and mount the volume Note: * each time this will only add back one replica for one volume id. If there are multiple replicas @@ -69,12 +71,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) for _, v := range dn.VolumeInfos { - if v.ReplicaPlacement > 0 { - volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ - location: &loc, - info: v, - }) - } + volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ + location: &loc, + info: v, + }) } allLocations = append(allLocations, loc) }) @@ -82,7 +82,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find all under replicated volumes var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 for vid, replicas := range volumeReplicas { - replica := replicas[rand.Intn(len(replicas))] + replica := replicas[0] replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) if replicaPlacement.GetCopyCount() > len(replicas) { underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid) @@ -92,6 +92,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } } + if len(overReplicatedVolumeIds) > 0 { + return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations) + } + if len(underReplicatedVolumeIds) == 0 { return fmt.Errorf("no under replicated volumes") } @@ -107,10 +111,31 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } +func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { + for _, vid := range overReplicatedVolumeIds { + replicas := volumeReplicas[vid] + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement)) + + replica := pickOneReplicaToDelete(replicas, replicaPlacement) + + fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id) + + if !takeAction { + break + } + + if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil { + return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err) + } + + } + return nil +} + func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { for _, vid := range underReplicatedVolumeIds { replicas := volumeReplicas[vid] - replica := replicas[rand.Intn(len(replicas))] + replica := pickOneReplicaToCopyFrom(replicas) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false for _, dst := range allLocations { @@ -191,20 +216,13 @@ func keepDataNodesSorted(dataNodes []location) { */ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool { - existingDataNodes := make(map[string]int) - for _, replica := range replicas { - existingDataNodes[replica.location.String()] += 1 - } - sameDataNodeCount := existingDataNodes[possibleLocation.String()] - // avoid duplicated volume on the same data node - if sameDataNodeCount > 0 { + existingDataCenters, _, existingDataNodes := countReplicas(replicas) + + if _, found := existingDataNodes[possibleLocation.String()]; found { + // avoid duplicated volume on the same data node return false } - existingDataCenters := make(map[string]int) - for _, replica := range replicas { - existingDataCenters[replica.location.DataCenter()] += 1 - } primaryDataCenters, _ := findTopKeys(existingDataCenters) // ensure data center count is within limit @@ -225,20 +243,20 @@ func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, rep } // now this is one of the primary dcs - existingRacks := make(map[string]int) + primaryDcRacks := make(map[string]int) for _, replica := range replicas { if replica.location.DataCenter() != possibleLocation.DataCenter() { continue } - existingRacks[replica.location.Rack()] += 1 + primaryDcRacks[replica.location.Rack()] += 1 } - primaryRacks, _ := findTopKeys(existingRacks) - sameRackCount := existingRacks[possibleLocation.Rack()] + primaryRacks, _ := findTopKeys(primaryDcRacks) + sameRackCount := primaryDcRacks[possibleLocation.Rack()] // ensure rack count is within limit - if _, found := existingRacks[possibleLocation.Rack()]; !found { + if _, found := primaryDcRacks[possibleLocation.Rack()]; !found { // different from existing racks - if len(existingRacks) < replicaPlacement.DiffRackCount+1 { + if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 { // lack on different racks return true } else { @@ -317,3 +335,43 @@ func (l location) Rack() string { func (l location) DataCenter() string { return l.dc } + +func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica { + mostRecent := replicas[0] + for _, replica := range replicas { + if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond { + mostRecent = replica + } + } + return mostRecent +} + +func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) { + diffDc = make(map[string]int) + diffRack = make(map[string]int) + diffNode = make(map[string]int) + for _, replica := range replicas { + diffDc[replica.location.DataCenter()] += 1 + diffRack[replica.location.Rack()] += 1 + diffNode[replica.location.String()] += 1 + } + return +} + +func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica { + + allSame := true + oldest := replicas[0] + for _, replica := range replicas { + if replica.info.ModifiedAtSecond < oldest.info.ModifiedAtSecond { + oldest = replica + allSame = false + } + } + if !allSame { + return oldest + } + + // TODO what if all the replicas have the same timestamp? + return oldest +} -- cgit v1.2.3 From d1b816212f2fd535c22ecdc0e84a38b31a3472a3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 7 Sep 2020 16:03:05 -0700 Subject: return nil if no need to do anything --- weed/shell/command_volume_fix_replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index d94e7ded8..735d07800 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -97,7 +97,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } if len(underReplicatedVolumeIds) == 0 { - return fmt.Errorf("no under replicated volumes") + return nil } if len(allLocations) == 0 { -- cgit v1.2.3 From 387ab6796f274151f802ccdab8756b959b5fb1cb Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 9 Sep 2020 11:21:23 -0700 Subject: filer: cross cluster synchronization --- weed/shell/command_volume_fix_replication.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 735d07800..061a58891 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -79,6 +79,10 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, allLocations = append(allLocations, loc) }) + if len(allLocations) == 0 { + return fmt.Errorf("no data nodes at all") + } + // find all under replicated volumes var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32 for vid, replicas := range volumeReplicas { @@ -100,10 +104,6 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, return nil } - if len(allLocations) == 0 { - return fmt.Errorf("no data nodes at all") - } - // find the most under populated data nodes keepDataNodesSorted(allLocations) -- cgit v1.2.3 From e60b2117c3bf36f9a5ff4a1cf1b9216124546f8b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 11 Sep 2020 00:29:25 -0700 Subject: shell: volume balance follows replica placement --- weed/shell/command_volume_fix_replication.go | 29 ++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 061a58891..b32ccaaab 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -66,18 +66,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, // find all volumes that needs replication // collect all data nodes - volumeReplicas := make(map[uint32][]*VolumeReplica) - var allLocations []location - eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - loc := newLocation(dc, string(rack), dn) - for _, v := range dn.VolumeInfos { - volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ - location: &loc, - info: v, - }) - } - allLocations = append(allLocations, loc) - }) + volumeReplicas, allLocations := collectVolumeReplicaLocations(resp) if len(allLocations) == 0 { return fmt.Errorf("no data nodes at all") @@ -111,6 +100,22 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } +func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) { + volumeReplicas := make(map[uint32][]*VolumeReplica) + var allLocations []location + eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + loc := newLocation(dc, string(rack), dn) + for _, v := range dn.VolumeInfos { + volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ + location: &loc, + info: v, + }) + } + allLocations = append(allLocations, loc) + }) + return volumeReplicas, allLocations +} + func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { for _, vid := range overReplicatedVolumeIds { replicas := volumeReplicas[vid] -- cgit v1.2.3