diff options
Diffstat (limited to 'weed/filer/ydb/ydb_store.go')
| -rw-r--r-- | weed/filer/ydb/ydb_store.go | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go new file mode 100644 index 000000000..7278c6301 --- /dev/null +++ b/weed/filer/ydb/ydb_store.go @@ -0,0 +1,256 @@ +package ydb + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/yandex-cloud/ydb-go-sdk/v2" + "github.com/yandex-cloud/ydb-go-sdk/v2/connect" + "github.com/yandex-cloud/ydb-go-sdk/v2/table" + "strings" + "time" +) + +var ( + roTX = table.TxControl( + table.BeginTx(table.WithOnlineReadOnly()), + table.CommitTx(), + ) + rwTX = table.TxControl( + table.BeginTx(table.WithSerializableReadWrite()), + table.CommitTx(), + ) +) + +const ( + createQuery = ` + PRAGMA TablePathPrefix("%s"); + CREATE TABLE file_meta ( + dir_hash int64, + name Utf8, + directory Utf8, + meta String, + PRIMARY KEY (dir_hash, name) + );` + insertQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + DECLARE $directory AS Utf8; + DECLARE $meta AS String; + + UPSERT INTO file_meta + (dir_hash, name, directory, meta) + VALUES + ($dir_hash, $name, $directory, $meta);` + updateQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + DECLARE $directory AS Utf8; + DECLARE $meta AS String; + + REPLACE INTO file_meta + (dir_hash, name, directory, meta) + VALUES + ($dir_hash, $name, $directory, $meta) + COMMIT;` + deleteQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + + DELETE FROM file_meta + WHERE dir_hash == $dir_hash AND name == $name; + COMMIT;` + findQuery = ` + DECLARE $dir_hash int64; + DECLARE $name AS Utf8; + + SELECT meta + FROM file_meta + WHERE dir_hash == $dir_hash AND name == $name;` +) + +type YdbStore struct { + SupportBucketTable bool + DB *connect.Connection + connParams connect.ConnectParams + connCtx context.Context + dirBuckets string + tablePathPrefix string +} + +func init() { + filer.Stores = append(filer.Stores, &YdbStore{}) +} + +func (store *YdbStore) GetName() string { + return "ydb" +} + +func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) { + return store.initialize(configuration.GetString(prefix + "coonectionUrl")) +} + +func (store *YdbStore) initialize(sqlUrl string) (err error) { + store.SupportBucketTable = false + var cancel context.CancelFunc + store.connCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + store.connParams = connect.MustConnectionString(sqlUrl) + store.DB, err = connect.New(store.connCtx, store.connParams) + if err != nil { + store.DB.Close() + store.DB = nil + return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err) + } + defer store.DB.Close() + + if err = store.DB.EnsurePathExists(store.connCtx, store.connParams.Database()); err != nil { + return fmt.Errorf("connect to %s error:%v", sqlUrl, err) + } + return nil +} + +func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry, query string) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta} + return table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), query)) + if err != nil { + return err + } + _, _, err = stmt.Execute(ctx, rwTX, fileMeta.QueryParameters()) + return err + }), + ) +} + +func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.insertOrUpdateEntry(ctx, entry, insertQuery) +} + +func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.insertOrUpdateEntry(ctx, entry, updateQuery) +} + +func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) { + dir, name := fullpath.DirAndName() + var res *table.Result + err = table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), findQuery)) + if err != nil { + return err + } + _, res, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( + table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$name", ydb.UTF8Value(name)))) + return err + }), + ) + if err != nil { + return nil, err + } + for res.NextSet() { + for res.NextRow() { + res.SeekItem("meta") + entry.FullPath = fullpath + if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(res.String())); err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + return entry, nil + } + } + + return nil, filer_pb.ErrNotFound +} + +func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { + dir, name := fullpath.DirAndName() + return table.Retry(ctx, store.DB.Table().Pool(), + table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) { + stmt, err := s.Prepare(ctx, store.withPragma(store.getPrefix(dir), deleteQuery)) + if err != nil { + return err + } + _, _, err = stmt.Execute(ctx, rwTX, table.NewQueryParameters( + table.ValueParam("$dir_hash", ydb.Int64Value(util.HashStringToLong(dir))), + table.ValueParam("$name", ydb.UTF8Value(name)))) + return err + }), + ) +} + +func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) { + return nil +} + +func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) CommitTransaction(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) RollbackTransaction(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) Shutdown() { + //TODO implement me + panic("implement me") +} + +func (store *YdbStore) getPrefix(dir string) string { + prefixBuckets := store.dirBuckets + "/" + if strings.HasPrefix(dir, prefixBuckets) { + // detect bucket + bucketAndDir := dir[len(prefixBuckets):] + if t := strings.Index(bucketAndDir, "/"); t > 0 { + return bucketAndDir[:t] + } + } + return "" +} + +func (store *YdbStore) withPragma(prefix, query string) string { + if store.tablePathPrefix != "" && store.tablePathPrefix != "/" { + prefix = store.tablePathPrefix + "/" + prefix + } + return `PRAGMA TablePathPrefix("` + prefix + `");` + query +} |
