diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-12-14 17:06:13 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-14 17:06:13 -0800 |
| commit | 7ed75784241ad8d7635113e3e173959f0e8446ae (patch) | |
| tree | 11c7eeb84c6aaed2ba328b0a46ed760a93fc277c /weed/shell/command_ec_decode.go | |
| parent | 8bdc4390a04604af79f91c7dce94e3b2b58442f7 (diff) | |
| download | seaweedfs-7ed75784241ad8d7635113e3e173959f0e8446ae.tar.xz seaweedfs-7ed75784241ad8d7635113e3e173959f0e8446ae.zip | |
fix(ec.decode): purge EC shards when volume is empty (#7749)HEADorigin/masterorigin/HEADmaster
* fix(ec.decode): purge EC shards when volume is empty
When an EC volume has no live entries (all deleted), ec.decode should not generate an empty normal volume. Instead, treat decode as a no-op and allow shard purge to proceed cleanly.\n\nFixes: #7748
* chore: address PR review comments
* test: cover live EC index + avoid magic string
* chore: harden empty-EC handling
- Make shard cleanup best-effort (collect errors)\n- Remove unreachable EOF handling in HasLiveNeedles\n- Add empty ecx test case\n- Share no-live-entries substring between server/client\n
* perf: parallelize EC shard unmount/delete across locations
* refactor: combine unmount+delete into single goroutine per location
* refactor: use errors.Join for multi-error aggregation
* refactor: use existing ErrorWaitGroup for parallel execution
* fix: capture loop variables + clarify SuperBlockSize safety
Diffstat (limited to 'weed/shell/command_ec_decode.go')
| -rw-r--r-- | weed/shell/command_ec_decode.go | 65 |
1 files changed, 47 insertions, 18 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 695641a31..a9e5c9113 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -5,11 +5,14 @@ import ( "flag" "fmt" "io" + "strings" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/storage/types" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -118,6 +121,11 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec // generate a normal volume err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation) if err != nil { + // Special case: if the EC index has no live entries, decoding is a no-op. + // Just purge EC shards and return success without generating/mounting an empty volume. + if isEcDecodeEmptyVolumeErr(err) { + return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcIndexBits, vid) + } return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) } @@ -130,6 +138,44 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec return nil } +func isEcDecodeEmptyVolumeErr(err error) bool { + st, ok := status.FromError(err) + if !ok { + return false + } + if st.Code() != codes.FailedPrecondition { + return false + } + // Keep this robust against wording tweaks while still being specific. + return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring) +} + +func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { + return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid) +} + +func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { + ewg := NewErrorWaitGroup(len(nodeToEcIndexBits)) + + // unmount and delete ec shards in parallel (one goroutine per location) + for location, ecIndexBits := range nodeToEcIndexBits { + location, ecIndexBits := location, ecIndexBits // capture loop variables for goroutine + ewg.Add(func() error { + fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + if err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()); err != nil { + return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err) + } + + fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()); err != nil { + return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err) + } + return nil + }) + } + return ewg.Wait() +} + func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume @@ -142,24 +188,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection str return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err) } - // unmount ec shards - for location, ecIndexBits := range nodeToEcIndexBits { - fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) - if err != nil { - return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err) - } - } - // delete ec shards - for location, ecIndexBits := range nodeToEcIndexBits { - fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) - if err != nil { - return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err) - } - } - - return nil + return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid) } func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error { |
