| author | Chris Lu <chris.lu@gmail.com> | 2019-06-10 21:32:56 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-06-10 21:32:56 -0700 |
| commit | f9d8bd51ad8cf4597559c3e3e327bbb3432719b7 | |
| tree | 67bed513a722b9c89bf559f9eaf9365b58d7bc1f /weed/shell/command_ec_common.go | |
| parent | 9d9162ca356932fb8c9ec0322f3a771bbbcc47f5 | |
ec shard balancing
Diffstat (limited to 'weed/shell/command_ec_common.go')
| -rw-r--r-- | weed/shell/command_ec_common.go | 177 |
1 file changed, 152 insertions, 25 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 7787f4e9f..d0fe16a68 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,6 +3,7 @@ package shell
 import (
 	"context"
 	"fmt"
+	"math"
 	"sort"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -14,33 +15,36 @@ import (
 	"google.golang.org/grpc"
 )
 
-func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) error {
+func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
 
-	fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
+	copiedShardIds := []uint32{uint32(shardId)}
 
-	if !applyBalancing {
-		return nil
-	}
+	if applyBalancing {
 
-	// ask destination node to copy shard and the ecx file from source node, and mount it
-	copiedShardIds, err := oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
-	if err != nil {
-		return err
-	}
+		// ask destination node to copy shard and the ecx file from source node, and mount it
+		copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, uint32(shardId), 1, vid, collection, existingLocation.info.Id)
+		if err != nil {
+			return err
+		}
 
-	// unmount the to be deleted shards
-	err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
-	if err != nil {
-		return err
-	}
+		// unmount the to be deleted shards
+		err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+		if err != nil {
+			return err
+		}
+
+		// ask source node to delete the shard, and maybe the ecx file
+		err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+		if err != nil {
+			return err
+		}
+
+		fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
 
-	// ask source node to delete the shard, and maybe the ecx file
-	err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
-	if err != nil {
-		return err
 	}
 
-	deleteEcVolumeShards(existingLocation, vid, copiedShardIds)
+	destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
+	existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
 
 	return nil
@@ -98,11 +102,11 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
 	return
 }
 
-func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc, rack string, dn *master_pb.DataNodeInfo)) {
+func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
 	for _, dc := range topo.DataCenterInfos {
 		for _, rack := range dc.RackInfos {
 			for _, dn := range rack.DataNodeInfos {
-				fn(dc.Id, rack.Id, dn)
+				fn(dc.Id, RackId(rack.Id), dn)
 			}
 		}
 	}
@@ -114,6 +118,35 @@ func sortEcNodes(ecNodes []*EcNode) {
 	})
 }
 
+type CandidateEcNode struct {
+	ecNode     *EcNode
+	shardCount int
+}
+
+// if the index node changed the freeEcSlot, need to keep every EcNode still sorted
+func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
+	for i := index - 1; i >= 0; i-- {
+		if lessThan(i+1, i) {
+			swap(data, i, i+1)
+		} else {
+			break
+		}
+	}
+	for i := index + 1; i < len(data); i++ {
+		if lessThan(i, i-1) {
+			swap(data, i, i-1)
+		} else {
+			break
+		}
+	}
+}
+
+func swap(data []*CandidateEcNode, i, j int) {
+	t := data[i]
+	data[i] = data[j]
+	data[j] = t
+}
+
 func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
 	for _, ecShardInfo := range ecShardInfos {
 		shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
@@ -126,14 +159,22 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
 	return int(dn.FreeVolumeCount)*10 - countShards(dn.EcShardInfos)
 }
 
+type RackId string
+type EcNodeId string
+
 type EcNode struct {
 	info       *master_pb.DataNodeInfo
 	dc         string
-	rack       string
+	rack       RackId
 	freeEcSlot int
 }
 
-func collectEcNodes(ctx context.Context, commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+type EcRack struct {
+	ecNodes    map[EcNodeId]*EcNode
+	freeEcSlot int
+}
+
+func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
 
 	// list all possible locations
 	var resp *master_pb.VolumeListResponse
@@ -146,7 +187,10 @@ func collectEcNodes(ctx context.Context, commandEnv *CommandEnv) (ecNodes []*EcN
 	}
 
 	// find out all volume servers with one slot left.
-	eachDataNode(resp.TopologyInfo, func(dc, rack string, dn *master_pb.DataNodeInfo) {
+	eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+		if selectedDataCenter != "" && selectedDataCenter != dc {
+			return
+		}
 		if freeEcSlots := countFreeShardSlots(dn); freeEcSlots > 0 {
 			ecNodes = append(ecNodes, &EcNode{
 				info:       dn,
@@ -207,3 +251,86 @@ func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
 		return mountErr
 	})
 }
+
+func ceilDivide(total, n int) int {
+	return int(math.Ceil(float64(total) / float64(n)))
+}
+
+func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
+
+	for _, shardInfo := range ecNode.info.EcShardInfos {
+		if needle.VolumeId(shardInfo.Id) == vid {
+			return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+		}
+	}
+
+	return 0
+}
+
+func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
+
+	foundVolume := false
+	for _, shardInfo := range ecNode.info.EcShardInfos {
+		if needle.VolumeId(shardInfo.Id) == vid {
+			oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+			newShardBits := oldShardBits
+			for _, shardId := range shardIds {
+				newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+			}
+			shardInfo.EcIndexBits = uint32(newShardBits)
+			ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+			foundVolume = true
+			break
+		}
+	}
+
+	if !foundVolume {
+		var newShardBits erasure_coding.ShardBits
+		for _, shardId := range shardIds {
+			newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+		}
+		ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
+			Id:          uint32(vid),
+			Collection:  collection,
+			EcIndexBits: uint32(newShardBits),
+		})
+		ecNode.freeEcSlot -= len(shardIds)
+	}
+
+	return ecNode
+}
+
+func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
+
+	for _, shardInfo := range ecNode.info.EcShardInfos {
+		if needle.VolumeId(shardInfo.Id) == vid {
+			oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+			newShardBits := oldShardBits
+			for _, shardId := range shardIds {
+				newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
+			}
+			shardInfo.EcIndexBits = uint32(newShardBits)
+			ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+		}
+	}
+
+	return ecNode
+}
+
+func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
+	countMap := make(map[string]int)
+	for _, d := range data {
+		id, count := identifierFn(d)
+		countMap[id] += count
+	}
+	return countMap
+}
+
+func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
+	groupMap := make(map[string][]*EcNode)
+	for _, d := range data {
+		id := identifierFn(d)
+		groupMap[id] = append(groupMap[id], d)
+	}
	return groupMap
+}
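The new `ensureSortedEcNodes` helper keeps the balancer's candidate list sorted after a single node's shard count changes, by bubbling only that entry toward the front or back instead of re-sorting the whole slice. Below is a minimal standalone sketch of the same idea, not the repo's actual code: the `candidate` type and the names `ensureSorted` and `swapCandidates` are simplified stand-ins for `CandidateEcNode`, `ensureSortedEcNodes`, and `swap` from this change.

```go
package main

import "fmt"

// candidate is a simplified stand-in for CandidateEcNode: a node name plus
// the shard count the balancer sorts on.
type candidate struct {
	name       string
	shardCount int
}

// swapCandidates mirrors the swap helper added in this commit.
func swapCandidates(data []candidate, i, j int) {
	data[i], data[j] = data[j], data[i]
}

// ensureSorted mirrors ensureSortedEcNodes: after data[index] changed its
// sort key, bubble it left or right until the slice is ordered again.
func ensureSorted(data []candidate, index int, lessThan func(i, j int) bool) {
	for i := index - 1; i >= 0; i-- {
		if lessThan(i+1, i) {
			swapCandidates(data, i, i+1)
		} else {
			break
		}
	}
	for i := index + 1; i < len(data); i++ {
		if lessThan(i, i-1) {
			swapCandidates(data, i, i-1)
		} else {
			break
		}
	}
}

func main() {
	// kept sorted by descending shardCount, like the balancer's candidates
	nodes := []candidate{{"a", 9}, {"b", 7}, {"c", 5}, {"d", 2}}

	// node "c" just received more shards; only its position can be stale
	nodes[2].shardCount = 8
	ensureSorted(nodes, 2, func(i, j int) bool {
		return nodes[i].shardCount > nodes[j].shardCount
	})

	fmt.Println(nodes) // [{a 9} {c 8} {b 7} {d 2}]
}
```

Repositioning a single changed entry costs at most one pass over the slice, versus a full re-sort after every shard move.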
