aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSmoothDenis <syropyatov@tochka.com>2025-05-12 13:20:25 +0500
committerGitHub <noreply@github.com>2025-05-12 01:20:25 -0700
commit45964c2f869358ef456109cbd51c060f4c55e8f8 (patch)
tree7f7d760c5163bbaca937b5a09e057ff490e4b977
parent9c1048bacb66307e456796f328c06670068912b7 (diff)
downloadseaweedfs-45964c2f869358ef456109cbd51c060f4c55e8f8.tar.xz
seaweedfs-45964c2f869358ef456109cbd51c060f4c55e8f8.zip
fix: ydb filer bugs (#6778)
* fix: ydb filer bugs * fix(ydb): correct DeleteEntry log argument types * fix(ydb): bucket creation & deletion logic
-rw-r--r--weed/filer/ydb/ydb_store.go92
1 files changed, 72 insertions, 20 deletions
diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go
index b4f2de5b8..a9ad6666e 100644
--- a/weed/filer/ydb/ydb_store.go
+++ b/weed/filer/ydb/ydb_store.go
@@ -6,6 +6,12 @@ package ydb
import (
"context"
"fmt"
+ "os"
+ "path"
+ "strings"
+ "sync"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -13,16 +19,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
"github.com/ydb-platform/ydb-go-sdk/v3"
- "github.com/ydb-platform/ydb-go-sdk/v3/sugar"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
- "os"
- "path"
- "strings"
- "sync"
- "time"
)
const (
@@ -97,12 +97,9 @@ func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix
}
store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix)
- if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil {
- return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err)
- }
- if err = store.createTable(ctx, store.tablePathPrefix); err != nil {
- glog.Errorf("createTable %s: %v", store.tablePathPrefix, err)
+ if err := store.ensureTables(ctx); err != nil {
+ return err
}
return err
}
@@ -121,7 +118,9 @@ func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *tabl
return fmt.Errorf("execute statement: %v", err)
}
return nil
- })
+ },
+ table.WithIdempotent(),
+ )
}
if err != nil {
return err
@@ -204,6 +203,7 @@ func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath)
dir, name := fullpath.DirAndName()
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
query := withPragma(tablePathPrefix, deleteQuery)
+ glog.V(4).Infof("DeleteEntry %s, tablePathPrefix %s, shortDir %s", fullpath, *tablePathPrefix, *shortDir)
queryParams := table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
table.ValueParam("$name", types.UTF8Value(name)))
@@ -212,7 +212,7 @@ func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath)
}
func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
- dir, _ := fullpath.DirAndName()
+ dir := string(fullpath)
tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
query := withPragma(tablePathPrefix, deleteFolderChildrenQuery)
queryParams := table.NewQueryParameters(
@@ -246,12 +246,19 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
}
}
restLimit := limit - entryCount
+ const maxChunk = int64(1000)
+ chunkLimit := restLimit
+ if chunkLimit > maxChunk {
+ chunkLimit = maxChunk
+ }
+ glog.V(4).Infof("startFileName %s, restLimit %d, chunkLimit %d", startFileName, restLimit, chunkLimit)
+
queryParams := table.NewQueryParameters(
table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
table.ValueParam("$directory", types.UTF8Value(*shortDir)),
table.ValueParam("$start_name", types.UTF8Value(startFileName)),
table.ValueParam("$prefix", types.UTF8Value(prefix+"%")),
- table.ValueParam("$limit", types.Uint64Value(uint64(restLimit))),
+ table.ValueParam("$limit", types.Uint64Value(uint64(chunkLimit))),
)
err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error {
var name string
@@ -261,12 +268,14 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
return nil
}
truncated = res.CurrentResultSet().Truncated()
+ glog.V(4).Infof("truncated %v, entryCount %d", truncated, entryCount)
for res.NextRow() {
if err := res.ScanNamed(
named.OptionalWithDefault("name", &name),
named.OptionalWithDefault("meta", &data)); err != nil {
return fmt.Errorf("list scanNamed %s : %v", dir, err)
}
+ glog.V(8).Infof("name %s, fullpath %s", name, util.NewFullPath(dir, name))
lastFileName = name
entry := &filer.Entry{
FullPath: util.NewFullPath(dir, name),
@@ -327,12 +336,16 @@ func (store *YdbStore) CanDropWholeBucket() bool {
}
func (store *YdbStore) OnBucketCreation(bucket string) {
+ if !store.SupportBucketTable {
+ return
+ }
+ prefix := path.Join(store.tablePathPrefix, bucket)
+
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
- if err := store.createTable(context.Background(),
- path.Join(store.tablePathPrefix, bucket)); err != nil {
- glog.Errorf("createTable %s: %v", bucket, err)
+ if err := store.createTable(context.Background(), prefix); err != nil {
+ glog.Errorf("createTable %s: %v", prefix, err)
}
if store.dbs == nil {
@@ -342,12 +355,21 @@ func (store *YdbStore) OnBucketCreation(bucket string) {
}
func (store *YdbStore) OnBucketDeletion(bucket string) {
+ if !store.SupportBucketTable {
+ return
+ }
store.dbsLock.Lock()
defer store.dbsLock.Unlock()
- if err := store.deleteTable(context.Background(),
- path.Join(store.tablePathPrefix, bucket)); err != nil {
- glog.Errorf("deleteTable %s: %v", bucket, err)
+ prefix := path.Join(store.tablePathPrefix, bucket)
+ glog.V(4).Infof("deleting table %s", prefix)
+
+ if err := store.deleteTable(context.Background(), prefix); err != nil {
+ glog.Errorf("deleteTable %s: %v", prefix, err)
+ }
+
+ if err := store.DB.Scheme().RemoveDirectory(context.Background(), prefix); err != nil {
+ glog.Errorf("remove directory %s: %v", prefix, err)
}
if store.dbs == nil {
@@ -384,9 +406,11 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPre
}
prefixBuckets := store.dirBuckets + "/"
+ glog.V(4).Infof("dir: %s, prefixBuckets: %s", *dir, prefixBuckets)
if strings.HasPrefix(*dir, prefixBuckets) {
// detect bucket
bucketAndDir := (*dir)[len(prefixBuckets):]
+ glog.V(4).Infof("bucketAndDir: %s", bucketAndDir)
var bucket string
if t := strings.Index(bucketAndDir, "/"); t > 0 {
bucket = bucketAndDir[:t]
@@ -413,3 +437,31 @@ func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPre
}
return
}
+
+func (store *YdbStore) ensureTables(ctx context.Context) error {
+ prefixFull := store.tablePathPrefix
+
+ glog.V(4).Infof("creating base table %s", prefixFull)
+ baseTable := path.Join(prefixFull, abstract_sql.DEFAULT_TABLE)
+ if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
+ return s.CreateTable(ctx, baseTable, createTableOptions()...)
+ }); err != nil {
+ return fmt.Errorf("failed to create base table %s: %v", baseTable, err)
+ }
+
+ glog.V(4).Infof("creating bucket tables")
+ if store.SupportBucketTable {
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+ for bucket := range store.dbs {
+ glog.V(4).Infof("creating bucket table %s", bucket)
+ bucketTable := path.Join(prefixFull, bucket, abstract_sql.DEFAULT_TABLE)
+ if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
+ return s.CreateTable(ctx, bucketTable, createTableOptions()...)
+ }); err != nil {
+ glog.Errorf("failed to create bucket table %s: %v", bucketTable, err)
+ }
+ }
+ }
+ return nil
+}