diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-07-16 12:43:08 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-07-16 12:43:08 -0700 |
| commit | 9982f91b4ca885f09c32d87b3c31fe76c1304e8e (patch) | |
| tree | 058cfa836389db65f489a473773f57ad1990af2b /test/fuse_integration/concurrent_operations_test.go | |
| parent | 215c5de5799f9d71a63cd385f3db143cb4886692 (diff) | |
| download | seaweedfs-9982f91b4ca885f09c32d87b3c31fe76c1304e8e.tar.xz seaweedfs-9982f91b4ca885f09c32d87b3c31fe76c1304e8e.zip | |
Add more fuse tests (#6992)
* add more tests
* move to new package
* add github action
* Update fuse-integration.yml
* Update fuse-integration.yml
* Update test/fuse_integration/README.md
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update test/fuse_integration/README.md
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update test/fuse_integration/framework.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update test/fuse_integration/README.md
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Update test/fuse_integration/README.md
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix
* Update test/fuse_integration/concurrent_operations_test.go
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
---------
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Diffstat (limited to 'test/fuse_integration/concurrent_operations_test.go')
| -rw-r--r-- | test/fuse_integration/concurrent_operations_test.go | 448 |
1 files changed, 448 insertions, 0 deletions
diff --git a/test/fuse_integration/concurrent_operations_test.go b/test/fuse_integration/concurrent_operations_test.go new file mode 100644 index 000000000..7a5cdd0d3 --- /dev/null +++ b/test/fuse_integration/concurrent_operations_test.go @@ -0,0 +1,448 @@ +package fuse_test + +import ( + "bytes" + "crypto/rand" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestConcurrentFileOperations tests concurrent file operations +func TestConcurrentFileOperations(t *testing.T) { + framework := NewFuseTestFramework(t, DefaultTestConfig()) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(DefaultTestConfig())) + + t.Run("ConcurrentFileWrites", func(t *testing.T) { + testConcurrentFileWrites(t, framework) + }) + + t.Run("ConcurrentFileReads", func(t *testing.T) { + testConcurrentFileReads(t, framework) + }) + + t.Run("ConcurrentReadWrite", func(t *testing.T) { + testConcurrentReadWrite(t, framework) + }) + + t.Run("ConcurrentDirectoryOperations", func(t *testing.T) { + testConcurrentDirectoryOperations(t, framework) + }) + + t.Run("ConcurrentFileCreation", func(t *testing.T) { + testConcurrentFileCreation(t, framework) + }) +} + +// testConcurrentFileWrites tests multiple goroutines writing to different files +func testConcurrentFileWrites(t *testing.T, framework *FuseTestFramework) { + numWorkers := 10 + filesPerWorker := 5 + var wg sync.WaitGroup + var mutex sync.Mutex + errors := make([]error, 0) + + // Function to collect errors safely + addError := func(err error) { + mutex.Lock() + defer mutex.Unlock() + errors = append(errors, err) + } + + // Start concurrent workers + for worker := 0; worker < numWorkers; worker++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + for file := 0; file < filesPerWorker; file++ { + filename := fmt.Sprintf("worker_%d_file_%d.txt", workerID, file) + content := []byte(fmt.Sprintf("Worker %d, File %d - %s", workerID, file, time.Now().String())) + + mountPath := filepath.Join(framework.GetMountPoint(), filename) + if err := os.WriteFile(mountPath, content, 0644); err != nil { + addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err)) + return + } + + // Verify file was written correctly + readContent, err := os.ReadFile(mountPath) + if err != nil { + addError(fmt.Errorf("worker %d file %d read: %v", workerID, file, err)) + return + } + + if !bytes.Equal(content, readContent) { + addError(fmt.Errorf("worker %d file %d: content mismatch", workerID, file)) + return + } + } + }(worker) + } + + wg.Wait() + + // Check for errors + require.Empty(t, errors, "Concurrent writes failed: %v", errors) + + // Verify all files exist and have correct content + for worker := 0; worker < numWorkers; worker++ { + for file := 0; file < filesPerWorker; file++ { + filename := fmt.Sprintf("worker_%d_file_%d.txt", worker, file) + framework.AssertFileExists(filename) + } + } +} + +// testConcurrentFileReads tests multiple goroutines reading from the same file +func testConcurrentFileReads(t *testing.T, framework *FuseTestFramework) { + // Create a test file + filename := "concurrent_read_test.txt" + testData := make([]byte, 1024*1024) // 1MB + _, err := rand.Read(testData) + require.NoError(t, err) + + framework.CreateTestFile(filename, testData) + + numReaders := 20 + var wg sync.WaitGroup + var mutex sync.Mutex + errors := make([]error, 0) + + addError := func(err error) { + mutex.Lock() + defer mutex.Unlock() + errors = append(errors, err) + } + + // Start concurrent readers + for reader := 0; reader < numReaders; reader++ { + wg.Add(1) + go func(readerID int) { + defer wg.Done() + + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Read multiple times + for i := 0; i < 3; i++ { + readData, err := os.ReadFile(mountPath) + if err != nil { + addError(fmt.Errorf("reader %d iteration %d: %v", readerID, i, err)) + return + } + + if !bytes.Equal(testData, readData) { + addError(fmt.Errorf("reader %d iteration %d: data mismatch", readerID, i)) + return + } + } + }(reader) + } + + wg.Wait() + require.Empty(t, errors, "Concurrent reads failed: %v", errors) +} + +// testConcurrentReadWrite tests simultaneous read and write operations +func testConcurrentReadWrite(t *testing.T, framework *FuseTestFramework) { + filename := "concurrent_rw_test.txt" + initialData := bytes.Repeat([]byte("INITIAL"), 1000) + framework.CreateTestFile(filename, initialData) + + var wg sync.WaitGroup + var mutex sync.Mutex + errors := make([]error, 0) + + addError := func(err error) { + mutex.Lock() + defer mutex.Unlock() + errors = append(errors, err) + } + + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Start readers + numReaders := 5 + for i := 0; i < numReaders; i++ { + wg.Add(1) + go func(readerID int) { + defer wg.Done() + + for j := 0; j < 10; j++ { + _, err := os.ReadFile(mountPath) + if err != nil { + addError(fmt.Errorf("reader %d: %v", readerID, err)) + return + } + time.Sleep(10 * time.Millisecond) + } + }(i) + } + + // Start writers + numWriters := 2 + for i := 0; i < numWriters; i++ { + wg.Add(1) + go func(writerID int) { + defer wg.Done() + + for j := 0; j < 5; j++ { + newData := bytes.Repeat([]byte(fmt.Sprintf("WRITER%d", writerID)), 1000) + err := os.WriteFile(mountPath, newData, 0644) + if err != nil { + addError(fmt.Errorf("writer %d: %v", writerID, err)) + return + } + time.Sleep(50 * time.Millisecond) + } + }(i) + } + + wg.Wait() + require.Empty(t, errors, "Concurrent read/write failed: %v", errors) + + // Verify file still exists and is readable + framework.AssertFileExists(filename) +} + +// testConcurrentDirectoryOperations tests concurrent directory operations +func testConcurrentDirectoryOperations(t *testing.T, framework *FuseTestFramework) { + numWorkers := 8 + var wg sync.WaitGroup + var mutex sync.Mutex + errors := make([]error, 0) + + addError := func(err error) { + mutex.Lock() + defer mutex.Unlock() + errors = append(errors, err) + } + + // Each worker creates a directory tree + for worker := 0; worker < numWorkers; worker++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + // Create worker directory + workerDir := fmt.Sprintf("worker_%d", workerID) + mountPath := filepath.Join(framework.GetMountPoint(), workerDir) + + if err := os.Mkdir(mountPath, 0755); err != nil { + addError(fmt.Errorf("worker %d mkdir: %v", workerID, err)) + return + } + + // Create subdirectories and files + for i := 0; i < 5; i++ { + subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i)) + if err := os.Mkdir(subDir, 0755); err != nil { + addError(fmt.Errorf("worker %d subdir %d: %v", workerID, i, err)) + return + } + + // Create file in subdirectory + testFile := filepath.Join(subDir, "test.txt") + content := []byte(fmt.Sprintf("Worker %d, Subdir %d", workerID, i)) + if err := os.WriteFile(testFile, content, 0644); err != nil { + addError(fmt.Errorf("worker %d file %d: %v", workerID, i, err)) + return + } + } + }(worker) + } + + wg.Wait() + require.Empty(t, errors, "Concurrent directory operations failed: %v", errors) + + // Verify all structures were created + for worker := 0; worker < numWorkers; worker++ { + workerDir := fmt.Sprintf("worker_%d", worker) + mountPath := filepath.Join(framework.GetMountPoint(), workerDir) + + info, err := os.Stat(mountPath) + require.NoError(t, err) + assert.True(t, info.IsDir()) + + // Check subdirectories + for i := 0; i < 5; i++ { + subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i)) + info, err := os.Stat(subDir) + require.NoError(t, err) + assert.True(t, info.IsDir()) + + testFile := filepath.Join(subDir, "test.txt") + expectedContent := []byte(fmt.Sprintf("Worker %d, Subdir %d", worker, i)) + actualContent, err := os.ReadFile(testFile) + require.NoError(t, err) + assert.Equal(t, expectedContent, actualContent) + } + } +} + +// testConcurrentFileCreation tests concurrent creation of files in same directory +func testConcurrentFileCreation(t *testing.T, framework *FuseTestFramework) { + // Create test directory + testDir := "concurrent_creation" + framework.CreateTestDir(testDir) + + numWorkers := 15 + filesPerWorker := 10 + var wg sync.WaitGroup + var mutex sync.Mutex + errors := make([]error, 0) + createdFiles := make(map[string]bool) + + addError := func(err error) { + mutex.Lock() + defer mutex.Unlock() + errors = append(errors, err) + } + + addFile := func(filename string) { + mutex.Lock() + defer mutex.Unlock() + createdFiles[filename] = true + } + + // Create files concurrently + for worker := 0; worker < numWorkers; worker++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + for file := 0; file < filesPerWorker; file++ { + filename := fmt.Sprintf("file_%d_%d.txt", workerID, file) + relativePath := filepath.Join(testDir, filename) + mountPath := filepath.Join(framework.GetMountPoint(), relativePath) + + content := []byte(fmt.Sprintf("Worker %d, File %d, Time: %s", + workerID, file, time.Now().Format(time.RFC3339Nano))) + + if err := os.WriteFile(mountPath, content, 0644); err != nil { + addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err)) + return + } + + addFile(filename) + } + }(worker) + } + + wg.Wait() + require.Empty(t, errors, "Concurrent file creation failed: %v", errors) + + // Verify all files were created + expectedCount := numWorkers * filesPerWorker + assert.Equal(t, expectedCount, len(createdFiles)) + + // Read directory and verify count + mountPath := filepath.Join(framework.GetMountPoint(), testDir) + entries, err := os.ReadDir(mountPath) + require.NoError(t, err) + assert.Equal(t, expectedCount, len(entries)) + + // Verify each file exists and has content + for filename := range createdFiles { + relativePath := filepath.Join(testDir, filename) + framework.AssertFileExists(relativePath) + } +} + +// TestStressOperations tests high-load scenarios +func TestStressOperations(t *testing.T) { + framework := NewFuseTestFramework(t, DefaultTestConfig()) + defer framework.Cleanup() + + require.NoError(t, framework.Setup(DefaultTestConfig())) + + t.Run("HighFrequencySmallWrites", func(t *testing.T) { + testHighFrequencySmallWrites(t, framework) + }) + + t.Run("ManySmallFiles", func(t *testing.T) { + testManySmallFiles(t, framework) + }) +} + +// testHighFrequencySmallWrites tests many small writes to the same file +func testHighFrequencySmallWrites(t *testing.T, framework *FuseTestFramework) { + filename := "high_freq_writes.txt" + mountPath := filepath.Join(framework.GetMountPoint(), filename) + + // Open file for writing + file, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644) + require.NoError(t, err) + defer file.Close() + + // Perform many small writes + numWrites := 1000 + writeSize := 100 + + for i := 0; i < numWrites; i++ { + data := []byte(fmt.Sprintf("Write %04d: %s\n", i, bytes.Repeat([]byte("x"), writeSize-20))) + _, err := file.Write(data) + require.NoError(t, err) + } + file.Close() + + // Verify file size + info, err := os.Stat(mountPath) + require.NoError(t, err) + assert.Equal(t, totalSize, info.Size()) +} + +// testManySmallFiles tests creating many small files +func testManySmallFiles(t *testing.T, framework *FuseTestFramework) { + testDir := "many_small_files" + framework.CreateTestDir(testDir) + + numFiles := 500 + var wg sync.WaitGroup + var mutex sync.Mutex + errors := make([]error, 0) + + addError := func(err error) { + mutex.Lock() + defer mutex.Unlock() + errors = append(errors, err) + } + + // Create files in batches + batchSize := 50 + for batch := 0; batch < numFiles/batchSize; batch++ { + wg.Add(1) + go func(batchID int) { + defer wg.Done() + + for i := 0; i < batchSize; i++ { + fileNum := batchID*batchSize + i + filename := filepath.Join(testDir, fmt.Sprintf("small_file_%04d.txt", fileNum)) + content := []byte(fmt.Sprintf("File %d content", fileNum)) + + mountPath := filepath.Join(framework.GetMountPoint(), filename) + if err := os.WriteFile(mountPath, content, 0644); err != nil { + addError(fmt.Errorf("file %d: %v", fileNum, err)) + return + } + } + }(batch) + } + + wg.Wait() + require.Empty(t, errors, "Many small files creation failed: %v", errors) + + // Verify directory listing + mountPath := filepath.Join(framework.GetMountPoint(), testDir) + entries, err := os.ReadDir(mountPath) + require.NoError(t, err) + assert.Equal(t, numFiles, len(entries)) +} |
