aboutsummaryrefslogtreecommitdiff
path: root/test/fuse_integration/concurrent_operations_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'test/fuse_integration/concurrent_operations_test.go')
-rw-r--r--test/fuse_integration/concurrent_operations_test.go448
1 files changed, 448 insertions, 0 deletions
diff --git a/test/fuse_integration/concurrent_operations_test.go b/test/fuse_integration/concurrent_operations_test.go
new file mode 100644
index 000000000..7a5cdd0d3
--- /dev/null
+++ b/test/fuse_integration/concurrent_operations_test.go
@@ -0,0 +1,448 @@
+package fuse_test
+
+import (
+ "bytes"
+ "crypto/rand"
+ "fmt"
+ "os"
+ "path/filepath"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestConcurrentFileOperations tests concurrent file operations
+func TestConcurrentFileOperations(t *testing.T) {
+ framework := NewFuseTestFramework(t, DefaultTestConfig())
+ defer framework.Cleanup()
+
+ require.NoError(t, framework.Setup(DefaultTestConfig()))
+
+ t.Run("ConcurrentFileWrites", func(t *testing.T) {
+ testConcurrentFileWrites(t, framework)
+ })
+
+ t.Run("ConcurrentFileReads", func(t *testing.T) {
+ testConcurrentFileReads(t, framework)
+ })
+
+ t.Run("ConcurrentReadWrite", func(t *testing.T) {
+ testConcurrentReadWrite(t, framework)
+ })
+
+ t.Run("ConcurrentDirectoryOperations", func(t *testing.T) {
+ testConcurrentDirectoryOperations(t, framework)
+ })
+
+ t.Run("ConcurrentFileCreation", func(t *testing.T) {
+ testConcurrentFileCreation(t, framework)
+ })
+}
+
+// testConcurrentFileWrites tests multiple goroutines writing to different files
+func testConcurrentFileWrites(t *testing.T, framework *FuseTestFramework) {
+ numWorkers := 10
+ filesPerWorker := 5
+ var wg sync.WaitGroup
+ var mutex sync.Mutex
+ errors := make([]error, 0)
+
+ // Function to collect errors safely
+ addError := func(err error) {
+ mutex.Lock()
+ defer mutex.Unlock()
+ errors = append(errors, err)
+ }
+
+ // Start concurrent workers
+ for worker := 0; worker < numWorkers; worker++ {
+ wg.Add(1)
+ go func(workerID int) {
+ defer wg.Done()
+
+ for file := 0; file < filesPerWorker; file++ {
+ filename := fmt.Sprintf("worker_%d_file_%d.txt", workerID, file)
+ content := []byte(fmt.Sprintf("Worker %d, File %d - %s", workerID, file, time.Now().String()))
+
+ mountPath := filepath.Join(framework.GetMountPoint(), filename)
+ if err := os.WriteFile(mountPath, content, 0644); err != nil {
+ addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err))
+ return
+ }
+
+ // Verify file was written correctly
+ readContent, err := os.ReadFile(mountPath)
+ if err != nil {
+ addError(fmt.Errorf("worker %d file %d read: %v", workerID, file, err))
+ return
+ }
+
+ if !bytes.Equal(content, readContent) {
+ addError(fmt.Errorf("worker %d file %d: content mismatch", workerID, file))
+ return
+ }
+ }
+ }(worker)
+ }
+
+ wg.Wait()
+
+ // Check for errors
+ require.Empty(t, errors, "Concurrent writes failed: %v", errors)
+
+ // Verify all files exist and have correct content
+ for worker := 0; worker < numWorkers; worker++ {
+ for file := 0; file < filesPerWorker; file++ {
+ filename := fmt.Sprintf("worker_%d_file_%d.txt", worker, file)
+ framework.AssertFileExists(filename)
+ }
+ }
+}
+
+// testConcurrentFileReads tests multiple goroutines reading from the same file
+func testConcurrentFileReads(t *testing.T, framework *FuseTestFramework) {
+ // Create a test file
+ filename := "concurrent_read_test.txt"
+ testData := make([]byte, 1024*1024) // 1MB
+ _, err := rand.Read(testData)
+ require.NoError(t, err)
+
+ framework.CreateTestFile(filename, testData)
+
+ numReaders := 20
+ var wg sync.WaitGroup
+ var mutex sync.Mutex
+ errors := make([]error, 0)
+
+ addError := func(err error) {
+ mutex.Lock()
+ defer mutex.Unlock()
+ errors = append(errors, err)
+ }
+
+ // Start concurrent readers
+ for reader := 0; reader < numReaders; reader++ {
+ wg.Add(1)
+ go func(readerID int) {
+ defer wg.Done()
+
+ mountPath := filepath.Join(framework.GetMountPoint(), filename)
+
+ // Read multiple times
+ for i := 0; i < 3; i++ {
+ readData, err := os.ReadFile(mountPath)
+ if err != nil {
+ addError(fmt.Errorf("reader %d iteration %d: %v", readerID, i, err))
+ return
+ }
+
+ if !bytes.Equal(testData, readData) {
+ addError(fmt.Errorf("reader %d iteration %d: data mismatch", readerID, i))
+ return
+ }
+ }
+ }(reader)
+ }
+
+ wg.Wait()
+ require.Empty(t, errors, "Concurrent reads failed: %v", errors)
+}
+
+// testConcurrentReadWrite tests simultaneous read and write operations
+func testConcurrentReadWrite(t *testing.T, framework *FuseTestFramework) {
+ filename := "concurrent_rw_test.txt"
+ initialData := bytes.Repeat([]byte("INITIAL"), 1000)
+ framework.CreateTestFile(filename, initialData)
+
+ var wg sync.WaitGroup
+ var mutex sync.Mutex
+ errors := make([]error, 0)
+
+ addError := func(err error) {
+ mutex.Lock()
+ defer mutex.Unlock()
+ errors = append(errors, err)
+ }
+
+ mountPath := filepath.Join(framework.GetMountPoint(), filename)
+
+ // Start readers
+ numReaders := 5
+ for i := 0; i < numReaders; i++ {
+ wg.Add(1)
+ go func(readerID int) {
+ defer wg.Done()
+
+ for j := 0; j < 10; j++ {
+ _, err := os.ReadFile(mountPath)
+ if err != nil {
+ addError(fmt.Errorf("reader %d: %v", readerID, err))
+ return
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ }(i)
+ }
+
+ // Start writers
+ numWriters := 2
+ for i := 0; i < numWriters; i++ {
+ wg.Add(1)
+ go func(writerID int) {
+ defer wg.Done()
+
+ for j := 0; j < 5; j++ {
+ newData := bytes.Repeat([]byte(fmt.Sprintf("WRITER%d", writerID)), 1000)
+ err := os.WriteFile(mountPath, newData, 0644)
+ if err != nil {
+ addError(fmt.Errorf("writer %d: %v", writerID, err))
+ return
+ }
+ time.Sleep(50 * time.Millisecond)
+ }
+ }(i)
+ }
+
+ wg.Wait()
+ require.Empty(t, errors, "Concurrent read/write failed: %v", errors)
+
+ // Verify file still exists and is readable
+ framework.AssertFileExists(filename)
+}
+
+// testConcurrentDirectoryOperations tests concurrent directory operations
+func testConcurrentDirectoryOperations(t *testing.T, framework *FuseTestFramework) {
+ numWorkers := 8
+ var wg sync.WaitGroup
+ var mutex sync.Mutex
+ errors := make([]error, 0)
+
+ addError := func(err error) {
+ mutex.Lock()
+ defer mutex.Unlock()
+ errors = append(errors, err)
+ }
+
+ // Each worker creates a directory tree
+ for worker := 0; worker < numWorkers; worker++ {
+ wg.Add(1)
+ go func(workerID int) {
+ defer wg.Done()
+
+ // Create worker directory
+ workerDir := fmt.Sprintf("worker_%d", workerID)
+ mountPath := filepath.Join(framework.GetMountPoint(), workerDir)
+
+ if err := os.Mkdir(mountPath, 0755); err != nil {
+ addError(fmt.Errorf("worker %d mkdir: %v", workerID, err))
+ return
+ }
+
+ // Create subdirectories and files
+ for i := 0; i < 5; i++ {
+ subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i))
+ if err := os.Mkdir(subDir, 0755); err != nil {
+ addError(fmt.Errorf("worker %d subdir %d: %v", workerID, i, err))
+ return
+ }
+
+ // Create file in subdirectory
+ testFile := filepath.Join(subDir, "test.txt")
+ content := []byte(fmt.Sprintf("Worker %d, Subdir %d", workerID, i))
+ if err := os.WriteFile(testFile, content, 0644); err != nil {
+ addError(fmt.Errorf("worker %d file %d: %v", workerID, i, err))
+ return
+ }
+ }
+ }(worker)
+ }
+
+ wg.Wait()
+ require.Empty(t, errors, "Concurrent directory operations failed: %v", errors)
+
+ // Verify all structures were created
+ for worker := 0; worker < numWorkers; worker++ {
+ workerDir := fmt.Sprintf("worker_%d", worker)
+ mountPath := filepath.Join(framework.GetMountPoint(), workerDir)
+
+ info, err := os.Stat(mountPath)
+ require.NoError(t, err)
+ assert.True(t, info.IsDir())
+
+ // Check subdirectories
+ for i := 0; i < 5; i++ {
+ subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i))
+ info, err := os.Stat(subDir)
+ require.NoError(t, err)
+ assert.True(t, info.IsDir())
+
+ testFile := filepath.Join(subDir, "test.txt")
+ expectedContent := []byte(fmt.Sprintf("Worker %d, Subdir %d", worker, i))
+ actualContent, err := os.ReadFile(testFile)
+ require.NoError(t, err)
+ assert.Equal(t, expectedContent, actualContent)
+ }
+ }
+}
+
+// testConcurrentFileCreation tests concurrent creation of files in same directory
+func testConcurrentFileCreation(t *testing.T, framework *FuseTestFramework) {
+ // Create test directory
+ testDir := "concurrent_creation"
+ framework.CreateTestDir(testDir)
+
+ numWorkers := 15
+ filesPerWorker := 10
+ var wg sync.WaitGroup
+ var mutex sync.Mutex
+ errors := make([]error, 0)
+ createdFiles := make(map[string]bool)
+
+ addError := func(err error) {
+ mutex.Lock()
+ defer mutex.Unlock()
+ errors = append(errors, err)
+ }
+
+ addFile := func(filename string) {
+ mutex.Lock()
+ defer mutex.Unlock()
+ createdFiles[filename] = true
+ }
+
+ // Create files concurrently
+ for worker := 0; worker < numWorkers; worker++ {
+ wg.Add(1)
+ go func(workerID int) {
+ defer wg.Done()
+
+ for file := 0; file < filesPerWorker; file++ {
+ filename := fmt.Sprintf("file_%d_%d.txt", workerID, file)
+ relativePath := filepath.Join(testDir, filename)
+ mountPath := filepath.Join(framework.GetMountPoint(), relativePath)
+
+ content := []byte(fmt.Sprintf("Worker %d, File %d, Time: %s",
+ workerID, file, time.Now().Format(time.RFC3339Nano)))
+
+ if err := os.WriteFile(mountPath, content, 0644); err != nil {
+ addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err))
+ return
+ }
+
+ addFile(filename)
+ }
+ }(worker)
+ }
+
+ wg.Wait()
+ require.Empty(t, errors, "Concurrent file creation failed: %v", errors)
+
+ // Verify all files were created
+ expectedCount := numWorkers * filesPerWorker
+ assert.Equal(t, expectedCount, len(createdFiles))
+
+ // Read directory and verify count
+ mountPath := filepath.Join(framework.GetMountPoint(), testDir)
+ entries, err := os.ReadDir(mountPath)
+ require.NoError(t, err)
+ assert.Equal(t, expectedCount, len(entries))
+
+ // Verify each file exists and has content
+ for filename := range createdFiles {
+ relativePath := filepath.Join(testDir, filename)
+ framework.AssertFileExists(relativePath)
+ }
+}
+
+// TestStressOperations tests high-load scenarios
+func TestStressOperations(t *testing.T) {
+ framework := NewFuseTestFramework(t, DefaultTestConfig())
+ defer framework.Cleanup()
+
+ require.NoError(t, framework.Setup(DefaultTestConfig()))
+
+ t.Run("HighFrequencySmallWrites", func(t *testing.T) {
+ testHighFrequencySmallWrites(t, framework)
+ })
+
+ t.Run("ManySmallFiles", func(t *testing.T) {
+ testManySmallFiles(t, framework)
+ })
+}
+
+// testHighFrequencySmallWrites tests many small writes to the same file
+func testHighFrequencySmallWrites(t *testing.T, framework *FuseTestFramework) {
+ filename := "high_freq_writes.txt"
+ mountPath := filepath.Join(framework.GetMountPoint(), filename)
+
+ // Open file for writing
+ file, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644)
+ require.NoError(t, err)
+ defer file.Close()
+
+ // Perform many small writes
+ numWrites := 1000
+ writeSize := 100
+
+ for i := 0; i < numWrites; i++ {
+ data := []byte(fmt.Sprintf("Write %04d: %s\n", i, bytes.Repeat([]byte("x"), writeSize-20)))
+ _, err := file.Write(data)
+ require.NoError(t, err)
+ }
+ file.Close()
+
+ // Verify file size
+ info, err := os.Stat(mountPath)
+ require.NoError(t, err)
+ assert.Equal(t, totalSize, info.Size())
+}
+
+// testManySmallFiles tests creating many small files
+func testManySmallFiles(t *testing.T, framework *FuseTestFramework) {
+ testDir := "many_small_files"
+ framework.CreateTestDir(testDir)
+
+ numFiles := 500
+ var wg sync.WaitGroup
+ var mutex sync.Mutex
+ errors := make([]error, 0)
+
+ addError := func(err error) {
+ mutex.Lock()
+ defer mutex.Unlock()
+ errors = append(errors, err)
+ }
+
+ // Create files in batches
+ batchSize := 50
+ for batch := 0; batch < numFiles/batchSize; batch++ {
+ wg.Add(1)
+ go func(batchID int) {
+ defer wg.Done()
+
+ for i := 0; i < batchSize; i++ {
+ fileNum := batchID*batchSize + i
+ filename := filepath.Join(testDir, fmt.Sprintf("small_file_%04d.txt", fileNum))
+ content := []byte(fmt.Sprintf("File %d content", fileNum))
+
+ mountPath := filepath.Join(framework.GetMountPoint(), filename)
+ if err := os.WriteFile(mountPath, content, 0644); err != nil {
+ addError(fmt.Errorf("file %d: %v", fileNum, err))
+ return
+ }
+ }
+ }(batch)
+ }
+
+ wg.Wait()
+ require.Empty(t, errors, "Many small files creation failed: %v", errors)
+
+ // Verify directory listing
+ mountPath := filepath.Join(framework.GetMountPoint(), testDir)
+ entries, err := os.ReadDir(mountPath)
+ require.NoError(t, err)
+ assert.Equal(t, numFiles, len(entries))
+}