author     Chris Lu <chrislusf@users.noreply.github.com>   2025-10-26 22:48:58 -0700
committer  GitHub <noreply@github.com>                     2025-10-26 22:48:58 -0700
commit     0813138d578fd63b928ec1afc9ddabb191657743 (patch)
tree       19d92182af2c5946bdafd84ea7f679b9d43c8af2
parent     824dcac3bf5b75fd4b74bf83d7b08895422d4374 (diff)
Volume Server: handle incomplete ec encoding (#7384)
* handle incomplete ec encoding
* unit tests
* simplify, and better logs
* Update disk_location_ec.go: When loadEcShards() fails partway through, some EC shards may already be loaded into the l.ecVolumes map in memory. The previous code only cleaned up filesystem files but left orphaned in-memory state, which could cause memory leaks and inconsistent state.
* address comments
* Performance: Avoid Double os.Stat() Call
* Platform Compatibility: Use filepath.Join
* in memory cleanup
* Update disk_location_ec.go
* refactor
* Added Shard Size Validation
* check ec shard sizes
* validate shard size
* calculate expected shard size
* refactoring
* minor
* fix shard directory
* 10GB sparse files can be slow or fail on non-sparse FS. Use 10MB to hit SmallBlockSize math (1MB shards) deterministically.
* grouping logic should be updated to use both collection and volumeId to ensure correctness
* unexpected error
* handle exceptions in tests; use constants
* The check for orphaned shards should be performed for the previous volume before resetting sameVolumeShards for the new volume.
* address comments
* Eliminated Redundant Parsing in checkOrphanedShards
* minor
* Avoid misclassifying local EC as distributed when .dat stat errors occur; also standardize unload-before-remove.
* fmt
* refactor
* refactor
* adjust to warning
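
For orientation, the recovery rule implemented in the diff below can be summarized as a small decision sketch. This is an illustrative Go snippet with made-up names (decideEcRecovery and its result strings), not code from the commit; the actual logic lives in handleFoundEcxFile, checkOrphanedShards, and validateEcVolume in weed/storage/disk_location_ec.go.

    package main

    import "fmt"

    // decideEcRecovery mirrors the startup rule: a lingering .dat plus an
    // invalid EC state means the encoding never finished, so the EC files are
    // removed and the .dat is used; if the .dat is gone, the shards belong to
    // a distributed EC volume and its files are never deleted.
    func decideEcRecovery(datExists, ecValid bool) string {
        switch {
        case datExists && !ecValid:
            return "incomplete encoding: remove EC files, load .dat"
        case datExists && ecValid:
            return "local EC volume: load shards, skip .dat"
        default:
            return "distributed EC volume: keep shard files, only unload in-memory state on errors"
        }
    }

    func main() {
        fmt.Println(decideEcRecovery(true, false))
        fmt.Println(decideEcRecovery(true, true))
        fmt.Println(decideEcRecovery(false, false))
    }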
-rw-r--r--  weed/storage/disk_location.go                      29
-rw-r--r--  weed/storage/disk_location_ec.go                   252
-rw-r--r--  weed/storage/disk_location_ec_realworld_test.go    198
-rw-r--r--  weed/storage/disk_location_ec_shard_size_test.go   195
-rw-r--r--  weed/storage/disk_location_ec_test.go              643
5 files changed, 1301 insertions(+), 16 deletions(-)
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index aac824318..e8c1d10e4 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -144,10 +144,26 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
return false
}
- // skip if ec volumes exists
+ // parse out collection, volume id (moved up to use in EC validation)
+ vid, collection, err := volumeIdFromFileName(basename)
+ if err != nil {
+ glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
+ return false
+ }
+
+ // skip if ec volumes exists, but validate EC files first
if skipIfEcVolumesExists {
- if util.FileExists(l.IdxDirectory + "/" + volumeName + ".ecx") {
- return false
+ ecxFilePath := filepath.Join(l.IdxDirectory, volumeName+".ecx")
+ if util.FileExists(ecxFilePath) {
+ // Check if EC volume is valid by verifying shard count
+ if !l.validateEcVolume(collection, vid) {
+ glog.Warningf("EC volume %d validation failed, removing incomplete EC files to allow .dat file loading", vid)
+ l.removeEcVolumeFiles(collection, vid)
+ // Continue to load .dat file
+ } else {
+ // Valid EC volume exists, skip .dat file
+ return false
+ }
}
}
@@ -161,13 +177,6 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
return false
}
- // parse out collection, volume id
- vid, collection, err := volumeIdFromFileName(basename)
- if err != nil {
- glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
- return false
- }
-
// avoid loading one volume more than once
l.volumesLock.RLock()
_, found := l.volumes[vid]
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index 0db73adc6..128bfd26f 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -10,6 +10,7 @@ import (
"slices"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@@ -40,6 +41,23 @@ func (l *DiskLocation) DestroyEcVolume(vid needle.VolumeId) {
}
}
+// unloadEcVolume removes an EC volume from memory without deleting its files on disk.
+// This is useful for distributed EC volumes where shards may be on other servers.
+func (l *DiskLocation) unloadEcVolume(vid needle.VolumeId) {
+ var toClose *erasure_coding.EcVolume
+ l.ecVolumesLock.Lock()
+ if ecVolume, found := l.ecVolumes[vid]; found {
+ toClose = ecVolume
+ delete(l.ecVolumes, vid)
+ }
+ l.ecVolumesLock.Unlock()
+
+ // Close outside the lock to avoid holding write lock during I/O
+ if toClose != nil {
+ toClose.Close()
+ }
+}
+
func (l *DiskLocation) CollectEcShards(vid needle.VolumeId, shardFileNames []string) (ecVolume *erasure_coding.EcVolume, found bool) {
l.ecVolumesLock.RLock()
defer l.ecVolumesLock.RUnlock()
@@ -154,8 +172,18 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
slices.SortFunc(dirEntries, func(a, b os.DirEntry) int {
return strings.Compare(a.Name(), b.Name())
})
+
var sameVolumeShards []string
var prevVolumeId needle.VolumeId
+ var prevCollection string
+
+ // Helper to reset state between volume processing
+ reset := func() {
+ sameVolumeShards = nil
+ prevVolumeId = 0
+ prevCollection = ""
+ }
+
for _, fileInfo := range dirEntries {
if fileInfo.IsDir() {
continue
@@ -178,24 +206,31 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
// 0 byte files should be only appearing erroneously for ec data files
// so we ignore them
if re.MatchString(ext) && info.Size() > 0 {
- if prevVolumeId == 0 || volumeId == prevVolumeId {
+ // Group shards by both collection and volumeId to avoid mixing collections
+ if prevVolumeId == 0 || (volumeId == prevVolumeId && collection == prevCollection) {
sameVolumeShards = append(sameVolumeShards, fileInfo.Name())
} else {
+ // Before starting a new group, check if previous group had orphaned shards
+ l.checkOrphanedShards(sameVolumeShards, prevCollection, prevVolumeId)
sameVolumeShards = []string{fileInfo.Name()}
}
prevVolumeId = volumeId
+ prevCollection = collection
continue
}
- if ext == ".ecx" && volumeId == prevVolumeId {
- if err = l.loadEcShards(sameVolumeShards, collection, volumeId); err != nil {
- return fmt.Errorf("loadEcShards collection:%v volumeId:%d : %v", collection, volumeId, err)
- }
- prevVolumeId = volumeId
+ if ext == ".ecx" && volumeId == prevVolumeId && collection == prevCollection {
+ l.handleFoundEcxFile(sameVolumeShards, collection, volumeId)
+ reset()
continue
}
}
+
+ // Check for orphaned EC shards without .ecx file at the end of the directory scan
+ // This handles the last group of shards in the directory
+ l.checkOrphanedShards(sameVolumeShards, prevCollection, prevVolumeId)
+
return nil
}
@@ -237,3 +272,208 @@ func (l *DiskLocation) EcShardCount() int {
}
return shardCount
}
+
+// handleFoundEcxFile processes a complete group of EC shards when their .ecx file is found.
+// This includes validation, loading, and cleanup of incomplete/invalid EC volumes.
+func (l *DiskLocation) handleFoundEcxFile(shards []string, collection string, volumeId needle.VolumeId) {
+ // Check if this is an incomplete EC encoding (not a distributed EC volume)
+ // Key distinction: if .dat file still exists, EC encoding may have failed
+ // If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers
+ baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
+ datFileName := baseFileName + ".dat"
+
+ // Determine .dat presence robustly; unexpected errors are treated as "exists"
+ datExists := l.checkDatFileExists(datFileName)
+
+ // Validate EC volume if .dat file exists (incomplete EC encoding scenario)
+ // This checks shard count, shard size consistency, and expected size vs .dat file
+ // If .dat is gone, EC encoding completed and shards are distributed across servers
+ if datExists && !l.validateEcVolume(collection, volumeId) {
+ glog.Warningf("Incomplete or invalid EC volume %d: .dat exists but validation failed, cleaning up EC files...", volumeId)
+ l.removeEcVolumeFiles(collection, volumeId)
+ return
+ }
+
+ // Attempt to load the EC shards
+ if err := l.loadEcShards(shards, collection, volumeId); err != nil {
+ // If EC shards failed to load and .dat still exists, clean up EC files to allow .dat file to be used
+ // If .dat is gone, log error but don't clean up (may be waiting for shards from other servers)
+ if datExists {
+ glog.Warningf("Failed to load EC shards for volume %d and .dat exists: %v, cleaning up EC files to use .dat...", volumeId, err)
+ // Unload first to release FDs, then remove files
+ l.unloadEcVolume(volumeId)
+ l.removeEcVolumeFiles(collection, volumeId)
+ } else {
+ glog.Warningf("Failed to load EC shards for volume %d: %v (this may be normal for distributed EC volumes)", volumeId, err)
+ // Clean up any partially loaded in-memory state. This does not delete files.
+ l.unloadEcVolume(volumeId)
+ }
+ return
+ }
+}
+
+// checkDatFileExists checks if .dat file exists with robust error handling.
+// Unexpected errors (permission, I/O) are treated as "exists" to avoid misclassifying
+// local EC as distributed EC, which is the safer fallback.
+func (l *DiskLocation) checkDatFileExists(datFileName string) bool {
+ if _, err := os.Stat(datFileName); err == nil {
+ return true
+ } else if !os.IsNotExist(err) {
+ glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
+ // Safer to assume local .dat exists to avoid misclassifying as distributed EC
+ return true
+ }
+ return false
+}
+
+// checkOrphanedShards checks if the given shards are orphaned (no .ecx file) and cleans them up if needed.
+// Returns true if orphaned shards were found and cleaned up.
+// This handles the case where EC encoding was interrupted before creating the .ecx file.
+func (l *DiskLocation) checkOrphanedShards(shards []string, collection string, volumeId needle.VolumeId) bool {
+ if len(shards) == 0 || volumeId == 0 {
+ return false
+ }
+
+ // Check if .dat file exists (incomplete encoding, not distributed EC)
+ baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
+ datFileName := baseFileName + ".dat"
+
+ if l.checkDatFileExists(datFileName) {
+ glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
+ len(shards), volumeId)
+ l.removeEcVolumeFiles(collection, volumeId)
+ return true
+ }
+ return false
+}
+
+// calculateExpectedShardSize computes the exact expected shard size based on .dat file size
+// The EC encoding process is deterministic:
+// 1. Data is processed in batches of (LargeBlockSize * DataShardsCount) for large blocks
+// 2. Remaining data is processed in batches of (SmallBlockSize * DataShardsCount) for small blocks
+// 3. Each shard gets exactly its portion, with zero-padding applied to incomplete blocks
+func calculateExpectedShardSize(datFileSize int64) int64 {
+ var shardSize int64
+
+ // Process large blocks (1GB * 10 = 10GB batches)
+ largeBatchSize := int64(erasure_coding.ErasureCodingLargeBlockSize) * int64(erasure_coding.DataShardsCount)
+ numLargeBatches := datFileSize / largeBatchSize
+ shardSize = numLargeBatches * int64(erasure_coding.ErasureCodingLargeBlockSize)
+ remainingSize := datFileSize - (numLargeBatches * largeBatchSize)
+
+ // Process remaining data in small blocks (1MB * 10 = 10MB batches)
+ if remainingSize > 0 {
+ smallBatchSize := int64(erasure_coding.ErasureCodingSmallBlockSize) * int64(erasure_coding.DataShardsCount)
+ numSmallBatches := (remainingSize + smallBatchSize - 1) / smallBatchSize // Ceiling division
+ shardSize += numSmallBatches * int64(erasure_coding.ErasureCodingSmallBlockSize)
+ }
+
+ return shardSize
+}
+
+// validateEcVolume checks if EC volume has enough shards to be functional
+// For distributed EC volumes (where .dat is deleted), any number of shards is valid
+// For incomplete EC encoding (where .dat still exists), we need at least DataShardsCount shards
+// Also validates that all shards have the same size (required for Reed-Solomon EC)
+// If .dat exists, it also validates shards match the expected size based on .dat file size
+func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId) bool {
+ baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
+ datFileName := baseFileName + ".dat"
+
+ var expectedShardSize int64 = -1
+ datExists := false
+
+ // If .dat file exists, compute exact expected shard size from it
+ if datFileInfo, err := os.Stat(datFileName); err == nil {
+ datExists = true
+ expectedShardSize = calculateExpectedShardSize(datFileInfo.Size())
+ } else if !os.IsNotExist(err) {
+ // If stat fails with unexpected error (permission, I/O), fail validation
+ // Don't treat this as "distributed EC" - it could be a temporary error
+ glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
+ return false
+ }
+
+ shardCount := 0
+ var actualShardSize int64 = -1
+
+ // Count shards and validate they all have the same size (required for Reed-Solomon EC)
+ // Shard files (.ec00 - .ec13) are always in l.Directory, not l.IdxDirectory
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFileName := baseFileName + erasure_coding.ToExt(i)
+ fi, err := os.Stat(shardFileName)
+
+ if err == nil {
+ // Check if file has non-zero size
+ if fi.Size() > 0 {
+ // Validate all shards are the same size (required for Reed-Solomon EC)
+ if actualShardSize == -1 {
+ actualShardSize = fi.Size()
+ } else if fi.Size() != actualShardSize {
+ glog.Warningf("EC volume %d shard %d has size %d, expected %d (all EC shards must be same size)",
+ vid, i, fi.Size(), actualShardSize)
+ return false
+ }
+ shardCount++
+ }
+ } else if !os.IsNotExist(err) {
+ // If stat fails with unexpected error (permission, I/O), fail validation
+ // This is consistent with .dat file error handling
+ glog.Warningf("Failed to stat shard file %s: %v", shardFileName, err)
+ return false
+ }
+ }
+
+ // If .dat file exists, validate shard size matches expected size
+ if datExists && actualShardSize > 0 && expectedShardSize > 0 {
+ if actualShardSize != expectedShardSize {
+ glog.Warningf("EC volume %d: shard size %d doesn't match expected size %d (based on .dat file size)",
+ vid, actualShardSize, expectedShardSize)
+ return false
+ }
+ }
+
+ // If .dat file is gone, this is a distributed EC volume - any shard count is valid
+ if !datExists {
+ glog.V(1).Infof("EC volume %d: distributed EC (.dat removed) with %d shards", vid, shardCount)
+ return true
+ }
+
+ // If .dat file exists, we need at least DataShardsCount shards locally
+ // Otherwise it's an incomplete EC encoding that should be cleaned up
+ if shardCount < erasure_coding.DataShardsCount {
+ glog.Warningf("EC volume %d has .dat file but only %d shards (need at least %d for local EC)",
+ vid, shardCount, erasure_coding.DataShardsCount)
+ return false
+ }
+
+ return true
+}
+
+// removeEcVolumeFiles removes all EC-related files for a volume
+func (l *DiskLocation) removeEcVolumeFiles(collection string, vid needle.VolumeId) {
+ baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
+ indexBaseFileName := erasure_coding.EcShardFileName(collection, l.IdxDirectory, int(vid))
+
+ // Helper to remove a file with consistent error handling
+ removeFile := func(filePath, description string) {
+ if err := os.Remove(filePath); err != nil {
+ if !os.IsNotExist(err) {
+ glog.Warningf("Failed to remove incomplete %s %s: %v", description, filePath, err)
+ }
+ } else {
+ glog.V(2).Infof("Removed incomplete %s: %s", description, filePath)
+ }
+ }
+
+ // Remove index files first (.ecx, .ecj) before shard files
+ // This ensures that if cleanup is interrupted, the .ecx file won't trigger
+ // EC loading for incomplete/missing shards on next startup
+ removeFile(indexBaseFileName+".ecx", "EC index file")
+ removeFile(indexBaseFileName+".ecj", "EC journal file")
+
+ // Remove all EC shard files (.ec00 ~ .ec13) from data directory
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ removeFile(baseFileName+erasure_coding.ToExt(i), "EC shard file")
+ }
+}
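
To make the shard-size math in calculateExpectedShardSize easy to check by hand, here is a self-contained sketch that plugs in the block sizes named in the comments above (1GB large blocks, 1MB small blocks, 10 data shards). The function and constant names are illustrative, not the ones exported by the erasure_coding package; the two sample inputs correspond to the 10MB and 10.5GB cases exercised by the tests below.

    package main

    import "fmt"

    // expectedShardSize repeats the deterministic EC sizing rule: each full
    // 10GB batch contributes 1GB per shard, and any remainder is rounded up
    // to whole 10MB small batches, i.e. 1MB increments per shard.
    func expectedShardSize(datSize int64) int64 {
        const large, small, dataShards = int64(1 << 30), int64(1 << 20), int64(10)
        shard := (datSize / (large * dataShards)) * large
        if rest := datSize % (large * dataShards); rest > 0 {
            shard += ((rest + small*dataShards - 1) / (small * dataShards)) * small
        }
        return shard
    }

    func main() {
        fmt.Println(expectedShardSize(10 << 20))         // 10MB .dat   -> 1MB per shard
        fmt.Println(expectedShardSize(10<<30 + 512<<20)) // 10.5GB .dat -> 1GB + 52MB per shard
    }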
diff --git a/weed/storage/disk_location_ec_realworld_test.go b/weed/storage/disk_location_ec_realworld_test.go
new file mode 100644
index 000000000..3a21ccb6c
--- /dev/null
+++ b/weed/storage/disk_location_ec_realworld_test.go
@@ -0,0 +1,198 @@
+package storage
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+)
+
+// TestCalculateExpectedShardSizeWithRealEncoding validates our shard size calculation
+// by actually running EC encoding on real files and comparing the results
+func TestCalculateExpectedShardSizeWithRealEncoding(t *testing.T) {
+ tempDir := t.TempDir()
+
+ tests := []struct {
+ name string
+ datFileSize int64
+ description string
+ }{
+ {
+ name: "5MB file",
+ datFileSize: 5 * 1024 * 1024,
+ description: "Small file that needs 1 small block per shard",
+ },
+ {
+ name: "10MB file (exactly 10 small blocks)",
+ datFileSize: 10 * 1024 * 1024,
+ description: "Exactly fits in 1MB small blocks",
+ },
+ {
+ name: "15MB file",
+ datFileSize: 15 * 1024 * 1024,
+ description: "Requires 2 small blocks per shard",
+ },
+ {
+ name: "50MB file",
+ datFileSize: 50 * 1024 * 1024,
+ description: "Requires 5 small blocks per shard",
+ },
+ {
+ name: "100MB file",
+ datFileSize: 100 * 1024 * 1024,
+ description: "Requires 10 small blocks per shard",
+ },
+ {
+ name: "512MB file",
+ datFileSize: 512 * 1024 * 1024,
+ description: "Requires 52 small blocks per shard (rounded up)",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create a test .dat file with the specified size
+ baseFileName := filepath.Join(tempDir, "test_volume")
+ datFileName := baseFileName + ".dat"
+
+ // Create .dat file with random data pattern (so it's compressible but realistic)
+ datFile, err := os.Create(datFileName)
+ if err != nil {
+ t.Fatalf("Failed to create .dat file: %v", err)
+ }
+
+ // Write some pattern data (not all zeros, to be more realistic)
+ pattern := make([]byte, 4096)
+ for i := range pattern {
+ pattern[i] = byte(i % 256)
+ }
+
+ written := int64(0)
+ for written < tt.datFileSize {
+ toWrite := tt.datFileSize - written
+ if toWrite > int64(len(pattern)) {
+ toWrite = int64(len(pattern))
+ }
+ n, err := datFile.Write(pattern[:toWrite])
+ if err != nil {
+ t.Fatalf("Failed to write to .dat file: %v", err)
+ }
+ written += int64(n)
+ }
+ datFile.Close()
+
+ // Calculate expected shard size using our function
+ expectedShardSize := calculateExpectedShardSize(tt.datFileSize)
+
+ // Run actual EC encoding
+ err = erasure_coding.WriteEcFiles(baseFileName)
+ if err != nil {
+ t.Fatalf("Failed to encode EC files: %v", err)
+ }
+
+ // Measure actual shard sizes
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFileName := baseFileName + erasure_coding.ToExt(i)
+ shardInfo, err := os.Stat(shardFileName)
+ if err != nil {
+ t.Fatalf("Failed to stat shard file %s: %v", shardFileName, err)
+ }
+
+ actualShardSize := shardInfo.Size()
+
+ // Verify actual size matches expected size
+ if actualShardSize != expectedShardSize {
+ t.Errorf("Shard %d size mismatch:\n"+
+ " .dat file size: %d bytes\n"+
+ " Expected shard size: %d bytes\n"+
+ " Actual shard size: %d bytes\n"+
+ " Difference: %d bytes\n"+
+ " %s",
+ i, tt.datFileSize, expectedShardSize, actualShardSize,
+ actualShardSize-expectedShardSize, tt.description)
+ }
+ }
+
+ // If we got here, all shards match!
+ t.Logf("✓ SUCCESS: .dat size %d → actual shard size %d matches calculated size (%s)",
+ tt.datFileSize, expectedShardSize, tt.description)
+
+ // Cleanup
+ os.Remove(datFileName)
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ os.Remove(baseFileName + erasure_coding.ToExt(i))
+ }
+ })
+ }
+}
+
+// TestCalculateExpectedShardSizeEdgeCases tests edge cases with real encoding
+func TestCalculateExpectedShardSizeEdgeCases(t *testing.T) {
+ tempDir := t.TempDir()
+
+ tests := []struct {
+ name string
+ datFileSize int64
+ }{
+ {"1 byte file", 1},
+ {"1KB file", 1024},
+ {"10KB file", 10 * 1024},
+ {"1MB file (1 small block)", 1024 * 1024},
+ {"1MB + 1 byte", 1024*1024 + 1},
+ {"9.9MB (almost 1 small block per shard)", 9*1024*1024 + 900*1024},
+ {"10.1MB (just over 1 small block per shard)", 10*1024*1024 + 100*1024},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ baseFileName := filepath.Join(tempDir, tt.name)
+ datFileName := baseFileName + ".dat"
+
+ // Create .dat file
+ datFile, err := os.Create(datFileName)
+ if err != nil {
+ t.Fatalf("Failed to create .dat file: %v", err)
+ }
+
+ // Write exactly the specified number of bytes
+ data := make([]byte, tt.datFileSize)
+ for i := range data {
+ data[i] = byte(i % 256)
+ }
+ datFile.Write(data)
+ datFile.Close()
+
+ // Calculate expected
+ expectedShardSize := calculateExpectedShardSize(tt.datFileSize)
+
+ // Run actual EC encoding
+ err = erasure_coding.WriteEcFiles(baseFileName)
+ if err != nil {
+ t.Fatalf("Failed to encode EC files: %v", err)
+ }
+
+ // Check first shard (all should be same size)
+ shardFileName := baseFileName + erasure_coding.ToExt(0)
+ shardInfo, err := os.Stat(shardFileName)
+ if err != nil {
+ t.Fatalf("Failed to stat shard file: %v", err)
+ }
+
+ actualShardSize := shardInfo.Size()
+
+ if actualShardSize != expectedShardSize {
+ t.Errorf("File size %d: expected shard %d, got %d (diff: %d)",
+ tt.datFileSize, expectedShardSize, actualShardSize, actualShardSize-expectedShardSize)
+ } else {
+ t.Logf("✓ File size %d → shard size %d (correct)", tt.datFileSize, actualShardSize)
+ }
+
+ // Cleanup
+ os.Remove(datFileName)
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ os.Remove(baseFileName + erasure_coding.ToExt(i))
+ }
+ })
+ }
+}
diff --git a/weed/storage/disk_location_ec_shard_size_test.go b/weed/storage/disk_location_ec_shard_size_test.go
new file mode 100644
index 000000000..e58c1c129
--- /dev/null
+++ b/weed/storage/disk_location_ec_shard_size_test.go
@@ -0,0 +1,195 @@
+package storage
+
+import (
+ "testing"
+)
+
+func TestCalculateExpectedShardSize(t *testing.T) {
+ const (
+ largeBlock = 1024 * 1024 * 1024 // 1GB
+ smallBlock = 1024 * 1024 // 1MB
+ dataShards = 10
+ largeBatchSize = largeBlock * dataShards // 10GB
+ smallBatchSize = smallBlock * dataShards // 10MB
+ )
+
+ tests := []struct {
+ name string
+ datFileSize int64
+ expectedShardSize int64
+ description string
+ }{
+ // Edge case: empty file
+ {
+ name: "0 bytes (empty file)",
+ datFileSize: 0,
+ expectedShardSize: 0,
+ description: "Empty file has 0 shard size",
+ },
+
+ // Boundary tests: exact multiples of large block
+ {
+ name: "Exact 10GB (1 large batch)",
+ datFileSize: largeBatchSize, // 10GB = 1 large batch
+ expectedShardSize: largeBlock, // 1GB per shard
+ description: "Exactly fits in large blocks",
+ },
+ {
+ name: "Exact 20GB (2 large batches)",
+ datFileSize: 2 * largeBatchSize, // 20GB
+ expectedShardSize: 2 * largeBlock, // 2GB per shard
+ description: "2 complete large batches",
+ },
+ {
+ name: "Just under large batch (10GB - 1 byte)",
+ datFileSize: largeBatchSize - 1, // 10,737,418,239 bytes
+ expectedShardSize: 1024 * smallBlock, // 1024MB = 1GB (needs 1024 small blocks)
+ description: "Just under 10GB needs 1024 small blocks",
+ },
+ {
+ name: "Just over large batch (10GB + 1 byte)",
+ datFileSize: largeBatchSize + 1, // 10GB + 1 byte
+ expectedShardSize: largeBlock + smallBlock, // 1GB + 1MB
+ description: "Just over 10GB adds 1 small block",
+ },
+
+ // Boundary tests: exact multiples of small batch
+ {
+ name: "Exact 10MB (1 small batch)",
+ datFileSize: smallBatchSize, // 10MB
+ expectedShardSize: smallBlock, // 1MB per shard
+ description: "Exactly fits in 1 small batch",
+ },
+ {
+ name: "Exact 20MB (2 small batches)",
+ datFileSize: 2 * smallBatchSize, // 20MB
+ expectedShardSize: 2 * smallBlock, // 2MB per shard
+ description: "2 complete small batches",
+ },
+ {
+ name: "Just under small batch (10MB - 1 byte)",
+ datFileSize: smallBatchSize - 1, // 10MB - 1 byte
+ expectedShardSize: smallBlock, // Still needs 1MB per shard (rounds up)
+ description: "Just under 10MB rounds up to 1 small block",
+ },
+ {
+ name: "Just over small batch (10MB + 1 byte)",
+ datFileSize: smallBatchSize + 1, // 10MB + 1 byte
+ expectedShardSize: 2 * smallBlock, // 2MB per shard
+ description: "Just over 10MB needs 2 small blocks",
+ },
+
+ // Mixed: large batch + partial small batch
+ {
+ name: "10GB + 1MB",
+ datFileSize: largeBatchSize + 1*1024*1024, // 10GB + 1MB
+ expectedShardSize: largeBlock + smallBlock, // 1GB + 1MB
+ description: "1 large batch + 1MB needs 1 small block",
+ },
+ {
+ name: "10GB + 5MB",
+ datFileSize: largeBatchSize + 5*1024*1024, // 10GB + 5MB
+ expectedShardSize: largeBlock + smallBlock, // 1GB + 1MB
+ description: "1 large batch + 5MB rounds up to 1 small block",
+ },
+ {
+ name: "10GB + 15MB",
+ datFileSize: largeBatchSize + 15*1024*1024, // 10GB + 15MB
+ expectedShardSize: largeBlock + 2*smallBlock, // 1GB + 2MB
+ description: "1 large batch + 15MB needs 2 small blocks",
+ },
+
+ // Original test cases
+ {
+ name: "11GB (1 large batch + 103 small blocks)",
+ datFileSize: 11 * 1024 * 1024 * 1024, // 11GB
+ expectedShardSize: 1*1024*1024*1024 + 103*1024*1024, // 1GB + 103MB (103 small blocks for 1GB remaining)
+ description: "1GB large + 1GB remaining needs 103 small blocks",
+ },
+ {
+ name: "5MB (requires 1 small block per shard)",
+ datFileSize: 5 * 1024 * 1024, // 5MB
+ expectedShardSize: 1 * 1024 * 1024, // 1MB per shard (rounded up)
+ description: "Small file rounds up to 1MB per shard",
+ },
+ {
+ name: "1KB (minimum size)",
+ datFileSize: 1024,
+ expectedShardSize: 1 * 1024 * 1024, // 1MB per shard (1 small block)
+ description: "Tiny file needs 1 small block",
+ },
+ {
+ name: "10.5GB (mixed)",
+ datFileSize: 10*1024*1024*1024 + 512*1024*1024, // 10.5GB
+ expectedShardSize: 1*1024*1024*1024 + 52*1024*1024, // 1GB + 52MB (52 small blocks for 512MB remaining)
+ description: "1GB large + 512MB remaining needs 52 small blocks",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ actualShardSize := calculateExpectedShardSize(tt.datFileSize)
+
+ if actualShardSize != tt.expectedShardSize {
+ t.Errorf("Expected shard size %d, got %d. %s",
+ tt.expectedShardSize, actualShardSize, tt.description)
+ }
+
+ t.Logf("✓ File size: %d → Shard size: %d (%s)",
+ tt.datFileSize, actualShardSize, tt.description)
+ })
+ }
+}
+
+// TestShardSizeValidationScenarios tests realistic scenarios
+func TestShardSizeValidationScenarios(t *testing.T) {
+ scenarios := []struct {
+ name string
+ datFileSize int64
+ actualShardSize int64
+ shouldBeValid bool
+ }{
+ {
+ name: "Valid: exact match for 10GB",
+ datFileSize: 10 * 1024 * 1024 * 1024, // 10GB
+ actualShardSize: 1 * 1024 * 1024 * 1024, // 1GB (exact)
+ shouldBeValid: true,
+ },
+ {
+ name: "Invalid: 1 byte too small",
+ datFileSize: 10 * 1024 * 1024 * 1024, // 10GB
+ actualShardSize: 1*1024*1024*1024 - 1, // 1GB - 1 byte
+ shouldBeValid: false,
+ },
+ {
+ name: "Invalid: 1 byte too large",
+ datFileSize: 10 * 1024 * 1024 * 1024, // 10GB
+ actualShardSize: 1*1024*1024*1024 + 1, // 1GB + 1 byte
+ shouldBeValid: false,
+ },
+ {
+ name: "Valid: small file exact match",
+ datFileSize: 5 * 1024 * 1024, // 5MB
+ actualShardSize: 1 * 1024 * 1024, // 1MB (exact)
+ shouldBeValid: true,
+ },
+ {
+ name: "Invalid: wrong size for small file",
+ datFileSize: 5 * 1024 * 1024, // 5MB
+ actualShardSize: 500 * 1024, // 500KB (too small)
+ shouldBeValid: false,
+ },
+ }
+
+ for _, scenario := range scenarios {
+ t.Run(scenario.name, func(t *testing.T) {
+ expectedSize := calculateExpectedShardSize(scenario.datFileSize)
+ isValid := scenario.actualShardSize == expectedSize
+
+ if isValid != scenario.shouldBeValid {
+ t.Errorf("Expected validation result %v, got %v. Actual shard: %d, Expected: %d",
+ scenario.shouldBeValid, isValid, scenario.actualShardSize, expectedSize)
+ }
+ })
+ }
+}
diff --git a/weed/storage/disk_location_ec_test.go b/weed/storage/disk_location_ec_test.go
new file mode 100644
index 000000000..097536118
--- /dev/null
+++ b/weed/storage/disk_location_ec_test.go
@@ -0,0 +1,643 @@
+package storage
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
+ "github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "github.com/seaweedfs/seaweedfs/weed/storage/types"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// TestIncompleteEcEncodingCleanup tests the cleanup logic for incomplete EC encoding scenarios
+func TestIncompleteEcEncodingCleanup(t *testing.T) {
+ tests := []struct {
+ name string
+ volumeId needle.VolumeId
+ collection string
+ createDatFile bool
+ createEcxFile bool
+ createEcjFile bool
+ numShards int
+ expectCleanup bool
+ expectLoadSuccess bool
+ }{
+ {
+ name: "Incomplete EC: shards without .ecx, .dat exists - should cleanup",
+ volumeId: 100,
+ collection: "",
+ createDatFile: true,
+ createEcxFile: false,
+ createEcjFile: false,
+ numShards: 14, // All shards but no .ecx
+ expectCleanup: true,
+ expectLoadSuccess: false,
+ },
+ {
+ name: "Distributed EC: shards without .ecx, .dat deleted - should NOT cleanup",
+ volumeId: 101,
+ collection: "",
+ createDatFile: false,
+ createEcxFile: false,
+ createEcjFile: false,
+ numShards: 5, // Partial shards, distributed
+ expectCleanup: false,
+ expectLoadSuccess: false,
+ },
+ {
+ name: "Incomplete EC: shards with .ecx but < 10 shards, .dat exists - should cleanup",
+ volumeId: 102,
+ collection: "",
+ createDatFile: true,
+ createEcxFile: true,
+ createEcjFile: false,
+ numShards: 7, // Less than DataShardsCount (10)
+ expectCleanup: true,
+ expectLoadSuccess: false,
+ },
+ {
+ name: "Valid local EC: shards with .ecx, >= 10 shards, .dat exists - should load",
+ volumeId: 103,
+ collection: "",
+ createDatFile: true,
+ createEcxFile: true,
+ createEcjFile: false,
+ numShards: 14, // All shards
+ expectCleanup: false,
+ expectLoadSuccess: true, // Would succeed if .ecx was valid
+ },
+ {
+ name: "Distributed EC: shards with .ecx, .dat deleted - should load",
+ volumeId: 104,
+ collection: "",
+ createDatFile: false,
+ createEcxFile: true,
+ createEcjFile: false,
+ numShards: 10, // Enough shards
+ expectCleanup: false,
+ expectLoadSuccess: true, // Would succeed if .ecx was valid
+ },
+ {
+ name: "Incomplete EC with collection: shards without .ecx, .dat exists - should cleanup",
+ volumeId: 105,
+ collection: "test_collection",
+ createDatFile: true,
+ createEcxFile: false,
+ createEcjFile: false,
+ numShards: 14,
+ expectCleanup: true,
+ expectLoadSuccess: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Use per-subtest temp directory for stronger isolation
+ tempDir := t.TempDir()
+
+ // Create DiskLocation
+ minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
+ diskLocation := &DiskLocation{
+ Directory: tempDir,
+ DirectoryUuid: "test-uuid",
+ IdxDirectory: tempDir,
+ DiskType: types.HddType,
+ MaxVolumeCount: 100,
+ OriginalMaxVolumeCount: 100,
+ MinFreeSpace: minFreeSpace,
+ }
+ diskLocation.volumes = make(map[needle.VolumeId]*Volume)
+ diskLocation.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
+
+ // Setup test files
+ baseFileName := erasure_coding.EcShardFileName(tt.collection, tempDir, int(tt.volumeId))
+
+ // Use deterministic but small size: 10MB .dat => 1MB per shard
+ datFileSize := int64(10 * 1024 * 1024) // 10MB
+ expectedShardSize := calculateExpectedShardSize(datFileSize)
+
+ // Create .dat file if needed
+ if tt.createDatFile {
+ datFile, err := os.Create(baseFileName + ".dat")
+ if err != nil {
+ t.Fatalf("Failed to create .dat file: %v", err)
+ }
+ if err := datFile.Truncate(datFileSize); err != nil {
+ t.Fatalf("Failed to truncate .dat file: %v", err)
+ }
+ if err := datFile.Close(); err != nil {
+ t.Fatalf("Failed to close .dat file: %v", err)
+ }
+ }
+
+ // Create EC shard files
+ for i := 0; i < tt.numShards; i++ {
+ shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
+ if err != nil {
+ t.Fatalf("Failed to create shard file: %v", err)
+ }
+ if err := shardFile.Truncate(expectedShardSize); err != nil {
+ t.Fatalf("Failed to truncate shard file: %v", err)
+ }
+ if err := shardFile.Close(); err != nil {
+ t.Fatalf("Failed to close shard file: %v", err)
+ }
+ }
+
+ // Create .ecx file if needed
+ if tt.createEcxFile {
+ ecxFile, err := os.Create(baseFileName + ".ecx")
+ if err != nil {
+ t.Fatalf("Failed to create .ecx file: %v", err)
+ }
+ if _, err := ecxFile.WriteString("dummy ecx data"); err != nil {
+ ecxFile.Close()
+ t.Fatalf("Failed to write .ecx file: %v", err)
+ }
+ if err := ecxFile.Close(); err != nil {
+ t.Fatalf("Failed to close .ecx file: %v", err)
+ }
+ }
+
+ // Create .ecj file if needed
+ if tt.createEcjFile {
+ ecjFile, err := os.Create(baseFileName + ".ecj")
+ if err != nil {
+ t.Fatalf("Failed to create .ecj file: %v", err)
+ }
+ if _, err := ecjFile.WriteString("dummy ecj data"); err != nil {
+ ecjFile.Close()
+ t.Fatalf("Failed to write .ecj file: %v", err)
+ }
+ if err := ecjFile.Close(); err != nil {
+ t.Fatalf("Failed to close .ecj file: %v", err)
+ }
+ }
+
+ // Run loadAllEcShards
+ loadErr := diskLocation.loadAllEcShards()
+ if loadErr != nil {
+ t.Logf("loadAllEcShards returned error (expected in some cases): %v", loadErr)
+ }
+
+ // Test idempotency - running again should not cause issues
+ loadErr2 := diskLocation.loadAllEcShards()
+ if loadErr2 != nil {
+ t.Logf("Second loadAllEcShards returned error: %v", loadErr2)
+ }
+
+ // Verify cleanup expectations
+ if tt.expectCleanup {
+ // Check that files were cleaned up
+ if util.FileExists(baseFileName + ".ecx") {
+ t.Errorf("Expected .ecx to be cleaned up but it still exists")
+ }
+ if util.FileExists(baseFileName + ".ecj") {
+ t.Errorf("Expected .ecj to be cleaned up but it still exists")
+ }
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFile := baseFileName + erasure_coding.ToExt(i)
+ if util.FileExists(shardFile) {
+ t.Errorf("Expected shard %d to be cleaned up but it still exists", i)
+ }
+ }
+ // .dat file should still exist (not cleaned up)
+ if tt.createDatFile && !util.FileExists(baseFileName+".dat") {
+ t.Errorf("Expected .dat file to remain but it was deleted")
+ }
+ } else {
+ // Check that files were NOT cleaned up
+ for i := 0; i < tt.numShards; i++ {
+ shardFile := baseFileName + erasure_coding.ToExt(i)
+ if !util.FileExists(shardFile) {
+ t.Errorf("Expected shard %d to remain but it was cleaned up", i)
+ }
+ }
+ if tt.createEcxFile && !util.FileExists(baseFileName+".ecx") {
+ t.Errorf("Expected .ecx to remain but it was cleaned up")
+ }
+ }
+
+ // Verify load expectations
+ if tt.expectLoadSuccess {
+ if diskLocation.EcShardCount() == 0 {
+ t.Errorf("Expected EC shards to be loaded for volume %d", tt.volumeId)
+ }
+ }
+
+ })
+ }
+}
+
+// TestValidateEcVolume tests the validateEcVolume function
+func TestValidateEcVolume(t *testing.T) {
+ tempDir := t.TempDir()
+
+ minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
+ diskLocation := &DiskLocation{
+ Directory: tempDir,
+ DirectoryUuid: "test-uuid",
+ IdxDirectory: tempDir,
+ DiskType: types.HddType,
+ MinFreeSpace: minFreeSpace,
+ }
+
+ tests := []struct {
+ name string
+ volumeId needle.VolumeId
+ collection string
+ createDatFile bool
+ numShards int
+ expectValid bool
+ }{
+ {
+ name: "Valid: .dat exists with 10+ shards",
+ volumeId: 200,
+ collection: "",
+ createDatFile: true,
+ numShards: 10,
+ expectValid: true,
+ },
+ {
+ name: "Invalid: .dat exists with < 10 shards",
+ volumeId: 201,
+ collection: "",
+ createDatFile: true,
+ numShards: 9,
+ expectValid: false,
+ },
+ {
+ name: "Valid: .dat deleted (distributed EC) with any shards",
+ volumeId: 202,
+ collection: "",
+ createDatFile: false,
+ numShards: 5,
+ expectValid: true,
+ },
+ {
+ name: "Valid: .dat deleted (distributed EC) with no shards",
+ volumeId: 203,
+ collection: "",
+ createDatFile: false,
+ numShards: 0,
+ expectValid: true,
+ },
+ {
+ name: "Invalid: zero-byte shard files should not count",
+ volumeId: 204,
+ collection: "",
+ createDatFile: true,
+ numShards: 0, // Will create 10 zero-byte files below
+ expectValid: false,
+ },
+ {
+ name: "Invalid: .dat exists with different size shards",
+ volumeId: 205,
+ collection: "",
+ createDatFile: true,
+ numShards: 10, // Will create shards with varying sizes
+ expectValid: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ baseFileName := erasure_coding.EcShardFileName(tt.collection, tempDir, int(tt.volumeId))
+
+ // For proper testing, we need to use realistic sizes that match EC encoding
+ // EC uses large blocks (1GB) and small blocks (1MB)
+ // For test purposes, use a small .dat file size that still exercises the logic
+ // 10MB .dat file = 1MB per shard (one small batch, fast and deterministic)
+ datFileSize := int64(10 * 1024 * 1024) // 10MB
+ expectedShardSize := calculateExpectedShardSize(datFileSize)
+
+ // Create .dat file if needed
+ if tt.createDatFile {
+ datFile, err := os.Create(baseFileName + ".dat")
+ if err != nil {
+ t.Fatalf("Failed to create .dat file: %v", err)
+ }
+ // Truncate creates a sparse 10MB file of the right size; no need to write real data
+ datFile.Truncate(datFileSize)
+ datFile.Close()
+ }
+
+ // Create EC shard files with correct size
+ for i := 0; i < tt.numShards; i++ {
+ shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
+ if err != nil {
+ t.Fatalf("Failed to create shard file: %v", err)
+ }
+ // Use truncate to create file of correct size without allocating all the space
+ if err := shardFile.Truncate(expectedShardSize); err != nil {
+ shardFile.Close()
+ t.Fatalf("Failed to truncate shard file: %v", err)
+ }
+ if err := shardFile.Close(); err != nil {
+ t.Fatalf("Failed to close shard file: %v", err)
+ }
+ }
+
+ // For zero-byte test case, create empty files for all data shards
+ if tt.volumeId == 204 {
+ for i := 0; i < erasure_coding.DataShardsCount; i++ {
+ shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
+ if err != nil {
+ t.Fatalf("Failed to create empty shard file: %v", err)
+ }
+ // Don't write anything - leave as zero-byte
+ shardFile.Close()
+ }
+ }
+
+ // For mismatched shard size test case, create shards with different sizes
+ if tt.volumeId == 205 {
+ for i := 0; i < erasure_coding.DataShardsCount; i++ {
+ shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
+ if err != nil {
+ t.Fatalf("Failed to create shard file: %v", err)
+ }
+ // Write different amount of data to each shard
+ data := make([]byte, 100+i*10)
+ shardFile.Write(data)
+ shardFile.Close()
+ }
+ }
+
+ // Test validation
+ isValid := diskLocation.validateEcVolume(tt.collection, tt.volumeId)
+ if isValid != tt.expectValid {
+ t.Errorf("Expected validation result %v but got %v", tt.expectValid, isValid)
+ }
+ })
+ }
+}
+
+// TestRemoveEcVolumeFiles tests the removeEcVolumeFiles function
+func TestRemoveEcVolumeFiles(t *testing.T) {
+ tests := []struct {
+ name string
+ separateIdxDir bool
+ }{
+ {"Same directory for data and index", false},
+ {"Separate idx directory", true},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tempDir := t.TempDir()
+
+ var dataDir, idxDir string
+ if tt.separateIdxDir {
+ dataDir = filepath.Join(tempDir, "data")
+ idxDir = filepath.Join(tempDir, "idx")
+ os.MkdirAll(dataDir, 0755)
+ os.MkdirAll(idxDir, 0755)
+ } else {
+ dataDir = tempDir
+ idxDir = tempDir
+ }
+
+ minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
+ diskLocation := &DiskLocation{
+ Directory: dataDir,
+ DirectoryUuid: "test-uuid",
+ IdxDirectory: idxDir,
+ DiskType: types.HddType,
+ MinFreeSpace: minFreeSpace,
+ }
+
+ volumeId := needle.VolumeId(300)
+ collection := ""
+ dataBaseFileName := erasure_coding.EcShardFileName(collection, dataDir, int(volumeId))
+ idxBaseFileName := erasure_coding.EcShardFileName(collection, idxDir, int(volumeId))
+
+ // Create all EC shard files in data directory
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFile, err := os.Create(dataBaseFileName + erasure_coding.ToExt(i))
+ if err != nil {
+ t.Fatalf("Failed to create shard file: %v", err)
+ }
+ if _, err := shardFile.WriteString("dummy shard data"); err != nil {
+ shardFile.Close()
+ t.Fatalf("Failed to write shard file: %v", err)
+ }
+ if err := shardFile.Close(); err != nil {
+ t.Fatalf("Failed to close shard file: %v", err)
+ }
+ }
+
+ // Create .ecx file in idx directory
+ ecxFile, err := os.Create(idxBaseFileName + ".ecx")
+ if err != nil {
+ t.Fatalf("Failed to create .ecx file: %v", err)
+ }
+ if _, err := ecxFile.WriteString("dummy ecx data"); err != nil {
+ ecxFile.Close()
+ t.Fatalf("Failed to write .ecx file: %v", err)
+ }
+ if err := ecxFile.Close(); err != nil {
+ t.Fatalf("Failed to close .ecx file: %v", err)
+ }
+
+ // Create .ecj file in idx directory
+ ecjFile, err := os.Create(idxBaseFileName + ".ecj")
+ if err != nil {
+ t.Fatalf("Failed to create .ecj file: %v", err)
+ }
+ if _, err := ecjFile.WriteString("dummy ecj data"); err != nil {
+ ecjFile.Close()
+ t.Fatalf("Failed to write .ecj file: %v", err)
+ }
+ if err := ecjFile.Close(); err != nil {
+ t.Fatalf("Failed to close .ecj file: %v", err)
+ }
+
+ // Create .dat file in data directory (should NOT be removed)
+ datFile, err := os.Create(dataBaseFileName + ".dat")
+ if err != nil {
+ t.Fatalf("Failed to create .dat file: %v", err)
+ }
+ if _, err := datFile.WriteString("dummy dat data"); err != nil {
+ datFile.Close()
+ t.Fatalf("Failed to write .dat file: %v", err)
+ }
+ if err := datFile.Close(); err != nil {
+ t.Fatalf("Failed to close .dat file: %v", err)
+ }
+
+ // Call removeEcVolumeFiles
+ diskLocation.removeEcVolumeFiles(collection, volumeId)
+
+ // Verify all EC shard files are removed from data directory
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFile := dataBaseFileName + erasure_coding.ToExt(i)
+ if util.FileExists(shardFile) {
+ t.Errorf("Shard file %d should be removed but still exists", i)
+ }
+ }
+
+ // Verify .ecx file is removed from idx directory
+ if util.FileExists(idxBaseFileName + ".ecx") {
+ t.Errorf(".ecx file should be removed but still exists")
+ }
+
+ // Verify .ecj file is removed from idx directory
+ if util.FileExists(idxBaseFileName + ".ecj") {
+ t.Errorf(".ecj file should be removed but still exists")
+ }
+
+ // Verify .dat file is NOT removed from data directory
+ if !util.FileExists(dataBaseFileName + ".dat") {
+ t.Errorf(".dat file should NOT be removed but was deleted")
+ }
+ })
+ }
+}
+
+// TestEcCleanupWithSeparateIdxDirectory tests EC cleanup when idx directory is different
+func TestEcCleanupWithSeparateIdxDirectory(t *testing.T) {
+ tempDir := t.TempDir()
+
+ idxDir := filepath.Join(tempDir, "idx")
+ dataDir := filepath.Join(tempDir, "data")
+ os.MkdirAll(idxDir, 0755)
+ os.MkdirAll(dataDir, 0755)
+
+ minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
+ diskLocation := &DiskLocation{
+ Directory: dataDir,
+ DirectoryUuid: "test-uuid",
+ IdxDirectory: idxDir,
+ DiskType: types.HddType,
+ MinFreeSpace: minFreeSpace,
+ }
+ diskLocation.volumes = make(map[needle.VolumeId]*Volume)
+ diskLocation.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
+
+ volumeId := needle.VolumeId(400)
+ collection := ""
+
+ // Create shards in data directory (shards only go to Directory, not IdxDirectory)
+ dataBaseFileName := erasure_coding.EcShardFileName(collection, dataDir, int(volumeId))
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFile, err := os.Create(dataBaseFileName + erasure_coding.ToExt(i))
+ if err != nil {
+ t.Fatalf("Failed to create shard file: %v", err)
+ }
+ if _, err := shardFile.WriteString("dummy shard data"); err != nil {
+ t.Fatalf("Failed to write shard file: %v", err)
+ }
+ if err := shardFile.Close(); err != nil {
+ t.Fatalf("Failed to close shard file: %v", err)
+ }
+ }
+
+ // Create .dat in data directory
+ datFile, err := os.Create(dataBaseFileName + ".dat")
+ if err != nil {
+ t.Fatalf("Failed to create .dat file: %v", err)
+ }
+ if _, err := datFile.WriteString("dummy data"); err != nil {
+ t.Fatalf("Failed to write .dat file: %v", err)
+ }
+ if err := datFile.Close(); err != nil {
+ t.Fatalf("Failed to close .dat file: %v", err)
+ }
+
+ // Do not create .ecx: trigger orphaned-shards cleanup when .dat exists
+
+ // Run loadAllEcShards
+ loadErr := diskLocation.loadAllEcShards()
+ if loadErr != nil {
+ t.Logf("loadAllEcShards error: %v", loadErr)
+ }
+
+ // Verify cleanup occurred in data directory (shards)
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ shardFile := dataBaseFileName + erasure_coding.ToExt(i)
+ if util.FileExists(shardFile) {
+ t.Errorf("Shard file %d should be cleaned up but still exists", i)
+ }
+ }
+
+ // Verify .dat in data directory still exists (only EC files are cleaned up)
+ if !util.FileExists(dataBaseFileName + ".dat") {
+ t.Errorf(".dat file should remain but was deleted")
+ }
+}
+
+// TestDistributedEcVolumeNoFileDeletion verifies that distributed EC volumes
+// (where .dat is deleted) do NOT have their shard files deleted when load fails
+// This tests the critical bug fix where DestroyEcVolume was incorrectly deleting files
+func TestDistributedEcVolumeNoFileDeletion(t *testing.T) {
+ tempDir := t.TempDir()
+
+ minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
+ diskLocation := &DiskLocation{
+ Directory: tempDir,
+ DirectoryUuid: "test-uuid",
+ IdxDirectory: tempDir,
+ DiskType: types.HddType,
+ MinFreeSpace: minFreeSpace,
+ ecVolumes: make(map[needle.VolumeId]*erasure_coding.EcVolume),
+ }
+
+ collection := ""
+ volumeId := needle.VolumeId(500)
+ baseFileName := erasure_coding.EcShardFileName(collection, tempDir, int(volumeId))
+
+ // Create EC shards (only 5 shards - less than DataShardsCount, but OK for distributed EC)
+ numDistributedShards := 5
+ for i := 0; i < numDistributedShards; i++ {
+ shardFile, err := os.Create(baseFileName + erasure_coding.ToExt(i))
+ if err != nil {
+ t.Fatalf("Failed to create shard file: %v", err)
+ }
+ if _, err := shardFile.WriteString("dummy shard data"); err != nil {
+ shardFile.Close()
+ t.Fatalf("Failed to write shard file: %v", err)
+ }
+ if err := shardFile.Close(); err != nil {
+ t.Fatalf("Failed to close shard file: %v", err)
+ }
+ }
+
+ // Create .ecx file to trigger EC loading
+ ecxFile, err := os.Create(baseFileName + ".ecx")
+ if err != nil {
+ t.Fatalf("Failed to create .ecx file: %v", err)
+ }
+ if _, err := ecxFile.WriteString("dummy ecx data"); err != nil {
+ ecxFile.Close()
+ t.Fatalf("Failed to write .ecx file: %v", err)
+ }
+ if err := ecxFile.Close(); err != nil {
+ t.Fatalf("Failed to close .ecx file: %v", err)
+ }
+
+ // NO .dat file - this is a distributed EC volume
+
+ // Run loadAllEcShards - this should fail but NOT delete shard files
+ loadErr := diskLocation.loadAllEcShards()
+ if loadErr != nil {
+ t.Logf("loadAllEcShards returned error (expected): %v", loadErr)
+ }
+
+ // CRITICAL CHECK: Verify shard files still exist (should NOT be deleted)
+ for i := 0; i < 5; i++ {
+ shardFile := baseFileName + erasure_coding.ToExt(i)
+ if !util.FileExists(shardFile) {
+ t.Errorf("CRITICAL BUG: Shard file %s was deleted for distributed EC volume!", shardFile)
+ }
+ }
+
+ // Verify .ecx file still exists (should NOT be deleted for distributed EC)
+ if !util.FileExists(baseFileName + ".ecx") {
+ t.Errorf("CRITICAL BUG: .ecx file was deleted for distributed EC volume!")
+ }
+
+ t.Logf("SUCCESS: Distributed EC volume files preserved (not deleted)")
+}
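
Assuming a standard Go toolchain and the package layout shown in this diff, the new tests could be exercised with something like the following; the -run pattern and package path are illustrative:

    go test ./weed/storage/ -run 'TestCalculateExpectedShardSize|TestIncompleteEcEncodingCleanup|TestValidateEcVolume|TestRemoveEcVolumeFiles'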