aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/scaffold/filer.toml5
-rw-r--r--weed/filer/tikv/tikv_store.go120
-rw-r--r--weed/filer/tikv/tikv_store_kv.go6
3 files changed, 78 insertions, 53 deletions
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index 080d8f78b..61a7ced6d 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -381,10 +381,11 @@ enabled = false
# If you have many pd address, use ',' split then:
# pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379"
pdaddrs = "localhost:2379"
-# Concurrency for TiKV delete range
-deleterange_concurrency = 1
# Enable 1PC
enable_1pc = false
+# batch delete count, default 10000 in code
+#batchdelete_count = 20000
+
# Set the CA certificate path
ca_path=""
# Set the certificate path
diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go
index 307d2b3fb..6749e6bc2 100644
--- a/weed/filer/tikv/tikv_store.go
+++ b/weed/filer/tikv/tikv_store.go
@@ -20,6 +20,8 @@ import (
"github.com/tikv/client-go/v2/txnkv"
)
+const defaultBatchCommitSize = 10000
+
var (
_ filer.FilerStore = ((*TikvStore)(nil))
)
@@ -29,9 +31,9 @@ func init() {
}
type TikvStore struct {
- client *txnkv.Client
- deleteRangeConcurrency int
- onePC bool
+ client *txnkv.Client
+ onePC bool
+ batchCommitSize int
}
// Basic APIs
@@ -46,12 +48,13 @@ func (store *TikvStore) Initialize(config util.Configuration, prefix string) err
verify_cn := strings.Split(config.GetString(prefix+"verify_cn"), ",")
pdAddrs := strings.Split(config.GetString(prefix+"pdaddrs"), ",")
- drc := config.GetInt(prefix + "deleterange_concurrency")
- if drc <= 0 {
- drc = 1
+ bdc := config.GetInt(prefix + "batchdelete_count")
+ if bdc <= 0 {
+ bdc = defaultBatchCommitSize
}
+
store.onePC = config.GetBool(prefix + "enable_1pc")
- store.deleteRangeConcurrency = drc
+ store.batchCommitSize = bdc
return store.initialize(ca, cert, key, verify_cn, pdAddrs)
}
@@ -86,7 +89,7 @@ func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) err
if err != nil {
return err
}
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
return txn.Set(key, value)
})
if err != nil {
@@ -108,7 +111,7 @@ func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*fil
return nil, err
}
var value []byte = nil
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
val, err := txn.Get(context.TODO(), key)
if err == nil {
value = val
@@ -143,7 +146,7 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) err
return err
}
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
return txn.Delete(key)
})
if err != nil {
@@ -158,53 +161,75 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) err
func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
directoryPrefix := genDirectoryKeyPrefix(path, "")
- txn, err := store.getTxn(ctx)
+ iterTxn, err := store.getTxn(ctx)
if err != nil {
return err
}
- var (
- startKey []byte = nil
- endKey []byte = nil
- )
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
- iter, err := txn.Iter(directoryPrefix, nil)
- if err != nil {
- return err
+
+ if !iterTxn.inContext {
+ defer func() {
+ _ = iterTxn.Rollback()
+ }()
+ }
+
+ iter, err := iterTxn.Iter(directoryPrefix, nil)
+ if err != nil {
+ return err
+ }
+ defer iter.Close()
+
+ var keys [][]byte
+
+ for iter.Valid() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
}
- defer iter.Close()
- for iter.Valid() {
- key := iter.Key()
- endKey = key
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- if startKey == nil {
- startKey = key
- }
- err = iter.Next()
- if err != nil {
- return err
+ keys = append(keys, append([]byte(nil), key...))
+
+ if len(keys) >= store.batchCommitSize {
+ if err := store.deleteBatch(ctx, keys); err != nil {
+ return fmt.Errorf("delete batch in %s, error: %v", path, err)
}
+ keys = keys[:0]
}
- // Only one Key matched just delete it.
- if startKey != nil && bytes.Equal(startKey, endKey) {
- return txn.Delete(startKey)
+
+ if err := iter.Next(); err != nil {
+ return err
}
- return nil
- })
+ }
+
+ if len(keys) > 0 {
+ if err := store.deleteBatch(ctx, keys); err != nil {
+ return fmt.Errorf("delete batch in %s, error: %v", path, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *TikvStore) deleteBatch(ctx context.Context, keys [][]byte) error {
+ deleteTxn, err := store.getTxn(ctx)
if err != nil {
- return fmt.Errorf("delete %s : %v", path, err)
+ return err
}
- if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
- // has startKey and endKey and they are not equals, so use delete range
- _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
- if err != nil {
- return fmt.Errorf("delete %s : %v", path, err)
+ if !deleteTxn.inContext {
+ defer func() { _ = deleteTxn.Rollback() }()
+ }
+
+ for _, key := range keys {
+ if err := deleteTxn.Delete(key); err != nil {
+ return err
}
}
- return err
+
+ if !deleteTxn.inContext {
+ return deleteTxn.Commit(ctx)
+ }
+
+ return nil
}
func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
@@ -224,7 +249,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
return lastFileName, err
}
var callbackErr error
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ err = txn.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
iter, err := txn.Iter(lastFileStart, nil)
if err != nil {
return err
@@ -353,15 +378,14 @@ type TxnWrapper struct {
inContext bool
}
-func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
+func (w *TxnWrapper) RunInTxn(ctx context.Context, f func(txn *txnkv.KVTxn) error) error {
err := f(w.KVTxn)
if !w.inContext {
if err != nil {
w.KVTxn.Rollback()
return err
}
- w.KVTxn.Commit(context.Background())
- return nil
+ return w.KVTxn.Commit(ctx)
}
return err
}
diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go
index a2aaafb7a..0266dcfdb 100644
--- a/weed/filer/tikv/tikv_store_kv.go
+++ b/weed/filer/tikv/tikv_store_kv.go
@@ -15,7 +15,7 @@ func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) err
if err != nil {
return err
}
- return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
return txn.Set(key, value)
})
}
@@ -26,7 +26,7 @@ func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
return nil, err
}
var data []byte = nil
- err = tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ err = tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
val, err := txn.Get(context.TODO(), key)
if err == nil {
data = val
@@ -44,7 +44,7 @@ func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error {
if err != nil {
return err
}
- return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return tw.RunInTxn(ctx, func(txn *txnkv.KVTxn) error {
return txn.Delete(key)
})
}