path: root/weed/filer
Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store.go  2
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store_kv.go  8
-rw-r--r--  weed/filer/arangodb/arangodb_store.go  347
-rw-r--r--  weed/filer/arangodb/arangodb_store_bucket.go  40
-rw-r--r--  weed/filer/arangodb/arangodb_store_kv.go  54
-rw-r--r--  weed/filer/arangodb/helpers.go  136
-rw-r--r--  weed/filer/arangodb/readme.md  52
-rw-r--r--  weed/filer/cassandra/cassandra_store.go  8
-rw-r--r--  weed/filer/configuration.go  5
-rw-r--r--  weed/filer/elastic/v7/doc.go  9
-rw-r--r--  weed/filer/elastic/v7/elastic_store.go  3
-rw-r--r--  weed/filer/elastic/v7/elastic_store_kv.go  3
-rw-r--r--  weed/filer/entry.go  9
-rw-r--r--  weed/filer/entry_codec.go  13
-rw-r--r--  weed/filer/etcd/etcd_store.go  6
-rw-r--r--  weed/filer/etcd/etcd_store_test.go  16
-rw-r--r--  weed/filer/filechunk_manifest.go  66
-rw-r--r--  weed/filer/filechunks.go  71
-rw-r--r--  weed/filer/filechunks2_test.go  10
-rw-r--r--  weed/filer/filechunks_read.go  116
-rw-r--r--  weed/filer/filechunks_read_test.go  210
-rw-r--r--  weed/filer/filer.go  134
-rw-r--r--  weed/filer/filer_buckets.go  106
-rw-r--r--  weed/filer/filer_conf.go  56
-rw-r--r--  weed/filer/filer_delete_entry.go  76
-rw-r--r--  weed/filer/filer_deletion.go  36
-rw-r--r--  weed/filer/filer_hardlink.go  16
-rw-r--r--  weed/filer/filer_notify.go  52
-rw-r--r--  weed/filer/filer_notify_append.go  15
-rw-r--r--  weed/filer/filer_on_meta_event.go  12
-rw-r--r--  weed/filer/filer_search.go  8
-rw-r--r--  weed/filer/filerstore.go  7
-rw-r--r--  weed/filer/filerstore_hardlink.go  19
-rw-r--r--  weed/filer/filerstore_translate_path.go  55
-rw-r--r--  weed/filer/filerstore_wrapper.go  64
-rw-r--r--  weed/filer/hbase/hbase_store.go  2
-rw-r--r--  weed/filer/leveldb/leveldb_store.go  24
-rw-r--r--  weed/filer/leveldb/leveldb_store_test.go  18
-rw-r--r--  weed/filer/leveldb2/leveldb2_store.go  9
-rw-r--r--  weed/filer/leveldb2/leveldb2_store_test.go  14
-rw-r--r--  weed/filer/leveldb3/leveldb3_store.go  16
-rw-r--r--  weed/filer/leveldb3/leveldb3_store_test.go  14
-rw-r--r--  weed/filer/meta_aggregator.go  172
-rw-r--r--  weed/filer/mongodb/mongodb_store.go  14
-rw-r--r--  weed/filer/mysql2/mysql2_store.go  3
-rw-r--r--  weed/filer/permission.go  22
-rw-r--r--  weed/filer/read_remote.go  8
-rw-r--r--  weed/filer/read_write.go  14
-rw-r--r--  weed/filer/reader_at.go  130
-rw-r--r--  weed/filer/reader_at_test.go  40
-rw-r--r--  weed/filer/reader_cache.go  192
-rw-r--r--  weed/filer/reader_pattern.go  38
-rw-r--r--  weed/filer/redis/universal_redis_store.go  10
-rw-r--r--  weed/filer/redis2/redis_sentinel_store.go  45
-rw-r--r--  weed/filer/redis2/universal_redis_store.go  27
-rw-r--r--  weed/filer/redis3/ItemList.go  507
-rw-r--r--  weed/filer/redis3/item_list_serde.go  75
-rw-r--r--  weed/filer/redis3/kv_directory_children.go  138
-rw-r--r--  weed/filer/redis3/kv_directory_children_test.go  210
-rw-r--r--  weed/filer/redis3/redis_cluster_store.go  45
-rw-r--r--  weed/filer/redis3/redis_sentinel_store.go  49
-rw-r--r--  weed/filer/redis3/redis_store.go  39
-rw-r--r--  weed/filer/redis3/skiplist_element_store.go  62
-rw-r--r--  weed/filer/redis3/universal_redis_store.go  179
-rw-r--r--  weed/filer/redis3/universal_redis_store_kv.go  42
-rw-r--r--  weed/filer/redis_lua/redis_cluster_store.go  44
-rw-r--r--  weed/filer/redis_lua/redis_sentinel_store.go  45
-rw-r--r--  weed/filer/redis_lua/redis_store.go  38
-rw-r--r--  weed/filer/redis_lua/stored_procedure/delete_entry.lua  19
-rw-r--r--  weed/filer/redis_lua/stored_procedure/delete_folder_children.lua  15
-rw-r--r--  weed/filer/redis_lua/stored_procedure/init.go  24
-rw-r--r--  weed/filer/redis_lua/stored_procedure/insert_entry.lua  27
-rw-r--r--  weed/filer/redis_lua/universal_redis_store.go  191
-rw-r--r--  weed/filer/redis_lua/universal_redis_store_kv.go  42
-rw-r--r--  weed/filer/remote_mapping.go  121
-rw-r--r--  weed/filer/remote_storage.go (renamed from weed/filer/filer_remote_storage.go)  55
-rw-r--r--  weed/filer/remote_storage_test.go (renamed from weed/filer/filer_remote_storage_test.go)  0
-rw-r--r--  weed/filer/rocksdb/rocksdb_store.go  2
-rw-r--r--  weed/filer/rocksdb/rocksdb_store_test.go  13
-rw-r--r--  weed/filer/rocksdb/rocksdb_ttl.go  7
-rw-r--r--  weed/filer/s3iam_conf.go  3
-rw-r--r--  weed/filer/sqlite/doc.go  9
-rw-r--r--  weed/filer/sqlite/sqlite_store.go  2
-rw-r--r--  weed/filer/sqlite/sqlite_store_unsupported.go  3
-rw-r--r--  weed/filer/store_test/test_suite.go  55
-rw-r--r--  weed/filer/stream.go  91
-rw-r--r--  weed/filer/tikv/tikv_store.go  15
-rw-r--r--  weed/filer/tikv/tikv_store_kv.go  2
-rw-r--r--  weed/filer/ydb/doc.go  9
-rw-r--r--  weed/filer/ydb/readme.md  27
-rw-r--r--  weed/filer/ydb/ydb_queries.go  72
-rw-r--r--  weed/filer/ydb/ydb_store.go  413
-rw-r--r--  weed/filer/ydb/ydb_store_kv.go  75
-rw-r--r--  weed/filer/ydb/ydb_store_test.go  19
-rw-r--r--  weed/filer/ydb/ydb_types.go  56
95 files changed, 4794 insertions, 698 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 4bf9b16fa..13268b944 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -156,7 +156,7 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
diff --git a/weed/filer/abstract_sql/abstract_sql_store_kv.go b/weed/filer/abstract_sql/abstract_sql_store_kv.go
index 03b016c76..aaf1c196c 100644
--- a/weed/filer/abstract_sql/abstract_sql_store_kv.go
+++ b/weed/filer/abstract_sql/abstract_sql_store_kv.go
@@ -18,7 +18,7 @@ func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []by
return fmt.Errorf("findDB: %v", err)
}
- dirStr, dirHash, name := genDirAndName(key)
+ dirStr, dirHash, name := GenDirAndName(key)
res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value)
if err == nil {
@@ -53,7 +53,7 @@ func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []b
return nil, fmt.Errorf("findDB: %v", err)
}
- dirStr, dirHash, name := genDirAndName(key)
+ dirStr, dirHash, name := GenDirAndName(key)
row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr)
err = row.Scan(&value)
@@ -76,7 +76,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er
return fmt.Errorf("findDB: %v", err)
}
- dirStr, dirHash, name := genDirAndName(key)
+ dirStr, dirHash, name := GenDirAndName(key)
res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr)
if err != nil {
@@ -92,7 +92,7 @@ func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err er
}
-func genDirAndName(key []byte) (dirStr string, dirHash int64, name string) {
+func GenDirAndName(key []byte) (dirStr string, dirHash int64, name string) {
for len(key) < 8 {
key = append(key, 0)
}
diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go
new file mode 100644
index 000000000..13d14b2b0
--- /dev/null
+++ b/weed/filer/arangodb/arangodb_store.go
@@ -0,0 +1,347 @@
+package arangodb
+
+import (
+ "context"
+ "crypto/tls"
+ "fmt"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/arangodb/go-driver"
+ "github.com/arangodb/go-driver/http"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &ArangodbStore{})
+}
+
+var (
+ BUCKET_PREFIX = "/buckets"
+ DEFAULT_COLLECTION = "seaweed_no_bucket"
+ KVMETA_COLLECTION = "seaweed_kvmeta"
+)
+
+type ArangodbStore struct {
+ connect driver.Connection
+ client driver.Client
+ database driver.Database
+ kvCollection driver.Collection
+
+ buckets map[string]driver.Collection
+ mu sync.RWMutex
+
+ databaseName string
+}
+
+type Model struct {
+ Key string `json:"_key"`
+ Directory string `json:"directory,omitempty"`
+ Name string `json:"name,omitempty"`
+ Ttl string `json:"ttl,omitempty"`
+
+ //arangodb does not support binary blobs
+ //we encode byte slice into uint64 slice
+ //see helpers.go
+ Meta []uint64 `json:"meta"`
+}
+
+func (store *ArangodbStore) GetName() string {
+ return "arangodb"
+}
+
+func (store *ArangodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ store.buckets = make(map[string]driver.Collection, 3)
+ store.databaseName = configuration.GetString(prefix + "db_name")
+ return store.connection(configuration.GetStringSlice(prefix+"servers"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"insecure_skip_verify"),
+ )
+}
+
+func (store *ArangodbStore) connection(uris []string, user string, pass string, insecure bool) (err error) {
+ ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
+
+ store.connect, err = http.NewConnection(http.ConnectionConfig{
+ Endpoints: uris,
+ TLSConfig: &tls.Config{
+ InsecureSkipVerify: insecure,
+ },
+ })
+ if err != nil {
+ return err
+ }
+ store.client, err = driver.NewClient(driver.ClientConfig{
+ Connection: store.connect,
+ Authentication: driver.BasicAuthentication(user, pass),
+ })
+ if err != nil {
+ return err
+ }
+ ok, err := store.client.DatabaseExists(ctx, store.databaseName)
+ if err != nil {
+ return err
+ }
+ if ok {
+ store.database, err = store.client.Database(ctx, store.databaseName)
+ } else {
+ store.database, err = store.client.CreateDatabase(ctx, store.databaseName, &driver.CreateDatabaseOptions{})
+ }
+ if err != nil {
+ return err
+ }
+ if store.kvCollection, err = store.ensureCollection(ctx, KVMETA_COLLECTION); err != nil {
+ return err
+ }
+ return err
+}
+
+type key int
+
+const (
+ transactionKey key = 0
+)
+
+func (store *ArangodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ keys := make([]string, 0, len(store.buckets)+1)
+ for k := range store.buckets {
+ keys = append(keys, k)
+ }
+ keys = append(keys, store.kvCollection.Name())
+ txn, err := store.database.BeginTransaction(ctx, driver.TransactionCollections{
+ Exclusive: keys,
+ }, &driver.BeginTransactionOptions{})
+ if err != nil {
+ return nil, err
+ }
+
+ return context.WithValue(ctx, transactionKey, txn), nil
+}
+
+func (store *ArangodbStore) CommitTransaction(ctx context.Context) error {
+ val := ctx.Value(transactionKey)
+ cast, ok := val.(driver.TransactionID)
+ if !ok {
+ return fmt.Errorf("txn cast fail %s:", val)
+ }
+ err := store.database.CommitTransaction(ctx, cast, &driver.CommitTransactionOptions{})
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (store *ArangodbStore) RollbackTransaction(ctx context.Context) error {
+ val := ctx.Value(transactionKey)
+ cast, ok := val.(driver.TransactionID)
+ if !ok {
+ return fmt.Errorf("txn cast fail %s:", val)
+ }
+ err := store.database.AbortTransaction(ctx, cast, &driver.AbortTransactionOptions{})
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (store *ArangodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
+ meta = util.MaybeGzipData(meta)
+ }
+ model := &Model{
+ Key: hashString(string(entry.FullPath)),
+ Directory: dir,
+ Name: name,
+ Meta: bytesToArray(meta),
+ }
+ if entry.TtlSec > 0 {
+ model.Ttl = time.Now().Add(time.Second * time.Duration(entry.TtlSec)).Format(time.RFC3339)
+ } else {
+ model.Ttl = ""
+ }
+
+ targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
+ if err != nil {
+ return err
+ }
+ _, err = targetCollection.CreateDocument(ctx, model)
+ if driver.IsConflict(err) {
+ return store.UpdateEntry(ctx, entry)
+ }
+
+ if err != nil {
+ return fmt.Errorf("InsertEntry %s: %v", entry.FullPath, err)
+ }
+
+ return nil
+
+}
+
+func (store *ArangodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
+ meta = util.MaybeGzipData(meta)
+ }
+ model := &Model{
+ Key: hashString(string(entry.FullPath)),
+ Directory: dir,
+ Name: name,
+ Meta: bytesToArray(meta),
+ }
+ if entry.TtlSec > 0 {
+ model.Ttl = time.Now().Add(time.Duration(entry.TtlSec) * time.Second).Format(time.RFC3339)
+ } else {
+ model.Ttl = "none"
+ }
+ targetCollection, err := store.extractBucketCollection(ctx, entry.FullPath)
+ if err != nil {
+ return err
+ }
+ _, err = targetCollection.UpdateDocument(ctx, model.Key, model)
+ if err != nil {
+ return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *ArangodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+ var data Model
+ targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+ if err != nil {
+ return nil, err
+ }
+ _, err = targetCollection.ReadDocument(ctx, hashString(string(fullpath)), &data)
+ if err != nil {
+ if driver.IsNotFound(err) {
+ return nil, filer_pb.ErrNotFound
+ }
+ glog.Errorf("find %s: %v", fullpath, err)
+ return nil, filer_pb.ErrNotFound
+ }
+ if len(data.Meta) == 0 {
+ return nil, filer_pb.ErrNotFound
+ }
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(arrayToBytes(data.Meta)))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *ArangodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+ targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+ if err != nil {
+ return err
+ }
+ _, err = targetCollection.RemoveDocument(ctx, hashString(string(fullpath)))
+ if err != nil && !driver.IsNotFound(err) {
+ glog.Errorf("find %s: %v", fullpath, err)
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+ return nil
+}
+
+// this runs in log time
+func (store *ArangodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+ var query string
+ targetCollection, err := store.extractBucketCollection(ctx, fullpath)
+ if err != nil {
+ return err
+ }
+ query = query + fmt.Sprintf(`
+ for d in %s
+ filter starts_with(d.directory, "%s/") || d.directory == "%s"
+ remove d._key in %s`,
+ targetCollection.Name(),
+ strings.Join(strings.Split(string(fullpath), "/"), ","),
+ string(fullpath),
+ targetCollection.Name(),
+ )
+ cur, err := store.database.Query(ctx, query, nil)
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+ defer cur.Close()
+ return nil
+}
+
+func (store *ArangodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
+func (store *ArangodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ targetCollection, err := store.extractBucketCollection(ctx, dirPath+"/")
+ if err != nil {
+ return lastFileName, err
+ }
+ query := "for d in " + targetCollection.Name()
+ if includeStartFile {
+ query = query + " filter d.name >= \"" + startFileName + "\" "
+ } else {
+ query = query + " filter d.name > \"" + startFileName + "\" "
+ }
+ if prefix != "" {
+ query = query + fmt.Sprintf(`&& starts_with(d.name, "%s")`, prefix)
+ }
+ query = query + `
+filter d.directory == @dir
+sort d.name asc
+`
+ if limit > 0 {
+ query = query + "limit " + strconv.Itoa(int(limit))
+ }
+ query = query + "\n return d"
+ cur, err := store.database.Query(ctx, query, map[string]interface{}{"dir": dirPath})
+ if err != nil {
+ return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err)
+ }
+ defer cur.Close()
+ for cur.HasMore() {
+ var data Model
+ _, err = cur.ReadDocument(ctx, &data)
+ if err != nil {
+ break
+ }
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(data.Directory, data.Name),
+ }
+ lastFileName = data.Name
+ converted := arrayToBytes(data.Meta)
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(converted)); decodeErr != nil {
+ err = decodeErr
+ glog.V(0).Infof("list %s : %v", entry.FullPath, err)
+ break
+ }
+
+ if !eachEntryFunc(entry) {
+ break
+ }
+
+ }
+ return lastFileName, err
+}
+
+func (store *ArangodbStore) Shutdown() {
+}
diff --git a/weed/filer/arangodb/arangodb_store_bucket.go b/weed/filer/arangodb/arangodb_store_bucket.go
new file mode 100644
index 000000000..810d639a7
--- /dev/null
+++ b/weed/filer/arangodb/arangodb_store_bucket.go
@@ -0,0 +1,40 @@
+package arangodb
+
+import (
+ "context"
+ "github.com/arangodb/go-driver"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+var _ filer.BucketAware = (*ArangodbStore)(nil)
+
+func (store *ArangodbStore) OnBucketCreation(bucket string) {
+ timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ // create the collection && add to cache
+ _, err := store.ensureBucket(timeout, bucket)
+ if err != nil {
+ glog.Errorf("bucket create %s: %v", bucket, err)
+ }
+}
+func (store *ArangodbStore) OnBucketDeletion(bucket string) {
+ timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ collection, err := store.ensureBucket(timeout, bucket)
+ if err != nil {
+ glog.Errorf("bucket delete %s: %v", bucket, err)
+ return
+ }
+ err = collection.Remove(timeout)
+ if err != nil && !driver.IsNotFound(err) {
+ glog.Errorf("bucket delete %s: %v", bucket, err)
+ return
+ }
+}
+func (store *ArangodbStore) CanDropWholeBucket() bool {
+ return true
+}
diff --git a/weed/filer/arangodb/arangodb_store_kv.go b/weed/filer/arangodb/arangodb_store_kv.go
new file mode 100644
index 000000000..c1307e78d
--- /dev/null
+++ b/weed/filer/arangodb/arangodb_store_kv.go
@@ -0,0 +1,54 @@
+package arangodb
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/arangodb/go-driver"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func (store *ArangodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ model := &Model{
+ Key: hashString(".kvstore." + string(key)),
+ Directory: ".kvstore." + string(key),
+ Meta: bytesToArray(value),
+ }
+
+ exists, err := store.kvCollection.DocumentExists(ctx, model.Key)
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+ if exists {
+ _, err = store.kvCollection.UpdateDocument(ctx, model.Key, model)
+ } else {
+ _, err = store.kvCollection.CreateDocument(ctx, model)
+ }
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+func (store *ArangodbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ var model Model
+ _, err = store.kvCollection.ReadDocument(ctx, hashString(".kvstore."+string(key)), &model)
+ if driver.IsNotFound(err) {
+ return nil, filer.ErrKvNotFound
+ }
+ if err != nil {
+ glog.Errorf("kv get: %s %v", string(key), err)
+ return nil, filer.ErrKvNotFound
+ }
+ return arrayToBytes(model.Meta), nil
+}
+
+func (store *ArangodbStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ _, err = store.kvCollection.RemoveDocument(ctx, hashString(".kvstore."+string(key)))
+ if err != nil {
+ glog.Errorf("kv del: %v", err)
+ return filer.ErrKvNotFound
+ }
+ return nil
+}
diff --git a/weed/filer/arangodb/helpers.go b/weed/filer/arangodb/helpers.go
new file mode 100644
index 000000000..943189781
--- /dev/null
+++ b/weed/filer/arangodb/helpers.go
@@ -0,0 +1,136 @@
+package arangodb
+
+import (
+ "context"
+ "crypto/md5"
+ "encoding/binary"
+ "encoding/hex"
+ "io"
+ "strings"
+
+ "github.com/arangodb/go-driver"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+//convert a string into arango-key safe hex bytes hash
+func hashString(dir string) string {
+ h := md5.New()
+ io.WriteString(h, dir)
+ b := h.Sum(nil)
+ return hex.EncodeToString(b)
+}
+
+// convert slice of bytes into slice of uint64
+// the first uint64 indicates the length in bytes
+func bytesToArray(bs []byte) []uint64 {
+ out := make([]uint64, 0, 2+len(bs)/8)
+ out = append(out, uint64(len(bs)))
+ for len(bs)%8 != 0 {
+ bs = append(bs, 0)
+ }
+ for i := 0; i < len(bs); i = i + 8 {
+ out = append(out, binary.BigEndian.Uint64(bs[i:]))
+ }
+ return out
+}
+
+// convert from slice of uint64 back to bytes
+// if input length is 0 or 1, will return nil
+func arrayToBytes(xs []uint64) []byte {
+ if len(xs) < 2 {
+ return nil
+ }
+ first := xs[0]
+ out := make([]byte, len(xs)*8) // i think this can actually be len(xs)*8-8, but i dont think an extra 8 bytes hurts...
+ for i := 1; i < len(xs); i = i + 1 {
+ binary.BigEndian.PutUint64(out[((i-1)*8):], xs[i])
+ }
+ return out[:first]
+}
+
+// gets the collection the bucket points to from filepath
+func (store *ArangodbStore) extractBucketCollection(ctx context.Context, fullpath util.FullPath) (c driver.Collection, err error) {
+ bucket, _ := extractBucket(fullpath)
+ if bucket == "" {
+ bucket = DEFAULT_COLLECTION
+ }
+ c, err = store.ensureBucket(ctx, bucket)
+ if err != nil {
+ return nil, err
+ }
+ return c, err
+}
+
+// called by extractBucketCollection
+func extractBucket(fullpath util.FullPath) (string, string) {
+ if !strings.HasPrefix(string(fullpath), BUCKET_PREFIX+"/") {
+ return "", string(fullpath)
+ }
+ if strings.Count(string(fullpath), "/") < 3 {
+ return "", string(fullpath)
+ }
+ bucketAndObjectKey := string(fullpath)[len(BUCKET_PREFIX+"/"):]
+ t := strings.Index(bucketAndObjectKey, "/")
+ bucket := bucketAndObjectKey
+ shortPath := "/"
+ if t > 0 {
+ bucket = bucketAndObjectKey[:t]
+ shortPath = string(util.FullPath(bucketAndObjectKey[t:]))
+ }
+ return bucket, shortPath
+}
+
+// get bucket collection from cache. if not exist, creates the buckets collection and grab it
+func (store *ArangodbStore) ensureBucket(ctx context.Context, bucket string) (bc driver.Collection, err error) {
+ var ok bool
+ store.mu.RLock()
+ bc, ok = store.buckets[bucket]
+ store.mu.RUnlock()
+ if ok {
+ return bc, nil
+ }
+ store.mu.Lock()
+ defer store.mu.Unlock()
+ store.buckets[bucket], err = store.ensureCollection(ctx, bucket)
+ if err != nil {
+ return nil, err
+ }
+ return store.buckets[bucket], nil
+}
+
+// creates collection if not exist, ensures indices if not exist
+func (store *ArangodbStore) ensureCollection(ctx context.Context, name string) (c driver.Collection, err error) {
+ ok, err := store.database.CollectionExists(ctx, name)
+ if err != nil {
+ return
+ }
+ if ok {
+ c, err = store.database.Collection(ctx, name)
+ } else {
+ c, err = store.database.CreateCollection(ctx, name, &driver.CreateCollectionOptions{})
+ }
+ if err != nil {
+ return
+ }
+ // ensure indices
+ if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory", "name"},
+ &driver.EnsurePersistentIndexOptions{
+ Name: "directory_name_multi", Unique: true,
+ }); err != nil {
+ return
+ }
+ if _, _, err = c.EnsurePersistentIndex(ctx, []string{"directory"},
+ &driver.EnsurePersistentIndexOptions{Name: "IDX_directory"}); err != nil {
+ return
+ }
+ if _, _, err = c.EnsureTTLIndex(ctx, "ttl", 1,
+ &driver.EnsureTTLIndexOptions{Name: "IDX_TTL"}); err != nil {
+ return
+ }
+ if _, _, err = c.EnsurePersistentIndex(ctx, []string{"name"}, &driver.EnsurePersistentIndexOptions{
+ Name: "IDX_name",
+ }); err != nil {
+ return
+ }
+ return c, nil
+}
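
Editorial note (not part of the commit): the helpers above work around ArangoDB's lack of a binary blob type by packing the serialized entry metadata into a []uint64, with the first element recording the original byte length so the zero padding can be stripped again on decode. A minimal round-trip sketch, assuming it lives inside the arangodb package where the unexported helpers are visible:

```go
package arangodb

import (
	"bytes"
	"testing"
)

// Editorial sketch: bytesToArray prepends the byte length, pads the input to a
// multiple of 8, and packs big-endian uint64 words; arrayToBytes reverses it.
func TestMetaRoundTripSketch(t *testing.T) {
	meta := []byte("hello world") // 11 bytes -> [11, word0, word1]
	packed := bytesToArray(meta)
	if got := arrayToBytes(packed); !bytes.Equal(got, meta) {
		t.Fatalf("round trip mismatch: %q != %q", got, meta)
	}
}
```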
diff --git a/weed/filer/arangodb/readme.md b/weed/filer/arangodb/readme.md
new file mode 100644
index 000000000..e189811fb
--- /dev/null
+++ b/weed/filer/arangodb/readme.md
@@ -0,0 +1,52 @@
+## arangodb
+
+database: https://github.com/arangodb/arangodb
+go driver: https://github.com/arangodb/go-driver
+
+options:
+
+```
+[arangodb]
+enabled=true
+db_name="seaweedfs"
+servers=["http://localhost:8529"]
+#basic auth
+user="root"
+pass="test"
+
+# tls settings
+insecure_skip_verify=true
+```
+
+i test using this dev database:
+`docker run -p 8529:8529 -e ARANGO_ROOT_PASSWORD=test arangodb/arangodb:3.9.0`
+
+
+## features i don't personally need but are missing
+ [ ] provide tls cert to arango
+ [ ] authentication that is not basic auth
+ [ ] synchronise endpoint interval config
+ [ ] automatic creation of custom index
+ [ ] configure default arangodb collection sharding rules
+ [ ] configure default arangodb collection replication rules
+
+
+## complexity
+
+ok, so if https://www.arangodb.com/docs/stable/indexing-index-basics.html#persistent-index is correct
+
+O(1)
+- InsertEntry
+- UpdateEntry
+- FindEntry
+- DeleteEntry
+- KvPut
+- KvGet
+- KvDelete
+
+O(log(BUCKET_SIZE))
+- DeleteFolderChildren
+
+O(log(DIRECTORY_SIZE))
+- ListDirectoryEntries
+- ListDirectoryPrefixedEntries
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index fc0b52ac7..d8c094a45 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/gocql/gocql"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -33,6 +34,7 @@ func (store *CassandraStore) Initialize(configuration util.Configuration, prefix
configuration.GetString(prefix+"password"),
configuration.GetStringSlice(prefix+"superLargeDirectories"),
configuration.GetString(prefix+"localDC"),
+ configuration.GetInt(prefix+"connection_timeout_millisecond"),
)
}
@@ -41,12 +43,14 @@ func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string,
return
}
-func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string) (err error) {
+func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string, timeout int) (err error) {
store.cluster = gocql.NewCluster(hosts...)
if username != "" && password != "" {
store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
}
store.cluster.Keyspace = keyspace
+ store.cluster.Timeout = time.Duration(timeout) * time.Millisecond
+ glog.V(0).Infof("timeout = %d", timeout)
fallback := gocql.RoundRobinHostPolicy()
if localDC != "" {
fallback = gocql.DCAwareRoundRobinPolicy(localDC)
@@ -96,7 +100,7 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
diff --git a/weed/filer/configuration.go b/weed/filer/configuration.go
index 9ef2f3e0f..85fc65d13 100644
--- a/weed/filer/configuration.go
+++ b/weed/filer/configuration.go
@@ -12,7 +12,7 @@ var (
Stores []FilerStore
)
-func (f *Filer) LoadConfiguration(config *util.ViperProxy) {
+func (f *Filer) LoadConfiguration(config *util.ViperProxy) (isFresh bool) {
validateOneEnabledStore(config)
@@ -24,7 +24,7 @@ func (f *Filer) LoadConfiguration(config *util.ViperProxy) {
if err := store.Initialize(config, store.GetName()+"."); err != nil {
glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
}
- f.SetStore(store)
+ isFresh = f.SetStore(store)
glog.V(0).Infof("configured filer store to %s", store.GetName())
hasDefaultStoreConfigured = true
break
@@ -77,6 +77,7 @@ func (f *Filer) LoadConfiguration(config *util.ViperProxy) {
glog.V(0).Infof("configure filer %s for %s", store.GetName(), location)
}
+ return
}
func validateOneEnabledStore(config *util.ViperProxy) {
diff --git a/weed/filer/elastic/v7/doc.go b/weed/filer/elastic/v7/doc.go
new file mode 100644
index 000000000..704bbf6de
--- /dev/null
+++ b/weed/filer/elastic/v7/doc.go
@@ -0,0 +1,9 @@
+/*
+
+Package elastic is for elastic filer store.
+
+The referenced "github.com/olivere/elastic/v7" library is too big when compiled.
+So this is only compiled in "make full_install".
+
+*/
+package elastic
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
index a16e5ebca..cb2c66f5a 100644
--- a/weed/filer/elastic/v7/elastic_store.go
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -1,3 +1,6 @@
+//go:build elastic
+// +build elastic
+
package elastic
import (
diff --git a/weed/filer/elastic/v7/elastic_store_kv.go b/weed/filer/elastic/v7/elastic_store_kv.go
index 99c03314e..43835c153 100644
--- a/weed/filer/elastic/v7/elastic_store_kv.go
+++ b/weed/filer/elastic/v7/elastic_store_kv.go
@@ -1,3 +1,6 @@
+//go:build elastic
+// +build elastic
+
package elastic
import (
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index 8fa75fe6b..8dd00f010 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -15,15 +15,14 @@ type Attr struct {
Uid uint32 // owner uid
Gid uint32 // group gid
Mime string // mime type
- Replication string // replication
- Collection string // collection name
TtlSec int32 // ttl in seconds
- DiskType string
UserName string
GroupNames []string
SymlinkTarget string
Md5 []byte
FileSize uint64
+ Rdev uint32
+ Inode uint64
}
func (attr Attr) IsDirectory() bool {
@@ -43,6 +42,7 @@ type Entry struct {
HardLinkCounter int32
Content []byte
Remote *filer_pb.RemoteEntry
+ Quota int64
}
func (entry *Entry) Size() uint64 {
@@ -70,6 +70,7 @@ func (entry *Entry) ShallowClone() *Entry {
newEntry.HardLinkCounter = entry.HardLinkCounter
newEntry.Content = entry.Content
newEntry.Remote = entry.Remote
+ newEntry.Quota = entry.Quota
return newEntry
}
@@ -96,6 +97,7 @@ func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry) {
message.HardLinkCounter = entry.HardLinkCounter
message.Content = entry.Content
message.RemoteEntry = entry.Remote
+ message.Quota = entry.Quota
}
func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) {
@@ -106,6 +108,7 @@ func FromPbEntryToExistingEntry(message *filer_pb.Entry, fsEntry *Entry) {
fsEntry.HardLinkCounter = message.HardLinkCounter
fsEntry.Content = message.Content
fsEntry.Remote = message.RemoteEntry
+ fsEntry.Quota = message.Quota
}
func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry {
diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go
index 55c937b39..3d29ba0b4 100644
--- a/weed/filer/entry_codec.go
+++ b/weed/filer/entry_codec.go
@@ -39,15 +39,14 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
Uid: entry.Uid,
Gid: entry.Gid,
Mime: entry.Mime,
- Collection: entry.Attr.Collection,
- Replication: entry.Attr.Replication,
TtlSec: entry.Attr.TtlSec,
- DiskType: entry.Attr.DiskType,
UserName: entry.Attr.UserName,
GroupName: entry.Attr.GroupNames,
SymlinkTarget: entry.Attr.SymlinkTarget,
Md5: entry.Attr.Md5,
FileSize: entry.Attr.FileSize,
+ Rdev: entry.Attr.Rdev,
+ Inode: entry.Attr.Inode,
}
}
@@ -65,15 +64,14 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t.Uid = attr.Uid
t.Gid = attr.Gid
t.Mime = attr.Mime
- t.Collection = attr.Collection
- t.Replication = attr.Replication
t.TtlSec = attr.TtlSec
- t.DiskType = attr.DiskType
t.UserName = attr.UserName
t.GroupNames = attr.GroupName
t.SymlinkTarget = attr.SymlinkTarget
t.Md5 = attr.Md5
t.FileSize = attr.FileSize
+ t.Rdev = attr.Rdev
+ t.Inode = attr.Inode
return t
}
@@ -118,6 +116,9 @@ func EqualEntry(a, b *Entry) bool {
if !proto.Equal(a.Remote, b.Remote) {
return false
}
+ if a.Quota != b.Quota {
+ return false
+ }
return true
}
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
index 71ed738f9..0dd7dbee2 100644
--- a/weed/filer/etcd/etcd_store.go
+++ b/weed/filer/etcd/etcd_store.go
@@ -7,7 +7,7 @@ import (
"strings"
"time"
- "go.etcd.io/etcd/clientv3"
+ "go.etcd.io/etcd/client/v3"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -82,7 +82,7 @@ func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (er
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = weed_util.MaybeGzipData(meta)
}
@@ -152,7 +152,7 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_u
}
resp, err := store.client.Get(ctx, string(lastFileStart),
- clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
+ clientv3.WithFromKey(), clientv3.WithLimit(limit+1))
if err != nil {
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
diff --git a/weed/filer/etcd/etcd_store_test.go b/weed/filer/etcd/etcd_store_test.go
new file mode 100644
index 000000000..824c28f5a
--- /dev/null
+++ b/weed/filer/etcd/etcd_store_test.go
@@ -0,0 +1,16 @@
+package etcd
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer/store_test"
+ "testing"
+)
+
+func TestStore(t *testing.T) {
+ // run "make test_etcd" under docker folder.
+ // to set up local env
+ if false {
+ store := &EtcdStore{}
+ store.initialize("localhost:2379", "3s")
+ store_test.TestFilerStore(t, store)
+ }
+}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 00dbf1fd6..4eb657dfa 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -3,11 +3,15 @@ package filer
import (
"bytes"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
"io"
"math"
+ "net/url"
+ "strings"
+ "sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+
"github.com/golang/protobuf/proto"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -19,6 +23,12 @@ const (
ManifestBatch = 10000
)
+var bytesBufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
for _, chunk := range chunks {
if chunk.IsChunkManifest {
@@ -54,17 +64,17 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
if err != nil {
- return chunks, nil, err
+ return dataChunks, nil, err
}
manifestChunks = append(manifestChunks, chunk)
// recursive
- dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
+ subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
if subErr != nil {
- return chunks, nil, subErr
+ return dataChunks, nil, subErr
}
- dataChunks = append(dataChunks, dchunks...)
- manifestChunks = append(manifestChunks, mchunks...)
+ dataChunks = append(dataChunks, subDataChunks...)
+ manifestChunks = append(manifestChunks, subManifestChunks...)
}
return
}
@@ -75,12 +85,15 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
}
// IsChunkManifest
- data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
+ bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
+ bytesBuffer.Reset()
+ defer bytesBufferPool.Put(bytesBuffer)
+ err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
if err != nil {
return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
}
m := &filer_pb.FileChunkManifest{}
- if err := proto.Unmarshal(data, m); err != nil {
+ if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil {
return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
}
@@ -90,26 +103,43 @@ func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, c
}
// TODO fetch from cache for weed mount?
-func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
+func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
urlStrings, err := lookupFileIdFn(fileId)
if err != nil {
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
- return nil, err
+ return err
+ }
+ err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
+ if err != nil {
+ return err
}
- return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
+ return nil
}
-func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
+func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
+ urlStrings, err := lookupFileIdFn(fileId)
+ if err != nil {
+ glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
+ return 0, err
+ }
+ return retriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
+}
+
+func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
- var err error
var shouldRetry bool
- receivedData := make([]byte, 0, size)
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
- receivedData = receivedData[:0]
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
- receivedData = append(receivedData, data...)
+ n = 0
+ if strings.Contains(urlString, "%") {
+ urlString = url.PathEscape(urlString)
+ }
+ shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
+ if n < len(buffer) {
+ x := copy(buffer[n:], data)
+ n += x
+ }
})
if !shouldRetry {
break
@@ -128,7 +158,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
}
}
- return receivedData, err
+ return n, err
}
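
Editorial note (not part of the commit): this refactor replaces the allocate-per-call fetchChunk with two paths — fetchWholeChunk streams a full chunk into a bytes.Buffer borrowed from bytesBufferPool, while fetchChunkRange/retriedFetchChunkData fill a caller-provided byte slice and return the number of bytes written. A minimal sketch of the pooled-buffer pattern used here, with hypothetical names:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// readManifest mirrors the pattern in ResolveOneChunkManifest: borrow a buffer,
// reset it, return it to the pool when done, and copy out (or unmarshal) the
// contents before returning, since the backing array is reused after Put.
func readManifest(fetch func(*bytes.Buffer) error) ([]byte, error) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufPool.Put(buf)
	if err := fetch(buf); err != nil {
		return nil, err
	}
	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	return out, nil
}

func main() {
	data, _ := readManifest(func(b *bytes.Buffer) error {
		_, err := b.WriteString("manifest bytes")
		return err
	})
	fmt.Println(string(data))
}
```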
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 0dc03f6e2..48b344bf8 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -3,11 +3,12 @@ package filer
import (
"bytes"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
- "sort"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "golang.org/x/exp/slices"
+
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -23,7 +24,16 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
}
func FileSize(entry *filer_pb.Entry) (size uint64) {
- return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
+ if entry == nil || entry.Attributes == nil {
+ return 0
+ }
+ fileSize := entry.Attributes.FileSize
+ if entry.RemoteEntry != nil {
+ if entry.RemoteEntry.RemoteMtime > entry.Attributes.Mtime {
+ fileSize = maxUint64(fileSize, uint64(entry.RemoteEntry.RemoteSize))
+ }
+ }
+ return maxUint64(TotalSize(entry.Chunks), fileSize)
}
func ETag(entry *filer_pb.Entry) (etag string) {
@@ -101,6 +111,21 @@ func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
return
}
+func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
+
+ fileIds := make(map[string]bool)
+ for _, interval := range bs {
+ fileIds[interval.GetFileIdString()] = true
+ }
+ for _, chunk := range as {
+ if _, found := fileIds[chunk.GetSourceFileId()]; !found {
+ delta = append(delta, chunk)
+ }
+ }
+
+ return
+}
+
type ChunkView struct {
FileId string
Offset int64
@@ -224,19 +249,26 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n
func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) {
chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset)
+ if err != nil {
+ return
+ }
+
+ visibles2 := readResolvedChunks(chunks)
- sort.Slice(chunks, func(i, j int) bool {
- if chunks[i].Mtime == chunks[j].Mtime {
- filer_pb.EnsureFid(chunks[i])
- filer_pb.EnsureFid(chunks[j])
- if chunks[i].Fid == nil || chunks[j].Fid == nil {
+ if true {
+ return visibles2, err
+ }
+ slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
+ if a.Mtime == b.Mtime {
+ filer_pb.EnsureFid(a)
+ filer_pb.EnsureFid(b)
+ if a.Fid == nil || b.Fid == nil {
return true
}
- return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+ return a.Fid.FileKey < b.Fid.FileKey
}
- return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run
+ return a.Mtime < b.Mtime
})
-
for _, chunk := range chunks {
// glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size))
@@ -246,9 +278,26 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunction
}
+ if len(visibles) != len(visibles2) {
+ fmt.Printf("different visibles size %d : %d\n", len(visibles), len(visibles2))
+ } else {
+ for i := 0; i < len(visibles); i++ {
+ checkDifference(visibles[i], visibles2[i])
+ }
+ }
+
return
}
+func checkDifference(x, y VisibleInterval) {
+ if x.start != y.start ||
+ x.stop != y.stop ||
+ x.fileId != y.fileId ||
+ x.modifiedTime != y.modifiedTime {
+ fmt.Printf("different visible %+v : %+v\n", x, y)
+ }
+}
+
// find non-overlapping visible intervals
// visible interval map to one file chunk
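
Editorial note (not part of the commit): DoMinusChunksBySourceFileId complements the existing DoMinusChunks — it keeps the chunks in `as` whose SourceFileId does not appear among the file ids in `bs`, rather than comparing file ids directly. A small, hypothetical usage sketch (the chunk values are made up):

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func main() {
	local := []*filer_pb.FileChunk{
		{FileId: "1,aa", SourceFileId: "3,zz"}, // source 3,zz is present in remote
		{FileId: "1,bb", SourceFileId: "3,yy"}, // source 3,yy is not
	}
	remote := []*filer_pb.FileChunk{
		{FileId: "3,zz"},
	}
	// keeps the local chunks whose source file id is not among the remote file ids
	for _, c := range filer.DoMinusChunksBySourceFileId(local, remote) {
		fmt.Println(c.FileId) // prints: 1,bb
	}
}
```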
diff --git a/weed/filer/filechunks2_test.go b/weed/filer/filechunks2_test.go
index 9f9566d9b..39dec87c9 100644
--- a/weed/filer/filechunks2_test.go
+++ b/weed/filer/filechunks2_test.go
@@ -1,7 +1,7 @@
package filer
import (
- "sort"
+ "golang.org/x/exp/slices"
"testing"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -34,11 +34,11 @@ func TestCompactFileChunksRealCase(t *testing.T) {
}
func printChunks(name string, chunks []*filer_pb.FileChunk) {
- sort.Slice(chunks, func(i, j int) bool {
- if chunks[i].Offset == chunks[j].Offset {
- return chunks[i].Mtime < chunks[j].Mtime
+ slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
+ if a.Offset == b.Offset {
+ return a.Mtime < b.Mtime
}
- return chunks[i].Offset < chunks[j].Offset
+ return a.Offset < b.Offset
})
for _, chunk := range chunks {
glog.V(0).Infof("%s chunk %s [%10d,%10d)", name, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
diff --git a/weed/filer/filechunks_read.go b/weed/filer/filechunks_read.go
new file mode 100644
index 000000000..1d0bd837a
--- /dev/null
+++ b/weed/filer/filechunks_read.go
@@ -0,0 +1,116 @@
+package filer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "golang.org/x/exp/slices"
+)
+
+func readResolvedChunks(chunks []*filer_pb.FileChunk) (visibles []VisibleInterval) {
+
+ var points []*Point
+ for _, chunk := range chunks {
+ points = append(points, &Point{
+ x: chunk.Offset,
+ ts: chunk.Mtime,
+ chunk: chunk,
+ isStart: true,
+ })
+ points = append(points, &Point{
+ x: chunk.Offset + int64(chunk.Size),
+ ts: chunk.Mtime,
+ chunk: chunk,
+ isStart: false,
+ })
+ }
+ slices.SortFunc(points, func(a, b *Point) bool {
+ if a.x != b.x {
+ return a.x < b.x
+ }
+ if a.ts != b.ts {
+ return a.ts < b.ts
+ }
+ return !a.isStart
+ })
+
+ var prevX int64
+ var queue []*Point
+ for _, point := range points {
+ if point.isStart {
+ if len(queue) > 0 {
+ lastIndex := len(queue) - 1
+ lastPoint := queue[lastIndex]
+ if point.x != prevX && lastPoint.ts < point.ts {
+ visibles = addToVisibles(visibles, prevX, lastPoint, point)
+ prevX = point.x
+ }
+ }
+ // insert into queue
+ for i := len(queue); i >= 0; i-- {
+ if i == 0 || queue[i-1].ts <= point.ts {
+ if i == len(queue) {
+ prevX = point.x
+ }
+ queue = addToQueue(queue, i, point)
+ break
+ }
+ }
+ } else {
+ lastIndex := len(queue) - 1
+ index := lastIndex
+ var startPoint *Point
+ for ; index >= 0; index-- {
+ startPoint = queue[index]
+ if startPoint.ts == point.ts {
+ queue = removeFromQueue(queue, index)
+ break
+ }
+ }
+ if index == lastIndex && startPoint != nil {
+ visibles = addToVisibles(visibles, prevX, startPoint, point)
+ prevX = point.x
+ }
+ }
+ }
+
+ return
+}
+
+func removeFromQueue(queue []*Point, index int) []*Point {
+ for i := index; i < len(queue)-1; i++ {
+ queue[i] = queue[i+1]
+ }
+ queue = queue[:len(queue)-1]
+ return queue
+}
+
+func addToQueue(queue []*Point, index int, point *Point) []*Point {
+ queue = append(queue, point)
+ for i := len(queue) - 1; i > index; i-- {
+ queue[i], queue[i-1] = queue[i-1], queue[i]
+ }
+ return queue
+}
+
+func addToVisibles(visibles []VisibleInterval, prevX int64, startPoint *Point, point *Point) []VisibleInterval {
+ if prevX < point.x {
+ chunk := startPoint.chunk
+ visibles = append(visibles, VisibleInterval{
+ start: prevX,
+ stop: point.x,
+ fileId: chunk.GetFileIdString(),
+ modifiedTime: chunk.Mtime,
+ chunkOffset: prevX - chunk.Offset,
+ chunkSize: chunk.Size,
+ cipherKey: chunk.CipherKey,
+ isGzipped: chunk.IsCompressed,
+ })
+ }
+ return visibles
+}
+
+type Point struct {
+ x int64
+ ts int64
+ chunk *filer_pb.FileChunk
+ isStart bool
+}
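
Editorial note (not part of the commit): readResolvedChunks replaces the incremental merge with a sweep line. Every chunk contributes a start and a stop Point, the points are sorted by offset (then Mtime), and a queue ordered by Mtime tracks which chunk is currently on top, so the newest write always wins on overlapping byte ranges. A minimal in-package sketch of the resulting semantics, in the style of the tests that follow:

```go
package filer

import (
	"testing"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// Editorial sketch: chunk "b" was written later (Mtime 2), so it hides the
// overlapping tail of chunk "a"; the file resolves to [0,50)->a, [50,150)->b.
func TestOverlapResolutionSketch(t *testing.T) {
	chunks := []*filer_pb.FileChunk{
		{FileId: "a", Offset: 0, Size: 100, Mtime: 1},
		{FileId: "b", Offset: 50, Size: 100, Mtime: 2},
	}
	visibles := readResolvedChunks(chunks)
	want := []struct {
		start, stop int64
		fileId      string
	}{{0, 50, "a"}, {50, 150, "b"}}
	if len(visibles) != len(want) {
		t.Fatalf("got %d intervals, want %d", len(visibles), len(want))
	}
	for i, v := range visibles {
		if v.start != want[i].start || v.stop != want[i].stop || v.fileId != want[i].fileId {
			t.Errorf("interval %d: got [%d,%d) %s", i, v.start, v.stop, v.fileId)
		}
	}
}
```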
diff --git a/weed/filer/filechunks_read_test.go b/weed/filer/filechunks_read_test.go
new file mode 100644
index 000000000..e70c66e6f
--- /dev/null
+++ b/weed/filer/filechunks_read_test.go
@@ -0,0 +1,210 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "math/rand"
+ "testing"
+)
+
+func TestReadResolvedChunks(t *testing.T) {
+
+ chunks := []*filer_pb.FileChunk{
+ {
+ FileId: "a",
+ Offset: 0,
+ Size: 100,
+ Mtime: 1,
+ },
+ {
+ FileId: "b",
+ Offset: 50,
+ Size: 100,
+ Mtime: 2,
+ },
+ {
+ FileId: "c",
+ Offset: 200,
+ Size: 50,
+ Mtime: 3,
+ },
+ {
+ FileId: "d",
+ Offset: 250,
+ Size: 50,
+ Mtime: 4,
+ },
+ {
+ FileId: "e",
+ Offset: 175,
+ Size: 100,
+ Mtime: 5,
+ },
+ }
+
+ visibles := readResolvedChunks(chunks)
+
+ for _, visible := range visibles {
+ fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTime)
+ }
+
+}
+
+func TestRandomizedReadResolvedChunks(t *testing.T) {
+
+ var limit int64 = 1024 * 1024
+ array := make([]int64, limit)
+ var chunks []*filer_pb.FileChunk
+ for ts := int64(0); ts < 1024; ts++ {
+ x := rand.Int63n(limit)
+ y := rand.Int63n(limit)
+ size := x - y
+ if size < 0 {
+ size = -size
+ }
+ if size > 1024 {
+ size = 1024
+ }
+ start := x
+ if start > y {
+ start = y
+ }
+ chunks = append(chunks, randomWrite(array, start, size, ts))
+ }
+
+ visibles := readResolvedChunks(chunks)
+
+ for _, visible := range visibles {
+ for i := visible.start; i < visible.stop; i++ {
+ if array[i] != visible.modifiedTime {
+ t.Errorf("position %d expected ts %d actual ts %d", i, array[i], visible.modifiedTime)
+ }
+ }
+ }
+
+ // fmt.Printf("visibles %d", len(visibles))
+
+}
+
+func randomWrite(array []int64, start int64, size int64, ts int64) *filer_pb.FileChunk {
+ for i := start; i < start+size; i++ {
+ array[i] = ts
+ }
+ // fmt.Printf("write [%d,%d) %d\n", start, start+size, ts)
+ return &filer_pb.FileChunk{
+ FileId: "",
+ Offset: start,
+ Size: uint64(size),
+ Mtime: ts,
+ }
+}
+
+func TestSequentialReadResolvedChunks(t *testing.T) {
+
+ var chunkSize int64 = 1024 * 1024 * 2
+ var chunks []*filer_pb.FileChunk
+ for ts := int64(0); ts < 13; ts++ {
+ chunks = append(chunks, &filer_pb.FileChunk{
+ FileId: "",
+ Offset: chunkSize * ts,
+ Size: uint64(chunkSize),
+ Mtime: 1,
+ })
+ }
+
+ visibles := readResolvedChunks(chunks)
+
+ fmt.Printf("visibles %d", len(visibles))
+
+}
+
+func TestActualReadResolvedChunks(t *testing.T) {
+
+ chunks := []*filer_pb.FileChunk{
+ {
+ FileId: "5,e7b96fef48",
+ Offset: 0,
+ Size: 2097152,
+ Mtime: 1634447487595823000,
+ },
+ {
+ FileId: "5,e5562640b9",
+ Offset: 2097152,
+ Size: 2097152,
+ Mtime: 1634447487595826000,
+ },
+ {
+ FileId: "5,df033e0fe4",
+ Offset: 4194304,
+ Size: 2097152,
+ Mtime: 1634447487595827000,
+ },
+ {
+ FileId: "7,eb08148a9b",
+ Offset: 6291456,
+ Size: 2097152,
+ Mtime: 1634447487595827000,
+ },
+ {
+ FileId: "7,e0f92d1604",
+ Offset: 8388608,
+ Size: 2097152,
+ Mtime: 1634447487595828000,
+ },
+ {
+ FileId: "7,e33cb63262",
+ Offset: 10485760,
+ Size: 2097152,
+ Mtime: 1634447487595828000,
+ },
+ {
+ FileId: "5,ea98e40e93",
+ Offset: 12582912,
+ Size: 2097152,
+ Mtime: 1634447487595829000,
+ },
+ {
+ FileId: "5,e165661172",
+ Offset: 14680064,
+ Size: 2097152,
+ Mtime: 1634447487595829000,
+ },
+ {
+ FileId: "3,e692097486",
+ Offset: 16777216,
+ Size: 2097152,
+ Mtime: 1634447487595830000,
+ },
+ {
+ FileId: "3,e28e2e3cbd",
+ Offset: 18874368,
+ Size: 2097152,
+ Mtime: 1634447487595830000,
+ },
+ {
+ FileId: "3,e443974d4e",
+ Offset: 20971520,
+ Size: 2097152,
+ Mtime: 1634447487595830000,
+ },
+ {
+ FileId: "2,e815bed597",
+ Offset: 23068672,
+ Size: 2097152,
+ Mtime: 1634447487595831000,
+ },
+ {
+ FileId: "5,e94715199e",
+ Offset: 25165824,
+ Size: 1974736,
+ Mtime: 1634447487595832000,
+ },
+ }
+
+ visibles := readResolvedChunks(chunks)
+
+ for _, visible := range visibles {
+ fmt.Printf("[%d,%d) %s %d\n", visible.start, visible.stop, visible.fileId, visible.modifiedTime)
+ }
+
+}
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 1a20abefc..86827c50e 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -3,7 +3,11 @@ package filer
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"os"
+ "sort"
"strings"
"time"
@@ -33,8 +37,6 @@ type Filer struct {
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
DirBucketsPath string
- FsyncBuckets []string
- buckets *FilerBuckets
Cipher bool
LocalMetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
@@ -43,16 +45,18 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
+ UniqueFileId uint32
}
-func NewFiler(masters []string, grpcDialOption grpc.DialOption,
- filerHost string, filerGrpcPort uint32, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
+func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,
+ filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
+ UniqueFileId: uint32(util.RandomInt32()),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
@@ -63,32 +67,69 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
return f
}
-func (f *Filer) AggregateFromPeers(self string, filers []string) {
-
- // set peers
- found := false
- for _, peer := range filers {
- if peer == self {
- found = true
- }
+func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error) {
+ if len(existingNodes) == 0 {
+ return
}
- if !found {
- filers = append(filers, self)
+ sort.Slice(existingNodes, func(i, j int) bool {
+ return existingNodes[i].CreatedAtNs < existingNodes[j].CreatedAtNs
+ })
+ earliestNode := existingNodes[0]
+ if earliestNode.Address == string(self) {
+ return
}
- f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
- f.MetaAggregator.StartLoopSubscribe(f, self)
+ glog.V(0).Infof("bootstrap from %v", earliestNode.Address)
+ err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil,
+ 0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error {
+ return Replay(f.Store, resp)
+ }, pb.FatalOnError)
+ return
+}
+
+func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) {
+
+ f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
+ f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate
+
+ for _, peerUpdate := range existingNodes {
+ f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom)
+ }
}
-func (f *Filer) SetStore(store FilerStore) {
- f.Store = NewFilerStoreWrapper(store)
+func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
+
+ if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
+ ClientType: cluster.FilerType,
+ FilerGroup: f.MasterClient.FilerGroup,
+ })
+
+ glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes))
+ for _, node := range resp.ClusterNodes {
+ existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
+ NodeType: cluster.FilerType,
+ Address: node.Address,
+ IsLeader: node.IsLeader,
+ IsAdd: true,
+ CreatedAtNs: node.CreatedAtNs,
+ })
+ }
+ return err
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr)
+ }
+ return
+}
- f.setOrLoadFilerStoreSignature(store)
+func (f *Filer) SetStore(store FilerStore) (isFresh bool) {
+ f.Store = NewFilerStoreWrapper(store)
+ return f.setOrLoadFilerStoreSignature(store)
}
-func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
+func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) (isFresh bool) {
storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId))
if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 {
f.Signature = util.RandomInt32()
@@ -98,23 +139,25 @@ func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err)
}
glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature)
+ return true
} else if err == nil && len(storeIdBytes) == 4 {
f.Signature = int32(util.BytesToUint32(storeIdBytes))
glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature)
} else {
glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err)
}
+ return false
}
func (f *Filer) GetStore() (store FilerStore) {
return f.Store
}
-func (fs *Filer) GetMaster() string {
+func (fs *Filer) GetMaster() pb.ServerAddress {
return fs.MasterClient.GetMaster()
}
-func (fs *Filer) KeepConnectedToMaster() {
+func (fs *Filer) KeepMasterClientConnected() {
fs.MasterClient.KeepConnectedToMaster()
}
@@ -130,7 +173,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error {
return f.Store.RollbackTransaction(ctx)
}
-func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
+func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool) error {
if string(entry.FullPath) == "/" {
return nil
@@ -148,9 +191,11 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
if oldEntry == nil {
- dirParts := strings.Split(string(entry.FullPath), "/")
- if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
- return err
+ if !skipCreateParentDir {
+ dirParts := strings.Split(string(entry.FullPath), "/")
+ if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
+ return err
+ }
}
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
@@ -170,7 +215,6 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
}
}
- f.maybeAddBucket(entry)
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
f.deleteChunksIfNotNew(oldEntry, entry)
@@ -207,15 +251,13 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di
dirEntry = &Entry{
FullPath: util.FullPath(dirPath),
Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | entry.Mode | 0111,
- Uid: entry.Uid,
- Gid: entry.Gid,
- Collection: entry.Collection,
- Replication: entry.Replication,
- UserName: entry.UserName,
- GroupNames: entry.GroupNames,
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | entry.Mode | 0111,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ UserName: entry.UserName,
+ GroupNames: entry.GroupNames,
},
}
@@ -227,7 +269,6 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
- f.maybeAddBucket(dirEntry)
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
}
@@ -285,14 +326,19 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
- if entry.TtlSec > 0 {
- if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- f.Store.DeleteOneEntry(ctx, entry)
- expiredCount++
- return true
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ if entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.Store.DeleteOneEntry(ctx, entry)
+ expiredCount++
+ return true
+ }
}
+ return eachEntryFunc(entry)
}
- return eachEntryFunc(entry)
})
if err != nil {
return expiredCount, lastFileName, err
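The new listing callback above lazily purges entries whose TTL has elapsed before handing anything to the caller. A minimal, self-contained sketch of that expiry test; the entry type here is a stand-in, not the filer's own, assuming Crtime and TtlSec mean what they do in the diff:

package main

import (
	"fmt"
	"time"
)

// entry is a stand-in for filer.Entry with only the fields the TTL check needs.
type entry struct {
	Crtime time.Time
	TtlSec int32
}

// expired reports whether the entry's time-to-live has elapsed.
func expired(e entry, now time.Time) bool {
	if e.TtlSec <= 0 {
		return false // no TTL configured
	}
	return e.Crtime.Add(time.Duration(e.TtlSec) * time.Second).Before(now)
}

func main() {
	e := entry{Crtime: time.Now().Add(-2 * time.Hour), TtlSec: 3600}
	fmt.Println(expired(e, time.Now())) // true: created 2h ago with a 1h TTL
}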
diff --git a/weed/filer/filer_buckets.go b/weed/filer/filer_buckets.go
index 38a1abadb..e9cb3547e 100644
--- a/weed/filer/filer_buckets.go
+++ b/weed/filer/filer_buckets.go
@@ -1,76 +1,9 @@
package filer
import (
- "context"
- "math"
"strings"
- "sync"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
)
-type BucketName string
-type BucketOption struct {
- Name BucketName
- Replication string
- fsync bool
-}
-type FilerBuckets struct {
- dirBucketsPath string
- buckets map[BucketName]*BucketOption
- sync.RWMutex
-}
-
-func (f *Filer) LoadBuckets() {
-
- f.buckets = &FilerBuckets{
- buckets: make(map[BucketName]*BucketOption),
- }
-
- limit := int64(math.MaxInt32)
-
- entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "", "")
-
- if err != nil {
- glog.V(1).Infof("no buckets found: %v", err)
- return
- }
-
- shouldFsyncMap := make(map[string]bool)
- for _, bucket := range f.FsyncBuckets {
- shouldFsyncMap[bucket] = true
- }
-
- glog.V(1).Infof("buckets found: %d", len(entries))
-
- f.buckets.Lock()
- for _, entry := range entries {
- _, shouldFsnyc := shouldFsyncMap[entry.Name()]
- f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
- Name: BucketName(entry.Name()),
- Replication: entry.Replication,
- fsync: shouldFsnyc,
- }
- }
- f.buckets.Unlock()
-
-}
-
-func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) {
-
- f.buckets.RLock()
- defer f.buckets.RUnlock()
-
- option, found := f.buckets.buckets[BucketName(buketName)]
-
- if !found {
- return "", false
- }
- return option.Replication, option.fsync
-
-}
-
func (f *Filer) isBucket(entry *Entry) bool {
if !entry.IsDirectory() {
return false
@@ -83,43 +16,6 @@ func (f *Filer) isBucket(entry *Entry) bool {
return false
}
- f.buckets.RLock()
- defer f.buckets.RUnlock()
-
- _, found := f.buckets.buckets[BucketName(dirName)]
-
- return found
-
-}
-
-func (f *Filer) maybeAddBucket(entry *Entry) {
- if !entry.IsDirectory() {
- return
- }
- parent, dirName := entry.FullPath.DirAndName()
- if parent != f.DirBucketsPath {
- return
- }
- f.addBucket(dirName, &BucketOption{
- Name: BucketName(dirName),
- Replication: entry.Replication,
- })
-}
-
-func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) {
-
- f.buckets.Lock()
- defer f.buckets.Unlock()
-
- f.buckets.buckets[BucketName(buketName)] = bucketOption
-
-}
-
-func (f *Filer) deleteBucket(buketName string) {
-
- f.buckets.Lock()
- defer f.buckets.Unlock()
-
- delete(f.buckets.buckets, BucketName(buketName))
+ return true
}
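With the in-memory bucket map removed, isBucket reduces to a pure path test: a directory counts as a bucket when its parent is the configured buckets root. A rough stand-alone sketch of that check, omitting the IsDirectory and empty-root guards from the real code:

package main

import (
	"fmt"
	"path"
)

// isBucketPath mirrors the simplified check: a directory is a bucket when its
// parent directory equals the buckets root (assumed here to be "/buckets").
func isBucketPath(fullPath, dirBucketsPath string) bool {
	return path.Dir(fullPath) == dirBucketsPath
}

func main() {
	fmt.Println(isBucketPath("/buckets/photos", "/buckets"))   // true
	fmt.Println(isBucketPath("/buckets/photos/a", "/buckets")) // false
}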
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index c58b26dc2..32fc647d9 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -3,6 +3,10 @@ package filer
import (
"bytes"
"context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "google.golang.org/grpc"
"io"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -26,6 +30,29 @@ type FilerConf struct {
rules ptrie.Trie
}
+func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) {
+ var buf bytes.Buffer
+ if err := pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if masterClient != nil {
+ return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf)
+ } else {
+ content, err := ReadInsideFiler(client, DirectoryEtcSeaweedFS, FilerConfName)
+ buf = *bytes.NewBuffer(content)
+ return err
+ }
+ }); err != nil && err != filer_pb.ErrNotFound {
+ return nil, fmt.Errorf("read %s/%s: %v", DirectoryEtcSeaweedFS, FilerConfName, err)
+ }
+
+ fc := NewFilerConf()
+ if buf.Len() > 0 {
+ if err := fc.LoadFromBytes(buf.Bytes()); err != nil {
+ return nil, fmt.Errorf("parse %s/%s: %v", DirectoryEtcSeaweedFS, FilerConfName, err)
+ }
+ }
+ return fc, nil
+}
+
func NewFilerConf() (fc *FilerConf) {
fc = &FilerConf{
rules: ptrie.New(),
@@ -48,12 +75,12 @@ func (fc *FilerConf) loadFromFiler(filer *Filer) (err error) {
return fc.LoadFromBytes(entry.Content)
}
- return fc.loadFromChunks(filer, entry.Content, entry.Chunks)
+ return fc.loadFromChunks(filer, entry.Content, entry.Chunks, entry.Size())
}
-func (fc *FilerConf) loadFromChunks(filer *Filer, content []byte, chunks []*filer_pb.FileChunk) (err error) {
+func (fc *FilerConf) loadFromChunks(filer *Filer, content []byte, chunks []*filer_pb.FileChunk, size uint64) (err error) {
if len(content) == 0 {
- content, err = filer.readEntry(chunks)
+ content, err = filer.readEntry(chunks, size)
if err != nil {
glog.Errorf("read filer conf content: %v", err)
return
@@ -115,21 +142,32 @@ func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf
return pathConf
}
+func (fc *FilerConf) GetCollectionTtls(collection string) (ttls map[string]string) {
+ ttls = make(map[string]string)
+ fc.rules.Walk(func(key []byte, value interface{}) bool {
+ t := value.(*filer_pb.FilerConf_PathConf)
+ if t.Collection == collection {
+ ttls[t.LocationPrefix] = t.GetTtl()
+ }
+ return true
+ })
+ return ttls
+}
+
// if values in b are not empty, merge them into a
func mergePathConf(a, b *filer_pb.FilerConf_PathConf) {
a.Collection = util.Nvl(b.Collection, a.Collection)
a.Replication = util.Nvl(b.Replication, a.Replication)
a.Ttl = util.Nvl(b.Ttl, a.Ttl)
- if b.DiskType != "" {
- a.DiskType = b.DiskType
- }
+ a.DiskType = util.Nvl(b.DiskType, a.DiskType)
a.Fsync = b.Fsync || a.Fsync
if b.VolumeGrowthCount > 0 {
a.VolumeGrowthCount = b.VolumeGrowthCount
}
- if b.ReadOnly {
- a.ReadOnly = b.ReadOnly
- }
+ a.ReadOnly = b.ReadOnly || a.ReadOnly
+ a.DataCenter = util.Nvl(b.DataCenter, a.DataCenter)
+ a.Rack = util.Nvl(b.Rack, a.Rack)
+ a.DataNode = util.Nvl(b.DataNode, a.DataNode)
}
func (fc *FilerConf) ToProto() *filer_pb.FilerConf {
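mergePathConf now applies one rule to every string field: a non-empty value in b overrides a (the util.Nvl semantics), and booleans are OR-ed so a read-only flag sticks. A small sketch of that merge with a simplified stand-in for filer_pb.FilerConf_PathConf:

package main

import "fmt"

// nvl returns the first non-empty string, mirroring how mergePathConf lets a
// non-empty value in b override the value already in a.
func nvl(values ...string) string {
	for _, v := range values {
		if v != "" {
			return v
		}
	}
	return ""
}

type pathConf struct {
	Collection string
	ReadOnly   bool
}

// merge copies non-empty fields of b into a; booleans are OR-ed, so a path
// stays read-only once any matching rule marks it read-only.
func merge(a, b *pathConf) {
	a.Collection = nvl(b.Collection, a.Collection)
	a.ReadOnly = b.ReadOnly || a.ReadOnly
}

func main() {
	a := &pathConf{Collection: "default"}
	b := &pathConf{ReadOnly: true}
	merge(a, b)
	fmt.Printf("%+v\n", *a) // {Collection:default ReadOnly:true}
}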
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 35187d034..0fc2f6c3c 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -9,12 +9,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-type HardLinkId []byte
-
const (
MsgFailDelNonEmptyFolder = "fail to delete non-empty folder"
)
+type OnChunksFunc func([]*filer_pb.FileChunk) error
+type OnHardLinkIdsFunc func([]HardLinkId) error
+
func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
if p == "/" {
return nil
@@ -24,23 +25,30 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
if findErr != nil {
return findErr
}
-
isDeleteCollection := f.isBucket(entry)
-
- var chunks []*filer_pb.FileChunk
- var hardLinkIds []HardLinkId
- chunks = append(chunks, entry.Chunks...)
if entry.IsDirectory() {
// delete the folder children, not including the folder itself
- var dirChunks []*filer_pb.FileChunk
- var dirHardLinkIds []HardLinkId
- dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures)
+ err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isDeleteCollection, isFromOtherCluster, signatures, func(chunks []*filer_pb.FileChunk) error {
+ if shouldDeleteChunks && !isDeleteCollection {
+ f.DirectDeleteChunks(chunks)
+ }
+ return nil
+ }, func(hardLinkIds []HardLinkId) error {
+ // A case not handled:
+ // what if the chunk is in a different collection?
+ if shouldDeleteChunks {
+ f.maybeDeleteHardLinks(hardLinkIds)
+ }
+ return nil
+ })
if err != nil {
glog.V(0).Infof("delete directory %s: %v", p, err)
return fmt.Errorf("delete directory %s: %v", p, err)
}
- chunks = append(chunks, dirChunks...)
- hardLinkIds = append(hardLinkIds, dirHardLinkIds...)
+ }
+
+ if shouldDeleteChunks && !isDeleteCollection {
+ f.DirectDeleteChunks(entry.Chunks)
}
// delete the file or folder
@@ -49,25 +57,15 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
return fmt.Errorf("delete file %s: %v", p, err)
}
- if shouldDeleteChunks && !isDeleteCollection {
- f.DirectDeleteChunks(chunks)
- }
- // A case not handled:
- // what if the chunk is in a different collection?
- if shouldDeleteChunks {
- f.maybeDeleteHardLinks(hardLinkIds)
- }
-
if isDeleteCollection {
collectionName := entry.Name()
f.doDeleteCollection(collectionName)
- f.deleteBucket(collectionName)
}
return nil
}
-func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) {
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isDeletingBucket, isFromOtherCluster bool, signatures []int32, onChunksFn OnChunksFunc, onHardLinkIdsFn OnHardLinkIdsFunc) (err error) {
lastFileName := ""
includeLastFile := false
@@ -76,34 +74,30 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "", "", "")
if err != nil {
glog.Errorf("list folder %s: %v", entry.FullPath, err)
- return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
+ return fmt.Errorf("list folder %s: %v", entry.FullPath, err)
}
if lastFileName == "" && !isRecursive && len(entries) > 0 {
// only for first iteration in the loop
- glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
- return nil, nil, fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath)
+ glog.V(0).Infof("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
+ return fmt.Errorf("%s: %s", MsgFailDelNonEmptyFolder, entry.FullPath)
}
for _, sub := range entries {
lastFileName = sub.Name()
- var dirChunks []*filer_pb.FileChunk
- var dirHardLinkIds []HardLinkId
if sub.IsDirectory() {
subIsDeletingBucket := f.isBucket(sub)
- dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil)
- chunks = append(chunks, dirChunks...)
- hardlinkIds = append(hardlinkIds, dirHardLinkIds...)
+ err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, subIsDeletingBucket, false, nil, onChunksFn, onHardLinkIdsFn)
} else {
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
if len(sub.HardLinkId) != 0 {
// hard link chunk data are deleted separately
- hardlinkIds = append(hardlinkIds, sub.HardLinkId)
+ err = onHardLinkIdsFn([]HardLinkId{sub.HardLinkId})
} else {
- chunks = append(chunks, sub.Chunks...)
+ err = onChunksFn(sub.Chunks)
}
}
if err != nil && !ignoreRecursiveError {
- return nil, nil, err
+ return err
}
}
@@ -113,22 +107,26 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
}
}
- glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
+ glog.V(3).Infof("deleting directory %v delete chunks: %v", entry.FullPath, shouldDeleteChunks)
if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
- return nil, nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ return fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
- return chunks, hardlinkIds, nil
+ return nil
}
func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) {
glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
- if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
+ if !entry.IsDirectory() && !shouldDeleteChunks {
+ if storeDeletionErr := f.Store.DeleteOneEntrySkipHardlink(ctx, entry.FullPath); storeDeletionErr != nil {
+ return fmt.Errorf("filer store delete skip hardlink: %v", storeDeletionErr)
+ }
+ } else if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
return fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
if !entry.IsDirectory() {
@@ -140,7 +138,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
func (f *Filer) doDeleteCollection(collectionName string) (err error) {
- return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ return f.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: collectionName,
})
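The rewritten delete path streams chunks and hard-link ids through callbacks instead of accumulating them across the whole subtree, so memory stays bounded for very large folders. A toy illustration of the callback shape; plain strings stand in for *filer_pb.FileChunk and the hard-link callback is omitted:

package main

import "fmt"

// onChunksFunc receives batches of chunk ids as a recursive delete walks a tree,
// instead of the caller collecting every chunk of the subtree into one slice.
type onChunksFunc func(chunkIDs []string) error

// walkAndDelete is a toy stand-in for doBatchDeleteFolderMetaAndData: it visits
// the children of each folder and hands their chunks to the callback batch by batch.
func walkAndDelete(tree map[string][]string, folder string, onChunks onChunksFunc) error {
	for _, child := range tree[folder] {
		if _, isFolder := tree[child]; isFolder {
			if err := walkAndDelete(tree, child, onChunks); err != nil {
				return err
			}
			continue
		}
		// a leaf "file" contributes one chunk id in this toy model
		if err := onChunks([]string{child + "-chunk"}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	tree := map[string][]string{
		"/dir":     {"/dir/sub", "/dir/a.txt"},
		"/dir/sub": {"/dir/sub/b.txt"},
	}
	_ = walkAndDelete(tree, "/dir", func(ids []string) error {
		fmt.Println("delete", ids) // chunks are released as they are discovered
		return nil
	})
}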
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index a12587541..e73f94151 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -1,6 +1,7 @@
package filer
import (
+ "math"
"strings"
"time"
@@ -129,6 +130,12 @@ func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
}
}
+func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) {
+ for _, chunk := range chunks {
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
+ }
+}
+
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
if oldEntry == nil {
@@ -136,18 +143,41 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
}
if newEntry == nil {
f.DeleteChunks(oldEntry.Chunks)
+ return
}
var toDelete []*filer_pb.FileChunk
newChunkIds := make(map[string]bool)
- for _, newChunk := range newEntry.Chunks {
+ newDataChunks, newManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+ newEntry.Chunks, 0, math.MaxInt64)
+ if err != nil {
+ glog.Errorf("Failed to resolve new entry chunks when deleting old entry chunks. new: %s, old: %s",
+ newEntry.Chunks, oldEntry.Chunks)
+ return
+ }
+ for _, newChunk := range newDataChunks {
+ newChunkIds[newChunk.GetFileIdString()] = true
+ }
+ for _, newChunk := range newManifestChunks {
newChunkIds[newChunk.GetFileIdString()] = true
}
- for _, oldChunk := range oldEntry.Chunks {
+ oldDataChunks, oldManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+ oldEntry.Chunks, 0, math.MaxInt64)
+ if err != nil {
+ glog.Errorf("Failed to resolve old entry chunks when deleting old entry chunks. new: %s, old: %s",
+ newEntry.Chunks, oldEntry.Chunks)
+ return
+ }
+ for _, oldChunk := range oldDataChunks {
+ if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
+ toDelete = append(toDelete, oldChunk)
+ }
+ }
+ for _, oldChunk := range oldManifestChunks {
if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
toDelete = append(toDelete, oldChunk)
}
}
- f.DeleteChunks(toDelete)
+ f.DeleteChunksNotRecursive(toDelete)
}
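deleteChunksIfNotNew now resolves manifest chunks on both entries and queues only the old chunk ids that no longer appear in the new entry. The underlying set difference, sketched with plain file-id strings:

package main

import "fmt"

// chunksToDelete returns the old chunk ids that no longer appear in the new
// entry, the same set-difference idea applied after resolving chunk manifests.
func chunksToDelete(oldIDs, newIDs []string) (toDelete []string) {
	keep := make(map[string]bool, len(newIDs))
	for _, id := range newIDs {
		keep[id] = true
	}
	for _, id := range oldIDs {
		if !keep[id] {
			toDelete = append(toDelete, id)
		}
	}
	return
}

func main() {
	oldIDs := []string{"3,01637037d6", "4,02b2c3d4e5", "5,03aabbccdd"}
	newIDs := []string{"4,02b2c3d4e5"}
	fmt.Println(chunksToDelete(oldIDs, newIDs)) // [3,01637037d6 5,03aabbccdd]
}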
diff --git a/weed/filer/filer_hardlink.go b/weed/filer/filer_hardlink.go
new file mode 100644
index 000000000..7a91602fd
--- /dev/null
+++ b/weed/filer/filer_hardlink.go
@@ -0,0 +1,16 @@
+package filer
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ HARD_LINK_MARKER = '\x01'
+)
+
+type HardLinkId []byte // 16 bytes + 1 marker byte
+
+func NewHardLinkId() HardLinkId {
+ bytes := append(util.RandomBytes(16), HARD_LINK_MARKER)
+ return bytes
+}
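The new HardLinkId is 16 random bytes plus a trailing marker byte, 17 bytes in total. A stand-alone sketch of that layout; crypto/rand stands in for the project's util.RandomBytes helper:

package main

import (
	"crypto/rand"
	"fmt"
)

const hardLinkMarker = 0x01

// newHardLinkID builds 16 random bytes followed by a one-byte marker,
// matching the 17-byte layout described in filer_hardlink.go.
func newHardLinkID() []byte {
	id := make([]byte, 16, 17)
	if _, err := rand.Read(id); err != nil {
		panic(err)
	}
	return append(id, hardLinkMarker)
}

func main() {
	id := newHardLinkID()
	fmt.Printf("len=%d marker=%#x\n", len(id), id[len(id)-1]) // len=17 marker=0x1
}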
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 7ab101102..4d26a695c 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "math"
"strings"
"time"
@@ -92,14 +93,14 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
startTime, stopTime = startTime.UTC(), stopTime.UTC()
- targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId,
// startTime.Second(), startTime.Nanosecond(),
)
for {
if err := f.appendToFile(targetFile, buf); err != nil {
- glog.V(1).Infof("log write failed %s: %v", targetFile, err)
+ glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
time.Sleep(737 * time.Millisecond)
} else {
break
@@ -107,49 +108,67 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
}
}
-func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) {
+func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, isDone bool, err error) {
startTime = startTime.UTC()
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+ startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
+ var stopDate, stopHourMinute string
+ if stopTsNs != 0 {
+ stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC()
+ stopDate = fmt.Sprintf("%04d-%02d-%02d", stopTime.Year(), stopTime.Month(), stopTime.Day())
+ stopHourMinute = fmt.Sprintf("%02d-%02d", stopTime.Hour(), stopTime.Minute())
+ }
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
- dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 366, "", "", "")
+ dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "")
if listDayErr != nil {
- return lastTsNs, fmt.Errorf("fail to list log by day: %v", listDayErr)
+ return lastTsNs, isDone, fmt.Errorf("fail to list log by day: %v", listDayErr)
}
for _, dayEntry := range dayEntries {
+ if stopDate != "" {
+ if strings.Compare(dayEntry.Name(), stopDate) > 0 {
+ break
+ }
+ }
// println("checking day", dayEntry.FullPath)
- hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "")
+ hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
if listHourMinuteErr != nil {
- return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
+ return lastTsNs, isDone, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}
for _, hourMinuteEntry := range hourMinuteEntries {
// println("checking hh-mm", hourMinuteEntry.FullPath)
if dayEntry.Name() == startDate {
- if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 {
+ hourMinute := util.FileNameBase(hourMinuteEntry.Name())
+ if strings.Compare(hourMinute, startHourMinute) < 0 {
continue
}
}
+ if dayEntry.Name() == stopDate {
+ hourMinute := util.FileNameBase(hourMinuteEntry.Name())
+ if strings.Compare(hourMinute, stopHourMinute) > 0 {
+ break
+ }
+ }
// println("processing", hourMinuteEntry.FullPath)
chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks)
- if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
+ if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, stopTsNs, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
continue
}
- return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
+ return lastTsNs, isDone, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
}
chunkedFileReader.Close()
}
}
- return lastTsNs, nil
+ return lastTsNs, isDone, nil
}
-func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) {
+func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) {
for {
n, err := r.Read(sizeBuf)
if err != nil {
@@ -172,9 +191,12 @@ func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func
if err = proto.Unmarshal(entryData, logEntry); err != nil {
return lastTsNs, err
}
- if logEntry.TsNs <= ns {
+ if logEntry.TsNs <= startTsNs {
continue
}
+ if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+ return lastTsNs, err
+ }
// println("each log: ", logEntry.TsNs)
if err := eachLogEntryFn(logEntry); err != nil {
return lastTsNs, err
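ReadEachLogEntry now takes both startTsNs and stopTsNs: entries at or before the start are skipped, and a non-zero stop timestamp ends the scan once exceeded. The bounds check in isolation, as a small self-contained sketch:

package main

import "fmt"

// keepEntry mirrors the bounds check in ReadEachLogEntry: entries at or before
// startTsNs are skipped, and a non-zero stopTsNs ends the scan once passed.
// It returns (include, done).
func keepEntry(tsNs, startTsNs, stopTsNs int64) (include bool, done bool) {
	if tsNs <= startTsNs {
		return false, false
	}
	if stopTsNs != 0 && tsNs > stopTsNs {
		return false, true
	}
	return true, false
}

func main() {
	for _, ts := range []int64{5, 10, 15, 20, 25} {
		include, done := keepEntry(ts, 10, 20)
		fmt.Println(ts, include, done)
	}
	// 5 false false / 10 false false / 15 true false / 20 true false / 25 false true
}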
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
index d441bbbc9..25b99d0f7 100644
--- a/weed/filer/filer_notify_append.go
+++ b/weed/filer/filer_notify_append.go
@@ -33,6 +33,8 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
Gid: OS_GID,
},
}
+ } else if err != nil {
+ return fmt.Errorf("find %s: %v", fullpath, err)
} else {
offset = int64(TotalSize(entry.Chunks))
}
@@ -41,7 +43,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error {
entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
// update the entry
- err = f.CreateEntry(context.Background(), entry, false, false, nil)
+ err = f.CreateEntry(context.Background(), entry, false, false, nil, false)
return err
}
@@ -66,7 +68,16 @@ func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.Assi
// upload data
targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
- uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: targetUrl,
+ Filename: "",
+ Cipher: f.Cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: assignResult.Auth,
+ }
+ uploadResult, err := operation.UploadData(data, uploadOption)
if err != nil {
return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
}
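UploadData now takes a single options struct instead of a long positional parameter list, so call sites name only the fields they set. A simplified stand-in for that struct; the field names follow the diff, but the types here are illustrative, not the operation package's own:

package main

import "fmt"

// uploadOption groups what used to be positional parameters of UploadData.
type uploadOption struct {
	UploadUrl         string
	Filename          string
	Cipher            bool
	IsInputCompressed bool
	MimeType          string
	Jwt               string
}

// uploadData is a stub that only shows how an options struct reads at the call site.
func uploadData(data []byte, opt *uploadOption) error {
	fmt.Printf("POST %s (%d bytes, cipher=%v)\n", opt.UploadUrl, len(data), opt.Cipher)
	return nil
}

func main() {
	_ = uploadData([]byte("hello"), &uploadOption{
		UploadUrl: "http://127.0.0.1:8080/3,01637037d6",
		Cipher:    false,
	})
}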
diff --git a/weed/filer/filer_on_meta_event.go b/weed/filer/filer_on_meta_event.go
index 34ac5321a..3b290deca 100644
--- a/weed/filer/filer_on_meta_event.go
+++ b/weed/filer/filer_on_meta_event.go
@@ -2,8 +2,6 @@ package filer
import (
"bytes"
- "math"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -24,12 +22,12 @@ func (f *Filer) onBucketEvents(event *filer_pb.SubscribeMetadataResponse) {
}
}
if f.DirBucketsPath == event.Directory {
- if message.OldEntry == nil && message.NewEntry != nil {
+ if filer_pb.IsCreate(event) {
if message.NewEntry.IsDirectory {
f.Store.OnBucketCreation(message.NewEntry.Name)
}
}
- if message.OldEntry != nil && message.NewEntry == nil {
+ if filer_pb.IsDelete(event) {
if message.OldEntry.IsDirectory {
f.Store.OnBucketDeletion(message.OldEntry.Name)
}
@@ -55,9 +53,9 @@ func (f *Filer) maybeReloadFilerConfiguration(event *filer_pb.SubscribeMetadataR
}
}
-func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {
+func (f *Filer) readEntry(chunks []*filer_pb.FileChunk, size uint64) ([]byte, error) {
var buf bytes.Buffer
- err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64)
+ err := StreamContent(f.MasterClient, &buf, chunks, 0, int64(size))
if err != nil {
return nil, err
}
@@ -66,7 +64,7 @@ func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {
func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) {
fc := NewFilerConf()
- err := fc.loadFromChunks(f, entry.Content, entry.Chunks)
+ err := fc.loadFromChunks(f, entry.Content, entry.Chunks, FileSize(entry))
if err != nil {
glog.Errorf("read filer conf chunks: %v", err)
return
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index 2e0336da8..112df7984 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -23,15 +23,15 @@ func splitPattern(pattern string) (prefix string, restPattern string) {
// For now, prefix and namePattern are mutually exclusive
func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, namePatternExclude string) (entries []*Entry, hasMore bool, err error) {
+ if limit > math.MaxInt32-1 {
+ limit = math.MaxInt32 - 1
+ }
+
_, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool {
entries = append(entries, entry)
return true
})
- if limit == math.MaxInt64 {
- limit = math.MaxInt64 - 1
- }
-
hasMore = int64(len(entries)) >= limit+1
if hasMore {
entries = entries[:limit]
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 38927d6fb..260945b33 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -4,8 +4,11 @@ import (
"context"
"errors"
"github.com/chrislusf/seaweedfs/weed/util"
+ "io"
)
+const CountEntryChunksForGzip = 50
+
var (
ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
ErrUnsupportedSuperLargeDirectoryListing = errors.New("unsupported super large directory listing")
@@ -45,3 +48,7 @@ type BucketAware interface {
OnBucketDeletion(bucket string)
CanDropWholeBucket() bool
}
+
+type Debuggable interface {
+ Debug(writer io.Writer)
+}
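Debuggable is an optional capability: the wrapper forwards Debug only to stores that implement it, via a plain type assertion. The pattern in isolation, with hypothetical store types:

package main

import (
	"fmt"
	"io"
	"os"
)

// debuggable is the optional capability interface: stores that can dump
// internal state implement it, others simply don't.
type debuggable interface {
	Debug(w io.Writer)
}

type plainStore struct{}

type verboseStore struct{}

func (verboseStore) Debug(w io.Writer) { fmt.Fprintln(w, "verboseStore: 42 entries") }

// debugStore forwards to Debug only when the store opts in, the same
// type-assertion pattern used by FilerStoreWrapper.Debug.
func debugStore(store interface{}, w io.Writer) {
	if d, ok := store.(debuggable); ok {
		d.Debug(w)
	}
}

func main() {
	debugStore(plainStore{}, os.Stdout)   // prints nothing
	debugStore(verboseStore{}, os.Stdout) // prints the store's internal state
}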
diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go
index 316c76a0c..ae2f5fee7 100644
--- a/weed/filer/filerstore_hardlink.go
+++ b/weed/filer/filerstore_hardlink.go
@@ -9,16 +9,20 @@ import (
)
func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error {
- if len(entry.HardLinkId) == 0 {
+
+ if entry.IsDirectory() {
return nil
}
- // handle hard links
- if err := fsw.setHardLink(ctx, entry); err != nil {
- return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
+
+ if len(entry.HardLinkId) > 0 {
+ // handle hard links
+ if err := fsw.setHardLink(ctx, entry); err != nil {
+ return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
+ }
}
// check what is existing entry
- glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
+ // glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
actualStore := fsw.getActualStore(entry.FullPath)
existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath)
if err != nil && err != filer_pb.ErrNotFound {
@@ -46,6 +50,8 @@ func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) err
return encodeErr
}
+ glog.V(4).Infof("setHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)
+
return fsw.KvPut(ctx, key, newBlob)
}
@@ -55,7 +61,6 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr
}
key := entry.HardLinkId
- glog.V(4).Infof("maybeReadHardLink KvGet %v", key)
value, err := fsw.KvGet(ctx, key)
if err != nil {
glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
@@ -67,6 +72,8 @@ func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entr
return err
}
+ glog.V(4).Infof("maybeReadHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)
+
return nil
}
diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go
index 00bf82ed4..9e74dd41c 100644
--- a/weed/filer/filerstore_translate_path.go
+++ b/weed/filer/filerstore_translate_path.go
@@ -3,20 +3,21 @@ package filer
import (
"context"
"github.com/chrislusf/seaweedfs/weed/util"
+ "math"
"strings"
)
var (
- _ = FilerStore(&FilerStorePathTranlator{})
+ _ = FilerStore(&FilerStorePathTranslator{})
)
-type FilerStorePathTranlator struct {
+type FilerStorePathTranslator struct {
actualStore FilerStore
storeRoot string
}
-func NewFilerStorePathTranlator(storeRoot string, store FilerStore) *FilerStorePathTranlator {
- if innerStore, ok := store.(*FilerStorePathTranlator); ok {
+func NewFilerStorePathTranslator(storeRoot string, store FilerStore) *FilerStorePathTranslator {
+ if innerStore, ok := store.(*FilerStorePathTranslator); ok {
return innerStore
}
@@ -24,13 +25,13 @@ func NewFilerStorePathTranlator(storeRoot string, store FilerStore) *FilerStoreP
storeRoot += "/"
}
- return &FilerStorePathTranlator{
+ return &FilerStorePathTranslator{
actualStore: store,
storeRoot: storeRoot,
}
}
-func (t *FilerStorePathTranlator) translatePath(fp util.FullPath) (newPath util.FullPath) {
+func (t *FilerStorePathTranslator) translatePath(fp util.FullPath) (newPath util.FullPath) {
newPath = fp
if t.storeRoot == "/" {
return
@@ -41,7 +42,7 @@ func (t *FilerStorePathTranlator) translatePath(fp util.FullPath) (newPath util.
}
return
}
-func (t *FilerStorePathTranlator) changeEntryPath(entry *Entry) (previousPath util.FullPath) {
+func (t *FilerStorePathTranslator) changeEntryPath(entry *Entry) (previousPath util.FullPath) {
previousPath = entry.FullPath
if t.storeRoot == "/" {
return
@@ -49,33 +50,33 @@ func (t *FilerStorePathTranlator) changeEntryPath(entry *Entry) (previousPath ut
entry.FullPath = t.translatePath(previousPath)
return
}
-func (t *FilerStorePathTranlator) recoverEntryPath(entry *Entry, previousPath util.FullPath) {
+func (t *FilerStorePathTranslator) recoverEntryPath(entry *Entry, previousPath util.FullPath) {
entry.FullPath = previousPath
}
-func (t *FilerStorePathTranlator) GetName() string {
+func (t *FilerStorePathTranslator) GetName() string {
return t.actualStore.GetName()
}
-func (t *FilerStorePathTranlator) Initialize(configuration util.Configuration, prefix string) error {
+func (t *FilerStorePathTranslator) Initialize(configuration util.Configuration, prefix string) error {
return t.actualStore.Initialize(configuration, prefix)
}
-func (t *FilerStorePathTranlator) InsertEntry(ctx context.Context, entry *Entry) error {
+func (t *FilerStorePathTranslator) InsertEntry(ctx context.Context, entry *Entry) error {
previousPath := t.changeEntryPath(entry)
defer t.recoverEntryPath(entry, previousPath)
return t.actualStore.InsertEntry(ctx, entry)
}
-func (t *FilerStorePathTranlator) UpdateEntry(ctx context.Context, entry *Entry) error {
+func (t *FilerStorePathTranslator) UpdateEntry(ctx context.Context, entry *Entry) error {
previousPath := t.changeEntryPath(entry)
defer t.recoverEntryPath(entry, previousPath)
return t.actualStore.UpdateEntry(ctx, entry)
}
-func (t *FilerStorePathTranlator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
+func (t *FilerStorePathTranslator) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
if t.storeRoot == "/" {
return t.actualStore.FindEntry(ctx, fp)
}
@@ -87,12 +88,12 @@ func (t *FilerStorePathTranlator) FindEntry(ctx context.Context, fp util.FullPat
return
}
-func (t *FilerStorePathTranlator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+func (t *FilerStorePathTranslator) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
newFullPath := t.translatePath(fp)
return t.actualStore.DeleteEntry(ctx, newFullPath)
}
-func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
+func (t *FilerStorePathTranslator) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
previousPath := t.changeEntryPath(existingEntry)
defer t.recoverEntryPath(existingEntry, previousPath)
@@ -100,13 +101,13 @@ func (t *FilerStorePathTranlator) DeleteOneEntry(ctx context.Context, existingEn
return t.actualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
-func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
+func (t *FilerStorePathTranslator) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
newFullPath := t.translatePath(fp)
return t.actualStore.DeleteFolderChildren(ctx, newFullPath)
}
-func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
+func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
@@ -116,38 +117,42 @@ func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirP
})
}
-func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
+func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
+ if limit > math.MaxInt32-1 {
+ limit = math.MaxInt32 - 1
+ }
+
return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
return eachEntryFunc(entry)
})
}
-func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) {
+func (t *FilerStorePathTranslator) BeginTransaction(ctx context.Context) (context.Context, error) {
return t.actualStore.BeginTransaction(ctx)
}
-func (t *FilerStorePathTranlator) CommitTransaction(ctx context.Context) error {
+func (t *FilerStorePathTranslator) CommitTransaction(ctx context.Context) error {
return t.actualStore.CommitTransaction(ctx)
}
-func (t *FilerStorePathTranlator) RollbackTransaction(ctx context.Context) error {
+func (t *FilerStorePathTranslator) RollbackTransaction(ctx context.Context) error {
return t.actualStore.RollbackTransaction(ctx)
}
-func (t *FilerStorePathTranlator) Shutdown() {
+func (t *FilerStorePathTranslator) Shutdown() {
t.actualStore.Shutdown()
}
-func (t *FilerStorePathTranlator) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+func (t *FilerStorePathTranslator) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
return t.actualStore.KvPut(ctx, key, value)
}
-func (t *FilerStorePathTranlator) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+func (t *FilerStorePathTranslator) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
return t.actualStore.KvGet(ctx, key)
}
-func (t *FilerStorePathTranlator) KvDelete(ctx context.Context, key []byte) (err error) {
+func (t *FilerStorePathTranslator) KvDelete(ctx context.Context, key []byte) (err error) {
return t.actualStore.KvDelete(ctx, key)
}
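FilerStorePathTranslator (the corrected spelling of FilerStorePathTranlator) rewrites logical paths under storeRoot into the backing store's namespace. Roughly, assuming storeRoot ends with a slash as the constructor enforces:

package main

import (
	"fmt"
	"strings"
)

// translate maps a logical path under storeRoot to the path used in the
// backing store, the core idea behind translatePath; edge cases such as the
// root entry itself are left out of this sketch.
func translate(storeRoot, fullPath string) string {
	if storeRoot == "/" {
		return fullPath
	}
	return "/" + strings.TrimPrefix(fullPath, storeRoot)
}

func main() {
	fmt.Println(translate("/buckets/", "/buckets/photos/cat.jpg")) // /photos/cat.jpg
	fmt.Println(translate("/", "/photos/cat.jpg"))                 // /photos/cat.jpg
}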
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index 2470f340c..3ece25ce6 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -4,6 +4,8 @@ import (
"context"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/viant/ptrie"
+ "io"
+ "math"
"strings"
"time"
@@ -14,12 +16,14 @@ import (
var (
_ = VirtualFilerStore(&FilerStoreWrapper{})
+ _ = Debuggable(&FilerStoreWrapper{})
)
type VirtualFilerStore interface {
FilerStore
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
DeleteOneEntry(ctx context.Context, entry *Entry) error
+ DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) error
AddPathSpecificStore(path string, storeId string, store FilerStore)
OnBucketCreation(bucket string)
OnBucketDeletion(bucket string)
@@ -72,7 +76,7 @@ func (fsw *FilerStoreWrapper) OnBucketDeletion(bucket string) {
}
func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string, store FilerStore) {
- fsw.storeIdToStore[storeId] = NewFilerStorePathTranlator(path, store)
+ fsw.storeIdToStore[storeId] = NewFilerStorePathTranslator(path, store)
err := fsw.pathToStore.Put([]byte(path), storeId)
if err != nil {
glog.Fatalf("put path specific store: %v", err)
@@ -124,7 +128,7 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err
return err
}
- glog.V(4).Infof("InsertEntry %s", entry.FullPath)
+ // glog.V(4).Infof("InsertEntry %s", entry.FullPath)
return actualStore.InsertEntry(ctx, entry)
}
@@ -145,7 +149,7 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err
return err
}
- glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
+ // glog.V(4).Infof("UpdateEntry %s", entry.FullPath)
return actualStore.UpdateEntry(ctx, entry)
}
@@ -189,7 +193,7 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath)
}
}
- glog.V(4).Infof("DeleteEntry %s", fp)
+ // glog.V(4).Infof("DeleteEntry %s", fp)
return actualStore.DeleteEntry(ctx, fp)
}
@@ -209,10 +213,22 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry
}
}
- glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
+ // glog.V(4).Infof("DeleteOneEntry %s", existingEntry.FullPath)
return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
+func (fsw *FilerStoreWrapper) DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) (err error) {
+ actualStore := fsw.getActualStore(fullpath)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ glog.V(4).Infof("DeleteOneEntrySkipHardlink %s", fullpath)
+ return actualStore.DeleteEntry(ctx, fullpath)
+}
+
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
actualStore := fsw.getActualStore(fp + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
@@ -221,7 +237,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
}()
- glog.V(4).Infof("DeleteFolderChildren %s", fp)
+ // glog.V(4).Infof("DeleteFolderChildren %s", fp)
return actualStore.DeleteFolderChildren(ctx, fp)
}
@@ -233,7 +249,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}()
- glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
+ // glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
@@ -248,14 +264,18 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
defer func() {
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
}()
- glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
- lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc)
+ if limit > math.MaxInt32-1 {
+ limit = math.MaxInt32 - 1
+ }
+ // glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
+ adjustedEntryFunc := func(entry *Entry) bool {
+ fsw.maybeReadHardLink(ctx, entry)
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return eachEntryFunc(entry)
+ }
+ lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, adjustedEntryFunc)
if err == ErrUnsupportedListDirectoryPrefixed {
- lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
- fsw.maybeReadHardLink(ctx, entry)
- filer_pb.AfterEntryDeserialization(entry.Chunks)
- return eachEntryFunc(entry)
- })
+ lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, adjustedEntryFunc)
}
return lastFileName, err
}
@@ -278,8 +298,10 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
count := int64(0)
for count < limit && len(notPrefixed) > 0 {
+ var isLastItemHasPrefix bool
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
+ isLastItemHasPrefix = true
count++
if !eachEntryFunc(entry) {
return
@@ -287,17 +309,21 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
if count >= limit {
break
}
+ } else {
+ isLastItemHasPrefix = false
}
}
- if count < limit {
+ if count < limit && isLastItemHasPrefix && len(notPrefixed) == int(limit) {
notPrefixed = notPrefixed[:0]
- _, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
+ lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
notPrefixed = append(notPrefixed, entry)
return true
})
if err != nil {
return
}
+ } else {
+ break
}
}
return
@@ -328,3 +354,9 @@ func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []by
func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
return fsw.getDefaultStore().KvDelete(ctx, key)
}
+
+func (fsw *FilerStoreWrapper) Debug(writer io.Writer) {
+ if debuggable, ok := fsw.getDefaultStore().(Debuggable); ok {
+ debuggable.Debug(writer)
+ }
+}
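When a store cannot list by prefix natively, the wrapper falls back to paging through the plain listing and filtering client-side; the hunk above also tightens the condition for fetching the next page. A simplified emulation of that fallback, not the exact isLastItemHasPrefix condition from the diff:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// listPage stands in for a store's plain ListDirectoryEntries: it returns up to
// limit names after startFrom (exclusive) plus the last name it visited.
func listPage(all []string, startFrom string, limit int) (page []string, last string) {
	for _, name := range all {
		if name <= startFrom {
			continue
		}
		page = append(page, name)
		last = name
		if len(page) == limit {
			break
		}
	}
	return
}

// prefixFilter pages through the full listing, keeps only names with the prefix,
// and stops once a short page signals that nothing is left to scan.
func prefixFilter(all []string, prefix string, limit, pageSize int) (out []string) {
	sort.Strings(all)
	last := ""
	for len(out) < limit {
		page, newLast := listPage(all, last, pageSize)
		for _, name := range page {
			if strings.HasPrefix(name, prefix) {
				out = append(out, name)
				if len(out) == limit {
					return
				}
			}
		}
		if len(page) < pageSize {
			return // short page: end of the directory
		}
		last = newLast
	}
	return
}

func main() {
	names := []string{"app.log", "app.old", "bak.log", "cache", "data.log"}
	fmt.Println(prefixFilter(names, "app", 10, 2)) // [app.log app.old]
}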
diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go
index e0d878ca7..c5d6eb48c 100644
--- a/weed/filer/hbase/hbase_store.go
+++ b/weed/filer/hbase/hbase_store.go
@@ -75,7 +75,7 @@ func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) er
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index ce454f36a..6abb37f99 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -6,8 +6,10 @@ import (
"fmt"
"github.com/syndtr/goleveldb/leveldb"
leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
+ "github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+ "io"
"os"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -20,6 +22,10 @@ const (
DIR_FILE_SEPARATOR = byte(0x00)
)
+var (
+ _ = filer.Debuggable(&LevelDBStore{})
+)
+
func init() {
filer.Stores = append(filer.Stores, &LevelDBStore{})
}
@@ -45,9 +51,9 @@ func (store *LevelDBStore) initialize(dir string) (err error) {
}
opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10,
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ Filter: filter.NewBloomFilter(8), // false positive rate 0.02
}
if store.db, err = leveldb.OpenFile(dir, opts); err != nil {
@@ -80,7 +86,7 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = weed_util.MaybeGzipData(value)
}
@@ -241,3 +247,13 @@ func getNameFromKey(key []byte) string {
func (store *LevelDBStore) Shutdown() {
store.db.Close()
}
+
+func (store *LevelDBStore) Debug(writer io.Writer) {
+ iter := store.db.NewIterator(&leveldb_util.Range{}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ fullName := bytes.Replace(key, []byte{DIR_FILE_SEPARATOR}, []byte{' '}, 1)
+ fmt.Fprintf(writer, "%v\n", string(fullName))
+ }
+ iter.Release()
+}
diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go
index d437895f5..4cd8b88e8 100644
--- a/weed/filer/leveldb/leveldb_store_test.go
+++ b/weed/filer/leveldb/leveldb_store_test.go
@@ -3,7 +3,6 @@ package leveldb
import (
"context"
"fmt"
- "io/ioutil"
"os"
"testing"
"time"
@@ -13,9 +12,8 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
+ testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ dir := t.TempDir()
store := &LevelDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -33,7 +31,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -67,9 +65,8 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
+ testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ dir := t.TempDir()
store := &LevelDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -90,9 +87,8 @@ func TestEmptyRoot(t *testing.T) {
}
func BenchmarkInsertEntry(b *testing.B) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
- defer os.RemoveAll(dir)
+ testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ dir := b.TempDir()
store := &LevelDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 4c4409c4d..d68493bd7 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -46,10 +46,9 @@ func (store *LevelDB2Store) initialize(dir string, dbCount int) (err error) {
}
opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 4,
- Filter: filter.NewBloomFilter(8), // false positive rate 0.02
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ Filter: filter.NewBloomFilter(8), // false positive rate 0.02
}
for d := 0; d < dbCount; d++ {
@@ -89,7 +88,7 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = weed_util.MaybeGzipData(value)
}
diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go
index fd0ad18a3..1f8e33116 100644
--- a/weed/filer/leveldb2/leveldb2_store_test.go
+++ b/weed/filer/leveldb2/leveldb2_store_test.go
@@ -2,8 +2,6 @@ package leveldb
import (
"context"
- "io/ioutil"
- "os"
"testing"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -11,9 +9,8 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
+ testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ dir := t.TempDir()
store := &LevelDB2Store{}
store.initialize(dir, 2)
testFiler.SetStore(store)
@@ -31,7 +28,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -65,9 +62,8 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
+ testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ dir := t.TempDir()
store := &LevelDB2Store{}
store.initialize(dir, 2)
testFiler.SetStore(store)
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index bc57a6605..d21515bd4 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -66,17 +66,15 @@ func (store *LevelDB3Store) initialize(dir string) (err error) {
func (store *LevelDB3Store) loadDB(name string) (*leveldb.DB, error) {
bloom := filter.NewBloomFilter(8) // false positive rate 0.02
opts := &opt.Options{
- BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 4,
- Filter: bloom,
+ BlockCacheCapacity: 32 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 16 * 1024 * 1024, // default value is 4MiB
+ Filter: bloom,
}
if name != DEFAULT {
opts = &opt.Options{
- BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 4,
- Filter: bloom,
+ BlockCacheCapacity: 16 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 8 * 1024 * 1024, // default value is 4MiB
+ Filter: bloom,
}
}
@@ -179,7 +177,7 @@ func (store *LevelDB3Store) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = weed_util.MaybeGzipData(value)
}
diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go
index 0b970a539..823c3a1bf 100644
--- a/weed/filer/leveldb3/leveldb3_store_test.go
+++ b/weed/filer/leveldb3/leveldb3_store_test.go
@@ -2,8 +2,6 @@ package leveldb
import (
"context"
- "io/ioutil"
- "os"
"testing"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -11,9 +9,8 @@ import (
)
func TestCreateAndFind(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
+ testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ dir := t.TempDir()
store := &LevelDB3Store{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -31,7 +28,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -65,9 +62,8 @@ func TestCreateAndFind(t *testing.T) {
}
func TestEmptyRoot(t *testing.T) {
- testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
+ testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
+ dir := t.TempDir()
store := &LevelDB3Store{}
store.initialize(dir)
testFiler.SetStore(store)
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 913cbd454..fb96ee01b 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -3,6 +3,8 @@ package filer
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"io"
"sync"
@@ -18,9 +20,13 @@ import (
)
type MetaAggregator struct {
- filers []string
- grpcDialOption grpc.DialOption
- MetaLogBuffer *log_buffer.LogBuffer
+ filer *Filer
+ self pb.ServerAddress
+ isLeader bool
+ grpcDialOption grpc.DialOption
+ MetaLogBuffer *log_buffer.LogBuffer
+ peerStatues map[pb.ServerAddress]int
+ peerStatuesLock sync.Mutex
// notifying clients
ListenersLock sync.Mutex
ListenersCond *sync.Cond
@@ -28,10 +34,12 @@ type MetaAggregator struct {
// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
// The old data comes from what each LocalMetadata persisted on disk.
-func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
+func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
t := &MetaAggregator{
- filers: filers,
+ filer: filer,
+ self: self,
grpcDialOption: grpcDialOption,
+ peerStatues: make(map[pb.ServerAddress]int),
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
@@ -40,13 +48,66 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg
return t
}
-func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
- for _, filer := range ma.filers {
- go ma.subscribeToOneFiler(f, self, filer)
+func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+ if update.NodeType != cluster.FilerType {
+ return
}
+
+ address := pb.ServerAddress(update.Address)
+ if update.IsAdd {
+ // every filer should subscribe to a new filer
+ if ma.setActive(address, true) {
+ go ma.loopSubscribeToOnefiler(ma.filer, ma.self, address, startFrom)
+ }
+ } else {
+ ma.setActive(address, false)
+ }
+}
+
+func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) {
+ ma.peerStatuesLock.Lock()
+ defer ma.peerStatuesLock.Unlock()
+ if isActive {
+ if _, found := ma.peerStatues[address]; found {
+ ma.peerStatues[address] += 1
+ } else {
+ ma.peerStatues[address] = 1
+ notDuplicated = true
+ }
+ } else {
+ if _, found := ma.peerStatues[address]; found {
+ delete(ma.peerStatues, address)
+ }
+ }
+ return
+}
+func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
+ ma.peerStatuesLock.Lock()
+ defer ma.peerStatuesLock.Unlock()
+ var count int
+ count, isActive = ma.peerStatues[address]
+ return count > 0 && isActive
}
-func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string) {
+func (ma *MetaAggregator) loopSubscribeToOnefiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) {
+ lastTsNs := startFrom.UnixNano()
+ for {
+ glog.V(0).Infof("loopSubscribeToOnefiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs)
+ nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs)
+ if !ma.isActive(peer) {
+ glog.V(0).Infof("stop subscribing remote %s meta change", peer)
+ return
+ }
+ if err != nil {
+ glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
+ } else if lastTsNs < nextLastTsNs {
+ lastTsNs = nextLastTsNs
+ }
+ time.Sleep(1733 * time.Millisecond)
+ }
+}
+
+func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom int64) (int64, error) {
/*
Each filer reads the "filer.store.id", which is the store's signature when filer starts.
@@ -60,20 +121,26 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
lastPersistTime := time.Now()
- lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
+ lastTsNs := startFrom
peerSignature, err := ma.readFilerStoreSignature(peer)
- for err != nil {
- glog.V(0).Infof("connecting to peer filer %s: %v", peer, err)
- time.Sleep(1357 * time.Millisecond)
- peerSignature, err = ma.readFilerStoreSignature(peer)
+ if err != nil {
+ return lastTsNs, fmt.Errorf("connecting to peer filer %s: %v", peer, err)
}
// when filer store is not shared by multiple filers
if peerSignature != f.Signature {
- lastTsNs = 0
if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
lastTsNs = prevTsNs
+ defer func(prevTsNs int64) {
+ if lastTsNs != prevTsNs && lastTsNs != lastPersistTime.UnixNano() {
+ if err := ma.updateOffset(f, peer, peerSignature, lastTsNs); err == nil {
+ glog.V(0).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
+ } else {
+ glog.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
+ }
+ }
+ }(prevTsNs)
}
glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
@@ -110,54 +177,50 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
}
dir := event.Directory
// println("received meta change", dir, "size", len(data))
- ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, 0)
+ ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
if maybeReplicateMetadataChange != nil {
maybeReplicateMetadataChange(event)
}
return nil
}
- for {
- glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
- err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "filer:" + self,
- PathPrefix: "/",
- SinceNs: lastTsNs,
- })
- if err != nil {
- return fmt.Errorf("subscribe: %v", err)
- }
+ glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
+ err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "filer:" + string(self),
+ PathPrefix: "/",
+ SinceNs: lastTsNs,
+ ClientId: int32(ma.filer.UniqueFileId),
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
- if err := processEventFn(resp); err != nil {
- return fmt.Errorf("process %v: %v", resp, err)
- }
- lastTsNs = resp.TsNs
+ if err := processEventFn(resp); err != nil {
+ return fmt.Errorf("process %v: %v", resp, err)
+ }
+ lastTsNs = resp.TsNs
- f.onMetadataChangeEvent(resp)
+ f.onMetadataChangeEvent(resp)
- }
- })
- if err != nil {
- glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
- time.Sleep(1733 * time.Millisecond)
}
- }
+ })
+ return lastTsNs, err
}
-func (ma *MetaAggregator) readFilerStoreSignature(peer string) (sig int32, err error) {
- err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
+ err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
@@ -172,18 +235,13 @@ const (
MetaOffsetPrefix = "Meta"
)
-func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) (lastTsNs int64, err error) {
+func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
key := []byte(MetaOffsetPrefix + "xxxx")
util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
value, err := f.Store.KvGet(context.Background(), key)
- if err == ErrKvNotFound {
- glog.Warningf("readOffset %s not found", peer)
- return 0, nil
- }
-
if err != nil {
return 0, fmt.Errorf("readOffset %s : %v", peer, err)
}
@@ -195,7 +253,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32)
return
}
-func (ma *MetaAggregator) updateOffset(f *Filer, peer string, peerSignature int32, lastTsNs int64) (err error) {
+func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {
key := []byte(MetaOffsetPrefix + "xxxx")
util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
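
For the readOffset/updateOffset hunks above: the per-peer sync position lives in the local filer store under a key made of the "Meta" prefix followed by the peer's 4-byte store signature. The sketch below shows that key layout with only the standard library; the real code uses util.Uint32toBytes, and big-endian byte order here is an assumption.

// Sketch only: layout of the per-peer offset key (byte order is assumed).
package main

import (
	"encoding/binary"
	"fmt"
)

func metaOffsetKey(peerSignature uint32) []byte {
	key := []byte("Meta" + "xxxx") // 4 placeholder bytes, overwritten below
	binary.BigEndian.PutUint32(key[4:], peerSignature)
	return key
}

func main() {
	fmt.Printf("% x\n", metaOffsetKey(0x1234abcd)) // 4d 65 74 61 12 34 ab cd
}
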
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index 1ef5056f4..83686bfe7 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -107,7 +107,7 @@ func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
meta = util.MaybeGzipData(meta)
}
@@ -159,7 +159,7 @@ func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPa
dir, name := fullpath.DirAndName()
where := bson.M{"directory": dir, "name": name}
- _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where)
+ _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
@@ -193,11 +193,15 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath uti
optLimit := int64(limit)
opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}}
cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts)
+ if err != nil {
+ return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err)
+ }
+
for cur.Next(ctx) {
var data Model
- err := cur.Decode(&data)
- if err != nil && err != mongo.ErrNoDocuments {
- return lastFileName, err
+ err = cur.Decode(&data)
+ if err != nil {
+ break
}
entry := &filer.Entry{
diff --git a/weed/filer/mysql2/mysql2_store.go b/weed/filer/mysql2/mysql2_store.go
index a1f54455a..e50480150 100644
--- a/weed/filer/mysql2/mysql2_store.go
+++ b/weed/filer/mysql2/mysql2_store.go
@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -82,7 +83,7 @@ func (store *MysqlStore2) initialize(createTable, upsertQuery string, enableUpse
return fmt.Errorf("connect to %s error:%v", sqlUrl, err)
}
- if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
+ if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil && !strings.Contains(err.Error(), "table already exist") {
return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
}
diff --git a/weed/filer/permission.go b/weed/filer/permission.go
deleted file mode 100644
index 0d8b8292b..000000000
--- a/weed/filer/permission.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package filer
-
-func hasWritePermission(dir *Entry, entry *Entry) bool {
-
- if dir == nil {
- return false
- }
-
- if dir.Uid == entry.Uid && dir.Mode&0200 > 0 {
- return true
- }
-
- if dir.Gid == entry.Gid && dir.Mode&0020 > 0 {
- return true
- }
-
- if dir.Mode&0002 > 0 {
- return true
- }
-
- return false
-}
diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go
index 3406246a9..6372dac72 100644
--- a/weed/filer/read_remote.go
+++ b/weed/filer/read_remote.go
@@ -21,13 +21,13 @@ func MapFullPathToRemoteStorageLocation(localMountedDir util.FullPath, remoteMou
return remoteLocation
}
-func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, remoteLocationPath string)(fp util.FullPath) {
+func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, remoteLocationPath string) (fp util.FullPath) {
return localMountedDir.Child(remoteLocationPath[len(remoteMountedLocation.Path):])
}
-func DownloadToLocal(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error {
- return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- _, err := client.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{
+func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error {
+ return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: string(parent),
Name: entry.Name,
})
diff --git a/weed/filer/read_write.go b/weed/filer/read_write.go
index 14e8cab1e..e34034eb6 100644
--- a/weed/filer/read_write.go
+++ b/weed/filer/read_write.go
@@ -4,7 +4,6 @@ import (
"bytes"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "math"
"time"
)
@@ -23,7 +22,7 @@ func ReadEntry(masterClient *wdclient.MasterClient, filerClient filer_pb.Seaweed
return err
}
- return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
+ return StreamContent(masterClient, byteBuffer, respLookupEntry.Entry.Chunks, 0, int64(FileSize(respLookupEntry.Entry)))
}
@@ -54,15 +53,14 @@ func SaveInsideFiler(client filer_pb.SeaweedFilerClient, dir, name string, conte
Name: name,
IsDirectory: false,
Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(0644),
- Collection: "",
- Replication: "",
- FileSize: uint64(len(content)),
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(0644),
+ FileSize: uint64(len(content)),
},
Content: content,
},
+ SkipCheckParentDirectory: true,
})
} else if err == nil {
entry := resp.Entry
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 458cf88be..7d9997761 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -12,20 +12,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/golang/groupcache/singleflight"
)
type ChunkReadAt struct {
- masterClient *wdclient.MasterClient
- chunkViews []*ChunkView
- lookupFileId wdclient.LookupFileIdFunctionType
- readerLock sync.Mutex
- fileSize int64
-
- fetchGroup singleflight.Group
- chunkCache chunk_cache.ChunkCache
- lastChunkFileId string
- lastChunkData []byte
+ masterClient *wdclient.MasterClient
+ chunkViews []*ChunkView
+ readerLock sync.Mutex
+ fileSize int64
+ readerCache *ReaderCache
+ readerPattern *ReaderPattern
+ lastChunkFid string
}
var _ = io.ReaderAt(&ChunkReadAt{})
@@ -43,7 +39,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
if !found {
util.Retry("lookup volume "+vid, func() error {
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
@@ -88,21 +84,22 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
- chunkViews: chunkViews,
- lookupFileId: lookupFn,
- chunkCache: chunkCache,
- fileSize: fileSize,
+ chunkViews: chunkViews,
+ fileSize: fileSize,
+ readerCache: newReaderCache(32, chunkCache, lookupFn),
+ readerPattern: NewReaderPattern(),
}
}
func (c *ChunkReadAt) Close() error {
- c.lastChunkData = nil
- c.lastChunkFileId = ""
+ c.readerCache.destroy()
return nil
}
func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
+ c.readerPattern.MonitorReadAt(offset, len(p))
+
c.readerLock.Lock()
defer c.readerLock.Unlock()
@@ -113,19 +110,17 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
startOffset, remaining := offset, int64(len(p))
- var nextChunk *ChunkView
+ var nextChunks []*ChunkView
for i, chunk := range c.chunkViews {
if remaining <= 0 {
break
}
if i+1 < len(c.chunkViews) {
- nextChunk = c.chunkViews[i+1]
- } else {
- nextChunk = nil
+ nextChunks = c.chunkViews[i+1:]
}
if startOffset < chunk.LogicOffset {
gap := int(chunk.LogicOffset - startOffset)
- glog.V(4).Infof("zero [%d,%d)", startOffset, startOffset+int64(gap))
+ glog.V(4).Infof("zero [%d,%d)", startOffset, chunk.LogicOffset)
n += int(min(int64(gap), remaining))
startOffset, remaining = chunk.LogicOffset, remaining-int64(gap)
if remaining <= 0 {
@@ -138,16 +133,13 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
continue
}
// glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
- var buffer []byte
bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
- bufferLength := chunkStop - chunkStart
- buffer, err = c.readChunkSlice(chunk, nextChunk, uint64(bufferOffset), uint64(bufferLength))
+ copied, err := c.readChunkSliceAt(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], chunk, nextChunks, uint64(bufferOffset))
if err != nil {
glog.Errorf("fetching chunk %+v: %v\n", chunk, err)
- return
+ return copied, err
}
- copied := copy(p[startOffset-offset:chunkStop-chunkStart+startOffset-offset], buffer)
n += copied
startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
}
@@ -169,77 +161,25 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
}
-func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) {
-
- chunkSlice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
- if len(chunkSlice) > 0 {
- return chunkSlice, nil
- }
- chunkData, err := c.readFromWholeChunkData(chunkView, nextChunkViews)
- if err != nil {
- return nil, err
- }
- wanted := min(int64(length), int64(len(chunkData))-int64(offset))
- return chunkData[offset : int64(offset)+wanted], nil
-}
-
-func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView, nextChunkViews ...*ChunkView) (chunkData []byte, err error) {
-
- if c.lastChunkFileId == chunkView.FileId {
- return c.lastChunkData, nil
- }
-
- v, doErr := c.readOneWholeChunk(chunkView)
+func (c *ChunkReadAt) readChunkSliceAt(buffer []byte, chunkView *ChunkView, nextChunkViews []*ChunkView, offset uint64) (n int, err error) {
- if doErr != nil {
- return nil, doErr
+ if c.readerPattern.IsRandomMode() {
+ return fetchChunkRange(buffer, c.readerCache.lookupFileIdFn, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset))
}
- chunkData = v.([]byte)
-
- c.lastChunkData = chunkData
- c.lastChunkFileId = chunkView.FileId
-
- for _, nextChunkView := range nextChunkViews {
- if c.chunkCache != nil && nextChunkView != nil {
- go c.readOneWholeChunk(nextChunkView)
+ n, err = c.readerCache.ReadChunkAt(buffer, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(chunkView.ChunkSize), chunkView.LogicOffset == 0)
+ if c.lastChunkFid != chunkView.FileId {
+ if chunkView.Offset == 0 { // start of a new chunk
+ if c.lastChunkFid != "" {
+ c.readerCache.UnCache(c.lastChunkFid)
+ c.readerCache.MaybeCache(nextChunkViews)
+ } else {
+ if len(nextChunkViews) >= 1 {
+ c.readerCache.MaybeCache(nextChunkViews[:1]) // just read the next chunk if at the very beginning
+ }
+ }
}
}
-
+ c.lastChunkFid = chunkView.FileId
return
}
-
-func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, error) {
-
- var err error
-
- return c.fetchGroup.Do(chunkView.FileId, func() (interface{}, error) {
-
- glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
-
- data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
- if data != nil {
- glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
- } else {
- var err error
- data, err = c.doFetchFullChunkData(chunkView)
- if err != nil {
- return data, err
- }
- c.chunkCache.SetChunk(chunkView.FileId, data)
- }
- return data, err
- })
-}
-
-func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) {
-
- glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
-
- data, err := fetchChunk(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped)
-
- glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
-
- return data, err
-
-}
diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go
index f8e4727ce..d9afb460c 100644
--- a/weed/filer/reader_at_test.go
+++ b/weed/filer/reader_at_test.go
@@ -21,8 +21,12 @@ func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
return data
}
-func (m *mockChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte {
- return nil
+func (m *mockChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
+ x, _ := strconv.Atoi(fileId)
+ for i := 0; i < len(data); i++ {
+ data[i] = byte(x)
+ }
+ return len(data), nil
}
func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
@@ -64,11 +68,11 @@ func TestReaderAt(t *testing.T) {
}
readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
- readerLock: sync.Mutex{},
- fileSize: 10,
- chunkCache: &mockChunkCache{},
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ readerLock: sync.Mutex{},
+ fileSize: 10,
+ readerCache: newReaderCache(3, &mockChunkCache{}, nil),
+ readerPattern: NewReaderPattern(),
}
testReadAt(t, readerAt, 0, 10, 10, io.EOF)
@@ -80,7 +84,7 @@ func TestReaderAt(t *testing.T) {
func testReadAt(t *testing.T, readerAt *ChunkReadAt, offset int64, size int, expected int, expectedErr error) {
data := make([]byte, size)
- n, err := readerAt.ReadAt(data, offset)
+ n, err := readerAt.doReadAt(data, offset)
for _, d := range data {
fmt.Printf("%x", d)
@@ -114,11 +118,11 @@ func TestReaderAt0(t *testing.T) {
}
readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
- readerLock: sync.Mutex{},
- fileSize: 10,
- chunkCache: &mockChunkCache{},
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ readerLock: sync.Mutex{},
+ fileSize: 10,
+ readerCache: newReaderCache(3, &mockChunkCache{}, nil),
+ readerPattern: NewReaderPattern(),
}
testReadAt(t, readerAt, 0, 10, 10, io.EOF)
@@ -142,11 +146,11 @@ func TestReaderAt1(t *testing.T) {
}
readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
- readerLock: sync.Mutex{},
- fileSize: 20,
- chunkCache: &mockChunkCache{},
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ readerLock: sync.Mutex{},
+ fileSize: 20,
+ readerCache: newReaderCache(3, &mockChunkCache{}, nil),
+ readerPattern: NewReaderPattern(),
}
testReadAt(t, readerAt, 0, 20, 20, io.EOF)
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
new file mode 100644
index 000000000..bce97cc49
--- /dev/null
+++ b/weed/filer/reader_cache.go
@@ -0,0 +1,192 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "sync"
+ "time"
+)
+
+type ReaderCache struct {
+ chunkCache chunk_cache.ChunkCache
+ lookupFileIdFn wdclient.LookupFileIdFunctionType
+ sync.Mutex
+ downloaders map[string]*SingleChunkCacher
+ limit int
+}
+
+type SingleChunkCacher struct {
+ sync.RWMutex
+ parent *ReaderCache
+ chunkFileId string
+ data []byte
+ err error
+ cipherKey []byte
+ isGzipped bool
+ chunkSize int
+ shouldCache bool
+ wg sync.WaitGroup
+ completedTime time.Time
+}
+
+func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
+ return &ReaderCache{
+ limit: limit,
+ chunkCache: chunkCache,
+ lookupFileIdFn: lookupFileIdFn,
+ downloaders: make(map[string]*SingleChunkCacher),
+ }
+}
+
+func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
+ if rc.lookupFileIdFn == nil {
+ return
+ }
+
+ rc.Lock()
+ defer rc.Unlock()
+
+ for _, chunkView := range chunkViews {
+ if _, found := rc.downloaders[chunkView.FileId]; found {
+ continue
+ }
+
+ if len(rc.downloaders) >= rc.limit {
+ // no free prefetch slots left, stop prefetching
+ return
+ }
+
+ // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
+ // start a cacher for this chunk if it is not already being downloaded
+ cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
+ cacher.wg.Add(1)
+ go cacher.startCaching()
+ cacher.wg.Wait()
+ rc.downloaders[chunkView.FileId] = cacher
+
+ }
+
+ return
+}
+
+func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
+ rc.Lock()
+ defer rc.Unlock()
+ if cacher, found := rc.downloaders[fileId]; found {
+ return cacher.readChunkAt(buffer, offset)
+ }
+ if shouldCache || rc.lookupFileIdFn == nil {
+ n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
+ if n > 0 {
+ return n, err
+ }
+ }
+
+ if len(rc.downloaders) >= rc.limit {
+ oldestFid, oldestTime := "", time.Now()
+ for fid, downloader := range rc.downloaders {
+ if !downloader.completedTime.IsZero() {
+ if downloader.completedTime.Before(oldestTime) {
+ oldestFid, oldestTime = fid, downloader.completedTime
+ }
+ }
+ }
+ if oldestFid != "" {
+ oldDownloader := rc.downloaders[oldestFid]
+ delete(rc.downloaders, oldestFid)
+ oldDownloader.destroy()
+ }
+ }
+
+ // glog.V(4).Infof("cache1 %s", fileId)
+
+ cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
+ cacher.wg.Add(1)
+ go cacher.startCaching()
+ cacher.wg.Wait()
+ rc.downloaders[fileId] = cacher
+
+ return cacher.readChunkAt(buffer, offset)
+}
+
+func (rc *ReaderCache) UnCache(fileId string) {
+ rc.Lock()
+ defer rc.Unlock()
+ // glog.V(4).Infof("uncache %s", fileId)
+ if downloader, found := rc.downloaders[fileId]; found {
+ downloader.destroy()
+ delete(rc.downloaders, fileId)
+ }
+}
+
+func (rc *ReaderCache) destroy() {
+ rc.Lock()
+ defer rc.Unlock()
+
+ for _, downloader := range rc.downloaders {
+ downloader.destroy()
+ }
+
+}
+
+func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
+ t := &SingleChunkCacher{
+ parent: parent,
+ chunkFileId: fileId,
+ cipherKey: cipherKey,
+ isGzipped: isGzipped,
+ chunkSize: chunkSize,
+ shouldCache: shouldCache,
+ }
+ return t
+}
+
+func (s *SingleChunkCacher) startCaching() {
+ s.Lock()
+ defer s.Unlock()
+
+ s.wg.Done() // signal that caching has started; the write lock is held until the download finishes
+
+ urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
+ if err != nil {
+ s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
+ return
+ }
+
+ s.data = mem.Allocate(s.chunkSize)
+
+ _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
+ if s.err != nil {
+ mem.Free(s.data)
+ s.data = nil
+ return
+ }
+
+ s.completedTime = time.Now()
+ if s.shouldCache {
+ s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
+ }
+
+ return
+}
+
+func (s *SingleChunkCacher) destroy() {
+ if s.data != nil {
+ mem.Free(s.data)
+ s.data = nil
+ }
+}
+
+func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
+ s.RLock()
+ defer s.RUnlock()
+
+ if s.err != nil {
+ return 0, s.err
+ }
+
+ return copy(buf, s.data[offset:]), nil
+
+}
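
ReaderCache above holds at most limit SingleChunkCacher entries; when the map is full, ReadChunkAt first evicts the downloader whose chunk finished longest ago and leaves in-flight downloads untouched. A standalone sketch of just that eviction rule follows; the download type and the file ids are illustrative.

// Sketch only: the "evict the oldest completed download" rule used when the cache is full.
package main

import (
	"fmt"
	"time"
)

type download struct {
	completedAt time.Time // zero value means the download is still in flight
}

func evictOldestCompleted(downloads map[string]*download) {
	oldestID, oldestTime := "", time.Now()
	for id, d := range downloads {
		if !d.completedAt.IsZero() && d.completedAt.Before(oldestTime) {
			oldestID, oldestTime = id, d.completedAt
		}
	}
	if oldestID != "" {
		delete(downloads, oldestID)
	}
}

func main() {
	now := time.Now()
	downloads := map[string]*download{
		"3,01637037d6": {completedAt: now.Add(-2 * time.Second)},
		"3,02c2f39ee3": {completedAt: now.Add(-5 * time.Second)}, // oldest completed, gets evicted
		"3,03b4a11a22": {},                                       // still downloading, never evicted here
	}
	evictOldestCompleted(downloads)
	fmt.Println(len(downloads)) // 2
}
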
diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go
new file mode 100644
index 000000000..b860bc577
--- /dev/null
+++ b/weed/filer/reader_pattern.go
@@ -0,0 +1,38 @@
+package filer
+
+type ReaderPattern struct {
+ isStreaming bool
+ lastReadOffset int64
+}
+
+// For streaming read: only cache the first chunk
+// For random read: only fetch the requested range, instead of the whole chunk
+
+func NewReaderPattern() *ReaderPattern {
+ return &ReaderPattern{
+ isStreaming: true,
+ lastReadOffset: -1,
+ }
+}
+
+func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
+ isStreaming := true
+ if rp.lastReadOffset > offset {
+ isStreaming = false
+ }
+ if rp.lastReadOffset == -1 {
+ if offset != 0 {
+ isStreaming = false
+ }
+ }
+ rp.lastReadOffset = offset
+ rp.isStreaming = isStreaming
+}
+
+func (rp *ReaderPattern) IsStreamingMode() bool {
+ return rp.isStreaming
+}
+
+func (rp *ReaderPattern) IsRandomMode() bool {
+ return !rp.isStreaming
+}
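
ReaderPattern only looks at the previous read offset: the mode becomes random when the first read does not start at offset 0 or when a later read moves backwards, and it is re-evaluated on every call. A small sketch of that classification; the tracker type is a stand-in for ReaderPattern.

// Sketch only: mirrors the MonitorReadAt logic above with a stand-in type.
package main

import "fmt"

type tracker struct {
	streaming  bool
	lastOffset int64
}

func newTracker() *tracker { return &tracker{streaming: true, lastOffset: -1} }

func (t *tracker) observe(offset int64) {
	streaming := true
	if t.lastOffset > offset { // the read moved backwards
		streaming = false
	}
	if t.lastOffset == -1 && offset != 0 { // first read did not start at the beginning
		streaming = false
	}
	t.lastOffset = offset
	t.streaming = streaming
}

func main() {
	seq := newTracker()
	for _, off := range []int64{0, 4096, 8192} { // forward scan from the start
		seq.observe(off)
	}
	fmt.Println("sequential scan, streaming:", seq.streaming) // true

	rnd := newTracker()
	for _, off := range []int64{4096, 0} { // starts mid-file, then jumps back
		rnd.observe(off)
	}
	fmt.Println("random access, streaming:", rnd.streaming) // false
}
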
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
index 30d11a7f4..89684647b 100644
--- a/weed/filer/redis/universal_redis_store.go
+++ b/weed/filer/redis/universal_redis_store.go
@@ -3,7 +3,7 @@ package redis
import (
"context"
"fmt"
- "sort"
+ "golang.org/x/exp/slices"
"strings"
"time"
@@ -40,7 +40,7 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
@@ -120,6 +120,8 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
if err != nil {
return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
}
+ // not efficient, but the child's own directory list key must also be removed in case it is a directory
+ store.Client.Del(ctx, genDirectoryListKey(string(path)))
}
return nil
@@ -155,8 +157,8 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP
}
// sort
- sort.Slice(members, func(i, j int) bool {
- return strings.Compare(members[i], members[j]) < 0
+ slices.SortFunc(members, func(a, b string) bool {
+ return strings.Compare(a, b) < 0
})
// limit
diff --git a/weed/filer/redis2/redis_sentinel_store.go b/weed/filer/redis2/redis_sentinel_store.go
new file mode 100644
index 000000000..802588b2b
--- /dev/null
+++ b/weed/filer/redis2/redis_sentinel_store.go
@@ -0,0 +1,45 @@
+package redis2
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+ "time"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &Redis2SentinelStore{})
+}
+
+type Redis2SentinelStore struct {
+ UniversalRedis2Store
+}
+
+func (store *Redis2SentinelStore) GetName() string {
+ return "redis2_sentinel"
+}
+
+func (store *Redis2SentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"masterName"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *Redis2SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) {
+ store.Client = redis.NewFailoverClient(&redis.FailoverOptions{
+ MasterName: masterName,
+ SentinelAddrs: addresses,
+ Username: username,
+ Password: password,
+ DB: database,
+ MinRetryBackoff: time.Millisecond * 100,
+ MaxRetryBackoff: time.Minute * 1,
+ ReadTimeout: time.Second * 30,
+ WriteTimeout: time.Second * 5,
+ })
+ return
+}
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index aab3d1f4a..7a34092a0 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -52,7 +52,7 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if len(entry.Chunks) > 50 {
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
value = util.MaybeGzipData(value)
}
@@ -133,7 +133,10 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
return nil
}
- members, err := store.Client.ZRange(ctx, genDirectoryListKey(string(fullpath)), 0, -1).Result()
+ members, err := store.Client.ZRangeByLex(ctx, genDirectoryListKey(string(fullpath)), &redis.ZRangeBy{
+ Min: "-",
+ Max: "+",
+ }).Result()
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
@@ -144,6 +147,8 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
if err != nil {
return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
}
+ // not efficient, but the child's own directory list key must also be removed in case it is a directory
+ store.Client.Del(ctx, genDirectoryListKey(string(path)))
}
return nil
@@ -156,14 +161,22 @@ func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Cont
func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))
- start := int64(0)
+
+ min := "-"
if startFileName != "" {
- start, _ = store.Client.ZRank(ctx, dirListKey, startFileName).Result()
- if !includeStartFile {
- start++
+ if includeStartFile {
+ min = "[" + startFileName
+ } else {
+ min = "(" + startFileName
}
}
- members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result()
+
+ members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{
+ Min: min,
+ Max: "+",
+ Offset: 0,
+ Count: limit,
+ }).Result()
if err != nil {
return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
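
The switch from ZRank/ZRange to ZRangeByLex works because every member is inserted with score 0, so names can be paged lexicographically: "-" and "+" are the open ends, "[name" is an inclusive bound and "(name" an exclusive one. A sketch of that pagination follows, assuming go-redis v8 and a Redis server at localhost:6379; the directory key is hypothetical.

// Sketch only: lexicographic paging over a directory listing, as in the new ListDirectoryEntries.
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	key := "/bucket/dir/" // hypothetical directory list key
	for _, name := range []string{"a.txt", "b.txt", "c.txt", "d.txt"} {
		client.ZAddNX(ctx, key, &redis.Z{Score: 0, Member: name})
	}

	// First page: everything from the beginning, at most 2 entries.
	page1, _ := client.ZRangeByLex(ctx, key, &redis.ZRangeBy{
		Min: "-", Max: "+", Offset: 0, Count: 2,
	}).Result()
	fmt.Println(page1) // [a.txt b.txt]
	if len(page1) == 0 {
		return
	}

	// Next page: resume strictly after the last name already returned.
	page2, _ := client.ZRangeByLex(ctx, key, &redis.ZRangeBy{
		Min: "(" + page1[len(page1)-1], Max: "+", Offset: 0, Count: 2,
	}).Result()
	fmt.Println(page2) // [c.txt d.txt]
}
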
diff --git a/weed/filer/redis3/ItemList.go b/weed/filer/redis3/ItemList.go
new file mode 100644
index 000000000..af3b8ae5a
--- /dev/null
+++ b/weed/filer/redis3/ItemList.go
@@ -0,0 +1,507 @@
+package redis3
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/skiplist"
+ "github.com/go-redis/redis/v8"
+)
+
+type ItemList struct {
+ skipList *skiplist.SkipList
+ batchSize int
+ client redis.UniversalClient
+ prefix string
+}
+
+func newItemList(client redis.UniversalClient, prefix string, store skiplist.ListStore, batchSize int) *ItemList {
+ return &ItemList{
+ skipList: skiplist.New(store),
+ batchSize: batchSize,
+ client: client,
+ prefix: prefix,
+ }
+}
+
+/*
+Be reluctant to create new nodes. Try to fit into either previous node or next node.
+Prefer to add to previous node.
+
+There are multiple cases after finding the node whose key is greater than or equal to the name
+ 1. found and node.Key == name
+ The node contains a batch with leading key the same as the name
+ nothing to do
+ 2. no such node found or node.Key > name
+
+ if no such node found
+ prevNode = list.LargestNode
+
+ // case 2.1
+ if previousNode contains name
+ nothing to do
+
+ // prefer to add to previous node
+ if prevNode != nil {
+ // case 2.2
+ if prevNode has capacity
+ prevNode.add name, and save
+ return
+ // case 2.3
+ split prevNode by name
+ }
+
+ // case 2.4
+ // merge into next node. Avoid too many nodes if adding data in reverse order.
+ if nextNode is not nil and nextNode has capacity
+ delete nextNode.Key
+ nextNode.Key = name
+ nextNode.batch.add name
+ insert nextNode.Key
+ return
+
+ // case 2.5
+ if prevNode is nil
+ insert new node with key = name, value = batch{name}
+ return
+
+*/
+
+func (nl *ItemList) canAddMember(node *skiplist.SkipListElementReference, name string) (alreadyContains bool, nodeSize int, err error) {
+ ctx := context.Background()
+ pipe := nl.client.TxPipeline()
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ countOperation := pipe.ZLexCount(ctx, key, "-", "+")
+ scoreOperation := pipe.ZScore(ctx, key, name)
+ if _, err = pipe.Exec(ctx); err != nil && err != redis.Nil {
+ return false, 0, err
+ }
+ if err == redis.Nil {
+ err = nil
+ }
+ alreadyContains = scoreOperation.Err() == nil
+ nodeSize = int(countOperation.Val())
+ return
+}
+
+func (nl *ItemList) WriteName(name string) error {
+
+ lookupKey := []byte(name)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+ // case 1: the name already exists as one leading key in the batch
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ return nil
+ }
+
+ var prevNodeReference *skiplist.SkipListElementReference
+ if !found {
+ prevNodeReference = nl.skipList.GetLargestNodeReference()
+ }
+
+ if nextNode != nil && prevNode == nil {
+ prevNodeReference = nextNode.Prev
+ }
+
+ if prevNodeReference != nil {
+ alreadyContains, nodeSize, err := nl.canAddMember(prevNodeReference, name)
+ if err != nil {
+ return err
+ }
+ if alreadyContains {
+ // case 2.1
+ return nil
+ }
+
+ // case 2.2
+ if nodeSize < nl.batchSize {
+ return nl.NodeAddMember(prevNodeReference, name)
+ }
+
+ // case 2.3
+ x := nl.NodeInnerPosition(prevNodeReference, name)
+ y := nodeSize - x
+ addToX := x <= y
+ // add to a new node
+ if x == 0 || y == 0 {
+ if err := nl.ItemAdd(lookupKey, 0, name); err != nil {
+ return err
+ }
+ return nil
+ }
+ if addToX {
+ // collect names before name, add them to X
+ namesToX, err := nl.NodeRangeBeforeExclusive(prevNodeReference, name)
+ if err != nil {
+ return err
+ }
+ // delete skiplist reference to old node
+ if _, err := nl.skipList.DeleteByKey(prevNodeReference.Key); err != nil {
+ return err
+ }
+ // add namesToX and name to a new X
+ namesToX = append(namesToX, name)
+ if err := nl.ItemAdd([]byte(namesToX[0]), 0, namesToX...); err != nil {
+ return err
+ }
+ // remove names less than name from current Y
+ if err := nl.NodeDeleteBeforeExclusive(prevNodeReference, name); err != nil {
+ return err
+ }
+
+ // point skip list to current Y
+ if err := nl.ItemAdd(lookupKey, prevNodeReference.ElementPointer); err != nil {
+ return err
+ }
+ return nil
+ } else {
+ // collect names after name, add them to Y
+ namesToY, err := nl.NodeRangeAfterExclusive(prevNodeReference, name)
+ if err != nil {
+ return err
+ }
+ // add namesToY and name to a new Y
+ namesToY = append(namesToY, name)
+ if err := nl.ItemAdd(lookupKey, 0, namesToY...); err != nil {
+ return err
+ }
+ // remove names after name from current X
+ if err := nl.NodeDeleteAfterExclusive(prevNodeReference, name); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ }
+
+ // case 2.4
+ if nextNode != nil {
+ nodeSize := nl.NodeSize(nextNode.Reference())
+ if nodeSize < nl.batchSize {
+ if id, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+ return err
+ } else {
+ if err := nl.ItemAdd(lookupKey, id, name); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ }
+
+ // case 2.5
+ // now prevNode is nil
+ return nl.ItemAdd(lookupKey, 0, name)
+}
+
+/*
+// case 1: exists in nextNode
+if nextNode != nil && nextNode.Key == name {
+ remove from nextNode, update nextNode
+ // TODO: merge with prevNode if possible?
+ return
+}
+if nextNode is nil
+ prevNode = list.LargestNode
+if prevNode == nil and nextNode.Prev != nil
+ prevNode = load(nextNode.Prev)
+
+// case 2: does not exist
+// case 2.1
+if prevNode == nil {
+ return
+}
+// case 2.2
+if prevNameBatch does not contain name {
+ return
+}
+
+// case 3
+delete from prevNameBatch
+if prevNameBatch + nextNode < capacityList
+ // case 3.1
+ merge
+else
+ // case 3.2
+ update prevNode
+
+
+*/
+func (nl *ItemList) DeleteName(name string) error {
+ lookupKey := []byte(name)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+
+ // case 1
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+ return err
+ }
+ if err := nl.NodeDeleteMember(nextNode.Reference(), name); err != nil {
+ return err
+ }
+ minName := nl.NodeMin(nextNode.Reference())
+ if minName == "" {
+ return nl.NodeDelete(nextNode.Reference())
+ }
+ return nl.ItemAdd([]byte(minName), nextNode.Id)
+ }
+
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if nextNode != nil && prevNode == nil {
+ prevNode, err = nl.skipList.LoadElement(nextNode.Prev)
+ if err != nil {
+ return err
+ }
+ }
+
+ // case 2
+ if prevNode == nil {
+ // case 2.1
+ return nil
+ }
+ if !nl.NodeContainsItem(prevNode.Reference(), name) {
+ return nil
+ }
+
+ // case 3
+ if err := nl.NodeDeleteMember(prevNode.Reference(), name); err != nil {
+ return err
+ }
+ prevSize := nl.NodeSize(prevNode.Reference())
+ if prevSize == 0 {
+ if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil {
+ return err
+ }
+ return nil
+ }
+ nextSize := nl.NodeSize(nextNode.Reference())
+ if nextSize > 0 && prevSize+nextSize < nl.batchSize {
+ // case 3.1 merge nextNode and prevNode
+ if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+ return err
+ }
+ nextNames, err := nl.NodeRangeBeforeExclusive(nextNode.Reference(), "")
+ if err != nil {
+ return err
+ }
+ if err := nl.NodeAddMember(prevNode.Reference(), nextNames...); err != nil {
+ return err
+ }
+ return nl.NodeDelete(nextNode.Reference())
+ } else {
+ // case 3.2 update prevNode
+ // no action to take
+ return nil
+ }
+
+ return nil
+}
+
+func (nl *ItemList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
+ lookupKey := []byte(startFrom)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ prevNode = nil
+ }
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if prevNode != nil {
+ if !nl.NodeScanIncluseiveAfter(prevNode.Reference(), startFrom, visitNamesFn) {
+ return nil
+ }
+ }
+
+ for nextNode != nil {
+ if !nl.NodeScanIncluseiveAfter(nextNode.Reference(), startFrom, visitNamesFn) {
+ return nil
+ }
+ nextNode, err = nl.skipList.LoadElement(nextNode.Next[0])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (nl *ItemList) RemoteAllListElement() error {
+
+ t := nl.skipList
+
+ nodeRef := t.StartLevels[0]
+ for nodeRef != nil {
+ node, err := t.LoadElement(nodeRef)
+ if err != nil {
+ return err
+ }
+ if node == nil {
+ return nil
+ }
+ if err := t.DeleteElement(node); err != nil {
+ return err
+ }
+ if err := nl.NodeDelete(node.Reference()); err != nil {
+ return err
+ }
+ nodeRef = node.Next[0]
+ }
+ return nil
+
+}
+
+func (nl *ItemList) NodeContainsItem(node *skiplist.SkipListElementReference, item string) bool {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ _, err := nl.client.ZScore(context.Background(), key, item).Result()
+ if err == redis.Nil {
+ return false
+ }
+ if err == nil {
+ return true
+ }
+ return false
+}
+
+func (nl *ItemList) NodeSize(node *skiplist.SkipListElementReference) int {
+ if node == nil {
+ return 0
+ }
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ return int(nl.client.ZLexCount(context.Background(), key, "-", "+").Val())
+}
+
+func (nl *ItemList) NodeAddMember(node *skiplist.SkipListElementReference, names ...string) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ var members []*redis.Z
+ for _, name := range names {
+ members = append(members, &redis.Z{
+ Score: 0,
+ Member: name,
+ })
+ }
+ return nl.client.ZAddNX(context.Background(), key, members...).Err()
+}
+func (nl *ItemList) NodeDeleteMember(node *skiplist.SkipListElementReference, name string) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ return nl.client.ZRem(context.Background(), key, name).Err()
+}
+
+func (nl *ItemList) NodeDelete(node *skiplist.SkipListElementReference) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ return nl.client.Del(context.Background(), key).Err()
+}
+
+func (nl *ItemList) NodeInnerPosition(node *skiplist.SkipListElementReference, name string) int {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ return int(nl.client.ZLexCount(context.Background(), key, "-", "("+name).Val())
+}
+
+func (nl *ItemList) NodeMin(node *skiplist.SkipListElementReference) string {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ slice := nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
+ Min: "-",
+ Max: "+",
+ Offset: 0,
+ Count: 1,
+ }).Val()
+ if len(slice) > 0 {
+ s := slice[0]
+ return s
+ }
+ return ""
+}
+
+func (nl *ItemList) NodeScanIncluseiveAfter(node *skiplist.SkipListElementReference, startFrom string, visitNamesFn func(name string) bool) bool {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if startFrom == "" {
+ startFrom = "-"
+ } else {
+ startFrom = "[" + startFrom
+ }
+ names := nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
+ Min: startFrom,
+ Max: "+",
+ }).Val()
+ for _, n := range names {
+ if !visitNamesFn(n) {
+ return false
+ }
+ }
+ return true
+}
+
+func (nl *ItemList) NodeRangeBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) ([]string, error) {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if stopAt == "" {
+ stopAt = "+"
+ } else {
+ stopAt = "(" + stopAt
+ }
+ return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
+ Min: "-",
+ Max: stopAt,
+ }).Result()
+}
+func (nl *ItemList) NodeRangeAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) ([]string, error) {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if startFrom == "" {
+ startFrom = "-"
+ } else {
+ startFrom = "(" + startFrom
+ }
+ return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
+ Min: startFrom,
+ Max: "+",
+ }).Result()
+}
+
+func (nl *ItemList) NodeDeleteBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if stopAt == "" {
+ stopAt = "+"
+ } else {
+ stopAt = "(" + stopAt
+ }
+ return nl.client.ZRemRangeByLex(context.Background(), key, "-", stopAt).Err()
+}
+func (nl *ItemList) NodeDeleteAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if startFrom == "" {
+ startFrom = "-"
+ } else {
+ startFrom = "(" + startFrom
+ }
+ return nl.client.ZRemRangeByLex(context.Background(), key, startFrom, "+").Err()
+}
+
+func (nl *ItemList) ItemAdd(lookupKey []byte, idIfKnown int64, names ...string) error {
+ if id, err := nl.skipList.InsertByKey(lookupKey, idIfKnown, nil); err != nil {
+ return err
+ } else {
+ if len(names) > 0 {
+ return nl.NodeAddMember(&skiplist.SkipListElementReference{
+ ElementPointer: id,
+ Key: lookupKey,
+ }, names...)
+ }
+ }
+ return nil
+}
diff --git a/weed/filer/redis3/item_list_serde.go b/weed/filer/redis3/item_list_serde.go
new file mode 100644
index 000000000..d0310ce40
--- /dev/null
+++ b/weed/filer/redis3/item_list_serde.go
@@ -0,0 +1,75 @@
+package redis3
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/skiplist"
+ "github.com/go-redis/redis/v8"
+ "github.com/golang/protobuf/proto"
+)
+
+func LoadItemList(data []byte, prefix string, client redis.UniversalClient, store skiplist.ListStore, batchSize int) *ItemList {
+
+ nl := &ItemList{
+ skipList: skiplist.New(store),
+ batchSize: batchSize,
+ client: client,
+ prefix: prefix,
+ }
+
+ if len(data) == 0 {
+ return nl
+ }
+
+ message := &skiplist.SkipListProto{}
+ if err := proto.Unmarshal(data, message); err != nil {
+ glog.Errorf("loading skiplist: %v", err)
+ }
+ nl.skipList.MaxNewLevel = int(message.MaxNewLevel)
+ nl.skipList.MaxLevel = int(message.MaxLevel)
+ for i, ref := range message.StartLevels {
+ nl.skipList.StartLevels[i] = &skiplist.SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ }
+ }
+ for i, ref := range message.EndLevels {
+ nl.skipList.EndLevels[i] = &skiplist.SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ }
+ }
+ return nl
+}
+
+func (nl *ItemList) HasChanges() bool {
+ return nl.skipList.HasChanges
+}
+
+func (nl *ItemList) ToBytes() []byte {
+ message := &skiplist.SkipListProto{}
+ message.MaxNewLevel = int32(nl.skipList.MaxNewLevel)
+ message.MaxLevel = int32(nl.skipList.MaxLevel)
+ for _, ref := range nl.skipList.StartLevels {
+ if ref == nil {
+ break
+ }
+ message.StartLevels = append(message.StartLevels, &skiplist.SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ })
+ }
+ for _, ref := range nl.skipList.EndLevels {
+ if ref == nil {
+ break
+ }
+ message.EndLevels = append(message.EndLevels, &skiplist.SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ })
+ }
+ data, err := proto.Marshal(message)
+ if err != nil {
+ glog.Errorf("marshal skiplist: %v", err)
+ }
+ return data
+}
diff --git a/weed/filer/redis3/kv_directory_children.go b/weed/filer/redis3/kv_directory_children.go
new file mode 100644
index 000000000..d92dddfe6
--- /dev/null
+++ b/weed/filer/redis3/kv_directory_children.go
@@ -0,0 +1,138 @@
+package redis3
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/go-redis/redis/v8"
+)
+
+const maxNameBatchSizeLimit = 1000000
+
+func insertChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error {
+
+ // lock and unlock
+ mutex := redisStore.redsync.NewMutex(key + "lock")
+ if err := mutex.Lock(); err != nil {
+ return fmt.Errorf("lock %s: %v", key, err)
+ }
+ defer func() {
+ mutex.Unlock()
+ }()
+
+ client := redisStore.Client
+ data, err := client.Get(ctx, key).Result()
+ if err != nil {
+ if err != redis.Nil {
+ return fmt.Errorf("read %s: %v", key, err)
+ }
+ }
+ store := newSkipListElementStore(key, client)
+ nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
+
+ if err := nameList.WriteName(name); err != nil {
+ glog.Errorf("add %s %s: %v", key, name, err)
+ return err
+ }
+
+ if !nameList.HasChanges() {
+ return nil
+ }
+
+ if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func removeChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error {
+
+ // lock and unlock
+ mutex := redisStore.redsync.NewMutex(key + "lock")
+ if err := mutex.Lock(); err != nil {
+ return fmt.Errorf("lock %s: %v", key, err)
+ }
+ defer mutex.Unlock()
+
+ client := redisStore.Client
+ data, err := client.Get(ctx, key).Result()
+ if err != nil {
+ if err != redis.Nil {
+ return fmt.Errorf("read %s: %v", key, err)
+ }
+ }
+ store := newSkipListElementStore(key, client)
+ nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
+
+ if err := nameList.DeleteName(name); err != nil {
+ return err
+ }
+ if !nameList.HasChanges() {
+ return nil
+ }
+
+ if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func removeChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, onDeleteFn func(name string) error) error {
+
+ // lock and unlock
+ mutex := redisStore.redsync.NewMutex(key + "lock")
+ if err := mutex.Lock(); err != nil {
+ return fmt.Errorf("lock %s: %v", key, err)
+ }
+ defer mutex.Unlock()
+
+ client := redisStore.Client
+ data, err := client.Get(ctx, key).Result()
+ if err != nil {
+ if err != redis.Nil {
+ return fmt.Errorf("read %s: %v", key, err)
+ }
+ }
+ store := newSkipListElementStore(key, client)
+ nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
+
+ if err = nameList.ListNames("", func(name string) bool {
+ if err := onDeleteFn(name); err != nil {
+ glog.Errorf("delete %s child %s: %v", key, name, err)
+ return false
+ }
+ return true
+ }); err != nil {
+ return err
+ }
+
+ if err = nameList.RemoteAllListElement(); err != nil {
+ return err
+ }
+
+ return nil
+
+}
+
+func listChildren(ctx context.Context, redisStore *UniversalRedis3Store, key string, startFileName string, eachFn func(name string) bool) error {
+ client := redisStore.Client
+ data, err := client.Get(ctx, key).Result()
+ if err != nil {
+ if err != redis.Nil {
+ return fmt.Errorf("read %s: %v", key, err)
+ }
+ }
+ store := newSkipListElementStore(key, client)
+ nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
+
+ if err = nameList.ListNames(startFileName, func(name string) bool {
+ return eachFn(name)
+ }); err != nil {
+ return err
+ }
+
+ return nil
+
+}
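
insertChild and removeChild above serialize each directory's skiplist into one value and update it under a redsync mutex, i.e. a plain lock, read, modify, write-back cycle. The condensed sketch below shows that cycle with a string payload instead of the skiplist blob, assuming go-redis v8, redsync v4, and a Redis server at localhost:6379.

// Sketch only: the locked read-modify-write cycle behind insertChild/removeChild.
package main

import (
	"context"
	"fmt"

	goredislib "github.com/go-redis/redis/v8"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
	ctx := context.Background()
	client := goredislib.NewClient(&goredislib.Options{Addr: "localhost:6379"})
	rs := redsync.New(goredis.NewPool(client))

	key := "/mydir" // hypothetical directory key
	mutex := rs.NewMutex(key + "lock")
	if err := mutex.Lock(); err != nil {
		panic(err)
	}
	defer mutex.Unlock()

	// read: a missing key is not an error, it just means "no children yet"
	data, err := client.Get(ctx, key).Result()
	if err != nil && err != goredislib.Nil {
		panic(err)
	}

	// modify (the real code updates the serialized skiplist here)
	data = data + "|newChild"

	// write back
	if err := client.Set(ctx, key, data, 0).Err(); err != nil {
		panic(err)
	}
	fmt.Println("updated", key)
}
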
diff --git a/weed/filer/redis3/kv_directory_children_test.go b/weed/filer/redis3/kv_directory_children_test.go
new file mode 100644
index 000000000..9d7acacf1
--- /dev/null
+++ b/weed/filer/redis3/kv_directory_children_test.go
@@ -0,0 +1,210 @@
+package redis3
+
+import (
+ "context"
+ "fmt"
+ "github.com/go-redis/redis/v8"
+ "github.com/stvp/tempredis"
+ "strconv"
+ "testing"
+ "time"
+)
+
+var names = []string{
+ "cassandra.in.sh",
+ "cassandra",
+ "debug-cql.bat",
+ "nodetool",
+ "nodetool.bat",
+ "source-conf.ps1",
+ "sstableloader",
+ "sstableloader.bat",
+ "sstablescrub",
+ "sstablescrub.bat",
+ "sstableupgrade",
+ "sstableupgrade.bat",
+ "sstableutil",
+ "sstableutil.bat",
+ "sstableverify",
+ "sstableverify.bat",
+ "stop-server",
+ "stop-server.bat",
+ "stop-server.ps1",
+ "cassandra.in.bat",
+ "cqlsh.py",
+ "cqlsh",
+ "cassandra.ps1",
+ "cqlsh.bat",
+ "debug-cql",
+ "cassandra.bat",
+}
+
+func yTestNameList(t *testing.T) {
+ server, err := tempredis.Start(tempredis.Config{})
+ if err != nil {
+ panic(err)
+ }
+ defer server.Term()
+
+ client := redis.NewClient(&redis.Options{
+ Network: "unix",
+ Addr: server.Socket(),
+ })
+
+ store := newSkipListElementStore("/yyy/bin", client)
+ var data []byte
+ for _, name := range names {
+ nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
+ nameList.WriteName(name)
+
+ nameList.ListNames("", func(name string) bool {
+ println(name)
+ return true
+ })
+
+ if nameList.HasChanges() {
+ data = nameList.ToBytes()
+ }
+ println()
+ }
+
+ nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
+ nameList.ListNames("", func(name string) bool {
+ println(name)
+ return true
+ })
+
+}
+
+func yBenchmarkNameList(b *testing.B) {
+
+ server, err := tempredis.Start(tempredis.Config{})
+ if err != nil {
+ panic(err)
+ }
+ defer server.Term()
+
+ client := redis.NewClient(&redis.Options{
+ Network: "unix",
+ Addr: server.Socket(),
+ })
+
+ store := newSkipListElementStore("/yyy/bin", client)
+ var data []byte
+ for i := 0; i < b.N; i++ {
+ nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
+
+ nameList.WriteName(strconv.Itoa(i) + "namexxxxxxxxxxxxxxxxxxx")
+
+ if nameList.HasChanges() {
+ data = nameList.ToBytes()
+ }
+ }
+}
+
+func BenchmarkRedis(b *testing.B) {
+
+ server, err := tempredis.Start(tempredis.Config{})
+ if err != nil {
+ panic(err)
+ }
+ defer server.Term()
+
+ client := redis.NewClient(&redis.Options{
+ Network: "unix",
+ Addr: server.Socket(),
+ })
+
+ for i := 0; i < b.N; i++ {
+ client.ZAddNX(context.Background(), "/yyy/bin", &redis.Z{Score: 0, Member: strconv.Itoa(i) + "namexxxxxxxxxxxxxxxxxxx"})
+ }
+}
+
+func xTestNameListAdd(t *testing.T) {
+
+ server, err := tempredis.Start(tempredis.Config{})
+ if err != nil {
+ panic(err)
+ }
+ defer server.Term()
+
+ client := redis.NewClient(&redis.Options{
+ Addr: "localhost:6379",
+ Password: "",
+ DB: 0,
+ })
+
+ client.FlushAll(context.Background())
+
+ N := 364800
+
+ ts0 := time.Now()
+ store := newSkipListElementStore("/y", client)
+ var data []byte
+ nameList := LoadItemList(data, "/y", client, store, 100000)
+ for i := 0; i < N; i++ {
+ nameList.WriteName(fmt.Sprintf("%8d", i))
+ }
+
+ ts1 := time.Now()
+
+ for i := 0; i < N; i++ {
+ client.ZAddNX(context.Background(), "/x", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
+ }
+ ts2 := time.Now()
+
+ fmt.Printf("%v %v", ts1.Sub(ts0), ts2.Sub(ts1))
+
+ /*
+ keys := client.Keys(context.Background(), "/*m").Val()
+ for _, k := range keys {
+ println("key", k)
+ for i, v := range client.ZRangeByLex(context.Background(), k, &redis.ZRangeBy{
+ Min: "-",
+ Max: "+",
+ }).Val() {
+ println(" ", i, v)
+ }
+ }
+ */
+}
+
+func xBenchmarkNameList(b *testing.B) {
+
+ server, err := tempredis.Start(tempredis.Config{})
+ if err != nil {
+ panic(err)
+ }
+ defer server.Term()
+
+ client := redis.NewClient(&redis.Options{
+ Addr: "localhost:6379",
+ Password: "",
+ DB: 0,
+ })
+
+ store := newSkipListElementStore("/yyy/bin", client)
+ var data []byte
+ for i := 0; i < b.N; i++ {
+ nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
+
+ nameList.WriteName(fmt.Sprintf("name %8d", i))
+
+ if nameList.HasChanges() {
+ data = nameList.ToBytes()
+ }
+ }
+}
+
+func xBenchmarkRedis(b *testing.B) {
+
+ client := redis.NewClient(&redis.Options{
+ Addr: "localhost:6379",
+ Password: "",
+ DB: 0,
+ })
+
+ for i := 0; i < b.N; i++ {
+ client.ZAddNX(context.Background(), "/xxx/bin", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
+ }
+}
diff --git a/weed/filer/redis3/redis_cluster_store.go b/weed/filer/redis3/redis_cluster_store.go
new file mode 100644
index 000000000..73fc0dd20
--- /dev/null
+++ b/weed/filer/redis3/redis_cluster_store.go
@@ -0,0 +1,45 @@
+package redis3
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+ "github.com/go-redsync/redsync/v4"
+ "github.com/go-redsync/redsync/v4/redis/goredis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisCluster3Store{})
+}
+
+type RedisCluster3Store struct {
+ UniversalRedis3Store
+}
+
+func (store *RedisCluster3Store) GetName() string {
+ return "redis_cluster3"
+}
+
+func (store *RedisCluster3Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+
+ configuration.SetDefault(prefix+"useReadOnly", false)
+ configuration.SetDefault(prefix+"routeByLatency", false)
+
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"useReadOnly"),
+ configuration.GetBool(prefix+"routeByLatency"),
+ )
+}
+
+func (store *RedisCluster3Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
+ store.Client = redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
+ })
+ store.redsync = redsync.New(goredis.NewPool(store.Client))
+ return
+}
diff --git a/weed/filer/redis3/redis_sentinel_store.go b/weed/filer/redis3/redis_sentinel_store.go
new file mode 100644
index 000000000..a87302167
--- /dev/null
+++ b/weed/filer/redis3/redis_sentinel_store.go
@@ -0,0 +1,49 @@
+package redis3
+
+import (
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+ "github.com/go-redsync/redsync/v4"
+ "github.com/go-redsync/redsync/v4/redis/goredis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &Redis3SentinelStore{})
+}
+
+type Redis3SentinelStore struct {
+ UniversalRedis3Store
+}
+
+func (store *Redis3SentinelStore) GetName() string {
+ return "redis3_sentinel"
+}
+
+func (store *Redis3SentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"masterName"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *Redis3SentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) {
+ store.Client = redis.NewFailoverClient(&redis.FailoverOptions{
+ MasterName: masterName,
+ SentinelAddrs: addresses,
+ Username: username,
+ Password: password,
+ DB: database,
+ MinRetryBackoff: time.Millisecond * 100,
+ MaxRetryBackoff: time.Minute * 1,
+ ReadTimeout: time.Second * 30,
+ WriteTimeout: time.Second * 5,
+ })
+ store.redsync = redsync.New(goredis.NewPool(store.Client))
+ return
+}
diff --git a/weed/filer/redis3/redis_store.go b/weed/filer/redis3/redis_store.go
new file mode 100644
index 000000000..2eec3d37a
--- /dev/null
+++ b/weed/filer/redis3/redis_store.go
@@ -0,0 +1,39 @@
+package redis3
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+ "github.com/go-redsync/redsync/v4"
+ "github.com/go-redsync/redsync/v4/redis/goredis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &Redis3Store{})
+}
+
+type Redis3Store struct {
+ UniversalRedis3Store
+}
+
+func (store *Redis3Store) GetName() string {
+ return "redis3"
+}
+
+func (store *Redis3Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"address"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *Redis3Store) initialize(hostPort string, password string, database int) (err error) {
+ store.Client = redis.NewClient(&redis.Options{
+ Addr: hostPort,
+ Password: password,
+ DB: database,
+ })
+ store.redsync = redsync.New(goredis.NewPool(store.Client))
+ return
+}
diff --git a/weed/filer/redis3/skiplist_element_store.go b/weed/filer/redis3/skiplist_element_store.go
new file mode 100644
index 000000000..8c101d006
--- /dev/null
+++ b/weed/filer/redis3/skiplist_element_store.go
@@ -0,0 +1,62 @@
+package redis3
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/skiplist"
+ "github.com/go-redis/redis/v8"
+ "github.com/golang/protobuf/proto"
+)
+
+type SkipListElementStore struct {
+ Prefix string
+ client redis.UniversalClient
+}
+
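+// compile-time assertion that SkipListElementStore implements skiplist.ListStore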
+var _ = skiplist.ListStore(&SkipListElementStore{})
+
+func newSkipListElementStore(prefix string, client redis.UniversalClient) *SkipListElementStore {
+ return &SkipListElementStore{
+ Prefix: prefix,
+ client: client,
+ }
+}
+
+func (m *SkipListElementStore) SaveElement(id int64, element *skiplist.SkipListElement) error {
+ key := fmt.Sprintf("%s%d", m.Prefix, id)
+ data, err := proto.Marshal(element)
+ if err != nil {
+ glog.Errorf("marshal %s: %v", key, err)
+ }
+ return m.client.Set(context.Background(), key, data, 0).Err()
+}
+
+func (m *SkipListElementStore) DeleteElement(id int64) error {
+ key := fmt.Sprintf("%s%d", m.Prefix, id)
+ return m.client.Del(context.Background(), key).Err()
+}
+
+func (m *SkipListElementStore) LoadElement(id int64) (*skiplist.SkipListElement, error) {
+ key := fmt.Sprintf("%s%d", m.Prefix, id)
+ data, err := m.client.Get(context.Background(), key).Result()
+ if err != nil {
+ if err == redis.Nil {
+ return nil, nil
+ }
+ return nil, err
+ }
+ t := &skiplist.SkipListElement{}
+ err = proto.Unmarshal([]byte(data), t)
+ if err == nil {
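+  // restore nil links: serialized elements carry explicit nil markers, so convert them back to Go nil pointers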
+ for i := 0; i < len(t.Next); i++ {
+ if t.Next[i].IsNil() {
+ t.Next[i] = nil
+ }
+ }
+ if t.Prev.IsNil() {
+ t.Prev = nil
+ }
+ }
+ return t, err
+}
diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go
new file mode 100644
index 000000000..10a87e2a4
--- /dev/null
+++ b/weed/filer/redis3/universal_redis_store.go
@@ -0,0 +1,179 @@
+package redis3
+
+import (
+ "context"
+ "fmt"
+ "github.com/go-redsync/redsync/v4"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DIR_LIST_MARKER = "\x00"
+)
+
+type UniversalRedis3Store struct {
+ Client redis.UniversalClient
+ redsync *redsync.Redsync
+}
+
+func (store *UniversalRedis3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *UniversalRedis3Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
+ value = util.MaybeGzipData(value)
+ }
+
+ if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := entry.FullPath.DirAndName()
+
+ if name != "" {
+ if err = insertChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
+ return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ data, err := store.Client.Get(ctx, string(fullpath)).Result()
+ if err == redis.Nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *UniversalRedis3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ _, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
+ if err != nil {
+ return fmt.Errorf("delete dir list %s : %v", fullpath, err)
+ }
+
+ _, err = store.Client.Del(ctx, string(fullpath)).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ dir, name := fullpath.DirAndName()
+
+ if name != "" {
+ if err = removeChild(ctx, store, genDirectoryListKey(dir), name); err != nil {
+ return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ return removeChildren(ctx, store, genDirectoryListKey(string(fullpath)), func(name string) error {
+ path := util.NewFullPath(string(fullpath), name)
+ _, err = store.Client.Del(ctx, string(path)).Result()
+ if err != nil {
+ return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
+ }
+ // not efficient, but need to remove if it is a directory
+ store.Client.Del(ctx, genDirectoryListKey(string(path)))
+ return nil
+ })
+
+}
+
+func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ dirListKey := genDirectoryListKey(string(dirPath))
+ counter := int64(0)
+
+ err = listChildren(ctx, store, dirListKey, startFileName, func(fileName string) bool {
+ if startFileName != "" {
+ if !includeStartFile && startFileName == fileName {
+ return true
+ }
+ }
+
+ path := util.NewFullPath(string(dirPath), fileName)
+ entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
+ if err != nil {
+ glog.V(0).Infof("list %s : %v", path, err)
+ if err == filer_pb.ErrNotFound {
+ return true
+ }
+ } else {
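+   // lazily expire entries: delete the expired entry key and remove its name from the directory list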
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.Client.Del(ctx, string(path)).Result()
+ store.Client.ZRem(ctx, dirListKey, fileName).Result()
+ return true
+ }
+ }
+ counter++
+ if !eachEntryFunc(entry) {
+ return false
+ }
+ if counter >= limit {
+ return false
+ }
+ }
+ return true
+ })
+
+ return lastFileName, err
+}
+
+func genDirectoryListKey(dir string) (dirList string) {
+ return dir + DIR_LIST_MARKER
+}
+
+func (store *UniversalRedis3Store) Shutdown() {
+ store.Client.Close()
+}
diff --git a/weed/filer/redis3/universal_redis_store_kv.go b/weed/filer/redis3/universal_redis_store_kv.go
new file mode 100644
index 000000000..a9c440a37
--- /dev/null
+++ b/weed/filer/redis3/universal_redis_store_kv.go
@@ -0,0 +1,42 @@
+package redis3
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/go-redis/redis/v8"
+)
+
+func (store *UniversalRedis3Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ _, err = store.Client.Set(ctx, string(key), value, 0).Result()
+
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ data, err := store.Client.Get(ctx, string(key)).Result()
+
+ if err == redis.Nil {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return []byte(data), err
+}
+
+func (store *UniversalRedis3Store) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ _, err = store.Client.Del(ctx, string(key)).Result()
+
+ if err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/redis_lua/redis_cluster_store.go b/weed/filer/redis_lua/redis_cluster_store.go
new file mode 100644
index 000000000..b68d1092c
--- /dev/null
+++ b/weed/filer/redis_lua/redis_cluster_store.go
@@ -0,0 +1,44 @@
+package redis_lua
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisLuaClusterStore{})
+}
+
+type RedisLuaClusterStore struct {
+ UniversalRedisLuaStore
+}
+
+func (store *RedisLuaClusterStore) GetName() string {
+ return "redis_lua_cluster"
+}
+
+func (store *RedisLuaClusterStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+
+ configuration.SetDefault(prefix+"useReadOnly", false)
+ configuration.SetDefault(prefix+"routeByLatency", false)
+
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"useReadOnly"),
+ configuration.GetBool(prefix+"routeByLatency"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
+ )
+}
+
+func (store *RedisLuaClusterStore) initialize(addresses []string, password string, readOnly, routeByLatency bool, superLargeDirectories []string) (err error) {
+ store.Client = redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
+ })
+ store.loadSuperLargeDirectories(superLargeDirectories)
+ return
+}
diff --git a/weed/filer/redis_lua/redis_sentinel_store.go b/weed/filer/redis_lua/redis_sentinel_store.go
new file mode 100644
index 000000000..5530c098e
--- /dev/null
+++ b/weed/filer/redis_lua/redis_sentinel_store.go
@@ -0,0 +1,45 @@
+package redis_lua
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+ "time"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisLuaSentinelStore{})
+}
+
+type RedisLuaSentinelStore struct {
+ UniversalRedisLuaStore
+}
+
+func (store *RedisLuaSentinelStore) GetName() string {
+ return "redis_lua_sentinel"
+}
+
+func (store *RedisLuaSentinelStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"masterName"),
+ configuration.GetString(prefix+"username"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *RedisLuaSentinelStore) initialize(addresses []string, masterName string, username string, password string, database int) (err error) {
+ store.Client = redis.NewFailoverClient(&redis.FailoverOptions{
+ MasterName: masterName,
+ SentinelAddrs: addresses,
+ Username: username,
+ Password: password,
+ DB: database,
+ MinRetryBackoff: time.Millisecond * 100,
+ MaxRetryBackoff: time.Minute * 1,
+ ReadTimeout: time.Second * 30,
+ WriteTimeout: time.Second * 5,
+ })
+ return
+}
diff --git a/weed/filer/redis_lua/redis_store.go b/weed/filer/redis_lua/redis_store.go
new file mode 100644
index 000000000..a7d11c73c
--- /dev/null
+++ b/weed/filer/redis_lua/redis_store.go
@@ -0,0 +1,38 @@
+package redis_lua
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisLuaStore{})
+}
+
+type RedisLuaStore struct {
+ UniversalRedisLuaStore
+}
+
+func (store *RedisLuaStore) GetName() string {
+ return "redis_lua"
+}
+
+func (store *RedisLuaStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"address"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ configuration.GetStringSlice(prefix+"superLargeDirectories"),
+ )
+}
+
+func (store *RedisLuaStore) initialize(hostPort string, password string, database int, superLargeDirectories []string) (err error) {
+ store.Client = redis.NewClient(&redis.Options{
+ Addr: hostPort,
+ Password: password,
+ DB: database,
+ })
+ store.loadSuperLargeDirectories(superLargeDirectories)
+ return
+}
diff --git a/weed/filer/redis_lua/stored_procedure/delete_entry.lua b/weed/filer/redis_lua/stored_procedure/delete_entry.lua
new file mode 100644
index 000000000..445337c77
--- /dev/null
+++ b/weed/filer/redis_lua/stored_procedure/delete_entry.lua
@@ -0,0 +1,19 @@
+-- KEYS[1]: full path of entry
+local fullpath = KEYS[1]
+-- KEYS[2]: directory list key of the entry itself
+local fullpath_list_key = KEYS[2]
+-- KEYS[3]: directory list key of the parent dir
+local dir_list_key = KEYS[3]
+
+-- ARGV[1]: isSuperLargeDirectory
+local isSuperLargeDirectory = ARGV[1] == "1"
+-- ARGV[2]: name of the entry
+local name = ARGV[2]
+
+redis.call("DEL", fullpath, fullpath_list_key)
+
+if not isSuperLargeDirectory and name ~= "" then
+ redis.call("ZREM", dir_list_key, name)
+end
+
+return 0
\ No newline at end of file
diff --git a/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua b/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua
new file mode 100644
index 000000000..77e4839f9
--- /dev/null
+++ b/weed/filer/redis_lua/stored_procedure/delete_folder_children.lua
@@ -0,0 +1,15 @@
+-- KEYS[1]: full path of the directory
+local fullpath = KEYS[1]
+
+if fullpath ~= "" and string.sub(fullpath, -1) == "/" then
+ fullpath = string.sub(fullpath, 0, -2)
+end
+
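+-- the children of a directory are tracked in a zset keyed by the directory path plus a trailing \0 marker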
+local files = redis.call("ZRANGE", fullpath .. "\0", "0", "-1")
+
+for _, name in ipairs(files) do
+ local file_path = fullpath .. "/" .. name
+ redis.call("DEL", file_path, file_path .. "\0")
+end
+
+return 0
\ No newline at end of file
diff --git a/weed/filer/redis_lua/stored_procedure/init.go b/weed/filer/redis_lua/stored_procedure/init.go
new file mode 100644
index 000000000..1412ceba2
--- /dev/null
+++ b/weed/filer/redis_lua/stored_procedure/init.go
@@ -0,0 +1,24 @@
+package stored_procedure
+
+import (
+ _ "embed"
+ "github.com/go-redis/redis/v8"
+)
+
+func init() {
+ InsertEntryScript = redis.NewScript(insertEntry)
+ DeleteEntryScript = redis.NewScript(deleteEntry)
+ DeleteFolderChildrenScript = redis.NewScript(deleteFolderChildren)
+}
+
+//go:embed insert_entry.lua
+var insertEntry string
+var InsertEntryScript *redis.Script
+
+//go:embed delete_entry.lua
+var deleteEntry string
+var DeleteEntryScript *redis.Script
+
+//go:embed delete_folder_children.lua
+var deleteFolderChildren string
+var DeleteFolderChildrenScript *redis.Script
diff --git a/weed/filer/redis_lua/stored_procedure/insert_entry.lua b/weed/filer/redis_lua/stored_procedure/insert_entry.lua
new file mode 100644
index 000000000..8deef3446
--- /dev/null
+++ b/weed/filer/redis_lua/stored_procedure/insert_entry.lua
@@ -0,0 +1,27 @@
+-- KEYS[1]: full path of entry
+local full_path = KEYS[1]
+-- KEYS[2]: directory list key of the parent dir
+local dir_list_key = KEYS[2]
+
+-- ARGV[1]: content of the entry
+local entry = ARGV[1]
+-- ARGV[2]: TTL of the entry
+local ttlSec = tonumber(ARGV[2])
+-- ARGV[3]: isSuperLargeDirectory
+local isSuperLargeDirectory = ARGV[3] == "1"
+-- ARGV[4]: zscore of the entry in zset
+local zscore = tonumber(ARGV[4])
+-- ARGV[5]: name of the entry
+local name = ARGV[5]
+
+if ttlSec > 0 then
+ redis.call("SET", full_path, entry, "EX", ttlSec)
+else
+ redis.call("SET", full_path, entry)
+end
+
+if not isSuperLargeDirectory and name ~= "" then
+ redis.call("ZADD", dir_list_key, "NX", zscore, name)
+end
+
+return 0
\ No newline at end of file
diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go
new file mode 100644
index 000000000..0ab0f2f24
--- /dev/null
+++ b/weed/filer/redis_lua/universal_redis_store.go
@@ -0,0 +1,191 @@
+package redis_lua
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/redis_lua/stored_procedure"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DIR_LIST_MARKER = "\x00"
+)
+
+type UniversalRedisLuaStore struct {
+ Client redis.UniversalClient
+ superLargeDirectoryHash map[string]bool
+}
+
+func (store *UniversalRedisLuaStore) isSuperLargeDirectory(dir string) (isSuperLargeDirectory bool) {
+ _, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
+ return
+}
+
+func (store *UniversalRedisLuaStore) loadSuperLargeDirectories(superLargeDirectories []string) {
+ // set directory hash
+ store.superLargeDirectoryHash = make(map[string]bool)
+ for _, dir := range superLargeDirectories {
+ store.superLargeDirectoryHash[dir] = true
+ }
+}
+
+func (store *UniversalRedisLuaStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *UniversalRedisLuaStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *UniversalRedisLuaStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
+ value = util.MaybeGzipData(value)
+ }
+
+ dir, name := entry.FullPath.DirAndName()
+
+ err = stored_procedure.InsertEntryScript.Run(ctx, store.Client,
+ []string{string(entry.FullPath), genDirectoryListKey(dir)},
+ value, entry.TtlSec,
+ store.isSuperLargeDirectory(dir), 0, name).Err()
+
+ if err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *UniversalRedisLuaStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ data, err := store.Client.Get(ctx, string(fullpath)).Result()
+ if err == redis.Nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *UniversalRedisLuaStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ dir, name := fullpath.DirAndName()
+
+ err = stored_procedure.DeleteEntryScript.Run(ctx, store.Client,
+ []string{string(fullpath), genDirectoryListKey(string(fullpath)), genDirectoryListKey(dir)},
+ store.isSuperLargeDirectory(dir), name).Err()
+
+ if err != nil {
+ return fmt.Errorf("DeleteEntry %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ if store.isSuperLargeDirectory(string(fullpath)) {
+ return nil
+ }
+
+ err = stored_procedure.DeleteFolderChildrenScript.Run(ctx, store.Client,
+ []string{string(fullpath)}).Err()
+
+ if err != nil {
+ return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ dirListKey := genDirectoryListKey(string(dirPath))
+
+ min := "-"
+ if startFileName != "" {
+ if includeStartFile {
+ min = "[" + startFileName
+ } else {
+ min = "(" + startFileName
+ }
+ }
+
+ members, err := store.Client.ZRangeByLex(ctx, dirListKey, &redis.ZRangeBy{
+ Min: min,
+ Max: "+",
+ Offset: 0,
+ Count: limit,
+ }).Result()
+ if err != nil {
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
+ }
+
+ // fetch entry meta
+ for _, fileName := range members {
+ path := util.NewFullPath(string(dirPath), fileName)
+ entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
+ if err != nil {
+ glog.V(0).Infof("list %s : %v", path, err)
+ if err == filer_pb.ErrNotFound {
+ continue
+ }
+ } else {
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.DeleteEntry(ctx, path)
+ continue
+ }
+ }
+ if !eachEntryFunc(entry) {
+ break
+ }
+ }
+ }
+
+ return lastFileName, err
+}
+
+func genDirectoryListKey(dir string) (dirList string) {
+ return dir + DIR_LIST_MARKER
+}
+
+func (store *UniversalRedisLuaStore) Shutdown() {
+ store.Client.Close()
+}
diff --git a/weed/filer/redis_lua/universal_redis_store_kv.go b/weed/filer/redis_lua/universal_redis_store_kv.go
new file mode 100644
index 000000000..3df980b66
--- /dev/null
+++ b/weed/filer/redis_lua/universal_redis_store_kv.go
@@ -0,0 +1,42 @@
+package redis_lua
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/go-redis/redis/v8"
+)
+
+func (store *UniversalRedisLuaStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ _, err = store.Client.Set(ctx, string(key), value, 0).Result()
+
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedisLuaStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ data, err := store.Client.Get(ctx, string(key)).Result()
+
+ if err == redis.Nil {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return []byte(data), err
+}
+
+func (store *UniversalRedisLuaStore) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ _, err = store.Client.Del(ctx, string(key)).Result()
+
+ if err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go
new file mode 100644
index 000000000..b0534e2ca
--- /dev/null
+++ b/weed/filer/remote_mapping.go
@@ -0,0 +1,121 @@
+package filer
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+)
+
+func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) {
+ var oldContent []byte
+ if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+ return readErr
+ }); readErr != nil {
+ return nil, readErr
+ }
+
+ mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
+ if readErr != nil {
+ return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
+ }
+
+ return
+}
+
+func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStorageLocation *remote_pb.RemoteStorageLocation) (err error) {
+
+ // read current mapping
+ var oldContent, newContent []byte
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+ return err
+ })
+ if err != nil {
+ if err != filer_pb.ErrNotFound {
+ return fmt.Errorf("read existing mapping: %v", err)
+ }
+ }
+
+ // add new mapping
+ newContent, err = addRemoteStorageMapping(oldContent, dir, remoteStorageLocation)
+ if err != nil {
+ return fmt.Errorf("add mapping %s~%s: %v", dir, remoteStorageLocation, err)
+ }
+
+ // save back
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
+ })
+ if err != nil {
+ return fmt.Errorf("save mapping: %v", err)
+ }
+
+ return nil
+}
+
+func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error) {
+
+ // read current mapping
+ var oldContent, newContent []byte
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+ return err
+ })
+ if err != nil {
+ if err != filer_pb.ErrNotFound {
+ return fmt.Errorf("read existing mapping: %v", err)
+ }
+ }
+
+ // add new mapping
+ newContent, err = removeRemoteStorageMapping(oldContent, dir)
+ if err != nil {
+ return fmt.Errorf("delete mount %s: %v", dir, err)
+ }
+
+ // save back
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
+ })
+ if err != nil {
+ return fmt.Errorf("save mapping: %v", err)
+ }
+
+ return nil
+}
+
+func addRemoteStorageMapping(oldContent []byte, dir string, storageLocation *remote_pb.RemoteStorageLocation) (newContent []byte, err error) {
+ mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
+ if unmarshalErr != nil {
+ // skip
+ }
+
+ // set the new mapping
+ mappings.Mappings[dir] = storageLocation
+
+ if newContent, err = proto.Marshal(mappings); err != nil {
+ return oldContent, fmt.Errorf("marshal mappings: %v", err)
+ }
+
+ return
+}
+
+func removeRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) {
+ mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
+ if unmarshalErr != nil {
+ return nil, unmarshalErr
+ }
+
+ // set the new mapping
+ delete(mappings.Mappings, dir)
+
+ if newContent, err = proto.Marshal(mappings); err != nil {
+ return oldContent, fmt.Errorf("marshal mappings: %v", err)
+ }
+
+ return
+}
diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/remote_storage.go
index 65704e652..5362ba738 100644
--- a/weed/filer/filer_remote_storage.go
+++ b/weed/filer/remote_storage.go
@@ -131,58 +131,9 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo
return
}
-func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *remote_pb.RemoteStorageLocation) (newContent []byte, err error) {
- mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
- if unmarshalErr != nil {
- // skip
- }
-
- // set the new mapping
- mappings.Mappings[dir] = storageLocation
-
- if newContent, err = proto.Marshal(mappings); err != nil {
- return oldContent, fmt.Errorf("marshal mappings: %v", err)
- }
-
- return
-}
-
-func RemoveRemoteStorageMapping(oldContent []byte, dir string) (newContent []byte, err error) {
- mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
- if unmarshalErr != nil {
- return nil, unmarshalErr
- }
-
- // set the new mapping
- delete(mappings.Mappings, dir)
-
- if newContent, err = proto.Marshal(mappings); err != nil {
- return oldContent, fmt.Errorf("marshal mappings: %v", err)
- }
-
- return
-}
-
-func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *remote_pb.RemoteStorageMapping, readErr error) {
- var oldContent []byte
- if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
- return readErr
- }); readErr != nil {
- return nil, readErr
- }
-
- mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
- if readErr != nil {
- return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
- }
-
- return
-}
-
-func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
+func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
var oldContent []byte
- if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
return readErr
}); readErr != nil {
@@ -199,7 +150,7 @@ func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string,
return
}
-func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress string, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) {
+func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) {
mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress)
if listErr != nil {
diff --git a/weed/filer/filer_remote_storage_test.go b/weed/filer/remote_storage_test.go
index 9f4d7af2f..9f4d7af2f 100644
--- a/weed/filer/filer_remote_storage_test.go
+++ b/weed/filer/remote_storage_test.go
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
index 729da7d9b..f48c3988c 100644
--- a/weed/filer/rocksdb/rocksdb_store.go
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -10,7 +10,7 @@ import (
"io"
"os"
- "github.com/tecbot/gorocksdb"
+ gorocksdb "github.com/linxGnu/grocksdb"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go
index f6e755b4b..faabcd341 100644
--- a/weed/filer/rocksdb/rocksdb_store_test.go
+++ b/weed/filer/rocksdb/rocksdb_store_test.go
@@ -1,3 +1,4 @@
+//go:build rocksdb
// +build rocksdb
package rocksdb
@@ -5,7 +6,6 @@ package rocksdb
import (
"context"
"fmt"
- "io/ioutil"
"os"
"testing"
"time"
@@ -16,8 +16,7 @@ import (
func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -35,7 +34,7 @@ func TestCreateAndFind(t *testing.T) {
},
}
- if err := testFiler.CreateEntry(ctx, entry1, false, false, nil); err != nil {
+ if err := testFiler.CreateEntry(ctx, entry1, false, false, nil, false); err != nil {
t.Errorf("create entry %v: %v", entry1.FullPath, err)
return
}
@@ -70,8 +69,7 @@ func TestCreateAndFind(t *testing.T) {
func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_test2")
- defer os.RemoveAll(dir)
+ dir := t.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
@@ -93,8 +91,7 @@ func TestEmptyRoot(t *testing.T) {
func BenchmarkInsertEntry(b *testing.B) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
- dir, _ := ioutil.TempDir("", "seaweedfs_filer_bench")
- defer os.RemoveAll(dir)
+ dir := b.TempDir()
store := &RocksDBStore{}
store.initialize(dir)
testFiler.SetStore(store)
diff --git a/weed/filer/rocksdb/rocksdb_ttl.go b/weed/filer/rocksdb/rocksdb_ttl.go
index faed22310..7e9643083 100644
--- a/weed/filer/rocksdb/rocksdb_ttl.go
+++ b/weed/filer/rocksdb/rocksdb_ttl.go
@@ -5,7 +5,7 @@ package rocksdb
import (
"time"
- "github.com/tecbot/gorocksdb"
+ gorocksdb "github.com/linxGnu/grocksdb"
"github.com/chrislusf/seaweedfs/weed/filer"
)
@@ -38,3 +38,8 @@ func (t *TTLFilter) Filter(level int, key, val []byte) (remove bool, newVal []by
func (t *TTLFilter) Name() string {
return "TTLFilter"
}
+func (t *TTLFilter) SetIgnoreSnapshots(value bool) {
+}
+
+func (t *TTLFilter) Destroy() {
+}
diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go
index 55c976915..891bf925b 100644
--- a/weed/filer/s3iam_conf.go
+++ b/weed/filer/s3iam_conf.go
@@ -2,13 +2,12 @@ package filer
import (
"bytes"
- "github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"io"
)
-func ParseS3ConfigurationFromBytes(content []byte, config *iam_pb.S3ApiConfiguration) error {
+func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error {
if err := jsonpb.Unmarshal(bytes.NewBuffer(content), config); err != nil {
return err
}
diff --git a/weed/filer/sqlite/doc.go b/weed/filer/sqlite/doc.go
new file mode 100644
index 000000000..833addf54
--- /dev/null
+++ b/weed/filer/sqlite/doc.go
@@ -0,0 +1,9 @@
+/*
+
+Package sqlite is for sqlite filer store.
+
+The referenced "modernc.org/sqlite" library adds significant size to the compiled binary,
+so this store is only compiled in the "make full_install" build.
+
+*/
+package sqlite
diff --git a/weed/filer/sqlite/sqlite_store.go b/weed/filer/sqlite/sqlite_store.go
index 6b055e53c..70a4bf390 100644
--- a/weed/filer/sqlite/sqlite_store.go
+++ b/weed/filer/sqlite/sqlite_store.go
@@ -1,4 +1,6 @@
+//go:build (linux || darwin || windows) && sqlite
// +build linux darwin windows
+// +build sqlite
// limited GOOS due to modernc.org/libc/unistd
diff --git a/weed/filer/sqlite/sqlite_store_unsupported.go b/weed/filer/sqlite/sqlite_store_unsupported.go
index 803c71afa..351d2e501 100644
--- a/weed/filer/sqlite/sqlite_store_unsupported.go
+++ b/weed/filer/sqlite/sqlite_store_unsupported.go
@@ -1,4 +1,5 @@
-// +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64
+//go:build !linux && !darwin && !windows && !s390 && !ppc64le && !mips64 && !sqlite
+// +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64,!sqlite
// limited GOOS due to modernc.org/libc/unistd
diff --git a/weed/filer/store_test/test_suite.go b/weed/filer/store_test/test_suite.go
new file mode 100644
index 000000000..ad578442c
--- /dev/null
+++ b/weed/filer/store_test/test_suite.go
@@ -0,0 +1,55 @@
+package store_test
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/stretchr/testify/assert"
+ "os"
+ "testing"
+)
+
+func TestFilerStore(t *testing.T, store filer.FilerStore) {
+ ctx := context.Background()
+
+ store.InsertEntry(ctx, makeEntry(util.FullPath("/"), true))
+ store.InsertEntry(ctx, makeEntry(util.FullPath("/a"), true))
+ store.InsertEntry(ctx, makeEntry(util.FullPath("/a/b"), true))
+ store.InsertEntry(ctx, makeEntry(util.FullPath("/a/b/c"), true))
+ for i := 0; i < 2000; i++ {
+ store.InsertEntry(ctx, makeEntry(util.FullPath(fmt.Sprintf("/a/b/c/f%05d", i)), false))
+ }
+
+ {
+ var counter int
+ lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) bool {
+ counter++
+ return true
+ })
+ assert.Nil(t, err, "list directory")
+ assert.Equal(t, 3, counter, "directory list counter")
+ assert.Equal(t, "f00003", lastFileName, "directory list last file")
+ lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool {
+ counter++
+ return true
+ })
+ assert.Nil(t, err, "list directory")
+ assert.Equal(t, 1027, counter, "directory list counter")
+ assert.Equal(t, "f01027", lastFileName, "directory list last file")
+ }
+
+}
+
+func makeEntry(fullPath util.FullPath, isDirectory bool) *filer.Entry {
+ var mode os.FileMode
+ if isDirectory {
+ mode = os.ModeDir
+ }
+ return &filer.Entry{
+ FullPath: fullPath,
+ Attr: filer.Attr{
+ Mode: mode,
+ },
+ }
+}
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index ce0264cd3..7da9fd0a0 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -3,6 +3,7 @@ package filer
import (
"bytes"
"fmt"
+ "golang.org/x/exp/slices"
"io"
"math"
"sort"
@@ -39,11 +40,11 @@ func isSameChunks(a, b []*filer_pb.FileChunk) bool {
if len(a) != len(b) {
return false
}
- sort.Slice(a, func(i, j int) bool {
- return strings.Compare(a[i].ETag, a[j].ETag) < 0
+ slices.SortFunc(a, func(i, j *filer_pb.FileChunk) bool {
+ return strings.Compare(i.ETag, j.ETag) < 0
})
- sort.Slice(b, func(i, j int) bool {
- return strings.Compare(b[i].ETag, b[j].ETag) < 0
+ slices.SortFunc(b, func(i, j *filer_pb.FileChunk) bool {
+ return strings.Compare(i.ETag, j.ETag) < 0
})
for i := 0; i < len(a); i++ {
if a[i].ETag != b[i].ETag {
@@ -62,7 +63,7 @@ func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.R
func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
- glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
+ glog.V(4).Infof("start to stream content for chunks: %+v", chunks)
chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
fileId2Url := make(map[string][]string)
@@ -80,11 +81,23 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
fileId2Url[chunkView.FileId] = urlStrings
}
+ remaining := size
for _, chunkView := range chunkViews {
-
+ if offset < chunkView.LogicOffset {
+ gap := chunkView.LogicOffset - offset
+ remaining -= gap
+ glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset)
+ err := writeZero(writer, gap)
+ if err != nil {
+ return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset)
+ }
+ offset = chunkView.LogicOffset
+ }
urlStrings := fileId2Url[chunkView.FileId]
start := time.Now()
err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ offset += int64(chunkView.Size)
+ remaining -= int64(chunkView.Size)
stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
if err != nil {
stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
@@ -92,6 +105,13 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
}
stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
}
+ if remaining > 0 {
+ glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
+ err := writeZero(writer, remaining)
+ if err != nil {
+ return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
+ }
+ }
return nil
@@ -99,42 +119,59 @@ func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writ
// ---------------- ReadAllReader ----------------------------------
-func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
+func writeZero(w io.Writer, size int64) (err error) {
+ zeroPadding := make([]byte, 1024)
+ var written int
+ for size > 0 {
+ if size > 1024 {
+ written, err = w.Write(zeroPadding)
+ } else {
+ written, err = w.Write(zeroPadding[:size])
+ }
+ size -= int64(written)
+ if err != nil {
+ return
+ }
+ }
+ return
+}
- buffer := bytes.Buffer{}
+func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
return masterClient.LookupFileId(fileId)
}
- chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
+ chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
+
+ idx := 0
for _, chunkView := range chunkViews {
urlStrings, err := lookupFileIdFn(chunkView.FileId)
if err != nil {
glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return nil, err
+ return err
}
- data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
+ n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
if err != nil {
- return nil, err
+ return err
}
- buffer.Write(data)
+ idx += n
}
- return buffer.Bytes(), nil
+ return nil
}
// ---------------- ChunkStreamReader ----------------------------------
type ChunkStreamReader struct {
- chunkViews []*ChunkView
- totalSize int64
- logicOffset int64
- buffer []byte
- bufferOffset int64
- bufferLock sync.Mutex
- chunk string
- lookupFileId wdclient.LookupFileIdFunctionType
+ chunkViews []*ChunkView
+ totalSize int64
+ logicOffset int64
+ buffer []byte
+ bufferOffset int64
+ bufferLock sync.Mutex
+ chunk string
+ lookupFileId wdclient.LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})
@@ -143,8 +180,8 @@ var _ = io.ReaderAt(&ChunkStreamReader{})
func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
- sort.Slice(chunkViews, func(i, j int) bool {
- return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
+ slices.SortFunc(chunkViews, func(a, b *ChunkView) bool {
+ return a.LogicOffset < b.LogicOffset
})
var totalSize int64
@@ -206,7 +243,7 @@ func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) {
}
func (c *ChunkStreamReader) isBufferEmpty() bool {
- return len(c.buffer) <= int(c.logicOffset - c.bufferOffset)
+ return len(c.buffer) <= int(c.logicOffset-c.bufferOffset)
}
func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
@@ -261,7 +298,7 @@ func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
} else if currentChunkIndex > 0 {
if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
// good hit
- } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]){
+ } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) {
currentChunkIndex -= 1
// fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
} else {
@@ -297,7 +334,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
var buffer bytes.Buffer
var shouldRetry bool
for _, urlString := range urlStrings {
- shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
+ shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
buffer.Write(data)
})
if !shouldRetry {
diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go
index 4a8e8784d..f333af38e 100644
--- a/weed/filer/tikv/tikv_store.go
+++ b/weed/filer/tikv/tikv_store.go
@@ -1,5 +1,3 @@
-// +build tikv
-
package tikv
import (
@@ -14,7 +12,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv"
)
@@ -27,8 +24,9 @@ func init() {
}
type TikvStore struct {
- client *tikv.KVStore
+ client *txnkv.Client
deleteRangeConcurrency int
+ onePC bool
}
// Basic APIs
@@ -46,12 +44,13 @@ func (store *TikvStore) Initialize(config util.Configuration, prefix string) err
if drc <= 0 {
drc = 1
}
+ store.onePC = config.GetBool(prefix + "enable_1pc")
store.deleteRangeConcurrency = drc
return store.initialize(pdAddrs)
}
func (store *TikvStore) initialize(pdAddrs []string) error {
- client, err := tikv.NewTxnClient(pdAddrs)
+ client, err := txnkv.NewClient(pdAddrs)
store.client = client
return err
}
@@ -298,6 +297,9 @@ func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context,
if err != nil {
return ctx, err
}
+ if store.onePC {
+ tx.SetEnable1PC(store.onePC)
+ }
return context.WithValue(ctx, "tx", tx), nil
}
@@ -344,6 +346,9 @@ func (store *TikvStore) getTxn(ctx context.Context) (*TxnWrapper, error) {
if err != nil {
return nil, err
}
+ if store.onePC {
+ txn.SetEnable1PC(store.onePC)
+ }
return &TxnWrapper{txn, false}, nil
}
diff --git a/weed/filer/tikv/tikv_store_kv.go b/weed/filer/tikv/tikv_store_kv.go
index 3fed7e045..dcc9acf8c 100644
--- a/weed/filer/tikv/tikv_store_kv.go
+++ b/weed/filer/tikv/tikv_store_kv.go
@@ -1,5 +1,3 @@
-// +build tikv
-
package tikv
import (
diff --git a/weed/filer/ydb/doc.go b/weed/filer/ydb/doc.go
new file mode 100644
index 000000000..6ade3a8d8
--- /dev/null
+++ b/weed/filer/ydb/doc.go
@@ -0,0 +1,9 @@
+/*
+
+Package ydb is for YDB filer store.
+
+The referenced "github.com/ydb-platform/ydb-go-sdk/v3" library adds significant size to the compiled binary,
+so this store is only compiled in the "make full_install" build.
+
+*/
+package ydb
diff --git a/weed/filer/ydb/readme.md b/weed/filer/ydb/readme.md
new file mode 100644
index 000000000..b617461fd
--- /dev/null
+++ b/weed/filer/ydb/readme.md
@@ -0,0 +1,27 @@
+## YDB
+
+database: https://github.com/ydb-platform/ydb
+
+go driver: https://github.com/ydb-platform/ydb-go-sdk
+
+options:
+
+```
+[ydb]
+enabled=true
+dsn=grpcs://ydb-ru.yandex.net:2135/?database=/ru/home/username/db
+prefix="seaweedfs"
+useBucketPrefix=true
+poolSizeLimit=50
+dialTimeOut = 10
+```
+
+Authentication is configured with one of the following environment variables:
+ * `YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=<path/to/sa_key_file>` - authenticate with a service account key file at the given path
+ * `YDB_ANONYMOUS_CREDENTIALS="1"` - authenticate with anonymous access, e.g. for connecting to a test YDB installation
+ * `YDB_METADATA_CREDENTIALS="1"` - authenticate via the metadata service when running on a Yandex Cloud virtual machine or cloud function
+ * `YDB_ACCESS_TOKEN_CREDENTIALS=<access_token>` - authenticate with a short-lived access token, for example an IAM token
+ * `YDB_CONNECTION_STRING="grpcs://endpoint/?database=database"`
+
+ * For local testing, a development database can be started with:
+`make dev_ydb`
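+
+As a minimal sketch (the endpoint and database path below are placeholders, not values shipped with this repository), anonymous access against a local test instance could be configured with:
+
+```
+YDB_ANONYMOUS_CREDENTIALS="1"
+YDB_CONNECTION_STRING="grpcs://localhost:2135/?database=/local"
+```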
diff --git a/weed/filer/ydb/ydb_queries.go b/weed/filer/ydb/ydb_queries.go
new file mode 100644
index 000000000..f1db9a143
--- /dev/null
+++ b/weed/filer/ydb/ydb_queries.go
@@ -0,0 +1,72 @@
+//go:build ydb
+// +build ydb
+
+package ydb
+
+import asql "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+
+const (
+ upsertQuery = `
+ PRAGMA TablePathPrefix("%v");
+ DECLARE $dir_hash AS int64;
+ DECLARE $directory AS Utf8;
+ DECLARE $name AS Utf8;
+ DECLARE $meta AS String;
+ DECLARE $expire_at AS Optional<uint32>;
+
+ UPSERT INTO ` + asql.DEFAULT_TABLE + `
+ (dir_hash, name, directory, meta, expire_at)
+ VALUES
+ ($dir_hash, $name, $directory, $meta, $expire_at);`
+
+ deleteQuery = `
+ PRAGMA TablePathPrefix("%v");
+ DECLARE $dir_hash AS int64;
+ DECLARE $name AS Utf8;
+
+ DELETE FROM ` + asql.DEFAULT_TABLE + `
+ WHERE dir_hash = $dir_hash AND name = $name;`
+
+ findQuery = `
+ PRAGMA TablePathPrefix("%v");
+ DECLARE $dir_hash AS int64;
+ DECLARE $name AS Utf8;
+
+ SELECT meta
+ FROM ` + asql.DEFAULT_TABLE + `
+ WHERE dir_hash = $dir_hash AND name = $name;`
+
+ deleteFolderChildrenQuery = `
+ PRAGMA TablePathPrefix("%v");
+ DECLARE $dir_hash AS int64;
+ DECLARE $directory AS Utf8;
+
+ DELETE FROM ` + asql.DEFAULT_TABLE + `
+ WHERE dir_hash = $dir_hash AND directory = $directory;`
+
+ listDirectoryQuery = `
+ PRAGMA TablePathPrefix("%v");
+ DECLARE $dir_hash AS int64;
+ DECLARE $directory AS Utf8;
+ DECLARE $start_name AS Utf8;
+ DECLARE $prefix AS Utf8;
+ DECLARE $limit AS Uint64;
+
+ SELECT name, meta
+ FROM ` + asql.DEFAULT_TABLE + `
+ WHERE dir_hash = $dir_hash AND directory = $directory and name > $start_name and name LIKE $prefix
+ ORDER BY name ASC LIMIT $limit;`
+
+ listInclusiveDirectoryQuery = `
+ PRAGMA TablePathPrefix("%v");
+ DECLARE $dir_hash AS int64;
+ DECLARE $directory AS Utf8;
+ DECLARE $start_name AS Utf8;
+ DECLARE $prefix AS Utf8;
+ DECLARE $limit AS Uint64;
+
+ SELECT name, meta
+ FROM ` + asql.DEFAULT_TABLE + `
+ WHERE dir_hash = $dir_hash AND directory = $directory and name >= $start_name and name LIKE $prefix
+ ORDER BY name ASC LIMIT $limit;`
+)
diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go
new file mode 100644
index 000000000..1e3a55a09
--- /dev/null
+++ b/weed/filer/ydb/ydb_store.go
@@ -0,0 +1,413 @@
+//go:build ydb
+// +build ydb
+
+package ydb
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
+ "github.com/ydb-platform/ydb-go-sdk/v3"
+ "github.com/ydb-platform/ydb-go-sdk/v3/sugar"
+ "github.com/ydb-platform/ydb-go-sdk/v3/table"
+ "github.com/ydb-platform/ydb-go-sdk/v3/table/result"
+ "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
+ "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
+ "os"
+ "path"
+ "strings"
+ "sync"
+ "time"
+)
+
+const (
+ defaultDialTimeOut = 10
+)
+
+var (
+ roTX = table.TxControl(
+ table.BeginTx(table.WithOnlineReadOnly()),
+ table.CommitTx(),
+ )
+ rwTX = table.DefaultTxControl()
+)
+
+type YdbStore struct {
+ DB ydb.Connection
+ dirBuckets string
+ tablePathPrefix string
+ SupportBucketTable bool
+ dbs map[string]bool
+ dbsLock sync.Mutex
+}
+
+func init() {
+ filer.Stores = append(filer.Stores, &YdbStore{})
+}
+
+func (store *YdbStore) GetName() string {
+ return "ydb"
+}
+
+func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString("filer.options.buckets_folder"),
+ configuration.GetString(prefix+"dsn"),
+ configuration.GetString(prefix+"prefix"),
+ configuration.GetBool(prefix+"useBucketPrefix"),
+ configuration.GetInt(prefix+"dialTimeOut"),
+ configuration.GetInt(prefix+"poolSizeLimit"),
+ )
+}
+
+func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, dialTimeOut int, poolSizeLimit int) (err error) {
+ store.dirBuckets = dirBuckets
+ store.SupportBucketTable = useBucketPrefix
+ if store.SupportBucketTable {
+ glog.V(0).Infof("enabled BucketPrefix")
+ }
+ store.dbs = make(map[string]bool)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ if dialTimeOut == 0 {
+ dialTimeOut = defaultDialTimeOut
+ }
+ opts := []ydb.Option{
+ ydb.WithDialTimeout(time.Duration(dialTimeOut) * time.Second),
+ environ.WithEnvironCredentials(ctx),
+ }
+ if poolSizeLimit > 0 {
+ opts = append(opts, ydb.WithSessionPoolSizeLimit(poolSizeLimit))
+ }
+ if dsn == "" {
+ dsn = os.Getenv("YDB_CONNECTION_STRING")
+ }
+ store.DB, err = ydb.Open(ctx, dsn, opts...)
+ if err != nil {
+ if store.DB != nil {
+ _ = store.DB.Close(ctx)
+ store.DB = nil
+ }
+ return fmt.Errorf("can not connect to %s error: %v", dsn, err)
+ }
+
+ store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix)
+ if err = sugar.MakeRecursive(ctx, store.DB, store.tablePathPrefix); err != nil {
+ return fmt.Errorf("MakeRecursive %s : %v", store.tablePathPrefix, err)
+ }
+
+ if err = store.createTable(ctx, store.tablePathPrefix); err != nil {
+ glog.Errorf("createTable %s: %v", store.tablePathPrefix, err)
+ }
+ return err
+}
+
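+// doTxOrDB runs the query inside the transaction carried in ctx when present; otherwise it executes through a standalone table session.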
+func (store *YdbStore) doTxOrDB(ctx context.Context, query *string, params *table.QueryParameters, tc *table.TransactionControl, processResultFunc func(res result.Result) error) (err error) {
+ var res result.Result
+ if tx, ok := ctx.Value("tx").(table.Transaction); ok {
+ res, err = tx.Execute(ctx, *query, params)
+ if err != nil {
+ return fmt.Errorf("execute transaction: %v", err)
+ }
+ } else {
+ err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
+ _, res, err = s.Execute(ctx, tc, *query, params)
+ if err != nil {
+ return fmt.Errorf("execute statement: %v", err)
+ }
+ return nil
+ })
+ }
+ if err != nil {
+ return err
+ }
+ if res != nil {
+ defer func() { _ = res.Close() }()
+ if processResultFunc != nil {
+ if err = processResultFunc(res); err != nil {
+ return fmt.Errorf("process result: %v", err)
+ }
+ }
+ }
+ return err
+}
+
+func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ dir, name := entry.FullPath.DirAndName()
+ meta, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encode %s: %s", entry.FullPath, err)
+ }
+
+ if len(entry.Chunks) > filer.CountEntryChunksForGzip {
+ meta = util.MaybeGzipData(meta)
+ }
+ tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
+ fileMeta := FileMeta{util.HashStringToLong(dir), name, *shortDir, meta}
+ return store.doTxOrDB(ctx, withPragma(tablePathPrefix, upsertQuery), fileMeta.queryParameters(entry.TtlSec), rwTX, nil)
+}
+
+func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ return store.insertOrUpdateEntry(ctx, entry)
+}
+
+func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ return store.insertOrUpdateEntry(ctx, entry)
+}
+
+func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+ dir, name := fullpath.DirAndName()
+ var data []byte
+ entryFound := false
+ tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
+ query := withPragma(tablePathPrefix, findQuery)
+ queryParams := table.NewQueryParameters(
+ table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
+ table.ValueParam("$name", types.UTF8Value(name)))
+
+ err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error {
+ if !res.NextResultSet(ctx) || !res.HasNextRow() {
+ return nil
+ }
+ for res.NextRow() {
+ if err = res.ScanNamed(named.OptionalWithDefault("meta", &data)); err != nil {
+ return fmt.Errorf("scanNamed %s : %v", fullpath, err)
+ }
+ entryFound = true
+ return nil
+ }
+ return res.Err()
+ })
+ if err != nil {
+ return nil, err
+ }
+ if !entryFound {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
+ return nil, fmt.Errorf("decode %s : %v", fullpath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+ dir, name := fullpath.DirAndName()
+ tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
+ query := withPragma(tablePathPrefix, deleteQuery)
+ queryParams := table.NewQueryParameters(
+ table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
+ table.ValueParam("$name", types.UTF8Value(name)))
+
+ return store.doTxOrDB(ctx, query, queryParams, rwTX, nil)
+}
+
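+// DeleteFolderChildren removes every entry stored directly under the given
+// folder path.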
+func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+	// fullpath itself is the directory whose children are being removed
+	dir := string(fullpath)
+ tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
+ query := withPragma(tablePathPrefix, deleteFolderChildrenQuery)
+ queryParams := table.NewQueryParameters(
+ table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
+ table.ValueParam("$directory", types.UTF8Value(*shortDir)))
+
+ return store.doTxOrDB(ctx, query, queryParams, rwTX, nil)
+}
+
+func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
+}
+
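+// ListDirectoryPrefixedEntries pages through a directory with repeated queries
+// while YDB reports the result set as truncated, stopping once the limit is
+// reached or eachEntryFunc asks to stop.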
+func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ dir := string(dirPath)
+ tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
+ var query *string
+ if includeStartFile {
+ query = withPragma(tablePathPrefix, listInclusiveDirectoryQuery)
+ } else {
+ query = withPragma(tablePathPrefix, listDirectoryQuery)
+ }
+ truncated := true
+	eachEntryFuncDidNotBreak := true
+	entryCount := int64(0)
+	for truncated && eachEntryFuncDidNotBreak {
+ if lastFileName != "" {
+ startFileName = lastFileName
+ if includeStartFile {
+ query = withPragma(tablePathPrefix, listDirectoryQuery)
+ }
+ }
+ restLimit := limit - entryCount
+ queryParams := table.NewQueryParameters(
+ table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
+ table.ValueParam("$directory", types.UTF8Value(*shortDir)),
+ table.ValueParam("$start_name", types.UTF8Value(startFileName)),
+ table.ValueParam("$prefix", types.UTF8Value(prefix+"%")),
+ table.ValueParam("$limit", types.Uint64Value(uint64(restLimit))),
+ )
+ err = store.doTxOrDB(ctx, query, queryParams, roTX, func(res result.Result) error {
+ var name string
+ var data []byte
+ if !res.NextResultSet(ctx) || !res.HasNextRow() {
+ truncated = false
+ return nil
+ }
+ truncated = res.CurrentResultSet().Truncated()
+ for res.NextRow() {
+ if err := res.ScanNamed(
+ named.OptionalWithDefault("name", &name),
+ named.OptionalWithDefault("meta", &data)); err != nil {
+ return fmt.Errorf("list scanNamed %s : %v", dir, err)
+ }
+ lastFileName = name
+ entry := &filer.Entry{
+ FullPath: util.NewFullPath(dir, name),
+ }
+ if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
+ return fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
+ }
+ if !eachEntryFunc(entry) {
+				eachEntryFuncDidNotBreak = false
+ break
+ }
+ entryCount += 1
+ }
+ return res.Err()
+ })
+ }
+ if err != nil {
+ return lastFileName, err
+ }
+ return lastFileName, nil
+}
+
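+// BeginTransaction opens a serializable read-write transaction and stores it in
+// the returned context under the "tx" key, where doTxOrDB, CommitTransaction
+// and RollbackTransaction pick it up.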
+func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ session, err := store.DB.Table().CreateSession(ctx)
+ if err != nil {
+ return ctx, err
+ }
+ tx, err := session.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite()))
+ if err != nil {
+ return ctx, err
+ }
+ return context.WithValue(ctx, "tx", tx), nil
+}
+
+func (store *YdbStore) CommitTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(table.Transaction); ok {
+ _, err := tx.CommitTx(ctx)
+ return err
+ }
+ return nil
+}
+
+func (store *YdbStore) RollbackTransaction(ctx context.Context) error {
+ if tx, ok := ctx.Value("tx").(table.Transaction); ok {
+ return tx.Rollback(ctx)
+ }
+ return nil
+}
+
+func (store *YdbStore) Shutdown() {
+ _ = store.DB.Close(context.Background())
+}
+
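+// CanDropWholeBucket reports whether each bucket gets its own table, in which
+// case deleting a bucket only requires dropping that table.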
+func (store *YdbStore) CanDropWholeBucket() bool {
+ return store.SupportBucketTable
+}
+
+func (store *YdbStore) OnBucketCreation(bucket string) {
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ if err := store.createTable(context.Background(),
+ path.Join(store.tablePathPrefix, bucket)); err != nil {
+ glog.Errorf("createTable %s: %v", bucket, err)
+ }
+
+ if store.dbs == nil {
+ return
+ }
+ store.dbs[bucket] = true
+}
+
+func (store *YdbStore) OnBucketDeletion(bucket string) {
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ if err := store.deleteTable(context.Background(),
+ path.Join(store.tablePathPrefix, bucket)); err != nil {
+ glog.Errorf("deleteTable %s: %v", bucket, err)
+ }
+
+ if store.dbs == nil {
+ return
+ }
+ delete(store.dbs, bucket)
+}
+
+func (store *YdbStore) createTable(ctx context.Context, prefix string) error {
+ return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
+ return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), createTableOptions()...)
+ })
+}
+
+func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error {
+ if !store.SupportBucketTable {
+ return nil
+ }
+ if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
+ return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE))
+ }); err != nil {
+ return err
+ }
+ glog.V(4).Infof("deleted table %s", prefix)
+
+ return nil
+}
+
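+// getPrefix resolves the table path prefix for a directory: the store-wide
+// prefix by default, or a lazily created per-bucket table when bucket tables
+// are enabled and the directory lives under dirBuckets.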
+func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPrefix *string, shortDir *string) {
+ tablePathPrefix = &store.tablePathPrefix
+ shortDir = dir
+ if !store.SupportBucketTable {
+ return
+ }
+
+ prefixBuckets := store.dirBuckets + "/"
+ if strings.HasPrefix(*dir, prefixBuckets) {
+ // detect bucket
+ bucketAndDir := (*dir)[len(prefixBuckets):]
+ var bucket string
+ if t := strings.Index(bucketAndDir, "/"); t > 0 {
+ bucket = bucketAndDir[:t]
+ } else if t < 0 {
+ bucket = bucketAndDir
+ }
+ if bucket == "" {
+ return
+ }
+
+ store.dbsLock.Lock()
+ defer store.dbsLock.Unlock()
+
+ tablePathPrefixWithBucket := path.Join(store.tablePathPrefix, bucket)
+ if _, found := store.dbs[bucket]; !found {
+ if err := store.createTable(ctx, tablePathPrefixWithBucket); err == nil {
+ store.dbs[bucket] = true
+ glog.V(4).Infof("created table %s", tablePathPrefixWithBucket)
+ } else {
+ glog.Errorf("createTable %s: %v", tablePathPrefixWithBucket, err)
+ }
+ }
+ tablePathPrefix = &tablePathPrefixWithBucket
+ }
+ return
+}
diff --git a/weed/filer/ydb/ydb_store_kv.go b/weed/filer/ydb/ydb_store_kv.go
new file mode 100644
index 000000000..72bbfff42
--- /dev/null
+++ b/weed/filer/ydb/ydb_store_kv.go
@@ -0,0 +1,75 @@
+//go:build ydb
+// +build ydb
+
+package ydb
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/abstract_sql"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/ydb-platform/ydb-go-sdk/v3/table"
+ "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
+ "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
+)
+
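+// KvPut stores an arbitrary key/value pair in the filer table, mapping the key
+// onto (dir_hash, name) via abstract_sql.GenDirAndName.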
+func (store *YdbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
+ fileMeta := FileMeta{dirHash, name, dirStr, value}
+ return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
+ _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, upsertQuery),
+ fileMeta.queryParameters(0))
+ if err != nil {
+ return fmt.Errorf("kv put execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
+ }
+ return nil
+ })
+}
+
+func (store *YdbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
+ valueFound := false
+ err = store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
+ _, res, err := s.Execute(ctx, roTX, *withPragma(&store.tablePathPrefix, findQuery),
+ table.NewQueryParameters(
+ table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
+ table.ValueParam("$name", types.UTF8Value(name))))
+ if err != nil {
+ return fmt.Errorf("kv get execute %s: %v", util.NewFullPath(dirStr, name).Name(), err)
+ }
+ defer func() { _ = res.Close() }()
+ if !res.NextResultSet(ctx) || !res.HasNextRow() {
+ return nil
+ }
+ for res.NextRow() {
+ if err := res.ScanNamed(named.OptionalWithDefault("meta", &value)); err != nil {
+ return fmt.Errorf("scanNamed %s : %v", util.NewFullPath(dirStr, name).Name(), err)
+ }
+ valueFound = true
+ return nil
+ }
+ return res.Err()
+ })
+
+ if !valueFound {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return value, nil
+}
+
+func (store *YdbStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ dirStr, dirHash, name := abstract_sql.GenDirAndName(key)
+ return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
+ _, _, err = s.Execute(ctx, rwTX, *withPragma(&store.tablePathPrefix, deleteQuery),
+ table.NewQueryParameters(
+ table.ValueParam("$dir_hash", types.Int64Value(dirHash)),
+ table.ValueParam("$name", types.UTF8Value(name))))
+ if err != nil {
+ return fmt.Errorf("kv delete %s: %v", util.NewFullPath(dirStr, name).Name(), err)
+ }
+ return nil
+	})
+}
diff --git a/weed/filer/ydb/ydb_store_test.go b/weed/filer/ydb/ydb_store_test.go
new file mode 100644
index 000000000..cb3c77018
--- /dev/null
+++ b/weed/filer/ydb/ydb_store_test.go
@@ -0,0 +1,19 @@
+//go:build ydb
+// +build ydb
+
+package ydb
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer/store_test"
+ "testing"
+)
+
+func TestStore(t *testing.T) {
+	// To set up a local test environment, run "make test_ydb" under the docker
+	// folder, then enable this block to exercise the store against it.
+ if false {
+ store := &YdbStore{}
+ store.initialize("/buckets", "grpc://localhost:2136/?database=local", "seaweedfs", true, 10, 50)
+ store_test.TestFilerStore(t, store)
+ }
+}
diff --git a/weed/filer/ydb/ydb_types.go b/weed/filer/ydb/ydb_types.go
new file mode 100644
index 000000000..4e5100236
--- /dev/null
+++ b/weed/filer/ydb/ydb_types.go
@@ -0,0 +1,56 @@
+//go:build ydb
+// +build ydb
+
+package ydb
+
+import (
+ "fmt"
+ "github.com/ydb-platform/ydb-go-sdk/v3/table"
+ "github.com/ydb-platform/ydb-go-sdk/v3/table/options"
+	"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
+	"time"
+)
+
+type FileMeta struct {
+ DirHash int64 `ydb:"type:int64"`
+ Name string `ydb:"type:utf8"`
+ Directory string `ydb:"type:utf8"`
+ Meta []byte `ydb:"type:string"`
+}
+
+type FileMetas []FileMeta
+
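+// queryParameters binds a FileMeta to the named YQL parameters shared by the
+// upsert, find and delete queries.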
+func (fm *FileMeta) queryParameters(ttlSec int32) *table.QueryParameters {
+	var expireAtValue types.Value
+	if ttlSec > 0 {
+		// expire_at feeds the table's value-since-unix-epoch TTL, so store the
+		// absolute expiration timestamp rather than the raw TTL duration.
+		expireAtValue = types.Uint32Value(uint32(time.Now().Add(time.Duration(ttlSec) * time.Second).Unix()))
+	} else {
+		expireAtValue = types.NullValue(types.TypeUint32)
+	}
+ return table.NewQueryParameters(
+ table.ValueParam("$dir_hash", types.Int64Value(fm.DirHash)),
+ table.ValueParam("$directory", types.UTF8Value(fm.Directory)),
+ table.ValueParam("$name", types.UTF8Value(fm.Name)),
+ table.ValueParam("$meta", types.StringValue(fm.Meta)),
+ table.ValueParam("$expire_at", expireAtValue))
+}
+
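+// createTableOptions declares the filer table schema: (dir_hash, name) as the
+// primary key plus a row-level TTL driven by the expire_at column, interpreted
+// as seconds since the Unix epoch.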
+func createTableOptions() []options.CreateTableOption {
+ columnUnit := options.TimeToLiveUnitSeconds
+ return []options.CreateTableOption{
+ options.WithColumn("dir_hash", types.Optional(types.TypeInt64)),
+ options.WithColumn("directory", types.Optional(types.TypeUTF8)),
+ options.WithColumn("name", types.Optional(types.TypeUTF8)),
+ options.WithColumn("meta", types.Optional(types.TypeString)),
+ options.WithColumn("expire_at", types.Optional(types.TypeUint32)),
+ options.WithPrimaryKeyColumn("dir_hash", "name"),
+ options.WithTimeToLiveSettings(options.TimeToLiveSettings{
+ ColumnName: "expire_at",
+ ColumnUnit: &columnUnit,
+ Mode: options.TimeToLiveModeValueSinceUnixEpoch},
+ ),
+ }
+}
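+
+// withPragma expands the query template with the given table path prefix, so
+// the statement resolves tables relative to that prefix.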
+func withPragma(prefix *string, query string) *string {
+ queryWithPragma := fmt.Sprintf(query, *prefix)
+ return &queryWithPragma
+}