aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_balance.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-06-03 20:25:02 -0700
committerChris Lu <chris.lu@gmail.com>2019-06-03 20:25:02 -0700
commitb05456fe07f49e50776124f5b3315c4bd7bfef36 (patch)
tree2c6934fbf7df3d1c7197014bf9463aea69273efa /weed/shell/command_ec_balance.go
parent11cffb3168fd18bbf28772a54a0c3fac13e671e4 (diff)
downloadseaweedfs-b05456fe07f49e50776124f5b3315c4bd7bfef36.tar.xz
seaweedfs-b05456fe07f49e50776124f5b3315c4bd7bfef36.zip
able to purge extra ec shard copies
Diffstat (limited to 'weed/shell/command_ec_balance.go')
-rw-r--r--weed/shell/command_ec_balance.go83
1 files changed, 60 insertions, 23 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 0934c79fd..664284df9 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -129,40 +129,77 @@ func balanceEcVolumes(commandEnv *commandEnv, collection string, applyBalancing
for vid, locations := range vidLocations {
- // collect all ec nodes with at least one free slot
- var possibleDestinationEcNodes []*EcNode
- for _, ecNode := range allEcNodes {
- if ecNode.freeEcSlot > 0 {
- possibleDestinationEcNodes = append(possibleDestinationEcNodes, ecNode)
- }
+ if err := doDeduplicateEcShards(ctx, commandEnv, collection, vid, locations, applyBalancing); err != nil {
+ return err
}
- // calculate average number of shards an ec node should have for one volume
- averageShardsPerEcNode := int(math.Ceil(float64(erasure_coding.TotalShardsCount) / float64(len(possibleDestinationEcNodes))))
+ if err := doBalanceEcShards(ctx, commandEnv, collection, vid, locations, allEcNodes, applyBalancing); err != nil {
+ return err
+ }
- fmt.Printf("vid %d averageShardsPerEcNode %+v\n", vid, averageShardsPerEcNode)
+ }
- // check whether this volume has ecNodes that are over average
- isOverLimit := false
- for _, ecNode := range locations {
- shardBits := findEcVolumeShards(ecNode, vid)
- if shardBits.ShardIdCount() > averageShardsPerEcNode {
- isOverLimit = true
- fmt.Printf("vid %d %s has %d shards, isOverLimit %+v\n", vid, ecNode.info.Id, shardBits.ShardIdCount(), isOverLimit)
- break
- }
+ return nil
+}
+
+func doBalanceEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, allEcNodes []*EcNode, applyBalancing bool) error {
+ // collect all ec nodes with at least one free slot
+ var possibleDestinationEcNodes []*EcNode
+ for _, ecNode := range allEcNodes {
+ if ecNode.freeEcSlot > 0 {
+ possibleDestinationEcNodes = append(possibleDestinationEcNodes, ecNode)
+ }
+ }
+ // calculate average number of shards an ec node should have for one volume
+ averageShardsPerEcNode := int(math.Ceil(float64(erasure_coding.TotalShardsCount) / float64(len(possibleDestinationEcNodes))))
+ fmt.Printf("vid %d averageShardsPerEcNode %+v\n", vid, averageShardsPerEcNode)
+ // check whether this volume has ecNodes that are over average
+ isOverLimit := false
+ for _, ecNode := range locations {
+ shardBits := findEcVolumeShards(ecNode, vid)
+ if shardBits.ShardIdCount() > averageShardsPerEcNode {
+ isOverLimit = true
+ fmt.Printf("vid %d %s has %d shards, isOverLimit %+v\n", vid, ecNode.info.Id, shardBits.ShardIdCount(), isOverLimit)
+ break
+ }
+ }
+ if isOverLimit {
+ if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, collection, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil {
+ return err
}
+ }
+ return nil
+}
- if isOverLimit {
+func doDeduplicateEcShards(ctx context.Context, commandEnv *commandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
- if err := spreadShardsIntoMoreDataNodes(ctx, commandEnv, averageShardsPerEcNode, collection, vid, locations, possibleDestinationEcNodes, applyBalancing); err != nil {
+ // check whether this volume has ecNodes that are over average
+ shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
+ for _, ecNode := range locations {
+ shardBits := findEcVolumeShards(ecNode, vid)
+ for _, shardId := range shardBits.ShardIds() {
+ shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
+ }
+ }
+ for shardId, ecNodes := range shardToLocations {
+ if len(ecNodes) <= 1 {
+ continue
+ }
+ sortEcNodes(ecNodes)
+ fmt.Printf("ec shard %d.%d has %d copies, removing from %+v\n", vid, shardId, len(ecNodes), ecNodes[1:])
+ if !applyBalancing {
+ continue
+ }
+ for _, ecNode := range ecNodes[1:] {
+ duplicatedShardIds := []uint32{uint32(shardId)}
+ if err := unmountEcShards(ctx, commandEnv.option.GrpcDialOption, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
+ return err
+ }
+ if err := sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, vid, ecNode.info.Id, duplicatedShardIds); err != nil {
return err
}
-
}
-
}
-
return nil
}