aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
authorChris Lu <chris.lu@uber.com>2019-03-30 23:08:29 -0700
committerChris Lu <chris.lu@uber.com>2019-03-30 23:08:29 -0700
commit97406333a5ecc5b0d2cdaa74ff9901e3100e4bf2 (patch)
tree04cb10ddb0fb87663ba1783a7e82397aa2c9c06f /weed/filer2
parent920b4e56aa76fbf37780363d5b345c2882d311b5 (diff)
downloadseaweedfs-97406333a5ecc5b0d2cdaa74ff9901e3100e4bf2.tar.xz
seaweedfs-97406333a5ecc5b0d2cdaa74ff9901e3100e4bf2.zip
support atomic renaming for mysql/postgres filer store
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/abstract_sql/abstract_sql_store.go44
-rw-r--r--weed/filer2/cassandra/cassandra_store.go10
-rw-r--r--weed/filer2/filer.go12
-rw-r--r--weed/filer2/filerstore.go4
-rw-r--r--weed/filer2/fullpath.go13
-rw-r--r--weed/filer2/leveldb/leveldb_store.go10
-rw-r--r--weed/filer2/memdb/memdb_store.go10
-rw-r--r--weed/filer2/redis/universal_redis_store.go10
8 files changed, 104 insertions, 9 deletions
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
index 95ce9cb9f..9a3ee51c3 100644
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ b/weed/filer2/abstract_sql/abstract_sql_store.go
@@ -19,6 +19,40 @@ type AbstractSqlStore struct {
SqlListInclusive string
}
+type TxOrDB interface {
+ ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
+ QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
+ QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
+}
+
+func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ tx, err := store.DB.BeginTx(ctx, nil)
+ if err != nil {
+ return ctx, err
+ }
+
+ return context.WithValue(ctx, "tx", tx), nil
+}
+func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+ return tx.Commit()
+ }
+ return nil
+}
+func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+ return tx.Rollback()
+ }
+ return nil
+}
+
+func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB {
+ if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
+ return tx
+ }
+ return store.DB
+}
+
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
@@ -27,7 +61,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.En
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- res, err := store.DB.Exec(store.SqlInsert, hashToLong(dir), name, dir, meta)
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, hashToLong(dir), name, dir, meta)
if err != nil {
return fmt.Errorf("insert %s: %s", entry.FullPath, err)
}
@@ -47,7 +81,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- res, err := store.DB.Exec(store.SqlUpdate, meta, hashToLong(dir), name, dir)
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, hashToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("update %s: %s", entry.FullPath, err)
}
@@ -62,7 +96,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.En
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
dir, name := fullpath.DirAndName()
- row := store.DB.QueryRow(store.SqlFind, hashToLong(dir), name, dir)
+ row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, hashToLong(dir), name, dir)
var data []byte
if err := row.Scan(&data); err != nil {
return nil, filer2.ErrNotFound
@@ -82,7 +116,7 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
dir, name := fullpath.DirAndName()
- res, err := store.DB.Exec(store.SqlDelete, hashToLong(dir), name, dir)
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, hashToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("delete %s: %s", fullpath, err)
}
@@ -102,7 +136,7 @@ func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpat
sqlText = store.SqlListInclusive
}
- rows, err := store.DB.Query(sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit)
+ rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, hashToLong(string(fullpath)), startFileName, string(fullpath), limit)
if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
}
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
index e14a9e023..72680b5e1 100644
--- a/weed/filer2/cassandra/cassandra_store.go
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -40,6 +40,16 @@ func (store *CassandraStore) initialize(keyspace string, hosts []string) (err er
return
}
+func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error){
+ return ctx, nil
+}
+func (store *CassandraStore) CommitTransaction(ctx context.Context) error{
+ return nil
+}
+func (store *CassandraStore) RollbackTransaction(ctx context.Context) error{
+ return nil
+}
+
func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
dir, name := entry.FullPath.DirAndName()
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 4220e24d3..06c26abb4 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -57,6 +57,18 @@ func (fs *Filer) KeepConnectedToMaster() {
fs.MasterClient.KeepConnectedToMaster()
}
+func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return f.store.BeginTransaction(ctx)
+}
+
+func (f *Filer) CommitTransaction(ctx context.Context) error {
+ return f.store.CommitTransaction(ctx)
+}
+
+func (f *Filer) RollbackTransaction(ctx context.Context) error {
+ return f.store.RollbackTransaction(ctx)
+}
+
func (f *Filer) CreateEntry(ctx context.Context, entry *Entry) error {
if string(entry.FullPath) == "/" {
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index c10074eb2..0b256e56e 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -17,6 +17,10 @@ type FilerStore interface {
FindEntry(context.Context, FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
+
+ BeginTransaction(ctx context.Context) (context.Context, error)
+ CommitTransaction(ctx context.Context) error
+ RollbackTransaction(ctx context.Context) error
}
var ErrNotFound = errors.New("filer: no entry is found in filer store")
diff --git a/weed/filer2/fullpath.go b/weed/filer2/fullpath.go
index be6e34431..191e51cf3 100644
--- a/weed/filer2/fullpath.go
+++ b/weed/filer2/fullpath.go
@@ -8,10 +8,7 @@ import (
type FullPath string
func NewFullPath(dir, name string) FullPath {
- if strings.HasSuffix(dir, "/") {
- return FullPath(dir + name)
- }
- return FullPath(dir + "/" + name)
+ return FullPath(dir).Child(name)
}
func (fp FullPath) DirAndName() (string, string) {
@@ -29,3 +26,11 @@ func (fp FullPath) Name() string {
_, name := filepath.Split(string(fp))
return name
}
+
+func (fp FullPath) Child(name string) FullPath {
+ dir := string(fp)
+ if strings.HasSuffix(dir, "/") {
+ return FullPath(dir + name)
+ }
+ return FullPath(dir + "/" + name)
+}
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index 60de11565..97df9163b 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -46,6 +46,16 @@ func (store *LevelDBStore) initialize(dir string) (err error) {
return
}
+func (store *LevelDBStore) BeginTransaction(ctx context.Context) (context.Context, error){
+ return ctx, nil
+}
+func (store *LevelDBStore) CommitTransaction(ctx context.Context) error{
+ return nil
+}
+func (store *LevelDBStore) RollbackTransaction(ctx context.Context) error{
+ return nil
+}
+
func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
key := genKey(entry.DirAndName())
diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go
index d4c906f2d..811c87440 100644
--- a/weed/filer2/memdb/memdb_store.go
+++ b/weed/filer2/memdb/memdb_store.go
@@ -34,6 +34,16 @@ func (store *MemDbStore) Initialize(configuration util.Configuration) (err error
return nil
}
+func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error){
+ return ctx, nil
+}
+func (store *MemDbStore) CommitTransaction(ctx context.Context) error{
+ return nil
+}
+func (store *MemDbStore) RollbackTransaction(ctx context.Context) error{
+ return nil
+}
+
func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
// println("inserting", entry.FullPath)
store.tree.ReplaceOrInsert(entryItem{entry})
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
index ec78f70e7..72e9ce8b3 100644
--- a/weed/filer2/redis/universal_redis_store.go
+++ b/weed/filer2/redis/universal_redis_store.go
@@ -19,6 +19,16 @@ type UniversalRedisStore struct {
Client redis.UniversalClient
}
+func (store *UniversalRedisStore) BeginTransaction(ctx context.Context) (context.Context, error){
+ return ctx, nil
+}
+func (store *UniversalRedisStore) CommitTransaction(ctx context.Context) error{
+ return nil
+}
+func (store *UniversalRedisStore) RollbackTransaction(ctx context.Context) error{
+ return nil
+}
+
func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
value, err := entry.EncodeAttributesAndChunks()