Diffstat (limited to 'weed/filer/foundationdb')
-rw-r--r--  weed/filer/foundationdb/foundationdb_store.go | 174
1 file changed, 172 insertions(+), 2 deletions(-)
diff --git a/weed/filer/foundationdb/foundationdb_store.go b/weed/filer/foundationdb/foundationdb_store.go
index 720afd7bc..852ad2701 100644
--- a/weed/filer/foundationdb/foundationdb_store.go
+++ b/weed/filer/foundationdb/foundationdb_store.go
@@ -22,6 +22,7 @@ import (
"bytes"
"context"
"fmt"
+ "sync"
"time"
"github.com/apple/foundationdb/bindings/go/src/fdb"
@@ -37,15 +38,146 @@ import (
const (
// FoundationDB transaction size limit is 10MB
FDB_TRANSACTION_SIZE_LIMIT = 10 * 1024 * 1024
+ // Safe limit for batch size (leave margin for FDB overhead)
+ FDB_BATCH_SIZE_LIMIT = 8 * 1024 * 1024
// Maximum number of entries to return in a single directory listing
// Large batches can cause transaction timeouts and increase memory pressure
MAX_DIRECTORY_LIST_LIMIT = 1000
+
+ // Write batching defaults
+ DEFAULT_BATCH_SIZE = 100
+ DEFAULT_BATCH_INTERVAL = 5 * time.Millisecond
)
func init() {
filer.Stores = append(filer.Stores, &FoundationDBStore{})
}
+// writeOp represents a pending write operation
+type writeOp struct {
+ key fdb.Key
+ value []byte // nil for delete
+ done chan error
+}
+
+// size returns the approximate size of an operation in bytes
+func (op *writeOp) size() int {
+ return len(op.key) + len(op.value)
+}
+
+// writeBatcher batches multiple writes into single transactions
+type writeBatcher struct {
+ store *FoundationDBStore
+ ops chan *writeOp
+ stop chan struct{}
+ wg sync.WaitGroup
+ size int
+ interval time.Duration
+}
+
+func newWriteBatcher(store *FoundationDBStore, size int, interval time.Duration) *writeBatcher {
+ b := &writeBatcher{
+ store: store,
+ ops: make(chan *writeOp, size*10),
+ stop: make(chan struct{}),
+ size: size,
+ interval: interval,
+ }
+ b.wg.Add(1)
+ go b.run()
+ return b
+}
+
+func (b *writeBatcher) run() {
+ defer b.wg.Done()
+ batch := make([]*writeOp, 0, b.size)
+ batchBytes := 0 // Track cumulative size of batch
+ timer := time.NewTimer(b.interval)
+ defer timer.Stop()
+
+ flush := func() {
+ if len(batch) == 0 {
+ return
+ }
+ _, err := b.store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
+ for _, op := range batch {
+ if op.value != nil {
+ tr.Set(op.key, op.value)
+ } else {
+ tr.Clear(op.key)
+ }
+ }
+ return nil, nil
+ })
+ for _, op := range batch {
+ if op.done != nil {
+ op.done <- err
+ close(op.done)
+ }
+ }
+ batch = batch[:0]
+ batchBytes = 0
+ }
+
+ resetTimer := func() {
+ if !timer.Stop() {
+ select {
+ case <-timer.C:
+ default:
+ }
+ }
+ timer.Reset(b.interval)
+ }
+
+ for {
+ select {
+ case op := <-b.ops:
+ batch = append(batch, op)
+ batchBytes += op.size()
+ // Flush when batch count or size limit is reached
+ if len(batch) >= b.size || batchBytes >= FDB_BATCH_SIZE_LIMIT {
+ flush()
+ resetTimer()
+ }
+ case <-timer.C:
+ flush()
+ // Timer already fired, safe to reset directly
+ timer.Reset(b.interval)
+ case <-b.stop:
+ for {
+ select {
+ case op := <-b.ops:
+ batch = append(batch, op)
+ default:
+ flush()
+ return
+ }
+ }
+ }
+ }
+}
+
+func (b *writeBatcher) submit(key fdb.Key, value []byte, wait bool) error {
+ op := &writeOp{key: key, value: value}
+ if wait {
+ op.done = make(chan error, 1)
+ }
+ select {
+ case b.ops <- op:
+ if wait {
+ return <-op.done
+ }
+ return nil
+ case <-b.stop:
+ return fmt.Errorf("batcher stopped")
+ }
+}
+
+func (b *writeBatcher) shutdown() {
+ close(b.stop)
+ b.wg.Wait()
+}
+
type FoundationDBStore struct {
database fdb.Database
seaweedfsDir directory.DirectorySubspace
@@ -53,6 +185,10 @@ type FoundationDBStore struct {
directoryPrefix string
timeout time.Duration
maxRetryDelay time.Duration
+ // Write batching
+ batcher *writeBatcher
+ batchSize int
+ batchInterval time.Duration
}
// Context key type for storing transactions
@@ -89,6 +225,8 @@ func (store *FoundationDBStore) Initialize(configuration util.Configuration, pre
configuration.SetDefault(prefix+"timeout", "5s")
configuration.SetDefault(prefix+"max_retry_delay", "1s")
configuration.SetDefault(prefix+"directory_prefix", "seaweedfs")
+ configuration.SetDefault(prefix+"batch_size", DEFAULT_BATCH_SIZE)
+ configuration.SetDefault(prefix+"batch_interval", DEFAULT_BATCH_INTERVAL.String())
clusterFile := configuration.GetString(prefix + "cluster_file")
apiVersion := configuration.GetInt(prefix + "api_version")
@@ -108,6 +246,18 @@ func (store *FoundationDBStore) Initialize(configuration util.Configuration, pre
return fmt.Errorf("invalid max_retry_delay duration %s: %w", maxRetryDelayStr, err)
}
+ // Parse batch configuration
+ store.batchSize = configuration.GetInt(prefix + "batch_size")
+ if store.batchSize <= 0 {
+ store.batchSize = DEFAULT_BATCH_SIZE
+ }
+ batchIntervalStr := configuration.GetString(prefix + "batch_interval")
+ store.batchInterval, err = time.ParseDuration(batchIntervalStr)
+ if err != nil {
+ glog.Warningf("invalid %sbatch_interval duration %q, using default %v: %v", prefix, batchIntervalStr, DEFAULT_BATCH_INTERVAL, err)
+ store.batchInterval = DEFAULT_BATCH_INTERVAL
+ }
+
return store.initialize(clusterFile, apiVersion)
}
@@ -138,6 +288,11 @@ func (store *FoundationDBStore) initialize(clusterFile string, apiVersion int) e
return fmt.Errorf("failed to create/open kv directory: %w", err)
}
+ // Start write batcher for improved throughput
+ store.batcher = newWriteBatcher(store, store.batchSize, store.batchInterval)
+ glog.V(0).Infof("FoundationDB: write batching enabled (batch_size=%d, batch_interval=%v)",
+ store.batchSize, store.batchInterval)
+
glog.V(0).Infof("FoundationDB store initialized successfully with directory prefix: %s", store.directoryPrefix)
return nil
}
@@ -215,7 +370,12 @@ func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.En
return nil
}
- // Execute in a new transaction if not in an existing one
+ // Use write batcher for better throughput
+ if store.batcher != nil {
+ return store.batcher.submit(key, value, true)
+ }
+
+ // Fallback: execute in a new transaction
_, err = store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.Set(key, value)
return nil, nil
@@ -276,7 +436,12 @@ func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.F
return nil
}
- // Execute in a new transaction if not in an existing one
+ // Use write batcher for better throughput (nil value = delete)
+ if store.batcher != nil {
+ return store.batcher.submit(key, nil, true)
+ }
+
+ // Fallback: execute in a new transaction
_, err := store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
tr.Clear(key)
return nil, nil
@@ -556,6 +721,11 @@ func (store *FoundationDBStore) KvDelete(ctx context.Context, key []byte) error
}
func (store *FoundationDBStore) Shutdown() {
+ // Stop write batcher
+ if store.batcher != nil {
+ store.batcher.shutdown()
+ store.batcher = nil
+ }
// FoundationDB doesn't have an explicit close method for Database
glog.V(0).Infof("FoundationDB store shutdown")
}
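
For reference, a minimal filer.toml sketch showing the new batching knobs alongside the store's existing settings. The section name, enabled flag, cluster_file path, and api_version value are illustrative assumptions; only batch_size and batch_interval are introduced by this change, and their values below are the defaults defined above.

[foundationdb]
enabled = true
cluster_file = "/etc/foundationdb/fdb.cluster"  # illustrative path
api_version = 710                               # set to match your FDB client
directory_prefix = "seaweedfs"
timeout = "5s"
max_retry_delay = "1s"
# write batching (new in this change)
batch_size = 100        # flush after this many queued writes
batch_interval = "5ms"  # or after this interval, whichever comes first

Raising batch_size or batch_interval coalesces more writes per FoundationDB transaction (higher throughput) at the cost of extra latency on each UpdateEntry/DeleteEntry, which blocks until its batch commits; any single flush is still bounded by FDB_BATCH_SIZE_LIMIT (8 MB) regardless of these settings.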