author    Lisandro Pin <lisandro.pin@proton.ch>  2025-05-09 02:14:14 +0200
committer GitHub <noreply@github.com>            2025-05-08 17:14:14 -0700
commit    97dad06ed89bc0bbc0c7299b190477c00608e0a4 (patch)
tree      2c82c9436c583fec8f0634220630d98e09c234c0
parent    582206b7d21433f2da06be440bd81094d3bd848a (diff)
Improve parallelization for `ec.encode` (#6769)
Improve parallelization for `ec.encode`. Instead of processing one volume at a time, perform all EC conversion steps (mark readonly -> generate EC shards -> delete volume -> remount) in parallel across all volumes. This should substantially improve performance when EC-encoding entire collections.
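Every parallelized step in the diff below funnels through the shell's ErrorWaitGroup helper: Add() queues a task, at most maxParallelization tasks run concurrently, and Wait() blocks until all complete and reports any error. As a rough sketch of how such a bounded-parallelism, first-error-wins helper can work (a hypothetical simplification for illustration; the real helper lives in the shell package and its internals are not shown here):

package main

import (
    "fmt"
    "sync"
)

// ErrorWaitGroup is a hypothetical, simplified stand-in for the helper the
// diff relies on: it runs tasks with bounded parallelism and remembers the
// first error. The actual SeaweedFS implementation may differ.
type ErrorWaitGroup struct {
    wg   sync.WaitGroup
    sem  chan struct{} // channel capacity bounds concurrent tasks
    once sync.Once
    err  error
}

func NewErrorWaitGroup(maxParallelization int) *ErrorWaitGroup {
    return &ErrorWaitGroup{sem: make(chan struct{}, maxParallelization)}
}

// Add schedules a task; at most maxParallelization tasks run at once.
func (e *ErrorWaitGroup) Add(task func() error) {
    e.wg.Add(1)
    go func() {
        defer e.wg.Done()
        e.sem <- struct{}{}        // acquire a slot
        defer func() { <-e.sem }() // release it
        if err := task(); err != nil {
            e.once.Do(func() { e.err = err })
        }
    }()
}

// Wait blocks until every task finishes and returns the first error seen.
func (e *ErrorWaitGroup) Wait() error {
    e.wg.Wait()
    return e.err
}

func main() {
    ewg := NewErrorWaitGroup(4) // mirrors -maxParallelization
    for vid := 1; vid <= 10; vid++ {
        ewg.Add(func() error { // per-iteration capture of vid assumes Go 1.22+
            fmt.Println("would encode volume", vid)
            return nil
        })
    }
    if err := ewg.Wait(); err != nil {
        fmt.Println("encode failed:", err)
    }
}

Note that the diff's own closures capture the range variable vid the same way, which relies on Go 1.22's per-iteration loop variable semantics.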
-rw-r--r--  weed/shell/command_ec_encode.go | 95
1 file changed, 61 insertions(+), 34 deletions(-)
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index a1d899d15..afbf90b44 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -9,6 +9,7 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/pb"
+    "github.com/seaweedfs/seaweedfs/weed/wdclient"
     "google.golang.org/grpc"
@@ -98,76 +99,74 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
         }
     }
-    var collections []string
     var volumeIds []needle.VolumeId
+    var balanceCollections []string
     if vid := needle.VolumeId(*volumeId); vid != 0 {
         // volumeId is provided
         volumeIds = append(volumeIds, vid)
-        collections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
+        balanceCollections = collectCollectionsForVolumeIds(topologyInfo, volumeIds)
     } else {
         // apply to all volumes for the given collection
         volumeIds, err = collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
         if err != nil {
             return err
         }
-        collections = append(collections, *collection)
+        balanceCollections = []string{*collection}
     }
     // encode all requested volumes...
-    for _, vid := range volumeIds {
-        if err = doEcEncode(commandEnv, *collection, vid, *maxParallelization); err != nil {
-            return fmt.Errorf("ec encode for volume %d: %v", vid, err)
-        }
+    if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
+        return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
     }
     // ...then re-balance ec shards.
-    if err := EcBalance(commandEnv, collections, "", rp, *maxParallelization, *applyBalancing); err != nil {
-        return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", collections, err)
+    if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
+        return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
     }
     return nil
 }
-func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, maxParallelization int) error {
+func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
     var ewg *ErrorWaitGroup
     if !commandEnv.isLocked() {
         return fmt.Errorf("lock is lost")
     }
-    // find volume location
-    locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
-    if !found {
-        return fmt.Errorf("volume %d not found", vid)
+    // resolve volume locations
+    locations := map[needle.VolumeId][]wdclient.Location{}
+    for _, vid := range volumeIds {
+        ls, ok := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
+        if !ok {
+            return fmt.Errorf("volume %d not found", vid)
+        }
+        locations[vid] = ls
     }
-    target := locations[0]
-    // mark the volume as readonly
+    // mark volumes as readonly
     ewg = NewErrorWaitGroup(maxParallelization)
-    for _, location := range locations {
-        ewg.Add(func() error {
-            if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil {
-                return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, location.Url, err)
-            }
-            return nil
-        })
+    for _, vid := range volumeIds {
+        for _, l := range locations[vid] {
+            ewg.Add(func() error {
+                if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, l, false, false); err != nil {
+                    return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, l.Url, err)
+                }
+                return nil
+            })
+        }
     }
     if err := ewg.Wait(); err != nil {
         return err
     }
     // generate ec shards
-    if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
-        return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
-    }
-
-    // ask the source volume server to delete the original volume
     ewg = NewErrorWaitGroup(maxParallelization)
-    for _, location := range locations {
+    for _, vid := range volumeIds {
+        target := locations[vid][0]
         ewg.Add(func() error {
-            if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false); err != nil {
-                return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
+            if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
+                return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
             }
-            fmt.Printf("deleted volume %d from %s\n", vid, location.Url)
             return nil
         })
     }
@@ -175,13 +174,41 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
         return err
     }
+    // ask the source volume server to delete the original volume
+    ewg = NewErrorWaitGroup(maxParallelization)
+    for _, vid := range volumeIds {
+        for _, l := range locations[vid] {
+            ewg.Add(func() error {
+                if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
+                    return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
+                }
+                fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
+                return nil
+            })
+        }
+    }
+    if err := ewg.Wait(); err != nil {
+        return err
+    }
+
     // mount all ec shards for the converted volume
     shardIds := make([]uint32, erasure_coding.TotalShardsCount)
     for i := range shardIds {
         shardIds[i] = uint32(i)
     }
-    if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
-        return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
+
+    ewg = NewErrorWaitGroup(maxParallelization)
+    for _, vid := range volumeIds {
+        target := locations[vid][0]
+        ewg.Add(func() error {
+            if err := mountEcShards(commandEnv.option.GrpcDialOption, collection, vid, target.ServerAddress(), shardIds); err != nil {
+                return fmt.Errorf("mount ec shards for volume %d on %s: %v", vid, target.Url, err)
+            }
+            return nil
+        })
+    }
+    if err := ewg.Wait(); err != nil {
+        return err
     }
     return nil
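The resulting structure is four fan-out phases separated by Wait() barriers: no volume's shards are generated until every replica is read-only, and no source volume is deleted until every volume's shards exist. A standalone sketch of that phase-barrier shape, using golang.org/x/sync/errgroup with SetLimit in place of the shell's ErrorWaitGroup (phase names and stub bodies are illustrative, not the actual SeaweedFS calls):

package main

import (
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    volumeIds := []uint32{1, 2, 3}
    // Illustrative stand-ins for the four per-volume steps in the diff.
    phases := []struct {
        name string
        run  func(vid uint32) error
    }{
        {"mark readonly", func(vid uint32) error { fmt.Println("readonly", vid); return nil }},
        {"generate ec shards", func(vid uint32) error { fmt.Println("encode", vid); return nil }},
        {"delete volume", func(vid uint32) error { fmt.Println("delete", vid); return nil }},
        {"mount ec shards", func(vid uint32) error { fmt.Println("mount", vid); return nil }},
    }
    for _, p := range phases {
        var g errgroup.Group
        g.SetLimit(2) // plays the role of maxParallelization
        for _, vid := range volumeIds {
            g.Go(func() error { return p.run(vid) }) // Go 1.22+ per-iteration capture
        }
        // Wait is the barrier: phase N+1 starts only after phase N finished
        // for every volume, matching the ewg.Wait() calls in the diff.
        if err := g.Wait(); err != nil {
            fmt.Printf("%s failed: %v\n", p.name, err)
            return
        }
    }
}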