aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_common.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--weed/shell/command_ec_common.go337
1 files changed, 337 insertions, 0 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
new file mode 100644
index 000000000..0db119d3c
--- /dev/null
+++ b/weed/shell/command_ec_common.go
@@ -0,0 +1,337 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "sort"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+)
+
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+
+ copiedShardIds := []uint32{uint32(shardId)}
+
+ if applyBalancing {
+
+ // ask destination node to copy shard and the ecx file from source node, and mount it
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
+ if err != nil {
+ return err
+ }
+
+ // unmount the to be deleted shards
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+ if err != nil {
+ return err
+ }
+
+ // ask source node to delete the shard, and maybe the ecx file
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+
+ }
+
+ destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
+ existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
+
+ return nil
+
+}
+
+func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
+ targetServer *EcNode, shardIdsToCopy []uint32,
+ volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
+
+ fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
+
+ err = operation.WithVolumeServerClient(targetServer.info.Id, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+
+ if targetServer.info.Id != existingLocation {
+
+ fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ ShardIds: shardIdsToCopy,
+ CopyEcxFile: true,
+ CopyEcjFile: true,
+ CopyVifFile: true,
+ SourceDataNode: existingLocation,
+ })
+ if copyErr != nil {
+ return fmt.Errorf("copy %d.%v %s => %s : %v\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
+ }
+ }
+
+ fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ ShardIds: shardIdsToCopy,
+ })
+ if mountErr != nil {
+ return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
+ }
+
+ if targetServer.info.Id != existingLocation {
+ copiedShardIds = shardIdsToCopy
+ glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
+ for _, dc := range topo.DataCenterInfos {
+ for _, rack := range dc.RackInfos {
+ for _, dn := range rack.DataNodeInfos {
+ fn(dc.Id, RackId(rack.Id), dn)
+ }
+ }
+ }
+}
+
+func sortEcNodesByFreeslotsDecending(ecNodes []*EcNode) {
+ sort.Slice(ecNodes, func(i, j int) bool {
+ return ecNodes[i].freeEcSlot > ecNodes[j].freeEcSlot
+ })
+}
+
+func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
+ sort.Slice(ecNodes, func(i, j int) bool {
+ return ecNodes[i].freeEcSlot < ecNodes[j].freeEcSlot
+ })
+}
+
+type CandidateEcNode struct {
+ ecNode *EcNode
+ shardCount int
+}
+
+// if the index node changed the freeEcSlot, need to keep every EcNode still sorted
+func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
+ for i := index - 1; i >= 0; i-- {
+ if lessThan(i+1, i) {
+ swap(data, i, i+1)
+ } else {
+ break
+ }
+ }
+ for i := index + 1; i < len(data); i++ {
+ if lessThan(i, i-1) {
+ swap(data, i, i-1)
+ } else {
+ break
+ }
+ }
+}
+
+func swap(data []*CandidateEcNode, i, j int) {
+ t := data[i]
+ data[i] = data[j]
+ data[j] = t
+}
+
+func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
+ for _, ecShardInfo := range ecShardInfos {
+ shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ count += shardBits.ShardIdCount()
+ }
+ return
+}
+
+func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
+ return int(dn.MaxVolumeCount-dn.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(dn.EcShardInfos)
+}
+
+type RackId string
+type EcNodeId string
+
+type EcNode struct {
+ info *master_pb.DataNodeInfo
+ dc string
+ rack RackId
+ freeEcSlot int
+}
+
+type EcRack struct {
+ ecNodes map[EcNodeId]*EcNode
+ freeEcSlot int
+}
+
+func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+
+ // list all possible locations
+ var resp *master_pb.VolumeListResponse
+ err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+ return err
+ })
+ if err != nil {
+ return nil, 0, err
+ }
+
+ // find out all volume servers with one slot left.
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ if selectedDataCenter != "" && selectedDataCenter != dc {
+ return
+ }
+
+ freeEcSlots := countFreeShardSlots(dn)
+ ecNodes = append(ecNodes, &EcNode{
+ info: dn,
+ dc: dc,
+ rack: rack,
+ freeEcSlot: int(freeEcSlots),
+ })
+ totalFreeEcSlots += freeEcSlots
+ })
+
+ sortEcNodesByFreeslotsDecending(ecNodes)
+
+ return
+}
+
+func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
+
+ fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
+
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ ShardIds: toBeDeletedShardIds,
+ })
+ return deleteErr
+ })
+
+}
+
+func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
+
+ fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
+
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
+ VolumeId: uint32(volumeId),
+ ShardIds: toBeUnmountedhardIds,
+ })
+ return deleteErr
+ })
+}
+
+func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
+
+ fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
+
+ return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
+ VolumeId: uint32(volumeId),
+ Collection: collection,
+ ShardIds: toBeMountedhardIds,
+ })
+ return mountErr
+ })
+}
+
+func ceilDivide(total, n int) int {
+ return int(math.Ceil(float64(total) / float64(n)))
+}
+
+func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
+
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ }
+ }
+
+ return 0
+}
+
+func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
+
+ foundVolume := false
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+ foundVolume = true
+ break
+ }
+ }
+
+ if !foundVolume {
+ var newShardBits erasure_coding.ShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ }
+ ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
+ Id: uint32(vid),
+ Collection: collection,
+ EcIndexBits: uint32(newShardBits),
+ })
+ ecNode.freeEcSlot -= len(shardIds)
+ }
+
+ return ecNode
+}
+
+func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
+
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+ }
+ }
+
+ return ecNode
+}
+
+func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
+ countMap := make(map[string]int)
+ for _, d := range data {
+ id, count := identifierFn(d)
+ countMap[id] += count
+ }
+ return countMap
+}
+
+func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
+ groupMap := make(map[string][]*EcNode)
+ for _, d := range data {
+ id := identifierFn(d)
+ groupMap[id] = append(groupMap[id], d)
+ }
+ return groupMap
+}