author    bingoohuang <bingoo.huang@gmail.com>  2021-04-26 17:19:35 +0800
committer bingoohuang <bingoo.huang@gmail.com>  2021-04-26 17:19:35 +0800
commit    d861cbd81b75b6684c971ac00e33685e6575b833
tree      301805fef4aa5d0096bfb1510536f7a009b661e7
parent    70da715d8d917527291b35fb069fac077d17b868
parent    4ee58922eff61a5a4ca29c0b4829b097a498549e
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/shell/command_ec_common.go')
-rw-r--r--  weed/shell/command_ec_common.go  140
1 file changed, 88 insertions(+), 52 deletions(-)
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 2beed4742..fd35bb14b 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -3,6 +3,7 @@ package shell
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"math"
"sort"
@@ -15,26 +16,26 @@ import (
"google.golang.org/grpc"
)
-func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
+func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
copiedShardIds := []uint32{uint32(shardId)}
if applyBalancing {
// ask the destination node to copy the shard and the ecx file from the source node, and mount it
- copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
+ copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingLocation.info.Id)
if err != nil {
return err
}
// unmount the to-be-deleted shards
- err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
+ err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingLocation.info.Id, copiedShardIds)
if err != nil {
return err
}
// ask the source node to delete the shard, and possibly the ecx file
- err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
+ err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingLocation.info.Id, copiedShardIds)
if err != nil {
return err
}
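
Note the shape of the move above: with applyBalancing set, the destination first copies and mounts the shard, then the source unmounts and finally deletes it, each step as its own gRPC call; without it, the whole RPC block is skipped. A minimal sketch of a caller, assuming it lives inside package shell next to this code (the collection name and ids are hypothetical):

    // sketch: move shard 3 of EC volume 250 from srcNode to dstNode.
    func moveOneShard(commandEnv *CommandEnv, srcNode, dstNode *EcNode) error {
    	// true = actually run copy+mount -> unmount -> delete over gRPC;
    	// false = skip the RPCs entirely.
    	return moveMountedShardToEcNode(commandEnv, srcNode, "my_collection",
    		needle.VolumeId(250), erasure_coding.ShardId(3), dstNode, true)
    }
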
@@ -50,7 +51,7 @@ func moveMountedShardToEcNode(ctx context.Context, commandEnv *CommandEnv, exist
}
-func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
+func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
volumeId needle.VolumeId, collection string, existingLocation string) (copiedShardIds []uint32, err error) {
@@ -61,7 +62,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
if targetServer.info.Id != existingLocation {
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
- _, copyErr := volumeServerClient.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
+ _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
@@ -76,7 +77,7 @@ func oneServerCopyAndMountEcShardsFromSource(ctx context.Context, grpcDialOption
}
fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
- _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
@@ -159,8 +160,15 @@ func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (cou
return
}
-func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) {
- return int(dn.MaxVolumeCount-dn.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(dn.EcShardInfos)
+func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
+ if dn.DiskInfos == nil {
+ return 0
+ }
+ diskInfo := dn.DiskInfos[string(diskType)]
+ if diskInfo == nil {
+ return 0
+ }
+ return int(diskInfo.MaxVolumeCount-diskInfo.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
}
type RackId string
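
The reworked countFreeShardSlots above is the heart of this commit: free slots are now computed per disk type rather than per node. A worked sketch, assuming erasure_coding.DataShardsCount is 10 and that the snippet sits in package shell next to the unexported helper (all counts hypothetical):

    dn := &master_pb.DataNodeInfo{
    	DiskInfos: map[string]*master_pb.DiskInfo{
    		string(types.HardDriveType): {
    			MaxVolumeCount:    10, // hypothetical capacity
    			ActiveVolumeCount: 4,  // hypothetical usage
    			// no EC shards placed yet
    		},
    	},
    }
    // (10 - 4) * 10 data shards - 0 existing shards = 60 free EC shard slots
    fmt.Println(countFreeShardSlots(dn, types.HardDriveType))
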
@@ -173,30 +181,47 @@ type EcNode struct {
freeEcSlot int
}
+func (ecNode *EcNode) localShardIdCount(vid uint32) int {
+ for _, diskInfo := range ecNode.info.DiskInfos {
+ for _, ecShardInfo := range diskInfo.EcShardInfos {
+ if vid == ecShardInfo.Id {
+ shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
+ return shardBits.ShardIdCount()
+ }
+ }
+ }
+ return 0
+}
+
type EcRack struct {
ecNodes map[EcNodeId]*EcNode
freeEcSlot int
}
-func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
+func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
- var resp *master_pb.VolumeListResponse
- err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
- return err
- })
+ // collect topology information
+ topologyInfo, _, err := collectTopologyInfo(commandEnv)
if err != nil {
- return nil, 0, err
+ return
}
// find all volume servers with at least one free EC shard slot.
- eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+ ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
+
+ sortEcNodesByFreeslotsDecending(ecNodes)
+
+ return
+}
+
+func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
+ eachDataNode(topo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != dc {
return
}
- freeEcSlots := countFreeShardSlots(dn)
+ freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
ecNodes = append(ecNodes, &EcNode{
info: dn,
dc: dc,
@@ -205,19 +230,15 @@ func collectEcNodes(ctx context.Context, commandEnv *CommandEnv, selectedDataCen
})
totalFreeEcSlots += freeEcSlots
})
-
- sortEcNodesByFreeslotsDecending(ecNodes)
-
return
}
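
Two refactors land in the hunks above: collectEcNodes no longer issues the VolumeList RPC inline but goes through collectTopologyInfo and the new collectEcVolumeServersByDc, and localShardIdCount counts a node's shards for one volume by decoding the EcIndexBits bitmap. A standalone re-implementation of that bitmap idea, for illustration only (the real type is erasure_coding.ShardBits):

    package main

    import (
    	"fmt"
    	"math/bits"
    )

    // One bit per shard id: an EC volume has 14 shards (10 data + 4 parity),
    // so the low 14 bits of a uint32 record which shards a node holds.
    type ShardBits uint32

    func (b ShardBits) AddShardId(id uint) ShardBits { return b | (1 << id) }

    func (b ShardBits) ShardIdCount() int { return bits.OnesCount32(uint32(b)) }

    func main() {
    	var b ShardBits
    	b = b.AddShardId(0).AddShardId(3).AddShardId(13)
    	fmt.Println(b.ShardIdCount()) // 3
    }
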
-func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
+func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeDeletedShardIds []uint32) error {
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeEcShardsDelete(ctx, &volume_server_pb.VolumeEcShardsDeleteRequest{
+ _, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeDeletedShardIds,
@@ -227,13 +248,12 @@ func sourceServerDeleteEcShards(ctx context.Context, grpcDialOption grpc.DialOpt
}
-func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
+func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation string, toBeUnmountedhardIds []uint32) error {
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(ctx, &volume_server_pb.VolumeEcShardsUnmountRequest{
+ _, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeUnmountedhardIds,
})
@@ -241,13 +261,12 @@ func unmountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
})
}
-func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
- collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
+func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation string, toBeMountedhardIds []uint32) error {
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
return operation.WithVolumeServerClient(sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- _, mountErr := volumeServerClient.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
+ _, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeMountedhardIds,
@@ -256,15 +275,21 @@ func mountEcShards(ctx context.Context, grpcDialOption grpc.DialOption,
})
}
+func divide(total, n int) float64 {
+ return float64(total) / float64(n)
+}
+
func ceilDivide(total, n int) int {
return int(math.Ceil(float64(total) / float64(n)))
}
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
- for _, shardInfo := range ecNode.info.EcShardInfos {
- if needle.VolumeId(shardInfo.Id) == vid {
- return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ for _, shardInfo := range diskInfo.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ }
}
}
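
The new divide helper exposes the exact average where ceilDivide rounds up; the pair matters when deciding how many shards each rack or node should carry. A quick worked example using the two helpers exactly as defined above:

    // 14 shards (10 data + 4 parity) spread over 4 racks:
    fmt.Println(ceilDivide(14, 4)) // 4   -> no rack needs to carry more than 4
    fmt.Println(divide(14, 4))     // 3.5 -> the exact per-rack average
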
@@ -274,18 +299,26 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
foundVolume := false
- for _, shardInfo := range ecNode.info.EcShardInfos {
- if needle.VolumeId(shardInfo.Id) == vid {
- oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
- newShardBits := oldShardBits
- for _, shardId := range shardIds {
- newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+ if found {
+ for _, shardInfo := range diskInfo.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+ foundVolume = true
+ break
}
- shardInfo.EcIndexBits = uint32(newShardBits)
- ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
- foundVolume = true
- break
}
+ } else {
+ diskInfo = &master_pb.DiskInfo{
+ Type: string(types.HardDriveType),
+ }
+ ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo
}
if !foundVolume {
@@ -293,10 +326,11 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
for _, shardId := range shardIds {
newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
}
- ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
+ diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(newShardBits),
+ DiskType: string(types.HardDriveType),
})
ecNode.freeEcSlot -= len(shardIds)
}
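
Worth noting in addEcVolumeShards above: freeEcSlot shrinks by the bitmap delta, not by len(shardIds), so re-adding a shard the node already holds costs nothing. A small sketch of that delta, reusing the illustrative ShardBits type from earlier:

    freeEcSlot := 60
    old := ShardBits(0).AddShardId(1)          // node already holds shard 1
    newBits := old.AddShardId(1).AddShardId(2) // shard 1 re-added, shard 2 new
    freeEcSlot -= newBits.ShardIdCount() - old.ShardIdCount()
    fmt.Println(freeEcSlot) // 59: only the genuinely new shard consumed a slot
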
@@ -306,15 +340,17 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
- for _, shardInfo := range ecNode.info.EcShardInfos {
- if needle.VolumeId(shardInfo.Id) == vid {
- oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
- newShardBits := oldShardBits
- for _, shardId := range shardIds {
- newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
+ if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ for _, shardInfo := range diskInfo.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
}
- shardInfo.EcIndexBits = uint32(newShardBits)
- ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
}
}
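
deleteEcVolumeShards mirrors the add path with RemoveShardId, and the same delta formula then comes out negative, so freeEcSlot grows as shards leave the node. The removal primitive, again on the illustrative ShardBits type rather than the real erasure_coding one:

    func (b ShardBits) RemoveShardId(id uint) ShardBits { return b &^ (1 << id) }

    // dropping 2 of 3 held shards: delta = 1 - 3 = -2, so freeEcSlot += 2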