aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_encode.go
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2024-12-12 16:42:03 +0100
committerGitHub <noreply@github.com>2024-12-12 07:42:03 -0800
commit6320036c567bb3d2bde32824574233aee817cd53 (patch)
treebcda15c472cb853d66c9eeab652483e309b833f2 /weed/shell/command_ec_encode.go
parent700b95304bfd50c60cf6509438ff775cbaf21b16 (diff)
downloadseaweedfs-6320036c567bb3d2bde32824574233aee817cd53.tar.xz
seaweedfs-6320036c567bb3d2bde32824574233aee817cd53.zip
Delete legacy balancing code for `ec.encode`. (#6344)
Diffstat (limited to 'weed/shell/command_ec_encode.go')
-rw-r--r--weed/shell/command_ec_encode.go133
1 files changed, 0 insertions, 133 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index a4f1eec54..6ee530256 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -5,8 +5,6 @@ import (
"flag"
"fmt"
"io"
- "math/rand"
- "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -19,7 +17,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
)
func init() {
@@ -181,136 +178,6 @@ func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId,
}
-// TODO: delete this (now unused) shard spread logic.
-func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location, parallelCopy bool) (err error) {
-
- allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv)
- if err != nil {
- return err
- }
-
- if totalFreeEcSlots < erasure_coding.TotalShardsCount {
- return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
- }
- allocatedDataNodes := allEcNodes
- if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
- allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
- }
-
- // calculate how many shards to allocate for these servers
- allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
-
- // ask the data nodes to copy from the source volume server
- copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0], parallelCopy)
- if err != nil {
- return err
- }
-
- // unmount the to be deleted shards
- err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
- if err != nil {
- return err
- }
-
- // ask the source volume server to clean up copied ec shards
- err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
- if err != nil {
- return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
- }
-
- // ask the source volume server to delete the original volume
- for _, location := range existingLocations {
- fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
- err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.ServerAddress(), false)
- if err != nil {
- return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
- }
- }
-
- return err
-
-}
-
-func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location, parallelCopy bool) (actuallyCopied []uint32, err error) {
-
- fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url)
-
- var wg sync.WaitGroup
- shardIdChan := make(chan []uint32, len(targetServers))
- copyFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
- defer wg.Done()
- copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
- allocatedEcShardIds, volumeId, collection, existingLocation.ServerAddress())
- if copyErr != nil {
- err = copyErr
- } else {
- shardIdChan <- copiedShardIds
- server.addEcVolumeShards(volumeId, collection, copiedShardIds)
- }
- }
- cleanupFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
- if err := unmountEcShards(grpcDialOption, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil {
- fmt.Printf("unmount aborted shards %d.%v on %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err)
- }
- if err := sourceServerDeleteEcShards(grpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil {
- fmt.Printf("remove aborted shards %d.%v on target server %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err)
- }
- if err := sourceServerDeleteEcShards(grpcDialOption, collection, volumeId, existingLocation.ServerAddress(), allocatedEcShardIds); err != nil {
- fmt.Printf("remove aborted shards %d.%v on existing server %s: %v\n", volumeId, allocatedEcShardIds, existingLocation.ServerAddress(), err)
- }
- }
-
- // maybe parallelize
- for i, server := range targetServers {
- if len(allocatedEcIds[i]) <= 0 {
- continue
- }
-
- wg.Add(1)
- if parallelCopy {
- go copyFunc(server, allocatedEcIds[i])
- } else {
- copyFunc(server, allocatedEcIds[i])
- }
- }
- wg.Wait()
- close(shardIdChan)
-
- if err != nil {
- for i, server := range targetServers {
- if len(allocatedEcIds[i]) <= 0 {
- continue
- }
- cleanupFunc(server, allocatedEcIds[i])
- }
- return nil, err
- }
-
- for shardIds := range shardIdChan {
- actuallyCopied = append(actuallyCopied, shardIds...)
- }
-
- return
-}
-
-func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
- allocated = make([][]uint32, len(servers))
- allocatedShardIdIndex := uint32(0)
- serverIndex := rand.Intn(len(servers))
- for allocatedShardIdIndex < erasure_coding.TotalShardsCount {
- if servers[serverIndex].freeEcSlot > 0 {
- allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex)
- allocatedShardIdIndex++
- }
- serverIndex++
- if serverIndex >= len(servers) {
- serverIndex = 0
- }
- }
-
- return allocated
-}
-
func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
// collect topology information
topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)