diff options
Diffstat (limited to 'weed/filer/abstract_sql')
| -rw-r--r-- | weed/filer/abstract_sql/abstract_sql_store.go | 194 | ||||
| -rw-r--r-- | weed/filer/abstract_sql/abstract_sql_store_kv.go | 23 |
2 files changed, 178 insertions, 39 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index da104358b..91b0bc98f 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -9,19 +9,33 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "strings" + "sync" ) +type SqlGenerator interface { + GetSqlInsert(bucket string) string + GetSqlUpdate(bucket string) string + GetSqlFind(bucket string) string + GetSqlDelete(bucket string) string + GetSqlDeleteFolderChildren(bucket string) string + GetSqlListExclusive(bucket string) string + GetSqlListInclusive(bucket string) string + GetSqlCreateTable(bucket string) string + GetSqlDropTable(bucket string) string +} + type AbstractSqlStore struct { - DB *sql.DB - SqlInsert string - SqlUpdate string - SqlFind string - SqlDelete string - SqlDeleteFolderChildren string - SqlListExclusive string - SqlListInclusive string + SqlGenerator + DB *sql.DB + SupportBucketTable bool + dbs map[string]bool + dbsLock sync.Mutex } +const ( + DEFAULT_TABLE = "filemeta" +) + type TxOrDB interface { ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row @@ -52,16 +66,65 @@ func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error { return nil } -func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB { +func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) { + + shortPath = fullpath + bucket = DEFAULT_TABLE + if tx, ok := ctx.Value("tx").(*sql.Tx); ok { - return tx + txOrDB = tx + } else { + txOrDB = store.DB + } + + if !store.SupportBucketTable { + return + } + + if !strings.HasPrefix(string(fullpath), "/buckets/") { + return + } + + // detect bucket + bucketAndObjectKey := string(fullpath)[len("/buckets/"):] + t := strings.Index(bucketAndObjectKey, "/") + if t < 0 && !isForChildren { + return + } + bucket = bucketAndObjectKey + shortPath = "/" + if t > 0 { + bucket = bucketAndObjectKey[:t] + shortPath = util.FullPath(bucketAndObjectKey[t:]) + } + + if isValidBucket(bucket) { + store.dbsLock.Lock() + defer store.dbsLock.Unlock() + + if store.dbs == nil { + store.dbs = make(map[string]bool) + } + + if _, found := store.dbs[bucket]; !found { + if err = store.CreateTable(ctx, bucket); err != nil { + store.dbs[bucket] = true + } + } + } - return store.DB + + return } func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { - dir, name := entry.FullPath.DirAndName() + db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) + if err != nil { + return fmt.Errorf("findDB %s : %v", entry.FullPath, err) + } + + dir, name := shortPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() if err != nil { return fmt.Errorf("encode %s: %s", entry.FullPath, err) @@ -71,7 +134,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent meta = util.MaybeGzipData(meta) } - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta) + res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta) if err == nil { return } @@ -84,7 +147,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent // now the insert failed possibly due to duplication constraints glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err) - res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) + res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) if err != nil { return fmt.Errorf("upsert %s: %s", entry.FullPath, err) } @@ -99,13 +162,18 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { - dir, name := entry.FullPath.DirAndName() + db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false) + if err != nil { + return fmt.Errorf("findDB %s : %v", entry.FullPath, err) + } + + dir, name := shortPath.DirAndName() meta, err := entry.EncodeAttributesAndChunks() if err != nil { return fmt.Errorf("encode %s: %s", entry.FullPath, err) } - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) + res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir) if err != nil { return fmt.Errorf("update %s: %s", entry.FullPath, err) } @@ -119,8 +187,13 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) { - dir, name := fullpath.DirAndName() - row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir) + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) + if err != nil { + return nil, fmt.Errorf("findDB %s : %v", fullpath, err) + } + + dir, name := shortPath.DirAndName() + row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir) var data []byte if err := row.Scan(&data); err != nil { @@ -142,9 +215,14 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error { - dir, name := fullpath.DirAndName() + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false) + if err != nil { + return fmt.Errorf("findDB %s : %v", fullpath, err) + } + + dir, name := shortPath.DirAndName() - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir) + res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir) if err != nil { return fmt.Errorf("delete %s: %s", fullpath, err) } @@ -159,7 +237,23 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.Fu func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error { - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath) + db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true) + if err != nil { + return fmt.Errorf("findDB %s : %v", fullpath, err) + } + + if isValidBucket(bucket) && shortPath == "/" { + if err = store.deleteTable(ctx, bucket); err == nil { + store.dbsLock.Lock() + delete(store.dbs, bucket) + store.dbsLock.Unlock() + return nil + } else { + return err + } + } + + res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), fullpath) if err != nil { return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err) } @@ -172,15 +266,21 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat return nil } -func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { - sqlText := store.SqlListExclusive - if inclusive { - sqlText = store.SqlListInclusive +func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + + db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true) + if err != nil { + return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err) + } + + sqlText := store.GetSqlListExclusive(bucket) + if includeStartFile { + sqlText = store.GetSqlListInclusive(bucket) } - rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix+"%", limit) + rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1) if err != nil { - return nil, fmt.Errorf("list %s : %v", fullpath, err) + return lastFileName, fmt.Errorf("list %s : %v", dirPath, err) } defer rows.Close() @@ -188,28 +288,52 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, var name string var data []byte if err = rows.Scan(&name, &data); err != nil { - glog.V(0).Infof("scan %s : %v", fullpath, err) - return nil, fmt.Errorf("scan %s: %v", fullpath, err) + glog.V(0).Infof("scan %s : %v", dirPath, err) + return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err) } + lastFileName = name entry := &filer.Entry{ - FullPath: util.NewFullPath(string(fullpath), name), + FullPath: util.NewFullPath(string(dirPath), name), } if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) - return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) + } + + if !eachEntryFunc(entry) { + break } - entries = append(entries, entry) } - return entries, nil + return lastFileName, nil } -func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { - return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "") +func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil) } func (store *AbstractSqlStore) Shutdown() { store.DB.Close() } + +func isValidBucket(bucket string) bool { + return bucket != DEFAULT_TABLE && bucket != "" +} + +func (store *AbstractSqlStore) CreateTable(ctx context.Context, bucket string) error { + if !store.SupportBucketTable { + return nil + } + _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlCreateTable(bucket)) + return err +} + +func (store *AbstractSqlStore) deleteTable(ctx context.Context, bucket string) error { + if !store.SupportBucketTable { + return nil + } + _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlDropTable(bucket)) + return err +} diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go index 792a45ff4..03b016c76 100644 --- a/weed/filer/abstract_sql/abstract_sql_store_kv.go +++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go @@ -13,9 +13,14 @@ import ( func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + db, _, _, err := store.getTxOrDB(ctx, "", false) + if err != nil { + return fmt.Errorf("findDB: %v", err) + } + dirStr, dirHash, name := genDirAndName(key) - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, dirHash, name, dirStr, value) + res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value) if err == nil { return } @@ -28,7 +33,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by // now the insert failed possibly due to duplication constraints glog.V(1).Infof("kv insert falls back to update: %s", err) - res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, value, dirHash, name, dirStr) + res, err = db.ExecContext(ctx, store.GetSqlUpdate(DEFAULT_TABLE), value, dirHash, name, dirStr) if err != nil { return fmt.Errorf("kv upsert: %s", err) } @@ -43,8 +48,13 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + db, _, _, err := store.getTxOrDB(ctx, "", false) + if err != nil { + return nil, fmt.Errorf("findDB: %v", err) + } + dirStr, dirHash, name := genDirAndName(key) - row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, dirHash, name, dirStr) + row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr) err = row.Scan(&value) @@ -61,9 +71,14 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err error) { + db, _, _, err := store.getTxOrDB(ctx, "", false) + if err != nil { + return fmt.Errorf("findDB: %v", err) + } + dirStr, dirHash, name := genDirAndName(key) - res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, dirHash, name, dirStr) + res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr) if err != nil { return fmt.Errorf("kv delete: %s", err) } |
