aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-05-01 22:28:55 +0500
committerKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-05-01 22:28:55 +0500
commit21033ff4c3fcf1516cab32a0b955a40f3e9119d9 (patch)
treea5dfb25e446176b9a15c82b53736784885c3f4c5
parentec0ed41e375d99ffc7d6a4290e92470c7eabc8e7 (diff)
downloadseaweedfs-21033ff4c3fcf1516cab32a0b955a40f3e9119d9.tar.xz
seaweedfs-21033ff4c3fcf1516cab32a0b955a40f3e9119d9.zip
refactor use const CountEntryChunksForGzip
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store.go2
-rw-r--r--weed/filer/arangodb/arangodb_store.go4
-rw-r--r--weed/filer/cassandra/cassandra_store.go2
-rw-r--r--weed/filer/etcd/etcd_store.go2
-rw-r--r--weed/filer/filerstore.go2
-rw-r--r--weed/filer/hbase/hbase_store.go2
-rw-r--r--weed/filer/leveldb/leveldb_store.go2
-rw-r--r--weed/filer/leveldb2/leveldb2_store.go2
-rw-r--r--weed/filer/leveldb3/leveldb3_store.go2
-rw-r--r--weed/filer/mongodb/mongodb_store.go2
-rw-r--r--weed/filer/redis/universal_redis_store.go2
-rw-r--r--weed/filer/redis2/universal_redis_store.go2
-rw-r--r--weed/filer/redis3/universal_redis_store.go2
-rw-r--r--weed/filer/redis_lua/universal_redis_store.go2
-rw-r--r--weed/filer/ydb/readme.md11
-rw-r--r--weed/filer/ydb/ydb_queries.go2
-rw-r--r--weed/filer/ydb/ydb_store.go48
17 files changed, 55 insertions, 36 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 4bf9b16fa..13268b944 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -156,7 +156,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go
index 9fd1fffb3..13d14b2b0 100644
--- a/weed/filer/arangodb/arangodb_store.go
+++ b/weed/filer/arangodb/arangodb_store.go
@@ -157,7 +157,7 @@ func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
model := &Model{
@@ -196,7 +196,7 @@ func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
model := &Model{
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index fb61b0771..d8c094a45 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -100,7 +100,7 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
index 2a5dfc926..4146a3899 100644
--- a/weed/filer/etcd/etcd_store.go
+++ b/weed/filer/etcd/etcd_store.go
@@ -82,7 +82,7 @@ func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (er
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = weed_util.MaybeGzipData(meta)
}
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index a092ee456..260945b33 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -7,6 +7,8 @@ import (
"io"
)
+const CountEntryChunksForGzip = 50
+
var (
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go
index e0d878ca7..c5d6eb48c 100644
--- a/weed/filer/hbase/hbase_store.go
+++ b/weed/filer/hbase/hbase_store.go
@@ -75,7 +75,7 @@ func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) er
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index 73d757e62..6abb37f99 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -86,7 +86,7 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = weed_util.MaybeGzipData(value)
}
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 966686ed9..d68493bd7 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -88,7 +88,7 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = weed_util.MaybeGzipData(value)
}
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index e448f0093..d21515bd4 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -177,7 +177,7 @@ func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = weed_util.MaybeGzipData(value)
}
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index c12354ad6..83686bfe7 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -107,7 +107,7 @@ func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
index 0cdf58d7f..89684647b 100644
--- a/weed/filer/redis/universal_redis_store.go
+++ b/weed/filer/redis/universal_redis_store.go
@@ -40,7 +40,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index deccf8922..7a34092a0 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -52,7 +52,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go
index f04ee493d..10a87e2a4 100644
--- a/weed/filer/redis3/universal_redis_store.go
+++ b/weed/filer/redis3/universal_redis_store.go
@@ -40,7 +40,7 @@ func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go
index 9674ac03f..0ab0f2f24 100644
--- a/weed/filer/redis_lua/universal_redis_store.go
+++ b/weed/filer/redis_lua/universal_redis_store.go
@@ -53,7 +53,7 @@ func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *fil
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
diff --git a/weed/filer/ydb/readme.md b/weed/filer/ydb/readme.md
index 90d7a18e9..2221e13b5 100644
--- a/weed/filer/ydb/readme.md
+++ b/weed/filer/ydb/readme.md
@@ -8,14 +8,9 @@ options:
```
[ydb]
enabled=true
-db_name="seaweedfs"
-servers=["http://localhost:8529"]
-#basic auth
-user="root"
-pass="test"
-
-# tls settings
-insecure_skip_verify=true
+prefix="seaweedfs"
+useBucketPrefix=true
+coonectionUrl=grpcs://ydb-ru.yandex.net:2135/?database=/ru/home/username/db
```
get ydb types
diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go
index 57b282a7a..fdfc8bcb1 100644
--- a/weed/filer/ydb/ydb_queries.go
+++ b/weed/filer/ydb/ydb_queries.go
@@ -67,6 +67,6 @@ const (
SELECT name, meta
FROM file_meta
- WHERE dir_hash == $dir_hash AND directory == $directory and name %v $start_name and name LIKE '$prefix%'
+ WHERE dir_hash == $dir_hash AND directory == $directory and name %s $start_name and name LIKE '$prefix%%'
ORDER BY name ASC LIMIT $limit;`
)
diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go
index aedc11ec5..d82fc4da7 100644
--- a/weed/filer/ydb/ydb_store.go
+++ b/weed/filer/ydb/ydb_store.go
@@ -15,6 +15,10 @@ import (
"time"
)
+const (
+ defaultConnectionTimeOut = 10
+)
+
var (
roTX = table.TxControl(
table.BeginTx(table.WithOnlineReadOnly()),
@@ -29,8 +33,6 @@ var (
type YdbStore struct {
SupportBucketTable bool
DB *connect.Connection
- connParams connect.ConnectParams
- connCtx context.Context
dirBuckets string
tablePathPrefix string
}
@@ -44,16 +46,27 @@ func (store *YdbStore) GetName() string {
}
func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
- return store.initialize(configuration.GetString(prefix + "coonectionUrl"))
+ return store.initialize(
+ configuration.GetString("filer.options.buckets_folder"),
+ configuration.GetString(prefix+"coonectionUrl"),
+ configuration.GetString(prefix+"tablePathPrefix"),
+ configuration.GetBool(prefix+"useBucketPrefix"),
+ configuration.GetInt(prefix+"connectionTimeOut"),
+ )
}
-func (store *YdbStore) initialize(sqlUrl string) (err error) {
- store.SupportBucketTable = false
+func (store *YdbStore) initialize(dirBuckets string, sqlUrl string, tablePathPrefix string, useBucketPrefix bool, connectionTimeOut int) (err error) {
+ store.dirBuckets = dirBuckets
+ store.tablePathPrefix = tablePathPrefix
+ store.SupportBucketTable = useBucketPrefix
+ if connectionTimeOut == 0 {
+ connectionTimeOut = defaultConnectionTimeOut
+ }
var cancel context.CancelFunc
- store.connCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
+ connCtx, cancel := context.WithTimeout(context.Background(), time.Duration(connectionTimeOut)*time.Second)
defer cancel()
- store.connParams = connect.MustConnectionString(sqlUrl)
- store.DB, err = connect.New(store.connCtx, store.connParams)
+ connParams := connect.MustConnectionString(sqlUrl)
+ store.DB, err = connect.New(connCtx, connParams)
if err != nil {
store.DB.Close()
store.DB = nil
@@ -61,7 +74,7 @@ func (store *YdbStore) initialize(sqlUrl string) (err error) {
}
defer store.DB.Close()
- if err = store.DB.EnsurePathExists(store.connCtx, store.connParams.Database()); err != nil {
+ if err = store.DB.EnsurePathExists(connCtx, connParams.Database()); err != nil {
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
}
return nil
@@ -73,6 +86,11 @@ func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Ent
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
+
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
+ meta = util.MaybeGzipData(meta)
+ }
+
fileMeta := FileMeta{util.HashStringToLong(dir), name, dir, meta}
return table.Retry(ctx, store.DB.Table().Pool(),
table.OperationFunc(func(ctx context.Context, s *table.Session) (err error) {
@@ -114,7 +132,7 @@ func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (e
}
defer res.Close()
- for res.NextSet() {
+ for res.NextResultSet(ctx) {
for res.NextRow() {
res.SeekItem("meta")
entry.FullPath = fullpath
@@ -251,17 +269,21 @@ func (store *YdbStore) Shutdown() {
}
func (store *YdbStore) getPrefix(dir string) string {
+ if !store.SupportBucketTable {
+ return store.tablePathPrefix
+ }
+
prefixBuckets := store.dirBuckets + "/"
if strings.HasPrefix(dir, prefixBuckets) {
// detect bucket
bucketAndDir := dir[len(prefixBuckets):]
if t := strings.Index(bucketAndDir, "/"); t > 0 {
- return bucketAndDir[:t]
+ return path.Join(bucketAndDir[:t], store.tablePathPrefix)
}
}
- return ""
+ return store.tablePathPrefix
}
func (store *YdbStore) withPragma(prefix, query string) string {
- return `PRAGMA TablePathPrefix("` + path.Join(store.tablePathPrefix, prefix) + `");` + query
+ return `PRAGMA TablePathPrefix("` + prefix + `");` + query
}