aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/shell/command_ec_balance.go48
-rw-r--r--weed/shell/command_ec_common.go51
-rw-r--r--weed/shell/command_ec_decode.go27
-rw-r--r--weed/shell/command_volume_fix_replication.go7
4 files changed, 75 insertions, 58 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 546a8340e..2baadb144 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -388,7 +388,10 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
}
ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
- diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
+ diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+ if !found {
+ return
+ }
for _, ecShardInfo := range diskInfo.EcShardInfos {
count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
}
@@ -413,28 +416,30 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
emptyNodeIds := make(map[uint32]bool)
- emptyDiskInfo := emptyNode.info.DiskInfos[string(types.HardDriveType)]
- for _, shards := range emptyDiskInfo.EcShardInfos {
- emptyNodeIds[shards.Id] = true
+ if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ for _, shards := range emptyDiskInfo.EcShardInfos {
+ emptyNodeIds[shards.Id] = true
+ }
}
- fullDiskInfo := fullNode.info.DiskInfos[string(types.HardDriveType)]
- for _, shards := range fullDiskInfo.EcShardInfos {
- if _, found := emptyNodeIds[shards.Id]; !found {
- for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
-
- fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
-
- err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
- if err != nil {
- return err
+ if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ for _, shards := range fullDiskInfo.EcShardInfos {
+ if _, found := emptyNodeIds[shards.Id]; !found {
+ for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
+
+ fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
+
+ err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
+ if err != nil {
+ return err
+ }
+
+ ecNodeIdToShardCount[emptyNode.info.Id]++
+ ecNodeIdToShardCount[fullNode.info.Id]--
+ hasMove = true
+ break
}
-
- ecNodeIdToShardCount[emptyNode.info.Id]++
- ecNodeIdToShardCount[fullNode.info.Id]--
- hasMove = true
break
}
- break
}
}
}
@@ -515,7 +520,10 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[
func collectVolumeIdToEcNodes(allEcNodes []*EcNode) map[needle.VolumeId][]*EcNode {
vidLocations := make(map[needle.VolumeId][]*EcNode)
for _, ecNode := range allEcNodes {
- diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
+ diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
+ if !found {
+ continue
+ }
for _, shardInfo := range diskInfo.EcShardInfos {
vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
}
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index 7c23fda0e..c3ffa67d6 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -288,10 +288,11 @@ func ceilDivide(total, n int) int {
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
- diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
- for _, shardInfo := range diskInfo.EcShardInfos {
- if needle.VolumeId(shardInfo.Id) == vid {
- return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ for _, shardInfo := range diskInfo.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ return erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ }
}
}
@@ -301,18 +302,19 @@ func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.Shar
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
foundVolume := false
- diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
- for _, shardInfo := range diskInfo.EcShardInfos {
- if needle.VolumeId(shardInfo.Id) == vid {
- oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
- newShardBits := oldShardBits
- for _, shardId := range shardIds {
- newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ for _, shardInfo := range diskInfo.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
+ foundVolume = true
+ break
}
- shardInfo.EcIndexBits = uint32(newShardBits)
- ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
- foundVolume = true
- break
}
}
@@ -335,16 +337,17 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
- diskInfo := ecNode.info.DiskInfos[string(types.HardDriveType)]
- for _, shardInfo := range diskInfo.EcShardInfos {
- if needle.VolumeId(shardInfo.Id) == vid {
- oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
- newShardBits := oldShardBits
- for _, shardId := range shardIds {
- newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
+ if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
+ for _, shardInfo := range diskInfo.EcShardInfos {
+ if needle.VolumeId(shardInfo.Id) == vid {
+ oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
+ newShardBits := oldShardBits
+ for _, shardId := range shardIds {
+ newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
+ }
+ shardInfo.EcIndexBits = uint32(newShardBits)
+ ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
}
- shardInfo.EcIndexBits = uint32(newShardBits)
- ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
}
}
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index 61b810a8c..3e1499d41 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -226,10 +226,11 @@ func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyIn
func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
- diskInfo := dn.DiskInfos[string(types.HardDriveType)]
- for _, v := range diskInfo.EcShardInfos {
- if v.Collection == selectedCollection && v.Id == uint32(vid) {
- ecShardInfos = append(ecShardInfos, v)
+ if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
+ for _, v := range diskInfo.EcShardInfos {
+ if v.Collection == selectedCollection && v.Id == uint32(vid) {
+ ecShardInfos = append(ecShardInfos, v)
+ }
}
}
})
@@ -241,10 +242,11 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection stri
vidMap := make(map[uint32]bool)
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
- diskInfo := dn.DiskInfos[string(types.HardDriveType)]
- for _, v := range diskInfo.EcShardInfos {
- if v.Collection == selectedCollection {
- vidMap[v.Id] = true
+ if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
+ for _, v := range diskInfo.EcShardInfos {
+ if v.Collection == selectedCollection {
+ vidMap[v.Id] = true
+ }
}
}
})
@@ -260,10 +262,11 @@ func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeI
nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
- diskInfo := dn.DiskInfos[string(types.HardDriveType)]
- for _, v := range diskInfo.EcShardInfos {
- if v.Id == uint32(vid) {
- nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
+ if diskInfo, found := dn.DiskInfos[string(types.HardDriveType)]; found {
+ for _, v := range diskInfo.EcShardInfos {
+ if v.Id == uint32(vid) {
+ nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
+ }
}
}
})
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index c7de14318..7b2eb6769 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"path/filepath"
"sort"
@@ -167,7 +168,8 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
keepDataNodesSorted(allLocations, replica.info.DiskType)
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
- if dst.dataNode.DiskInfos[replica.info.DiskType].FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
+ fn := capacityByFreeVolumeCount(types.ToDiskType(replica.info.DiskType))
+ if fn(dst.dataNode) > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
// check collection name pattern
if *c.collectionPattern != "" {
matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
@@ -218,8 +220,9 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
}
func keepDataNodesSorted(dataNodes []location, diskType string) {
+ fn := capacityByFreeVolumeCount(types.ToDiskType(diskType))
sort.Slice(dataNodes, func(i, j int) bool {
- return dataNodes[i].dataNode.DiskInfos[diskType].FreeVolumeCount > dataNodes[j].dataNode.DiskInfos[diskType].FreeVolumeCount
+ return fn(dataNodes[i].dataNode) > fn(dataNodes[j].dataNode)
})
}