aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_volume_fix_replication.go
diff options
context:
space:
mode:
author Chris Lu <chris.lu@uber.com> 2019-03-23 11:34:09 -0700
committer Chris Lu <chris.lu@uber.com> 2019-03-23 11:34:09 -0700
commit 6b70b3610530d9b3637fd4179a0f2cecc5541fdd (patch)
tree 21f9aaf5965c14b371b1aa5aaf618fcf3beeda07 /weed/shell/command_volume_fix_replication.go
parent 95e0520182eeeb57921916dc694b64ff342c93e1 (diff)
download seaweedfs-6b70b3610530d9b3637fd4179a0f2cecc5541fdd.tar.xz
seaweedfs-6b70b3610530d9b3637fd4179a0f2cecc5541fdd.zip
weed shell: add "volume.fix.replication"
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
-rw-r--r-- weed/shell/command_volume_fix_replication.go 194
1 files changed, 194 insertions, 0 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
new file mode 100644
index 000000000..6a76ba458
--- /dev/null
+++ b/weed/shell/command_volume_fix_replication.go
@@ -0,0 +1,194 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "io"
+ "math/rand"
+ "sort"
+)
+
+// init appends the volume.fix.replication command to the package-level
+// commands list.
+func init() {
+	fixReplication := &commandVolumeFixReplication{}
+	commands = append(commands, fixReplication)
+}
+
+type commandVolumeFixReplication struct {
+}
+
+// Name returns the command name, "volume.fix.replication".
+func (c *commandVolumeFixReplication) Name() string {
+ return "volume.fix.replication"
+}
+
+// Help returns the usage text of the command, including the -n option that
+// disables taking action.
+func (c *commandVolumeFixReplication) Help() string {
+ return `add replicas to volumes that are missing replicas
+
+ -n do not take action
+`
+}
+
+// Do lists the volumes known to the master, finds replicated volumes that have
+// fewer copies than their replica placement asks for, and asks one additional
+// data node per such volume to replicate it.
+//
+// When the first argument is "-n", the planned replications are only printed
+// to writer and no volume server is contacted.
+//
+// An error is returned when the master cannot be queried, when the response
+// carries no topology, when there are no under replicated volumes, when there
+// are no data nodes, or when a replication request fails. Volumes whose replica
+// placement cannot be parsed, and volumes for which no suitable data node is
+// found, are reported on writer and skipped.
+func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+
+	takeAction := true
+	if len(args) > 0 && args[0] == "-n" {
+		takeAction = false
+	}
+
+	var resp *master_pb.VolumeListResponse
+	ctx := context.Background()
+	err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+		var listErr error
+		resp, listErr = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
+		return listErr
+	})
+	if err != nil {
+		return err
+	}
+	// guard against a response without topology instead of panicking below
+	if resp == nil || resp.TopologyInfo == nil {
+		return fmt.Errorf("no topology info returned by master")
+	}
+
+	// collect all data nodes, and the locations of every volume that is
+	// configured with a non-zero replica placement
+	replicatedVolumeLocations := make(map[uint32][]location)
+	replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage)
+	var allLocations []location
+	for _, dc := range resp.TopologyInfo.DataCenterInfos {
+		for _, rack := range dc.RackInfos {
+			for _, dn := range rack.DataNodeInfos {
+				loc := newLocation(dc.Id, rack.Id, dn)
+				for _, v := range dn.VolumeInfos {
+					if v.ReplicaPlacement > 0 {
+						replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc)
+						replicatedVolumeInfo[v.Id] = v
+					}
+				}
+				allLocations = append(allLocations, loc)
+			}
+		}
+	}
+
+	// find all under replicated volumes; the replica placement is parsed once
+	// here and reused when picking a destination
+	replicaPlacements := make(map[uint32]*storage.ReplicaPlacement)
+	var underReplicatedVolumeIds []uint32
+	for vid, locations := range replicatedVolumeLocations {
+		volumeInfo := replicatedVolumeInfo[vid]
+		replicaPlacement, parseErr := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
+		if parseErr != nil {
+			fmt.Fprintf(writer, "skipping volume %d: invalid replica placement %d: %v\n", vid, volumeInfo.ReplicaPlacement, parseErr)
+			continue
+		}
+		if replicaPlacement.GetCopyCount() > len(locations) {
+			replicaPlacements[vid] = replicaPlacement
+			underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
+		}
+	}
+
+	if len(underReplicatedVolumeIds) == 0 {
+		return fmt.Errorf("no under replicated volumes")
+	}
+
+	if len(allLocations) == 0 {
+		return fmt.Errorf("no data nodes at all")
+	}
+
+	// process volumes in ascending id order so the output does not depend on
+	// map iteration order
+	sort.Slice(underReplicatedVolumeIds, func(i, j int) bool {
+		return underReplicatedVolumeIds[i] < underReplicatedVolumeIds[j]
+	})
+
+	// put the data nodes with the most free volume slots first
+	keepDataNodesSorted(allLocations)
+
+	for _, vid := range underReplicatedVolumeIds {
+		volumeInfo := replicatedVolumeInfo[vid]
+		locations := replicatedVolumeLocations[vid]
+		replicaPlacement := replicaPlacements[vid]
+		foundNewLocation := false
+		for _, dst := range allLocations {
+			// check whether the data node has room and satisfies the constraints
+			if dst.dataNode.FreeVolumeCount <= 0 || !satisfyReplicaPlacement(replicaPlacement, locations, dst) {
+				continue
+			}
+			foundNewLocation = true
+
+			// any existing copy can serve as the source; pick one at random
+			sourceNode := locations[rand.Intn(len(locations))]
+			fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id)
+
+			if !takeAction {
+				break
+			}
+
+			// ask the destination volume server to replicate the volume
+			replicateErr := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+				_, rpcErr := volumeServerClient.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{
+					VolumeId:       volumeInfo.Id,
+					Collection:     volumeInfo.Collection,
+					SourceDataNode: sourceNode.dataNode.Id,
+				})
+				return rpcErr
+			})
+			if replicateErr != nil {
+				return replicateErr
+			}
+
+			// adjust free volume count and restore the ordering for the next volume
+			dst.dataNode.FreeVolumeCount--
+			keepDataNodesSorted(allLocations)
+			break
+		}
+		if !foundNewLocation {
+			fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations)
+		}
+
+	}
+
+	return nil
+}
+
+// keepDataNodesSorted sorts dataNodes in place so that the nodes with the
+// highest FreeVolumeCount come first.
+func keepDataNodesSorted(dataNodes []location) {
+	hasMoreFreeSlots := func(a, b int) bool {
+		left, right := dataNodes[a].dataNode, dataNodes[b].dataNode
+		return left.FreeVolumeCount > right.FreeVolumeCount
+	}
+	sort.Slice(dataNodes, hasMoreFreeSlots)
+}
+
+// satisfyReplicaPlacement reports whether one more copy of a volume may be
+// placed on possibleLocation, given the locations that already hold a copy.
+//
+// NOTE(review): this assumes DiffDataCenterCount, DiffRackCount and
+// SameRackCount are the numbers of extra copies wanted on other data centers,
+// on other racks of the same data center, and on other nodes of the same rack;
+// confirm against storage.ReplicaPlacement.
+//
+// The checks run from the widest scope to the narrowest:
+//   - a data node that already holds a copy is never accepted;
+//   - a data center without a copy is accepted only while fewer than
+//     DiffDataCenterCount+1 data centers hold copies;
+//   - inside a data center that already holds copies, only the one holding the
+//     most copies may receive more, and a rack without a copy is accepted only
+//     while fewer than DiffRackCount+1 of its racks hold copies;
+//   - inside a rack that already holds copies, only the rack holding the most
+//     copies may receive more, up to SameRackCount+1 copies in total.
+//
+// A nil replicaPlacement never accepts a location.
+func satisfyReplicaPlacement(replicaPlacement *storage.ReplicaPlacement, existingLocations []location, possibleLocation location) bool {
+	if replicaPlacement == nil {
+		return false
+	}
+
+	// holdsMost reports whether no other key in counts has a higher count than key.
+	holdsMost := func(counts map[string]int, key string) bool {
+		for _, n := range counts {
+			if n > counts[key] {
+				return false
+			}
+		}
+		return true
+	}
+
+	candidateNode := possibleLocation.String()
+	replicasPerDataCenter := make(map[string]int)
+	for _, loc := range existingLocations {
+		if loc.String() == candidateNode {
+			// never put two copies on the same data node
+			return false
+		}
+		replicasPerDataCenter[loc.DataCenter()]++
+	}
+
+	candidateDataCenter := possibleLocation.DataCenter()
+	if _, found := replicasPerDataCenter[candidateDataCenter]; !found {
+		// a new data center is good only if more data centers are still wanted
+		return len(replicasPerDataCenter) < replicaPlacement.DiffDataCenterCount+1
+	}
+	if !holdsMost(replicasPerDataCenter, candidateDataCenter) {
+		// extra racks and nodes belong to the data center with the most copies
+		return false
+	}
+
+	replicasPerRack := make(map[string]int)
+	for _, loc := range existingLocations {
+		if loc.DataCenter() == candidateDataCenter {
+			replicasPerRack[loc.Rack()]++
+		}
+	}
+
+	candidateRack := possibleLocation.Rack()
+	sameRackReplicas, found := replicasPerRack[candidateRack]
+	if !found {
+		// a new rack is good only if more racks are still wanted
+		return len(replicasPerRack) < replicaPlacement.DiffRackCount+1
+	}
+	if !holdsMost(replicasPerRack, candidateRack) {
+		// extra nodes belong to the rack with the most copies
+		return false
+	}
+
+	// a new node on the rack is good only if more same-rack copies are still wanted
+	return sameRackReplicas < replicaPlacement.SameRackCount+1
+}
+
+// location identifies a data node together with the ids of the data center
+// and rack it was listed under.
+type location struct {
+	// dc and rack are the ids of the enclosing data center and rack.
+	dc, rack string
+	// dataNode is the node information as returned by the master.
+	dataNode *master_pb.DataNodeInfo
+}
+
+func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
+ return location{
+ dc: dc,
+ rack: rack,
+ dataNode: dataNode,
+ }
+}
+
+// String returns the data center id, rack id and data node id separated by
+// single spaces.
+func (l location) String() string {
+	return l.dc + " " + l.rack + " " + l.dataNode.Id
+}
+
+// Rack returns the data center id and rack id separated by a single space, so
+// racks with the same id in different data centers yield different values.
+func (l location) Rack() string {
+	return l.dc + " " + l.rack
+}
+
+// DataCenter returns the data center id of the location.
+func (l location) DataCenter() string {
+ return l.dc
+}