aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: chrislu <chris.lu@gmail.com> 2025-10-26 13:53:10 -0700
committer: chrislu <chris.lu@gmail.com> 2025-10-26 13:53:10 -0700
commit: 6fff6269c8f766db3292cb5849f1a9a18ca73799 (patch)
tree: 80d2fb0fbe4cd59b36955d840c00cbda844e189a
parent: 7b6f364e7f52424b782f41d19291355c17e4d924 (diff)
download: seaweedfs-6fff6269c8f766db3292cb5849f1a9a18ca73799.tar.xz
seaweedfs-6fff6269c8f766db3292cb5849f1a9a18ca73799.zip
refactoring
-rw-r--r-- weed/storage/disk_location_ec.go 45
-rw-r--r-- weed/storage/disk_location_ec_test.go 37
2 files changed, 56 insertions, 26 deletions
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index 2859f7683..cc3469c73 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -207,6 +207,12 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
// If .dat file is gone, this is likely a distributed EC volume with shards on multiple servers
baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
datFileName := baseFileName + ".dat"
+ // Helper to reset state between volume processing
+ reset := func() {
+ sameVolumeShards = nil
+ prevVolumeId = 0
+ }
+
datExists := util.FileExists(datFileName)
// Validate EC volume if .dat file exists (incomplete EC encoding scenario)
@@ -217,8 +223,7 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
l.removeEcVolumeFiles(collection, volumeId)
// Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
l.unloadEcVolume(volumeId)
- sameVolumeShards = nil
- prevVolumeId = 0
+ reset()
continue
}
@@ -233,12 +238,10 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
}
// Clean up any partially loaded in-memory state. This does not delete files.
l.unloadEcVolume(volumeId)
- sameVolumeShards = nil
- prevVolumeId = 0
+ reset()
continue
}
- prevVolumeId = 0
- sameVolumeShards = nil
+ reset()
continue
}
@@ -250,18 +253,8 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
// We have collected EC shards but never found .ecx file
// Need to determine the collection name from the shard filenames
baseName := sameVolumeShards[0][:len(sameVolumeShards[0])-len(path.Ext(sameVolumeShards[0]))]
- collection, volumeId, err := parseCollectionVolumeId(baseName)
- if err == nil && volumeId == prevVolumeId {
- baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(volumeId))
- datFileName := baseFileName + ".dat"
- // Only clean up if .dat file exists (incomplete encoding, not distributed EC)
- if util.FileExists(datFileName) {
- glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
- len(sameVolumeShards), volumeId)
- l.removeEcVolumeFiles(collection, volumeId)
- // Clean up any in-memory state. This does not delete files (already deleted by removeEcVolumeFiles).
- l.unloadEcVolume(volumeId)
- }
+ if collection, volumeId, err := parseCollectionVolumeId(baseName); err == nil && volumeId == prevVolumeId {
+ l.cleanupIfIncomplete(collection, volumeId, len(sameVolumeShards))
}
}
@@ -307,6 +300,20 @@ func (l *DiskLocation) EcShardCount() int {
return shardCount
}
+// cleanupIfIncomplete removes a volume's EC files, but only when its .dat file still exists under l.Directory; otherwise it does nothing.
+// It does not check for .ecx itself: the caller invokes it for shards collected without an .ecx file. shardCount is only used in the log message.
+func (l *DiskLocation) cleanupIfIncomplete(collection string, vid needle.VolumeId, shardCount int) {
+ baseFileName := erasure_coding.EcShardFileName(collection, l.Directory, int(vid))
+ datFileName := baseFileName + ".dat"
+ if util.FileExists(datFileName) {
+ glog.Warningf("Found %d EC shards without .ecx file for volume %d (incomplete encoding interrupted before .ecx creation), cleaning up...",
+ shardCount, vid)
+ l.removeEcVolumeFiles(collection, vid)
+ // Drop any in-memory state for this volume after removeEcVolumeFiles has run; per the original note, unloadEcVolume itself deletes no files.
+ l.unloadEcVolume(vid)
+ }
+}
+
// calculateExpectedShardSize computes the exact expected shard size based on .dat file size
// The EC encoding process is deterministic:
// 1. Data is processed in batches of (LargeBlockSize * DataShardsCount) for large blocks
@@ -347,6 +354,8 @@ func (l *DiskLocation) validateEcVolume(collection string, vid needle.VolumeId)
if datFileInfo, err := os.Stat(datFileName); err == nil {
datExists = true
expectedShardSize = calculateExpectedShardSize(datFileInfo.Size())
+ } else if !os.IsNotExist(err) {
+ glog.Warningf("Failed to stat .dat file %s: %v", datFileName, err)
}
shardCount := 0
diff --git a/weed/storage/disk_location_ec_test.go b/weed/storage/disk_location_ec_test.go
index 874e73da6..fad1612fa 100644
--- a/weed/storage/disk_location_ec_test.go
+++ b/weed/storage/disk_location_ec_test.go
@@ -13,8 +13,6 @@ import (
// TestIncompleteEcEncodingCleanup tests the cleanup logic for incomplete EC encoding scenarios
func TestIncompleteEcEncodingCleanup(t *testing.T) {
- tempDir := t.TempDir()
-
tests := []struct {
name string
volumeId needle.VolumeId
@@ -96,6 +94,9 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ // Use per-subtest temp directory for stronger isolation
+ tempDir := t.TempDir()
+
// Create DiskLocation
minFreeSpace := util.MinFreeSpace{Type: util.AsPercent, Percent: 1, Raw: "1"}
diskLocation := &DiskLocation{
@@ -123,8 +124,12 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create .dat file: %v", err)
}
- datFile.Truncate(datFileSize)
- datFile.Close()
+ if err := datFile.Truncate(datFileSize); err != nil {
+ t.Fatalf("Failed to truncate .dat file: %v", err)
+ }
+ if err := datFile.Close(); err != nil {
+ t.Fatalf("Failed to close .dat file: %v", err)
+ }
}
// Create EC shard files
@@ -163,6 +168,12 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
t.Logf("loadAllEcShards returned error (expected in some cases): %v", loadErr)
}
+ // Test idempotency - running again should not cause issues
+ loadErr2 := diskLocation.loadAllEcShards()
+ if loadErr2 != nil {
+ t.Logf("Second loadAllEcShards returned error: %v", loadErr2)
+ }
+
// Verify cleanup expectations
if tt.expectCleanup {
// Check that files were cleaned up
@@ -195,6 +206,13 @@ func TestIncompleteEcEncodingCleanup(t *testing.T) {
}
}
+ // Verify load expectations
+ if tt.expectLoadSuccess {
+ if diskLocation.EcShardCount() == 0 {
+ t.Errorf("Expected EC shards to be loaded for volume %d", tt.volumeId)
+ }
+ }
+
})
}
}
@@ -299,8 +317,12 @@ func TestValidateEcVolume(t *testing.T) {
t.Fatalf("Failed to create shard file: %v", err)
}
// Use truncate to create file of correct size without allocating all the space
- shardFile.Truncate(expectedShardSize)
- shardFile.Close()
+ if err := shardFile.Truncate(expectedShardSize); err != nil {
+ t.Fatalf("Failed to truncate shard file: %v", err)
+ }
+ if err := shardFile.Close(); err != nil {
+ t.Fatalf("Failed to close shard file: %v", err)
+ }
}
// For zero-byte test case, create 10 empty files
@@ -439,8 +461,7 @@ func TestEcCleanupWithSeparateIdxDirectory(t *testing.T) {
datFile.WriteString("dummy data")
datFile.Close()
- // Create .ecx and .ecj in idx directory (but no .ecx to trigger cleanup)
- // Don't create .ecx to test orphaned shards cleanup
+ // Do not create .ecx: trigger orphaned-shards cleanup when .dat exists
// Run loadAllEcShards
loadErr := diskLocation.loadAllEcShards()