diff options
Diffstat (limited to 'test/erasure_coding')
| -rw-r--r-- | test/erasure_coding/ec_integration_test.go | 1136 |
1 files changed, 1130 insertions, 6 deletions
diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go index 71f77683f..bb0983f06 100644 --- a/test/erasure_coding/ec_integration_test.go +++ b/test/erasure_coding/ec_integration_test.go @@ -315,7 +315,7 @@ func TestECEncodingMasterTimingRaceCondition(t *testing.T) { os.Stdout = w os.Stderr = w - err = ecEncodeCmd.Do(args, commandEnv, &output) + encodeErr := ecEncodeCmd.Do(args, commandEnv, &output) // Restore stdout/stderr w.Close() @@ -343,17 +343,17 @@ func TestECEncodingMasterTimingRaceCondition(t *testing.T) { } // Step 3: Try to get volume locations after EC encoding (this simulates the bug) - locationsAfter, err := getVolumeLocations(commandEnv, volumeId) - if err != nil { - t.Logf("Volume locations after EC encoding: ERROR - %v", err) + locationsAfter, locErr := getVolumeLocations(commandEnv, volumeId) + if locErr != nil { + t.Logf("Volume locations after EC encoding: ERROR - %v", locErr) t.Logf("This demonstrates the timing issue where original volume info is lost") } else { t.Logf("Volume locations after EC encoding: %v", locationsAfter) } // Test result evaluation - if err != nil { - t.Logf("EC encoding completed with error: %v", err) + if encodeErr != nil { + t.Logf("EC encoding completed with error: %v", encodeErr) } else { t.Logf("EC encoding completed successfully") } @@ -622,6 +622,63 @@ func contains(s, substr string) bool { return false } +// assertNoFlagError checks that the error and output don't indicate a flag parsing error. +// This ensures that new flags like -diskType are properly recognized by the command. 
+func assertNoFlagError(t *testing.T, err error, output string, context string) { + t.Helper() + + // Check for common flag parsing error patterns (case-insensitive) + flagErrorPatterns := []string{ + "flag provided but not defined", + "unknown flag", + "invalid flag", + "bad flag syntax", + } + + outputLower := strings.ToLower(output) + for _, pattern := range flagErrorPatterns { + if strings.Contains(outputLower, pattern) { + t.Fatalf("%s: flag parsing error detected in output: %s", context, pattern) + } + if err != nil && strings.Contains(strings.ToLower(err.Error()), pattern) { + t.Fatalf("%s: flag parsing error in error: %v", context, err) + } + } +} + +// commandRunner is an interface matching the shell command Do method +type commandRunner interface { + Do([]string, *shell.CommandEnv, io.Writer) error +} + +// captureCommandOutput executes a shell command and captures its output from both +// stdout/stderr and the command's buffer. This reduces code duplication in tests. +func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, commandEnv *shell.CommandEnv) (output string, err error) { + t.Helper() + var outBuf bytes.Buffer + + oldStdout, oldStderr := os.Stdout, os.Stderr + r, w, pipeErr := os.Pipe() + require.NoError(t, pipeErr) + + defer func() { + _ = w.Close() + os.Stdout = oldStdout + os.Stderr = oldStderr + }() + + os.Stdout = w + os.Stderr = w + + cmdErr := cmd.Do(args, commandEnv, &outBuf) + + capturedOutput, readErr := io.ReadAll(r) + _ = r.Close() + require.NoError(t, readErr) + + return string(capturedOutput) + outBuf.String(), cmdErr +} + // TestECEncodingRegressionPrevention tests that the specific bug patterns don't reoccur func TestECEncodingRegressionPrevention(t *testing.T) { t.Run("function_signature_regression", func(t *testing.T) { @@ -744,8 +801,14 @@ func TestDiskAwareECRebalancing(t *testing.T) { err := lockCmd.Do([]string{}, commandEnv, &lockOutput) if err != nil { t.Logf("Lock command failed: %v", err) + return } + 
// Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + // Execute EC encoding var output bytes.Buffer ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] @@ -876,6 +939,7 @@ type MultiDiskCluster struct { masterCmd *exec.Cmd volumeServers []*exec.Cmd testDir string + logFiles []*os.File // Track log files for cleanup } func (c *MultiDiskCluster) Stop() { @@ -892,6 +956,13 @@ func (c *MultiDiskCluster) Stop() { c.masterCmd.Process.Kill() c.masterCmd.Wait() } + + // Close all log files to prevent FD leaks + for _, f := range c.logFiles { + if f != nil { + f.Close() + } + } } // startMultiDiskCluster starts a SeaweedFS cluster with multiple disks per volume server @@ -920,6 +991,7 @@ func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskClust if err != nil { return nil, fmt.Errorf("failed to create master log file: %v", err) } + cluster.logFiles = append(cluster.logFiles, masterLogFile) masterCmd.Stdout = masterLogFile masterCmd.Stderr = masterLogFile @@ -971,6 +1043,7 @@ func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskClust cluster.Stop() return nil, fmt.Errorf("failed to create volume log file: %v", err) } + cluster.logFiles = append(cluster.logFiles, volumeLogFile) volumeCmd.Stdout = volumeLogFile volumeCmd.Stderr = volumeLogFile @@ -1083,3 +1156,1054 @@ func calculateDiskShardVariance(distribution map[string]map[int]int) float64 { return math.Sqrt(variance / float64(len(counts))) } + +// TestECDiskTypeSupport tests EC operations with different disk types (HDD, SSD) +// This verifies the -diskType flag works correctly for ec.encode and ec.balance +func TestECDiskTypeSupport(t *testing.T) { + if testing.Short() { + t.Skip("Skipping disk type integration test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_ec_disktype_test_") + require.NoError(t, 
err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Start cluster with SSD disks + cluster, err := startClusterWithDiskType(ctx, testDir, "ssd") + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9335", 30*time.Second)) + for i := 0; i < 3; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:810%d", i), 30*time.Second)) + } + + // Wait for volume servers to register with master + t.Log("Waiting for SSD volume servers to register with master...") + time.Sleep(10 * time.Second) + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9335"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + // Connect to master with longer timeout + ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + // Wait for master client to fully sync + time.Sleep(5 * time.Second) + + // Upload test data to create a volume - retry if volumes not ready + var volumeId needle.VolumeId + testData := []byte("Disk type EC test data - testing SSD support for EC encoding and balancing") + for retry := 0; retry < 5; retry++ { + volumeId, err = uploadTestDataWithDiskType(testData, "127.0.0.1:9335", "ssd", "ssd_test") + if err == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err) + time.Sleep(3 * time.Second) + } + require.NoError(t, err, "Failed to upload test data to SSD disk after retries") + t.Logf("Created volume %d on SSD disk for disk type EC test", volumeId) + + // Wait for volume to be registered + time.Sleep(3 * time.Second) + + t.Run("verify_ssd_disk_setup", func(t *testing.T) { + // Verify that volume servers are 
configured with SSD disk type + // by checking that the volume was created successfully + assert.NotEqual(t, needle.VolumeId(0), volumeId, "Volume should be created on SSD disk") + t.Logf("Volume %d created successfully on SSD disk", volumeId) + }) + + t.Run("ec_encode_with_ssd_disktype", func(t *testing.T) { + // Get lock first + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Execute EC encoding with SSD disk type + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-volumeId", fmt.Sprintf("%d", volumeId), + "-collection", "ssd_test", + "-diskType", "ssd", + "-force", + } + + outputStr, encodeErr := captureCommandOutput(t, ecEncodeCmd, args, commandEnv) + t.Logf("EC encode command output: %s", outputStr) + + // Fail on flag parsing errors - these indicate the -diskType flag is not recognized + assertNoFlagError(t, encodeErr, outputStr, "ec.encode -diskType") + + // EC encode may fail if volume is too small - that's acceptable for this flag test + // But unexpected errors should fail the test + if encodeErr != nil { + errStr := encodeErr.Error() + if contains(errStr, "volume") || contains(errStr, "size") || contains(errStr, "small") { + t.Logf("EC encoding failed due to volume constraints (expected): %v", encodeErr) + } else { + t.Errorf("EC encoding failed with unexpected error: %v", encodeErr) + } + } + }) + + t.Run("ec_balance_with_ssd_disktype", func(t *testing.T) { + // Get lock first + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command 
failed: %v", err) + return + } + + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Execute EC balance with SSD disk type + ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] + args := []string{ + "-collection", "ssd_test", + "-diskType", "ssd", + } + + outputStr, balanceErr := captureCommandOutput(t, ecBalanceCmd, args, commandEnv) + t.Logf("EC balance command output: %s", outputStr) + + // Fail on flag parsing errors + assertNoFlagError(t, balanceErr, outputStr, "ec.balance -diskType") + + // ec.balance should succeed (it may just have nothing to balance) + require.NoError(t, balanceErr, "ec.balance with -diskType=ssd should succeed") + }) + + t.Run("verify_disktype_flag_parsing", func(t *testing.T) { + // Test that disk type flags are documented in help output + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] + ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")] + + require.NotNil(t, ecEncodeCmd, "ec.encode command should exist") + require.NotNil(t, ecBalanceCmd, "ec.balance command should exist") + require.NotNil(t, ecDecodeCmd, "ec.decode command should exist") + + // Verify help text mentions diskType flag + encodeHelp := ecEncodeCmd.Help() + assert.Contains(t, encodeHelp, "diskType", "ec.encode help should mention -diskType flag") + + balanceHelp := ecBalanceCmd.Help() + assert.Contains(t, balanceHelp, "diskType", "ec.balance help should mention -diskType flag") + + decodeHelp := ecDecodeCmd.Help() + assert.Contains(t, decodeHelp, "diskType", "ec.decode help should mention -diskType flag") + + t.Log("All EC commands have -diskType flag documented in help") + }) + + t.Run("ec_encode_with_source_disktype", func(t *testing.T) { + // Test that -sourceDiskType flag is accepted + lockCmd := 
shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Execute EC encoding with sourceDiskType filter + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + args := []string{ + "-collection", "ssd_test", + "-sourceDiskType", "ssd", // Filter source volumes by SSD + "-diskType", "ssd", // Place EC shards on SSD + "-force", + } + + outputStr, encodeErr := captureCommandOutput(t, ecEncodeCmd, args, commandEnv) + t.Logf("EC encode with sourceDiskType output: %s", outputStr) + + // Fail on flag parsing errors + assertNoFlagError(t, encodeErr, outputStr, "ec.encode -sourceDiskType") + + // May fail if no volumes match the sourceDiskType filter - that's acceptable + if encodeErr != nil { + errStr := encodeErr.Error() + if contains(errStr, "no volume") || contains(errStr, "matching") || contains(errStr, "found") { + t.Logf("EC encoding: no matching volumes (expected): %v", encodeErr) + } else { + t.Errorf("EC encoding with sourceDiskType failed unexpectedly: %v", encodeErr) + } + } + }) + + t.Run("ec_decode_with_disktype", func(t *testing.T) { + // Test that ec.decode accepts -diskType flag + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Execute EC decode with disk type + ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")] + args := []string{ 
+ "-collection", "ssd_test", + "-diskType", "ssd", // Source EC shards are on SSD + } + + outputStr, decodeErr := captureCommandOutput(t, ecDecodeCmd, args, commandEnv) + t.Logf("EC decode with diskType output: %s", outputStr) + + // Fail on flag parsing errors + assertNoFlagError(t, decodeErr, outputStr, "ec.decode -diskType") + + // May fail if no EC volumes exist - that's acceptable for this flag test + if decodeErr != nil { + errStr := decodeErr.Error() + if contains(errStr, "no ec volume") || contains(errStr, "not found") || contains(errStr, "ec shard") { + t.Logf("EC decode: no EC volumes to decode (expected): %v", decodeErr) + } else { + t.Errorf("EC decode with diskType failed unexpectedly: %v", decodeErr) + } + } + }) +} + +// startClusterWithDiskType starts a SeaweedFS cluster with a specific disk type +func startClusterWithDiskType(ctx context.Context, dataDir string, diskType string) (*MultiDiskCluster, error) { + weedBinary := findWeedBinary() + if weedBinary == "" { + return nil, fmt.Errorf("weed binary not found") + } + + cluster := &MultiDiskCluster{testDir: dataDir} + + // Create master directory + masterDir := filepath.Join(dataDir, "master") + os.MkdirAll(masterDir, 0755) + + // Start master server on a different port to avoid conflict with other tests + masterCmd := exec.CommandContext(ctx, weedBinary, "master", + "-port", "9335", + "-mdir", masterDir, + "-volumeSizeLimitMB", "10", + "-ip", "127.0.0.1", + ) + + masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) + if err != nil { + return nil, fmt.Errorf("failed to create master log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, masterLogFile) + masterCmd.Stdout = masterLogFile + masterCmd.Stderr = masterLogFile + + if err := masterCmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start master server: %v", err) + } + cluster.masterCmd = masterCmd + + // Wait for master to be ready + time.Sleep(2 * time.Second) + + // Start 3 volume servers with 
the specified disk type + const numServers = 3 + + for i := 0; i < numServers; i++ { + // Create disk directory for this server + diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_%s", i, diskType)) + if err := os.MkdirAll(diskDir, 0755); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create disk dir: %v", err) + } + + port := fmt.Sprintf("810%d", i) + rack := fmt.Sprintf("rack%d", i) + + volumeCmd := exec.CommandContext(ctx, weedBinary, "volume", + "-port", port, + "-dir", diskDir, + "-max", "10", + "-mserver", "127.0.0.1:9335", + "-ip", "127.0.0.1", + "-dataCenter", "dc1", + "-rack", rack, + "-disk", diskType, // Specify the disk type + ) + + // Create log file for this volume server + logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i)) + os.MkdirAll(logDir, 0755) + volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log")) + if err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create volume log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, volumeLogFile) + volumeCmd.Stdout = volumeLogFile + volumeCmd.Stderr = volumeLogFile + + if err := volumeCmd.Start(); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to start volume server %d: %v", i, err) + } + cluster.volumeServers = append(cluster.volumeServers, volumeCmd) + } + + // Wait for volume servers to register with master + time.Sleep(8 * time.Second) + + return cluster, nil +} + +// uploadTestDataWithDiskType uploads test data with a specific disk type +func uploadTestDataWithDiskType(data []byte, masterAddress string, diskType string, collection string) (needle.VolumeId, error) { + assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress { + return pb.ServerAddress(masterAddress) + }, grpc.WithInsecure(), &operation.VolumeAssignRequest{ + Count: 1, + Collection: collection, + Replication: "000", + DiskType: diskType, + }) + if err != nil { + return 0, err + } + + 
uploader, err := operation.NewUploader() + if err != nil { + return 0, err + } + + uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{ + UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid, + Filename: "testfile.txt", + MimeType: "text/plain", + }) + if err != nil { + return 0, err + } + + if uploadResult.Error != "" { + return 0, fmt.Errorf("upload error: %s", uploadResult.Error) + } + + fid, err := needle.ParseFileIdFromString(assignResult.Fid) + if err != nil { + return 0, err + } + + return fid.VolumeId, nil +} + +// TestECDiskTypeMixedCluster tests EC operations on a cluster with mixed disk types +// This verifies that EC shards are correctly placed on the specified disk type +func TestECDiskTypeMixedCluster(t *testing.T) { + if testing.Short() { + t.Skip("Skipping mixed disk type integration test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_ec_mixed_disktype_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Start cluster with mixed disk types (HDD and SSD) + cluster, err := startMixedDiskTypeCluster(ctx, testDir) + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9336", 30*time.Second)) + for i := 0; i < 4; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:811%d", i), 30*time.Second)) + } + + // Wait for volume servers to register with master + t.Log("Waiting for mixed disk type volume servers to register with master...") + time.Sleep(10 * time.Second) + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9336"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + // Connect to master with longer timeout + ctx2, cancel2 := 
context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + // Wait for master client to fully sync + time.Sleep(5 * time.Second) + + t.Run("upload_to_ssd_and_hdd", func(t *testing.T) { + // Upload to SSD + ssdData := []byte("SSD disk type test data for EC encoding") + var ssdVolumeId needle.VolumeId + var ssdErr error + for retry := 0; retry < 5; retry++ { + ssdVolumeId, ssdErr = uploadTestDataWithDiskType(ssdData, "127.0.0.1:9336", "ssd", "ssd_collection") + if ssdErr == nil { + break + } + t.Logf("SSD upload attempt %d failed: %v, retrying...", retry+1, ssdErr) + time.Sleep(3 * time.Second) + } + require.NoError(t, ssdErr, "Failed to upload to SSD after retries - test setup failed") + t.Logf("Created SSD volume %d", ssdVolumeId) + + // Upload to HDD (default) + hddData := []byte("HDD disk type test data for EC encoding") + var hddVolumeId needle.VolumeId + var hddErr error + for retry := 0; retry < 5; retry++ { + hddVolumeId, hddErr = uploadTestDataWithDiskType(hddData, "127.0.0.1:9336", "hdd", "hdd_collection") + if hddErr == nil { + break + } + t.Logf("HDD upload attempt %d failed: %v, retrying...", retry+1, hddErr) + time.Sleep(3 * time.Second) + } + require.NoError(t, hddErr, "Failed to upload to HDD after retries - test setup failed") + t.Logf("Created HDD volume %d", hddVolumeId) + }) + + t.Run("ec_balance_targets_correct_disk_type", func(t *testing.T) { + // Get lock first + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + // Defer unlock to ensure it's always released + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // Run ec.balance for SSD collection with 
-diskType=ssd + var ssdOutput bytes.Buffer + ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] + ssdArgs := []string{ + "-collection", "ssd_collection", + "-diskType", "ssd", + } + + ssdErr := ecBalanceCmd.Do(ssdArgs, commandEnv, &ssdOutput) + t.Logf("EC balance for SSD: %v, output: %s", ssdErr, ssdOutput.String()) + assertNoFlagError(t, ssdErr, ssdOutput.String(), "ec.balance -diskType=ssd") + + // Run ec.balance for HDD collection with -diskType=hdd + var hddOutput bytes.Buffer + hddArgs := []string{ + "-collection", "hdd_collection", + "-diskType", "hdd", + } + + hddErr := ecBalanceCmd.Do(hddArgs, commandEnv, &hddOutput) + t.Logf("EC balance for HDD: %v, output: %s", hddErr, hddOutput.String()) + assertNoFlagError(t, hddErr, hddOutput.String(), "ec.balance -diskType=hdd") + }) +} + +// startMixedDiskTypeCluster starts a cluster with both HDD and SSD volume servers +func startMixedDiskTypeCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) { + weedBinary := findWeedBinary() + if weedBinary == "" { + return nil, fmt.Errorf("weed binary not found") + } + + cluster := &MultiDiskCluster{testDir: dataDir} + + // Create master directory + masterDir := filepath.Join(dataDir, "master") + os.MkdirAll(masterDir, 0755) + + // Start master server + masterCmd := exec.CommandContext(ctx, weedBinary, "master", + "-port", "9336", + "-mdir", masterDir, + "-volumeSizeLimitMB", "10", + "-ip", "127.0.0.1", + ) + + masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) + if err != nil { + return nil, fmt.Errorf("failed to create master log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, masterLogFile) + masterCmd.Stdout = masterLogFile + masterCmd.Stderr = masterLogFile + + if err := masterCmd.Start(); err != nil { + return nil, fmt.Errorf("failed to start master server: %v", err) + } + cluster.masterCmd = masterCmd + + // Wait for master to be ready + time.Sleep(2 * time.Second) + + // Start 2 HDD servers and 2 SSD 
servers + diskTypes := []string{"hdd", "hdd", "ssd", "ssd"} + + for i, diskType := range diskTypes { + diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_%s", i, diskType)) + if err := os.MkdirAll(diskDir, 0755); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create disk dir: %v", err) + } + + port := fmt.Sprintf("811%d", i) + rack := fmt.Sprintf("rack%d", i) + + volumeCmd := exec.CommandContext(ctx, weedBinary, "volume", + "-port", port, + "-dir", diskDir, + "-max", "10", + "-mserver", "127.0.0.1:9336", + "-ip", "127.0.0.1", + "-dataCenter", "dc1", + "-rack", rack, + "-disk", diskType, + ) + + logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i)) + os.MkdirAll(logDir, 0755) + volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log")) + if err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to create volume log file: %v", err) + } + cluster.logFiles = append(cluster.logFiles, volumeLogFile) + volumeCmd.Stdout = volumeLogFile + volumeCmd.Stderr = volumeLogFile + + if err := volumeCmd.Start(); err != nil { + cluster.Stop() + return nil, fmt.Errorf("failed to start volume server %d: %v", i, err) + } + cluster.volumeServers = append(cluster.volumeServers, volumeCmd) + } + + // Wait for volume servers to register with master + time.Sleep(8 * time.Second) + + return cluster, nil +} + +// TestEvacuationFallbackBehavior tests that when a disk type has limited capacity, +// shards fall back to other disk types during evacuation +func TestEvacuationFallbackBehavior(t *testing.T) { + if testing.Short() { + t.Skip("Skipping evacuation fallback test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_evacuation_fallback_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Start a cluster with limited SSD capacity (1 SSD server, 2 HDD servers) + cluster, err := startLimitedSsdCluster(ctx, 
testDir) + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9337", 30*time.Second)) + for i := 0; i < 3; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:812%d", i), 30*time.Second)) + } + + time.Sleep(10 * time.Second) + + // Create command environment + options := &shell.ShellOptions{ + Masters: stringPtr("127.0.0.1:9337"), + GrpcDialOption: grpc.WithInsecure(), + FilerGroup: stringPtr("default"), + } + commandEnv := shell.NewCommandEnv(options) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel2() + go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) + commandEnv.MasterClient.WaitUntilConnected(ctx2) + + time.Sleep(5 * time.Second) + + t.Run("fallback_when_same_disktype_full", func(t *testing.T) { + // This test verifies that when evacuating SSD EC shards from a server, + // if no SSD capacity is available on other servers, shards fall back to HDD + + // Upload test data to SSD + testData := []byte("Evacuation fallback test data for SSD volume") + var ssdVolumeId needle.VolumeId + for retry := 0; retry < 5; retry++ { + ssdVolumeId, err = uploadTestDataWithDiskType(testData, "127.0.0.1:9337", "ssd", "fallback_test") + if err == nil { + break + } + t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err) + time.Sleep(3 * time.Second) + } + if err != nil { + t.Skipf("Could not upload to SSD (may not have SSD capacity): %v", err) + return + } + t.Logf("Created SSD volume %d for fallback test", ssdVolumeId) + + time.Sleep(3 * time.Second) + + // Get lock + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + if err != nil { + t.Logf("Lock command failed: %v", err) + return + } + + unlockCmd := shell.Commands[findCommandIndex("unlock")] + var unlockOutput bytes.Buffer + defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + + // 
EC encode the SSD volume + var encodeOutput bytes.Buffer + ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] + encodeArgs := []string{ + "-volumeId", fmt.Sprintf("%d", ssdVolumeId), + "-collection", "fallback_test", + "-diskType", "ssd", + "-force", + } + + encodeErr := ecEncodeCmd.Do(encodeArgs, commandEnv, &encodeOutput) + if encodeErr != nil { + t.Logf("EC encoding result: %v", encodeErr) + } + t.Logf("EC encode output: %s", encodeOutput.String()) + + // Now simulate evacuation - the fallback behavior is tested in pickBestDiskOnNode + // When strictDiskType=false (evacuation), it prefers SSD but falls back to HDD + t.Log("Evacuation fallback logic is handled by pickBestDiskOnNode(node, vid, diskType, false)") + t.Log("When strictDiskType=false: prefers same disk type, falls back to other types if needed") + }) + + // Note: The fallback behavior is implemented in pickBestDiskOnNode: + // - strictDiskType=true (balancing): Only matching disk types + // - strictDiskType=false (evacuation): Prefer matching, fallback to other types allowed + // This is tested implicitly through the ec.encode command above which uses the fallback path +} + +// TestCrossRackECPlacement tests that EC shards are distributed across different racks +func TestCrossRackECPlacement(t *testing.T) { + if testing.Short() { + t.Skip("Skipping cross-rack EC placement test in short mode") + } + + testDir, err := os.MkdirTemp("", "seaweedfs_cross_rack_ec_test_") + require.NoError(t, err) + defer os.RemoveAll(testDir) + + ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) + defer cancel() + + // Start a cluster with multiple racks + cluster, err := startMultiRackCluster(ctx, testDir) + require.NoError(t, err) + defer cluster.Stop() + + // Wait for servers to be ready + require.NoError(t, waitForServer("127.0.0.1:9338", 30*time.Second)) + for i := 0; i < 4; i++ { + require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:813%d", i), 30*time.Second)) + } + + 
time.Sleep(10 * time.Second)

	// Build a shell command environment pointed at the multi-rack master.
	options := &shell.ShellOptions{
		Masters:        stringPtr("127.0.0.1:9338"),
		GrpcDialOption: grpc.WithInsecure(),
		FilerGroup:     stringPtr("default"),
	}
	commandEnv := shell.NewCommandEnv(options)

	ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel2()
	go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
	commandEnv.MasterClient.WaitUntilConnected(ctx2)

	time.Sleep(5 * time.Second)

	// Upload test data, retrying to tolerate slow volume-server registration.
	testData := []byte("Cross-rack EC placement test data - needs to be distributed across racks")
	var volumeId needle.VolumeId
	for retry := 0; retry < 5; retry++ {
		volumeId, err = uploadTestDataToMaster(testData, "127.0.0.1:9338")
		if err == nil {
			break
		}
		t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err)
		time.Sleep(3 * time.Second)
	}
	require.NoError(t, err, "Failed to upload test data after retries")
	t.Logf("Created volume %d for cross-rack EC test", volumeId)

	time.Sleep(3 * time.Second)

	t.Run("ec_encode_cross_rack", func(t *testing.T) {
		// Acquire the cluster lock; skip the subtest body if that fails.
		lockCmd := shell.Commands[findCommandIndex("lock")]
		var lockOutput bytes.Buffer
		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
		if err != nil {
			t.Logf("Lock command failed: %v", err)
			return
		}

		unlockCmd := shell.Commands[findCommandIndex("unlock")]
		var unlockOutput bytes.Buffer
		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)

		// EC encode with rack-aware placement.
		// Note: uploadTestDataToMaster uses collection "test" by default.
		var output bytes.Buffer
		ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
		args := []string{
			"-volumeId", fmt.Sprintf("%d", volumeId),
			"-collection", "test",
			"-force",
		}

		encodeErr := ecEncodeCmd.Do(args, commandEnv, &output)
		t.Logf("EC encode output: %s", output.String())

		if encodeErr != nil {
			t.Logf("EC encoding failed: %v", encodeErr)
		} else {
			t.Logf("EC encoding completed successfully")
		}
	})

	t.Run("verify_cross_rack_distribution", func(t *testing.T) {
		// Verify EC shards are spread across different racks by inspecting
		// the on-disk shard files under each server directory.
		rackDistribution := countShardsPerRack(testDir, uint32(volumeId))

		t.Logf("Rack-level shard distribution for volume %d:", volumeId)
		totalShards := 0
		racksWithShards := 0
		for rack, shardCount := range rackDistribution {
			t.Logf("  %s: %d shards", rack, shardCount)
			totalShards += shardCount
			if shardCount > 0 {
				racksWithShards++
			}
		}
		t.Logf("Summary: %d total shards across %d racks", totalShards, racksWithShards)

		// For 10+4 EC, shards should be distributed across at least 2 racks.
		// Only assert when shards were actually produced, so an earlier
		// encode failure does not cascade into a misleading failure here.
		if totalShards > 0 {
			assert.GreaterOrEqual(t, racksWithShards, 2, "EC shards should span at least 2 racks for fault tolerance")
		}
	})

	t.Run("ec_balance_respects_rack_placement", func(t *testing.T) {
		// Acquire the cluster lock; skip the subtest body if that fails.
		lockCmd := shell.Commands[findCommandIndex("lock")]
		var lockOutput bytes.Buffer
		err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
		if err != nil {
			t.Logf("Lock command failed: %v", err)
			return
		}

		unlockCmd := shell.Commands[findCommandIndex("unlock")]
		var unlockOutput bytes.Buffer
		defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)

		initialDistribution := countShardsPerRack(testDir, uint32(volumeId))
		t.Logf("Initial rack distribution: %v", initialDistribution)

		// Run ec.balance - use "test" collection to match uploaded data.
		var output bytes.Buffer
		ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
		err = ecBalanceCmd.Do([]string{"-collection", "test"}, commandEnv, &output)
		if err != nil {
			t.Logf("ec.balance error: %v", err)
		}
		t.Logf("ec.balance output: %s", output.String())

		finalDistribution := countShardsPerRack(testDir, uint32(volumeId))
		t.Logf("Final rack distribution: %v", finalDistribution)

		// Verify rack distribution is maintained or improved.
		finalRacksWithShards := 0
		for _, count := range finalDistribution {
			if count > 0 {
				finalRacksWithShards++
			}
		}

		t.Logf("After balance: shards across %d racks", finalRacksWithShards)
	})
}

// startLimitedSsdCluster starts a cluster with limited SSD capacity (1 SSD, 2 HDD
// volume servers across racks rack0..rack2) registered to a master on port 9337.
// The caller owns the returned cluster and must call Stop when done; on error the
// function cleans up any processes/files it already started.
func startLimitedSsdCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) {
	weedBinary := findWeedBinary()
	if weedBinary == "" {
		return nil, fmt.Errorf("weed binary not found")
	}

	cluster := &MultiDiskCluster{testDir: dataDir}

	// Create master directory (previously the MkdirAll error was ignored).
	masterDir := filepath.Join(dataDir, "master")
	if err := os.MkdirAll(masterDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create master dir: %v", err)
	}

	// Start master server on port 9337 with a small volume size so volumes
	// fill (and become EC candidates) quickly.
	masterCmd := exec.CommandContext(ctx, weedBinary, "master",
		"-port", "9337",
		"-mdir", masterDir,
		"-volumeSizeLimitMB", "10",
		"-ip", "127.0.0.1",
	)

	masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
	if err != nil {
		return nil, fmt.Errorf("failed to create master log file: %v", err)
	}
	cluster.logFiles = append(cluster.logFiles, masterLogFile)
	masterCmd.Stdout = masterLogFile
	masterCmd.Stderr = masterLogFile

	if err := masterCmd.Start(); err != nil {
		// Stop closes the already-registered log file, matching the
		// cleanup done on the volume-server error paths below.
		cluster.Stop()
		return nil, fmt.Errorf("failed to start master server: %v", err)
	}
	cluster.masterCmd = masterCmd

	time.Sleep(2 * time.Second)

	// Start 1 SSD server and 2 HDD servers.
	// This creates a scenario where SSD capacity is limited.
	serverConfigs := []struct {
		diskType string
		rack     string
	}{
		{"ssd", "rack0"}, // Only 1 SSD server
		{"hdd", "rack1"},
		{"hdd", "rack2"},
	}

	for i, config := range serverConfigs {
		diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_%s", i, config.diskType))
		if err := os.MkdirAll(diskDir, 0755); err != nil {
			cluster.Stop()
			return nil, fmt.Errorf("failed to create disk dir: %v", err)
		}

		port := fmt.Sprintf("812%d", i)

		volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
			"-port", port,
			"-dir", diskDir,
			"-max", "10",
			"-mserver", "127.0.0.1:9337",
			"-ip", "127.0.0.1",
			"-dataCenter", "dc1",
			"-rack", config.rack,
			"-disk", config.diskType,
		)

		logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i))
		if err := os.MkdirAll(logDir, 0755); err != nil {
			cluster.Stop()
			return nil, fmt.Errorf("failed to create log dir: %v", err)
		}
		volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log"))
		if err != nil {
			cluster.Stop()
			return nil, fmt.Errorf("failed to create volume log file: %v", err)
		}
		cluster.logFiles = append(cluster.logFiles, volumeLogFile)
		volumeCmd.Stdout = volumeLogFile
		volumeCmd.Stderr = volumeLogFile

		if err := volumeCmd.Start(); err != nil {
			cluster.Stop()
			return nil, fmt.Errorf("failed to start volume server %d: %v", i, err)
		}
		cluster.volumeServers = append(cluster.volumeServers, volumeCmd)
	}

	time.Sleep(8 * time.Second)

	return cluster, nil
}

// startMultiRackCluster starts a cluster with 4 volume servers, each in its own
// rack (rack0..rack3), registered to a master on port 9338. The caller owns the
// returned cluster and must call Stop when done; on error the function cleans up
// any processes/files it already started.
func startMultiRackCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) {
	weedBinary := findWeedBinary()
	if weedBinary == "" {
		return nil, fmt.Errorf("weed binary not found")
	}

	cluster := &MultiDiskCluster{testDir: dataDir}

	// Create master directory (previously the MkdirAll error was ignored).
	masterDir := filepath.Join(dataDir, "master")
	if err := os.MkdirAll(masterDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create master dir: %v", err)
	}

	// Start master server on port 9338 with a small volume size so volumes
	// fill (and become EC candidates) quickly.
	masterCmd := exec.CommandContext(ctx, weedBinary, "master",
		"-port", "9338",
		"-mdir", masterDir,
		"-volumeSizeLimitMB", "10",
		"-ip", "127.0.0.1",
	)

	masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
	if err != nil {
		return nil, fmt.Errorf("failed to create master log file: %v", err)
	}
	cluster.logFiles = append(cluster.logFiles, masterLogFile)
	masterCmd.Stdout = masterLogFile
	masterCmd.Stderr = masterLogFile

	if err := masterCmd.Start(); err != nil {
		// Stop closes the already-registered log file, matching the
		// cleanup done on the volume-server error paths below.
		cluster.Stop()
		return nil, fmt.Errorf("failed to start master server: %v", err)
	}
	cluster.masterCmd = masterCmd

	time.Sleep(2 * time.Second)

	// Start 4 volume servers, each in a different rack.
	for i := 0; i < 4; i++ {
		diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d", i))
		if err := os.MkdirAll(diskDir, 0755); err != nil {
			cluster.Stop()
			return nil, fmt.Errorf("failed to create disk dir: %v", err)
		}

		port := fmt.Sprintf("813%d", i)
		rack := fmt.Sprintf("rack%d", i)

		volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
			"-port", port,
			"-dir", diskDir,
			"-max", "10",
			"-mserver", "127.0.0.1:9338",
			"-ip", "127.0.0.1",
			"-dataCenter", "dc1",
			"-rack", rack,
		)

		logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i))
		if err := os.MkdirAll(logDir, 0755); err != nil {
			cluster.Stop()
			return nil, fmt.Errorf("failed to create log dir: %v", err)
		}
		volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log"))
		if err != nil {
			cluster.Stop()
			return nil, fmt.Errorf("failed to create volume log file: %v", err)
		}
		cluster.logFiles = append(cluster.logFiles, volumeLogFile)
		volumeCmd.Stdout = volumeLogFile
		volumeCmd.Stderr = volumeLogFile

		if err := volumeCmd.Start(); err != nil {
			cluster.Stop()
			return nil, fmt.Errorf("failed to start volume server %d: %v", i, err)
		}
		cluster.volumeServers = append(cluster.volumeServers, volumeCmd)
	}

	time.Sleep(8 * time.Second)

	return cluster, nil
}

// countShardsPerRack counts EC shards per rack by scanning server directories
// under testDir for files named "<volumeId>.ec*". The rack name is derived from
// the directory name: "server0" / "server0_ssd" -> "rack0", matching how the
// cluster starters above lay out one rack per server index. Directories that
// don't follow the "serverN..." pattern are counted under "unknown".
func countShardsPerRack(testDir string, volumeId uint32) map[string]int {
	rackDistribution := make(map[string]int)

	entries, err := os.ReadDir(testDir)
	if err != nil {
		// Treat an unreadable test dir as "no shards found".
		return rackDistribution
	}

	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}

		// Check for EC shard files in this directory.
		serverDir := filepath.Join(testDir, entry.Name())
		shardFiles, err := filepath.Glob(filepath.Join(serverDir, fmt.Sprintf("%d.ec*", volumeId)))
		if err != nil {
			// filepath.Glob only returns ErrBadPattern for malformed patterns.
			// Skip this directory if there's an error.
			continue
		}

		if len(shardFiles) > 0 {
			// Extract rack name from directory name,
			// e.g., "server0" -> "rack0", "server1_ssd" -> "rack1".
			rackName := "unknown"
			if strings.HasPrefix(entry.Name(), "server") {
				parts := strings.Split(entry.Name(), "_")
				if len(parts) > 0 {
					serverNum := strings.TrimPrefix(parts[0], "server")
					rackName = "rack" + serverNum
				}
			}
			rackDistribution[rackName] += len(shardFiles)
		}
	}

	return rackDistribution
}
