diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-12-11 00:22:32 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-11 00:22:32 -0800 |
| commit | c6d6ee8297b7bb569d52b22fa677613514475ce2 (patch) | |
| tree | 85bf1604ff1cabff6f0522a32b0f43acade75ba5 /test | |
| parent | de3ecaf0de03a7d2ee1f8cf7516291b3b3b56122 (diff) | |
| download | seaweedfs-c6d6ee8297b7bb569d52b22fa677613514475ce2.tar.xz seaweedfs-c6d6ee8297b7bb569d52b22fa677613514475ce2.zip | |
test: fix master client timeout causing test hangs (#7715)
* test: fix master client timeout causing test hangs
Use the main test context for KeepConnectedToMaster instead of creating
a separate 60s context. The tests have 180s outer timeouts but the master
client was disconnecting after 60s, causing subsequent commands to hang
waiting for reconnection.
* test: add -peers=none to all test masters and timeout for lock
- Add -peers=none flag to all master servers for faster startup
- Add tryLockWithTimeout helper to avoid tests hanging on lock acquisition
- Skip tests if lock cannot be acquired within 30 seconds
* test: extract connectToMasterAndSync helper to reduce duplication
* test: fix captureCommandOutput pipe deadlock
Close write end of pipe before calling io.ReadAll to signal EOF,
otherwise ReadAll blocks forever waiting for more data.
* test: fix tryLockWithTimeout to check lock command errors
Propagate lock command error through channel and only treat as
locked if command succeeded. Previously any completion (including
errors) was treated as successful lock acquisition.
Diffstat (limited to 'test')
| -rw-r--r-- | test/erasure_coding/ec_integration_test.go | 249 |
1 file changed, 108 insertions, 141 deletions
diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go index bb0983f06..9d7cde572 100644 --- a/test/erasure_coding/ec_integration_test.go +++ b/test/erasure_coding/ec_integration_test.go @@ -96,13 +96,12 @@ func TestECEncodingVolumeLocationTimingBug(t *testing.T) { // This simulates the race condition where EC encoding updates master metadata // but volume location collection happens after that update - // First acquire the lock (required for EC encode) - lockCmd := shell.Commands[findCommandIndex("lock")] - var lockOutput bytes.Buffer - err = lockCmd.Do([]string{}, commandEnv, &lockOutput) - if err != nil { - t.Logf("Lock command failed: %v", err) + // Try to get lock with timeout to avoid hanging + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout - master may not be ready") } + defer unlock() // Execute EC encoding - test the timing directly var encodeOutput bytes.Buffer @@ -651,6 +650,55 @@ type commandRunner interface { Do([]string, *shell.CommandEnv, io.Writer) error } +// tryLockWithTimeout attempts to acquire the shell lock with a timeout. +// Returns true if lock was acquired, false if timeout or error occurred. +// If lock was acquired, the caller must call the returned unlock function. 
+func tryLockWithTimeout(t *testing.T, commandEnv *shell.CommandEnv, timeout time.Duration) (locked bool, unlock func()) { + t.Helper() + + type lockResult struct { + err error + output string + } + + lockDone := make(chan lockResult, 1) + go func() { + lockCmd := shell.Commands[findCommandIndex("lock")] + var lockOutput bytes.Buffer + err := lockCmd.Do([]string{}, commandEnv, &lockOutput) + lockDone <- lockResult{err: err, output: lockOutput.String()} + }() + + select { + case res := <-lockDone: + if res.err != nil { + t.Logf("lock command failed: %v, output: %s", res.err, res.output) + return false, nil + } + unlockCmd := shell.Commands[findCommandIndex("unlock")] + return true, func() { + var unlockOutput bytes.Buffer + _ = unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + } + case <-time.After(timeout): + t.Logf("timed out acquiring lock after %s", timeout) + return false, nil + } +} + +// connectToMasterAndSync connects the command environment to the master and waits for sync. +// This helper reduces code duplication across test functions. +func connectToMasterAndSync(ctx context.Context, t *testing.T, commandEnv *shell.CommandEnv) { + t.Helper() + // Connect to master - use the main test context to avoid early disconnection + go commandEnv.MasterClient.KeepConnectedToMaster(ctx) + commandEnv.MasterClient.WaitUntilConnected(ctx) + + // Wait for master client to fully sync + t.Log("Waiting for master client to sync...") + time.Sleep(5 * time.Second) +} + // captureCommandOutput executes a shell command and captures its output from both // stdout/stderr and the command's buffer. This reduces code duplication in tests. 
func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, commandEnv *shell.CommandEnv) (output string, err error) { @@ -661,17 +709,16 @@ func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, comman r, w, pipeErr := os.Pipe() require.NoError(t, pipeErr) - defer func() { - _ = w.Close() - os.Stdout = oldStdout - os.Stderr = oldStderr - }() - os.Stdout = w os.Stderr = w cmdErr := cmd.Do(args, commandEnv, &outBuf) + // Close write end BEFORE reading to signal EOF to the reader + _ = w.Close() + os.Stdout = oldStdout + os.Stderr = oldStderr + capturedOutput, readErr := io.ReadAll(r) _ = r.Close() require.NoError(t, readErr) @@ -753,14 +800,7 @@ func TestDiskAwareECRebalancing(t *testing.T) { } commandEnv := shell.NewCommandEnv(options) - // Connect to master with longer timeout - ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel2() - go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) - commandEnv.MasterClient.WaitUntilConnected(ctx2) - - // Wait for master client to fully sync - time.Sleep(5 * time.Second) + connectToMasterAndSync(ctx, t, commandEnv) // Upload test data to create a volume - retry if volumes not ready var volumeId needle.VolumeId @@ -1194,14 +1234,7 @@ func TestECDiskTypeSupport(t *testing.T) { } commandEnv := shell.NewCommandEnv(options) - // Connect to master with longer timeout - ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel2() - go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) - commandEnv.MasterClient.WaitUntilConnected(ctx2) - - // Wait for master client to fully sync - time.Sleep(5 * time.Second) + connectToMasterAndSync(ctx, t, commandEnv) // Upload test data to create a volume - retry if volumes not ready var volumeId needle.VolumeId @@ -1228,19 +1261,12 @@ func TestECDiskTypeSupport(t *testing.T) { }) t.Run("ec_encode_with_ssd_disktype", func(t *testing.T) { - // Get lock first - lockCmd := 
shell.Commands[findCommandIndex("lock")] - var lockOutput bytes.Buffer - err := lockCmd.Do([]string{}, commandEnv, &lockOutput) - if err != nil { - t.Logf("Lock command failed: %v", err) - return + // Try to get lock with timeout to avoid hanging + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout - master may not be ready") } - - // Defer unlock to ensure it's always released - unlockCmd := shell.Commands[findCommandIndex("unlock")] - var unlockOutput bytes.Buffer - defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + defer unlock() // Execute EC encoding with SSD disk type ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] @@ -1270,19 +1296,12 @@ func TestECDiskTypeSupport(t *testing.T) { }) t.Run("ec_balance_with_ssd_disktype", func(t *testing.T) { - // Get lock first - lockCmd := shell.Commands[findCommandIndex("lock")] - var lockOutput bytes.Buffer - err := lockCmd.Do([]string{}, commandEnv, &lockOutput) - if err != nil { - t.Logf("Lock command failed: %v", err) - return + // Try to get lock with timeout to avoid hanging + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout - master may not be ready") } - - // Defer unlock to ensure it's always released - unlockCmd := shell.Commands[findCommandIndex("unlock")] - var unlockOutput bytes.Buffer - defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + defer unlock() // Execute EC balance with SSD disk type ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")] @@ -1325,19 +1344,12 @@ func TestECDiskTypeSupport(t *testing.T) { }) t.Run("ec_encode_with_source_disktype", func(t *testing.T) { - // Test that -sourceDiskType flag is accepted - lockCmd := shell.Commands[findCommandIndex("lock")] - var lockOutput bytes.Buffer - err := lockCmd.Do([]string{}, commandEnv, &lockOutput) - if err != nil { - t.Logf("Lock command failed: %v", 
err) - return + // Try to get lock with timeout to avoid hanging + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout - master may not be ready") } - - // Defer unlock to ensure it's always released - unlockCmd := shell.Commands[findCommandIndex("unlock")] - var unlockOutput bytes.Buffer - defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + defer unlock() // Execute EC encoding with sourceDiskType filter ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")] @@ -1366,19 +1378,12 @@ func TestECDiskTypeSupport(t *testing.T) { }) t.Run("ec_decode_with_disktype", func(t *testing.T) { - // Test that ec.decode accepts -diskType flag - lockCmd := shell.Commands[findCommandIndex("lock")] - var lockOutput bytes.Buffer - err := lockCmd.Do([]string{}, commandEnv, &lockOutput) - if err != nil { - t.Logf("Lock command failed: %v", err) - return + // Try to get lock with timeout to avoid hanging + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout - master may not be ready") } - - // Defer unlock to ensure it's always released - unlockCmd := shell.Commands[findCommandIndex("unlock")] - var unlockOutput bytes.Buffer - defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + defer unlock() // Execute EC decode with disk type ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")] @@ -1424,6 +1429,7 @@ func startClusterWithDiskType(ctx context.Context, dataDir string, diskType stri "-mdir", masterDir, "-volumeSizeLimitMB", "10", "-ip", "127.0.0.1", + "-peers", "none", ) masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) @@ -1569,14 +1575,7 @@ func TestECDiskTypeMixedCluster(t *testing.T) { } commandEnv := shell.NewCommandEnv(options) - // Connect to master with longer timeout - ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel2() - go 
commandEnv.MasterClient.KeepConnectedToMaster(ctx2) - commandEnv.MasterClient.WaitUntilConnected(ctx2) - - // Wait for master client to fully sync - time.Sleep(5 * time.Second) + connectToMasterAndSync(ctx, t, commandEnv) t.Run("upload_to_ssd_and_hdd", func(t *testing.T) { // Upload to SSD @@ -1611,19 +1610,12 @@ func TestECDiskTypeMixedCluster(t *testing.T) { }) t.Run("ec_balance_targets_correct_disk_type", func(t *testing.T) { - // Get lock first - lockCmd := shell.Commands[findCommandIndex("lock")] - var lockOutput bytes.Buffer - err := lockCmd.Do([]string{}, commandEnv, &lockOutput) - if err != nil { - t.Logf("Lock command failed: %v", err) - return + // Try to get lock with timeout to avoid hanging + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout - master may not be ready") } - - // Defer unlock to ensure it's always released - unlockCmd := shell.Commands[findCommandIndex("unlock")] - var unlockOutput bytes.Buffer - defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + defer unlock() // Run ec.balance for SSD collection with -diskType=ssd var ssdOutput bytes.Buffer @@ -1669,6 +1661,7 @@ func startMixedDiskTypeCluster(ctx context.Context, dataDir string) (*MultiDiskC "-mdir", masterDir, "-volumeSizeLimitMB", "10", "-ip", "127.0.0.1", + "-peers", "none", ) masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) @@ -1770,12 +1763,7 @@ func TestEvacuationFallbackBehavior(t *testing.T) { } commandEnv := shell.NewCommandEnv(options) - ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel2() - go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) - commandEnv.MasterClient.WaitUntilConnected(ctx2) - - time.Sleep(5 * time.Second) + connectToMasterAndSync(ctx, t, commandEnv) t.Run("fallback_when_same_disktype_full", func(t *testing.T) { // This test verifies that when evacuating SSD EC shards from a server, @@ -1800,18 +1788,12 
@@ func TestEvacuationFallbackBehavior(t *testing.T) { time.Sleep(3 * time.Second) - // Get lock - lockCmd := shell.Commands[findCommandIndex("lock")] - var lockOutput bytes.Buffer - err := lockCmd.Do([]string{}, commandEnv, &lockOutput) - if err != nil { - t.Logf("Lock command failed: %v", err) - return + // Try to get lock with timeout to avoid hanging + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout - master may not be ready") } - - unlockCmd := shell.Commands[findCommandIndex("unlock")] - var unlockOutput bytes.Buffer - defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + defer unlock() // EC encode the SSD volume var encodeOutput bytes.Buffer @@ -1875,12 +1857,7 @@ func TestCrossRackECPlacement(t *testing.T) { } commandEnv := shell.NewCommandEnv(options) - ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel2() - go commandEnv.MasterClient.KeepConnectedToMaster(ctx2) - commandEnv.MasterClient.WaitUntilConnected(ctx2) - - time.Sleep(5 * time.Second) + connectToMasterAndSync(ctx, t, commandEnv) // Upload test data testData := []byte("Cross-rack EC placement test data - needs to be distributed across racks") @@ -1899,18 +1876,12 @@ func TestCrossRackECPlacement(t *testing.T) { time.Sleep(3 * time.Second) t.Run("ec_encode_cross_rack", func(t *testing.T) { - // Get lock - lockCmd := shell.Commands[findCommandIndex("lock")] - var lockOutput bytes.Buffer - err := lockCmd.Do([]string{}, commandEnv, &lockOutput) - if err != nil { - t.Logf("Lock command failed: %v", err) - return + // Try to get lock with timeout to avoid hanging + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout - master may not be ready") } - - unlockCmd := shell.Commands[findCommandIndex("unlock")] - var unlockOutput bytes.Buffer - defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + 
defer unlock() // EC encode with rack-aware placement // Note: uploadTestDataToMaster uses collection "test" by default @@ -1955,18 +1926,12 @@ func TestCrossRackECPlacement(t *testing.T) { }) t.Run("ec_balance_respects_rack_placement", func(t *testing.T) { - // Get lock - lockCmd := shell.Commands[findCommandIndex("lock")] - var lockOutput bytes.Buffer - err := lockCmd.Do([]string{}, commandEnv, &lockOutput) - if err != nil { - t.Logf("Lock command failed: %v", err) - return + // Try to get lock with timeout to avoid hanging + locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second) + if !locked { + t.Skip("Could not acquire lock within timeout - master may not be ready") } - - unlockCmd := shell.Commands[findCommandIndex("unlock")] - var unlockOutput bytes.Buffer - defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput) + defer unlock() initialDistribution := countShardsPerRack(testDir, uint32(volumeId)) t.Logf("Initial rack distribution: %v", initialDistribution) @@ -2014,6 +1979,7 @@ func startLimitedSsdCluster(ctx context.Context, dataDir string) (*MultiDiskClus "-mdir", masterDir, "-volumeSizeLimitMB", "10", "-ip", "127.0.0.1", + "-peers", "none", ) masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) @@ -2104,6 +2070,7 @@ func startMultiRackCluster(ctx context.Context, dataDir string) (*MultiDiskClust "-mdir", masterDir, "-volumeSizeLimitMB", "10", "-ip", "127.0.0.1", + "-peers", "none", ) masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log")) |
