aboutsummaryrefslogtreecommitdiff
path: root/test/foundationdb/foundationdb_concurrent_test.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2025-11-19 20:06:57 -0800
committerGitHub <noreply@github.com>2025-11-19 20:06:57 -0800
commitc6b6ea40e61b79722e1a539f814933898b9780a7 (patch)
tree3b09cd214ed6420881412ba9570122216d8df5bd /test/foundationdb/foundationdb_concurrent_test.go
parent8be9e258fc7d1110421aaee451945668cafa23e7 (diff)
downloadseaweedfs-c6b6ea40e61b79722e1a539f814933898b9780a7.tar.xz
seaweedfs-c6b6ea40e61b79722e1a539f814933898b9780a7.zip
filer store: add foundationdb (#7178)
* add foundationdb * Update foundationdb_store.go * fix * apply the patch * avoid panic on error * address comments * remove extra data * address comments * adds more debug messages * fix range listing * delete with prefix range; list with right start key * fix docker files * use the more idiomatic FoundationDB KeySelectors * address comments * proper errors * fix API versions * more efficient * recursive deletion * clean up * clean up * pagination, one transaction for deletion * error checking * Use fdb.Strinc() to compute the lexicographically next string and create a proper range * fix docker * Update README.md * delete in batches * delete in batches * fix build * add foundationdb build * Updated FoundationDB Version * Fixed glibc/musl Incompatibility (Alpine → Debian) * Update container_foundationdb_version.yml * build SeaweedFS * build tag * address comments * separate transaction * address comments * fix build * empty vs no data * fixes * add go test * Install FoundationDB client libraries * nil compare
Diffstat (limited to 'test/foundationdb/foundationdb_concurrent_test.go')
-rw-r--r--test/foundationdb/foundationdb_concurrent_test.go445
1 files changed, 445 insertions, 0 deletions
diff --git a/test/foundationdb/foundationdb_concurrent_test.go b/test/foundationdb/foundationdb_concurrent_test.go
new file mode 100644
index 000000000..b0ecaf742
--- /dev/null
+++ b/test/foundationdb/foundationdb_concurrent_test.go
@@ -0,0 +1,445 @@
+//go:build foundationdb
+// +build foundationdb
+
+package foundationdb
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/filer/foundationdb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+func TestFoundationDBStore_ConcurrentInserts(t *testing.T) {
+ store := createTestStore(t)
+ defer store.Shutdown()
+
+ ctx := context.Background()
+ numGoroutines := 10
+ entriesPerGoroutine := 100
+
+ var wg sync.WaitGroup
+ errors := make(chan error, numGoroutines*entriesPerGoroutine)
+
+ // Launch concurrent insert operations
+ for g := 0; g < numGoroutines; g++ {
+ wg.Add(1)
+ go func(goroutineID int) {
+ defer wg.Done()
+
+ for i := 0; i < entriesPerGoroutine; i++ {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath("/concurrent", fmt.Sprintf("g%d_file%d.txt", goroutineID, i)),
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: uint32(goroutineID),
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+
+ err := store.InsertEntry(ctx, entry)
+ if err != nil {
+ errors <- fmt.Errorf("goroutine %d, entry %d: %v", goroutineID, i, err)
+ return
+ }
+ }
+ }(g)
+ }
+
+ wg.Wait()
+ close(errors)
+
+ // Check for errors
+ for err := range errors {
+ t.Errorf("Concurrent insert error: %v", err)
+ }
+
+ // Verify all entries were inserted
+ expectedTotal := numGoroutines * entriesPerGoroutine
+ actualCount := 0
+
+ _, err := store.ListDirectoryEntries(ctx, "/concurrent", "", true, 10000, func(entry *filer.Entry) bool {
+ actualCount++
+ return true
+ })
+ if err != nil {
+ t.Fatalf("ListDirectoryEntries failed: %v", err)
+ }
+
+ if actualCount != expectedTotal {
+ t.Errorf("Expected %d entries, found %d", expectedTotal, actualCount)
+ }
+}
+
+func TestFoundationDBStore_ConcurrentReadsAndWrites(t *testing.T) {
+ store := createTestStore(t)
+ defer store.Shutdown()
+
+ ctx := context.Background()
+ numReaders := 5
+ numWriters := 5
+ operationsPerGoroutine := 50
+ testFile := "/concurrent/rw_test_file.txt"
+
+ // Insert initial file
+ initialEntry := &filer.Entry{
+ FullPath: testFile,
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+ err := store.InsertEntry(ctx, initialEntry)
+ if err != nil {
+ t.Fatalf("Initial InsertEntry failed: %v", err)
+ }
+
+ var wg sync.WaitGroup
+ errors := make(chan error, (numReaders+numWriters)*operationsPerGoroutine)
+
+ // Launch reader goroutines
+ for r := 0; r < numReaders; r++ {
+ wg.Add(1)
+ go func(readerID int) {
+ defer wg.Done()
+
+ for i := 0; i < operationsPerGoroutine; i++ {
+ _, err := store.FindEntry(ctx, testFile)
+ if err != nil {
+ errors <- fmt.Errorf("reader %d, operation %d: %v", readerID, i, err)
+ return
+ }
+
+ // Small delay to allow interleaving with writes
+ time.Sleep(1 * time.Millisecond)
+ }
+ }(r)
+ }
+
+ // Launch writer goroutines
+ for w := 0; w < numWriters; w++ {
+ wg.Add(1)
+ go func(writerID int) {
+ defer wg.Done()
+
+ for i := 0; i < operationsPerGoroutine; i++ {
+ entry := &filer.Entry{
+ FullPath: testFile,
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: uint32(writerID + 1000),
+ Gid: uint32(i),
+ Mtime: time.Now(),
+ },
+ }
+
+ err := store.UpdateEntry(ctx, entry)
+ if err != nil {
+ errors <- fmt.Errorf("writer %d, operation %d: %v", writerID, i, err)
+ return
+ }
+
+ // Small delay to allow interleaving with reads
+ time.Sleep(1 * time.Millisecond)
+ }
+ }(w)
+ }
+
+ wg.Wait()
+ close(errors)
+
+ // Check for errors
+ for err := range errors {
+ t.Errorf("Concurrent read/write error: %v", err)
+ }
+
+ // Verify final state
+ finalEntry, err := store.FindEntry(ctx, testFile)
+ if err != nil {
+ t.Fatalf("Final FindEntry failed: %v", err)
+ }
+
+ if finalEntry.FullPath != testFile {
+ t.Errorf("Expected final path %s, got %s", testFile, finalEntry.FullPath)
+ }
+}
+
+func TestFoundationDBStore_ConcurrentTransactions(t *testing.T) {
+ store := createTestStore(t)
+ defer store.Shutdown()
+
+ ctx := context.Background()
+ numTransactions := 5
+ entriesPerTransaction := 10
+
+ var wg sync.WaitGroup
+ errors := make(chan error, numTransactions)
+ successfulTx := make(chan int, numTransactions)
+
+ // Launch concurrent transactions
+ for tx := 0; tx < numTransactions; tx++ {
+ wg.Add(1)
+ go func(txID int) {
+ defer wg.Done()
+
+ // Note: FoundationDB has optimistic concurrency control
+ // Some transactions may need to retry due to conflicts
+ maxRetries := 3
+ for attempt := 0; attempt < maxRetries; attempt++ {
+ txCtx, err := store.BeginTransaction(ctx)
+ if err != nil {
+ if attempt == maxRetries-1 {
+ errors <- fmt.Errorf("tx %d: failed to begin after %d attempts: %v", txID, maxRetries, err)
+ }
+ time.Sleep(time.Duration(attempt+1) * 10 * time.Millisecond)
+ continue
+ }
+
+ // Insert multiple entries in transaction
+ success := true
+ for i := 0; i < entriesPerTransaction; i++ {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath("/transactions", fmt.Sprintf("tx%d_file%d.txt", txID, i)),
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: uint32(txID),
+ Gid: uint32(i),
+ Mtime: time.Now(),
+ },
+ }
+
+ err = store.InsertEntry(txCtx, entry)
+ if err != nil {
+ errors <- fmt.Errorf("tx %d, entry %d: insert failed: %v", txID, i, err)
+ store.RollbackTransaction(txCtx)
+ success = false
+ break
+ }
+ }
+
+ if success {
+ err = store.CommitTransaction(txCtx)
+ if err != nil {
+ if attempt == maxRetries-1 {
+ errors <- fmt.Errorf("tx %d: commit failed after %d attempts: %v", txID, maxRetries, err)
+ }
+ time.Sleep(time.Duration(attempt+1) * 10 * time.Millisecond)
+ continue
+ }
+ successfulTx <- txID
+ return
+ }
+ }
+ }(tx)
+ }
+
+ wg.Wait()
+ close(errors)
+ close(successfulTx)
+
+ // Check for errors
+ for err := range errors {
+ t.Errorf("Concurrent transaction error: %v", err)
+ }
+
+ // Count successful transactions
+ successCount := 0
+ successfulTxIDs := make([]int, 0)
+ for txID := range successfulTx {
+ successCount++
+ successfulTxIDs = append(successfulTxIDs, txID)
+ }
+
+ t.Logf("Successful transactions: %d/%d (IDs: %v)", successCount, numTransactions, successfulTxIDs)
+
+ // Verify entries from successful transactions
+ totalExpectedEntries := successCount * entriesPerTransaction
+ actualCount := 0
+
+ _, err := store.ListDirectoryEntries(ctx, "/transactions", "", true, 10000, func(entry *filer.Entry) bool {
+ actualCount++
+ return true
+ })
+ if err != nil {
+ t.Fatalf("ListDirectoryEntries failed: %v", err)
+ }
+
+ if actualCount != totalExpectedEntries {
+ t.Errorf("Expected %d entries from successful transactions, found %d", totalExpectedEntries, actualCount)
+ }
+}
+
+func TestFoundationDBStore_ConcurrentDirectoryOperations(t *testing.T) {
+ store := createTestStore(t)
+ defer store.Shutdown()
+
+ ctx := context.Background()
+ numWorkers := 10
+ directoriesPerWorker := 20
+ filesPerDirectory := 5
+
+ var wg sync.WaitGroup
+ errors := make(chan error, numWorkers*directoriesPerWorker*filesPerDirectory)
+
+ // Launch workers that create directories with files
+ for w := 0; w < numWorkers; w++ {
+ wg.Add(1)
+ go func(workerID int) {
+ defer wg.Done()
+
+ for d := 0; d < directoriesPerWorker; d++ {
+ dirPath := fmt.Sprintf("/worker%d/dir%d", workerID, d)
+
+ // Create files in directory
+ for f := 0; f < filesPerDirectory; f++ {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(dirPath, fmt.Sprintf("file%d.txt", f)),
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: uint32(workerID),
+ Gid: uint32(d),
+ Mtime: time.Now(),
+ },
+ }
+
+ err := store.InsertEntry(ctx, entry)
+ if err != nil {
+ errors <- fmt.Errorf("worker %d, dir %d, file %d: %v", workerID, d, f, err)
+ return
+ }
+ }
+ }
+ }(w)
+ }
+
+ wg.Wait()
+ close(errors)
+
+ // Check for errors
+ for err := range errors {
+ t.Errorf("Concurrent directory operation error: %v", err)
+ }
+
+ // Verify directory structure
+ for w := 0; w < numWorkers; w++ {
+ for d := 0; d < directoriesPerWorker; d++ {
+ dirPath := fmt.Sprintf("/worker%d/dir%d", w, d)
+
+ fileCount := 0
+ _, err := store.ListDirectoryEntries(ctx, dirPath, "", true, 1000, func(entry *filer.Entry) bool {
+ fileCount++
+ return true
+ })
+ if err != nil {
+ t.Errorf("ListDirectoryEntries failed for %s: %v", dirPath, err)
+ continue
+ }
+
+ if fileCount != filesPerDirectory {
+ t.Errorf("Expected %d files in %s, found %d", filesPerDirectory, dirPath, fileCount)
+ }
+ }
+ }
+}
+
+func TestFoundationDBStore_ConcurrentKVOperations(t *testing.T) {
+ store := createTestStore(t)
+ defer store.Shutdown()
+
+ ctx := context.Background()
+ numWorkers := 8
+ operationsPerWorker := 100
+
+ var wg sync.WaitGroup
+ errors := make(chan error, numWorkers*operationsPerWorker)
+
+ // Launch workers performing KV operations
+ for w := 0; w < numWorkers; w++ {
+ wg.Add(1)
+ go func(workerID int) {
+ defer wg.Done()
+
+ for i := 0; i < operationsPerWorker; i++ {
+ key := []byte(fmt.Sprintf("worker%d_key%d", workerID, i))
+ value := []byte(fmt.Sprintf("worker%d_value%d_timestamp%d", workerID, i, time.Now().UnixNano()))
+
+ // Put operation
+ err := store.KvPut(ctx, key, value)
+ if err != nil {
+ errors <- fmt.Errorf("worker %d, operation %d: KvPut failed: %v", workerID, i, err)
+ continue
+ }
+
+ // Get operation
+ retrievedValue, err := store.KvGet(ctx, key)
+ if err != nil {
+ errors <- fmt.Errorf("worker %d, operation %d: KvGet failed: %v", workerID, i, err)
+ continue
+ }
+
+ if string(retrievedValue) != string(value) {
+ errors <- fmt.Errorf("worker %d, operation %d: value mismatch", workerID, i)
+ continue
+ }
+
+ // Delete operation (for some keys)
+ if i%5 == 0 {
+ err = store.KvDelete(ctx, key)
+ if err != nil {
+ errors <- fmt.Errorf("worker %d, operation %d: KvDelete failed: %v", workerID, i, err)
+ }
+ }
+ }
+ }(w)
+ }
+
+ wg.Wait()
+ close(errors)
+
+ // Check for errors
+ errorCount := 0
+ for err := range errors {
+ t.Errorf("Concurrent KV operation error: %v", err)
+ errorCount++
+ }
+
+ if errorCount > 0 {
+ t.Errorf("Total errors in concurrent KV operations: %d", errorCount)
+ }
+}
+
+func createTestStore(t *testing.T) *foundationdb.FoundationDBStore {
+ // Skip test if FoundationDB cluster file doesn't exist
+ clusterFile := os.Getenv("FDB_CLUSTER_FILE")
+ if clusterFile == "" {
+ clusterFile = "/var/fdb/config/fdb.cluster"
+ }
+
+ if _, err := os.Stat(clusterFile); os.IsNotExist(err) {
+ t.Skip("FoundationDB cluster file not found, skipping test")
+ }
+
+ config := util.GetViper()
+ config.Set("foundationdb.cluster_file", clusterFile)
+ config.Set("foundationdb.api_version", 740)
+ config.Set("foundationdb.timeout", "10s")
+ config.Set("foundationdb.max_retry_delay", "2s")
+ config.Set("foundationdb.directory_prefix", fmt.Sprintf("seaweedfs_concurrent_test_%d", time.Now().UnixNano()))
+
+ store := &foundationdb.FoundationDBStore{}
+ err := store.Initialize(config, "foundationdb.")
+ if err != nil {
+ t.Fatalf("Failed to initialize FoundationDB store: %v", err)
+ }
+
+ return store
+}