diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-02-16 11:06:13 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-02-16 11:06:13 -0800 |
| commit | 632c94d4389dbccd1e1e765036b3c9785d44e5d5 (patch) | |
| tree | 65a29307f4079f6b539e97c405d65809d898e170 /weed/shell | |
| parent | b9b5b932c5892f14da6b489a6907ffd8649a3eb5 (diff) | |
| parent | 0611233f16ff0e8f4b99aa6528f47153efd9a846 (diff) | |
| download | seaweedfs-632c94d4389dbccd1e1e765036b3c9785d44e5d5.tar.xz seaweedfs-632c94d4389dbccd1e1e765036b3c9785d44e5d5.zip | |
Merge branch 'extend_to_disk_type'
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_ec_balance.go | 57 | ||||
| -rw-r--r-- | weed/shell/command_ec_common.go | 81 | ||||
| -rw-r--r-- | weed/shell/command_ec_decode.go | 25 | ||||
| -rw-r--r-- | weed/shell/command_ec_encode.go | 10 | ||||
| -rw-r--r-- | weed/shell/command_ec_rebuild.go | 30 | ||||
| -rw-r--r-- | weed/shell/command_ec_test.go | 1 | ||||
| -rw-r--r-- | weed/shell/command_volume_balance.go | 51 | ||||
| -rw-r--r-- | weed/shell/command_volume_configure_replication.go | 10 | ||||
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 29 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 24 | ||||
| -rw-r--r-- | weed/shell/command_volume_list.go | 47 | ||||
| -rw-r--r-- | weed/shell/command_volume_server_evacuate.go | 50 | ||||
| -rw-r--r-- | weed/shell/command_volume_tier_download.go | 8 | ||||
| -rw-r--r-- | weed/shell/command_volume_tier_move.go | 108 |
14 files changed, 378 insertions, 153 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 7117f52df..b1ca926d5 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -3,6 +3,7 @@ package shell import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "sort" @@ -325,7 +326,9 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra var possibleDestinationEcNodes []*EcNode for _, n := range racks[RackId(rackId)].ecNodes { - possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) + if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found { + possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) + } } sourceEcNodes := rackEcNodesWithVid[rackId] averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes)) @@ -386,11 +389,15 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool rackEcNodes = append(rackEcNodes, node) } - ecNodeIdToShardCount := groupByCount(rackEcNodes, func(node *EcNode) (id string, count int) { - for _, ecShardInfo := range node.info.EcShardInfos { + ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) { + diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)] + if !found { + return + } + for _, ecShardInfo := range diskInfo.EcShardInfos { count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount() } - return node.info.Id, count + return ecNode.info.Id, count }) var totalShardCount int @@ -411,26 +418,30 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount { emptyNodeIds := make(map[uint32]bool) - for _, shards := range emptyNode.info.EcShardInfos { - emptyNodeIds[shards.Id] = true + if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found { + for _, shards := range emptyDiskInfo.EcShardInfos { + emptyNodeIds[shards.Id] = true + } } - for _, shards := range fullNode.info.EcShardInfos { - if _, found := emptyNodeIds[shards.Id]; !found { - for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() { - - fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) - - err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing) - if err != nil { - return err + if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found { + for _, shards := range fullDiskInfo.EcShardInfos { + if _, found := emptyNodeIds[shards.Id]; !found { + for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() { + + fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + + err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing) + if err != nil { + return err + } + + ecNodeIdToShardCount[emptyNode.info.Id]++ + ecNodeIdToShardCount[fullNode.info.Id]-- + hasMove = true + break } - - ecNodeIdToShardCount[emptyNode.info.Id]++ - ecNodeIdToShardCount[fullNode.info.Id]-- - hasMove = true break } - break } } } @@ -511,7 +522,11 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[ func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNode { vidLocations := make(map[needle.VolumeId][]*EcNode) for _, ecNode := range allEcNodes { - for _, shardInfo := range ecNode.info.EcShardInfos { + diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)] + if !found { + continue + } + for _, shardInfo := range diskInfo.EcShardInfos { vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode) } } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index a808335eb..87a138ab6 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -3,6 +3,7 @@ package shell import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math" "sort" @@ -159,8 +160,15 @@ func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (cou return } -func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) { - return int(dn.MaxVolumeCount-dn.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(dn.EcShardInfos) +func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) { + if dn.DiskInfos == nil { + return 0 + } + diskInfo := dn.DiskInfos[string(diskType)] + if diskInfo == nil { + return 0 + } + return int(diskInfo.MaxVolumeCount-diskInfo.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos) } type RackId string @@ -174,10 +182,12 @@ type EcNode struct { } func (ecNode *EcNode) localShardIdCount(vid uint32) int { - for _, ecShardInfo := range ecNode.info.EcShardInfos { - if vid == ecShardInfo.Id { - shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - return shardBits.ShardIdCount() + for _, diskInfo := range ecNode.info.DiskInfos { + for _, ecShardInfo := range diskInfo.EcShardInfos { + if vid == ecShardInfo.Id { + shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + return shardBits.ShardIdCount() + } } } return 0 @@ -214,7 +224,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter return } - freeEcSlots := countFreeShardSlots(dn) + freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) ecNodes = append(ecNodes, &EcNode{ info: dn, dc: dc, @@ -278,9 +288,11 @@ func ceilDivide(total, n int) int { func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits { - for _, shardInfo := range ecNode.info.EcShardInfos { - if needle.VolumeId(shardInfo.Id) == vid { - return erasure_coding.ShardBits(shardInfo.EcIndexBits) + if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found { + for _, shardInfo := range diskInfo.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + return erasure_coding.ShardBits(shardInfo.EcIndexBits) + } } } @@ -290,18 +302,26 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode { foundVolume := false - for _, shardInfo := range ecNode.info.EcShardInfos { - if needle.VolumeId(shardInfo.Id) == vid { - oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) - newShardBits := oldShardBits - for _, shardId := range shardIds { - newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) + diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)] + if found { + for _, shardInfo := range diskInfo.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) + newShardBits := oldShardBits + for _, shardId := range shardIds { + newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) + } + shardInfo.EcIndexBits = uint32(newShardBits) + ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() + foundVolume = true + break } - shardInfo.EcIndexBits = uint32(newShardBits) - ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() - foundVolume = true - break } + } else { + diskInfo = &master_pb.DiskInfo{ + Type: string(types.HardDriveType), + } + ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo } if !foundVolume { @@ -309,10 +329,11 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, for _, shardId := range shardIds { newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) } - ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{ + diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), Collection: collection, EcIndexBits: uint32(newShardBits), + DiskType: string(types.HardDriveType), }) ecNode.freeEcSlot -= len(shardIds) } @@ -322,15 +343,17 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode { - for _, shardInfo := range ecNode.info.EcShardInfos { - if needle.VolumeId(shardInfo.Id) == vid { - oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) - newShardBits := oldShardBits - for _, shardId := range shardIds { - newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId)) + if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found { + for _, shardInfo := range diskInfo.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) + newShardBits := oldShardBits + for _, shardId := range shardIds { + newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId)) + } + shardInfo.EcIndexBits = uint32(newShardBits) + ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() } - shardInfo.EcIndexBits = uint32(newShardBits) - ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() } } diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index da7c8844f..3e1499d41 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "google.golang.org/grpc" @@ -225,9 +226,11 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) { eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.EcShardInfos { - if v.Collection == selectedCollection && v.Id == uint32(vid) { - ecShardInfos = append(ecShardInfos, v) + if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { + for _, v := range diskInfo.EcShardInfos { + if v.Collection == selectedCollection && v.Id == uint32(vid) { + ecShardInfos = append(ecShardInfos, v) + } } } }) @@ -239,9 +242,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri vidMap := make(map[uint32]bool) eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.EcShardInfos { - if v.Collection == selectedCollection { - vidMap[v.Id] = true + if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { + for _, v := range diskInfo.EcShardInfos { + if v.Collection == selectedCollection { + vidMap[v.Id] = true + } } } }) @@ -257,9 +262,11 @@ func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeI nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits) eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.EcShardInfos { - if v.Id == uint32(vid) { - nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits) + if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { + for _, v := range diskInfo.EcShardInfos { + if v.Id == uint32(vid) { + nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits) + } } } }) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index b048b4cd3..e937f4490 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -281,10 +281,12 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri vidMap := make(map[uint32]bool) eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.VolumeInfos { - if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { - if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { - vidMap[v.Id] = true + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { + if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { + vidMap[v.Id] = true + } } } } diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index df28681fe..8d5d7bb91 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -188,10 +188,12 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection needEcxFile := true var localShardBits erasure_coding.ShardBits - for _, ecShardInfo := range rebuilder.info.EcShardInfos { - if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { - needEcxFile = false - localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + for _, diskInfo := range rebuilder.info.DiskInfos { + for _, ecShardInfo := range diskInfo.EcShardInfos { + if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { + needEcxFile = false + localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + } } } @@ -247,15 +249,17 @@ type EcShardMap map[needle.VolumeId]EcShardLocations type EcShardLocations [][]*EcNode func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) { - for _, shardInfo := range ecNode.info.EcShardInfos { - if shardInfo.Collection == collection { - existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)] - if !found { - existing = make([][]*EcNode, erasure_coding.TotalShardsCount) - ecShardMap[needle.VolumeId(shardInfo.Id)] = existing - } - for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() { - existing[shardId] = append(existing[shardId], ecNode) + for _, diskInfo := range ecNode.info.DiskInfos { + for _, shardInfo := range diskInfo.EcShardInfos { + if shardInfo.Collection == collection { + existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)] + if !found { + existing = make([][]*EcNode, erasure_coding.TotalShardsCount) + ecShardMap[needle.VolumeId(shardInfo.Id)] = existing + } + for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() { + existing[shardId] = append(existing[shardId], ecNode) + } } } } diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index 4fddcbea5..14235d565 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -127,6 +127,7 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod return &EcNode{ info: &master_pb.DataNodeInfo{ Id: dataNodeId, + DiskInfos: make(map[string]*master_pb.DiskInfo), }, dc: dc, rack: RackId(rack), diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 6dd248844..a1e3c1ab6 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -4,8 +4,8 @@ import ( "context" "flag" "fmt" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "os" "sort" @@ -111,7 +111,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return nil } -func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []storage.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { +func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { for _, diskType := range diskTypes { if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, volumeSizeLimit, collection, applyBalancing); err != nil { @@ -122,7 +122,7 @@ func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []storage.DiskType, } -func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType storage.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { +func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { // balance writable volumes for _, n := range nodes { @@ -135,7 +135,7 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType storage.Dis return v.DiskType == string(diskType) && (!v.ReadOnly && v.Size < volumeSizeLimit) }) } - if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount, sortWritableVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortWritableVolumes, applyBalancing); err != nil { return err } @@ -150,7 +150,7 @@ func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType storage.Dis return v.DiskType == string(diskType) && (v.ReadOnly || v.Size >= volumeSizeLimit) }) } - if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount, sortReadOnlyVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortReadOnlyVolumes, applyBalancing); err != nil { return err } @@ -175,21 +175,21 @@ func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter stri return } -func collectVolumeDiskTypes(t *master_pb.TopologyInfo) (diskTypes []storage.DiskType) { +func collectVolumeDiskTypes(t *master_pb.TopologyInfo) (diskTypes []types.DiskType) { knownTypes := make(map[string]bool) for _, dc := range t.DataCenterInfos { for _, r := range dc.RackInfos { for _, dn := range r.DataNodeInfos { - for _, vi := range dn.VolumeInfos { - if _, found := knownTypes[vi.DiskType]; !found { - knownTypes[vi.DiskType] = true + for diskType, _ := range dn.DiskInfos { + if _, found := knownTypes[diskType]; !found { + knownTypes[diskType] = true } } } } } for diskType, _ := range knownTypes { - diskTypes = append(diskTypes, storage.ToDiskType(diskType)) + diskTypes = append(diskTypes, types.ToDiskType(diskType)) } return } @@ -203,11 +203,24 @@ type Node struct { type CapacityFunc func(*master_pb.DataNodeInfo) int -func capacityByMaxSsdVolumeCount(info *master_pb.DataNodeInfo) int { - return int(info.MaxSsdVolumeCount) +func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc { + return func(info *master_pb.DataNodeInfo) int { + diskInfo, found := info.DiskInfos[string(diskType)] + if !found { + return 0 + } + return int(diskInfo.MaxVolumeCount) + } } -func capacityByMaxVolumeCount(info *master_pb.DataNodeInfo) int { - return int(info.MaxVolumeCount) + +func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc { + return func(info *master_pb.DataNodeInfo) int { + diskInfo, found := info.DiskInfos[string(diskType)] + if !found { + return 0 + } + return int(diskInfo.MaxVolumeCount - diskInfo.VolumeCount) + } } func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { @@ -220,9 +233,11 @@ func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 { func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) { n.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage) - for _, v := range n.info.VolumeInfos { - if fn(v) { - n.selectedVolumes[v.Id] = v + for _, diskInfo := range n.info.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if fn(v) { + n.selectedVolumes[v.Id] = v + } } } } @@ -329,7 +344,7 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f } fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) if applyChange { - return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, "") + return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType) } return nil } diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 539bdb515..ecbe402e8 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -71,10 +71,12 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman var allLocations []location eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) - for _, v := range dn.VolumeInfos { - if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 { - allLocations = append(allLocations, loc) - continue + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 { + allLocations = append(allLocations, loc) + continue + } } } }) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 8ae8850f3..7b2eb6769 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "path/filepath" "sort" @@ -102,8 +103,6 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } // find the most under populated data nodes - keepDataNodesSorted(allLocations) - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations) } @@ -113,11 +112,13 @@ func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint var allLocations []location eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) - for _, v := range dn.VolumeInfos { - volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ - location: &loc, - info: v, - }) + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ + location: &loc, + info: v, + }) + } } allLocations = append(allLocations, loc) }) @@ -157,15 +158,18 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma } func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { + for _, vid := range underReplicatedVolumeIds { replicas := volumeReplicas[vid] replica := pickOneReplicaToCopyFrom(replicas) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false hasSkippedCollection := false + keepDataNodesSorted(allLocations, replica.info.DiskType) for _, dst := range allLocations { // check whether data nodes satisfy the constraints - if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) + if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // check collection name pattern if *c.collectionPattern != "" { matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) @@ -202,11 +206,11 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm } // adjust free volume count - dst.dataNode.FreeVolumeCount-- - keepDataNodesSorted(allLocations) + dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- break } } + if !foundNewLocation && !hasSkippedCollection { fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } @@ -215,9 +219,10 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm return nil } -func keepDataNodesSorted(dataNodes []location) { +func keepDataNodesSorted(dataNodes []location, diskType string) { + fn := capacityByFreeVolumeCount(types.ToDiskType(diskType)) sort.Slice(dataNodes, func(i, j int) bool { - return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount + return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode) }) } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 75b0f28da..677f38856 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -285,18 +285,20 @@ func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (vo } eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) { - for _, vi := range t.VolumeInfos { - volumeIdToServer[vi.Id] = VInfo{ - server: t.Id, - collection: vi.Collection, - isEcVolume: false, + for _, diskInfo := range t.DiskInfos{ + for _, vi := range diskInfo.VolumeInfos { + volumeIdToServer[vi.Id] = VInfo{ + server: t.Id, + collection: vi.Collection, + isEcVolume: false, + } } - } - for _, ecShardInfo := range t.EcShardInfos { - volumeIdToServer[ecShardInfo.Id] = VInfo{ - server: t.Id, - collection: ecShardInfo.Collection, - isEcVolume: true, + for _, ecShardInfo := range diskInfo.EcShardInfos { + volumeIdToServer[ecShardInfo.Id] = VInfo{ + server: t.Id, + collection: ecShardInfo.Collection, + isEcVolume: true, + } } } }) diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index c5a9388fa..dc8783cd1 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -1,6 +1,7 @@ package shell import ( + "bytes" "context" "fmt" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -44,8 +45,25 @@ func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io. return nil } +func diskInfosToString(diskInfos map[string]*master_pb.DiskInfo) string { + var buf bytes.Buffer + for diskType, diskInfo := range diskInfos { + if diskType == "" { + diskType = "hdd" + } + fmt.Fprintf(&buf, " %s(volume:%d/%d active:%d free:%d remote:%d)", diskType, diskInfo.VolumeCount, diskInfo.MaxVolumeCount, diskInfo.ActiveVolumeCount, diskInfo.FreeVolumeCount, diskInfo.RemoteVolumeCount) + } + return buf.String() +} + +func diskInfoToString(diskInfo *master_pb.DiskInfo) string { + var buf bytes.Buffer + fmt.Fprintf(&buf, "volume:%d/%d active:%d free:%d remote:%d", diskInfo.VolumeCount, diskInfo.MaxVolumeCount, diskInfo.ActiveVolumeCount, diskInfo.FreeVolumeCount, diskInfo.RemoteVolumeCount) + return buf.String() +} + func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLimitMb uint64) statistics { - fmt.Fprintf(writer, "Topology volume:%d/%d active:%d free:%d remote:%d volumeSizeLimit:%d MB\n", t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount, volumeSizeLimitMb) + fmt.Fprintf(writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(t.DiskInfos)) sort.Slice(t.DataCenterInfos, func(i, j int) bool { return t.DataCenterInfos[i].Id < t.DataCenterInfos[j].Id }) @@ -57,7 +75,7 @@ func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLi return s } func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statistics { - fmt.Fprintf(writer, " DataCenter %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount) + fmt.Fprintf(writer, " DataCenter %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) var s statistics sort.Slice(t.RackInfos, func(i, j int) bool { return t.RackInfos[i].Id < t.RackInfos[j].Id @@ -69,7 +87,7 @@ func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statisti return s } func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics { - fmt.Fprintf(writer, " Rack %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount) + fmt.Fprintf(writer, " Rack %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) var s statistics sort.Slice(t.DataNodeInfos, func(i, j int) bool { return t.DataNodeInfos[i].Id < t.DataNodeInfos[j].Id @@ -81,8 +99,22 @@ func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics { return s } func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics { - fmt.Fprintf(writer, " DataNode %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount) + fmt.Fprintf(writer, " DataNode %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) + var s statistics + for _, diskInfo := range t.DiskInfos { + s = s.plus(writeDiskInfo(writer, diskInfo)) + } + fmt.Fprintf(writer, " DataNode %s %+v \n", t.Id, s) + return s +} + +func writeDiskInfo(writer io.Writer, t *master_pb.DiskInfo) statistics { var s statistics + diskType := t.Type + if diskType == "" { + diskType = "hdd" + } + fmt.Fprintf(writer, " Disk %s(%s)\n", diskType, diskInfoToString(t)) sort.Slice(t.VolumeInfos, func(i, j int) bool { return t.VolumeInfos[i].Id < t.VolumeInfos[j].Id }) @@ -90,13 +122,14 @@ func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics { s = s.plus(writeVolumeInformationMessage(writer, vi)) } for _, ecShardInfo := range t.EcShardInfos { - fmt.Fprintf(writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds()) + fmt.Fprintf(writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds()) } - fmt.Fprintf(writer, " DataNode %s %+v \n", t.Id, s) + fmt.Fprintf(writer, " Disk %s %+v \n", diskType, s) return s } + func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics { - fmt.Fprintf(writer, " volume %+v \n", t) + fmt.Fprintf(writer, " volume %+v \n", t) return newStatistics(t) } diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 1539ccb19..5216de66b 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "os" "sort" @@ -100,17 +101,19 @@ func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListRes // move away normal volumes volumeReplicas, _ := collectVolumeReplicaLocations(resp) - for _, vol := range thisNode.info.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) - if err != nil { - return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err) - } - if !hasMoved { - if skipNonMoveable { - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement)) - fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String()) - } else { - return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer) + for _, diskInfo := range thisNode.info.DiskInfos { + for _, vol := range diskInfo.VolumeInfos { + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + if err != nil { + return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement)) + fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String()) + } else { + return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer) + } } } } @@ -126,16 +129,18 @@ func evacuateEcVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListRespons } // move away ec volumes - for _, ecShardInfo := range thisNode.info.EcShardInfos { - hasMoved, err := moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) - if err != nil { - return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) - } - if !hasMoved { - if skipNonMoveable { - fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer) - } else { - return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer) + for _, diskInfo := range thisNode.info.DiskInfos { + for _, ecShardInfo := range diskInfo.EcShardInfos { + hasMoved, err := moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) + if err != nil { + return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer) + } else { + return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer) + } } } } @@ -174,8 +179,9 @@ func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEc } func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { + fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType)) sort.Slice(otherNodes, func(i, j int) bool { - return otherNodes[i].localVolumeRatio(capacityByMaxVolumeCount)+otherNodes[i].localVolumeRatio(capacityByMaxSsdVolumeCount) < otherNodes[j].localVolumeRatio(capacityByMaxVolumeCount)+otherNodes[j].localVolumeRatio(capacityByMaxSsdVolumeCount) + return otherNodes[i].localVolumeRatio(fn) > otherNodes[j].localVolumeRatio(fn) }) for i := 0; i < len(otherNodes); i++ { diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index d31c8c031..8aeb34d5c 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -86,9 +86,11 @@ func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection s vidMap := make(map[uint32]bool) eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.VolumeInfos { - if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" { - vidMap[v.Id] = true + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" { + vidMap[v.Id] = true + } } } }) diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go new file mode 100644 index 000000000..3c6c31eac --- /dev/null +++ b/weed/shell/command_volume_tier_move.go @@ -0,0 +1,108 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "io" + "time" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandVolumeTierMove{}) +} + +type commandVolumeTierMove struct { +} + +func (c *commandVolumeTierMove) Name() string { + return "volume.tier.upload" +} + +func (c *commandVolumeTierMove) Help() string { + return `change a volume from one disk type to another + + volume.tier.move -source=hdd -target=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h] + volume.tier.move -target=hdd [-collection=""] -volumeId=<volume_id> + +` +} + +func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeId := tierCommand.Int("volumeId", 0, "the volume id") + collection := tierCommand.String("collection", "", "the collection name") + fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size") + quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period") + source := tierCommand.String("fromDiskType", "", "the source disk type") + target := tierCommand.String("toDiskType", "", "the target disk type") + if err = tierCommand.Parse(args); err != nil { + return nil + } + + if *source == *target { + return fmt.Errorf("source tier %s is the same as target tier %s", *source, *target) + } + + vid := needle.VolumeId(*volumeId) + + // volumeId is provided + if vid != 0 { + // return doVolumeTierMove(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile) + } + + // apply to all volumes in the collection + // reusing collectVolumeIdsForEcEncode for now + volumeIds, err := collectVolumeIdsForTierChange(commandEnv, *source, *collection, *fullPercentage, *quietPeriod) + if err != nil { + return err + } + fmt.Printf("tier move volumes: %v\n", volumeIds) + + return nil +} + +func collectVolumeIdsForTierChange(commandEnv *CommandEnv, sourceTier string, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) { + + var resp *master_pb.VolumeListResponse + err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) + return err + }) + if err != nil { + return + } + + quietSeconds := int64(quietPeriod / time.Second) + nowUnixSeconds := time.Now().Unix() + + fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds) + + vidMap := make(map[uint32]bool) + eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == types.ToDiskType(sourceTier) { + if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { + vidMap[v.Id] = true + } + } + } + } + }) + + for vid := range vidMap { + vids = append(vids, needle.VolumeId(vid)) + } + + return +} |
