author     Chris Lu <chris.lu@gmail.com>   2021-01-19 13:53:16 -0800
committer  Chris Lu <chris.lu@gmail.com>   2021-01-19 13:53:16 -0800
commit     4c5b752b040bbbee34fdc1db61fe6e210fb11961 (patch)
tree       179a33da743be436b38f7ec5dd5c9562cd40898e /weed/filer/abstract_sql
parent     96354208c57c69b05a9c94fc26ae13860f1f5008 (diff)
restructuring sql stores
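
The core of the restructuring: the fixed per-statement string fields on AbstractSqlStore are replaced by a SqlGenerator interface, so every statement can be rendered against a per-bucket table. Below is a minimal sketch of a dialect-specific generator; the struct name SqlGenExample, the backtick quoting, and the exact column clauses are illustrative assumptions, only the parameter order is taken from the calls in the diff.

package example

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
)

// SqlGenExample is a hypothetical dialect-specific generator; each statement
// is built against the table named after the bucket (or the shared table).
type SqlGenExample struct{}

// Compile-time check that the sketch satisfies the new interface.
var _ abstract_sql.SqlGenerator = (*SqlGenExample)(nil)

func (gen *SqlGenExample) GetSqlInsert(bucket string) string {
	return fmt.Sprintf("INSERT INTO `%s` (dirhash, name, directory, meta) VALUES (?, ?, ?, ?)", bucket)
}
func (gen *SqlGenExample) GetSqlUpdate(bucket string) string {
	return fmt.Sprintf("UPDATE `%s` SET meta = ? WHERE dirhash = ? AND name = ? AND directory = ?", bucket)
}
func (gen *SqlGenExample) GetSqlFind(bucket string) string {
	return fmt.Sprintf("SELECT meta FROM `%s` WHERE dirhash = ? AND name = ? AND directory = ?", bucket)
}
func (gen *SqlGenExample) GetSqlDelete(bucket string) string {
	return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash = ? AND name = ? AND directory = ?", bucket)
}
func (gen *SqlGenExample) GetSqlDeleteFolderChildren(bucket string) string {
	return fmt.Sprintf("DELETE FROM `%s` WHERE dirhash = ? AND directory = ?", bucket)
}
func (gen *SqlGenExample) GetSqlListExclusive(bucket string) string {
	return fmt.Sprintf("SELECT name, meta FROM `%s` WHERE dirhash = ? AND name > ? AND directory = ? AND name LIKE ? ORDER BY name ASC LIMIT ?", bucket)
}
func (gen *SqlGenExample) GetSqlListInclusive(bucket string) string {
	return fmt.Sprintf("SELECT name, meta FROM `%s` WHERE dirhash = ? AND name >= ? AND directory = ? AND name LIKE ? ORDER BY name ASC LIMIT ?", bucket)
}

The concrete dialect stores would plug in their own equivalent of such a generator.
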
Diffstat (limited to 'weed/filer/abstract_sql')
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store.go     113
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store_kv.go    8
2 files changed, 89 insertions, 32 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 9aafd448e..dd35112a2 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -9,21 +9,28 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"strings"
+ "sync"
)
+type SqlGenerator interface {
+ GetSqlInsert(bucket string) string
+ GetSqlUpdate(bucket string) string
+ GetSqlFind(bucket string) string
+ GetSqlDelete(bucket string) string
+ GetSqlDeleteFolderChildren(bucket string) string
+ GetSqlListExclusive(bucket string) string
+ GetSqlListInclusive(bucket string) string
+}
+
type AbstractSqlStore struct {
- DB *sql.DB
- SqlInsert string
- SqlUpdate string
- SqlFind string
- SqlDelete string
- SqlDeleteFolderChildren string
- SqlListExclusive string
- SqlListInclusive string
+ DB *sql.DB
+ SqlGenerator
+ dbs map[string]bool
+ dbsLock sync.Mutex
}
const (
- DEFAULT = "_main"
+ DEFAULT_TABLE = "filemeta"
)
type TxOrDB interface {
@@ -57,16 +64,53 @@ func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
}
func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {
+
shortPath = fullpath
+ bucket = DEFAULT_TABLE
+
if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
- return tx, bucket, shortPath, err
+ txOrDB = tx
+ } else {
+ txOrDB = store.DB
+ }
+
+ if !strings.HasPrefix(string(fullpath), "/buckets/") {
+ return
}
- return store.DB, bucket, shortPath, err
+
+ // detect bucket
+ bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
+ t := strings.Index(bucketAndObjectKey, "/")
+ if t < 0 && !isForChildren {
+ return
+ }
+ if t > 0 {
+ bucket = bucketAndObjectKey[:t]
+ shortPath = util.FullPath(bucketAndObjectKey[t:])
+ }
+
+ if isValidBucket(bucket) {
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ if store.dbs == nil {
+ store.dbs = make(map[string]bool)
+ }
+
+ if _, found := store.dbs[bucket]; !found {
+ if err = store.createTable(bucket); err == nil {
+ store.dbs[bucket] = true
+ }
+ }
+
+ }
+
+ return
}
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
- db, _, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
if err != nil {
return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
}
@@ -81,7 +125,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
meta = util.MaybeGzipData(meta)
}
- res, err := db.ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
+ res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta)
if err == nil {
return
}
@@ -94,7 +138,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
// now the insert failed possibly due to duplication constraints
glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err)
- res, err = db.ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
+ res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("upsert %s: %s", entry.FullPath, err)
}
@@ -109,7 +153,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
- db, _, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
if err != nil {
return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
}
@@ -120,7 +164,7 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- res, err := db.ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
+ res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("update %s: %s", entry.FullPath, err)
}
@@ -134,13 +178,13 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
- db, _, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
if err != nil {
return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
}
dir, name := shortPath.DirAndName()
- row := db.QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
+ row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir)
var data []byte
if err := row.Scan(&data); err != nil {
@@ -162,14 +206,14 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
- db, _, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
if err != nil {
return fmt.Errorf("findDB %s : %v", fullpath, err)
}
dir, name := shortPath.DirAndName()
- res, err := db.ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir)
+ res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("delete %s: %s", fullpath, err)
}
@@ -189,12 +233,16 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
return fmt.Errorf("findDB %s : %v", fullpath, err)
}
- if bucket != DEFAULT && shortPath == "/" {
- store.deleteTable(bucket)
- return nil
+ if isValidBucket(bucket) && shortPath == "/" {
+ if store.deleteTable(bucket) {
+ store.dbsLock.Lock()
+ delete(store.dbs, bucket)
+ store.dbsLock.Unlock()
+ return nil
+ }
}
- res, err := db.ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(shortPath)), fullpath)
+ res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), fullpath)
if err != nil {
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
}
@@ -209,14 +257,14 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- db, _, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
if err != nil {
return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
}
- sqlText := store.SqlListExclusive
+ sqlText := store.GetSqlListExclusive(bucket)
if includeStartFile {
- sqlText = store.SqlListInclusive
+ sqlText = store.GetSqlListInclusive(bucket)
}
rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1)
@@ -259,5 +307,14 @@ func (store *AbstractSqlStore) Shutdown() {
store.DB.Close()
}
-func (store *AbstractSqlStore) deleteTable(bucket string) {
+func isValidBucket(bucket string) bool {
+ return bucket != DEFAULT_TABLE && bucket != ""
+}
+
+func (store *AbstractSqlStore) createTable(bucket string) error {
+ return nil
+}
+
+func (store *AbstractSqlStore) deleteTable(bucket string) bool {
+ return false
}
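
For readers of the new getTxOrDB above, the routing boils down to: paths under /buckets/<bucket>/ resolve to a table named after the bucket, everything else stays in DEFAULT_TABLE. A standalone sketch of that rule (detectBucket is a hypothetical helper, not code from this patch):

package main

import (
	"fmt"
	"strings"
)

const defaultTable = "filemeta" // mirrors DEFAULT_TABLE

// detectBucket mirrors the bucket-detection step of getTxOrDB for illustration.
func detectBucket(fullpath string) (bucket, shortPath string) {
	bucket, shortPath = defaultTable, fullpath
	if !strings.HasPrefix(fullpath, "/buckets/") {
		return // not under /buckets/, stays in the shared table
	}
	bucketAndObjectKey := fullpath[len("/buckets/"):]
	if t := strings.Index(bucketAndObjectKey, "/"); t > 0 {
		bucket = bucketAndObjectKey[:t]    // table named after the bucket
		shortPath = bucketAndObjectKey[t:] // path relative to the bucket
	}
	return
}

func main() {
	fmt.Println(detectBucket("/buckets/photos/2021/cat.jpg")) // photos /2021/cat.jpg
	fmt.Println(detectBucket("/topics/system.log"))           // filemeta /topics/system.log
}
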
diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go
index 4e56c5db2..03b016c76 100644
--- a/weed/filer/abstract_sql/abstract_sql_store_kv.go
+++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go
@@ -20,7 +20,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
dirStr, dirHash, name := genDirAndName(key)
- res, err := db.ExecContext(ctx, store.SqlInsert, dirHash, name, dirStr, value)
+ res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value)
if err == nil {
return
}
@@ -33,7 +33,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
// now the insert failed possibly due to duplication constraints
glog.V(1).Infof("kv insert falls back to update: %s", err)
- res, err = db.ExecContext(ctx, store.SqlUpdate, value, dirHash, name, dirStr)
+ res, err = db.ExecContext(ctx, store.GetSqlUpdate(DEFAULT_TABLE), value, dirHash, name, dirStr)
if err != nil {
return fmt.Errorf("kv upsert: %s", err)
}
@@ -54,7 +54,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b
}
dirStr, dirHash, name := genDirAndName(key)
- row := db.QueryRowContext(ctx, store.SqlFind, dirHash, name, dirStr)
+ row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr)
err = row.Scan(&value)
@@ -78,7 +78,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er
dirStr, dirHash, name := genDirAndName(key)
- res, err := db.ExecContext(ctx, store.SqlDelete, dirHash, name, dirStr)
+ res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr)
if err != nil {
return fmt.Errorf("kv delete: %s", err)
}
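
The KV helpers above are pinned to DEFAULT_TABLE on purpose, so key-value data never lands in a per-bucket table. A hypothetical caller (kvRoundTrip and its key are made up for illustration) might use them like this:

package example

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
)

// kvRoundTrip stores and reads back a small value; both calls go to the
// shared "filemeta" table regardless of any buckets that exist.
func kvRoundTrip(ctx context.Context, store *abstract_sql.AbstractSqlStore) ([]byte, error) {
	if err := store.KvPut(ctx, []byte("system/counter"), []byte("42")); err != nil {
		return nil, err
	}
	return store.KvGet(ctx, []byte("system/counter"))
}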