diff options
Diffstat (limited to 'weed/shell/command_ec_decode.go')
| -rw-r--r-- | weed/shell/command_ec_decode.go | 65 |
1 files changed, 47 insertions, 18 deletions
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go index 695641a31..a9e5c9113 100644 --- a/weed/shell/command_ec_decode.go +++ b/weed/shell/command_ec_decode.go @@ -5,11 +5,14 @@ import ( "flag" "fmt" "io" + "strings" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/storage/types" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -118,6 +121,11 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec // generate a normal volume err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation) if err != nil { + // Special case: if the EC index has no live entries, decoding is a no-op. + // Just purge EC shards and return success without generating/mounting an empty volume. + if isEcDecodeEmptyVolumeErr(err) { + return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcIndexBits, vid) + } return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err) } @@ -130,6 +138,44 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec return nil } +func isEcDecodeEmptyVolumeErr(err error) bool { + st, ok := status.FromError(err) + if !ok { + return false + } + if st.Code() != codes.FailedPrecondition { + return false + } + // Keep this robust against wording tweaks while still being specific. + return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring) +} + +func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { + return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid) +} + +func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { + ewg := NewErrorWaitGroup(len(nodeToEcIndexBits)) + + // unmount and delete ec shards in parallel (one goroutine per location) + for location, ecIndexBits := range nodeToEcIndexBits { + location, ecIndexBits := location, ecIndexBits // capture loop variables for goroutine + ewg.Add(func() error { + fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + if err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()); err != nil { + return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err) + } + + fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) + if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()); err != nil { + return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err) + } + return nil + }) + } + return ewg.Wait() +} + func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error { // mount volume @@ -142,24 +188,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection str return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err) } - // unmount ec shards - for location, ecIndexBits := range nodeToEcIndexBits { - fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()) - if err != nil { - return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err) - } - } - // delete ec shards - for location, ecIndexBits := range nodeToEcIndexBits { - fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds()) - err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()) - if err != nil { - return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err) - } - } - - return nil + return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid) } func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error { |
