aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2024-12-18 20:59:48 +0100
committerGitHub <noreply@github.com>2024-12-18 11:59:48 -0800
commit44c48c929ac2e513a3ad5749744c77ab480ae1fe (patch)
treeb93904e1c7ca3ff6e4177281fc991a04d953f885
parent72af97162fa4b76557c9160a3809b0a9dd67cbbd (diff)
downloadseaweedfs-44c48c929ac2e513a3ad5749744c77ab480ae1fe.tar.xz
seaweedfs-44c48c929ac2e513a3ad5749744c77ab480ae1fe.zip
Parallelize volume replica operations within `ec.encode`. (#6374)
-rw-r--r--weed/shell/command_ec_encode.go21
-rw-r--r--weed/shell/command_volume_move.go8
2 files changed, 21 insertions, 8 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 62bf7fbbf..2b35c5c79 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -119,7 +119,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
// encode all requested volumes...
for _, vid := range volumeIds {
- if err = doEcEncode(commandEnv, *collection, vid); err != nil {
+ if err = doEcEncode(commandEnv, *collection, vid, *parallelize); err != nil {
return fmt.Errorf("ec encode for volume %d: %v", vid, err)
}
}
@@ -131,7 +131,7 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
return nil
}
-func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId) error {
+func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelize bool) error {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
@@ -142,11 +142,20 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId)
return fmt.Errorf("volume %d not found", vid)
}
- // fmt.Printf("found ec %d shards on %v\n", vid, locations)
-
// mark the volume as readonly
- if err := markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false); err != nil {
- return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
+ ewg := ErrorWaitGroup{
+ parallelize: parallelize,
+ }
+ for _, location := range locations {
+ ewg.Add(func() error {
+ if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil {
+ return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, location.Url, err)
+ }
+ return nil
+ })
+ }
+ if err := ewg.Wait(); err != nil {
+ return err
}
// generate ec shards
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index 2ddd3f625..26fd5fc58 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -227,10 +227,14 @@ func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId
})
}
+func markVolumeReplicaWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, location wdclient.Location, writable, persist bool) error {
+ fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
+ return markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist)
+}
+
func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error {
for _, location := range locations {
- fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
- if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil {
+ if err := markVolumeReplicaWritable(grpcDialOption, volumeId, location, writable, persist); err != nil {
return err
}
}