aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-12-14 17:06:13 -0800
committerGitHub <noreply@github.com>2025-12-14 17:06:13 -0800
commit7ed75784241ad8d7635113e3e173959f0e8446ae (patch)
tree11c7eeb84c6aaed2ba328b0a46ed760a93fc277c
parent8bdc4390a04604af79f91c7dce94e3b2b58442f7 (diff)
downloadseaweedfs-7ed75784241ad8d7635113e3e173959f0e8446ae.tar.xz
seaweedfs-7ed75784241ad8d7635113e3e173959f0e8446ae.zip
fix(ec.decode): purge EC shards when volume is empty (#7749)HEADorigin/masterorigin/HEADmaster
* fix(ec.decode): purge EC shards when volume is empty When an EC volume has no live entries (all deleted), ec.decode should not generate an empty normal volume. Instead, treat decode as a no-op and allow shard purge to proceed cleanly.\n\nFixes: #7748 * chore: address PR review comments * test: cover live EC index + avoid magic string * chore: harden empty-EC handling - Make shard cleanup best-effort (collect errors)\n- Remove unreachable EOF handling in HasLiveNeedles\n- Add empty ecx test case\n- Share no-live-entries substring between server/client\n * perf: parallelize EC shard unmount/delete across locations * refactor: combine unmount+delete into single goroutine per location * refactor: use errors.Join for multi-error aggregation * refactor: use existing ErrorWaitGroup for parallel execution * fix: capture loop variables + clarify SuperBlockSize safety
-rw-r--r--weed/server/volume_grpc_erasure_coding.go14
-rw-r--r--weed/shell/command_ec_decode.go65
-rw-r--r--weed/storage/erasure_coding/ec_decoder.go22
-rw-r--r--weed/storage/erasure_coding/ec_decoder_test.go81
4 files changed, 164 insertions, 18 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 5d100bdda..ec59ffa39 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -20,6 +20,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
"github.com/seaweedfs/seaweedfs/weed/util"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
/*
@@ -506,6 +509,17 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
}
dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
+
+ // If the EC index contains no live entries, decoding should be a no-op:
+ // just allow the caller to purge EC shards and do not generate an empty normal volume.
+ hasLive, err := erasure_coding.HasLiveNeedles(indexBaseFileName)
+ if err != nil {
+ return nil, fmt.Errorf("HasLiveNeedles %s: %w", indexBaseFileName, err)
+ }
+ if !hasLive {
+ return nil, status.Errorf(codes.FailedPrecondition, "ec volume %d %s", req.VolumeId, erasure_coding.EcNoLiveEntriesSubstring)
+ }
+
// calculate .dat file size
datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
if err != nil {
diff --git a/weed/shell/command_ec_decode.go b/weed/shell/command_ec_decode.go
index 695641a31..a9e5c9113 100644
--- a/weed/shell/command_ec_decode.go
+++ b/weed/shell/command_ec_decode.go
@@ -5,11 +5,14 @@ import (
"flag"
"fmt"
"io"
+ "strings"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -118,6 +121,11 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
// generate a normal volume
err = generateNormalVolume(commandEnv.option.GrpcDialOption, vid, collection, targetNodeLocation)
if err != nil {
+ // Special case: if the EC index has no live entries, decoding is a no-op.
+ // Just purge EC shards and return success without generating/mounting an empty volume.
+ if isEcDecodeEmptyVolumeErr(err) {
+ return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcIndexBits, vid)
+ }
return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
}
@@ -130,6 +138,44 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
return nil
}
+func isEcDecodeEmptyVolumeErr(err error) bool {
+ st, ok := status.FromError(err)
+ if !ok {
+ return false
+ }
+ if st.Code() != codes.FailedPrecondition {
+ return false
+ }
+ // Keep this robust against wording tweaks while still being specific.
+ return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring)
+}
+
+func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
+ return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid)
+}
+
+func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
+ ewg := NewErrorWaitGroup(len(nodeToEcIndexBits))
+
+ // unmount and delete ec shards in parallel (one goroutine per location)
+ for location, ecIndexBits := range nodeToEcIndexBits {
+ location, ecIndexBits := location, ecIndexBits // capture loop variables for goroutine
+ ewg.Add(func() error {
+ fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
+ if err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()); err != nil {
+ return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err)
+ }
+
+ fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
+ if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()); err != nil {
+ return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err)
+ }
+ return nil
+ })
+ }
+ return ewg.Wait()
+}
+
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
// mount volume
@@ -142,24 +188,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection str
return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
}
- // unmount ec shards
- for location, ecIndexBits := range nodeToEcIndexBits {
- fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
- err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
- if err != nil {
- return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
- }
- }
- // delete ec shards
- for location, ecIndexBits := range nodeToEcIndexBits {
- fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
- err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
- if err != nil {
- return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
- }
- }
-
- return nil
+ return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid)
}
func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
index a1d929f6c..429dd7ac4 100644
--- a/weed/storage/erasure_coding/ec_decoder.go
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -14,6 +14,23 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
+// EcNoLiveEntriesSubstring is used for server/client coordination when ec.decode determines that
+// decoding should be a no-op (all entries are deleted).
+const EcNoLiveEntriesSubstring = "has no live entries"
+
+// HasLiveNeedles returns whether the EC index (.ecx) contains at least one live (non-deleted) entry.
+// This is used by ec.decode to avoid generating an empty normal volume when all entries were deleted.
+func HasLiveNeedles(indexBaseFileName string) (hasLive bool, err error) {
+ err = iterateEcxFile(indexBaseFileName, func(_ types.NeedleId, _ types.Offset, size types.Size) error {
+ if !size.IsDeleted() {
+ hasLive = true
+ return io.EOF // stop early
+ }
+ return nil
+ })
+ return
+}
+
// write .idx file from .ecx and .ecj files
func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
@@ -52,6 +69,11 @@ func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64,
return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err)
}
+ // Safety: ensure datSize is at least SuperBlockSize. While the caller typically
+ // checks HasLiveNeedles first, this protects against direct calls to FindDatFileSize
+ // when all needles are deleted (see issue #7748).
+ datSize = int64(super_block.SuperBlockSize)
+
err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
if size.IsDeleted() {
diff --git a/weed/storage/erasure_coding/ec_decoder_test.go b/weed/storage/erasure_coding/ec_decoder_test.go
new file mode 100644
index 000000000..625d55402
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_decoder_test.go
@@ -0,0 +1,81 @@
+package erasure_coding_test
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+
+ erasure_coding "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+)
+
+func TestHasLiveNeedles_AllDeletedIsFalse(t *testing.T) {
+ dir := t.TempDir()
+
+ collection := "foo"
+ base := filepath.Join(dir, collection+"_1")
+
+ // Build an ecx file with only deleted entries.
+ // ecx file entries are the same format as .idx entries.
+ ecx := makeNeedleMapEntry(types.NeedleId(1), types.Offset{}, types.TombstoneFileSize)
+ if err := os.WriteFile(base+".ecx", ecx, 0644); err != nil {
+ t.Fatalf("write ecx: %v", err)
+ }
+
+ hasLive, err := erasure_coding.HasLiveNeedles(base)
+ if err != nil {
+ t.Fatalf("HasLiveNeedles: %v", err)
+ }
+ if hasLive {
+ t.Fatalf("expected no live entries")
+ }
+}
+
+func TestHasLiveNeedles_WithLiveEntryIsTrue(t *testing.T) {
+ dir := t.TempDir()
+
+ collection := "foo"
+ base := filepath.Join(dir, collection+"_1")
+
+ // Build an ecx file containing at least one live entry.
+ // ecx file entries are the same format as .idx entries.
+ live := makeNeedleMapEntry(types.NeedleId(1), types.Offset{}, types.Size(1))
+ if err := os.WriteFile(base+".ecx", live, 0644); err != nil {
+ t.Fatalf("write ecx: %v", err)
+ }
+
+ hasLive, err := erasure_coding.HasLiveNeedles(base)
+ if err != nil {
+ t.Fatalf("HasLiveNeedles: %v", err)
+ }
+ if !hasLive {
+ t.Fatalf("expected live entries")
+ }
+}
+
+func TestHasLiveNeedles_EmptyFileIsFalse(t *testing.T) {
+ dir := t.TempDir()
+
+ base := filepath.Join(dir, "foo_1")
+
+ // Create an empty ecx file.
+ if err := os.WriteFile(base+".ecx", []byte{}, 0644); err != nil {
+ t.Fatalf("write ecx: %v", err)
+ }
+
+ hasLive, err := erasure_coding.HasLiveNeedles(base)
+ if err != nil {
+ t.Fatalf("HasLiveNeedles: %v", err)
+ }
+ if hasLive {
+ t.Fatalf("expected no live entries for empty file")
+ }
+}
+
+func makeNeedleMapEntry(key types.NeedleId, offset types.Offset, size types.Size) []byte {
+ b := make([]byte, types.NeedleIdSize+types.OffsetSize+types.SizeSize)
+ types.NeedleIdToBytes(b[0:types.NeedleIdSize], key)
+ types.OffsetToBytes(b[types.NeedleIdSize:types.NeedleIdSize+types.OffsetSize], offset)
+ types.SizeToBytes(b[types.NeedleIdSize+types.OffsetSize:types.NeedleIdSize+types.OffsetSize+types.SizeSize], size)
+ return b
+}