aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
authorMaratKarimov <wto17ty@gmail.com>2025-03-30 07:12:06 +0300
committerGitHub <noreply@github.com>2025-03-29 21:12:06 -0700
commitba3afd18037919861470f99a3554bd4c20c1d95c (patch)
tree95151f374e88ae2e72bbacee61af8cd6b360b22a /weed/filer
parent528702d30b05b83642affb0dabb09926357d1f10 (diff)
downloadseaweedfs-ba3afd18037919861470f99a3554bd4c20c1d95c.tar.xz
seaweedfs-ba3afd18037919861470f99a3554bd4c20c1d95c.zip
Tarantool filer store (#6669)
Co-authored-by: Marat Karimov <m.karimov@digitalms.ru>
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/store_test/test_suite.go17
-rw-r--r--weed/filer/tarantool/doc.go7
-rw-r--r--weed/filer/tarantool/readme.md11
-rw-r--r--weed/filer/tarantool/tarantool_store.go318
-rw-r--r--weed/filer/tarantool/tarantool_store_kv.go95
-rw-r--r--weed/filer/tarantool/tarantool_store_test.go24
6 files changed, 470 insertions, 2 deletions
diff --git a/weed/filer/store_test/test_suite.go b/weed/filer/store_test/test_suite.go
index 1e4149589..fda694f26 100644
--- a/weed/filer/store_test/test_suite.go
+++ b/weed/filer/store_test/test_suite.go
@@ -29,16 +29,29 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) {
})
assert.Nil(t, err, "list directory")
assert.Equal(t, 3, counter, "directory list counter")
- assert.Equal(t, "f00003", lastFileName, "directory list last file")
+ assert.Equal(t, "f00002", lastFileName, "directory list last file")
lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool {
counter++
return true
})
assert.Nil(t, err, "list directory")
assert.Equal(t, 1027, counter, "directory list counter")
- assert.Equal(t, "f01027", lastFileName, "directory list last file")
+ assert.Equal(t, "f01026", lastFileName, "directory list last file")
}
+ testKey := []byte("test_key")
+ testValue1 := []byte("test_value1")
+ testValue2 := []byte("test_value2")
+
+ err := store.KvPut(ctx, testKey, testValue1)
+ assert.Nil(t, err, "KV put")
+ value, err := store.KvGet(ctx, testKey)
+ assert.Equal(t, value, testValue1, "KV get")
+
+ err = store.KvPut(ctx, testKey, testValue2)
+ assert.Nil(t, err, "KV update")
+ value, err = store.KvGet(ctx, testKey)
+ assert.Equal(t, value, testValue2, "KV get after update")
}
func makeEntry(fullPath util.FullPath, isDirectory bool) *filer.Entry {
diff --git a/weed/filer/tarantool/doc.go b/weed/filer/tarantool/doc.go
new file mode 100644
index 000000000..3c448e8e1
--- /dev/null
+++ b/weed/filer/tarantool/doc.go
@@ -0,0 +1,7 @@
+/*
+Package tarantool is for Tarantool filer store.
+
+The referenced "github.com/tarantool/go-tarantool/v2" library is too big when compiled.
+So this is only compiled in "make full_install".
+*/
+package tarantool
diff --git a/weed/filer/tarantool/readme.md b/weed/filer/tarantool/readme.md
new file mode 100644
index 000000000..b51241488
--- /dev/null
+++ b/weed/filer/tarantool/readme.md
@@ -0,0 +1,11 @@
+## Tarantool
+
+database: https://www.tarantool.io/
+
+go driver: https://github.com/tarantool/go-tarantool/
+
+To set up local env:
+`make -C docker test_tarantool`
+
+Run tests:
+`RUN_TARANTOOL_TESTS=1 go test -tags=tarantool ./weed/filer/tarantool` \ No newline at end of file
diff --git a/weed/filer/tarantool/tarantool_store.go b/weed/filer/tarantool/tarantool_store.go
new file mode 100644
index 000000000..8d19db60d
--- /dev/null
+++ b/weed/filer/tarantool/tarantool_store.go
@@ -0,0 +1,318 @@
+//go:build tarantool
+// +build tarantool
+
+package tarantool
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ weed_util "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/tarantool/go-tarantool/v2"
+ "github.com/tarantool/go-tarantool/v2/crud"
+ "github.com/tarantool/go-tarantool/v2/pool"
+)
+
+const (
+ tarantoolSpaceName = "filer_metadata"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &TarantoolStore{})
+}
+
+type TarantoolStore struct {
+ pool *pool.ConnectionPool
+}
+
+func (store *TarantoolStore) GetName() string {
+ return "tarantool"
+}
+
+func (store *TarantoolStore) Initialize(configuration weed_util.Configuration, prefix string) error {
+
+ configuration.SetDefault(prefix+"address", "localhost:3301")
+ configuration.SetDefault(prefix+"user", "guest")
+ configuration.SetDefault(prefix+"password", "")
+ configuration.SetDefault(prefix+"timeout", "5s")
+ configuration.SetDefault(prefix+"maxReconnects", "1000")
+
+ address := configuration.GetString(prefix + "address")
+ user := configuration.GetString(prefix + "user")
+ password := configuration.GetString(prefix + "password")
+
+ timeoutStr := configuration.GetString(prefix + "timeout")
+ timeout, err := time.ParseDuration(timeoutStr)
+ if err != nil {
+ return fmt.Errorf("parse tarantool store timeout: %v", err)
+ }
+
+ maxReconnects := configuration.GetInt(prefix + "maxReconnects")
+ if maxReconnects < 0 {
+ return fmt.Errorf("maxReconnects is negative")
+ }
+
+ addresses := strings.Split(address, ",")
+
+ return store.initialize(addresses, user, password, timeout, uint(maxReconnects))
+}
+
+func (store *TarantoolStore) initialize(addresses []string, user string, password string, timeout time.Duration, maxReconnects uint) error {
+
+ opts := tarantool.Opts{
+ Timeout: timeout,
+ Reconnect: time.Second,
+ MaxReconnects: maxReconnects,
+ }
+
+ poolInstances := makePoolInstances(addresses, user, password, opts)
+ poolOpts := pool.Opts{
+ CheckTimeout: time.Second,
+ }
+
+ ctx := context.Background()
+ p, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts)
+ if err != nil {
+ return fmt.Errorf("Can't create connection pool: %v", err)
+ }
+
+ _, err = p.Do(tarantool.NewPingRequest(), pool.ANY).Get()
+ if err != nil {
+ return err
+ }
+
+ store.pool = p
+
+ return nil
+}
+
+func makePoolInstances(addresses []string, user string, password string, opts tarantool.Opts) []pool.Instance {
+ poolInstances := make([]pool.Instance, 0, len(addresses))
+ for i, address := range addresses {
+ poolInstances = append(poolInstances, makePoolInstance(address, user, password, opts, i))
+ }
+ return poolInstances
+}
+
+func makePoolInstance(address string, user string, password string, opts tarantool.Opts, serial int) pool.Instance {
+ return pool.Instance{
+ Name: fmt.Sprintf("instance%d", serial),
+ Dialer: tarantool.NetDialer{
+ Address: address,
+ User: user,
+ Password: password,
+ },
+ Opts: opts,
+ }
+}
+
+func (store *TarantoolStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+
+func (store *TarantoolStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *TarantoolStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *TarantoolStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
+ meta = util.MaybeGzipData(meta)
+ }
+
+ var ttl int64
+ if entry.TtlSec > 0 {
+ ttl = time.Now().Unix() + int64(entry.TtlSec)
+ } else {
+ ttl = 0
+ }
+
+ var operations = []crud.Operation{
+ {
+ Operator: crud.Insert,
+ Field: "data",
+ Value: string(meta),
+ },
+ }
+
+ req := crud.MakeUpsertRequest(tarantoolSpaceName).
+ Tuple([]interface{}{dir, nil, name, ttl, string(meta)}).
+ Operations(operations)
+
+ ret := crud.Result{}
+
+ if err := store.pool.Do(req, pool.RW).GetTyped(&ret); err != nil {
+ return fmt.Errorf("insert %s: %s", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *TarantoolStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *TarantoolStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+ dir, name := fullpath.DirAndName()
+
+ findEntryGetOpts := crud.GetOpts{
+ Fields: crud.MakeOptTuple([]interface{}{"data"}),
+ Mode: crud.MakeOptString("read"),
+ PreferReplica: crud.MakeOptBool(true),
+ Balance: crud.MakeOptBool(true),
+ }
+
+ req := crud.MakeGetRequest(tarantoolSpaceName).
+ Key(crud.Tuple([]interface{}{dir, name})).
+ Opts(findEntryGetOpts)
+
+ resp := crud.Result{}
+
+ err = store.pool.Do(req, pool.PreferRO).GetTyped(&resp)
+ if err != nil {
+ return nil, err
+ }
+
+ results, ok := resp.Rows.([]interface{})
+ if !ok || len(results) != 1 {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ rows, ok := results[0].([]interface{})
+ if !ok || len(rows) != 1 {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ row, ok := rows[0].(string)
+ if !ok {
+ return nil, fmt.Errorf("Can't convert rows[0] field to string. Actual type: %v, value: %v", reflect.TypeOf(rows[0]), rows[0])
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData([]byte(row)))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *TarantoolStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ dir, name := fullpath.DirAndName()
+
+ delOpts := crud.DeleteOpts{
+ Noreturn: crud.MakeOptBool(true),
+ }
+
+ req := crud.MakeDeleteRequest(tarantoolSpaceName).
+ Key(crud.Tuple([]interface{}{dir, name})).
+ Opts(delOpts)
+
+ if _, err := store.pool.Do(req, pool.RW).Get(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *TarantoolStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ req := tarantool.NewCallRequest("filer_metadata.delete_by_directory_idx").
+ Args([]interface{}{fullpath})
+
+ if _, err := store.pool.Do(req, pool.RW).Get(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *TarantoolStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ req := tarantool.NewCallRequest("filer_metadata.find_by_directory_idx_and_name").
+ Args([]interface{}{string(dirPath), startFileName, includeStartFile, limit})
+
+ results, err := store.pool.Do(req, pool.PreferRO).Get()
+ if err != nil {
+ return
+ }
+
+ if len(results) < 1 {
+ glog.Errorf("Can't find results, data is empty")
+ return
+ }
+
+ rows, ok := results[0].([]interface{})
+ if !ok {
+ glog.Errorf("Can't convert results[0] to list")
+ return
+ }
+
+ for _, result := range rows {
+ row, ok := result.([]interface{})
+ if !ok {
+ glog.Errorf("Can't convert result to list")
+ return
+ }
+
+ if len(row) < 5 {
+ glog.Errorf("Length of result is less than needed: %v", len(row))
+ return
+ }
+
+ nameRaw := row[2]
+ name, ok := nameRaw.(string)
+ if !ok {
+ glog.Errorf("Can't convert name field to string. Actual type: %v, value: %v", reflect.TypeOf(nameRaw), nameRaw)
+ return
+ }
+
+ dataRaw := row[4]
+ data, ok := dataRaw.(string)
+ if !ok {
+ glog.Errorf("Can't convert data field to string. Actual type: %v, value: %v", reflect.TypeOf(dataRaw), dataRaw)
+ return
+ }
+
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(string(dirPath), name),
+ }
+ lastFileName = name
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+
+ return lastFileName, err
+}
+
+func (store *TarantoolStore) Shutdown() {
+ store.pool.Close()
+}
diff --git a/weed/filer/tarantool/tarantool_store_kv.go b/weed/filer/tarantool/tarantool_store_kv.go
new file mode 100644
index 000000000..e9f0f4dd0
--- /dev/null
+++ b/weed/filer/tarantool/tarantool_store_kv.go
@@ -0,0 +1,95 @@
+//go:build tarantool
+// +build tarantool
+
+package tarantool
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/tarantool/go-tarantool/v2/crud"
+ "github.com/tarantool/go-tarantool/v2/pool"
+)
+
+const (
+ tarantoolKVSpaceName = "key_value"
+)
+
+func (store *TarantoolStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ var operations = []crud.Operation{
+ {
+ Operator: crud.Insert,
+ Field: "value",
+ Value: string(value),
+ },
+ }
+
+ req := crud.MakeUpsertRequest(tarantoolKVSpaceName).
+ Tuple([]interface{}{string(key), nil, string(value)}).
+ Operations(operations)
+
+ ret := crud.Result{}
+ if err := store.pool.Do(req, pool.RW).GetTyped(&ret); err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *TarantoolStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ getOpts := crud.GetOpts{
+ Fields: crud.MakeOptTuple([]interface{}{"value"}),
+ Mode: crud.MakeOptString("read"),
+ PreferReplica: crud.MakeOptBool(true),
+ Balance: crud.MakeOptBool(true),
+ }
+
+ req := crud.MakeGetRequest(tarantoolKVSpaceName).
+ Key(crud.Tuple([]interface{}{string(key)})).
+ Opts(getOpts)
+
+ resp := crud.Result{}
+
+ err = store.pool.Do(req, pool.PreferRO).GetTyped(&resp)
+ if err != nil {
+ return nil, err
+ }
+
+ results, ok := resp.Rows.([]interface{})
+ if !ok || len(results) != 1 {
+ return nil, filer.ErrKvNotFound
+ }
+
+ rows, ok := results[0].([]interface{})
+ if !ok || len(rows) != 1 {
+ return nil, filer.ErrKvNotFound
+ }
+
+ row, ok := rows[0].(string)
+ if !ok {
+ return nil, fmt.Errorf("Can't convert rows[0] field to string. Actual type: %v, value: %v", reflect.TypeOf(rows[0]), rows[0])
+ }
+
+ return []byte(row), nil
+}
+
+func (store *TarantoolStore) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ delOpts := crud.DeleteOpts{
+ Noreturn: crud.MakeOptBool(true),
+ }
+
+ req := crud.MakeDeleteRequest(tarantoolKVSpaceName).
+ Key(crud.Tuple([]interface{}{string(key)})).
+ Opts(delOpts)
+
+ if _, err := store.pool.Do(req, pool.RW).Get(); err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/tarantool/tarantool_store_test.go b/weed/filer/tarantool/tarantool_store_test.go
new file mode 100644
index 000000000..500289773
--- /dev/null
+++ b/weed/filer/tarantool/tarantool_store_test.go
@@ -0,0 +1,24 @@
+//go:build tarantool
+// +build tarantool
+
+package tarantool
+
+import (
+ "os"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer/store_test"
+)
+
+func TestStore(t *testing.T) {
+ // run "make test_tarantool" under docker folder.
+ // to set up local env
+ if os.Getenv("RUN_TARANTOOL_TESTS") != "1" {
+ t.Skip("Tarantool tests are disabled. Set RUN_TARANTOOL_TESTS=1 to enable.")
+ }
+ store := &TarantoolStore{}
+ addresses := []string{"127.0.1:3303"}
+ store.initialize(addresses, "client", "client", 5*time.Second, 1000)
+ store_test.TestFilerStore(t, store)
+}