diff options
Diffstat (limited to 'weed/filer2')
| -rw-r--r-- | weed/filer2/abstract_sql/abstract_sql_store.go | 44 | ||||
| -rw-r--r-- | weed/filer2/cassandra/cassandra_store.go | 10 | ||||
| -rw-r--r-- | weed/filer2/filer.go | 12 | ||||
| -rw-r--r-- | weed/filer2/filerstore.go | 4 | ||||
| -rw-r--r-- | weed/filer2/fullpath.go | 13 | ||||
| -rw-r--r-- | weed/filer2/leveldb/leveldb_store.go | 10 | ||||
| -rw-r--r-- | weed/filer2/memdb/memdb_store.go | 10 | ||||
| -rw-r--r-- | weed/filer2/redis/universal_redis_store.go | 10 |
8 files changed, 104 insertions, 9 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go index 95ce9cb9f..9a3ee51c3 100644 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ b/weed/filer2/abstract_sql/abstract_sql_store.go @@ -19,6 +19,40 @@ type AbstractSqlStore struct { SqlListInclusive string } +type TxOrDB interface { + ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) + QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) +} + +func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) { + tx, err := store.DB.BeginTx(ctx, nil) + if err != nil { + return ctx, err + } + + return context.WithValue(ctx, "tx", tx), nil +} +func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx.Commit() + } + return nil +} +func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx.Rollback() + } + return nil +} + +func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB { + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { + return tx + } + return store.DB +} + func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() @@ -27,7 +61,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.En return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := store.DB.Exec(store.SqlInsert, hashToLong(dir), name, dir, meta) + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, hashToLong(dir), name, dir, meta) if err != nil { return fmt.Errorf("insert %s: %s", entry.FullPath, err) } @@ -47,7 +81,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := store.DB.Exec(store.SqlUpdate, meta, hashToLong(dir), name, dir) + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, hashToLong(dir), name, dir) if err != nil { return fmt.Errorf("update %s: %s", entry.FullPath, err) } @@ -62,7 +96,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) { dir, name := fullpath.DirAndName() - row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir) + row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, hashToLong(dir), name, dir) var data []byte if err := row.Scan(&data); err != nil { return nil, filer2.ErrNotFound @@ -82,7 +116,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2. dir, name := fullpath.DirAndName() - res, err := store.DB.Exec(store.SqlDelete, hashToLong(dir), name, dir) + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, hashToLong(dir), name, dir) if err != nil { return fmt.Errorf("delete %s: %s", fullpath, err) } @@ -102,7 +136,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat sqlText = store.SqlListInclusive } - rows, err := store.DB.Query(sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit) + rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit) if err != nil { return nil, fmt.Errorf("list %s : %v", fullpath, err) } diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go index e14a9e023..72680b5e1 100644 --- a/weed/filer2/cassandra/cassandra_store.go +++ b/weed/filer2/cassandra/cassandra_store.go @@ -40,6 +40,16 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string) (err er return } +func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error){ + return ctx, nil +} +func (store *CassandraStore) CommitTransaction(ctx context.Context) error{ + return nil +} +func (store *CassandraStore) RollbackTransaction(ctx context.Context) error{ + return nil +} + func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { dir, name := entry.FullPath.DirAndName() diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 4220e24d3..06c26abb4 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -57,6 +57,18 @@ func (fs *Filer) KeepConnectedToMaster() { fs.MasterClient.KeepConnectedToMaster() } +func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) { + return f.store.BeginTransaction(ctx) +} + +func (f *Filer) CommitTransaction(ctx context.Context) error { + return f.store.CommitTransaction(ctx) +} + +func (f *Filer) RollbackTransaction(ctx context.Context) error { + return f.store.RollbackTransaction(ctx) +} + func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error { if string(entry.FullPath) == "/" { diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index c10074eb2..0b256e56e 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -17,6 +17,10 @@ type FilerStore interface { FindEntry(context.Context, FullPath) (entry *Entry, err error) DeleteEntry(context.Context, FullPath) (err error) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) + + BeginTransaction(ctx context.Context) (context.Context, error) + CommitTransaction(ctx context.Context) error + RollbackTransaction(ctx context.Context) error } var ErrNotFound = errors.New("filer: no entry is found in filer store") diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go index be6e34431..191e51cf3 100644 --- a/weed/filer2/fullpath.go +++ b/weed/filer2/fullpath.go @@ -8,10 +8,7 @@ import ( type FullPath string func NewFullPath(dir, name string) FullPath { - if strings.HasSuffix(dir, "/") { - return FullPath(dir + name) - } - return FullPath(dir + "/" + name) + return FullPath(dir).Child(name) } func (fp FullPath) DirAndName() (string, string) { @@ -29,3 +26,11 @@ func (fp FullPath) Name() string { _, name := filepath.Split(string(fp)) return name } + +func (fp FullPath) Child(name string) FullPath { + dir := string(fp) + if strings.HasSuffix(dir, "/") { + return FullPath(dir + name) + } + return FullPath(dir + "/" + name) +} diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go index 60de11565..97df9163b 100644 --- a/weed/filer2/leveldb/leveldb_store.go +++ b/weed/filer2/leveldb/leveldb_store.go @@ -46,6 +46,16 @@ func (store *LevelDBStore) initialize(dir string) (err error) { return } +func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error){ + return ctx, nil +} +func (store *LevelDBStore) CommitTransaction(ctx context.Context) error{ + return nil +} +func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error{ + return nil +} + func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { key := genKey(entry.DirAndName()) diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go index d4c906f2d..811c87440 100644 --- a/weed/filer2/memdb/memdb_store.go +++ b/weed/filer2/memdb/memdb_store.go @@ -34,6 +34,16 @@ func (store *MemDbStore) Initialize(configuration util.Configuration) (err error return nil } +func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error){ + return ctx, nil +} +func (store *MemDbStore) CommitTransaction(ctx context.Context) error{ + return nil +} +func (store *MemDbStore) RollbackTransaction(ctx context.Context) error{ + return nil +} + func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { // println("inserting", entry.FullPath) store.tree.ReplaceOrInsert(entryItem{entry}) diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go index ec78f70e7..72e9ce8b3 100644 --- a/weed/filer2/redis/universal_redis_store.go +++ b/weed/filer2/redis/universal_redis_store.go @@ -19,6 +19,16 @@ type UniversalRedisStore struct { Client redis.UniversalClient } +func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error){ + return ctx, nil +} +func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error{ + return nil +} +func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error{ + return nil +} + func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) { value, err := entry.EncodeAttributesAndChunks() |
