aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_ec_decode.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-12-14 17:06:13 -0800
committerGitHub <noreply@github.com>2025-12-14 17:06:13 -0800
commit7ed75784241ad8d7635113e3e173959f0e8446ae (patch)
tree11c7eeb84c6aaed2ba328b0a46ed760a93fc277c /weed/shell/command_ec_decode.go
parent8bdc4390a04604af79f91c7dce94e3b2b58442f7 (diff)
downloadseaweedfs-7ed75784241ad8d7635113e3e173959f0e8446ae.tar.xz
seaweedfs-7ed75784241ad8d7635113e3e173959f0e8446ae.zip
fix(ec.decode): purge EC shards when volume is empty (#7749)HEADorigin/masterorigin/HEADmaster
* fix(ec.decode): purge EC shards when volume is empty When an EC volume has no live entries (all deleted), ec.decode should not generate an empty normal volume. Instead, treat decode as a no-op and allow shard purge to proceed cleanly.\n\nFixes: #7748 * chore: address PR review comments * test: cover live EC index + avoid magic string * chore: harden empty-EC handling - Make shard cleanup best-effort (collect errors)\n- Remove unreachable EOF handling in HasLiveNeedles\n- Add empty ecx test case\n- Share no-live-entries substring between server/client\n * perf: parallelize EC shard unmount/delete across locations * refactor: combine unmount+delete into single goroutine per location * refactor: use errors.Join for multi-error aggregation * refactor: use existing ErrorWaitGroup for parallel execution * fix: capture loop variables + clarify SuperBlockSize safety
Diffstat (limited to 'weed/shell/command_ec_decode.go')
-rw-r--r--weed/shell/command_ec_decode.go65
1 files changed, 47 insertions, 18 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index 695641a31..a9e5c9113 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -5,11 +5,14 @@ import (
"flag"
"fmt"
"io"
+ "strings"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -118,6 +121,11 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
// generate a normal volume
err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation)
if err != nil {
+ // Special case: if the EC index has no live entries, decoding is a no-op.
+ // Just purge EC shards and return success without generating/mounting an empty volume.
+ if isEcDecodeEmptyVolumeErr(err) {
+ return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcIndexBits, vid)
+ }
return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
}
@@ -130,6 +138,44 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
return nil
}
+func isEcDecodeEmptyVolumeErr(err error) bool {
+ st, ok := status.FromError(err)
+ if !ok {
+ return false
+ }
+ if st.Code() != codes.FailedPrecondition {
+ return false
+ }
+ // Keep this robust against wording tweaks while still being specific.
+ return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring)
+}
+
+func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
+ return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid)
+}
+
+func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
+ ewg := NewErrorWaitGroup(len(nodeToEcIndexBits))
+
+ // unmount and delete ec shards in parallel (one goroutine per location)
+ for location, ecIndexBits := range nodeToEcIndexBits {
+ location, ecIndexBits := location, ecIndexBits // capture loop variables for goroutine
+ ewg.Add(func() error {
+ fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
+ if err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()); err != nil {
+ return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err)
+ }
+
+ fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
+ if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()); err != nil {
+ return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err)
+ }
+ return nil
+ })
+ }
+ return ewg.Wait()
+}
+
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
@@ -142,24 +188,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection str
return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
}
- // unmount ec shards
- for location, ecIndexBits := range nodeToEcIndexBits {
- fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
- err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
- if err != nil {
- return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
- }
- }
- // delete ec shards
- for location, ecIndexBits := range nodeToEcIndexBits {
- fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
- err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
- if err != nil {
- return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
- }
- }
-
- return nil
+ return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid)
}
func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {