aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/foundationdb/foundationdb_store.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/foundationdb/foundationdb_store.go')
-rw-r--r--weed/filer/foundationdb/foundationdb_store.go575
1 files changed, 575 insertions, 0 deletions
diff --git a/weed/filer/foundationdb/foundationdb_store.go b/weed/filer/foundationdb/foundationdb_store.go
new file mode 100644
index 000000000..509ee4b86
--- /dev/null
+++ b/weed/filer/foundationdb/foundationdb_store.go
@@ -0,0 +1,575 @@
+//go:build foundationdb
+// +build foundationdb
+
+// Package foundationdb provides a filer store implementation using FoundationDB as the backend.
+//
+// IMPORTANT DESIGN NOTE - DeleteFolderChildren and Transaction Limits:
+//
+// FoundationDB imposes strict transaction limits:
+// - Maximum transaction size: 10MB
+// - Maximum transaction duration: 5 seconds
+//
+// The DeleteFolderChildren operation always uses batched deletion with multiple small transactions
+// to safely handle directories of any size. Even if called within an existing transaction context,
+// it will create its own batch transactions to avoid exceeding FDB limits.
+//
+// This means DeleteFolderChildren is NOT atomic with respect to an outer transaction - it manages
+// its own transaction boundaries for safety and reliability.
+
+package foundationdb
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/apple/foundationdb/bindings/go/src/fdb"
+ "github.com/apple/foundationdb/bindings/go/src/fdb/directory"
+ "github.com/apple/foundationdb/bindings/go/src/fdb/tuple"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+const (
+ // FoundationDB transaction size limit is 10MB
+ FDB_TRANSACTION_SIZE_LIMIT = 10 * 1024 * 1024
+ // Maximum number of entries to return in a single directory listing
+ // Large batches can cause transaction timeouts and increase memory pressure
+ MAX_DIRECTORY_LIST_LIMIT = 1000
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &FoundationDBStore{})
+}
+
+type FoundationDBStore struct {
+ database fdb.Database
+ seaweedfsDir directory.DirectorySubspace
+ kvDir directory.DirectorySubspace
+ directoryPrefix string
+ timeout time.Duration
+ maxRetryDelay time.Duration
+}
+
+// Context key type for storing transactions
+type contextKey string
+
+const transactionKey contextKey = "fdb_transaction"
+
+// Helper functions for context-scoped transactions
+func (store *FoundationDBStore) getTransactionFromContext(ctx context.Context) (fdb.Transaction, bool) {
+ val := ctx.Value(transactionKey)
+ if val == nil {
+ var emptyTx fdb.Transaction
+ return emptyTx, false
+ }
+ if tx, ok := val.(fdb.Transaction); ok {
+ return tx, true
+ }
+ var emptyTx fdb.Transaction
+ return emptyTx, false
+}
+
+func (store *FoundationDBStore) setTransactionInContext(ctx context.Context, tx fdb.Transaction) context.Context {
+ return context.WithValue(ctx, transactionKey, tx)
+}
+
+func (store *FoundationDBStore) GetName() string {
+ return "foundationdb"
+}
+
+func (store *FoundationDBStore) Initialize(configuration util.Configuration, prefix string) error {
+ // Set default configuration values
+ configuration.SetDefault(prefix+"cluster_file", "/etc/foundationdb/fdb.cluster")
+ configuration.SetDefault(prefix+"api_version", 740)
+ configuration.SetDefault(prefix+"timeout", "5s")
+ configuration.SetDefault(prefix+"max_retry_delay", "1s")
+ configuration.SetDefault(prefix+"directory_prefix", "seaweedfs")
+
+ clusterFile := configuration.GetString(prefix + "cluster_file")
+ apiVersion := configuration.GetInt(prefix + "api_version")
+ timeoutStr := configuration.GetString(prefix + "timeout")
+ maxRetryDelayStr := configuration.GetString(prefix + "max_retry_delay")
+ store.directoryPrefix = configuration.GetString(prefix + "directory_prefix")
+
+ // Parse timeout values
+ var err error
+ store.timeout, err = time.ParseDuration(timeoutStr)
+ if err != nil {
+ return fmt.Errorf("invalid timeout duration %s: %w", timeoutStr, err)
+ }
+
+ store.maxRetryDelay, err = time.ParseDuration(maxRetryDelayStr)
+ if err != nil {
+ return fmt.Errorf("invalid max_retry_delay duration %s: %w", maxRetryDelayStr, err)
+ }
+
+ return store.initialize(clusterFile, apiVersion)
+}
+
+func (store *FoundationDBStore) initialize(clusterFile string, apiVersion int) error {
+ glog.V(0).Infof("FoundationDB: connecting to cluster file: %s, API version: %d", clusterFile, apiVersion)
+
+ // Set FDB API version
+ if err := fdb.APIVersion(apiVersion); err != nil {
+ return fmt.Errorf("failed to set FoundationDB API version %d: %w", apiVersion, err)
+ }
+
+ // Open database
+ var err error
+ store.database, err = fdb.OpenDatabase(clusterFile)
+ if err != nil {
+ return fmt.Errorf("failed to open FoundationDB database: %w", err)
+ }
+
+ // Create/open seaweedfs directory
+ store.seaweedfsDir, err = directory.CreateOrOpen(store.database, []string{store.directoryPrefix}, nil)
+ if err != nil {
+ return fmt.Errorf("failed to create/open seaweedfs directory: %w", err)
+ }
+
+ // Create/open kv subdirectory for key-value operations
+ store.kvDir, err = directory.CreateOrOpen(store.database, []string{store.directoryPrefix, "kv"}, nil)
+ if err != nil {
+ return fmt.Errorf("failed to create/open kv directory: %w", err)
+ }
+
+ glog.V(0).Infof("FoundationDB store initialized successfully with directory prefix: %s", store.directoryPrefix)
+ return nil
+}
+
+func (store *FoundationDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ // Check if there's already a transaction in this context
+ if _, exists := store.getTransactionFromContext(ctx); exists {
+ return ctx, fmt.Errorf("transaction already in progress for this context")
+ }
+
+ // Create a new transaction
+ tx, err := store.database.CreateTransaction()
+ if err != nil {
+ return ctx, fmt.Errorf("failed to create transaction: %w", err)
+ }
+
+ // Store the transaction in context and return the new context
+ newCtx := store.setTransactionInContext(ctx, tx)
+ return newCtx, nil
+}
+
+func (store *FoundationDBStore) CommitTransaction(ctx context.Context) error {
+ // Get transaction from context
+ tx, exists := store.getTransactionFromContext(ctx)
+ if !exists {
+ return fmt.Errorf("no transaction in progress for this context")
+ }
+
+ // Commit the transaction
+ err := tx.Commit().Get()
+ if err != nil {
+ return fmt.Errorf("failed to commit transaction: %w", err)
+ }
+
+ return nil
+}
+
+func (store *FoundationDBStore) RollbackTransaction(ctx context.Context) error {
+ // Get transaction from context
+ tx, exists := store.getTransactionFromContext(ctx)
+ if !exists {
+ return fmt.Errorf("no transaction in progress for this context")
+ }
+
+ // Cancel the transaction
+ tx.Cancel()
+ return nil
+}
+
+func (store *FoundationDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ return store.UpdateEntry(ctx, entry)
+}
+
+func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
+ key := store.genKey(entry.DirAndName())
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %w", entry.FullPath, entry.Attr, err)
+ }
+
+ if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
+ value = util.MaybeGzipData(value)
+ }
+
+ // Check transaction size limit
+ if len(value) > FDB_TRANSACTION_SIZE_LIMIT {
+ return fmt.Errorf("entry %s exceeds FoundationDB transaction size limit (%d > %d bytes)",
+ entry.FullPath, len(value), FDB_TRANSACTION_SIZE_LIMIT)
+ }
+
+ // Check if there's a transaction in context
+ if tx, exists := store.getTransactionFromContext(ctx); exists {
+ tx.Set(key, value)
+ return nil
+ }
+
+ // Execute in a new transaction if not in an existing one
+ _, err = store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
+ tr.Set(key, value)
+ return nil, nil
+ })
+
+ if err != nil {
+ return fmt.Errorf("persisting %s: %w", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *FoundationDBStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+ key := store.genKey(util.FullPath(fullpath).DirAndName())
+
+ var data []byte
+ // Check if there's a transaction in context
+ if tx, exists := store.getTransactionFromContext(ctx); exists {
+ data, err = tx.Get(key).Get()
+ } else {
+ var result interface{}
+ result, err = store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
+ return rtr.Get(key).Get()
+ })
+ if err == nil {
+ if resultBytes, ok := result.([]byte); ok {
+ data = resultBytes
+ }
+ }
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("find entry %s: %w", fullpath, err)
+ }
+
+ if data == nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %w", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
+ key := store.genKey(util.FullPath(fullpath).DirAndName())
+
+ // Check if there's a transaction in context
+ if tx, exists := store.getTransactionFromContext(ctx); exists {
+ tx.Clear(key)
+ return nil
+ }
+
+ // Execute in a new transaction if not in an existing one
+ _, err := store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
+ tr.Clear(key)
+ return nil, nil
+ })
+
+ if err != nil {
+ return fmt.Errorf("deleting %s: %w", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
+ // Recursively delete all entries in this directory and its subdirectories
+ // We need recursion because our key structure is tuple{dirPath, fileName}
+ // not tuple{dirPath, ...pathComponents}, so a simple prefix range won't catch subdirectories
+
+ // ALWAYS use batched deletion to safely handle directories of any size.
+ // This avoids FoundationDB's 10MB transaction size and 5s timeout limits.
+ //
+ // Note: Even if called within an existing transaction, we create our own batch transactions.
+ // This means DeleteFolderChildren is NOT atomic with an outer transaction, but it ensures
+ // reliability and prevents transaction limit violations.
+ return store.deleteFolderChildrenInBatches(ctx, fullpath)
+}
+
+// deleteFolderChildrenInBatches deletes directory contents in multiple transactions
+// to avoid hitting FoundationDB's transaction size (10MB) and time (5s) limits
+func (store *FoundationDBStore) deleteFolderChildrenInBatches(ctx context.Context, fullpath util.FullPath) error {
+ const BATCH_SIZE = 100 // Delete up to 100 entries per transaction
+
+ // Ensure listing and recursion run outside of any ambient transaction
+ // Store a sentinel nil value so getTransactionFromContext returns false
+ ctxNoTxn := context.WithValue(ctx, transactionKey, (*struct{})(nil))
+
+ for {
+ // Collect one batch of entries
+ var entriesToDelete []util.FullPath
+ var subDirectories []util.FullPath
+
+ // List entries - we'll process BATCH_SIZE at a time
+ _, err := store.ListDirectoryEntries(ctxNoTxn, fullpath, "", true, int64(BATCH_SIZE), func(entry *filer.Entry) bool {
+ entriesToDelete = append(entriesToDelete, entry.FullPath)
+ if entry.IsDirectory() {
+ subDirectories = append(subDirectories, entry.FullPath)
+ }
+ return true
+ })
+
+ if err != nil {
+ return fmt.Errorf("listing children of %s: %w", fullpath, err)
+ }
+
+ // If no entries found, we're done
+ if len(entriesToDelete) == 0 {
+ break
+ }
+
+ // Recursively delete subdirectories first (also in batches)
+ for _, subDir := range subDirectories {
+ if err := store.deleteFolderChildrenInBatches(ctxNoTxn, subDir); err != nil {
+ return err
+ }
+ }
+
+ // Delete this batch of entries in a single transaction
+ _, err = store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
+ txCtx := store.setTransactionInContext(context.Background(), tr)
+ for _, entryPath := range entriesToDelete {
+ if delErr := store.DeleteEntry(txCtx, entryPath); delErr != nil {
+ return nil, fmt.Errorf("deleting entry %s: %w", entryPath, delErr)
+ }
+ }
+ return nil, nil
+ })
+
+ if err != nil {
+ return err
+ }
+
+ // If we got fewer entries than BATCH_SIZE, we're done with this directory
+ if len(entriesToDelete) < BATCH_SIZE {
+ break
+ }
+ }
+
+ return nil
+}
+
+func (store *FoundationDBStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ // Cap limit for optimal FoundationDB performance
+ // Large batches can cause transaction timeouts and increase memory pressure
+ if limit > MAX_DIRECTORY_LIST_LIMIT || limit <= 0 {
+ limit = MAX_DIRECTORY_LIST_LIMIT
+ }
+
+ // Get the range for the entire directory first
+ dirTuple := tuple.Tuple{string(dirPath)}
+ dirRange, err := fdb.PrefixRange(store.seaweedfsDir.Pack(dirTuple))
+ if err != nil {
+ return "", fmt.Errorf("creating prefix range for %s: %w", dirPath, err)
+ }
+
+ // Determine the key range for the scan
+ // Use FDB's range capabilities to only fetch keys matching the prefix
+ var beginKey, endKey fdb.Key
+ dirBeginConv, dirEndConv := dirRange.FDBRangeKeys()
+ dirBegin := dirBeginConv.FDBKey()
+ dirEnd := dirEndConv.FDBKey()
+
+ if prefix != "" {
+ // Build range by bracketing the filename component
+ // Start at Pack(dirPath, prefix) and end at Pack(dirPath, nextPrefix)
+ // where nextPrefix is the next lexicographic string
+ beginKey = store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), prefix})
+ endKey = dirEnd
+
+ // Use Strinc to get the next string for proper prefix range
+ if nextPrefix, strincErr := fdb.Strinc([]byte(prefix)); strincErr == nil {
+ endKey = store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), string(nextPrefix)})
+ }
+ } else {
+ // Use entire directory range
+ beginKey = dirBegin
+ endKey = dirEnd
+ }
+
+ // Determine start key and selector based on startFileName
+ var beginSelector fdb.KeySelector
+ if startFileName != "" {
+ // Start from the specified file
+ startKey := store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), startFileName})
+ if includeStartFile {
+ beginSelector = fdb.FirstGreaterOrEqual(startKey)
+ } else {
+ beginSelector = fdb.FirstGreaterThan(startKey)
+ }
+ // Ensure beginSelector is within our desired range
+ if bytes.Compare(beginSelector.Key.FDBKey(), beginKey.FDBKey()) < 0 {
+ beginSelector = fdb.FirstGreaterOrEqual(beginKey)
+ }
+ } else {
+ // Start from beginning of the range
+ beginSelector = fdb.FirstGreaterOrEqual(beginKey)
+ }
+
+ // End selector is the end of our calculated range
+ endSelector := fdb.FirstGreaterOrEqual(endKey)
+
+ var kvs []fdb.KeyValue
+ var rangeErr error
+ // Check if there's a transaction in context
+ if tx, exists := store.getTransactionFromContext(ctx); exists {
+ sr := fdb.SelectorRange{Begin: beginSelector, End: endSelector}
+ kvs, rangeErr = tx.GetRange(sr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError()
+ if rangeErr != nil {
+ return "", fmt.Errorf("scanning %s: %w", dirPath, rangeErr)
+ }
+ } else {
+ result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
+ sr := fdb.SelectorRange{Begin: beginSelector, End: endSelector}
+ kvSlice, err := rtr.GetRange(sr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError()
+ if err != nil {
+ return nil, err
+ }
+ return kvSlice, nil
+ })
+ if err != nil {
+ return "", fmt.Errorf("scanning %s: %w", dirPath, err)
+ }
+ var ok bool
+ kvs, ok = result.([]fdb.KeyValue)
+ if !ok {
+ return "", fmt.Errorf("unexpected type from ReadTransact: %T, expected []fdb.KeyValue", result)
+ }
+ }
+
+ for _, kv := range kvs {
+ fileName, extractErr := store.extractFileName(kv.Key)
+ if extractErr != nil {
+ glog.Warningf("list %s: failed to extract fileName from key %v: %v", dirPath, kv.Key, extractErr)
+ continue
+ }
+
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(dirPath), fileName),
+ }
+
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(kv.Value)); decodeErr != nil {
+ glog.V(0).Infof("list %s : %v", entry.FullPath, decodeErr)
+ continue
+ }
+
+ if !eachEntryFunc(entry) {
+ break
+ }
+ lastFileName = fileName
+ }
+
+ return lastFileName, nil
+}
+
+// KV operations
+func (store *FoundationDBStore) KvPut(ctx context.Context, key []byte, value []byte) error {
+ fdbKey := store.kvDir.Pack(tuple.Tuple{key})
+
+ // Check if there's a transaction in context
+ if tx, exists := store.getTransactionFromContext(ctx); exists {
+ tx.Set(fdbKey, value)
+ return nil
+ }
+
+ _, err := store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
+ tr.Set(fdbKey, value)
+ return nil, nil
+ })
+
+ return err
+}
+
+func (store *FoundationDBStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
+ fdbKey := store.kvDir.Pack(tuple.Tuple{key})
+
+ var data []byte
+ var err error
+
+ // Check if there's a transaction in context
+ if tx, exists := store.getTransactionFromContext(ctx); exists {
+ data, err = tx.Get(fdbKey).Get()
+ } else {
+ var result interface{}
+ result, err = store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
+ return rtr.Get(fdbKey).Get()
+ })
+ if err == nil {
+ if resultBytes, ok := result.([]byte); ok {
+ data = resultBytes
+ }
+ }
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("kv get %s: %w", string(key), err)
+ }
+ if data == nil {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return data, nil
+}
+
+func (store *FoundationDBStore) KvDelete(ctx context.Context, key []byte) error {
+ fdbKey := store.kvDir.Pack(tuple.Tuple{key})
+
+ // Check if there's a transaction in context
+ if tx, exists := store.getTransactionFromContext(ctx); exists {
+ tx.Clear(fdbKey)
+ return nil
+ }
+
+ _, err := store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
+ tr.Clear(fdbKey)
+ return nil, nil
+ })
+
+ return err
+}
+
+func (store *FoundationDBStore) Shutdown() {
+ // FoundationDB doesn't have an explicit close method for Database
+ glog.V(0).Infof("FoundationDB store shutdown")
+}
+
+// Helper functions
+func (store *FoundationDBStore) genKey(dirPath, fileName string) fdb.Key {
+ return store.seaweedfsDir.Pack(tuple.Tuple{dirPath, fileName})
+}
+
+func (store *FoundationDBStore) extractFileName(key fdb.Key) (string, error) {
+ t, err := store.seaweedfsDir.Unpack(key)
+ if err != nil {
+ return "", fmt.Errorf("unpack key %v: %w", key, err)
+ }
+ if len(t) != 2 {
+ return "", fmt.Errorf("tuple unexpected length (len=%d, expected 2) for key %v", len(t), key)
+ }
+
+ if fileName, ok := t[1].(string); ok {
+ return fileName, nil
+ }
+ return "", fmt.Errorf("second element not a string (type=%T) for key %v", t[1], key)
+}