From 6b70b3610530d9b3637fd4179a0f2cecc5541fdd Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 23 Mar 2019 11:34:09 -0700 Subject: weed shell: add "volume.fix.replication" --- weed/shell/command_volume_fix_replication.go | 194 +++++++++++++++++++++++++++ 1 file changed, 194 insertions(+) create mode 100644 weed/shell/command_volume_fix_replication.go (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go new file mode 100644 index 000000000..6a76ba458 --- /dev/null +++ b/weed/shell/command_volume_fix_replication.go @@ -0,0 +1,194 @@ +package shell + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" + "io" + "math/rand" + "sort" +) + +func init() { + commands = append(commands, &commandVolumeFixReplication{}) +} + +type commandVolumeFixReplication struct { +} + +func (c *commandVolumeFixReplication) Name() string { + return "volume.fix.replication" +} + +func (c *commandVolumeFixReplication) Help() string { + return `add replicas to volumes that are missing replicas + + -n do not take action +` +} + +func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { + + takeAction := true + if len(args) > 0 && args[0] == "-n" { + takeAction = false + } + + var resp *master_pb.VolumeListResponse + ctx := context.Background() + err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return err + } + + // find all volumes that needs replication + // collect all data nodes + replicatedVolumeLocations := make(map[uint32][]location) + replicatedVolumeInfo := 
make(map[uint32]*master_pb.VolumeInformationMessage) + var allLocations []location + for _, dc := range resp.TopologyInfo.DataCenterInfos { + for _, rack := range dc.RackInfos { + for _, dn := range rack.DataNodeInfos { + loc := newLocation(dc.Id, rack.Id, dn) + for _, v := range dn.VolumeInfos { + if v.ReplicaPlacement > 0 { + replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) + replicatedVolumeInfo[v.Id] = v + } + } + allLocations = append(allLocations, loc) + } + } + } + + // find all under replicated volumes + underReplicatedVolumeLocations := make(map[uint32][]location) + for vid, locations := range replicatedVolumeLocations { + volumeInfo := replicatedVolumeInfo[vid] + replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) + if replicaPlacement.GetCopyCount() > len(locations) { + underReplicatedVolumeLocations[vid] = locations + } + } + + if len(underReplicatedVolumeLocations) == 0 { + return fmt.Errorf("no under replicated volumes") + } + + if len(allLocations) == 0 { + return fmt.Errorf("no data nodes at all") + } + + // find the most under populated data nodes + keepDataNodesSorted(allLocations) + + for vid, locations := range underReplicatedVolumeLocations { + volumeInfo := replicatedVolumeInfo[vid] + replicaPlacement, _ := storage.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement)) + foundNewLocation := false + for _, dst := range allLocations { + // check whether data nodes satisfy the constraints + if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, locations, dst) { + // ask the volume server to replicate the volume + sourceNodes := underReplicatedVolumeLocations[vid] + sourceNode := sourceNodes[rand.Intn(len(sourceNodes))] + foundNewLocation = true + fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", volumeInfo.Id, replicaPlacement, sourceNode.dataNode.Id, dst.dataNode.Id) + + if !takeAction { + break + } + + err := 
operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, replicateErr := volumeServerClient.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{ + VolumeId: volumeInfo.Id, + Collection: volumeInfo.Collection, + SourceDataNode: sourceNode.dataNode.Id, + }) + return replicateErr + }) + + if err != nil { + return err + } + + // adjust free volume count + dst.dataNode.FreeVolumeCount-- + keepDataNodesSorted(allLocations) + break + } + } + if !foundNewLocation { + fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", volumeInfo.Id, replicaPlacement, locations) + } + + } + + return nil +} + +func keepDataNodesSorted(dataNodes []location) { + sort.Slice(dataNodes, func(i, j int) bool { + return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount + }) +} + +func satisfyReplicaPlacement(replicaPlacement *storage.ReplicaPlacement, existingLocations []location, possibleLocation location) bool { + + existingDataCenters := make(map[string]bool) + existingRacks := make(map[string]bool) + existingDataNodes := make(map[string]bool) + for _, loc := range existingLocations { + existingDataCenters[loc.DataCenter()] = true + existingRacks[loc.Rack()] = true + existingDataNodes[loc.String()] = true + } + + if replicaPlacement.DiffDataCenterCount >= len(existingDataCenters) { + // check dc, good if different from any existing data centers + _, found := existingDataCenters[possibleLocation.DataCenter()] + return !found + } else if replicaPlacement.DiffRackCount >= len(existingRacks) { + // check rack, good if different from any existing racks + _, found := existingRacks[possibleLocation.Rack()] + return !found + } else if replicaPlacement.SameRackCount >= len(existingDataNodes) { + // check data node, good if different from any existing data nodes + _, found := existingDataNodes[possibleLocation.String()] + return !found + } + + 
return false +} + +type location struct { + dc string + rack string + dataNode *master_pb.DataNodeInfo +} + +func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location { + return location{ + dc: dc, + rack: rack, + dataNode: dataNode, + } +} + +func (l location) String() string { + return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id) +} + +func (l location) Rack() string { + return fmt.Sprintf("%s %s", l.dc, l.rack) +} + +func (l location) DataCenter() string { + return l.dc +} -- cgit v1.2.3 From bd1c0735e0bc3bf7a24ec948372b72a4a9652d03 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 23 Mar 2019 11:54:26 -0700 Subject: weed shell: adjust help text format --- weed/shell/command_volume_fix_replication.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 6a76ba458..2ec850140 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -26,7 +26,12 @@ func (c *commandVolumeFixReplication) Name() string { func (c *commandVolumeFixReplication) Help() string { return `add replicas to volumes that are missing replicas - -n do not take action + This command finds all under-replicated volumes, and finds volume servers with free slots. + If the free slots satisfy the replication requirement, the volume content is copied over and mounted. 
+ + volume.fix.replication -n # do not take action + volume.fix.replication # actually copy the volume files and mount the volume + ` } -- cgit v1.2.3 From cd8a3b99bb095bf19b7b8fac54ec1360bc754e9b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 23 Mar 2019 12:57:35 -0700 Subject: textual changes --- weed/shell/command_volume_fix_replication.go | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 2ec850140..b8daa2d9f 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -32,6 +32,12 @@ func (c *commandVolumeFixReplication) Help() string { volume.fix.replication -n # do not take action volume.fix.replication # actually copy the volume files and mount the volume + Note: + * each run only adds back one replica per volume id. If multiple replicas + are missing, e.g. multiple volume servers are new, you may need to run this multiple times. + * do not run this again too quickly, since the new volume replica may take a few seconds + to register itself with the master. 
+ ` } -- cgit v1.2.3 From 13ad5c196656c586e908defce9b6d8e717663625 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 17 Apr 2019 22:04:49 -0700 Subject: refactoring --- weed/shell/command_volume_fix_replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index b8daa2d9f..cf424ccc6 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -117,7 +117,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, } err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - _, replicateErr := volumeServerClient.ReplicateVolume(ctx, &volume_server_pb.ReplicateVolumeRequest{ + _, replicateErr := volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ VolumeId: volumeInfo.Id, Collection: volumeInfo.Collection, SourceDataNode: sourceNode.dataNode.Id, -- cgit v1.2.3 From 6fc1f53018685e6842c68883af67128fbc0bd522 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 20 Apr 2019 11:35:20 -0700 Subject: shell: add command volume.move --- weed/shell/command_volume_fix_replication.go | 1 - 1 file changed, 1 deletion(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index cf424ccc6..cff4adf89 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -119,7 +119,6 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, replicateErr := 
volumeServerClient.VolumeCopy(ctx, &volume_server_pb.VolumeCopyRequest{ VolumeId: volumeInfo.Id, - Collection: volumeInfo.Collection, SourceDataNode: sourceNode.dataNode.Id, }) return replicateErr -- cgit v1.2.3 From ede876cfdb0116557dd197a7951957dab6745c24 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 5 Jun 2019 01:30:24 -0700 Subject: periodic scripts exeuction from leader master --- weed/shell/command_volume_fix_replication.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index cff4adf89..09e1c19eb 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -13,7 +13,7 @@ import ( ) func init() { - commands = append(commands, &commandVolumeFixReplication{}) + Commands = append(Commands, &commandVolumeFixReplication{}) } type commandVolumeFixReplication struct { @@ -41,7 +41,7 @@ func (c *commandVolumeFixReplication) Help() string { ` } -func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) { +func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { takeAction := true if len(args) > 0 && args[0] == "-n" { @@ -50,7 +50,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *commandEnv, var resp *master_pb.VolumeListResponse ctx := context.Background() - err = commandEnv.masterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{}) return err }) -- cgit v1.2.3 From 6bc3dee5b37aefc0a04603c6f00670dd6ec2d2ea Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 22 Jun 2019 10:56:54 -0700 Subject: refactoring --- 
weed/shell/command_volume_fix_replication.go | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 09e1c19eb..0b3b0363c 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -63,20 +63,16 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, replicatedVolumeLocations := make(map[uint32][]location) replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage) var allLocations []location - for _, dc := range resp.TopologyInfo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for _, dn := range rack.DataNodeInfos { - loc := newLocation(dc.Id, rack.Id, dn) - for _, v := range dn.VolumeInfos { - if v.ReplicaPlacement > 0 { - replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) - replicatedVolumeInfo[v.Id] = v - } - } - allLocations = append(allLocations, loc) + eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + loc := newLocation(dc.Id, rack.Id, dn) + for _, v := range dn.VolumeInfos { + if v.ReplicaPlacement > 0 { + replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) + replicatedVolumeInfo[v.Id] = v } } - } + allLocations = append(allLocations, loc) + }) // find all under replicated volumes underReplicatedVolumeLocations := make(map[uint32][]location) -- cgit v1.2.3 From cd45ab072a8dba68773f995e130798e449b1791a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 22 Jun 2019 12:30:08 -0700 Subject: fix compilation error --- weed/shell/command_volume_fix_replication.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/shell/command_volume_fix_replication.go') diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go 
index 0b3b0363c..4c7a794c0 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -64,7 +64,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, replicatedVolumeInfo := make(map[uint32]*master_pb.VolumeInformationMessage) var allLocations []location eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - loc := newLocation(dc.Id, rack.Id, dn) + loc := newLocation(dc, string(rack), dn) for _, v := range dn.VolumeInfos { if v.ReplicaPlacement > 0 { replicatedVolumeLocations[v.Id] = append(replicatedVolumeLocations[v.Id], loc) -- cgit v1.2.3