Diffstat (limited to 'test')
-rw-r--r--  test/erasure_coding/ec_integration_test.go | 438
1 file changed, 437 insertions(+), 1 deletion(-)
diff --git a/test/erasure_coding/ec_integration_test.go b/test/erasure_coding/ec_integration_test.go
index 87b9b40ba..67f8eed04 100644
--- a/test/erasure_coding/ec_integration_test.go
+++ b/test/erasure_coding/ec_integration_test.go
@@ -5,9 +5,11 @@ import (
"context"
"fmt"
"io"
+ "math"
"os"
"os/exec"
"path/filepath"
+ "strings"
"testing"
"time"
@@ -139,6 +141,15 @@ func TestECEncodingVolumeLocationTimingBug(t *testing.T) {
t.Logf("EC encoding completed successfully")
}
+ // Log details of the EC encoding command to aid debugging
+ t.Logf("Debug: Executing EC encoding command for volume %d", volumeId)
+ t.Logf("Debug: Command arguments: %v", args)
+ if err != nil {
+ t.Logf("Debug: EC encoding command failed with error: %v", err)
+ } else {
+ t.Logf("Debug: EC encoding command completed successfully")
+ }
+
// The key test: check if the fix prevents the timing issue
if contains(outputStr, "Collecting volume locations") && contains(outputStr, "before EC encoding") {
t.Logf("FIX DETECTED: Volume locations collected BEFORE EC encoding (timing bug prevented)")
@@ -526,7 +537,8 @@ func uploadTestData(data []byte, masterAddress string) (needle.VolumeId, error)
func getVolumeLocations(commandEnv *shell.CommandEnv, volumeId needle.VolumeId) ([]string, error) {
// Retry mechanism to handle timing issues with volume registration
- for i := 0; i < 10; i++ {
+ // Use more retries (20 instead of the previous 10) to tolerate slow volume registration
+ for i := 0; i < 20; i++ {
locations, ok := commandEnv.MasterClient.GetLocationsClone(uint32(volumeId))
if ok {
var result []string
@@ -646,3 +658,427 @@ func TestECEncodingRegressionPrevention(t *testing.T) {
t.Log("Timing pattern regression test passed")
})
}
+
+// TestDiskAwareECRebalancing tests EC shard placement across multiple disks per server
+// This verifies the disk-aware EC rebalancing feature works correctly
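+// The cluster uses 3 volume servers with 4 data directories each, so the 14 EC shards
+// of a volume can spread across up to 12 distinct disks.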
+func TestDiskAwareECRebalancing(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping disk-aware integration test in short mode")
+ }
+
+ testDir, err := os.MkdirTemp("", "seaweedfs_disk_aware_ec_test_")
+ require.NoError(t, err)
+ defer os.RemoveAll(testDir)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
+ defer cancel()
+
+ // Start cluster with MULTIPLE DISKS per volume server
+ cluster, err := startMultiDiskCluster(ctx, testDir)
+ require.NoError(t, err)
+ defer cluster.Stop()
+
+ // Wait for servers to be ready
+ require.NoError(t, waitForServer("127.0.0.1:9334", 30*time.Second))
+ for i := 0; i < 3; i++ {
+ require.NoError(t, waitForServer(fmt.Sprintf("127.0.0.1:809%d", i), 30*time.Second))
+ }
+
+ // Wait longer for volume servers to register with master and create volumes
+ t.Log("Waiting for volume servers to register with master...")
+ time.Sleep(10 * time.Second)
+
+ // Create command environment
+ options := &shell.ShellOptions{
+ Masters: stringPtr("127.0.0.1:9334"),
+ GrpcDialOption: grpc.WithInsecure(),
+ FilerGroup: stringPtr("default"),
+ }
+ commandEnv := shell.NewCommandEnv(options)
+
+ // Connect to master with longer timeout
+ ctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
+ defer cancel2()
+ go commandEnv.MasterClient.KeepConnectedToMaster(ctx2)
+ commandEnv.MasterClient.WaitUntilConnected(ctx2)
+
+ // Wait for master client to fully sync
+ time.Sleep(5 * time.Second)
+
+ // Upload test data to create a volume - retry if volumes not ready
+ var volumeId needle.VolumeId
+ testData := []byte("Disk-aware EC rebalancing test data - this needs to be large enough to create a volume")
+ for retry := 0; retry < 5; retry++ {
+ volumeId, err = uploadTestDataToMaster(testData, "127.0.0.1:9334")
+ if err == nil {
+ break
+ }
+ t.Logf("Upload attempt %d failed: %v, retrying...", retry+1, err)
+ time.Sleep(3 * time.Second)
+ }
+ require.NoError(t, err, "Failed to upload test data after retries")
+ t.Logf("Created volume %d for disk-aware EC test", volumeId)
+
+ // Wait for volume to be registered
+ time.Sleep(3 * time.Second)
+
+ t.Run("verify_multi_disk_setup", func(t *testing.T) {
+ // Verify that each server has multiple disk directories
+ for server := 0; server < 3; server++ {
+ diskCount := 0
+ for disk := 0; disk < 4; disk++ {
+ diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk))
+ if _, err := os.Stat(diskDir); err == nil {
+ diskCount++
+ }
+ }
+ assert.Equal(t, 4, diskCount, "Server %d should have 4 disk directories", server)
+ t.Logf("Server %d has %d disk directories", server, diskCount)
+ }
+ })
+
+ t.Run("ec_encode_with_disk_awareness", func(t *testing.T) {
+ // Get lock first
+ lockCmd := shell.Commands[findCommandIndex("lock")]
+ var lockOutput bytes.Buffer
+ err := lockCmd.Do([]string{}, commandEnv, &lockOutput)
+ if err != nil {
+ t.Logf("Lock command failed: %v", err)
+ }
+
+ // Execute EC encoding
+ var output bytes.Buffer
+ ecEncodeCmd := shell.Commands[findCommandIndex("ec.encode")]
+ args := []string{"-volumeId", fmt.Sprintf("%d", volumeId), "-collection", "test", "-force"}
+
+ // Capture output
+ oldStdout := os.Stdout
+ oldStderr := os.Stderr
+ r, w, _ := os.Pipe()
+ os.Stdout = w
+ os.Stderr = w
+
+ err = ecEncodeCmd.Do(args, commandEnv, &output)
+
+ w.Close()
+ os.Stdout = oldStdout
+ os.Stderr = oldStderr
+
+ capturedOutput, _ := io.ReadAll(r)
+ outputStr := string(capturedOutput) + output.String()
+
+ t.Logf("EC encode output:\n%s", outputStr)
+
+ if err != nil {
+ t.Logf("EC encoding completed with error: %v", err)
+ } else {
+ t.Logf("EC encoding completed successfully")
+ }
+ })
+
+ t.Run("verify_disk_level_shard_distribution", func(t *testing.T) {
+ // Wait for shards to be distributed
+ time.Sleep(2 * time.Second)
+
+ // Count shards on each disk of each server
+ diskDistribution := countShardsPerDisk(testDir, uint32(volumeId))
+
+ totalShards := 0
+ disksWithShards := 0
+ maxShardsOnSingleDisk := 0
+
+ t.Logf("Disk-level shard distribution for volume %d:", volumeId)
+ for server, disks := range diskDistribution {
+ for diskId, shardCount := range disks {
+ if shardCount > 0 {
+ t.Logf(" %s disk %d: %d shards", server, diskId, shardCount)
+ totalShards += shardCount
+ disksWithShards++
+ if shardCount > maxShardsOnSingleDisk {
+ maxShardsOnSingleDisk = shardCount
+ }
+ }
+ }
+ }
+
+ t.Logf("Summary: %d total shards across %d disks (max %d on single disk)",
+ totalShards, disksWithShards, maxShardsOnSingleDisk)
+
+ // EC creates 14 shards (10 data + 4 parity), plus .ecx and .ecj files
+ // We should see shards distributed across multiple disks
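+ // With 12 candidate disks (3 servers x 4 disks) holding 14 shards, an even placement
+ // puts one or two shards on each disk, so several disks should normally hold shards.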
+ if disksWithShards > 1 {
+ t.Logf("PASS: Shards distributed across %d disks", disksWithShards)
+ } else {
+ t.Logf("INFO: Shards on %d disk(s) - may be expected if volume was on single disk", disksWithShards)
+ }
+ })
+
+ t.Run("test_ec_balance_disk_awareness", func(t *testing.T) {
+ // Calculate initial disk balance variance
+ initialDistribution := countShardsPerDisk(testDir, uint32(volumeId))
+ initialVariance := calculateDiskShardVariance(initialDistribution)
+ t.Logf("Initial disk shard variance: %.2f", initialVariance)
+
+ // Run ec.balance command
+ var output bytes.Buffer
+ ecBalanceCmd := shell.Commands[findCommandIndex("ec.balance")]
+
+ oldStdout := os.Stdout
+ oldStderr := os.Stderr
+ r, w, _ := os.Pipe()
+ os.Stdout = w
+ os.Stderr = w
+
+ err := ecBalanceCmd.Do([]string{"-force"}, commandEnv, &output)
+
+ w.Close()
+ os.Stdout = oldStdout
+ os.Stderr = oldStderr
+
+ capturedOutput, _ := io.ReadAll(r)
+ outputStr := string(capturedOutput) + output.String()
+
+ if err != nil {
+ t.Logf("ec.balance error: %v", err)
+ }
+ t.Logf("ec.balance output:\n%s", outputStr)
+
+ // Wait for balance to complete
+ time.Sleep(2 * time.Second)
+
+ // Calculate final disk balance variance
+ finalDistribution := countShardsPerDisk(testDir, uint32(volumeId))
+ finalVariance := calculateDiskShardVariance(finalDistribution)
+ t.Logf("Final disk shard variance: %.2f", finalVariance)
+
+ t.Logf("Variance change: %.2f -> %.2f", initialVariance, finalVariance)
+ })
+
+ t.Run("verify_no_disk_overload", func(t *testing.T) {
+ // Verify that no single disk has too many shards of the same volume
+ diskDistribution := countShardsPerDisk(testDir, uint32(volumeId))
+
+ for server, disks := range diskDistribution {
+ for diskId, shardCount := range disks {
+ // With 14 EC shards and 12 disks (3 servers x 4 disks), ideally ~1-2 shards per disk
+ // Allow up to 4 shards per disk as a reasonable threshold
+ if shardCount > 4 {
+ t.Logf("WARNING: %s disk %d has %d shards (may indicate imbalance)",
+ server, diskId, shardCount)
+ }
+ }
+ }
+ })
+}
+
+// MultiDiskCluster represents a test cluster with multiple disks per volume server
+type MultiDiskCluster struct {
+ masterCmd *exec.Cmd
+ volumeServers []*exec.Cmd
+ testDir string
+}
+
+func (c *MultiDiskCluster) Stop() {
+ // Stop volume servers first
+ for _, cmd := range c.volumeServers {
+ if cmd != nil && cmd.Process != nil {
+ cmd.Process.Kill()
+ cmd.Wait()
+ }
+ }
+
+ // Stop master server
+ if c.masterCmd != nil && c.masterCmd.Process != nil {
+ c.masterCmd.Process.Kill()
+ c.masterCmd.Wait()
+ }
+}
+
+// startMultiDiskCluster starts a SeaweedFS cluster with multiple disks per volume server
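+// Each server joins data center dc1 in its own rack (rack0..rack2), so rack-level and
+// disk-level placement can both be exercised.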
+func startMultiDiskCluster(ctx context.Context, dataDir string) (*MultiDiskCluster, error) {
+ weedBinary := findWeedBinary()
+ if weedBinary == "" {
+ return nil, fmt.Errorf("weed binary not found")
+ }
+
+ cluster := &MultiDiskCluster{testDir: dataDir}
+
+ // Create master directory
+ masterDir := filepath.Join(dataDir, "master")
+ os.MkdirAll(masterDir, 0755)
+
+ // Start master server on a different port to avoid conflict
+ masterCmd := exec.CommandContext(ctx, weedBinary, "master",
+ "-port", "9334",
+ "-mdir", masterDir,
+ "-volumeSizeLimitMB", "10",
+ "-ip", "127.0.0.1",
+ )
+
+ masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
+ if err != nil {
+ return nil, fmt.Errorf("failed to create master log file: %v", err)
+ }
+ masterCmd.Stdout = masterLogFile
+ masterCmd.Stderr = masterLogFile
+
+ if err := masterCmd.Start(); err != nil {
+ return nil, fmt.Errorf("failed to start master server: %v", err)
+ }
+ cluster.masterCmd = masterCmd
+
+ // Wait for master to be ready
+ time.Sleep(2 * time.Second)
+
+ // Start 3 volume servers, each with 4 disks
+ const numServers = 3
+ const disksPerServer = 4
+
+ for i := 0; i < numServers; i++ {
+ // Create 4 disk directories per server
+ var diskDirs []string
+ var maxVolumes []string
+
+ for d := 0; d < disksPerServer; d++ {
+ diskDir := filepath.Join(dataDir, fmt.Sprintf("server%d_disk%d", i, d))
+ if err := os.MkdirAll(diskDir, 0755); err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to create disk dir: %v", err)
+ }
+ diskDirs = append(diskDirs, diskDir)
+ maxVolumes = append(maxVolumes, "5")
+ }
+
+ port := fmt.Sprintf("809%d", i)
+ rack := fmt.Sprintf("rack%d", i)
+
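+ // weed volume accepts comma-separated -dir and -max lists; each entry in -max is the
+ // volume limit for the corresponding directory, giving one logical disk per directory.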
+ volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
+ "-port", port,
+ "-dir", strings.Join(diskDirs, ","),
+ "-max", strings.Join(maxVolumes, ","),
+ "-mserver", "127.0.0.1:9334",
+ "-ip", "127.0.0.1",
+ "-dataCenter", "dc1",
+ "-rack", rack,
+ )
+
+ // Create log file for this volume server
+ logDir := filepath.Join(dataDir, fmt.Sprintf("server%d_logs", i))
+ os.MkdirAll(logDir, 0755)
+ volumeLogFile, err := os.Create(filepath.Join(logDir, "volume.log"))
+ if err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to create volume log file: %v", err)
+ }
+ volumeCmd.Stdout = volumeLogFile
+ volumeCmd.Stderr = volumeLogFile
+
+ if err := volumeCmd.Start(); err != nil {
+ cluster.Stop()
+ return nil, fmt.Errorf("failed to start volume server %d: %v", i, err)
+ }
+ cluster.volumeServers = append(cluster.volumeServers, volumeCmd)
+ }
+
+ // Wait for volume servers to register with master
+ // Multi-disk servers may take longer to initialize
+ time.Sleep(8 * time.Second)
+
+ return cluster, nil
+}
+
+// uploadTestDataToMaster uploads test data to a specific master address
+func uploadTestDataToMaster(data []byte, masterAddress string) (needle.VolumeId, error) {
+ assignResult, err := operation.Assign(context.Background(), func(ctx context.Context) pb.ServerAddress {
+ return pb.ServerAddress(masterAddress)
+ }, grpc.WithInsecure(), &operation.VolumeAssignRequest{
+ Count: 1,
+ Collection: "test",
+ Replication: "000",
+ })
+ if err != nil {
+ return 0, err
+ }
+
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return 0, err
+ }
+
+ uploadResult, err, _ := uploader.Upload(context.Background(), bytes.NewReader(data), &operation.UploadOption{
+ UploadUrl: "http://" + assignResult.Url + "/" + assignResult.Fid,
+ Filename: "testfile.txt",
+ MimeType: "text/plain",
+ })
+ if err != nil {
+ return 0, err
+ }
+
+ if uploadResult.Error != "" {
+ return 0, fmt.Errorf("upload error: %s", uploadResult.Error)
+ }
+
+ fid, err := needle.ParseFileIdFromString(assignResult.Fid)
+ if err != nil {
+ return 0, err
+ }
+
+ return fid.VolumeId, nil
+}
+
+// countShardsPerDisk counts EC shards on each disk of each server
+// Returns map: "serverN" -> map[diskId]shardCount
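+// It assumes the server%d_disk%d directory layout created by startMultiDiskCluster.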
+func countShardsPerDisk(testDir string, volumeId uint32) map[string]map[int]int {
+ result := make(map[string]map[int]int)
+
+ const numServers = 3
+ const disksPerServer = 4
+
+ for server := 0; server < numServers; server++ {
+ serverKey := fmt.Sprintf("server%d", server)
+ result[serverKey] = make(map[int]int)
+
+ for disk := 0; disk < disksPerServer; disk++ {
+ diskDir := filepath.Join(testDir, fmt.Sprintf("server%d_disk%d", server, disk))
+ count, err := countECShardFiles(diskDir, volumeId)
+ if err == nil && count > 0 {
+ result[serverKey][disk] = count
+ }
+ }
+ }
+
+ return result
+}
+
+// calculateDiskShardVariance measures how evenly shards are distributed across disks.
+// It returns the population standard deviation of the per-disk shard counts (only disks
+// holding at least one shard are counted); a lower value means a more even distribution.
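+// For example, per-disk counts of {2, 2, 1, 1} yield 0.5, while {5, 1, 1, 1} yield about 1.73.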
+func calculateDiskShardVariance(distribution map[string]map[int]int) float64 {
+ var counts []float64
+
+ for _, disks := range distribution {
+ for _, count := range disks {
+ if count > 0 {
+ counts = append(counts, float64(count))
+ }
+ }
+ }
+
+ if len(counts) == 0 {
+ return 0
+ }
+
+ // Calculate mean
+ mean := 0.0
+ for _, c := range counts {
+ mean += c
+ }
+ mean /= float64(len(counts))
+
+ // Calculate variance
+ variance := 0.0
+ for _, c := range counts {
+ variance += (c - mean) * (c - mean)
+ }
+
+ return math.Sqrt(variance / float64(len(counts)))
+}