aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/filer/tikv/tikv_store.go360
-rw-r--r--weed/filer/tikv/tikv_store_kv.go44
2 files changed, 404 insertions, 0 deletions
diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go
new file mode 100644
index 000000000..9bee57283
--- /dev/null
+++ b/weed/filer/tikv/tikv_store.go
@@ -0,0 +1,360 @@
+package tikv
+
+import (
+ "bytes"
+ "context"
+ "crypto/sha1"
+ "fmt"
+ "io"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/tikv/client-go/v2/tikv"
+ "github.com/tikv/client-go/v2/txnkv"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &TikvStore{})
+}
+
+type TikvStore struct {
+ client *tikv.KVStore
+}
+
+// Basic APIs
+func (store *TikvStore) GetName() string {
+ return "tikv"
+}
+
+func (store *TikvStore) Initialize(config util.Configuration, prefix string) error {
+ pdAddrs := []string{}
+ pdAddrsStr := config.GetString(prefix + "pdaddrs")
+ for _, item := range strings.Split(pdAddrsStr, ",") {
+ pdAddrs = append(pdAddrs, strings.TrimSpace(item))
+ }
+ return store.initialize(pdAddrs)
+}
+
+func (store *TikvStore) initialize(pdAddrs []string) error {
+ client, err := tikv.NewTxnClient(pdAddrs)
+ store.client = client
+ return err
+}
+
+func (store *TikvStore) Shutdown() {
+ err := store.client.Close()
+ if err != nil {
+ glog.V(0).Infof("Shutdown TiKV client got error: %v", err)
+ }
+}
+
+// ~ Basic APIs
+
+// Entry APIs
+func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ dir, name := entry.DirAndName()
+ key := generateKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Set(key, value)
+ })
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+ return nil
+}
+
+func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) {
+ dir, name := path.DirAndName()
+ key := generateKey(dir, name)
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return nil, err
+ }
+ var value []byte = nil
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ val, err := txn.Get(context.TODO(), key)
+ if err == nil {
+ value = val
+ }
+ return err
+ })
+
+ if isNotExists(err) || value == nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", path, err)
+ }
+
+ entry := &filer.Entry{
+ FullPath: path,
+ }
+ err = entry.DecodeAttributesAndChunks(value)
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+ return entry, nil
+}
+
+func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error {
+ dir, name := path.DirAndName()
+ key := generateKey(dir, name)
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Delete(key)
+ })
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+ return nil
+}
+
+// ~ Entry APIs
+
+// Directory APIs
+func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error {
+ directoryPrefix := genDirectoryKeyPrefix(path, "")
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ iter, err := txn.Iter(directoryPrefix, nil)
+ if err != nil {
+ return err
+ }
+ defer iter.Close()
+ for iter.Valid() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ err = txn.Delete(key)
+ if err != nil {
+ return err
+ }
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", path, err)
+ }
+ return nil
+}
+
+func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ lastFileName := ""
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
+ }
+
+ txn, err := store.getTxn(ctx)
+ if err != nil {
+ return lastFileName, err
+ }
+ err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
+ iter, err := txn.Iter(lastFileStart, nil)
+ if err != nil {
+ return err
+ }
+ defer iter.Close()
+ i := int64(0)
+ first := true
+ for iter.Valid() {
+ if first {
+ first = false
+ if !includeStartFile {
+ if iter.Valid() {
+ // Check first item is lastFileStart
+ if bytes.Equal(iter.Key(), lastFileStart) {
+ // Is lastFileStart and not include start file, just
+ // ignore it.
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ continue
+ }
+ }
+ }
+ }
+ // Check for limitation
+ if limit > 0 {
+ i++
+ if i > limit {
+ break
+ }
+ }
+ // Validate key prefix
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ value := iter.Value()
+
+ // Start process
+ fileName := getNameFromKey(key)
+ if fileName != "" {
+ // Got file name, then generate the Entry
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(dirPath), fileName),
+ }
+ // Update lastFileName
+ lastFileName = fileName
+ // Check for decode value.
+ if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
+ // Got error just return the error
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ return err
+ }
+ // Run for each callback if return false just break the iteration
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+ // End process
+
+ err = iter.Next()
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
+ }
+ return lastFileName, nil
+}
+
+// ~ Directory APIs
+
+// Transaction Related APIs
+func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ tx, err := store.client.Begin()
+ if err != nil {
+ return ctx, err
+ }
+ return context.WithValue(ctx, "tx", tx), nil
+}
+
+func (store *TikvStore) CommitTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return tx.Commit(context.Background())
+ }
+ return nil
+}
+
+func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return tx.Rollback()
+ }
+ return nil
+}
+
+// ~ Transaction Related APIs
+
+// Transaction Wrapper
+type TxnWrapper struct {
+ *txnkv.KVTxn
+ inContext bool
+}
+
+func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error {
+ err := f(w.KVTxn)
+ if !w.inContext {
+ if err != nil {
+ w.KVTxn.Rollback()
+ return err
+ }
+ w.KVTxn.Commit(context.Background())
+ return nil
+ }
+ return err
+}
+
+func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
+ if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok {
+ return &TxnWrapper{tx, true}, nil
+ }
+ txn, err := store.client.Begin()
+ if err != nil {
+ return nil, err
+ }
+ return &TxnWrapper{txn, false}, nil
+}
+
+// ~ Transaction Wrapper
+
+// Encoding Functions
+func hashToBytes(dir string) []byte {
+ h := sha1.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return b
+}
+
+func generateKey(dirPath, fileName string) []byte {
+ key := hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func getNameFromKey(key []byte) string {
+ return string(key[sha1.Size:])
+}
+
+func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func isNotExists(err error) bool {
+ if err == nil {
+ return false
+ }
+ if err.Error() == "not exist" {
+ return true
+ }
+ return false
+}
+
+// ~ Encoding Functions
diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go
new file mode 100644
index 000000000..2472759c0
--- /dev/null
+++ b/weed/filer/tikv/tikv_store_kv.go
@@ -0,0 +1,44 @@
+package tikv
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/tikv/client-go/v2/txnkv"
+)
+
+func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) error {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Set(key, value)
+ })
+}
+
+func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return nil, err
+ }
+ var data []byte = nil
+ err = tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ val, err := txn.Get(context.TODO(), key)
+ if err == nil {
+ data = val
+ }
+ return err
+ })
+ return data, err
+}
+
+func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error {
+ tw, err := store.getTxn(ctx)
+ if err != nil {
+ return err
+ }
+ return tw.RunInTxn(func(txn *txnkv.KVTxn) error {
+ return txn.Delete(key)
+ })
+}