aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store.go194
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store_kv.go23
-rw-r--r--weed/filer/cassandra/cassandra_store.go22
-rw-r--r--weed/filer/configuration.go6
-rw-r--r--weed/filer/elastic/v7/elastic_store.go60
-rw-r--r--weed/filer/etcd/etcd_store.go23
-rw-r--r--weed/filer/filechunk_manifest.go7
-rw-r--r--weed/filer/filechunks.go9
-rw-r--r--weed/filer/filer.go147
-rw-r--r--weed/filer/filer_buckets.go4
-rw-r--r--weed/filer/filer_delete_entry.go69
-rw-r--r--weed/filer/filer_notify.go6
-rw-r--r--weed/filer/filer_search.go70
-rw-r--r--weed/filer/filerstore.go6
-rw-r--r--weed/filer/filerstore_translate_path.go24
-rw-r--r--weed/filer/filerstore_wrapper.go62
-rw-r--r--weed/filer/hbase/hbase_store.go19
-rw-r--r--weed/filer/hbase/hbase_store_kv.go3
-rw-r--r--weed/filer/leveldb/leveldb_store.go28
-rw-r--r--weed/filer/leveldb/leveldb_store_test.go33
-rw-r--r--weed/filer/leveldb2/leveldb2_store.go27
-rw-r--r--weed/filer/leveldb2/leveldb2_store_test.go6
-rw-r--r--weed/filer/leveldb3/leveldb3_store.go375
-rw-r--r--weed/filer/leveldb3/leveldb3_store_kv.go46
-rw-r--r--weed/filer/leveldb3/leveldb3_store_test.go88
-rw-r--r--weed/filer/mongodb/mongodb_store.go22
-rw-r--r--weed/filer/mysql/mysql_sql_gen.go52
-rw-r--r--weed/filer/mysql/mysql_store.go23
-rw-r--r--weed/filer/mysql2/mysql2_store.go82
-rw-r--r--weed/filer/postgres/postgres_sql_gen.go53
-rw-r--r--weed/filer/postgres/postgres_store.go18
-rw-r--r--weed/filer/postgres2/postgres2_store.go87
-rw-r--r--weed/filer/reader_at.go8
-rw-r--r--weed/filer/redis/redis_cluster_store.go6
-rw-r--r--weed/filer/redis/redis_store.go2
-rw-r--r--weed/filer/redis/universal_redis_store.go49
-rw-r--r--weed/filer/redis/universal_redis_store_kv.go8
-rw-r--r--weed/filer/redis2/redis_cluster_store.go6
-rw-r--r--weed/filer/redis2/redis_store.go2
-rw-r--r--weed/filer/redis2/universal_redis_store.go51
-rw-r--r--weed/filer/redis2/universal_redis_store_kv.go8
-rw-r--r--weed/filer/rocksdb/README.md41
-rw-r--r--weed/filer/rocksdb/rocksdb_store.go302
-rw-r--r--weed/filer/rocksdb/rocksdb_store_kv.go47
-rw-r--r--weed/filer/rocksdb/rocksdb_store_test.go117
-rw-r--r--weed/filer/rocksdb/rocksdb_ttl.go40
-rw-r--r--weed/filer/stream.go8
47 files changed, 1967 insertions, 422 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index da104358b..91b0bc98f 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -9,19 +9,33 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"strings"
+ "sync"
)
+type SqlGenerator interface {
+ GetSqlInsert(bucket string) string
+ GetSqlUpdate(bucket string) string
+ GetSqlFind(bucket string) string
+ GetSqlDelete(bucket string) string
+ GetSqlDeleteFolderChildren(bucket string) string
+ GetSqlListExclusive(bucket string) string
+ GetSqlListInclusive(bucket string) string
+ GetSqlCreateTable(bucket string) string
+ GetSqlDropTable(bucket string) string
+}
+
type AbstractSqlStore struct {
- DB *sql.DB
- SqlInsert string
- SqlUpdate string
- SqlFind string
- SqlDelete string
- SqlDeleteFolderChildren string
- SqlListExclusive string
- SqlListInclusive string
+ SqlGenerator
+ DB *sql.DB
+ SupportBucketTable bool
+ dbs map[string]bool
+ dbsLock sync.Mutex
}
+const (
+ DEFAULT_TABLE = "filemeta"
+)
+
type TxOrDB interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
@@ -52,16 +66,65 @@ func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
return nil
}
-func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB {
+func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {
+
+ shortPath = fullpath
+ bucket = DEFAULT_TABLE
+
if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
- return tx
+ txOrDB = tx
+ } else {
+ txOrDB = store.DB
+ }
+
+ if !store.SupportBucketTable {
+ return
+ }
+
+ if !strings.HasPrefix(string(fullpath), "/buckets/") {
+ return
+ }
+
+ // detect bucket
+ bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
+ t := strings.Index(bucketAndObjectKey, "/")
+ if t < 0 && !isForChildren {
+ return
+ }
+ bucket = bucketAndObjectKey
+ shortPath = "/"
+ if t > 0 {
+ bucket = bucketAndObjectKey[:t]
+ shortPath = util.FullPath(bucketAndObjectKey[t:])
+ }
+
+ if isValidBucket(bucket) {
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ if store.dbs == nil {
+ store.dbs = make(map[string]bool)
+ }
+
+ if _, found := store.dbs[bucket]; !found {
+ if err = store.CreateTable(ctx, bucket); err != nil {
+ store.dbs[bucket] = true
+ }
+ }
+
}
- return store.DB
+
+ return
}
func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
- dir, name := entry.FullPath.DirAndName()
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
@@ -71,7 +134,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
meta = util.MaybeGzipData(meta)
}
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
+ res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta)
if err == nil {
return
}
@@ -84,7 +147,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
// now the insert failed possibly due to duplication constraints
glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err)
- res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
+ res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("upsert %s: %s", entry.FullPath, err)
}
@@ -99,13 +162,18 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
- dir, name := entry.FullPath.DirAndName()
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
+ res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("update %s: %s", entry.FullPath, err)
}
@@ -119,8 +187,13 @@ func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Ent
func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
- dir, name := fullpath.DirAndName()
- row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
+ if err != nil {
+ return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
+ row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir)
var data []byte
if err := row.Scan(&data); err != nil {
@@ -142,9 +215,14 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full
func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
- dir, name := fullpath.DirAndName()
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir)
+ res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir)
if err != nil {
return fmt.Errorf("delete %s: %s", fullpath, err)
}
@@ -159,7 +237,23 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.Fu
func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath)
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ if isValidBucket(bucket) && shortPath == "/" {
+ if err = store.deleteTable(ctx, bucket); err == nil {
+ store.dbsLock.Lock()
+ delete(store.dbs, bucket)
+ store.dbsLock.Unlock()
+ return nil
+ } else {
+ return err
+ }
+ }
+
+ res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), fullpath)
if err != nil {
return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
}
@@ -172,15 +266,21 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
return nil
}
-func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- sqlText := store.SqlListExclusive
- if inclusive {
- sqlText = store.SqlListInclusive
+func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
+ if err != nil {
+ return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
+ }
+
+ sqlText := store.GetSqlListExclusive(bucket)
+ if includeStartFile {
+ sqlText = store.GetSqlListInclusive(bucket)
}
- rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix+"%", limit)
+ rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1)
if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
defer rows.Close()
@@ -188,28 +288,52 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
var name string
var data []byte
if err = rows.Scan(&name, &data); err != nil {
- glog.V(0).Infof("scan %s : %v", fullpath, err)
- return nil, fmt.Errorf("scan %s: %v", fullpath, err)
+ glog.V(0).Infof("scan %s : %v", dirPath, err)
+ return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
}
+ lastFileName = name
entry := &filer.Entry{
- FullPath: util.NewFullPath(string(fullpath), name),
+ FullPath: util.NewFullPath(string(dirPath), name),
}
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
- return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
+ return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
+ }
+
+ if !eachEntryFunc(entry) {
+ break
}
- entries = append(entries, entry)
}
- return entries, nil
+ return lastFileName, nil
}
-func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
}
func (store *AbstractSqlStore) Shutdown() {
store.DB.Close()
}
+
+func isValidBucket(bucket string) bool {
+ return bucket != DEFAULT_TABLE && bucket != ""
+}
+
+func (store *AbstractSqlStore) CreateTable(ctx context.Context, bucket string) error {
+ if !store.SupportBucketTable {
+ return nil
+ }
+ _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlCreateTable(bucket))
+ return err
+}
+
+func (store *AbstractSqlStore) deleteTable(ctx context.Context, bucket string) error {
+ if !store.SupportBucketTable {
+ return nil
+ }
+ _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlDropTable(bucket))
+ return err
+}
diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go
index 792a45ff4..03b016c76 100644
--- a/weed/filer/abstract_sql/abstract_sql_store_kv.go
+++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go
@@ -13,9 +13,14 @@ import (
func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ db, _, _, err := store.getTxOrDB(ctx, "", false)
+ if err != nil {
+ return fmt.Errorf("findDB: %v", err)
+ }
+
dirStr, dirHash, name := genDirAndName(key)
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, dirHash, name, dirStr, value)
+ res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value)
if err == nil {
return
}
@@ -28,7 +33,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
// now the insert failed possibly due to duplication constraints
glog.V(1).Infof("kv insert falls back to update: %s", err)
- res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, value, dirHash, name, dirStr)
+ res, err = db.ExecContext(ctx, store.GetSqlUpdate(DEFAULT_TABLE), value, dirHash, name, dirStr)
if err != nil {
return fmt.Errorf("kv upsert: %s", err)
}
@@ -43,8 +48,13 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ db, _, _, err := store.getTxOrDB(ctx, "", false)
+ if err != nil {
+ return nil, fmt.Errorf("findDB: %v", err)
+ }
+
dirStr, dirHash, name := genDirAndName(key)
- row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, dirHash, name, dirStr)
+ row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr)
err = row.Scan(&value)
@@ -61,9 +71,14 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b
func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ db, _, _, err := store.getTxOrDB(ctx, "", false)
+ if err != nil {
+ return fmt.Errorf("findDB: %v", err)
+ }
+
dirStr, dirHash, name := genDirAndName(key)
- res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, dirHash, name, dirStr)
+ res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr)
if err != nil {
return fmt.Errorf("kv delete: %s", err)
}
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index 49f5625d9..fd2ce91a6 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -168,41 +168,43 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath
return nil
}
-func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
+func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
+ if _, ok := store.isSuperLargeDirectory(string(dirPath)); ok {
return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
}
cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
- if inclusive {
+ if includeStartFile {
cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
}
var data []byte
var name string
- iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
+ iter := store.session.Query(cqlStr, string(dirPath), startFileName, limit+1).Iter()
for iter.Scan(&name, &data) {
entry := &filer.Entry{
- FullPath: util.NewFullPath(string(fullpath), name),
+ FullPath: util.NewFullPath(string(dirPath), name),
}
+ lastFileName = name
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
if err := iter.Close(); err != nil {
glog.V(0).Infof("list iterator close: %v", err)
}
- return entries, err
+ return lastFileName, err
}
func (store *CassandraStore) Shutdown() {
diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go
index a6f18709e..9ef2f3e0f 100644
--- a/weed/filer/configuration.go
+++ b/weed/filer/configuration.go
@@ -2,7 +2,7 @@ package filer
import (
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/spf13/viper"
+ "github.com/chrislusf/seaweedfs/weed/util"
"os"
"reflect"
"strings"
@@ -12,7 +12,7 @@ var (
Stores []FilerStore
)
-func (f *Filer) LoadConfiguration(config *viper.Viper) {
+func (f *Filer) LoadConfiguration(config *util.ViperProxy) {
validateOneEnabledStore(config)
@@ -79,7 +79,7 @@ func (f *Filer) LoadConfiguration(config *viper.Viper) {
}
-func validateOneEnabledStore(config *viper.Viper) {
+func validateOneEnabledStore(config *util.ViperProxy) {
enabledStore := ""
for _, store := range Stores {
if config.GetBool(store.GetName() + ".enabled") {
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
index ec88e10a5..1e7f55599 100644
--- a/weed/filer/elastic/v7/elastic_store.go
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -96,8 +96,8 @@ func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil
}
-func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
@@ -187,28 +187,28 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
}
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
- for _, entry := range entries {
- store.DeleteEntry(ctx, entry.FullPath)
+ _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
+ if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
+ glog.Errorf("elastic delete %s: %v.", entry.FullPath, err)
+ return false
}
- }
- return nil
+ return true
+ })
+ return
}
-func (store *ElasticStore) ListDirectoryEntries(
- ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
-) (entries []*filer.Entry, err error) {
- if string(fullpath) == "/" {
- return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
+func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ if string(dirPath) == "/" {
+ return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit, eachEntryFunc)
}
- return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
+ return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
}
-func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
+func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
indexResult, err := store.client.CatIndices().Do(ctx)
if err != nil {
glog.Errorf("list indices %v.", err)
- return entries, err
+ return
}
for _, index := range indexResult {
if index.Index == indexKV {
@@ -218,6 +218,7 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi
if entry, err := store.FindEntry(ctx,
weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
fileName := getFileName(entry.FullPath)
+ lastFileName = fileName
if fileName == startFileName && !inclusive {
continue
}
@@ -225,24 +226,25 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi
if limit < 0 {
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
}
- return entries, nil
+ return
}
func (store *ElasticStore) listDirectoryEntries(
- ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
-) (entries []*filer.Entry, err error) {
+ ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
first := true
index := getIndex(fullpath)
nextStart := ""
parentId := weed_util.Md5String([]byte(fullpath))
- if _, err := store.client.Refresh(index).Do(ctx); err != nil {
+ if _, err = store.client.Refresh(index).Do(ctx); err != nil {
if elastic.IsNotFound(err) {
store.client.CreateIndex(index).Do(ctx)
- return entries, nil
+ return
}
}
for {
@@ -250,7 +252,7 @@ func (store *ElasticStore) listDirectoryEntries(
if (startFileName == "" && first) || inclusive {
if result, err = store.search(ctx, index, parentId); err != nil {
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
- return entries, err
+ return
}
} else {
fullPath := string(fullpath) + "/" + startFileName
@@ -260,7 +262,7 @@ func (store *ElasticStore) listDirectoryEntries(
after := weed_util.Md5String([]byte(fullPath))
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
- return entries, err
+ return
}
}
first = false
@@ -272,21 +274,21 @@ func (store *ElasticStore) listDirectoryEntries(
if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
limit--
if limit < 0 {
- return entries, nil
+ return lastFileName, nil
}
nextStart = string(esEntry.Entry.FullPath)
fileName := getFileName(esEntry.Entry.FullPath)
+ lastFileName = fileName
if fileName == startFileName && !inclusive {
continue
}
- entries = append(entries, esEntry.Entry)
+ if !eachEntryFunc(esEntry.Entry) {
+ break
+ }
}
}
- if len(result.Hits.Hits) < store.maxPageSize {
- break
- }
}
- return entries, nil
+ return
}
func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
index 634fba1eb..8159c634d 100644
--- a/weed/filer/etcd/etcd_store.go
+++ b/weed/filer/etcd/etcd_store.go
@@ -101,7 +101,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPa
resp, err := store.client.Get(ctx, string(key))
if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
if len(resp.Kvs) == 0 {
@@ -139,17 +139,17 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_
return nil
}
-func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, "")
resp, err := store.client.Get(ctx, string(directoryPrefix),
clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
for _, kv := range resp.Kvs {
@@ -157,25 +157,28 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_
if fileName == "" {
continue
}
- if fileName == startFileName && !inclusive {
+ if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
break
}
entry := &filer.Entry{
- FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
- return entries, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string) (key []byte) {
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index f5ab36d37..845bfaec1 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -3,6 +3,7 @@ package filer
import (
"bytes"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"math"
"time"
@@ -38,7 +39,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return
}
-func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
@@ -63,7 +64,7 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
return
}
-func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
if !chunk.IsChunkManifest {
return
}
@@ -84,7 +85,7 @@ func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *fil
}
// TODO fetch from cache for weed mount?
-func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index c75a35f79..68f308a51 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"sort"
"sync"
@@ -52,7 +53,7 @@ func ETagChunks(chunks []*filer_pb.FileChunk) (etag string) {
return fmt.Sprintf("%x-%d", util.Md5(bytes.Join(md5_digests, nil)), len(chunks))
}
-func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
+func CompactFileChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (compacted, garbage []*filer_pb.FileChunk) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
@@ -71,7 +72,7 @@ func CompactFileChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_
return
}
-func MinusChunks(lookupFileIdFn LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
+func MinusChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk, err error) {
aData, aMeta, aErr := ResolveChunkManifest(lookupFileIdFn, as)
if aErr != nil {
@@ -116,7 +117,7 @@ func (cv *ChunkView) IsFullChunk() bool {
return cv.Size == cv.ChunkSize
}
-func ViewFromChunks(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
+func ViewFromChunks(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, offset int64, size int64) (views []*ChunkView) {
visibles, _ := NonOverlappingVisibleIntervals(lookupFileIdFn, chunks)
@@ -222,7 +223,7 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
// NonOverlappingVisibleIntervals translates the file chunk into VisibleInterval in memory
// If the file chunk content is a chunk manifest
-func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
+func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (visibles []VisibleInterval, err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks)
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 13dedea1e..e59887763 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -134,69 +134,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return nil
}
- dirParts := strings.Split(string(entry.FullPath), "/")
-
- // fmt.Printf("directory parts: %+v\n", dirParts)
-
- var lastDirectoryEntry *Entry
-
- for i := 1; i < len(dirParts); i++ {
- dirPath := "/" + util.Join(dirParts[:i]...)
- // fmt.Printf("%d directory: %+v\n", i, dirPath)
-
- // check the store directly
- glog.V(4).Infof("find uncached directory: %s", dirPath)
- dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
-
- // no such existing directory
- if dirEntry == nil {
-
- // create the directory
- now := time.Now()
-
- dirEntry = &Entry{
- FullPath: util.FullPath(dirPath),
- Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | entry.Mode | 0110,
- Uid: entry.Uid,
- Gid: entry.Gid,
- Collection: entry.Collection,
- Replication: entry.Replication,
- UserName: entry.UserName,
- GroupNames: entry.GroupNames,
- },
- }
-
- glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
- mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
- if mkdirErr != nil {
- if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
- glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
- return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
- }
- } else {
- f.maybeAddBucket(dirEntry)
- f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
- }
-
- } else if !dirEntry.IsDirectory() {
- glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
- return fmt.Errorf("%s is a file", dirPath)
- }
-
- // remember the direct parent directory entry
- if i == len(dirParts)-1 {
- lastDirectoryEntry = dirEntry
- }
-
- }
-
- if lastDirectoryEntry == nil {
- glog.Errorf("CreateEntry %s: lastDirectoryEntry is nil", entry.FullPath)
- return fmt.Errorf("parent folder not found: %v", entry.FullPath)
- }
+ oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
/*
if !hasWritePermission(lastDirectoryEntry, entry) {
@@ -206,9 +144,13 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
}
*/
- oldEntry, _ := f.FindEntry(ctx, entry.FullPath)
-
if oldEntry == nil {
+
+ dirParts := strings.Split(string(entry.FullPath), "/")
+ if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
+ return err
+ }
+
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
if err := f.Store.InsertEntry(ctx, entry); err != nil {
glog.Errorf("insert entry %s: %v", entry.FullPath, err)
@@ -236,6 +178,65 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
return nil
}
+func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, dirParts []string, level int, isFromOtherCluster bool) (err error) {
+
+ if level == 0 {
+ return nil
+ }
+
+ dirPath := "/" + util.Join(dirParts[:level]...)
+ // fmt.Printf("%d directory: %+v\n", i, dirPath)
+
+ // check the store directly
+ glog.V(4).Infof("find uncached directory: %s", dirPath)
+ dirEntry, _ := f.FindEntry(ctx, util.FullPath(dirPath))
+
+ // no such existing directory
+ if dirEntry == nil {
+
+ // ensure parent directory
+ if err = f.ensureParentDirecotryEntry(ctx, entry, dirParts, level-1, isFromOtherCluster); err != nil {
+ return err
+ }
+
+ // create the directory
+ now := time.Now()
+
+ dirEntry = &Entry{
+ FullPath: util.FullPath(dirPath),
+ Attr: Attr{
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | entry.Mode | 0110,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ Collection: entry.Collection,
+ Replication: entry.Replication,
+ UserName: entry.UserName,
+ GroupNames: entry.GroupNames,
+ },
+ }
+
+ glog.V(2).Infof("create directory: %s %v", dirPath, dirEntry.Mode)
+ mkdirErr := f.Store.InsertEntry(ctx, dirEntry)
+ if mkdirErr != nil {
+ if _, err := f.FindEntry(ctx, util.FullPath(dirPath)); err == filer_pb.ErrNotFound {
+ glog.V(3).Infof("mkdir %s: %v", dirPath, mkdirErr)
+ return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
+ }
+ } else {
+ f.maybeAddBucket(dirEntry)
+ f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
+ }
+
+ } else if !dirEntry.IsDirectory() {
+ glog.Errorf("CreateEntry %s: %s should be a directory", entry.FullPath, dirPath)
+ return fmt.Errorf("%s is a file", dirPath)
+ }
+
+ return nil
+}
+
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil {
entry.Attr.Crtime = oldEntry.Attr.Crtime
@@ -280,21 +281,19 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
}
-func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, expiredCount int, lastFileName string, err error) {
- listedEntries, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix)
- if listErr != nil {
- return listedEntries, expiredCount, "", listErr
- }
- for _, entry := range listedEntries {
- lastFileName = entry.Name()
+func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
+ lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
if entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteOneEntry(ctx, entry)
expiredCount++
- continue
+ return true
}
}
- entries = append(entries, entry)
+ return eachEntryFunc(entry)
+ })
+ if err != nil {
+ return expiredCount, lastFileName, err
}
return
}
diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go
index 4d4f4abc3..ba170f02e 100644
--- a/weed/filer/filer_buckets.go
+++ b/weed/filer/filer_buckets.go
@@ -27,9 +27,9 @@ func (f *Filer) LoadBuckets() {
buckets: make(map[BucketName]*BucketOption),
}
- limit := math.MaxInt32
+ limit := int64(math.MaxInt32)
- entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "")
+ entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "")
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index da92c4f4b..50a669f40 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -30,7 +30,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
// delete the folder children, not including the folder itself
var dirChunks []*filer_pb.FileChunk
var dirHardLinkIds []HardLinkId
- dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isFromOtherCluster, signatures)
+ dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures)
if err != nil {
glog.V(0).Infof("delete directory %s: %v", p, err)
return fmt.Errorf("delete directory %s: %v", p, err)
@@ -63,46 +63,49 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
return nil
}
-func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) {
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) {
lastFileName := ""
includeLastFile := false
- for {
- entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "")
- if err != nil {
- glog.Errorf("list folder %s: %v", entry.FullPath, err)
- return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
- }
- if lastFileName == "" && !isRecursive && len(entries) > 0 {
- // only for first iteration in the loop
- glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
- return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
- }
+ if !isDeletingBucket {
+ for {
+ entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "")
+ if err != nil {
+ glog.Errorf("list folder %s: %v", entry.FullPath, err)
+ return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
+ }
+ if lastFileName == "" && !isRecursive && len(entries) > 0 {
+ // only for first iteration in the loop
+ glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
+ return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ }
- for _, sub := range entries {
- lastFileName = sub.Name()
- var dirChunks []*filer_pb.FileChunk
- var dirHardLinkIds []HardLinkId
- if sub.IsDirectory() {
- dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil)
- chunks = append(chunks, dirChunks...)
- hardlinkIds = append(hardlinkIds, dirHardLinkIds...)
- } else {
- f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
- if len(sub.HardLinkId) != 0 {
- // hard link chunk data are deleted separately
- hardlinkIds = append(hardlinkIds, sub.HardLinkId)
+ for _, sub := range entries {
+ lastFileName = sub.Name()
+ var dirChunks []*filer_pb.FileChunk
+ var dirHardLinkIds []HardLinkId
+ if sub.IsDirectory() {
+ subIsDeletingBucket := f.isBucket(sub)
+ dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil)
+ chunks = append(chunks, dirChunks...)
+ hardlinkIds = append(hardlinkIds, dirHardLinkIds...)
} else {
- chunks = append(chunks, sub.Chunks...)
+ f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
+ if len(sub.HardLinkId) != 0 {
+ // hard link chunk data are deleted separately
+ hardlinkIds = append(hardlinkIds, sub.HardLinkId)
+ } else {
+ chunks = append(chunks, sub.Chunks...)
+ }
+ }
+ if err != nil && !ignoreRecursiveError {
+ return nil, nil, err
}
}
- if err != nil && !ignoreRecursiveError {
- return nil, nil, err
- }
- }
- if len(entries) < PaginationSize {
- break
+ if len(entries) < PaginationSize {
+ break
+ }
}
}
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 40755e6a7..f3a795ad0 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -113,13 +113,13 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
- dayEntries, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "")
+ dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "")
if listDayErr != nil {
return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr)
}
for _, dayEntry := range dayEntries {
// println("checking day", dayEntry.FullPath)
- hourMinuteEntries, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "")
+ hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "")
if listHourMinuteErr != nil {
return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}
@@ -170,7 +170,7 @@ func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func
return lastTsNs, err
}
if logEntry.TsNs <= ns {
- return lastTsNs, nil
+ continue
}
// println("each log: ", logEntry.TsNs)
if err := eachLogEntryFn(logEntry); err != nil {
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index b26959cb0..0a14d3756 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -19,58 +19,72 @@ func splitPattern(pattern string) (prefix string, restPattern string) {
return "", restPattern
}
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, namePattern string) (entries []*Entry, err error) {
+// For now, prefix and namePattern are mutually exclusive
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string) (entries []*Entry, hasMore bool, err error) {
+
+ _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, func(entry *Entry) bool {
+ entries = append(entries, entry)
+ return true
+ })
+
+ hasMore = int64(len(entries)) >= limit+1
+ if hasMore {
+ entries = entries[:limit]
+ }
+
+ return entries, hasMore, err
+}
+
+// For now, prefix and namePattern are mutually exclusive
+func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
- prefix, restNamePattern := splitPattern(namePattern)
- var missedCount int
- var lastFileName string
+ prefixInNamePattern, restNamePattern := splitPattern(namePattern)
+ if prefixInNamePattern != "" {
+ prefix = prefixInNamePattern
+ }
+ var missedCount int64
- entries, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern)
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern, eachEntryFunc)
for missedCount > 0 && err == nil {
- var makeupEntries []*Entry
- makeupEntries, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern)
- for _, entry := range makeupEntries {
- entries = append(entries, entry)
- }
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern, eachEntryFunc)
}
- return entries, err
+ return
}
-func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix, restNamePattern string) (matchedEntries []*Entry, missedCount int, lastFileName string, err error) {
- var foundEntries []*Entry
+func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) {
- foundEntries, lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix)
- if err != nil {
- return
- }
if len(restNamePattern) == 0 {
- return foundEntries, 0, lastFileName, nil
+ lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc)
+ return 0, lastFileName, err
}
- for _, entry := range foundEntries {
+
+ lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
nameToTest := strings.ToLower(entry.Name())
if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && matched {
- matchedEntries = append(matchedEntries, entry)
+ if !eachEntryFunc(entry) {
+ return false
+ }
} else {
missedCount++
}
+ return true
+ })
+ if err != nil {
+ return
}
return
}
-func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*Entry, lastFileName string, err error) {
- var makeupEntries []*Entry
- var expiredCount int
- entries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
+func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
+ var expiredCount int64
+ expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc)
for expiredCount > 0 && err == nil {
- makeupEntries, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
- if err == nil {
- entries = append(entries, makeupEntries...)
- }
+ expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix, eachEntryFunc)
}
return
}
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index f1e6c6c35..8955a25c7 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -13,6 +13,8 @@ var (
ErrKvNotFound = errors.New("kv: not found")
)
+type ListEachEntryFunc func(entry *Entry) bool
+
type FilerStore interface {
// GetName gets the name to locate the configuration in filer.toml file
GetName() string
@@ -24,8 +26,8 @@ type FilerStore interface {
FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, util.FullPath) (err error)
DeleteFolderChildren(context.Context, util.FullPath) (err error)
- ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
- ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
+ ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
+ ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error
diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go
index ea0f9db77..00bf82ed4 100644
--- a/weed/filer/filerstore_translate_path.go
+++ b/weed/filer/filerstore_translate_path.go
@@ -106,32 +106,24 @@ func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp u
return t.actualStore.DeleteFolderChildren(ctx, newFullPath)
}
-func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
- entries, err := t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
+ return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
- }
- return entries, err
+ return eachEntryFunc(entry)
+ })
}
-func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
+func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
- entries, err := t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
+ return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
- }
- return entries, nil
+ return eachEntryFunc(entry)
+ })
}
func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) {
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index 3206d5ba4..64baac371 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -194,7 +194,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.
return actualStore.DeleteFolderChildren(ctx, fp)
}
-func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
+func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
start := time.Now()
@@ -203,18 +203,14 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
}()
glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
- entries, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
+ return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, err
+ return eachEntryFunc(entry)
+ })
}
-func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
+func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
start := time.Now()
@@ -222,48 +218,52 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
- entries, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc)
if err == ErrUnsupportedListDirectoryPrefixed {
- entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
+ fsw.maybeReadHardLink(ctx, entry)
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return eachEntryFunc(entry)
+ })
}
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
- fsw.maybeReadHardLink(ctx, entry)
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, nil
+ return lastFileName, err
}
-func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
+func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")
- entries, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
if prefix == "" {
+ return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
+ }
+
+ var notPrefixed []*Entry
+ lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
+ notPrefixed = append(notPrefixed, entry)
+ return true
+ })
+ if err != nil {
return
}
- count := 0
- var lastFileName string
- notPrefixed := entries
- entries = nil
+ count := int64(0)
for count < limit && len(notPrefixed) > 0 {
for _, entry := range notPrefixed {
- lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ return
+ }
if count >= limit {
break
}
}
}
if count < limit {
- notPrefixed, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
+ notPrefixed = notPrefixed[:0]
+ _, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
+ notPrefixed = append(notPrefixed, entry)
+ return true
+ })
if err != nil {
return
}
diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go
index 6b0ad58b9..2e4491515 100644
--- a/weed/filer/hbase/hbase_store.go
+++ b/weed/filer/hbase/hbase_store.go
@@ -148,19 +148,18 @@ func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.Ful
return
}
-func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*filer.Entry, error) {
+func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
expectedPrefix := []byte(dirPath.Child(prefix))
scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
if err != nil {
- return nil, err
+ return lastFileName, err
}
- var entries []*filer.Entry
scanner := store.Client.Scan(scan)
defer scanner.Close()
for {
@@ -169,7 +168,7 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
break
}
if err != nil {
- return entries, err
+ return lastFileName, err
}
if len(res.Cells) == 0 {
continue
@@ -186,6 +185,8 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
continue
}
+ lastFileName = fileName
+
value := cell.Value
if fileName == startFileName && !includeStartFile {
@@ -204,10 +205,12 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
- return entries, nil
+ return lastFileName, nil
}
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
diff --git a/weed/filer/hbase/hbase_store_kv.go b/weed/filer/hbase/hbase_store_kv.go
index 26bf763e2..990e55a24 100644
--- a/weed/filer/hbase/hbase_store_kv.go
+++ b/weed/filer/hbase/hbase_store_kv.go
@@ -7,9 +7,10 @@ import (
"time"
)
-const(
+const (
COLUMN_NAME = "a"
)
+
func (store *HbaseStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
return store.doPut(ctx, store.cfKv, key, value, 0)
}
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index b879f3a6e..f0ae64769 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -107,7 +107,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
return nil, filer_pb.ErrNotFound
}
if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{
@@ -162,16 +162,19 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil
}
-func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix)
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
+ }
- iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil)
+ iter := store.db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
for iter.Next() {
key := iter.Key()
if !bytes.HasPrefix(key, directoryPrefix) {
@@ -181,26 +184,29 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, ful
if fileName == "" {
continue
}
- if fileName == startFileName && !inclusive {
+ if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
break
}
entry := &filer.Entry{
- FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
}
if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
iter.Release()
- return entries, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string) (key []byte) {
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
index c5bfb8474..9c342605e 100644
--- a/weed/filer/leveldb/leveldb_store_test.go
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -2,9 +2,11 @@ package leveldb
import (
"context"
+ "fmt"
"io/ioutil"
"os"
"testing"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -49,14 +51,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +77,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
@@ -86,3 +88,28 @@ func TestEmptyRoot(t *testing.T) {
}
}
+
+func BenchmarkInsertEntry(b *testing.B) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
+ defer os.RemoveAll(dir)
+ store := &LevelDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ entry := &filer.Entry{
+ FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)),
+ Attr: filer.Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ },
+ }
+ store.InsertEntry(ctx, entry)
+ }
+}
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 4b41554b9..965721460 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -115,7 +115,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.Fu
return nil, filer_pb.ErrNotFound
}
if err != nil {
- return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
}
entry = &filer.Entry{
@@ -171,15 +171,17 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath w
return nil
}
-func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, prefix, store.dbCount)
- lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
+ directoryPrefix, partitionId := genDirectoryKeyPrefix(dirPath, prefix, store.dbCount)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart, _ = genDirectoryKeyPrefix(dirPath, startFileName, store.dbCount)
+ }
iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
for iter.Next() {
@@ -191,15 +193,16 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fu
if fileName == "" {
continue
}
- if fileName == startFileName && !inclusive {
+ if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
break
}
entry := &filer.Entry{
- FullPath: weed_util.NewFullPath(string(fullpath), fileName),
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
@@ -208,11 +211,13 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fu
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
iter.Release()
- return entries, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) {
diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go
index 22c0d6052..495c73fdd 100644
--- a/weed/filer/leveldb2/leveldb2_store_test.go
+++ b/weed/filer/leveldb2/leveldb2_store_test.go
@@ -49,14 +49,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "")
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "")
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
new file mode 100644
index 000000000..24e00edc7
--- /dev/null
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -0,0 +1,375 @@
+package leveldb
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "github.com/syndtr/goleveldb/leveldb"
+ leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+ "io"
+ "os"
+ "strings"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DEFAULT = "_main"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &LevelDB3Store{})
+}
+
+type LevelDB3Store struct {
+ dir string
+ dbs map[string]*leveldb.DB
+ dbsLock sync.RWMutex
+}
+
+func (store *LevelDB3Store) GetName() string {
+ return "leveldb3"
+}
+
+func (store *LevelDB3Store) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ dir := configuration.GetString(prefix + "dir")
+ return store.initialize(dir)
+}
+
+func (store *LevelDB3Store) initialize(dir string) (err error) {
+ glog.Infof("filer store leveldb3 dir: %s", dir)
+ if err := weed_util.TestFolderWritable(dir); err != nil {
+ return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+ }
+ store.dir = dir
+
+ db, loadDbErr := store.loadDB(DEFAULT)
+ if loadDbErr != nil {
+ return loadDbErr
+ }
+ store.dbs = make(map[string]*leveldb.DB)
+ store.dbs[DEFAULT] = db
+
+ return
+}
+
+func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
+
+ opts := &opt.Options{
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 4,
+ }
+ if name != DEFAULT {
+ opts = &opt.Options{
+ BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 4,
+ }
+ }
+
+ dbFolder := fmt.Sprintf("%s/%s", store.dir, name)
+ os.MkdirAll(dbFolder, 0755)
+ db, dbErr := leveldb.OpenFile(dbFolder, opts)
+ if leveldb_errors.IsCorrupted(dbErr) {
+ db, dbErr = leveldb.RecoverFile(dbFolder, opts)
+ }
+ if dbErr != nil {
+ glog.Errorf("filer store open dir %s: %v", dbFolder, dbErr)
+ return nil, dbErr
+ }
+ return db, nil
+}
+
+func (store *LevelDB3Store) findDB(fullpath weed_util.FullPath, isForChildren bool) (*leveldb.DB, string, weed_util.FullPath, error) {
+
+ store.dbsLock.RLock()
+
+ defaultDB := store.dbs[DEFAULT]
+ if !strings.HasPrefix(string(fullpath), "/buckets/") {
+ store.dbsLock.RUnlock()
+ return defaultDB, DEFAULT, fullpath, nil
+ }
+
+ // detect bucket
+ bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
+ t := strings.Index(bucketAndObjectKey, "/")
+ if t < 0 && !isForChildren {
+ store.dbsLock.RUnlock()
+ return defaultDB, DEFAULT, fullpath, nil
+ }
+ bucket := bucketAndObjectKey
+ shortPath := weed_util.FullPath("/")
+ if t > 0 {
+ bucket = bucketAndObjectKey[:t]
+ shortPath = weed_util.FullPath(bucketAndObjectKey[t:])
+ }
+
+ if db, found := store.dbs[bucket]; found {
+ store.dbsLock.RUnlock()
+ return db, bucket, shortPath, nil
+ }
+
+ store.dbsLock.RUnlock()
+ // upgrade to write lock
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ // double check after getting the write lock
+ if db, found := store.dbs[bucket]; found {
+ return db, bucket, shortPath, nil
+ }
+
+ // create db
+ db, err := store.loadDB(bucket)
+ if err != nil {
+ return nil, bucket, shortPath, err
+ }
+ store.dbs[bucket] = db
+
+ return db, bucket, shortPath, nil
+}
+
+func (store *LevelDB3Store) closeDB(bucket string) {
+
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ if db, found := store.dbs[bucket]; found {
+ db.Close()
+ delete(store.dbs, bucket)
+ }
+
+}
+
+func (store *LevelDB3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *LevelDB3Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *LevelDB3Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ db, _, shortPath, err := store.findDB(entry.FullPath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
+ key := genKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if len(entry.Chunks) > 50 {
+ value = weed_util.MaybeGzipData(value)
+ }
+
+ err = db.Put(key, value, nil)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *LevelDB3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *LevelDB3Store) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+
+ db, _, shortPath, err := store.findDB(fullpath, false)
+ if err != nil {
+ return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
+ key := genKey(dir, name)
+
+ data, err := db.Get(key, nil)
+
+ if err == leveldb.ErrNotFound {
+ return nil, filer_pb.ErrNotFound
+ }
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *LevelDB3Store) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+
+ db, _, shortPath, err := store.findDB(fullpath, false)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ dir, name := shortPath.DirAndName()
+ key := genKey(dir, name)
+
+ err = db.Delete(key, nil)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+
+ db, bucket, shortPath, err := store.findDB(fullpath, true)
+ if err != nil {
+ return fmt.Errorf("findDB %s : %v", fullpath, err)
+ }
+
+ if bucket != DEFAULT && shortPath == "/" {
+ store.closeDB(bucket)
+ if bucket != "" { // just to make sure
+ os.RemoveAll(store.dir + "/" + bucket)
+ }
+ return nil
+ }
+
+ directoryPrefix := genDirectoryKeyPrefix(shortPath, "")
+
+ batch := new(leveldb.Batch)
+
+ iter := db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete(append(directoryPrefix, []byte(fileName)...))
+ }
+ iter.Release()
+
+ err = db.Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ db, _, shortPath, err := store.findDB(dirPath, true)
+ if err != nil {
+ return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
+ }
+
+ directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(shortPath, startFileName)
+ }
+
+ iter := db.NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ if fileName == startFileName && !includeStartFile {
+ continue
+ }
+ lastFileName = fileName
+ limit--
+ if limit < 0 {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
+ }
+
+ // println("list", entry.FullPath, "chunks", len(entry.Chunks))
+ if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+ iter.Release()
+
+ return lastFileName, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+
+ return string(key[md5.Size:])
+
+}
+
+// hash directory
+func hashToBytes(dir string) []byte {
+ h := md5.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return b
+}
+
+func (store *LevelDB3Store) Shutdown() {
+ for _, db := range store.dbs {
+ db.Close()
+ }
+}
diff --git a/weed/filer/leveldb3/leveldb3_store_kv.go b/weed/filer/leveldb3/leveldb3_store_kv.go
new file mode 100644
index 000000000..18d782b80
--- /dev/null
+++ b/weed/filer/leveldb3/leveldb3_store_kv.go
@@ -0,0 +1,46 @@
+package leveldb
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/syndtr/goleveldb/leveldb"
+)
+
+func (store *LevelDB3Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ err = store.dbs[DEFAULT].Put(key, value, nil)
+
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *LevelDB3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ value, err = store.dbs[DEFAULT].Get(key, nil)
+
+ if err == leveldb.ErrNotFound {
+ return nil, filer.ErrKvNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("kv get: %v", err)
+ }
+
+ return
+}
+
+func (store *LevelDB3Store) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ err = store.dbs[DEFAULT].Delete(key, nil)
+
+ if err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go
new file mode 100644
index 000000000..53b0e927f
--- /dev/null
+++ b/weed/filer/leveldb3/leveldb3_store_test.go
@@ -0,0 +1,88 @@
+package leveldb
+
+import (
+ "context"
+ "io/ioutil"
+ "os"
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestCreateAndFind(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+ defer os.RemoveAll(dir)
+ store := &LevelDB3Store{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+ ctx := context.Background()
+
+ entry1 := &filer.Entry{
+ FullPath: fullpath,
+ Attr: filer.Attr{
+ Mode: 0440,
+ Uid: 1234,
+ Gid: 5678,
+ },
+ }
+
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ t.Errorf("create entry %v: %v", entry1.FullPath, err)
+ return
+ }
+
+ entry, err := testFiler.FindEntry(ctx, fullpath)
+
+ if err != nil {
+ t.Errorf("find entry: %v", err)
+ return
+ }
+
+ if entry.FullPath != entry1.FullPath {
+ t.Errorf("find wrong entry: %v", entry.FullPath)
+ return
+ }
+
+ // checking one upper directory
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+ // checking one upper directory
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+ defer os.RemoveAll(dir)
+ store := &LevelDB3Store{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ // checking one upper directory
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ if err != nil {
+ t.Errorf("list entries: %v", err)
+ return
+ }
+ if len(entries) != 0 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index d20c6477a..1ef5056f4 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -178,14 +178,14 @@ func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath ut
return nil
}
-func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
+func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- var where = bson.M{"directory": string(fullpath), "name": bson.M{"$gt": startFileName}}
- if inclusive {
+ var where = bson.M{"directory": string(dirPath), "name": bson.M{"$gt": startFileName}}
+ if includeStartFile {
where["name"] = bson.M{
"$gte": startFileName,
}
@@ -197,26 +197,30 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath ut
var data Model
err := cur.Decode(&data)
if err != nil && err != mongo.ErrNoDocuments {
- return nil, err
+ return lastFileName, err
}
entry := &filer.Entry{
- FullPath: util.NewFullPath(string(fullpath), data.Name),
+ FullPath: util.NewFullPath(string(dirPath), data.Name),
}
+ lastFileName = data.Name
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
+
}
if err := cur.Close(ctx); err != nil {
glog.V(0).Infof("list iterator close: %v", err)
}
- return entries, err
+ return lastFileName, err
}
func (store *MongodbStore) Shutdown() {
diff --git a/weed/filer/mysql/mysql_sql_gen.go b/weed/filer/mysql/mysql_sql_gen.go
new file mode 100644
index 000000000..057484c37
--- /dev/null
+++ b/weed/filer/mysql/mysql_sql_gen.go
@@ -0,0 +1,52 @@
+package mysql
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ _ "github.com/go-sql-driver/mysql"
+)
+
+type SqlGenMysql struct {
+ CreateTableSqlTemplate string
+ DropTableSqlTemplate string
+}
+
+var (
+ _ = abstract_sql.SqlGenerator(&SqlGenMysql{})
+)
+
+func (gen *SqlGenMysql) GetSqlInsert(bucket string) string {
+ return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES(?,?,?,?)", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlUpdate(bucket string) string {
+ return fmt.Sprintf("UPDATE %s SET meta=? WHERE dirhash=? AND name=? AND directory=?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlFind(bucket string) string {
+ return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlDelete(bucket string) string {
+ return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND name=? AND directory=?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlDeleteFolderChildren(bucket string) string {
+ return fmt.Sprintf("DELETE FROM %s WHERE dirhash=? AND directory=?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlListExclusive(bucket string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlListInclusive(bucket string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?", bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlCreateTable(bucket string) string {
+ return fmt.Sprintf(gen.CreateTableSqlTemplate, bucket)
+}
+
+func (gen *SqlGenMysql) GetSqlDropTable(bucket string) string {
+ return fmt.Sprintf(gen.DropTableSqlTemplate, bucket)
+}
diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go
index 5bc132980..686628740 100644
--- a/weed/filer/mysql/mysql_store.go
+++ b/weed/filer/mysql/mysql_store.go
@@ -3,8 +3,9 @@ package mysql
import (
"database/sql"
"fmt"
-
"github.com/chrislusf/seaweedfs/weed/filer"
+ "time"
+
"github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
"github.com/chrislusf/seaweedfs/weed/util"
_ "github.com/go-sql-driver/mysql"
@@ -35,20 +36,19 @@ func (store *MysqlStore) Initialize(configuration util.Configuration, prefix str
configuration.GetString(prefix+"database"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
+ configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
configuration.GetBool(prefix+"interpolateParams"),
)
}
-func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen int,
- interpolateParams bool) (err error) {
- //
- store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES(?,?,?,?)"
- store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
- store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
- store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
- store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
+func (store *MysqlStore) initialize(user, password, hostname string, port int, database string, maxIdle, maxOpen,
+ maxLifetimeSeconds int, interpolateParams bool) (err error) {
+
+ store.SupportBucketTable = false
+ store.SqlGenerator = &SqlGenMysql{
+ CreateTableSqlTemplate: "",
+ DropTableSqlTemplate: "drop table %s",
+ }
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
if interpolateParams {
@@ -65,6 +65,7 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
store.DB.SetMaxIdleConns(maxIdle)
store.DB.SetMaxOpenConns(maxOpen)
+ store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go
new file mode 100644
index 000000000..15216b651
--- /dev/null
+++ b/weed/filer/mysql2/mysql2_store.go
@@ -0,0 +1,82 @@
+package mysql2
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ _ "github.com/go-sql-driver/mysql"
+)
+
+const (
+ CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?charset=utf8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &MysqlStore2{})
+}
+
+type MysqlStore2 struct {
+ abstract_sql.AbstractSqlStore
+}
+
+func (store *MysqlStore2) GetName() string {
+ return "mysql2"
+}
+
+func (store *MysqlStore2) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"createTable"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetString(prefix+"hostname"),
+ configuration.GetInt(prefix+"port"),
+ configuration.GetString(prefix+"database"),
+ configuration.GetInt(prefix+"connection_max_idle"),
+ configuration.GetInt(prefix+"connection_max_open"),
+ configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
+ configuration.GetBool(prefix+"interpolateParams"),
+ )
+}
+
+func (store *MysqlStore2) initialize(createTable, user, password, hostname string, port int, database string, maxIdle, maxOpen,
+ maxLifetimeSeconds int, interpolateParams bool) (err error) {
+
+ store.SupportBucketTable = true
+ store.SqlGenerator = &mysql.SqlGenMysql{
+ CreateTableSqlTemplate: createTable,
+ DropTableSqlTemplate: "drop table %s",
+ }
+
+ sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
+ if interpolateParams {
+ sqlUrl += "&interpolateParams=true"
+ }
+
+ var dbErr error
+ store.DB, dbErr = sql.Open("mysql", sqlUrl)
+ if dbErr != nil {
+ store.DB.Close()
+ store.DB = nil
+ return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
+ }
+
+ store.DB.SetMaxIdleConns(maxIdle)
+ store.DB.SetMaxOpenConns(maxOpen)
+ store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
+
+ if err = store.DB.Ping(); err != nil {
+ return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
+ }
+
+ if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
+ return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/postgres/postgres_sql_gen.go b/weed/filer/postgres/postgres_sql_gen.go
new file mode 100644
index 000000000..284cf254b
--- /dev/null
+++ b/weed/filer/postgres/postgres_sql_gen.go
@@ -0,0 +1,53 @@
+package postgres
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ _ "github.com/lib/pq"
+)
+
+type SqlGenPostgres struct {
+ CreateTableSqlTemplate string
+ DropTableSqlTemplate string
+}
+
+var (
+ _ = abstract_sql.SqlGenerator(&SqlGenPostgres{})
+)
+
+func (gen *SqlGenPostgres) GetSqlInsert(bucket string) string {
+ return fmt.Sprintf("INSERT INTO %s (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlUpdate(bucket string) string {
+ return fmt.Sprintf("UPDATE %s SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlFind(bucket string) string {
+ return fmt.Sprintf("SELECT meta FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlDelete(bucket string) string {
+ return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND name=$2 AND directory=$3", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlDeleteFolderChildren(bucket string) string {
+ return fmt.Sprintf("DELETE FROM %s WHERE dirhash=$1 AND directory=$2", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlListExclusive(bucket string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlListInclusive(bucket string) string {
+ return fmt.Sprintf("SELECT NAME, meta FROM %s WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5", bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlCreateTable(bucket string) string {
+ return fmt.Sprintf(gen.CreateTableSqlTemplate, bucket)
+}
+
+func (gen *SqlGenPostgres) GetSqlDropTable(bucket string) string {
+ return fmt.Sprintf(gen.DropTableSqlTemplate, bucket)
+}
diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go
index 2325568fe..27c6278c7 100644
--- a/weed/filer/postgres/postgres_store.go
+++ b/weed/filer/postgres/postgres_store.go
@@ -33,21 +33,20 @@ func (store *PostgresStore) Initialize(configuration util.Configuration, prefix
configuration.GetString(prefix+"hostname"),
configuration.GetInt(prefix+"port"),
configuration.GetString(prefix+"database"),
+ configuration.GetString(prefix+"schema"),
configuration.GetString(prefix+"sslmode"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
)
}
-func (store *PostgresStore) initialize(user, password, hostname string, port int, database, sslmode string, maxIdle, maxOpen int) (err error) {
+func (store *PostgresStore) initialize(user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) {
- store.SqlInsert = "INSERT INTO filemeta (dirhash,name,directory,meta) VALUES($1,$2,$3,$4)"
- store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
- store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
- store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
- store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
+ store.SupportBucketTable = false
+ store.SqlGenerator = &SqlGenPostgres{
+ CreateTableSqlTemplate: "",
+ DropTableSqlTemplate: "drop table %s",
+ }
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
if user != "" {
@@ -59,6 +58,9 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int
if database != "" {
sqlUrl += " dbname=" + database
}
+ if schema != "" {
+ sqlUrl += " search_path=" + schema
+ }
var dbErr error
store.DB, dbErr = sql.Open("postgres", sqlUrl)
if dbErr != nil {
diff --git a/weed/filer/postgres2/postgres2_store.go b/weed/filer/postgres2/postgres2_store.go
new file mode 100644
index 000000000..82552376f
--- /dev/null
+++ b/weed/filer/postgres2/postgres2_store.go
@@ -0,0 +1,87 @@
+package postgres2
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ _ "github.com/lib/pq"
+)
+
+const (
+ CONNECTION_URL_PATTERN = "host=%s port=%d sslmode=%s connect_timeout=30"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &PostgresStore2{})
+}
+
+type PostgresStore2 struct {
+ abstract_sql.AbstractSqlStore
+}
+
+func (store *PostgresStore2) GetName() string {
+ return "postgres2"
+}
+
+func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"createTable"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetString(prefix+"hostname"),
+ configuration.GetInt(prefix+"port"),
+ configuration.GetString(prefix+"database"),
+ configuration.GetString(prefix+"schema"),
+ configuration.GetString(prefix+"sslmode"),
+ configuration.GetInt(prefix+"connection_max_idle"),
+ configuration.GetInt(prefix+"connection_max_open"),
+ )
+}
+
+func (store *PostgresStore2) initialize(createTable, user, password, hostname string, port int, database, schema, sslmode string, maxIdle, maxOpen int) (err error) {
+
+ store.SupportBucketTable = true
+ store.SqlGenerator = &postgres.SqlGenPostgres{
+ CreateTableSqlTemplate: createTable,
+ DropTableSqlTemplate: "drop table %s",
+ }
+
+ sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, sslmode)
+ if user != "" {
+ sqlUrl += " user=" + user
+ }
+ if password != "" {
+ sqlUrl += " password=" + password
+ }
+ if database != "" {
+ sqlUrl += " dbname=" + database
+ }
+ if schema != "" {
+ sqlUrl += " search_path=" + schema
+ }
+ var dbErr error
+ store.DB, dbErr = sql.Open("postgres", sqlUrl)
+ if dbErr != nil {
+ store.DB.Close()
+ store.DB = nil
+ return fmt.Errorf("can not connect to %s error:%v", sqlUrl, err)
+ }
+
+ store.DB.SetMaxIdleConns(maxIdle)
+ store.DB.SetMaxOpenConns(maxOpen)
+
+ if err = store.DB.Ping(); err != nil {
+ return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
+ }
+
+ if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
+ return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 6193dbd45..307224f35 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -18,7 +18,7 @@ import (
type ChunkReadAt struct {
masterClient *wdclient.MasterClient
chunkViews []*ChunkView
- lookupFileId LookupFileIdFunctionType
+ lookupFileId wdclient.LookupFileIdFunctionType
readerLock sync.Mutex
fileSize int64
@@ -31,9 +31,7 @@ type ChunkReadAt struct {
var _ = io.ReaderAt(&ChunkReadAt{})
var _ = io.Closer(&ChunkReadAt{})
-type LookupFileIdFunctionType func(fileId string) (targetUrls []string, err error)
-
-func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
+func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType {
vidCache := make(map[string]*filer_pb.Locations)
var vicCacheLock sync.RWMutex
@@ -109,7 +107,7 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
defer c.readerLock.Unlock()
glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
- return c.doReadAt(p[n:], offset+int64(n))
+ return c.doReadAt(p, offset)
}
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
diff --git a/weed/filer/redis/redis_cluster_store.go b/weed/filer/redis/redis_cluster_store.go
index 8af94ee55..9572058a8 100644
--- a/weed/filer/redis/redis_cluster_store.go
+++ b/weed/filer/redis/redis_cluster_store.go
@@ -3,7 +3,7 @@ package redis
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func init() {
@@ -20,8 +20,8 @@ func (store *RedisClusterStore) GetName() string {
func (store *RedisClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) {
- configuration.SetDefault(prefix+"useReadOnly", true)
- configuration.SetDefault(prefix+"routeByLatency", true)
+ configuration.SetDefault(prefix+"useReadOnly", false)
+ configuration.SetDefault(prefix+"routeByLatency", false)
return store.initialize(
configuration.GetStringSlice(prefix+"addresses"),
diff --git a/weed/filer/redis/redis_store.go b/weed/filer/redis/redis_store.go
index e152457ed..665352a63 100644
--- a/weed/filer/redis/redis_store.go
+++ b/weed/filer/redis/redis_store.go
@@ -3,7 +3,7 @@ package redis
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func init() {
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
index 0de9924a3..30d11a7f4 100644
--- a/weed/filer/redis/universal_redis_store.go
+++ b/weed/filer/redis/universal_redis_store.go
@@ -7,7 +7,7 @@ import (
"strings"
"time"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -44,7 +44,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
value = util.MaybeGzipData(value)
}
- _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
+ _, err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
if err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
@@ -52,7 +52,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
dir, name := entry.FullPath.DirAndName()
if name != "" {
- _, err = store.Client.SAdd(genDirectoryListKey(dir), name).Result()
+ _, err = store.Client.SAdd(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
@@ -68,7 +68,7 @@ func (store *UniversalRedisStore) UpdateEntry(ctx context.Context, entry *filer.
func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
- data, err := store.Client.Get(string(fullpath)).Result()
+ data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
@@ -90,7 +90,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F
func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
- _, err = store.Client.Del(string(fullpath)).Result()
+ _, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
@@ -98,7 +98,7 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util
dir, name := fullpath.DirAndName()
if name != "" {
- _, err = store.Client.SRem(genDirectoryListKey(dir), name).Result()
+ _, err = store.Client.SRem(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
@@ -109,14 +109,14 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath util
func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
- members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
+ members, err := store.Client.SMembers(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete folder %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
- _, err = store.Client.Del(string(path)).Result()
+ _, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
@@ -125,17 +125,16 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
return nil
}
-func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
+func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- dirListKey := genDirectoryListKey(string(fullpath))
- members, err := store.Client.SMembers(dirListKey).Result()
+ dirListKey := genDirectoryListKey(string(dirPath))
+ members, err := store.Client.SMembers(ctx, dirListKey).Result()
if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// skip
@@ -144,7 +143,7 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
for _, m := range members {
if strings.Compare(m, startFileName) >= 0 {
if m == startFileName {
- if inclusive {
+ if includeStartFile {
t = append(t, m)
}
} else {
@@ -161,29 +160,35 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, full
})
// limit
- if limit < len(members) {
+ if limit < int64(len(members)) {
members = members[:limit]
}
// fetch entry meta
for _, fileName := range members {
- path := util.NewFullPath(string(fullpath), fileName)
+ path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
+ if err == filer_pb.ErrNotFound {
+ continue
+ }
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- store.Client.Del(string(path)).Result()
- store.Client.SRem(dirListKey, fileName).Result()
+ store.Client.Del(ctx, string(path)).Result()
+ store.Client.SRem(ctx, dirListKey, fileName).Result()
continue
}
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
- return entries, err
+ return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
diff --git a/weed/filer/redis/universal_redis_store_kv.go b/weed/filer/redis/universal_redis_store_kv.go
index 0fc12c631..ad6e389ed 100644
--- a/weed/filer/redis/universal_redis_store_kv.go
+++ b/weed/filer/redis/universal_redis_store_kv.go
@@ -5,12 +5,12 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func (store *UniversalRedisStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- _, err = store.Client.Set(string(key), value, 0).Result()
+ _, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
@@ -21,7 +21,7 @@ func (store *UniversalRedisStore) KvPut(ctx context.Context, key []byte, value [
func (store *UniversalRedisStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- data, err := store.Client.Get(string(key)).Result()
+ data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, filer.ErrKvNotFound
@@ -32,7 +32,7 @@ func (store *UniversalRedisStore) KvGet(ctx context.Context, key []byte) (value
func (store *UniversalRedisStore) KvDelete(ctx context.Context, key []byte) (err error) {
- _, err = store.Client.Del(string(key)).Result()
+ _, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)
diff --git a/weed/filer/redis2/redis_cluster_store.go b/weed/filer/redis2/redis_cluster_store.go
index c7742bb19..22d09da25 100644
--- a/weed/filer/redis2/redis_cluster_store.go
+++ b/weed/filer/redis2/redis_cluster_store.go
@@ -3,7 +3,7 @@ package redis2
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func init() {
@@ -20,8 +20,8 @@ func (store *RedisCluster2Store) GetName() string {
func (store *RedisCluster2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
- configuration.SetDefault(prefix+"useReadOnly", true)
- configuration.SetDefault(prefix+"routeByLatency", true)
+ configuration.SetDefault(prefix+"useReadOnly", false)
+ configuration.SetDefault(prefix+"routeByLatency", false)
return store.initialize(
configuration.GetStringSlice(prefix+"addresses"),
diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go
index da404ed4c..8eb97e374 100644
--- a/weed/filer/redis2/redis_store.go
+++ b/weed/filer/redis2/redis_store.go
@@ -3,7 +3,7 @@ package redis2
import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func init() {
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index 00d02ea14..aab3d1f4a 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -5,7 +5,7 @@ import (
"fmt"
"time"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -56,7 +56,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
value = util.MaybeGzipData(value)
}
- if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
+ if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
@@ -66,7 +66,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
}
if name != "" {
- if err = store.Client.ZAddNX(genDirectoryListKey(dir), redis.Z{Score: 0, Member: name}).Err(); err != nil {
+ if err = store.Client.ZAddNX(ctx, genDirectoryListKey(dir), &redis.Z{Score: 0, Member: name}).Err(); err != nil {
return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
}
}
@@ -81,7 +81,7 @@ func (store *UniversalRedis2Store) UpdateEntry(ctx context.Context, entry *filer
func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
- data, err := store.Client.Get(string(fullpath)).Result()
+ data, err := store.Client.Get(ctx, string(fullpath)).Result()
if err == redis.Nil {
return nil, filer_pb.ErrNotFound
}
@@ -103,12 +103,12 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
- _, err = store.Client.Del(genDirectoryListKey(string(fullpath))).Result()
+ _, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
if err != nil {
return fmt.Errorf("delete dir list %s : %v", fullpath, err)
}
- _, err = store.Client.Del(string(fullpath)).Result()
+ _, err = store.Client.Del(ctx, string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
@@ -118,7 +118,7 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
return nil
}
if name != "" {
- _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
+ _, err = store.Client.ZRem(ctx, genDirectoryListKey(dir), name).Result()
if err != nil {
return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
}
@@ -133,14 +133,14 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
return nil
}
- members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
+ members, err := store.Client.ZRange(ctx, genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
- _, err = store.Client.Del(string(path)).Result()
+ _, err = store.Client.Del(ctx, string(path)).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
}
@@ -149,45 +149,50 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
return nil
}
-func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
- limit int) (entries []*filer.Entry, err error) {
+func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- dirListKey := genDirectoryListKey(string(fullpath))
+ dirListKey := genDirectoryListKey(string(dirPath))
start := int64(0)
if startFileName != "" {
- start, _ = store.Client.ZRank(dirListKey, startFileName).Result()
- if !inclusive {
+ start, _ = store.Client.ZRank(ctx, dirListKey, startFileName).Result()
+ if !includeStartFile {
start++
}
}
- members, err := store.Client.ZRange(dirListKey, start, start+int64(limit)-1).Result()
+ members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result()
if err != nil {
- return nil, fmt.Errorf("list %s : %v", fullpath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// fetch entry meta
for _, fileName := range members {
- path := util.NewFullPath(string(fullpath), fileName)
+ path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
+ if err == filer_pb.ErrNotFound {
+ continue
+ }
} else {
if entry.TtlSec > 0 {
if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- store.Client.Del(string(path)).Result()
- store.Client.ZRem(dirListKey, fileName).Result()
+ store.Client.Del(ctx, string(path)).Result()
+ store.Client.ZRem(ctx, dirListKey, fileName).Result()
continue
}
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
- return entries, err
+ return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
diff --git a/weed/filer/redis2/universal_redis_store_kv.go b/weed/filer/redis2/universal_redis_store_kv.go
index 658491ddf..bde994dc9 100644
--- a/weed/filer/redis2/universal_redis_store_kv.go
+++ b/weed/filer/redis2/universal_redis_store_kv.go
@@ -5,12 +5,12 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/go-redis/redis"
+ "github.com/go-redis/redis/v8"
)
func (store *UniversalRedis2Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- _, err = store.Client.Set(string(key), value, 0).Result()
+ _, err = store.Client.Set(ctx, string(key), value, 0).Result()
if err != nil {
return fmt.Errorf("kv put: %v", err)
@@ -21,7 +21,7 @@ func (store *UniversalRedis2Store) KvPut(ctx context.Context, key []byte, value
func (store *UniversalRedis2Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- data, err := store.Client.Get(string(key)).Result()
+ data, err := store.Client.Get(ctx, string(key)).Result()
if err == redis.Nil {
return nil, filer.ErrKvNotFound
@@ -32,7 +32,7 @@ func (store *UniversalRedis2Store) KvGet(ctx context.Context, key []byte) (value
func (store *UniversalRedis2Store) KvDelete(ctx context.Context, key []byte) (err error) {
- _, err = store.Client.Del(string(key)).Result()
+ _, err = store.Client.Del(ctx, string(key)).Result()
if err != nil {
return fmt.Errorf("kv delete: %v", err)
diff --git a/weed/filer/rocksdb/README.md b/weed/filer/rocksdb/README.md
new file mode 100644
index 000000000..6bae6d34e
--- /dev/null
+++ b/weed/filer/rocksdb/README.md
@@ -0,0 +1,41 @@
+# Prepare the compilation environment on linux
+- sudo add-apt-repository -y ppa:ubuntu-toolchain-r/test
+- sudo apt-get update -qq
+- sudo apt-get install gcc-6 g++-6 libsnappy-dev zlib1g-dev libbz2-dev -qq
+- export CXX="g++-6" CC="gcc-6"
+
+- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags2_2.0-1.1ubuntu1_amd64.deb
+- sudo dpkg -i libgflags2_2.0-1.1ubuntu1_amd64.deb
+- wget https://launchpad.net/ubuntu/+archive/primary/+files/libgflags-dev_2.0-1.1ubuntu1_amd64.deb
+- sudo dpkg -i libgflags-dev_2.0-1.1ubuntu1_amd64.deb
+
+# Prepare the compilation environment on mac os
+```
+brew install snappy
+```
+
+# install rocksdb:
+```
+ export ROCKSDB_HOME=/Users/chris/dev/rocksdb
+
+ git clone https://github.com/facebook/rocksdb.git $ROCKSDB_HOME
+ pushd $ROCKSDB_HOME
+ make clean
+ make install-static
+ popd
+```
+
+# install gorocksdb
+
+```
+export CGO_CFLAGS="-I$ROCKSDB_HOME/include"
+export CGO_LDFLAGS="-L$ROCKSDB_HOME -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"
+
+go get github.com/tecbot/gorocksdb
+```
+# compile with rocksdb
+
+```
+cd ~/go/src/github.com/chrislusf/seaweedfs/weed
+go install -tags rocksdb
+```
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
new file mode 100644
index 000000000..70c301725
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -0,0 +1,302 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "bytes"
+ "context"
+ "crypto/md5"
+ "fmt"
+ "io"
+
+ "github.com/tecbot/gorocksdb"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RocksDBStore{})
+}
+
+type options struct {
+ opt *gorocksdb.Options
+ ro *gorocksdb.ReadOptions
+ wo *gorocksdb.WriteOptions
+}
+
+func (opt *options) init() {
+ opt.opt = gorocksdb.NewDefaultOptions()
+ opt.ro = gorocksdb.NewDefaultReadOptions()
+ opt.wo = gorocksdb.NewDefaultWriteOptions()
+}
+
+func (opt *options) close() {
+ opt.opt.Destroy()
+ opt.ro.Destroy()
+ opt.wo.Destroy()
+}
+
+type RocksDBStore struct {
+ path string
+ db *gorocksdb.DB
+ options
+}
+
+func (store *RocksDBStore) GetName() string {
+ return "rocksdb"
+}
+
+func (store *RocksDBStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ dir := configuration.GetString(prefix + "dir")
+ return store.initialize(dir)
+}
+
+func (store *RocksDBStore) initialize(dir string) (err error) {
+ glog.Infof("filer store rocksdb dir: %s", dir)
+ if err := weed_util.TestFolderWritable(dir); err != nil {
+ return fmt.Errorf("Check Level Folder %s Writable: %s", dir, err)
+ }
+ store.options.init()
+ store.opt.SetCreateIfMissing(true)
+ // reduce write amplification
+ // also avoid expired data stored in highest level never get compacted
+ store.opt.SetLevelCompactionDynamicLevelBytes(true)
+ store.opt.SetCompactionFilter(NewTTLFilter())
+ // store.opt.SetMaxBackgroundCompactions(2)
+
+ store.db, err = gorocksdb.OpenDb(store.opt, dir)
+
+ return
+}
+
+func (store *RocksDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *RocksDBStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *RocksDBStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *RocksDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.DirAndName()
+ key := genKey(dir, name)
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ err = store.db.Put(store.wo, key, value)
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
+
+ return nil
+}
+
+func (store *RocksDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *RocksDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+ dir, name := fullpath.DirAndName()
+ key := genKey(dir, name)
+ data, err := store.db.Get(store.ro, key)
+
+ if data == nil {
+ return nil, filer_pb.ErrNotFound
+ }
+ defer data.Free()
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(data.Data())
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
+
+ return entry, nil
+}
+
+func (store *RocksDBStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ dir, name := fullpath.DirAndName()
+ key := genKey(dir, name)
+
+ err = store.db.Delete(store.wo, key)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ batch := gorocksdb.NewWriteBatch()
+ defer batch.Destroy()
+
+ ro := gorocksdb.NewDefaultReadOptions()
+ defer ro.Destroy()
+ ro.SetFillCache(false)
+
+ iter := store.db.NewIterator(ro)
+ defer iter.Close()
+ err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
+ batch.Delete(key)
+ return true
+ })
+ if err != nil {
+ return fmt.Errorf("delete list %s : %v", fullpath, err)
+ }
+
+ err = store.db.Write(store.wo, batch)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int64, fn func(key, value []byte) bool) (err error) {
+
+ if len(lastKey) == 0 {
+ iter.Seek(prefix)
+ } else {
+ iter.Seek(lastKey)
+ if !includeLastKey {
+ if iter.Valid() {
+ if bytes.Equal(iter.Key().Data(), lastKey) {
+ iter.Next()
+ }
+ }
+ }
+ }
+
+ i := int64(0)
+ for ; iter.Valid(); iter.Next() {
+
+ if limit > 0 {
+ i++
+ if i > limit {
+ break
+ }
+ }
+
+ key := iter.Key().Data()
+
+ if !bytes.HasPrefix(key, prefix) {
+ break
+ }
+
+ ret := fn(key, iter.Value().Data())
+
+ if !ret {
+ break
+ }
+
+ }
+
+ if err := iter.Err(); err != nil {
+ return fmt.Errorf("prefix scan iterator: %v", err)
+ }
+ return nil
+}
+
+func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
+ lastFileStart := directoryPrefix
+ if startFileName != "" {
+ lastFileStart = genDirectoryKeyPrefix(dirPath, startFileName)
+ }
+
+ ro := gorocksdb.NewDefaultReadOptions()
+ defer ro.Destroy()
+ ro.SetFillCache(false)
+
+ iter := store.db.NewIterator(ro)
+ defer iter.Close()
+ err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, func(key, value []byte) bool {
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ return true
+ }
+ entry := &filer.Entry{
+ FullPath: weed_util.NewFullPath(string(dirPath), fileName),
+ }
+ lastFileName = fileName
+
+ // println("list", entry.FullPath, "chunks", len(entry.Chunks))
+ if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ return false
+ }
+ if !eachEntryFunc(entry) {
+ return false
+ }
+ return true
+ })
+ if err != nil {
+ return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
+ }
+
+ return lastFileName, err
+}
+
+func genKey(dirPath, fileName string) (key []byte) {
+ key = hashToBytes(dirPath)
+ key = append(key, []byte(fileName)...)
+ return key
+}
+
+func genDirectoryKeyPrefix(fullpath weed_util.FullPath, startFileName string) (keyPrefix []byte) {
+ keyPrefix = hashToBytes(string(fullpath))
+ if len(startFileName) > 0 {
+ keyPrefix = append(keyPrefix, []byte(startFileName)...)
+ }
+ return keyPrefix
+}
+
+func getNameFromKey(key []byte) string {
+
+ return string(key[md5.Size:])
+
+}
+
+// hash directory, and use last byte for partitioning
+func hashToBytes(dir string) []byte {
+ h := md5.New()
+ io.WriteString(h, dir)
+
+ b := h.Sum(nil)
+
+ return b
+}
+
+func (store *RocksDBStore) Shutdown() {
+ store.db.Close()
+ store.options.close()
+}
diff --git a/weed/filer/rocksdb/rocksdb_store_kv.go b/weed/filer/rocksdb/rocksdb_store_kv.go
new file mode 100644
index 000000000..cf1214d5b
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store_kv.go
@@ -0,0 +1,47 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+)
+
+func (store *RocksDBStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ err = store.db.Put(store.wo, key, value)
+
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *RocksDBStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ value, err = store.db.GetBytes(store.ro, key)
+
+ if value == nil {
+ return nil, filer.ErrKvNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("kv get: %v", err)
+ }
+
+ return
+}
+
+func (store *RocksDBStore) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ err = store.db.Delete(store.wo, key)
+
+ if err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go
new file mode 100644
index 000000000..439663524
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_store_test.go
@@ -0,0 +1,117 @@
+// +build rocksdb
+
+package rocksdb
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestCreateAndFind(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
+ defer os.RemoveAll(dir)
+ store := &RocksDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ fullpath := util.FullPath("/home/chris/this/is/one/file1.jpg")
+
+ ctx := context.Background()
+
+ entry1 := &filer.Entry{
+ FullPath: fullpath,
+ Attr: filer.Attr{
+ Mode: 0440,
+ Uid: 1234,
+ Gid: 5678,
+ },
+ }
+
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ t.Errorf("create entry %v: %v", entry1.FullPath, err)
+ return
+ }
+
+ entry, err := testFiler.FindEntry(ctx, fullpath)
+
+ if err != nil {
+ t.Errorf("find entry: %v", err)
+ return
+ }
+
+ if entry.FullPath != entry1.FullPath {
+ t.Errorf("find wrong entry: %v", entry.FullPath)
+ return
+ }
+
+ // checking one upper directory
+ entries, _, _ := testFiler.ListDirectoryEntries(ctx, util.FullPath("/home/chris/this/is/one"), "", false, 100, "", "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+ // checking one upper directory
+ entries, _, _ = testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ if len(entries) != 1 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func TestEmptyRoot(t *testing.T) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
+ defer os.RemoveAll(dir)
+ store := &RocksDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ // checking one upper directory
+ entries, _, err := testFiler.ListDirectoryEntries(ctx, util.FullPath("/"), "", false, 100, "", "")
+ if err != nil {
+ t.Errorf("list entries: %v", err)
+ return
+ }
+ if len(entries) != 0 {
+ t.Errorf("list entries count: %v", len(entries))
+ return
+ }
+
+}
+
+func BenchmarkInsertEntry(b *testing.B) {
+ testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
+ dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
+ defer os.RemoveAll(dir)
+ store := &RocksDBStore{}
+ store.initialize(dir)
+ testFiler.SetStore(store)
+
+ ctx := context.Background()
+
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ entry := &filer.Entry{
+ FullPath: util.FullPath(fmt.Sprintf("/file%d.txt", i)),
+ Attr: filer.Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ },
+ }
+ store.InsertEntry(ctx, entry)
+ }
+}
diff --git a/weed/filer/rocksdb/rocksdb_ttl.go b/weed/filer/rocksdb/rocksdb_ttl.go
new file mode 100644
index 000000000..faed22310
--- /dev/null
+++ b/weed/filer/rocksdb/rocksdb_ttl.go
@@ -0,0 +1,40 @@
+//+build rocksdb
+
+package rocksdb
+
+import (
+ "time"
+
+ "github.com/tecbot/gorocksdb"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+)
+
+type TTLFilter struct {
+ skipLevel0 bool
+}
+
+func NewTTLFilter() gorocksdb.CompactionFilter {
+ return &TTLFilter{
+ skipLevel0: true,
+ }
+}
+
+func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []byte) {
+ // decode could be slow, causing write stall
+ // level >0 sst can run compaction in parallel
+ if !t.skipLevel0 || level > 0 {
+ entry := filer.Entry{}
+ if err := entry.DecodeAttributesAndChunks(val); err == nil {
+ if entry.TtlSec > 0 &&
+ entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
+ return true, nil
+ }
+ }
+ }
+ return false, val
+}
+
+func (t *TTLFilter) Name() string {
+ return "TTLFilter"
+}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index cffdc8303..f0042a0ff 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -13,16 +13,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
-func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
+func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
// fmt.Printf("start to stream content for chunks: %+v\n", chunks)
- chunkViews := ViewFromChunks(masterClient.LookupFileId, chunks, offset, size)
+ chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
for _, chunkView := range chunkViews {
- urlStrings, err := masterClient.LookupFileId(chunkView.FileId)
+ urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
return err
@@ -86,7 +86,7 @@ type ChunkStreamReader struct {
bufferOffset int64
bufferPos int
chunkIndex int
- lookupFileId LookupFileIdFunctionType
+ lookupFileId wdclient.LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})