Diffstat (limited to 'weed/filer/foundationdb/foundationdb_store_test.go')
-rw-r--r--  weed/filer/foundationdb/foundationdb_store_test.go  545
1 file changed, 545 insertions, 0 deletions
diff --git a/weed/filer/foundationdb/foundationdb_store_test.go b/weed/filer/foundationdb/foundationdb_store_test.go
new file mode 100644
index 000000000..215c98c76
--- /dev/null
+++ b/weed/filer/foundationdb/foundationdb_store_test.go
@@ -0,0 +1,545 @@
+//go:build foundationdb
+// +build foundationdb
+
+package foundationdb
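+
+// These tests and benchmarks need a reachable FoundationDB cluster; they are
+// compiled only with the "foundationdb" build tag and skip themselves when no
+// cluster is available.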
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+func TestFoundationDBStore_Initialize(t *testing.T) {
+ // Test with default configuration
+ config := util.GetViper()
+ config.Set("foundationdb.cluster_file", getTestClusterFile())
+ config.Set("foundationdb.api_version", 740)
+
+ store := &FoundationDBStore{}
+ err := store.Initialize(config, "foundationdb.")
+ if err != nil {
+ t.Skip("FoundationDB not available for testing, skipping")
+ }
+
+ defer store.Shutdown()
+
+ if store.GetName() != "foundationdb" {
+ t.Errorf("Expected store name 'foundationdb', got '%s'", store.GetName())
+ }
+
+ if store.directoryPrefix != "seaweedfs" {
+ t.Errorf("Expected default directory prefix 'seaweedfs', got '%s'", store.directoryPrefix)
+ }
+}
+
+func TestFoundationDBStore_InitializeWithCustomConfig(t *testing.T) {
+ config := util.GetViper()
+ config.Set("foundationdb.cluster_file", getTestClusterFile())
+ config.Set("foundationdb.api_version", 740)
+ config.Set("foundationdb.timeout", "10s")
+ config.Set("foundationdb.max_retry_delay", "2s")
+ config.Set("foundationdb.directory_prefix", "custom_prefix")
+
+ store := &FoundationDBStore{}
+ err := store.Initialize(config, "foundationdb.")
+ if err != nil {
+ t.Skip("FoundationDB not available for testing, skipping")
+ }
+
+ defer store.Shutdown()
+
+ if store.directoryPrefix != "custom_prefix" {
+ t.Errorf("Expected custom directory prefix 'custom_prefix', got '%s'", store.directoryPrefix)
+ }
+
+ if store.timeout != 10*time.Second {
+ t.Errorf("Expected timeout 10s, got %v", store.timeout)
+ }
+
+ if store.maxRetryDelay != 2*time.Second {
+ t.Errorf("Expected max retry delay 2s, got %v", store.maxRetryDelay)
+ }
+}
+
+func TestFoundationDBStore_InitializeInvalidConfig(t *testing.T) {
+ tests := []struct {
+ name string
+ config map[string]interface{}
+ errorMsg string
+ }{
+ {
+ name: "invalid timeout",
+ config: map[string]interface{}{
+ "foundationdb.cluster_file": getTestClusterFile(),
+ "foundationdb.api_version": 740,
+ "foundationdb.timeout": "invalid",
+ "foundationdb.directory_prefix": "test",
+ },
+ errorMsg: "invalid timeout duration",
+ },
+ {
+ name: "invalid max_retry_delay",
+ config: map[string]interface{}{
+ "foundationdb.cluster_file": getTestClusterFile(),
+ "foundationdb.api_version": 740,
+ "foundationdb.timeout": "5s",
+ "foundationdb.max_retry_delay": "invalid",
+ "foundationdb.directory_prefix": "test",
+ },
+ errorMsg: "invalid max_retry_delay duration",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ config := util.GetViper()
+ for key, value := range tt.config {
+ config.Set(key, value)
+ }
+
+ store := &FoundationDBStore{}
+ err := store.Initialize(config, "foundationdb.")
+ if err == nil {
+ store.Shutdown()
+ t.Errorf("Expected initialization to fail, but it succeeded")
+ } else if !containsString(err.Error(), tt.errorMsg) {
+ t.Errorf("Expected error message to contain '%s', got '%s'", tt.errorMsg, err.Error())
+ }
+ })
+ }
+}
+
+func TestFoundationDBStore_KeyGeneration(t *testing.T) {
+ store := &FoundationDBStore{}
+ err := store.initialize(getTestClusterFile(), 740)
+ if err != nil {
+ t.Skip("FoundationDB not available for testing, skipping")
+ }
+ defer store.Shutdown()
+
+ // Test key generation for different paths
+ testCases := []struct {
+ dirPath string
+ fileName string
+ desc string
+ }{
+ {"/", "file.txt", "root directory file"},
+ {"/dir", "file.txt", "subdirectory file"},
+ {"/deep/nested/dir", "file.txt", "deep nested file"},
+ {"/dir with spaces", "file with spaces.txt", "paths with spaces"},
+ {"/unicode/测试", "文件.txt", "unicode paths"},
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ key := store.genKey(tc.dirPath, tc.fileName)
+ if len(key) == 0 {
+ t.Error("Generated key should not be empty")
+ }
+
+ // Test that we can extract filename back
+ // Note: This tests internal consistency
+ if tc.fileName != "" {
+ extractedName, err := store.extractFileName(key)
+ if err != nil {
+ t.Errorf("extractFileName failed: %v", err)
+ }
+ if extractedName != tc.fileName {
+ t.Errorf("Expected extracted filename '%s', got '%s'", tc.fileName, extractedName)
+ }
+ }
+ })
+ }
+}
+
+func TestFoundationDBStore_ErrorHandling(t *testing.T) {
+ store := &FoundationDBStore{}
+ err := store.initialize(getTestClusterFile(), 740)
+ if err != nil {
+ t.Skip("FoundationDB not available for testing, skipping")
+ }
+ defer store.Shutdown()
+
+ ctx := context.Background()
+
+ // Test FindEntry with non-existent path
+ _, err = store.FindEntry(ctx, "/non/existent/file.txt")
+ if err == nil {
+ t.Error("Expected error for non-existent file")
+ }
+ if !errors.Is(err, filer_pb.ErrNotFound) {
+ t.Errorf("Expected ErrNotFound, got %v", err)
+ }
+
+ // Test KvGet with non-existent key
+ _, err = store.KvGet(ctx, []byte("non_existent_key"))
+ if err == nil {
+ t.Error("Expected error for non-existent key")
+ }
+ if !errors.Is(err, filer.ErrKvNotFound) {
+ t.Errorf("Expected ErrKvNotFound, got %v", err)
+ }
+
+ // Test transaction state errors
+ err = store.CommitTransaction(ctx)
+ if err == nil {
+ t.Error("Expected error when committing without active transaction")
+ }
+
+ err = store.RollbackTransaction(ctx)
+ if err == nil {
+ t.Error("Expected error when rolling back without active transaction")
+ }
+}
+
+func TestFoundationDBStore_TransactionState(t *testing.T) {
+ store := &FoundationDBStore{}
+ err := store.initialize(getTestClusterFile(), 740)
+ if err != nil {
+ t.Skip("FoundationDB not available for testing, skipping")
+ }
+ defer store.Shutdown()
+
+ ctx := context.Background()
+
+ // Test double transaction begin
+ txCtx, err := store.BeginTransaction(ctx)
+ if err != nil {
+ t.Fatalf("BeginTransaction failed: %v", err)
+ }
+
+ // Try to begin another transaction on the same context
+ _, err = store.BeginTransaction(txCtx)
+ if err == nil {
+ t.Error("Expected error when beginning transaction while one is active")
+ }
+
+ // Commit the transaction
+ err = store.CommitTransaction(txCtx)
+ if err != nil {
+ t.Fatalf("CommitTransaction failed: %v", err)
+ }
+
+ // Now should be able to begin a new transaction
+ txCtx2, err := store.BeginTransaction(ctx)
+ if err != nil {
+ t.Fatalf("BeginTransaction after commit failed: %v", err)
+ }
+
+ // Rollback this time
+ err = store.RollbackTransaction(txCtx2)
+ if err != nil {
+ t.Fatalf("RollbackTransaction failed: %v", err)
+ }
+}
+
+// Benchmark tests
+func BenchmarkFoundationDBStore_InsertEntry(b *testing.B) {
+ store := createBenchmarkStore(b)
+ defer store.Shutdown()
+
+ ctx := context.Background()
+ entry := &filer.Entry{
+ FullPath: "/benchmark/file.txt",
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ entry.FullPath = util.NewFullPath("/benchmark", fmt.Sprintf("%x", uint64(i))+".txt")
+ err := store.InsertEntry(ctx, entry)
+ if err != nil {
+ b.Fatalf("InsertEntry failed: %v", err)
+ }
+ }
+}
+
+func BenchmarkFoundationDBStore_FindEntry(b *testing.B) {
+ store := createBenchmarkStore(b)
+ defer store.Shutdown()
+
+ ctx := context.Background()
+
+ // Pre-populate with test entries
+ numEntries := 1000
+ for i := 0; i < numEntries; i++ {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath("/benchmark", fmt.Sprintf("%x", uint64(i))+".txt"),
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+ err := store.InsertEntry(ctx, entry)
+ if err != nil {
+ b.Fatalf("Pre-population InsertEntry failed: %v", err)
+ }
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ path := util.NewFullPath("/benchmark", fmt.Sprintf("%x", uint64(i%numEntries))+".txt")
+ _, err := store.FindEntry(ctx, path)
+ if err != nil {
+ b.Fatalf("FindEntry failed: %v", err)
+ }
+ }
+}
+
+func BenchmarkFoundationDBStore_KvOperations(b *testing.B) {
+ store := createBenchmarkStore(b)
+ defer store.Shutdown()
+
+ ctx := context.Background()
+ key := []byte("benchmark_key")
+ value := []byte("benchmark_value")
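+
+ // The same key is overwritten and re-read every iteration, so this measures a
+ // put+get round trip rather than growth across unique keys.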
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ // Put
+ err := store.KvPut(ctx, key, value)
+ if err != nil {
+ b.Fatalf("KvPut failed: %v", err)
+ }
+
+ // Get
+ _, err = store.KvGet(ctx, key)
+ if err != nil {
+ b.Fatalf("KvGet failed: %v", err)
+ }
+ }
+}
+
+// Helper functions
+func getTestClusterFile() string {
+ clusterFile := os.Getenv("FDB_CLUSTER_FILE")
+ if clusterFile == "" {
+ clusterFile = "/var/fdb/config/fdb.cluster"
+ }
+ return clusterFile
+}
+
+func createBenchmarkStore(b *testing.B) *FoundationDBStore {
+ clusterFile := getTestClusterFile()
+ if _, err := os.Stat(clusterFile); os.IsNotExist(err) {
+ b.Skip("FoundationDB cluster file not found, skipping benchmark")
+ }
+
+ store := &FoundationDBStore{}
+ err := store.initialize(clusterFile, 740)
+ if err != nil {
+ b.Skipf("Failed to initialize FoundationDB store: %v", err)
+ }
+
+ return store
+}
+
+func getTestStore(t *testing.T) *FoundationDBStore {
+ t.Helper()
+
+ clusterFile := getTestClusterFile()
+ if _, err := os.Stat(clusterFile); os.IsNotExist(err) {
+ t.Skip("FoundationDB cluster file not found, skipping test")
+ }
+
+ store := &FoundationDBStore{}
+ if err := store.initialize(clusterFile, 740); err != nil {
+ t.Skipf("Failed to initialize FoundationDB store: %v", err)
+ }
+
+ return store
+}
+
+func containsString(s, substr string) bool {
+ return strings.Contains(s, substr)
+}
+
+func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
+ // This test validates that DeleteFolderChildren always uses batching
+ // to safely handle large directories, regardless of transaction context
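+ // FoundationDB limits a single transaction (roughly 5 seconds and 10 MB), so
+ // deleting a large directory has to be split across multiple transactions.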
+
+ store := getTestStore(t)
+ defer store.Shutdown()
+
+ ctx := context.Background()
+ testDir := util.FullPath(fmt.Sprintf("/test_batch_delete_%d", time.Now().UnixNano()))
+
+ // Create a large directory (> 100 entries to trigger batching)
+ const numEntries = 250
+
+ t.Logf("Creating %d test entries...", numEntries)
+ for i := 0; i < numEntries; i++ {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(testDir), fmt.Sprintf("file_%04d.txt", i)),
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+ if err := store.InsertEntry(ctx, entry); err != nil {
+ t.Fatalf("Failed to insert test entry %d: %v", i, err)
+ }
+ }
+
+ // Test 1: DeleteFolderChildren outside transaction should succeed
+ t.Run("OutsideTransaction", func(t *testing.T) {
+ testDir1 := util.FullPath(fmt.Sprintf("/test_batch_1_%d", time.Now().UnixNano()))
+
+ // Create entries
+ for i := 0; i < NUM_ENTRIES; i++ {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(testDir1), fmt.Sprintf("file_%04d.txt", i)),
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+ if err := store.InsertEntry(ctx, entry); err != nil {
+ t.Fatalf("Failed to insert test entry %d: %v", i, err)
+ }
+ }
+
+ // Delete with batching
+ err := store.DeleteFolderChildren(ctx, testDir1)
+ if err != nil {
+ t.Errorf("DeleteFolderChildren outside transaction should succeed, got error: %v", err)
+ }
+
+ // Verify all entries deleted
+ var count int
+ if _, err := store.ListDirectoryEntries(ctx, testDir1, "", true, 1000, func(entry *filer.Entry) bool {
+ count++
+ return true
+ }); err != nil {
+ t.Fatalf("ListDirectoryEntries failed: %v", err)
+ }
+ if count != 0 {
+ t.Errorf("Expected all entries to be deleted, found %d", count)
+ }
+ })
+
+ // Test 2: DeleteFolderChildren with transaction context - uses its own batched transactions
+ t.Run("WithTransactionContext", func(t *testing.T) {
+ testDir2 := util.FullPath(fmt.Sprintf("/test_batch_2_%d", time.Now().UnixNano()))
+
+ // Create entries
+ for i := 0; i < NUM_ENTRIES; i++ {
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(testDir2), fmt.Sprintf("file_%04d.txt", i)),
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+ if err := store.InsertEntry(ctx, entry); err != nil {
+ t.Fatalf("Failed to insert test entry %d: %v", i, err)
+ }
+ }
+
+ // Start a transaction (DeleteFolderChildren will ignore it and use its own batching)
+ txCtx, err := store.BeginTransaction(ctx)
+ if err != nil {
+ t.Fatalf("BeginTransaction failed: %v", err)
+ }
+
+ // Delete large directory - should succeed with batching
+ err = store.DeleteFolderChildren(txCtx, testDir2)
+ if err != nil {
+ t.Errorf("DeleteFolderChildren should succeed with batching even when transaction context present, got: %v", err)
+ }
+
+ // Rollback transaction (DeleteFolderChildren used its own transactions, so this doesn't affect deletions)
+ store.RollbackTransaction(txCtx)
+
+ // Verify entries are still deleted (because DeleteFolderChildren managed its own transactions)
+ var count int
+ if _, err := store.ListDirectoryEntries(ctx, testDir2, "", true, 1000, func(entry *filer.Entry) bool {
+ count++
+ return true
+ }); err != nil {
+ t.Fatalf("ListDirectoryEntries failed: %v", err)
+ }
+
+ if count != 0 {
+ t.Errorf("Expected all entries to be deleted, found %d (DeleteFolderChildren uses its own transactions)", count)
+ }
+ })
+
+ // Test 3: Nested directories with batching
+ t.Run("NestedDirectories", func(t *testing.T) {
+ testDir3 := util.FullPath(fmt.Sprintf("/test_batch_3_%d", time.Now().UnixNano()))
+
+ // Create nested structure
+ for i := 0; i < 50; i++ {
+ // Files in root
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(testDir3), fmt.Sprintf("file_%02d.txt", i)),
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+ if err := store.InsertEntry(ctx, entry); err != nil {
+ t.Fatalf("Failed to insert file %02d: %v", i, err)
+ }
+
+ // Subdirectory
+ subDir := &filer.Entry{
+ FullPath: util.NewFullPath(string(testDir3), fmt.Sprintf("dir_%02d", i)),
+ Attr: filer.Attr{
+ Mode: 0755 | os.ModeDir,
+ Uid: 1000,
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+ if err := store.InsertEntry(ctx, subDir); err != nil {
+ t.Fatalf("Failed to insert subdirectory %02d: %v", i, err)
+ }
+
+ // Files in subdirectory
+ for j := 0; j < 3; j++ {
+ subEntry := &filer.Entry{
+ FullPath: util.NewFullPath(string(testDir3)+"/"+fmt.Sprintf("dir_%02d", i), fmt.Sprintf("subfile_%02d.txt", j)),
+ Attr: filer.Attr{
+ Mode: 0644,
+ Uid: 1000,
+ Gid: 1000,
+ Mtime: time.Now(),
+ },
+ }
+ if err := store.InsertEntry(ctx, subEntry); err != nil {
+ t.Fatalf("Failed to insert subfile %02d/%02d: %v", i, j, err)
+ }
+ }
+ }
+
+ // Delete all with batching
+ err := store.DeleteFolderChildren(ctx, testDir3)
+ if err != nil {
+ t.Errorf("DeleteFolderChildren should handle nested directories, got: %v", err)
+ }
+
+ // Verify all deleted
+ var count int
+ if _, err := store.ListDirectoryEntries(ctx, testDir3, "", true, 1000, func(entry *filer.Entry) bool {
+ count++
+ return true
+ }); err != nil {
+ t.Fatalf("ListDirectoryEntries failed: %v", err)
+ }
+ if count != 0 {
+ t.Errorf("Expected all nested entries to be deleted, found %d", count)
+ }
+ })
+
+ // Cleanup
+ store.DeleteFolderChildren(ctx, testDir)
+}