diff options
Diffstat (limited to 'weed/filer/tarantool/tarantool_store.go')
| -rw-r--r-- | weed/filer/tarantool/tarantool_store.go | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/weed/filer/tarantool/tarantool_store.go b/weed/filer/tarantool/tarantool_store.go new file mode 100644 index 000000000..8d19db60d --- /dev/null +++ b/weed/filer/tarantool/tarantool_store.go @@ -0,0 +1,318 @@ +//go:build tarantool +// +build tarantool + +package tarantool + +import ( + "context" + "fmt" + "reflect" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + weed_util "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/tarantool/go-tarantool/v2" + "github.com/tarantool/go-tarantool/v2/crud" + "github.com/tarantool/go-tarantool/v2/pool" +) + +const ( + tarantoolSpaceName = "filer_metadata" +) + +func init() { + filer.Stores = append(filer.Stores, &TarantoolStore{}) +} + +type TarantoolStore struct { + pool *pool.ConnectionPool +} + +func (store *TarantoolStore) GetName() string { + return "tarantool" +} + +func (store *TarantoolStore) Initialize(configuration weed_util.Configuration, prefix string) error { + + configuration.SetDefault(prefix+"address", "localhost:3301") + configuration.SetDefault(prefix+"user", "guest") + configuration.SetDefault(prefix+"password", "") + configuration.SetDefault(prefix+"timeout", "5s") + configuration.SetDefault(prefix+"maxReconnects", "1000") + + address := configuration.GetString(prefix + "address") + user := configuration.GetString(prefix + "user") + password := configuration.GetString(prefix + "password") + + timeoutStr := configuration.GetString(prefix + "timeout") + timeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return fmt.Errorf("parse tarantool store timeout: %v", err) + } + + maxReconnects := configuration.GetInt(prefix + "maxReconnects") + if maxReconnects < 0 { + return fmt.Errorf("maxReconnects is negative") + } + + addresses := strings.Split(address, ",") + + return store.initialize(addresses, user, password, timeout, uint(maxReconnects)) +} + +func (store *TarantoolStore) initialize(addresses []string, user string, password string, timeout time.Duration, maxReconnects uint) error { + + opts := tarantool.Opts{ + Timeout: timeout, + Reconnect: time.Second, + MaxReconnects: maxReconnects, + } + + poolInstances := makePoolInstances(addresses, user, password, opts) + poolOpts := pool.Opts{ + CheckTimeout: time.Second, + } + + ctx := context.Background() + p, err := pool.ConnectWithOpts(ctx, poolInstances, poolOpts) + if err != nil { + return fmt.Errorf("Can't create connection pool: %v", err) + } + + _, err = p.Do(tarantool.NewPingRequest(), pool.ANY).Get() + if err != nil { + return err + } + + store.pool = p + + return nil +} + +func makePoolInstances(addresses []string, user string, password string, opts tarantool.Opts) []pool.Instance { + poolInstances := make([]pool.Instance, 0, len(addresses)) + for i, address := range addresses { + poolInstances = append(poolInstances, makePoolInstance(address, user, password, opts, i)) + } + return poolInstances +} + +func makePoolInstance(address string, user string, password string, opts tarantool.Opts, serial int) pool.Instance { + return pool.Instance{ + Name: fmt.Sprintf("instance%d", serial), + Dialer: tarantool.NetDialer{ + Address: address, + User: user, + Password: password, + }, + Opts: opts, + } +} + +func (store *TarantoolStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} + +func (store *TarantoolStore) CommitTransaction(ctx context.Context) error { + return nil +} + +func (store *TarantoolStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *TarantoolStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + dir, name := entry.FullPath.DirAndName() + meta, err := entry.EncodeAttributesAndChunks() + if err != nil { + return fmt.Errorf("encode %s: %s", entry.FullPath, err) + } + + if len(entry.GetChunks()) > filer.CountEntryChunksForGzip { + meta = util.MaybeGzipData(meta) + } + + var ttl int64 + if entry.TtlSec > 0 { + ttl = time.Now().Unix() + int64(entry.TtlSec) + } else { + ttl = 0 + } + + var operations = []crud.Operation{ + { + Operator: crud.Insert, + Field: "data", + Value: string(meta), + }, + } + + req := crud.MakeUpsertRequest(tarantoolSpaceName). + Tuple([]interface{}{dir, nil, name, ttl, string(meta)}). + Operations(operations) + + ret := crud.Result{} + + if err := store.pool.Do(req, pool.RW).GetTyped(&ret); err != nil { + return fmt.Errorf("insert %s: %s", entry.FullPath, err) + } + + return nil +} + +func (store *TarantoolStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} + +func (store *TarantoolStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { + dir, name := fullpath.DirAndName() + + findEntryGetOpts := crud.GetOpts{ + Fields: crud.MakeOptTuple([]interface{}{"data"}), + Mode: crud.MakeOptString("read"), + PreferReplica: crud.MakeOptBool(true), + Balance: crud.MakeOptBool(true), + } + + req := crud.MakeGetRequest(tarantoolSpaceName). + Key(crud.Tuple([]interface{}{dir, name})). + Opts(findEntryGetOpts) + + resp := crud.Result{} + + err = store.pool.Do(req, pool.PreferRO).GetTyped(&resp) + if err != nil { + return nil, err + } + + results, ok := resp.Rows.([]interface{}) + if !ok || len(results) != 1 { + return nil, filer_pb.ErrNotFound + } + + rows, ok := results[0].([]interface{}) + if !ok || len(rows) != 1 { + return nil, filer_pb.ErrNotFound + } + + row, ok := rows[0].(string) + if !ok { + return nil, fmt.Errorf("Can't convert rows[0] field to string. Actual type: %v, value: %v", reflect.TypeOf(rows[0]), rows[0]) + } + + entry = &filer.Entry{ + FullPath: fullpath, + } + + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData([]byte(row))) + if err != nil { + return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) + } + + return entry, nil +} + +func (store *TarantoolStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { + dir, name := fullpath.DirAndName() + + delOpts := crud.DeleteOpts{ + Noreturn: crud.MakeOptBool(true), + } + + req := crud.MakeDeleteRequest(tarantoolSpaceName). + Key(crud.Tuple([]interface{}{dir, name})). + Opts(delOpts) + + if _, err := store.pool.Do(req, pool.RW).Get(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *TarantoolStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { + req := tarantool.NewCallRequest("filer_metadata.delete_by_directory_idx"). + Args([]interface{}{fullpath}) + + if _, err := store.pool.Do(req, pool.RW).Get(); err != nil { + return fmt.Errorf("delete %s : %v", fullpath, err) + } + + return nil +} + +func (store *TarantoolStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + + req := tarantool.NewCallRequest("filer_metadata.find_by_directory_idx_and_name"). + Args([]interface{}{string(dirPath), startFileName, includeStartFile, limit}) + + results, err := store.pool.Do(req, pool.PreferRO).Get() + if err != nil { + return + } + + if len(results) < 1 { + glog.Errorf("Can't find results, data is empty") + return + } + + rows, ok := results[0].([]interface{}) + if !ok { + glog.Errorf("Can't convert results[0] to list") + return + } + + for _, result := range rows { + row, ok := result.([]interface{}) + if !ok { + glog.Errorf("Can't convert result to list") + return + } + + if len(row) < 5 { + glog.Errorf("Length of result is less than needed: %v", len(row)) + return + } + + nameRaw := row[2] + name, ok := nameRaw.(string) + if !ok { + glog.Errorf("Can't convert name field to string. Actual type: %v, value: %v", reflect.TypeOf(nameRaw), nameRaw) + return + } + + dataRaw := row[4] + data, ok := dataRaw.(string) + if !ok { + glog.Errorf("Can't convert data field to string. Actual type: %v, value: %v", reflect.TypeOf(dataRaw), dataRaw) + return + } + + entry := &filer.Entry{ + FullPath: util.NewFullPath(string(dirPath), name), + } + lastFileName = name + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))); decodeErr != nil { + err = decodeErr + glog.V(0).Infof("list %s : %v", entry.FullPath, err) + break + } + if !eachEntryFunc(entry) { + break + } + } + + return lastFileName, err +} + +func (store *TarantoolStore) Shutdown() { + store.pool.Close() +} |
