author	Lisandro Pin <lisandro.pin@proton.ch>	2025-05-09 18:01:32 +0200
committer	GitHub <noreply@github.com>	2025-05-09 09:01:32 -0700
commit	848d1f7c34f4332d1bef0fe05679ee95dcb4f75b (patch)
tree	f82808ec3d1a9bfcb550050d523c7562a0a43c22
parent	2ae5b480a66fb21abbf6e27ff13af9b05742077a (diff)
Improve safety for weed shell's `ec.encode`. (#6773)
Improve safety for weed shell's `ec.encode`.

The current process for `ec.encode` is:

1. EC shards for a volume are generated and added to a single server
2. The original volume is deleted
3. EC shards get re-balanced across the entire topology

It is then possible to lose data between #2 and #3 if the underlying volume storage/server/rack/DC fails, for whatever reason.

As a fix, this MR reworks `ec.encode` so that:

* Newly created EC shards are spread across all locations for the source volume.
* Source volumes are deleted only after EC shards are converted and balanced.
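The safer ordering boils down to "encode, then balance, then delete". Below is a minimal Go sketch of that sequencing only; `encodeToEcShards`, `balanceEcShards` and `deleteSourceVolumes` are hypothetical stand-ins for the real `doEcEncode`, `EcBalance` and `doDeleteVolumes` helpers in `weed/shell`, not the actual implementation:

```go
// Minimal sketch of the reworked ec.encode ordering: the original volumes
// are only deleted after the EC shards have been generated and re-balanced.
package main

import "fmt"

type volumeID uint32

// stand-in for doEcEncode: generate and mount EC shards, spreading them
// across the source volume's replica locations
func encodeToEcShards(vids []volumeID) error {
	fmt.Println("encoded volumes to EC shards:", vids)
	return nil
}

// stand-in for EcBalance: spread the shards across the full topology
func balanceEcShards() error {
	fmt.Println("re-balanced EC shards across the topology")
	return nil
}

// stand-in for doDeleteVolumes: only reached once the shards are safe
func deleteSourceVolumes(vids []volumeID) error {
	fmt.Println("deleted original volumes:", vids)
	return nil
}

func main() {
	vids := []volumeID{1, 2, 3}

	// 1. convert volumes to EC shards
	if err := encodeToEcShards(vids); err != nil {
		panic(err)
	}
	// 2. re-balance the shards across the topology
	if err := balanceEcShards(); err != nil {
		panic(err)
	}
	// 3. only now is it safe to drop the original volumes
	if err := deleteSourceVolumes(vids); err != nil {
		panic(err)
	}
}
```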
-rw-r--r--	weed/shell/command_ec_common.go	8
-rw-r--r--	weed/shell/command_ec_encode.go	90
2 files changed, 63 insertions, 35 deletions
diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go
index c49ab7611..209d8a733 100644
--- a/weed/shell/command_ec_common.go
+++ b/weed/shell/command_ec_common.go
@@ -134,6 +134,14 @@ func NewErrorWaitGroup(maxConcurrency int) *ErrorWaitGroup {
}
}
+func (ewg *ErrorWaitGroup) Reset() {
+ close(ewg.wgSem)
+
+ ewg.wg = &sync.WaitGroup{}
+ ewg.wgSem = make(chan bool, ewg.maxConcurrency)
+ ewg.errors = nil
+}
+
func (ewg *ErrorWaitGroup) Add(f ErrorWaitGroupTask) {
if ewg.maxConcurrency <= 1 {
// Keep run order deterministic when parallelization is off
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index afbf90b44..d9e3a88c6 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -118,33 +118,42 @@ func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
if err = doEcEncode(commandEnv, *collection, volumeIds, *maxParallelization); err != nil {
return fmt.Errorf("ec encode for volumes %v: %v", volumeIds, err)
}
- // ...then re-balance ec shards.
+ // ...re-balance ec shards...
if err := EcBalance(commandEnv, balanceCollections, "", rp, *maxParallelization, *applyBalancing); err != nil {
return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
}
+ // ...then delete original volumes.
+ if err := doDeleteVolumes(commandEnv, volumeIds, *maxParallelization); err != nil {
+ return fmt.Errorf("re-balance ec shards for collection(s) %v: %v", balanceCollections, err)
+ }
return nil
}
-func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
- var ewg *ErrorWaitGroup
-
- if !commandEnv.isLocked() {
- return fmt.Errorf("lock is lost")
- }
-
- // resolve volume locations
- locations := map[needle.VolumeId][]wdclient.Location{}
+func volumeLocations(commandEnv *CommandEnv, volumeIds []needle.VolumeId) (map[needle.VolumeId][]wdclient.Location, error) {
+ res := map[needle.VolumeId][]wdclient.Location{}
for _, vid := range volumeIds {
ls, ok := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
if !ok {
- return fmt.Errorf("volume %d not found", vid)
+ return nil, fmt.Errorf("volume %d not found", vid)
}
- locations[vid] = ls
+ res[vid] = ls
+ }
+
+ return res, nil
+}
+
+func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.VolumeId, maxParallelization int) error {
+ if !commandEnv.isLocked() {
+ return fmt.Errorf("lock is lost")
+ }
+ locations, err := volumeLocations(commandEnv, volumeIds)
+ if err != nil {
+ return err
}
// mark volumes as readonly
- ewg = NewErrorWaitGroup(maxParallelization)
+ ewg := NewErrorWaitGroup(maxParallelization)
for _, vid := range volumeIds {
for _, l := range locations[vid] {
ewg.Add(func() error {
@@ -160,9 +169,9 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
}
// generate ec shards
- ewg = NewErrorWaitGroup(maxParallelization)
- for _, vid := range volumeIds {
- target := locations[vid][0]
+ ewg.Reset()
+ for i, vid := range volumeIds {
+ target := locations[vid][i%len(locations[vid])]
ewg.Add(func() error {
if err := generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, target.ServerAddress()); err != nil {
return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, target.Url, err)
@@ -174,30 +183,13 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
return err
}
- // ask the source volume server to delete the original volume
- ewg = NewErrorWaitGroup(maxParallelization)
- for _, vid := range volumeIds {
- for _, l := range locations[vid] {
- ewg.Add(func() error {
- if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
- return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
- }
- fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
- return nil
- })
- }
- }
- if err := ewg.Wait(); err != nil {
- return err
- }
-
// mount all ec shards for the converted volume
shardIds := make([]uint32, erasure_coding.TotalShardsCount)
for i := range shardIds {
shardIds[i] = uint32(i)
}
- ewg = NewErrorWaitGroup(maxParallelization)
+ ewg.Reset()
for _, vid := range volumeIds {
target := locations[vid][0]
ewg.Add(func() error {
@@ -214,9 +206,37 @@ func doEcEncode(commandEnv *CommandEnv, collection string, volumeIds []needle.Vo
return nil
}
+func doDeleteVolumes(commandEnv *CommandEnv, volumeIds []needle.VolumeId, maxParallelization int) error {
+ if !commandEnv.isLocked() {
+ return fmt.Errorf("lock is lost")
+ }
+ locations, err := volumeLocations(commandEnv, volumeIds)
+ if err != nil {
+ return err
+ }
+
+ ewg := NewErrorWaitGroup(maxParallelization)
+ for _, vid := range volumeIds {
+ for _, l := range locations[vid] {
+ ewg.Add(func() error {
+ if err := deleteVolume(commandEnv.option.GrpcDialOption, vid, l.ServerAddress(), false); err != nil {
+ return fmt.Errorf("deleteVolume %s volume %d: %v", l.Url, vid, err)
+ }
+ fmt.Printf("deleted volume %d from %s\n", vid, l.Url)
+ return nil
+ })
+ }
+ }
+ if err := ewg.Wait(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
- fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
+ fmt.Printf("generateEcShards %d (collection %q) on %s ...\n", volumeId, collection, sourceVolumeServer)
err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{