aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/ydb/ydb_store.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/ydb/ydb_store.go')
-rw-r--r--weed/filer/ydb/ydb_store.go75
1 files changed, 37 insertions, 38 deletions
diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go
index 6d6389dde..678f58143 100644
--- a/weed/filer/ydb/ydb_store.go
+++ b/weed/filer/ydb/ydb_store.go
@@ -55,7 +55,7 @@ func (store *YdbStore) Initialize(configuration util.Configuration, prefix strin
return store.initialize(
configuration.GetString("filer.options.buckets_folder"),
configuration.GetString(prefix+"dsn"),
- configuration.GetString(prefix+"tablePathPrefix"),
+ configuration.GetString(prefix+"prefix"),
configuration.GetBool(prefix+"useBucketPrefix"),
configuration.GetInt(prefix+"connectionTimeOut"),
configuration.GetInt(prefix+"poolSizeLimit"),
@@ -64,7 +64,6 @@ func (store *YdbStore) Initialize(configuration util.Configuration, prefix strin
func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, connectionTimeOut int, poolSizeLimit int) (err error) {
store.dirBuckets = dirBuckets
- store.tablePathPrefix = tablePathPrefix
store.SupportBucketTable = useBucketPrefix
store.dbs = make(map[string]bool)
ctx, cancel := context.WithCancel(context.Background())
@@ -83,7 +82,7 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix
dsn = os.Getenv("YDB_CONNECTION_STRING")
}
store.DB, err = ydb.Open(ctx, dsn, opts...)
- if err != nil {
+ if err != nil || store.DB == nil {
if store.DB != nil {
_ = store.DB.Close(ctx)
store.DB = nil
@@ -92,16 +91,12 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix
}
store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix)
- if err = sugar.RemoveRecursive(ctx, store.DB, store.tablePathPrefix); err != nil {
- return fmt.Errorf("RemoveRecursive %s : %v", store.tablePathPrefix, err)
- }
if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil {
return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err)
}
- tablePath := path.Join(store.tablePathPrefix, abstract_sql.DEFAULT_TABLE)
- if err = store.createTable(ctx, tablePath); err != nil {
- glog.Errorf("createTable %s: %v", tablePath, err)
+ if err = store.createTable(ctx, store.tablePathPrefix); err != nil {
+ glog.Errorf("createTable %s: %v", store.tablePathPrefix, err)
}
return err
}
@@ -126,9 +121,15 @@ func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *tabl
return nil
})
}
- if err != nil && processResultFunc != nil && res != nil {
- if err = processResultFunc(res); err != nil {
- return fmt.Errorf("process resul: %v", err)
+ if err != nil {
+ return err
+ }
+ if res != nil {
+ defer func() { _ = res.Close() }()
+ if processResultFunc != nil {
+ if err = processResultFunc(res); err != nil {
+ return fmt.Errorf("process result: %v", err)
+ }
}
}
return err
@@ -167,15 +168,14 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e
table.ValueParam("$name", types.UTF8Value(name)))
err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error {
- defer func() {
- _ = res.Close()
- }()
- for res.NextRow() {
- if err := res.ScanNamed(named.Required("meta", &data)); err != nil {
- return fmt.Errorf("scanNamed %s : %v", entry.FullPath, err)
+ for res.NextResultSet(ctx) {
+ for res.NextRow() {
+ if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil {
+ return fmt.Errorf("scanNamed %s : %v", fullpath, err)
+ }
+ entryFound = true
+ return nil
}
- entryFound = true
- return nil
}
return res.Err()
})
@@ -185,9 +185,12 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e
if !entryFound {
return nil, filer_pb.ErrNotFound
}
- entry.FullPath = fullpath
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
- return nil, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ return nil, fmt.Errorf("decode %s : %v", fullpath, err)
}
return entry, nil
@@ -232,17 +235,14 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
table.ValueParam("$limit", types.Uint64Value(uint64(limit))),
)
err = store.doTxOrDB(ctx, &queryWithPragma, queryParams, roTX, func(res result.Result) error {
- defer func() {
- _ = res.Close()
- }()
+ var name string
+ var data []byte
for res.NextResultSet(ctx) {
for res.NextRow() {
- var name string
- var data []byte
if err := res.ScanNamed(
- named.Required("name", &name),
- named.Required("meta", &data)); err != nil {
- return fmt.Errorf("scanNamed %s : %v", dir, err)
+ named.OptionalWithDefault("name", &name),
+ named.OptionalWithDefault("meta", &data)); err != nil {
+ return fmt.Errorf("list scanNamed %s : %v", dir, err)
}
lastFileName = name
entry := &filer.Entry{
@@ -304,7 +304,7 @@ func (store *YdbStore) OnBucketCreation(bucket string) {
defer store.dbsLock.Unlock()
if err := store.createTable(context.Background(),
- path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err != nil {
+ path.Join(store.tablePathPrefix, bucket)); err != nil {
glog.Errorf("createTable %s: %v", bucket, err)
}
@@ -319,7 +319,7 @@ func (store *YdbStore) OnBucketDeletion(bucket string) {
defer store.dbsLock.Unlock()
if err := store.deleteTable(context.Background(),
- path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err != nil {
+ path.Join(store.tablePathPrefix, bucket)); err != nil {
glog.Errorf("deleteTable %s: %v", bucket, err)
}
@@ -331,7 +331,7 @@ func (store *YdbStore) OnBucketDeletion(bucket string) {
func (store *YdbStore) createTable(ctx context.Context, prefix string) error {
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
- return s.CreateTable(ctx, prefix, createTableOptions()...)
+ return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), createTableOptions()...)
})
}
@@ -340,7 +340,7 @@ func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error {
return nil
}
return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
- return s.DropTable(ctx, prefix)
+ return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE))
})
}
@@ -366,15 +366,14 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir string) (tablePathPref
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
+ tablePathPrefix = path.Join(store.tablePathPrefix, bucket)
if _, found := store.dbs[bucket]; !found {
- if err := store.createTable(ctx,
- path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)); err == nil {
+ if err := store.createTable(ctx, tablePathPrefix); err == nil {
store.dbs[bucket] = true
} else {
- glog.Errorf("createTable %s: %v", bucket, err)
+ glog.Errorf("createTable %s: %v", tablePathPrefix, err)
}
}
- tablePathPrefix = path.Join(store.tablePathPrefix, bucket)
}
return
}