aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/imports.go1
-rw-r--r--weed/command/scaffold/filer.toml8
-rw-r--r--weed/filer/tikv/tikv.go5
-rw-r--r--weed/filer/tikv/tikv_store.go389
-rw-r--r--weed/filer/tikv/tikv_store_kv.go50
5 files changed, 0 insertions, 453 deletions
diff --git a/weed/command/imports.go b/weed/command/imports.go
index 48cda5f90..3792c45c4 100644
--- a/weed/command/imports.go
+++ b/weed/command/imports.go
@@ -31,5 +31,4 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
- _ "github.com/chrislusf/seaweedfs/weed/filer/tikv"
)
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index aeb8a5b67..aec409408 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -252,11 +252,3 @@ location = "/tmp/"
address = "localhost:6379"
password = ""
database = 1
-
-[tikv]
-enabled = false
-# If you have many pd address, use ',' split then:
-# pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379"
-pdaddrs = "localhost:2379"
-# Concurrency for TiKV delete range
-deleterange_concurrency = 1
diff --git a/weed/filer/tikv/tikv.go b/weed/filer/tikv/tikv.go
deleted file mode 100644
index 8bb5dc577..000000000
--- a/weed/filer/tikv/tikv.go
+++ /dev/null
@@ -1,5 +0,0 @@
-package tikv
-
-/*
- * This empty file is let go build can work without tikv tag
- */
diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go
deleted file mode 100644
index 561f23910..000000000
--- a/weed/filer/tikv/tikv_store.go
+++ /dev/null
@@ -1,389 +0,0 @@
-//go:build tikv
-// +build tikv
-
-package tikv
-
-import (
- "bytes"
- "context"
- "crypto/sha1"
- "fmt"
- "io"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/tikv/client-go/v2/tikv"
- "github.com/tikv/client-go/v2/txnkv"
-)
-
-var (
- _ filer.FilerStore = ((*TikvStore)(nil))
-)
-
-func init() {
- filer.Stores = append(filer.Stores, &TikvStore{})
-}
-
-type TikvStore struct {
- client *tikv.KVStore
- deleteRangeConcurrency int
-}
-
-// Basic APIs
-func (store *TikvStore) GetName() string {
- return "tikv"
-}
-
-func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
- pdAddrs := []string{}
- pdAddrsStr := config.GetString(prefix + "pdaddrs")
- for _, item := range strings.Split(pdAddrsStr, ",") {
- pdAddrs = append(pdAddrs, strings.TrimSpace(item))
- }
- drc := config.GetInt(prefix + "deleterange_concurrency")
- if drc <= 0 {
- drc = 1
- }
- store.deleteRangeConcurrency = drc
- return store.initialize(pdAddrs)
-}
-
-func (store *TikvStore) initialize(pdAddrs []string) error {
- client, err := tikv.NewTxnClient(pdAddrs)
- store.client = client
- return err
-}
-
-func (store *TikvStore) Shutdown() {
- err := store.client.Close()
- if err != nil {
- glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
- }
-}
-
-// ~ Basic APIs
-
-// Entry APIs
-func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
- dir, name := entry.DirAndName()
- key := generateKey(dir, name)
-
- value, err := entry.EncodeAttributesAndChunks()
- if err != nil {
- return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
- }
- txn, err := store.getTxn(ctx)
- if err != nil {
- return err
- }
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
- return txn.Set(key, value)
- })
- if err != nil {
- return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
- }
- return nil
-}
-
-func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
- return store.InsertEntry(ctx, entry)
-}
-
-func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
- dir, name := path.DirAndName()
- key := generateKey(dir, name)
-
- txn, err := store.getTxn(ctx)
- if err != nil {
- return nil, err
- }
- var value []byte = nil
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
- val, err := txn.Get(context.TODO(), key)
- if err == nil {
- value = val
- }
- return err
- })
-
- if isNotExists(err) || value == nil {
- return nil, filer_pb.ErrNotFound
- }
-
- if err != nil {
- return nil, fmt.Errorf("get %s : %v", path, err)
- }
-
- entry := &filer.Entry{
- FullPath: path,
- }
- err = entry.DecodeAttributesAndChunks(value)
- if err != nil {
- return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
- }
- return entry, nil
-}
-
-func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
- dir, name := path.DirAndName()
- key := generateKey(dir, name)
-
- txn, err := store.getTxn(ctx)
- if err != nil {
- return err
- }
-
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
- return txn.Delete(key)
- })
- if err != nil {
- return fmt.Errorf("delete %s : %v", path, err)
- }
- return nil
-}
-
-// ~ Entry APIs
-
-// Directory APIs
-func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
- directoryPrefix := genDirectoryKeyPrefix(path, "")
-
- txn, err := store.getTxn(ctx)
- if err != nil {
- return err
- }
- var (
- startKey []byte = nil
- endKey []byte = nil
- )
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
- iter, err := txn.Iter(directoryPrefix, nil)
- if err != nil {
- return err
- }
- defer iter.Close()
- for iter.Valid() {
- key := iter.Key()
- endKey = key
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- if startKey == nil {
- startKey = key
- }
-
- err = iter.Next()
- if err != nil {
- return err
- }
- }
- // Only one Key matched just delete it.
- if startKey != nil && bytes.Equal(startKey, endKey) {
- return txn.Delete(startKey)
- }
- return nil
- })
- if err != nil {
- return fmt.Errorf("delete %s : %v", path, err)
- }
-
- if startKey != nil && endKey != nil && !bytes.Equal(startKey, endKey) {
- // has startKey and endKey and they are not equals, so use delete range
- _, err = store.client.DeleteRange(context.Background(), startKey, endKey, store.deleteRangeConcurrency)
- if err != nil {
- return fmt.Errorf("delete %s : %v", path, err)
- }
- }
- return err
-}
-
-func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
-}
-
-func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
- lastFileName := ""
- directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
- lastFileStart := directoryPrefix
- if startFileName != "" {
- lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
- }
-
- txn, err := store.getTxn(ctx)
- if err != nil {
- return lastFileName, err
- }
- err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
- iter, err := txn.Iter(lastFileStart, nil)
- if err != nil {
- return err
- }
- defer iter.Close()
- i := int64(0)
- first := true
- for iter.Valid() {
- if first {
- first = false
- if !includeStartFile {
- if iter.Valid() {
- // Check first item is lastFileStart
- if bytes.Equal(iter.Key(), lastFileStart) {
- // Is lastFileStart and not include start file, just
- // ignore it.
- err = iter.Next()
- if err != nil {
- return err
- }
- continue
- }
- }
- }
- }
- // Check for limitation
- if limit > 0 {
- i++
- if i > limit {
- break
- }
- }
- // Validate key prefix
- key := iter.Key()
- if !bytes.HasPrefix(key, directoryPrefix) {
- break
- }
- value := iter.Value()
-
- // Start process
- fileName := getNameFromKey(key)
- if fileName != "" {
- // Got file name, then generate the Entry
- entry := &filer.Entry{
- FullPath: util.NewFullPath(string(dirPath), fileName),
- }
- // Update lastFileName
- lastFileName = fileName
- // Check for decode value.
- if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
- // Got error just return the error
- glog.V(0).Infof("list %s : %v", entry.FullPath, err)
- return err
- }
- // Run for each callback if return false just break the iteration
- if !eachEntryFunc(entry) {
- break
- }
- }
- // End process
-
- err = iter.Next()
- if err != nil {
- return err
- }
- }
- return nil
- })
- if err != nil {
- return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
- }
- return lastFileName, nil
-}
-
-// ~ Directory APIs
-
-// Transaction Related APIs
-func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- tx, err := store.client.Begin()
- if err != nil {
- return ctx, err
- }
- return context.WithValue(ctx, "tx", tx), nil
-}
-
-func (store *TikvStore) CommitTransaction(ctx context.Context) error {
- if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
- return tx.Commit(context.Background())
- }
- return nil
-}
-
-func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
- if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
- return tx.Rollback()
- }
- return nil
-}
-
-// ~ Transaction Related APIs
-
-// Transaction Wrapper
-type TxnWrapper struct {
- *txnkv.KVTxn
- inContext bool
-}
-
-func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
- err := f(w.KVTxn)
- if !w.inContext {
- if err != nil {
- w.KVTxn.Rollback()
- return err
- }
- w.KVTxn.Commit(context.Background())
- return nil
- }
- return err
-}
-
-func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
- if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
- return &TxnWrapper{tx, true}, nil
- }
- txn, err := store.client.Begin()
- if err != nil {
- return nil, err
- }
- return &TxnWrapper{txn, false}, nil
-}
-
-// ~ Transaction Wrapper
-
-// Encoding Functions
-func hashToBytes(dir string) []byte {
- h := sha1.New()
- io.WriteString(h, dir)
- b := h.Sum(nil)
- return b
-}
-
-func generateKey(dirPath, fileName string) []byte {
- key := hashToBytes(dirPath)
- key = append(key, []byte(fileName)...)
- return key
-}
-
-func getNameFromKey(key []byte) string {
- return string(key[sha1.Size:])
-}
-
-func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
- keyPrefix = hashToBytes(string(fullpath))
- if len(startFileName) > 0 {
- keyPrefix = append(keyPrefix, []byte(startFileName)...)
- }
- return keyPrefix
-}
-
-func isNotExists(err error) bool {
- if err == nil {
- return false
- }
- if err.Error() == "not exist" {
- return true
- }
- return false
-}
-
-// ~ Encoding Functions
diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go
deleted file mode 100644
index 1d9428c69..000000000
--- a/weed/filer/tikv/tikv_store_kv.go
+++ /dev/null
@@ -1,50 +0,0 @@
-//go:build tikv
-// +build tikv
-
-package tikv
-
-import (
- "context"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/tikv/client-go/v2/txnkv"
-)
-
-func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) error {
- tw, err := store.getTxn(ctx)
- if err != nil {
- return err
- }
- return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
- return txn.Set(key, value)
- })
-}
-
-func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
- tw, err := store.getTxn(ctx)
- if err != nil {
- return nil, err
- }
- var data []byte = nil
- err = tw.RunInTxn(func(txn *txnkv.KVTxn) error {
- val, err := txn.Get(context.TODO(), key)
- if err == nil {
- data = val
- }
- return err
- })
- if isNotExists(err) {
- return data, filer.ErrKvNotFound
- }
- return data, err
-}
-
-func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error {
- tw, err := store.getTxn(ctx)
- if err != nil {
- return err
- }
- return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
- return txn.Delete(key)
- })
-}