aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_encode.go
diff options
context:
space:
mode:
authorLisandro Pin <lisandro.pin@proton.ch>2025-01-17 10:02:30 +0100
committerGitHub <noreply@github.com>2025-01-17 01:02:30 -0800
commiteab2e0e1127e2d8ccdee9ee518e0ae20ea8311ba (patch)
tree6ef09e3b48efd8ec874abc32d7d7e8fffe487f05 /weed/shell/command_ec_encode.go
parentc7ae969c06476655bcb0268ca2fd8061bbd6e975 (diff)
downloadseaweedfs-eab2e0e1127e2d8ccdee9ee518e0ae20ea8311ba.tar.xz
seaweedfs-eab2e0e1127e2d8ccdee9ee518e0ae20ea8311ba.zip
`ec.encode`: Fix bug causing source volumes not being deleted after EC conversion. (#6447)
This logic was originally part of `spreadEcShards()`, which got removed during the unification effort with `ec.balance` (https://github.com/seaweedfs/seaweedfs/pull/6344), accidentally breaking functionality in the process. The commit restores the deletion code for EC'd volumes - with parallelization support.
Diffstat (limited to 'weed/shell/command_ec_encode.go')
-rw-r--r--weed/shell/command_ec_encode.go19
1 files changed, 18 insertions, 1 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index 829b93a3d..da0a94f0f 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -132,6 +132,8 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
}
func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, maxParallelization int) error {
+ var ewg *ErrorWaitGroup
+
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
@@ -143,7 +145,7 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
}
// mark the volume as readonly
- ewg := NewErrorWaitGroup(maxParallelization)
+ ewg = NewErrorWaitGroup(maxParallelization)
for _, location := range locations {
ewg.Add(func() error {
if err := markVolumeReplicaWritable(commandEnv.option.GrpcDialOption, vid, location, false, false); err != nil {
@@ -161,6 +163,21 @@ func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId,
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
}
+ // ask the source volume server to delete the original volume
+ ewg = NewErrorWaitGroup(maxParallelization)
+ for _, location := range locations {
+ ewg.Add(func() error {
+ if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false); err != nil {
+ return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, vid, err)
+ }
+ fmt.Printf("deleted volume %d from %s\n", vid, location.Url)
+ return nil
+ })
+ }
+ if err := ewg.Wait(); err != nil {
+ return err
+ }
+
return nil
}