diff options
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/filer/tikv/tikv_store.go | 360 | ||||
| -rw-r--r-- | weed/filer/tikv/tikv_store_kv.go | 44 |
2 files changed, 404 insertions, 0 deletions
diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go new file mode 100644 index 000000000..9bee57283 --- /dev/null +++ b/weed/filer/tikv/tikv_store.go @@ -0,0 +1,360 @@ +package tikv + +import ( + "bytes" + "context" + "crypto/sha1" + "fmt" + "io" + "strings" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv" +) + +func init() { + filer.Stores = append(filer.Stores, &TikvStore{}) +} + +type TikvStore struct { + client *tikv.KVStore +} + +// Basic APIs +func (store *TikvStore) GetName() string { + return "tikv" +} + +func (store *TikvStore) Initialize(config util.Configuration, prefix string) error { + pdAddrs := []string{} + pdAddrsStr := config.GetString(prefix + "pdaddrs") + for _, item := range strings.Split(pdAddrsStr, ",") { + pdAddrs = append(pdAddrs, strings.TrimSpace(item)) + } + return store.initialize(pdAddrs) +} + +func (store *TikvStore) initialize(pdAddrs []string) error { + client, err := tikv.NewTxnClient(pdAddrs) + store.client = client + return err +} + +func (store *TikvStore) Shutdown() { + err := store.client.Close() + if err != nil { + glog.V(0).Infof("Shutdown TiKV client got error: %v", err) + } +} + +// ~ Basic APIs + +// Entry APIs +func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer.Entry) error { + dir, name := entry.DirAndName() + key := generateKey(dir, name) + + value, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) + } + txn, err := store.getTxn(ctx) + if err != nil { + return err + } + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Set(key, value) + }) + if err != nil { + return fmt.Errorf("persisting %s : %v", entry.FullPath, err) + } + return nil +} + +func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error { + return store.InsertEntry(ctx, entry) +} + +func (store *TikvStore) FindEntry(ctx context.Context, path util.FullPath) (*filer.Entry, error) { + dir, name := path.DirAndName() + key := generateKey(dir, name) + + txn, err := store.getTxn(ctx) + if err != nil { + return nil, err + } + var value []byte = nil + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + val, err := txn.Get(context.TODO(), key) + if err == nil { + value = val + } + return err + }) + + if isNotExists(err) || value == nil { + return nil, filer_pb.ErrNotFound + } + + if err != nil { + return nil, fmt.Errorf("get %s : %v", path, err) + } + + entry := &filer.Entry{ + FullPath: path, + } + err = entry.DecodeAttributesAndChunks(value) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + return entry, nil +} + +func (store *TikvStore) DeleteEntry(ctx context.Context, path util.FullPath) error { + dir, name := path.DirAndName() + key := generateKey(dir, name) + + txn, err := store.getTxn(ctx) + if err != nil { + return err + } + + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Delete(key) + }) + if err != nil { + return fmt.Errorf("delete %s : %v", path, err) + } + return nil +} + +// ~ Entry APIs + +// Directory APIs +func (store *TikvStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) error { + directoryPrefix := genDirectoryKeyPrefix(path, "") + + txn, err := store.getTxn(ctx) + if err != nil { + return err + } + + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + iter, err := txn.Iter(directoryPrefix, nil) + if err != nil { + return err + } + defer iter.Close() + for iter.Valid() { + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + err = txn.Delete(key) + if err != nil { + return err + } + err = iter.Next() + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return fmt.Errorf("delete %s : %v", path, err) + } + return nil +} + +func (store *TikvStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc) +} + +func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (string, error) { + lastFileName := "" + directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix) + lastFileStart := directoryPrefix + if startFileName != "" { + lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName) + } + + txn, err := store.getTxn(ctx) + if err != nil { + return lastFileName, err + } + err = txn.RunInTxn(func(txn *txnkv.KVTxn) error { + iter, err := txn.Iter(lastFileStart, nil) + if err != nil { + return err + } + defer iter.Close() + i := int64(0) + first := true + for iter.Valid() { + if first { + first = false + if !includeStartFile { + if iter.Valid() { + // Check first item is lastFileStart + if bytes.Equal(iter.Key(), lastFileStart) { + // Is lastFileStart and not include start file, just + // ignore it. + err = iter.Next() + if err != nil { + return err + } + continue + } + } + } + } + // Check for limitation + if limit > 0 { + i++ + if i > limit { + break + } + } + // Validate key prefix + key := iter.Key() + if !bytes.HasPrefix(key, directoryPrefix) { + break + } + value := iter.Value() + + // Start process + fileName := getNameFromKey(key) + if fileName != "" { + // Got file name, then generate the Entry + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), fileName), + } + // Update lastFileName + lastFileName = fileName + // Check for decode value. + if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil { + // Got error just return the error + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + return err + } + // Run for each callback if return false just break the iteration + if !eachEntryFunc(entry) { + break + } + } + // End process + + err = iter.Next() + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err) + } + return lastFileName, nil +} + +// ~ Directory APIs + +// Transaction Related APIs +func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) { + tx, err := store.client.Begin() + if err != nil { + return ctx, err + } + return context.WithValue(ctx, "tx", tx), nil +} + +func (store *TikvStore) CommitTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { + return tx.Commit(context.Background()) + } + return nil +} + +func (store *TikvStore) RollbackTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { + return tx.Rollback() + } + return nil +} + +// ~ Transaction Related APIs + +// Transaction Wrapper +type TxnWrapper struct { + *txnkv.KVTxn + inContext bool +} + +func (w *TxnWrapper) RunInTxn(f func(txn *txnkv.KVTxn) error) error { + err := f(w.KVTxn) + if !w.inContext { + if err != nil { + w.KVTxn.Rollback() + return err + } + w.KVTxn.Commit(context.Background()) + return nil + } + return err +} + +func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) { + if tx, ok := ctx.Value("tx").(*txnkv.KVTxn); ok { + return &TxnWrapper{tx, true}, nil + } + txn, err := store.client.Begin() + if err != nil { + return nil, err + } + return &TxnWrapper{txn, false}, nil +} + +// ~ Transaction Wrapper + +// Encoding Functions +func hashToBytes(dir string) []byte { + h := sha1.New() + io.WriteString(h, dir) + b := h.Sum(nil) + return b +} + +func generateKey(dirPath, fileName string) []byte { + key := hashToBytes(dirPath) + key = append(key, []byte(fileName)...) + return key +} + +func getNameFromKey(key []byte) string { + return string(key[sha1.Size:]) +} + +func genDirectoryKeyPrefix(fullpath util.FullPath, startFileName string) (keyPrefix []byte) { + keyPrefix = hashToBytes(string(fullpath)) + if len(startFileName) > 0 { + keyPrefix = append(keyPrefix, []byte(startFileName)...) + } + return keyPrefix +} + +func isNotExists(err error) bool { + if err == nil { + return false + } + if err.Error() == "not exist" { + return true + } + return false +} + +// ~ Encoding Functions diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go new file mode 100644 index 000000000..2472759c0 --- /dev/null +++ b/weed/filer/tikv/tikv_store_kv.go @@ -0,0 +1,44 @@ +package tikv + +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/tikv/client-go/v2/txnkv" +) + +func (store *TikvStore) KvPut(ctx context.Context, key []byte, value []byte) error { + tw, err := store.getTxn(ctx) + if err != nil { + return err + } + return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Set(key, value) + }) +} + +func (store *TikvStore) KvGet(ctx context.Context, key []byte) ([]byte, error) { + tw, err := store.getTxn(ctx) + if err != nil { + return nil, err + } + var data []byte = nil + err = tw.RunInTxn(func(txn *txnkv.KVTxn) error { + val, err := txn.Get(context.TODO(), key) + if err == nil { + data = val + } + return err + }) + return data, err +} + +func (store *TikvStore) KvDelete(ctx context.Context, key []byte) error { + tw, err := store.getTxn(ctx) + if err != nil { + return err + } + return tw.RunInTxn(func(txn *txnkv.KVTxn) error { + return txn.Delete(key) + }) +} |
