aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_common.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--weed/shell/command_ec_common.go177
1 files changed, 152 insertions, 25 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 7787f4e9f..d0fe16a68 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,6 +3,7 @@ package shell
import (
"context"
"fmt"
+ "math"
"sort"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -14,33 +15,36 @@ import (
"google.golang.org/grpc"
)
-func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
+func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
- fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+ copiedShardIds := []uint32{uint32(shardId)}
- if !applyBalancing {
- return nil
- }
+ if applyBalancing {
- // ask destination node to copy shard and the ecx file from source node, and mount it
- copiedShardIds, err := oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
- if err != nil {
- return err
- }
+ // ask destination node to copy shard and the ecx file from source node, and mount it
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
+ if err != nil {
+ return err
+ }
- // unmount the to be deleted shards
- err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
- if err != nil {
- return err
- }
+ // unmount the to be deleted shards
+ err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+ if err != nil {
+ return err
+ }
+
+ // ask source node to delete the shard, and maybe the ecx file
+ err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
- // ask source node to delete the shard, and maybe the ecx file
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
- if err != nil {
- return err
}
- deleteEcVolumeShards(existingLocation, vid, copiedShardIds)
+ destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
+ existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
return nil
@@ -98,11 +102,11 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
return
}
-func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc, rack string, dn *master_pb.DataNodeInfo)) {
+func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
for _, dc := range topo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, dn := range rack.DataNodeInfos {
- fn(dc.Id, rack.Id, dn)
+ fn(dc.Id, RackId(rack.Id), dn)
}
}
}
@@ -114,6 +118,35 @@ func sortEcNodes(ecNodes []*EcNode) {
})
}
+type CandidateEcNode struct {
+ ecNode *EcNode
+ shardCount int
+}
+
+// if the index node changed the freeEcSlot, need to keep every EcNode still sorted
+func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
+ for i := index - 1; i >= 0; i-- {
+ if lessThan(i+1, i) {
+ swap(data, i, i+1)
+ } else {
+ break
+ }
+ }
+ for i := index + 1; i < len(data); i++ {
+ if lessThan(i, i-1) {
+ swap(data, i, i-1)
+ } else {
+ break
+ }
+ }
+}
+
+func swap(data []*CandidateEcNode, i, j int) {
+ t := data[i]
+ data[i] = data[j]
+ data[j] = t
+}
+
func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
for _, ecShardInfo := range ecShardInfos {
shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
@@ -126,14 +159,22 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
}
+type RackId string
+type EcNodeId string
+
type EcNode struct {
info *master_pb.DataNodeInfo
dc string
- rack string
+ rack RackId
freeEcSlot int
}
-func collectEcNodes(ctx context.Context, commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+type EcRack struct {
+ ecNodes map[EcNodeId]*EcNode
+ freeEcSlot int
+}
+
+func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
var resp *master_pb.VolumeListResponse
@@ -146,7 +187,10 @@ func collectEcNodes(ctx context.Context, commandEnv *CommandEnv) (ecNodes []*EcN
}
// find out all volume servers with one slot left.
- eachDataNode(resp.TopologyInfo, func(dc, rack string, dn *master_pb.DataNodeInfo) {
+ eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ if selectedDataCenter != "" && selectedDataCenter != dc {
+ return
+ }
if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
ecNodes = append(ecNodes, &EcNode{
info: dn,
@@ -207,3 +251,86 @@ func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
return mountErr
})
}
+
+func ceilDivide(total, n int) int {
+ return int(math.Ceil(float64(total) / float64(n)))
+}
+
+func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
+
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ }
+ }
+
+ return 0
+}
+
+func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
+
+ foundVolume := false
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+ foundVolume = true
+ break
+ }
+ }
+
+ if !foundVolume {
+ var newShardBits erasure_coding.ShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ }
+ ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
+ Id: uint32(vid),
+ Collection: collection,
+ EcIndexBits: uint32(newShardBits),
+ })
+ ecNode.freeEcSlot -= len(shardIds)
+ }
+
+ return ecNode
+}
+
+func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
+
+ for _, shardInfo := range ecNode.info.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+ }
+ }
+
+ return ecNode
+}
+
+func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
+ countMap := make(map[string]int)
+ for _, d := range data {
+ id, count := identifierFn(d)
+ countMap[id] += count
+ }
+ return countMap
+}
+
+func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
+ groupMap := make(map[string][]*EcNode)
+ for _, d := range data {
+ id := identifierFn(d)
+ groupMap[id] = append(groupMap[id], d)
+ }
+ return groupMap
+}