diff options
| author | MaratKarimov <wto17ty@gmail.com> | 2025-03-30 07:12:06 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-03-29 21:12:06 -0700 |
| commit | ba3afd18037919861470f99a3554bd4c20c1d95c (patch) | |
| tree | 95151f374e88ae2e72bbacee61af8cd6b360b22a /weed/filer | |
| parent | 528702d30b05b83642affb0dabb09926357d1f10 (diff) | |
| download | seaweedfs-ba3afd18037919861470f99a3554bd4c20c1d95c.tar.xz seaweedfs-ba3afd18037919861470f99a3554bd4c20c1d95c.zip | |
Tarantool filer store (#6669)
Co-authored-by: Marat Karimov <m.karimov@digitalms.ru>
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/store_test/test_suite.go | 17 | ||||
| -rw-r--r-- | weed/filer/tarantool/doc.go | 7 | ||||
| -rw-r--r-- | weed/filer/tarantool/readme.md | 11 | ||||
| -rw-r--r-- | weed/filer/tarantool/tarantool_store.go | 318 | ||||
| -rw-r--r-- | weed/filer/tarantool/tarantool_store_kv.go | 95 | ||||
| -rw-r--r-- | weed/filer/tarantool/tarantool_store_test.go | 24 |
6 files changed, 470 insertions, 2 deletions
diff --git a/weed/filer/store_test/test_suite.go b/weed/filer/store_test/test_suite.go index 1e4149589..fda694f26 100644 --- a/weed/filer/store_test/test_suite.go +++ b/weed/filer/store_test/test_suite.go @@ -29,16 +29,29 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) { }) assert.Nil(t, err, "list directory") assert.Equal(t, 3, counter, "directory list counter") - assert.Equal(t, "f00003", lastFileName, "directory list last file") + assert.Equal(t, "f00002", lastFileName, "directory list last file") lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool { counter++ return true }) assert.Nil(t, err, "list directory") assert.Equal(t, 1027, counter, "directory list counter") - assert.Equal(t, "f01027", lastFileName, "directory list last file") + assert.Equal(t, "f01026", lastFileName, "directory list last file") } + testKey := []byte("test_key") + testValue1 := []byte("test_value1") + testValue2 := []byte("test_value2") + + err := store.KvPut(ctx, testKey, testValue1) + assert.Nil(t, err, "KV put") + value, err := store.KvGet(ctx, testKey) + assert.Equal(t, value, testValue1, "KV get") + + err = store.KvPut(ctx, testKey, testValue2) + assert.Nil(t, err, "KV update") + value, err = store.KvGet(ctx, testKey) + assert.Equal(t, value, testValue2, "KV get after update") } func makeEntry(fullPath util.FullPath, isDirectory bool) *filer.Entry { diff --git a/weed/filer/tarantool/doc.go b/weed/filer/tarantool/doc.go new file mode 100644 index 000000000..3c448e8e1 --- /dev/null +++ b/weed/filer/tarantool/doc.go @@ -0,0 +1,7 @@ +/* +Package tarantool is for Tarantool filer store. + +The referenced "github.com/tarantool/go-tarantool/v2" library is too big when compiled. +So this is only compiled in "make full_install". +*/ +package tarantool diff --git a/weed/filer/tarantool/readme.md b/weed/filer/tarantool/readme.md new file mode 100644 index 000000000..b51241488 --- /dev/null +++ b/weed/filer/tarantool/readme.md @@ -0,0 +1,11 @@ +## Tarantool + +database: https://www.tarantool.io/ + +go driver: https://github.com/tarantool/go-tarantool/ + +To set up local env: +`make -C docker test_tarantool` + +Run tests: +`RUN_TARANTOOL_TESTS=1 go test -tags=tarantool ./weed/filer/tarantool`
\ No newline at end of file diff --git a/weed/filer/tarantool/tarantool_store.go b/weed/filer/tarantool/tarantool_store.go new file mode 100644 index 000000000..8d19db60d --- /dev/null +++ b/weed/filer/tarantool/tarantool_store.go @@ -0,0 +1,318 @@ +//go:build tarantool +// +build tarantool + +package tarantool + +import ( + "context" + "fmt" + "reflect" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + weed_util "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/crud" + "github.com/tarantool/go-tarantool/v2/pool" +) + +const ( + tarantoolSpaceName = "filer_metadata" +) + +func init() { + filer.Stores = append(filer.Stores, &TarantoolStore{}) +} + +type TarantoolStore struct { + pool *pool.ConnectionPool +} + +func (store *TarantoolStore) GetName() string { + return "tarantool" +} + +func (store *TarantoolStore) Initialize(configuration weed_util.Configuration, prefix string) error { + + configuration.SetDefault(prefix+"address", "localhost:3301") + configuration.SetDefault(prefix+"user", "guest") + configuration.SetDefault(prefix+"password", "") + configuration.SetDefault(prefix+"timeout", "5s") + configuration.SetDefault(prefix+"maxReconnects", "1000") + + address := configuration.GetString(prefix + "address") + user := configuration.GetString(prefix + "user") + password := configuration.GetString(prefix + "password") + + timeoutStr := configuration.GetString(prefix + "timeout") + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return fmt.Errorf("parse tarantool store timeout: %v", err) + } + + maxReconnects := configuration.GetInt(prefix + "maxReconnects") + if maxReconnects < 0 { + return fmt.Errorf("maxReconnects is negative") + } + + addresses := strings.Split(address, ",") + + return store.initialize(addresses, user, password, timeout, uint(maxReconnects)) +} + +func (store *TarantoolStore) initialize(addresses []string, user string, password string, timeout time.Duration, maxReconnects uint) error { + + opts := tarantool.Opts{ + Timeout: timeout, + Reconnect: time.Second, + MaxReconnects: maxReconnects, + } + + poolInstances := makePoolInstances(addresses, user, password, opts) + poolOpts := pool.Opts{ + CheckTimeout: time.Second, + } + + ctx := context.Background() + p, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) + if err != nil { + return fmt.Errorf("Can't create connection pool: %v", err) + } + + _, err = p.Do(tarantool.NewPingRequest(), pool.ANY).Get() + if err != nil { + return err + } + + store.pool = p + + return nil +} + +func makePoolInstances(addresses []string, user string, password string, opts tarantool.Opts) []pool.Instance { + poolInstances := make([]pool.Instance, 0, len(addresses)) + for i, address := range addresses { + poolInstances = append(poolInstances, makePoolInstance(address, user, password, opts, i)) + } + return poolInstances +} + +func makePoolInstance(address string, user string, password string, opts tarantool.Opts, serial int) pool.Instance { + return pool.Instance{ + Name: fmt.Sprintf("instance%d", serial), + Dialer: tarantool.NetDialer{ + Address: address, + User: user, + Password: password, + }, + Opts: opts, + } +} + +func (store *TarantoolStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} + +func (store *TarantoolStore) CommitTransaction(ctx context.Context) error { + return nil +} + +func (store *TarantoolStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *TarantoolStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { + meta = util.MaybeGzipData(meta) + } + + var ttl int64 + if entry.TtlSec > 0 { + ttl = time.Now().Unix() + int64(entry.TtlSec) + } else { + ttl = 0 + } + + var operations = []crud.Operation{ + { + Operator: crud.Insert, + Field: "data", + Value: string(meta), + }, + } + + req := crud.MakeUpsertRequest(tarantoolSpaceName). + Tuple([]interface{}{dir, nil, name, ttl, string(meta)}). + Operations(operations) + + ret := crud.Result{} + + if err := store.pool.Do(req, pool.RW).GetTyped(&ret); err != nil { + return fmt.Errorf("insert %s: %s", entry.FullPath, err) + } + + return nil +} + +func (store *TarantoolStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} + +func (store *TarantoolStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { + dir, name := fullpath.DirAndName() + + findEntryGetOpts := crud.GetOpts{ + Fields: crud.MakeOptTuple([]interface{}{"data"}), + Mode: crud.MakeOptString("read"), + PreferReplica: crud.MakeOptBool(true), + Balance: crud.MakeOptBool(true), + } + + req := crud.MakeGetRequest(tarantoolSpaceName). + Key(crud.Tuple([]interface{}{dir, name})). + Opts(findEntryGetOpts) + + resp := crud.Result{} + + err = store.pool.Do(req, pool.PreferRO).GetTyped(&resp) + if err != nil { + return nil, err + } + + results, ok := resp.Rows.([]interface{}) + if !ok || len(results) != 1 { + return nil, filer_pb.ErrNotFound + } + + rows, ok := results[0].([]interface{}) + if !ok || len(rows) != 1 { + return nil, filer_pb.ErrNotFound + } + + row, ok := rows[0].(string) + if !ok { + return nil, fmt.Errorf("Can't convert rows[0] field to string. Actual type: %v, value: %v", reflect.TypeOf(rows[0]), rows[0]) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData([]byte(row))) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *TarantoolStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { + dir, name := fullpath.DirAndName() + + delOpts := crud.DeleteOpts{ + Noreturn: crud.MakeOptBool(true), + } + + req := crud.MakeDeleteRequest(tarantoolSpaceName). + Key(crud.Tuple([]interface{}{dir, name})). + Opts(delOpts) + + if _, err := store.pool.Do(req, pool.RW).Get(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *TarantoolStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { + req := tarantool.NewCallRequest("filer_metadata.delete_by_directory_idx"). + Args([]interface{}{fullpath}) + + if _, err := store.pool.Do(req, pool.RW).Get(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *TarantoolStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + + req := tarantool.NewCallRequest("filer_metadata.find_by_directory_idx_and_name"). + Args([]interface{}{string(dirPath), startFileName, includeStartFile, limit}) + + results, err := store.pool.Do(req, pool.PreferRO).Get() + if err != nil { + return + } + + if len(results) < 1 { + glog.Errorf("Can't find results, data is empty") + return + } + + rows, ok := results[0].([]interface{}) + if !ok { + glog.Errorf("Can't convert results[0] to list") + return + } + + for _, result := range rows { + row, ok := result.([]interface{}) + if !ok { + glog.Errorf("Can't convert result to list") + return + } + + if len(row) < 5 { + glog.Errorf("Length of result is less than needed: %v", len(row)) + return + } + + nameRaw := row[2] + name, ok := nameRaw.(string) + if !ok { + glog.Errorf("Can't convert name field to string. Actual type: %v, value: %v", reflect.TypeOf(nameRaw), nameRaw) + return + } + + dataRaw := row[4] + data, ok := dataRaw.(string) + if !ok { + glog.Errorf("Can't convert data field to string. Actual type: %v, value: %v", reflect.TypeOf(dataRaw), dataRaw) + return + } + + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), name), + } + lastFileName = name + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + if !eachEntryFunc(entry) { + break + } + } + + return lastFileName, err +} + +func (store *TarantoolStore) Shutdown() { + store.pool.Close() +} diff --git a/weed/filer/tarantool/tarantool_store_kv.go b/weed/filer/tarantool/tarantool_store_kv.go new file mode 100644 index 000000000..e9f0f4dd0 --- /dev/null +++ b/weed/filer/tarantool/tarantool_store_kv.go @@ -0,0 +1,95 @@ +//go:build tarantool +// +build tarantool + +package tarantool + +import ( + "context" + "fmt" + "reflect" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/tarantool/go-tarantool/v2/crud" + "github.com/tarantool/go-tarantool/v2/pool" +) + +const ( + tarantoolKVSpaceName = "key_value" +) + +func (store *TarantoolStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + + var operations = []crud.Operation{ + { + Operator: crud.Insert, + Field: "value", + Value: string(value), + }, + } + + req := crud.MakeUpsertRequest(tarantoolKVSpaceName). + Tuple([]interface{}{string(key), nil, string(value)}). + Operations(operations) + + ret := crud.Result{} + if err := store.pool.Do(req, pool.RW).GetTyped(&ret); err != nil { + return fmt.Errorf("kv put: %v", err) + } + + return nil +} + +func (store *TarantoolStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + + getOpts := crud.GetOpts{ + Fields: crud.MakeOptTuple([]interface{}{"value"}), + Mode: crud.MakeOptString("read"), + PreferReplica: crud.MakeOptBool(true), + Balance: crud.MakeOptBool(true), + } + + req := crud.MakeGetRequest(tarantoolKVSpaceName). + Key(crud.Tuple([]interface{}{string(key)})). + Opts(getOpts) + + resp := crud.Result{} + + err = store.pool.Do(req, pool.PreferRO).GetTyped(&resp) + if err != nil { + return nil, err + } + + results, ok := resp.Rows.([]interface{}) + if !ok || len(results) != 1 { + return nil, filer.ErrKvNotFound + } + + rows, ok := results[0].([]interface{}) + if !ok || len(rows) != 1 { + return nil, filer.ErrKvNotFound + } + + row, ok := rows[0].(string) + if !ok { + return nil, fmt.Errorf("Can't convert rows[0] field to string. Actual type: %v, value: %v", reflect.TypeOf(rows[0]), rows[0]) + } + + return []byte(row), nil +} + +func (store *TarantoolStore) KvDelete(ctx context.Context, key []byte) (err error) { + + delOpts := crud.DeleteOpts{ + Noreturn: crud.MakeOptBool(true), + } + + req := crud.MakeDeleteRequest(tarantoolKVSpaceName). + Key(crud.Tuple([]interface{}{string(key)})). + Opts(delOpts) + + if _, err := store.pool.Do(req, pool.RW).Get(); err != nil { + return fmt.Errorf("kv delete: %v", err) + } + + return nil +} diff --git a/weed/filer/tarantool/tarantool_store_test.go b/weed/filer/tarantool/tarantool_store_test.go new file mode 100644 index 000000000..500289773 --- /dev/null +++ b/weed/filer/tarantool/tarantool_store_test.go @@ -0,0 +1,24 @@ +//go:build tarantool +// +build tarantool + +package tarantool + +import ( + "os" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer/store_test" +) + +func TestStore(t *testing.T) { + // run "make test_tarantool" under docker folder. + // to set up local env + if os.Getenv("RUN_TARANTOOL_TESTS") != "1" { + t.Skip("Tarantool tests are disabled. Set RUN_TARANTOOL_TESTS=1 to enable.") + } + store := &TarantoolStore{} + addresses := []string{"127.0.1:3303"} + store.initialize(addresses, "client", "client", 5*time.Second, 1000) + store_test.TestFilerStore(t, store) +} |
