diff options
| author | Chris Lu <chris.lu@uber.com> | 2019-03-23 11:34:09 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@uber.com> | 2019-03-23 11:34:09 -0700 |
| commit | 6b70b3610530d9b3637fd4179a0f2cecc5541fdd (patch) | |
| tree | 21f9aaf5965c14b371b1aa5aaf618fcf3beeda07 /weed/shell/command_volume_fix_replication.go | |
| parent | 95e0520182eeeb57921916dc694b64ff342c93e1 (diff) | |
| download | seaweedfs-6b70b3610530d9b3637fd4179a0f2cecc5541fdd.tar.xz seaweedfs-6b70b3610530d9b3637fd4179a0f2cecc5541fdd.zip | |
weed shell: add "volume.fix.replication"
Diffstat (limited to 'weed/shell/command_volume_fix_replication.go')
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 194 |
1 files changed, 194 insertions, 0 deletions
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go new file mode 100644 index 000000000..6a76ba458 --- /dev/null +++ b/weed/shell/command_volume_fix_replication.go @@ -0,0 +1,194 @@ +package shell + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" + "io" + "math/rand" + "sort" +) + +func init() { + commands = append(commands, &commandVolumeFixReplication{}) +} + +type commandVolumeFixReplication struct { +} + +func (c *commandVolumeFixReplication) Name() string { + return "volume.fix.replication" +} + +func (c *commandVolumeFixReplication) Help() string { + return `add replicas to volumes that are missing replicas + + -n do not take action +` +} + +func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { + + takeAction := true + if len(args) > 0 && args[0] == "-n" { + takeAction = false + } + + var resp *master_pb.VolumeListResponse + ctx := context.Background() + err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return err + } + + // find all volumes that needs replication + // collect all data nodes + replicatedVolumeLocations := make(map[uint32][]location) + replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage) + var allLocations []location + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, dn := range rack.DataNodeInfos { + loc := newLocation(dc.Id, rack.Id, dn) + for _, v := range dn.VolumeInfos { + if v.ReplicaPlacement > 0 { + replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) + replicatedVolumeInfo[v.Id] = v + } + } + allLocations = append(allLocations, loc) + } + } + } + + // find all under replicated volumes + underReplicatedVolumeLocations := make(map[uint32][]location) + for vid, locations := range replicatedVolumeLocations { + volumeInfo := replicatedVolumeInfo[vid] + replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) + if replicaPlacement.GetCopyCount() > len(locations) { + underReplicatedVolumeLocations[vid] = locations + } + } + + if len(underReplicatedVolumeLocations) == 0 { + return fmt.Errorf("no under replicated volumes") + } + + if len(allLocations) == 0 { + return fmt.Errorf("no data nodes at all") + } + + // find the most under populated data nodes + keepDataNodesSorted(allLocations) + + for vid, locations := range underReplicatedVolumeLocations { + volumeInfo := replicatedVolumeInfo[vid] + replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) + foundNewLocation := false + for _, dst := range allLocations { + // check whether data nodes satisfy the constraints + if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) { + // ask the volume server to replicate the volume + sourceNodes := underReplicatedVolumeLocations[vid] + sourceNode := sourceNodes[rand.Intn(len(sourceNodes))] + foundNewLocation = true + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id) + + if !takeAction { + break + } + + err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{ + VolumeId: volumeInfo.Id, + Collection: volumeInfo.Collection, + SourceDataNode: sourceNode.dataNode.Id, + }) + return replicateErr + }) + + if err != nil { + return err + } + + // adjust free volume count + dst.dataNode.FreeVolumeCount-- + keepDataNodesSorted(allLocations) + break + } + } + if !foundNewLocation { + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations) + } + + } + + return nil +} + +func keepDataNodesSorted(dataNodes []location) { + sort.Slice(dataNodes, func(i, j int) bool { + return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount + }) +} + +func satisfyReplicaPlacement(replicaPlacement *storage.ReplicaPlacement, existingLocations []location, possibleLocation location) bool { + + existingDataCenters := make(map[string]bool) + existingRacks := make(map[string]bool) + existingDataNodes := make(map[string]bool) + for _, loc := range existingLocations { + existingDataCenters[loc.DataCenter()] = true + existingRacks[loc.Rack()] = true + existingDataNodes[loc.String()] = true + } + + if replicaPlacement.DiffDataCenterCount >= len(existingDataCenters) { + // check dc, good if different from any existing data centers + _, found := existingDataCenters[possibleLocation.DataCenter()] + return !found + } else if replicaPlacement.DiffRackCount >= len(existingRacks) { + // check rack, good if different from any existing racks + _, found := existingRacks[possibleLocation.Rack()] + return !found + } else if replicaPlacement.SameRackCount >= len(existingDataNodes) { + // check data node, good if different from any existing data nodes + _, found := existingDataNodes[possibleLocation.String()] + return !found + } + + return false +} + +type location struct { + dc string + rack string + dataNode *master_pb.DataNodeInfo +} + +func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location { + return location{ + dc: dc, + rack: rack, + dataNode: dataNode, + } +} + +func (l location) String() string { + return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id) +} + +func (l location) Rack() string { + return fmt.Sprintf("%s %s", l.dc, l.rack) +} + +func (l location) DataCenter() string { + return l.dc +} |
