 test/erasure_coding/ec_integration_test.go | 249 +++++++++---------
 1 file changed, 108 insertions(+), 141 deletions(-)
diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go
index bb0983f06..9d7cde572 100644
--- a/test/erasure_coding/ec_integration_test.go
+++ b/test/erasure_coding/ec_integration_test.go
@@ -96,13 +96,12 @@ func TestECEncodingVolumeLocationTimingBug(t *testing.T) {
// This simulates the race condition where EC encoding updates master metadata
// but volume location collection happens after that update
- // First acquire the lock (required for EC encode)
- lockCmd := shell.Commands[findCommandIndex("lock")]
- var lockOutput bytes.Buffer
- err = lockCmd.Do([]string{}, commandEnv, &lockOutput)
- if err != nil {
- t.Logf("Lock command failed: %v", err)
+ // Try to get lock with timeout to avoid hanging
+ locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+ if !locked {
+ t.Skip("Could not acquire lock within timeout - master may not be ready")
}
+ defer unlock()
// Execute EC encoding - test the timing directly
var encodeOutput bytes.Buffer
@@ -651,6 +650,55 @@ type commandRunner interface {
Do([]string, *shell.CommandEnv, io.Writer) error
}
+// tryLockWithTimeout attempts to acquire the shell lock within the given timeout.
+// It returns true if the lock was acquired, or false on timeout or error.
+// If the lock was acquired, the caller must call the returned unlock function.
+// Note: on timeout the lock goroutine keeps running in the background; if it
+// acquires the lock later, this helper has no way to release it.
+func tryLockWithTimeout(t *testing.T, commandEnv *shell.CommandEnv, timeout time.Duration) (locked bool, unlock func()) {
+ t.Helper()
+
+ type lockResult struct {
+ err error
+ output string
+ }
+
+ lockDone := make(chan lockResult, 1)
+ go func() {
+ lockCmd := shell.Commands[findCommandIndex("lock")]
+ var lockOutput bytes.Buffer
+ err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
+ lockDone <- lockResult{err: err, output: lockOutput.String()}
+ }()
+
+ select {
+ case res := <-lockDone:
+ if res.err != nil {
+ t.Logf("lock command failed: %v, output: %s", res.err, res.output)
+ return false, nil
+ }
+ unlockCmd := shell.Commands[findCommandIndex("unlock")]
+ return true, func() {
+ var unlockOutput bytes.Buffer
+ _ = unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+ }
+ case <-time.After(timeout):
+ t.Logf("timed out acquiring lock after %s", timeout)
+ return false, nil
+ }
+}
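
(For reference: the helper above follows the standard Go pattern of racing a
blocking call against a timer. A minimal generic sketch, standard library only;
callWithTimeout is a hypothetical name, not part of this patch. The channel
buffer of 1 is what lets the goroutine finish its send and exit even after the
select has taken the timeout branch.)

	func callWithTimeout(f func() error, timeout time.Duration) (error, bool) {
		done := make(chan error, 1) // buffered: the sender never blocks
		go func() { done <- f() }()
		select {
		case err := <-done:
			return err, true
		case <-time.After(timeout):
			// f keeps running in the background; its eventual send is
			// absorbed by the buffer, so the goroutine does not leak.
			return nil, false
		}
	}
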
+
+// connectToMasterAndSync connects the command environment to the master and waits for sync.
+// This helper reduces code duplication across test functions.
+func connectToMasterAndSync(ctx context.Context, t *testing.T, commandEnv *shell.CommandEnv) {
+ t.Helper()
+ // Connect to master - use the main test context to avoid early disconnection
+ go commandEnv.MasterClient.KeepConnectedToMaster(ctx)
+ commandEnv.MasterClient.WaitUntilConnected(ctx)
+
+ // Wait for master client to fully sync
+ t.Log("Waiting for master client to sync...")
+ time.Sleep(5 * time.Second)
+}
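
(The fixed 5-second sleep above is a heuristic. One possible refinement, not
part of this patch, would be to poll a readiness condition instead; a minimal
sketch with a hypothetical waitFor helper, standard library only, leaving the
actual master-client condition abstract:)

	// waitFor polls cond every interval until it returns true or the
	// timeout elapses, and reports whether the condition was observed.
	func waitFor(cond func() bool, timeout, interval time.Duration) bool {
		deadline := time.Now().Add(timeout)
		for time.Now().Before(deadline) {
			if cond() {
				return true
			}
			time.Sleep(interval)
		}
		return false
	}
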
+
// captureCommandOutput executes a shell command and captures its output from both
// stdout/stderr and the command's buffer. This reduces code duplication in tests.
func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, commandEnv *shell.CommandEnv) (output string, err error) {
@@ -661,17 +709,16 @@ func captureCommandOutput(t *testing.T, cmd commandRunner, args []string, comman
r, w, pipeErr := os.Pipe()
require.NoError(t, pipeErr)
- defer func() {
- _ = w.Close()
- os.Stdout = oldStdout
- os.Stderr = oldStderr
- }()
-
os.Stdout = w
os.Stderr = w
cmdErr := cmd.Do(args, commandEnv, &outBuf)
+ // Close write end BEFORE reading to signal EOF to the reader
+ _ = w.Close()
+ os.Stdout = oldStdout
+ os.Stderr = oldStderr
+
capturedOutput, readErr := io.ReadAll(r)
_ = r.Close()
require.NoError(t, readErr)
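
(The reordering above matters because io.ReadAll blocks until EOF, and a pipe's
read end only reports EOF once every write end is closed. A self-contained
sketch of the same capture pattern, with a hypothetical captureStdout helper
using only os and io; note that output larger than the OS pipe buffer would
block the writer, since nothing reads concurrently, which is acceptable for
short test output:)

	// captureStdout runs f while os.Stdout is redirected into a pipe,
	// then restores stdout and returns what f wrote. The write end must
	// be closed before io.ReadAll, or the read would block forever.
	func captureStdout(f func()) (string, error) {
		old := os.Stdout
		r, w, err := os.Pipe()
		if err != nil {
			return "", err
		}
		os.Stdout = w
		f()
		_ = w.Close() // signal EOF to the reader
		os.Stdout = old
		data, readErr := io.ReadAll(r)
		_ = r.Close()
		return string(data), readErr
	}
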
@@ -753,14 +800,7 @@ func TestDiskAwareECRebalancing(t *testing.T) {
}
commandEnv := shell.NewCommandEnv(options)
- // Connect to master with longer timeout
- ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
- defer cancel2()
- go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
- commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
- // Wait for master client to fully sync
- time.Sleep(5 * time.Second)
+ connectToMasterAndSync(ctx, t, commandEnv)
// Upload test data to create a volume - retry if volumes not ready
var volumeId needle.VolumeId
@@ -1194,14 +1234,7 @@ func TestECDiskTypeSupport(t *testing.T) {
}
commandEnv := shell.NewCommandEnv(options)
- // Connect to master with longer timeout
- ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
- defer cancel2()
- go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
- commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
- // Wait for master client to fully sync
- time.Sleep(5 * time.Second)
+ connectToMasterAndSync(ctx, t, commandEnv)
// Upload test data to create a volume - retry if volumes not ready
var volumeId needle.VolumeId
@@ -1228,19 +1261,12 @@ func TestECDiskTypeSupport(t *testing.T) {
})
t.Run("ec_encode_with_ssd_disktype", func(t *testing.T) {
- // Get lock first
- lockCmd := shell.Commands[findCommandIndex("lock")]
- var lockOutput bytes.Buffer
- err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
- if err != nil {
- t.Logf("Lock command failed: %v", err)
- return
+ // Try to get lock with timeout to avoid hanging
+ locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+ if !locked {
+ t.Skip("Could not acquire lock within timeout - master may not be ready")
}
-
- // Defer unlock to ensure it's always released
- unlockCmd := shell.Commands[findCommandIndex("unlock")]
- var unlockOutput bytes.Buffer
- defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+ defer unlock()
// Execute EC encoding with SSD disk type
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
@@ -1270,19 +1296,12 @@ func TestECDiskTypeSupport(t *testing.T) {
})
t.Run("ec_balance_with_ssd_disktype", func(t *testing.T) {
- // Get lock first
- lockCmd := shell.Commands[findCommandIndex("lock")]
- var lockOutput bytes.Buffer
- err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
- if err != nil {
- t.Logf("Lock command failed: %v", err)
- return
+ // Try to get lock with timeout to avoid hanging
+ locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+ if !locked {
+ t.Skip("Could not acquire lock within timeout - master may not be ready")
}
-
- // Defer unlock to ensure it's always released
- unlockCmd := shell.Commands[findCommandIndex("unlock")]
- var unlockOutput bytes.Buffer
- defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+ defer unlock()
// Execute EC balance with SSD disk type
ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
@@ -1325,19 +1344,12 @@ func TestECDiskTypeSupport(t *testing.T) {
})
t.Run("ec_encode_with_source_disktype", func(t *testing.T) {
- // Test that -sourceDiskType flag is accepted
- lockCmd := shell.Commands[findCommandIndex("lock")]
- var lockOutput bytes.Buffer
- err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
- if err != nil {
- t.Logf("Lock command failed: %v", err)
- return
+ // Try to get lock with timeout to avoid hanging
+ locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+ if !locked {
+ t.Skip("Could not acquire lock within timeout - master may not be ready")
}
-
- // Defer unlock to ensure it's always released
- unlockCmd := shell.Commands[findCommandIndex("unlock")]
- var unlockOutput bytes.Buffer
- defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+ defer unlock()
// Execute EC encoding with sourceDiskType filter
ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
@@ -1366,19 +1378,12 @@ func TestECDiskTypeSupport(t *testing.T) {
})
t.Run("ec_decode_with_disktype", func(t *testing.T) {
- // Test that ec.decode accepts -diskType flag
- lockCmd := shell.Commands[findCommandIndex("lock")]
- var lockOutput bytes.Buffer
- err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
- if err != nil {
- t.Logf("Lock command failed: %v", err)
- return
+ // Try to get lock with timeout to avoid hanging
+ locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+ if !locked {
+ t.Skip("Could not acquire lock within timeout - master may not be ready")
}
-
- // Defer unlock to ensure it's always released
- unlockCmd := shell.Commands[findCommandIndex("unlock")]
- var unlockOutput bytes.Buffer
- defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+ defer unlock()
// Execute EC decode with disk type
ecDecodeCmd := shell.Commands[findCommandIndex("ec.decode")]
@@ -1424,6 +1429,7 @@ func startClusterWithDiskType(ctx context.Context, dataDir string, diskType stri
"-mdir", masterDir,
"-volumeSizeLimitMB", "10",
"-ip", "127.0.0.1",
+ "-peers", "none",
)
masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
@@ -1569,14 +1575,7 @@ func TestECDiskTypeMixedCluster(t *testing.T) {
}
commandEnv := shell.NewCommandEnv(options)
- // Connect to master with longer timeout
- ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
- defer cancel2()
- go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
- commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
- // Wait for master client to fully sync
- time.Sleep(5 * time.Second)
+ connectToMasterAndSync(ctx, t, commandEnv)
t.Run("upload_to_ssd_and_hdd", func(t *testing.T) {
// Upload to SSD
@@ -1611,19 +1610,12 @@ func TestECDiskTypeMixedCluster(t *testing.T) {
})
t.Run("ec_balance_targets_correct_disk_type", func(t *testing.T) {
- // Get lock first
- lockCmd := shell.Commands[findCommandIndex("lock")]
- var lockOutput bytes.Buffer
- err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
- if err != nil {
- t.Logf("Lock command failed: %v", err)
- return
+ // Try to get lock with timeout to avoid hanging
+ locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+ if !locked {
+ t.Skip("Could not acquire lock within timeout - master may not be ready")
}
-
- // Defer unlock to ensure it's always released
- unlockCmd := shell.Commands[findCommandIndex("unlock")]
- var unlockOutput bytes.Buffer
- defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+ defer unlock()
// Run ec.balance for SSD collection with -diskType=ssd
var ssdOutput bytes.Buffer
@@ -1669,6 +1661,7 @@ func startMixedDiskTypeCluster(ctx context.Context, dataDir string) (*MultiDiskC
"-mdir", masterDir,
"-volumeSizeLimitMB", "10",
"-ip", "127.0.0.1",
+ "-peers", "none",
)
masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
@@ -1770,12 +1763,7 @@ func TestEvacuationFallbackBehavior(t *testing.T) {
}
commandEnv := shell.NewCommandEnv(options)
- ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
- defer cancel2()
- go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
- commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
- time.Sleep(5 * time.Second)
+ connectToMasterAndSync(ctx, t, commandEnv)
t.Run("fallback_when_same_disktype_full", func(t *testing.T) {
// This test verifies that when evacuating SSD EC shards from a server,
@@ -1800,18 +1788,12 @@ func TestEvacuationFallbackBehavior(t *testing.T) {
time.Sleep(3 * time.Second)
- // Get lock
- lockCmd := shell.Commands[findCommandIndex("lock")]
- var lockOutput bytes.Buffer
- err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
- if err != nil {
- t.Logf("Lock command failed: %v", err)
- return
+ // Try to get lock with timeout to avoid hanging
+ locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+ if !locked {
+ t.Skip("Could not acquire lock within timeout - master may not be ready")
}
-
- unlockCmd := shell.Commands[findCommandIndex("unlock")]
- var unlockOutput bytes.Buffer
- defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+ defer unlock()
// EC encode the SSD volume
var encodeOutput bytes.Buffer
@@ -1875,12 +1857,7 @@ func TestCrossRackECPlacement(t *testing.T) {
}
commandEnv := shell.NewCommandEnv(options)
- ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
- defer cancel2()
- go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
- commandEnv.MasterClient.WaitUntilConnected(ctx2)
-
- time.Sleep(5 * time.Second)
+ connectToMasterAndSync(ctx, t, commandEnv)
// Upload test data
testData := []byte("Cross-rack EC placement test data - needs to be distributed across racks")
@@ -1899,18 +1876,12 @@ func TestCrossRackECPlacement(t *testing.T) {
time.Sleep(3 * time.Second)
t.Run("ec_encode_cross_rack", func(t *testing.T) {
- // Get lock
- lockCmd := shell.Commands[findCommandIndex("lock")]
- var lockOutput bytes.Buffer
- err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
- if err != nil {
- t.Logf("Lock command failed: %v", err)
- return
+ // Try to get lock with timeout to avoid hanging
+ locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+ if !locked {
+ t.Skip("Could not acquire lock within timeout - master may not be ready")
}
-
- unlockCmd := shell.Commands[findCommandIndex("unlock")]
- var unlockOutput bytes.Buffer
- defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+ defer unlock()
// EC encode with rack-aware placement
// Note: uploadTestDataToMaster uses collection "test" by default
@@ -1955,18 +1926,12 @@ func TestCrossRackECPlacement(t *testing.T) {
})
t.Run("ec_balance_respects_rack_placement", func(t *testing.T) {
- // Get lock
- lockCmd := shell.Commands[findCommandIndex("lock")]
- var lockOutput bytes.Buffer
- err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
- if err != nil {
- t.Logf("Lock command failed: %v", err)
- return
+ // Try to get lock with timeout to avoid hanging
+ locked, unlock := tryLockWithTimeout(t, commandEnv, 30*time.Second)
+ if !locked {
+ t.Skip("Could not acquire lock within timeout - master may not be ready")
}
-
- unlockCmd := shell.Commands[findCommandIndex("unlock")]
- var unlockOutput bytes.Buffer
- defer unlockCmd.Do([]string{}, commandEnv, &unlockOutput)
+ defer unlock()
initialDistribution := countShardsPerRack(testDir, uint32(volumeId))
t.Logf("Initial rack distribution: %v", initialDistribution)
@@ -2014,6 +1979,7 @@ func startLimitedSsdCluster(ctx context.Context, dataDir string) (*MultiDiskClus
"-mdir", masterDir,
"-volumeSizeLimitMB", "10",
"-ip", "127.0.0.1",
+ "-peers", "none",
)
masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
@@ -2104,6 +2070,7 @@ func startMultiRackCluster(ctx context.Context, dataDir string) (*MultiDiskClust
"-mdir", masterDir,
"-volumeSizeLimitMB", "10",
"-ip", "127.0.0.1",
+ "-peers", "none",
)
masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))