Diffstat (limited to 'weed/shell')
-rw-r--r--  weed/shell/command_ec_balance.go                      57
-rw-r--r--  weed/shell/command_ec_common.go                       81
-rw-r--r--  weed/shell/command_ec_decode.go                       25
-rw-r--r--  weed/shell/command_ec_encode.go                       10
-rw-r--r--  weed/shell/command_ec_rebuild.go                      30
-rw-r--r--  weed/shell/command_ec_test.go                          3
-rw-r--r--  weed/shell/command_fs_configure.go                     2
-rw-r--r--  weed/shell/command_volume_balance.go                 113
-rw-r--r--  weed/shell/command_volume_configure_replication.go    10
-rw-r--r--  weed/shell/command_volume_copy.go                      2
-rw-r--r--  weed/shell/command_volume_fix_replication.go          29
-rw-r--r--  weed/shell/command_volume_fsck.go                      24
-rw-r--r--  weed/shell/command_volume_list.go                      47
-rw-r--r--  weed/shell/command_volume_move.go                      13
-rw-r--r--  weed/shell/command_volume_server_evacuate.go           50
-rw-r--r--  weed/shell/command_volume_tier_download.go              8
-rw-r--r--  weed/shell/command_volume_tier_move.go                108
17 files changed, 446 insertions, 166 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 7117f52df..b1ca926d5 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -3,6 +3,7 @@ package shell import ( "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "sort" @@ -325,7 +326,9 @@ func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, ra var possibleDestinationEcNodes []*EcNode for _, n := range racks[RackId(rackId)].ecNodes { - possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) + if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found { + possibleDestinationEcNodes = append(possibleDestinationEcNodes, n) + } } sourceEcNodes := rackEcNodesWithVid[rackId] averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes)) @@ -386,11 +389,15 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool rackEcNodes = append(rackEcNodes, node) } - ecNodeIdToShardCount := groupByCount(rackEcNodes, func(node *EcNode) (id string, count int) { - for _, ecShardInfo := range node.info.EcShardInfos { + ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) { + diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)] + if !found { + return + } + for _, ecShardInfo := range diskInfo.EcShardInfos { count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount() } - return node.info.Id, count + return ecNode.info.Id, count }) var totalShardCount int @@ -411,26 +418,30 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount { emptyNodeIds := make(map[uint32]bool) - for _, shards := range emptyNode.info.EcShardInfos { - emptyNodeIds[shards.Id] = true + if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found { + for _, shards := range emptyDiskInfo.EcShardInfos { + emptyNodeIds[shards.Id] = true + } } - for _, shards := range fullNode.info.EcShardInfos { - if _, found := emptyNodeIds[shards.Id]; !found { - for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() { - - fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) - - err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing) - if err != nil { - return err + if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found { + for _, shards := range fullDiskInfo.EcShardInfos { + if _, found := emptyNodeIds[shards.Id]; !found { + for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() { + + fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id) + + err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing) + if err != nil { + return err + } + + ecNodeIdToShardCount[emptyNode.info.Id]++ + ecNodeIdToShardCount[fullNode.info.Id]-- + hasMove = true + break } - - ecNodeIdToShardCount[emptyNode.info.Id]++ - ecNodeIdToShardCount[fullNode.info.Id]-- - hasMove = true break } - break } } } @@ -511,7 +522,11 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[ func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNode { vidLocations := 
make(map[needle.VolumeId][]*EcNode) for _, ecNode := range allEcNodes { - for _, shardInfo := range ecNode.info.EcShardInfos { + diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)] + if !found { + continue + } + for _, shardInfo := range diskInfo.EcShardInfos { vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode) } } diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index a808335eb..87a138ab6 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -3,6 +3,7 @@ package shell import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "math" "sort" @@ -159,8 +160,15 @@ func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (cou return } -func countFreeShardSlots(dn *master_pb.DataNodeInfo) (count int) { - return int(dn.MaxVolumeCount-dn.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(dn.EcShardInfos) +func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) { + if dn.DiskInfos == nil { + return 0 + } + diskInfo := dn.DiskInfos[string(diskType)] + if diskInfo == nil { + return 0 + } + return int(diskInfo.MaxVolumeCount-diskInfo.ActiveVolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos) } type RackId string @@ -174,10 +182,12 @@ type EcNode struct { } func (ecNode *EcNode) localShardIdCount(vid uint32) int { - for _, ecShardInfo := range ecNode.info.EcShardInfos { - if vid == ecShardInfo.Id { - shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) - return shardBits.ShardIdCount() + for _, diskInfo := range ecNode.info.DiskInfos { + for _, ecShardInfo := range diskInfo.EcShardInfos { + if vid == ecShardInfo.Id { + shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + return shardBits.ShardIdCount() + } } } return 0 @@ -214,7 +224,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter return } - freeEcSlots := countFreeShardSlots(dn) + freeEcSlots := countFreeShardSlots(dn, types.HardDriveType) ecNodes = append(ecNodes, &EcNode{ info: dn, dc: dc, @@ -278,9 +288,11 @@ func ceilDivide(total, n int) int { func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits { - for _, shardInfo := range ecNode.info.EcShardInfos { - if needle.VolumeId(shardInfo.Id) == vid { - return erasure_coding.ShardBits(shardInfo.EcIndexBits) + if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found { + for _, shardInfo := range diskInfo.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + return erasure_coding.ShardBits(shardInfo.EcIndexBits) + } } } @@ -290,18 +302,26 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode { foundVolume := false - for _, shardInfo := range ecNode.info.EcShardInfos { - if needle.VolumeId(shardInfo.Id) == vid { - oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) - newShardBits := oldShardBits - for _, shardId := range shardIds { - newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) + diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)] + if found { + for _, shardInfo := range diskInfo.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) + newShardBits := oldShardBits + for _, shardId 
:= range shardIds { + newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) + } + shardInfo.EcIndexBits = uint32(newShardBits) + ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() + foundVolume = true + break } - shardInfo.EcIndexBits = uint32(newShardBits) - ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() - foundVolume = true - break } + } else { + diskInfo = &master_pb.DiskInfo{ + Type: string(types.HardDriveType), + } + ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo } if !foundVolume { @@ -309,10 +329,11 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, for _, shardId := range shardIds { newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId)) } - ecNode.info.EcShardInfos = append(ecNode.info.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{ + diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{ Id: uint32(vid), Collection: collection, EcIndexBits: uint32(newShardBits), + DiskType: string(types.HardDriveType), }) ecNode.freeEcSlot -= len(shardIds) } @@ -322,15 +343,17 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode { - for _, shardInfo := range ecNode.info.EcShardInfos { - if needle.VolumeId(shardInfo.Id) == vid { - oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) - newShardBits := oldShardBits - for _, shardId := range shardIds { - newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId)) + if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found { + for _, shardInfo := range diskInfo.EcShardInfos { + if needle.VolumeId(shardInfo.Id) == vid { + oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits) + newShardBits := oldShardBits + for _, shardId := range shardIds { + newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId)) + } + shardInfo.EcIndexBits = uint32(newShardBits) + ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() } - shardInfo.EcIndexBits = uint32(newShardBits) - ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount() } } diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index da7c8844f..3e1499d41 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "google.golang.org/grpc" @@ -225,9 +226,11 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) { eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.EcShardInfos { - if v.Collection == selectedCollection && v.Id == uint32(vid) { - ecShardInfos = append(ecShardInfos, v) + if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { + for _, v := range diskInfo.EcShardInfos { + if v.Collection == selectedCollection && v.Id == uint32(vid) { + ecShardInfos = append(ecShardInfos, v) + } } } }) @@ -239,9 +242,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri vidMap := make(map[uint32]bool) eachDataNode(topoInfo, func(dc string, rack 
RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.EcShardInfos { - if v.Collection == selectedCollection { - vidMap[v.Id] = true + if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { + for _, v := range diskInfo.EcShardInfos { + if v.Collection == selectedCollection { + vidMap[v.Id] = true + } } } }) @@ -257,9 +262,11 @@ func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeI nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits) eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.EcShardInfos { - if v.Id == uint32(vid) { - nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits) + if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found { + for _, v := range diskInfo.EcShardInfos { + if v.Id == uint32(vid) { + nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits) + } } } }) diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index b048b4cd3..e937f4490 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -281,10 +281,12 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri vidMap := make(map[uint32]bool) eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.VolumeInfos { - if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { - if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { - vidMap[v.Id] = true + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { + if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 { + vidMap[v.Id] = true + } } } } diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index df28681fe..8d5d7bb91 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -188,10 +188,12 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection needEcxFile := true var localShardBits erasure_coding.ShardBits - for _, ecShardInfo := range rebuilder.info.EcShardInfos { - if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { - needEcxFile = false - localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + for _, diskInfo := range rebuilder.info.DiskInfos { + for _, ecShardInfo := range diskInfo.EcShardInfos { + if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId { + needEcxFile = false + localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits) + } } } @@ -247,15 +249,17 @@ type EcShardMap map[needle.VolumeId]EcShardLocations type EcShardLocations [][]*EcNode func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) { - for _, shardInfo := range ecNode.info.EcShardInfos { - if shardInfo.Collection == collection { - existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)] - if !found { - existing = make([][]*EcNode, erasure_coding.TotalShardsCount) - ecShardMap[needle.VolumeId(shardInfo.Id)] = existing - } - for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() { - existing[shardId] = append(existing[shardId], ecNode) + for _, diskInfo := range ecNode.info.DiskInfos { + for _, shardInfo := range diskInfo.EcShardInfos { + if shardInfo.Collection == 
collection { + existing, found := ecShardMap[needle.VolumeId(shardInfo.Id)] + if !found { + existing = make([][]*EcNode, erasure_coding.TotalShardsCount) + ecShardMap[needle.VolumeId(shardInfo.Id)] = existing + } + for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() { + existing[shardId] = append(existing[shardId], ecNode) + } } } } diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index 4fddcbea5..a1226adbb 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -126,7 +126,8 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) { func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNode { return &EcNode{ info: &master_pb.DataNodeInfo{ - Id: dataNodeId, + Id: dataNodeId, + DiskInfos: make(map[string]*master_pb.DiskInfo), }, dc: dc, rack: RackId(rack), diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index 06ae15c9e..d0db3722a 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -52,6 +52,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io collection := fsConfigureCommand.String("collection", "", "assign writes to this collection") replication := fsConfigureCommand.String("replication", "", "assign writes with this replication") ttl := fsConfigureCommand.String("ttl", "", "assign writes with this ttl") + diskType := fsConfigureCommand.String("disk", "", "[hdd|ssd] hard drive or solid state drive") fsync := fsConfigureCommand.Bool("fsync", false, "fsync for the writes") volumeGrowthCount := fsConfigureCommand.Int("volumeGrowthCount", 0, "the number of physical volumes to add if no writable volumes") isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix") @@ -81,6 +82,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io Replication: *replication, Ttl: *ttl, Fsync: *fsync, + DiskType: *diskType, VolumeGrowthCount: uint32(*volumeGrowthCount), } diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index 928dec02a..a1e3c1ab6 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "os" "sort" @@ -85,6 +86,7 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer volumeServers := collectVolumeServersByDc(resp.TopologyInfo, *dc) volumeReplicas, _ := collectVolumeReplicaLocations(resp) + diskTypes := collectVolumeDiskTypes(resp.TopologyInfo) if *collection == "EACH_COLLECTION" { collections, err := ListCollectionNames(commandEnv, true, false) @@ -92,16 +94,16 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return err } for _, c := range collections { - if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, c, *applyBalancing); err != nil { return err } } } else if *collection == "ALL_COLLECTIONS" { - if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, diskTypes, 
volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, "ALL_COLLECTIONS", *applyBalancing); err != nil { return err } } else { - if err = balanceVolumeServers(commandEnv, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { + if err = balanceVolumeServers(commandEnv, diskTypes, volumeReplicas, volumeServers, resp.VolumeSizeLimitMb*1024*1024, *collection, *applyBalancing); err != nil { return err } } @@ -109,7 +111,18 @@ func (c *commandVolumeBalance) Do(args []string, commandEnv *CommandEnv, writer return nil } -func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { +func balanceVolumeServers(commandEnv *CommandEnv, diskTypes []types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { + + for _, diskType := range diskTypes { + if err := balanceVolumeServersByDiskType(commandEnv, diskType, volumeReplicas, nodes, volumeSizeLimit, collection, applyBalancing); err != nil { + return err + } + } + return nil + +} + +func balanceVolumeServersByDiskType(commandEnv *CommandEnv, diskType types.DiskType, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, volumeSizeLimit uint64, collection string, applyBalancing bool) error { // balance writable volumes for _, n := range nodes { @@ -119,10 +132,10 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V return false } } - return !v.ReadOnly && v.Size < volumeSizeLimit + return v.DiskType == string(diskType) && (!v.ReadOnly && v.Size < volumeSizeLimit) }) } - if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortWritableVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortWritableVolumes, applyBalancing); err != nil { return err } @@ -134,10 +147,10 @@ func balanceVolumeServers(commandEnv *CommandEnv, volumeReplicas map[uint32][]*V return false } } - return v.ReadOnly || v.Size >= volumeSizeLimit + return v.DiskType == string(diskType) && (v.ReadOnly || v.Size >= volumeSizeLimit) }) } - if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, sortReadOnlyVolumes, applyBalancing); err != nil { + if err := balanceSelectedVolume(commandEnv, volumeReplicas, nodes, capacityByMaxVolumeCount(diskType), sortReadOnlyVolumes, applyBalancing); err != nil { return err } @@ -162,6 +175,25 @@ func collectVolumeServersByDc(t *master_pb.TopologyInfo, selectedDataCenter stri return } +func collectVolumeDiskTypes(t *master_pb.TopologyInfo) (diskTypes []types.DiskType) { + knownTypes := make(map[string]bool) + for _, dc := range t.DataCenterInfos { + for _, r := range dc.RackInfos { + for _, dn := range r.DataNodeInfos { + for diskType, _ := range dn.DiskInfos { + if _, found := knownTypes[diskType]; !found { + knownTypes[diskType] = true + } + } + } + } + } + for diskType, _ := range knownTypes { + diskTypes = append(diskTypes, types.ToDiskType(diskType)) + } + return +} + type Node struct { info *master_pb.DataNodeInfo selectedVolumes map[uint32]*master_pb.VolumeInformationMessage @@ -169,19 +201,43 @@ type Node struct { rack string } -func (n *Node) localVolumeRatio() float64 { - return divide(len(n.selectedVolumes), int(n.info.MaxVolumeCount)) +type CapacityFunc func(*master_pb.DataNodeInfo) int + +func capacityByMaxVolumeCount(diskType 
types.DiskType) CapacityFunc { + return func(info *master_pb.DataNodeInfo) int { + diskInfo, found := info.DiskInfos[string(diskType)] + if !found { + return 0 + } + return int(diskInfo.MaxVolumeCount) + } +} + +func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc { + return func(info *master_pb.DataNodeInfo) int { + diskInfo, found := info.DiskInfos[string(diskType)] + if !found { + return 0 + } + return int(diskInfo.MaxVolumeCount - diskInfo.VolumeCount) + } +} + +func (n *Node) localVolumeRatio(capacityFunc CapacityFunc) float64 { + return divide(len(n.selectedVolumes), capacityFunc(n.info)) } -func (n *Node) localVolumeNextRatio() float64 { - return divide(len(n.selectedVolumes)+1, int(n.info.MaxVolumeCount)) +func (n *Node) localVolumeNextRatio(capacityFunc CapacityFunc) float64 { + return divide(len(n.selectedVolumes)+1, capacityFunc(n.info)) } func (n *Node) selectVolumes(fn func(v *master_pb.VolumeInformationMessage) bool) { n.selectedVolumes = make(map[uint32]*master_pb.VolumeInformationMessage) - for _, v := range n.info.VolumeInfos { - if fn(v) { - n.selectedVolumes[v.Id] = v + for _, diskInfo := range n.info.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if fn(v) { + n.selectedVolumes[v.Id] = v + } } } } @@ -198,33 +254,40 @@ func sortReadOnlyVolumes(volumes []*master_pb.VolumeInformationMessage) { }) } -func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) { +func balanceSelectedVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, nodes []*Node, capacityFunc CapacityFunc, sortCandidatesFn func(volumes []*master_pb.VolumeInformationMessage), applyBalancing bool) (err error) { selectedVolumeCount, volumeMaxCount := 0, 0 + var nodesWithCapacity []*Node for _, dn := range nodes { selectedVolumeCount += len(dn.selectedVolumes) - volumeMaxCount += int(dn.info.MaxVolumeCount) + capacity := capacityFunc(dn.info) + if capacity > 0 { + nodesWithCapacity = append(nodesWithCapacity, dn) + } + volumeMaxCount += capacity } idealVolumeRatio := divide(selectedVolumeCount, volumeMaxCount) hasMoved := true + // fmt.Fprintf(os.Stdout, " total %d volumes, max %d volumes, idealVolumeRatio %f\n", selectedVolumeCount, volumeMaxCount, idealVolumeRatio) + for hasMoved { hasMoved = false - sort.Slice(nodes, func(i, j int) bool { - return nodes[i].localVolumeRatio() < nodes[j].localVolumeRatio() + sort.Slice(nodesWithCapacity, func(i, j int) bool { + return nodesWithCapacity[i].localVolumeRatio(capacityFunc) < nodesWithCapacity[j].localVolumeRatio(capacityFunc) }) - fullNode := nodes[len(nodes)-1] + fullNode := nodesWithCapacity[len(nodesWithCapacity)-1] var candidateVolumes []*master_pb.VolumeInformationMessage for _, v := range fullNode.selectedVolumes { candidateVolumes = append(candidateVolumes, v) } sortCandidatesFn(candidateVolumes) - for i := 0; i < len(nodes)-1; i++ { - emptyNode := nodes[i] - if !(fullNode.localVolumeRatio() > idealVolumeRatio && emptyNode.localVolumeNextRatio() <= idealVolumeRatio) { + for i := 0; i < len(nodesWithCapacity)-1; i++ { + emptyNode := nodesWithCapacity[i] + if !(fullNode.localVolumeRatio(capacityFunc) > idealVolumeRatio && emptyNode.localVolumeNextRatio(capacityFunc) <= idealVolumeRatio) { // no more volume servers with empty slots break } @@ -279,9 +342,9 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f if v.Collection == "" 
{ collectionPrefix = "" } - fmt.Fprintf(os.Stdout, "moving volume %s%d %s => %s\n", collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) + fmt.Fprintf(os.Stdout, " moving %s volume %s%d %s => %s\n", v.DiskType, collectionPrefix, v.Id, fullNode.info.Id, emptyNode.info.Id) if applyChange { - return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second) + return LiveMoveVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(v.Id), fullNode.info.Id, emptyNode.info.Id, 5*time.Second, v.DiskType) } return nil } diff --git a/weed/shell/command_volume_configure_replication.go b/weed/shell/command_volume_configure_replication.go index 539bdb515..ecbe402e8 100644 --- a/weed/shell/command_volume_configure_replication.go +++ b/weed/shell/command_volume_configure_replication.go @@ -71,10 +71,12 @@ func (c *commandVolumeConfigureReplication) Do(args []string, commandEnv *Comman var allLocations []location eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) - for _, v := range dn.VolumeInfos { - if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 { - allLocations = append(allLocations, loc) - continue + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if v.Id == uint32(vid) && v.ReplicaPlacement != replicaPlacementInt32 { + allLocations = append(allLocations, loc) + continue + } } } }) diff --git a/weed/shell/command_volume_copy.go b/weed/shell/command_volume_copy.go index f9edf9431..85b4bb095 100644 --- a/weed/shell/command_volume_copy.go +++ b/weed/shell/command_volume_copy.go @@ -52,6 +52,6 @@ func (c *commandVolumeCopy) Do(args []string, commandEnv *CommandEnv, writer io. 
return fmt.Errorf("source and target volume servers are the same!") } - _, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer) + _, err = copyVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, "") return } diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 8ae8850f3..7b2eb6769 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "path/filepath" "sort" @@ -102,8 +103,6 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } // find the most under populated data nodes - keepDataNodesSorted(allLocations) - return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations) } @@ -113,11 +112,13 @@ func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint var allLocations []location eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { loc := newLocation(dc, string(rack), dn) - for _, v := range dn.VolumeInfos { - volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ - location: &loc, - info: v, - }) + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{ + location: &loc, + info: v, + }) + } } allLocations = append(allLocations, loc) }) @@ -157,15 +158,18 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma } func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error { + for _, vid := range underReplicatedVolumeIds { replicas := volumeReplicas[vid] replica := pickOneReplicaToCopyFrom(replicas) replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement)) foundNewLocation := false hasSkippedCollection := false + keepDataNodesSorted(allLocations, replica.info.DiskType) for _, dst := range allLocations { // check whether data nodes satisfy the constraints - if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { + fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType)) + if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { // check collection name pattern if *c.collectionPattern != "" { matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) @@ -202,11 +206,11 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm } // adjust free volume count - dst.dataNode.FreeVolumeCount-- - keepDataNodesSorted(allLocations) + dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount-- break } } + if !foundNewLocation && !hasSkippedCollection { fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas)) } @@ -215,9 +219,10 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm return nil } -func keepDataNodesSorted(dataNodes []location) { +func keepDataNodesSorted(dataNodes []location, diskType string) { + fn := 
capacityByFreeVolumeCount(types.ToDiskType(diskType)) sort.Slice(dataNodes, func(i, j int) bool { - return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount + return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode) }) } diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 75b0f28da..f9dcf3b5f 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -285,18 +285,20 @@ func (c *commandVolumeFsck) collectVolumeIds(verbose bool, writer io.Writer) (vo } eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) { - for _, vi := range t.VolumeInfos { - volumeIdToServer[vi.Id] = VInfo{ - server: t.Id, - collection: vi.Collection, - isEcVolume: false, + for _, diskInfo := range t.DiskInfos { + for _, vi := range diskInfo.VolumeInfos { + volumeIdToServer[vi.Id] = VInfo{ + server: t.Id, + collection: vi.Collection, + isEcVolume: false, + } } - } - for _, ecShardInfo := range t.EcShardInfos { - volumeIdToServer[ecShardInfo.Id] = VInfo{ - server: t.Id, - collection: ecShardInfo.Collection, - isEcVolume: true, + for _, ecShardInfo := range diskInfo.EcShardInfos { + volumeIdToServer[ecShardInfo.Id] = VInfo{ + server: t.Id, + collection: ecShardInfo.Collection, + isEcVolume: true, + } } } }) diff --git a/weed/shell/command_volume_list.go b/weed/shell/command_volume_list.go index c5a9388fa..dc8783cd1 100644 --- a/weed/shell/command_volume_list.go +++ b/weed/shell/command_volume_list.go @@ -1,6 +1,7 @@ package shell import ( + "bytes" "context" "fmt" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -44,8 +45,25 @@ func (c *commandVolumeList) Do(args []string, commandEnv *CommandEnv, writer io. return nil } +func diskInfosToString(diskInfos map[string]*master_pb.DiskInfo) string { + var buf bytes.Buffer + for diskType, diskInfo := range diskInfos { + if diskType == "" { + diskType = "hdd" + } + fmt.Fprintf(&buf, " %s(volume:%d/%d active:%d free:%d remote:%d)", diskType, diskInfo.VolumeCount, diskInfo.MaxVolumeCount, diskInfo.ActiveVolumeCount, diskInfo.FreeVolumeCount, diskInfo.RemoteVolumeCount) + } + return buf.String() +} + +func diskInfoToString(diskInfo *master_pb.DiskInfo) string { + var buf bytes.Buffer + fmt.Fprintf(&buf, "volume:%d/%d active:%d free:%d remote:%d", diskInfo.VolumeCount, diskInfo.MaxVolumeCount, diskInfo.ActiveVolumeCount, diskInfo.FreeVolumeCount, diskInfo.RemoteVolumeCount) + return buf.String() +} + func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLimitMb uint64) statistics { - fmt.Fprintf(writer, "Topology volume:%d/%d active:%d free:%d remote:%d volumeSizeLimit:%d MB\n", t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount, volumeSizeLimitMb) + fmt.Fprintf(writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(t.DiskInfos)) sort.Slice(t.DataCenterInfos, func(i, j int) bool { return t.DataCenterInfos[i].Id < t.DataCenterInfos[j].Id }) @@ -57,7 +75,7 @@ func writeTopologyInfo(writer io.Writer, t *master_pb.TopologyInfo, volumeSizeLi return s } func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statistics { - fmt.Fprintf(writer, " DataCenter %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount) + fmt.Fprintf(writer, " DataCenter %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) var s statistics sort.Slice(t.RackInfos, func(i, j int) bool { 
return t.RackInfos[i].Id < t.RackInfos[j].Id @@ -69,7 +87,7 @@ func writeDataCenterInfo(writer io.Writer, t *master_pb.DataCenterInfo) statisti return s } func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics { - fmt.Fprintf(writer, " Rack %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount) + fmt.Fprintf(writer, " Rack %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) var s statistics sort.Slice(t.DataNodeInfos, func(i, j int) bool { return t.DataNodeInfos[i].Id < t.DataNodeInfos[j].Id @@ -81,8 +99,22 @@ func writeRackInfo(writer io.Writer, t *master_pb.RackInfo) statistics { return s } func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics { - fmt.Fprintf(writer, " DataNode %s volume:%d/%d active:%d free:%d remote:%d\n", t.Id, t.VolumeCount, t.MaxVolumeCount, t.ActiveVolumeCount, t.FreeVolumeCount, t.RemoteVolumeCount) + fmt.Fprintf(writer, " DataNode %s%s\n", t.Id, diskInfosToString(t.DiskInfos)) + var s statistics + for _, diskInfo := range t.DiskInfos { + s = s.plus(writeDiskInfo(writer, diskInfo)) + } + fmt.Fprintf(writer, " DataNode %s %+v \n", t.Id, s) + return s +} + +func writeDiskInfo(writer io.Writer, t *master_pb.DiskInfo) statistics { var s statistics + diskType := t.Type + if diskType == "" { + diskType = "hdd" + } + fmt.Fprintf(writer, " Disk %s(%s)\n", diskType, diskInfoToString(t)) sort.Slice(t.VolumeInfos, func(i, j int) bool { return t.VolumeInfos[i].Id < t.VolumeInfos[j].Id }) @@ -90,13 +122,14 @@ func writeDataNodeInfo(writer io.Writer, t *master_pb.DataNodeInfo) statistics { s = s.plus(writeVolumeInformationMessage(writer, vi)) } for _, ecShardInfo := range t.EcShardInfos { - fmt.Fprintf(writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds()) + fmt.Fprintf(writer, " ec volume id:%v collection:%v shards:%v\n", ecShardInfo.Id, ecShardInfo.Collection, erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds()) } - fmt.Fprintf(writer, " DataNode %s %+v \n", t.Id, s) + fmt.Fprintf(writer, " Disk %s %+v \n", diskType, s) return s } + func writeVolumeInformationMessage(writer io.Writer, t *master_pb.VolumeInformationMessage) statistics { - fmt.Fprintf(writer, " volume %+v \n", t) + fmt.Fprintf(writer, " volume %+v \n", t) return newStatistics(t) } diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index 2bc8dfad8..f9462beee 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -29,6 +29,7 @@ func (c *commandVolumeMove) Help() string { return `move a live volume from one volume server to another volume server volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> + volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> -disk [hdd|ssd] This command move a live volume from one volume server to another volume server. Here are the steps: @@ -40,6 +41,8 @@ func (c *commandVolumeMove) Help() string { Now the master will mark this volume id as writable. 5. This command asks the source volume server to delete the source volume + The option "-disk [hdd|ssd]" can be used to change the volume disk type. + ` } @@ -53,6 +56,7 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. 
volumeIdInt := volMoveCommand.Int("volumeId", 0, "the volume id") sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>") targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>") + diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd] hard drive or solid state drive") if err = volMoveCommand.Parse(args); err != nil { return nil } @@ -65,14 +69,14 @@ func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("source and target volume servers are the same!") } - return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second) + return LiveMoveVolume(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr) } // LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests. -func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration) (err error) { +func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, idleTimeout time.Duration, diskType string) (err error) { log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer) - lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer) + lastAppendAtNs, err := copyVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, diskType) if err != nil { return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err) } @@ -91,7 +95,7 @@ func LiveMoveVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, so return nil } -func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string) (lastAppendAtNs uint64, err error) { +func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer string, diskType string) (lastAppendAtNs uint64, err error) { // check to see if the volume is already read-only and if its not then we need // to mark it as read-only and then before we return we need to undo what we @@ -134,6 +138,7 @@ func copyVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, source resp, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{ VolumeId: uint32(volumeId), SourceDataNode: sourceVolumeServer, + DiskType: diskType, }) if replicateErr == nil { lastAppendAtNs = resp.LastAppendAtNs diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index a82454cd3..5216de66b 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "os" "sort" @@ -100,17 +101,19 @@ func evacuateNormalVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListRes // move away normal volumes volumeReplicas, _ := collectVolumeReplicaLocations(resp) - for _, vol := range thisNode.info.VolumeInfos { - hasMoved, err := moveAwayOneNormalVolume(commandEnv, 
volumeReplicas, vol, thisNode, otherNodes, applyChange) - if err != nil { - return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err) - } - if !hasMoved { - if skipNonMoveable { - replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement)) - fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String()) - } else { - return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer) + for _, diskInfo := range thisNode.info.DiskInfos { + for _, vol := range diskInfo.VolumeInfos { + hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange) + if err != nil { + return fmt.Errorf("move away volume %d from %s: %v", vol.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement)) + fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String()) + } else { + return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer) + } } } } @@ -126,16 +129,18 @@ func evacuateEcVolumes(commandEnv *CommandEnv, resp *master_pb.VolumeListRespons } // move away ec volumes - for _, ecShardInfo := range thisNode.info.EcShardInfos { - hasMoved, err := moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) - if err != nil { - return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) - } - if !hasMoved { - if skipNonMoveable { - fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer) - } else { - return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer) + for _, diskInfo := range thisNode.info.DiskInfos { + for _, ecShardInfo := range diskInfo.EcShardInfos { + hasMoved, err := moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange) + if err != nil { + return fmt.Errorf("move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err) + } + if !hasMoved { + if skipNonMoveable { + fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer) + } else { + return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer) + } } } } @@ -174,8 +179,9 @@ func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEc } func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) { + fn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType)) sort.Slice(otherNodes, func(i, j int) bool { - return otherNodes[i].localVolumeRatio() < otherNodes[j].localVolumeRatio() + return otherNodes[i].localVolumeRatio(fn) > otherNodes[j].localVolumeRatio(fn) }) for i := 0; i < len(otherNodes); i++ { diff --git a/weed/shell/command_volume_tier_download.go b/weed/shell/command_volume_tier_download.go index d31c8c031..8aeb34d5c 100644 --- a/weed/shell/command_volume_tier_download.go +++ b/weed/shell/command_volume_tier_download.go @@ -86,9 +86,11 @@ func collectRemoteVolumes(topoInfo *master_pb.TopologyInfo, selectedCollection s vidMap := make(map[uint32]bool) eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { - for _, v := range dn.VolumeInfos { - if v.Collection == selectedCollection && v.RemoteStorageKey != "" && 
v.RemoteStorageName != "" { - vidMap[v.Id] = true + for _, diskInfo := range dn.DiskInfos { + for _, v := range diskInfo.VolumeInfos { + if v.Collection == selectedCollection && v.RemoteStorageKey != "" && v.RemoteStorageName != "" { + vidMap[v.Id] = true + } } } })
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
new file mode 100644
index 000000000..3c6c31eac
--- /dev/null
+++ b/weed/shell/command_volume_tier_move.go
@@ -0,0 +1,108 @@
+package shell
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"io"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+	Commands = append(Commands, &commandVolumeTierMove{})
+}
+
+type commandVolumeTierMove struct {
+}
+
+func (c *commandVolumeTierMove) Name() string {
+	return "volume.tier.upload"
+}
+
+func (c *commandVolumeTierMove) Help() string {
+	return `change a volume from one disk type to another
+
+	volume.tier.move -source=hdd -target=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h]
+	volume.tier.move -target=hdd [-collection=""] -volumeId=<volume_id>
+
+`
+}
+
+func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+	if err = commandEnv.confirmIsLocked(); err != nil {
+		return
+	}
+
+	tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	volumeId := tierCommand.Int("volumeId", 0, "the volume id")
+	collection := tierCommand.String("collection", "", "the collection name")
+	fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
+	quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
+	source := tierCommand.String("fromDiskType", "", "the source disk type")
+	target := tierCommand.String("toDiskType", "", "the target disk type")
+	if err = tierCommand.Parse(args); err != nil {
+		return nil
+	}
+
+	if *source == *target {
+		return fmt.Errorf("source tier %s is the same as target tier %s", *source, *target)
+	}
+
+	vid := needle.VolumeId(*volumeId)
+
+	// volumeId is provided
+	if vid != 0 {
+		// return doVolumeTierMove(commandEnv, writer, *collection, vid, *dest, *keepLocalDatFile)
+	}
+
+	// apply to all volumes in the collection
+	// reusing collectVolumeIdsForEcEncode for now
+	volumeIds, err := collectVolumeIdsForTierChange(commandEnv, *source, *collection, *fullPercentage, *quietPeriod)
+	if err != nil {
+		return err
+	}
+	fmt.Printf("tier move volumes: %v\n", volumeIds)
+
+	return nil
+}
+
+func collectVolumeIdsForTierChange(commandEnv *CommandEnv, sourceTier string, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+
+	var resp *master_pb.VolumeListResponse
+	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
+		return err
+	})
+	if err != nil {
+		return
+	}
+
+	quietSeconds := int64(quietPeriod / time.Second)
+	nowUnixSeconds := time.Now().Unix()
+
+	fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds)
+
+	vidMap := make(map[uint32]bool)
+	eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
+		for _, diskInfo := range dn.DiskInfos {
+			for _, v := range diskInfo.VolumeInfos {
+				if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == types.ToDiskType(sourceTier) {
+					if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 {
+						vidMap[v.Id] = true
+					}
+				}
+			}
+		}
+	})
+
+	for vid := range vidMap {
+		vids = append(vids, needle.VolumeId(vid))
+	}
+
+	return
+}
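
The recurring pattern in this changeset is to resolve per-disk-type capacity through a CapacityFunc instead of reading a node-wide MaxVolumeCount. A condensed, illustrative sketch of that pattern follows; the DiskInfo and DataNodeInfo structs here are simplified stand-ins for the master_pb types (only the fields the capacity functions read), not the actual generated code:

package main

import "fmt"

// Simplified stand-ins for the master_pb messages used above.
type DiskInfo struct {
	MaxVolumeCount int64
	VolumeCount    int64
}

type DataNodeInfo struct {
	Id        string
	DiskInfos map[string]*DiskInfo // keyed by disk type: "hdd", "ssd", ...
}

// CapacityFunc mirrors the type introduced in command_volume_balance.go:
// given a data node, report how many volume slots count for one disk type.
type CapacityFunc func(*DataNodeInfo) int

func capacityByMaxVolumeCount(diskType string) CapacityFunc {
	return func(info *DataNodeInfo) int {
		diskInfo, found := info.DiskInfos[diskType]
		if !found {
			return 0 // nodes without this disk type contribute no capacity
		}
		return int(diskInfo.MaxVolumeCount)
	}
}

func capacityByFreeVolumeCount(diskType string) CapacityFunc {
	return func(info *DataNodeInfo) int {
		diskInfo, found := info.DiskInfos[diskType]
		if !found {
			return 0
		}
		return int(diskInfo.MaxVolumeCount - diskInfo.VolumeCount)
	}
}

func main() {
	n := &DataNodeInfo{
		Id: "127.0.0.1:8080",
		DiskInfos: map[string]*DiskInfo{
			"hdd": {MaxVolumeCount: 100, VolumeCount: 70},
			"ssd": {MaxVolumeCount: 20, VolumeCount: 5},
		},
	}
	fmt.Println(capacityByMaxVolumeCount("hdd")(n))  // 100
	fmt.Println(capacityByFreeVolumeCount("ssd")(n)) // 15
}

In the diff above, balanceSelectedVolume, keepDataNodesSorted, and moveAwayOneNormalVolume take such a function so that hdd and ssd volumes are each weighed against their own disk's capacity rather than a single node-wide count.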
