author     hilimd <68371223+hilimd@users.noreply.github.com>   2020-09-17 10:29:32 +0800
committer  GitHub <noreply@github.com>                         2020-09-17 10:29:32 +0800
commit     f71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591 (patch)
tree       e0e98625edaab040eed21fd1a8b277f2c0546ad3 /weed/filer
parent     23baa3c36ce468a36d89abae59f4411cdc446043 (diff)
parent     5eee4983f36f55a2a01381e8af278b28919dbe90 (diff)
Merge pull request #15 from chrislusf/master
sync
Diffstat (limited to 'weed/filer')
-rw-r--r--  weed/filer/abstract_sql/abstract_sql_store.go    22
-rw-r--r--  weed/filer/cassandra/cassandra_store.go           8
-rw-r--r--  weed/filer/cassandra/cassandra_store_kv.go       51
-rw-r--r--  weed/filer/elastic/v7/elastic_store.go          338
-rw-r--r--  weed/filer/elastic/v7/elastic_store_kv.go        65
-rw-r--r--  weed/filer/etcd/etcd_store.go                    12
-rw-r--r--  weed/filer/etcd/etcd_store_kv.go                  2
-rw-r--r--  weed/filer/filechunks.go                          5
-rw-r--r--  weed/filer/filer.go                              32
-rw-r--r--  weed/filer/filer_delete_entry.go                  8
-rw-r--r--  weed/filer/filer_notify.go                       11
-rw-r--r--  weed/filer/filerstore.go                          5
-rw-r--r--  weed/filer/leveldb/leveldb_store.go               8
-rw-r--r--  weed/filer/leveldb2/leveldb2_local_store.go      43
-rw-r--r--  weed/filer/leveldb2/leveldb2_store.go             9
-rw-r--r--  weed/filer/meta_aggregator.go                   132
-rw-r--r--  weed/filer/mongodb/mongodb_store.go              10
-rw-r--r--  weed/filer/mongodb/mongodb_store_kv.go            4
-rw-r--r--  weed/filer/mysql/mysql_store.go                   4
-rw-r--r--  weed/filer/postgres/postgres_store.go             4
-rw-r--r--  weed/filer/redis/universal_redis_store.go         6
-rw-r--r--  weed/filer/redis2/universal_redis_store.go       18
22 files changed, 676 insertions(+), 121 deletions(-)
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index 368bec973..7c95ffb57 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -67,15 +67,21 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+
res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
- if err != nil {
- if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
- return fmt.Errorf("kv insert: %s", err)
- }
+ if err == nil {
+ return
+ }
+
+ if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
+ return fmt.Errorf("kv insert: %s", err)
}
// now the insert failed possibly due to duplication constraints
- glog.V(1).Infof("insert %s falls back to update: %s", entry.FullPath, err)
+ glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err)
res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
if err != nil {
@@ -126,7 +132,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full
entry := &filer.Entry{
FullPath: fullpath,
}
- if err := entry.DecodeAttributesAndChunks(data); err != nil {
+ if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -171,7 +177,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
sqlText = store.SqlListInclusive
}
- rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix, limit)
+ rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix+"%", limit)
if err != nil {
return nil, fmt.Errorf("list %s : %v", fullpath, err)
}
@@ -188,7 +194,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
entry := &filer.Entry{
FullPath: util.NewFullPath(string(fullpath), name),
}
- if err = entry.DecodeAttributesAndChunks(data); err != nil {
+ if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}
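
The compress-on-write, detect-on-read pair above (util.MaybeGzipData / util.MaybeDecompressData) is the same pattern this commit applies to every store. A minimal sketch of such helpers, assuming (their implementation is not part of this diff) that writes keep the raw bytes whenever gzip does not shrink them, and reads key off the gzip magic header so compressed and uncompressed values can coexist in one table:

    package util

    import (
        "bytes"
        "compress/gzip"
        "io/ioutil"
    )

    // MaybeGzipData returns a gzipped copy of input, or input itself
    // when compression would not make it smaller.
    func MaybeGzipData(input []byte) []byte {
        var buf bytes.Buffer
        w := gzip.NewWriter(&buf)
        if _, err := w.Write(input); err != nil {
            return input
        }
        if err := w.Close(); err != nil {
            return input
        }
        if buf.Len() >= len(input) {
            return input // compression did not help; store as-is
        }
        return buf.Bytes()
    }

    // MaybeDecompressData undoes MaybeGzipData: gzip streams start with
    // the magic bytes 0x1f 0x8b; anything else is returned unchanged.
    func MaybeDecompressData(input []byte) []byte {
        if len(input) < 2 || input[0] != 0x1f || input[1] != 0x8b {
            return input
        }
        r, err := gzip.NewReader(bytes.NewReader(input))
        if err != nil {
            return input
        }
        defer r.Close()
        output, err := ioutil.ReadAll(r)
        if err != nil {
            return input
        }
        return output
    }

The "> 50 chunks" gate means small entries skip the gzip attempt entirely, while decoding stays unconditional because the header check is cheap.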
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index fd161b1f1..250db629a 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -60,6 +60,10 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+
if err := store.session.Query(
"INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
dir, name, meta, entry.TtlSec).Exec(); err != nil {
@@ -93,7 +97,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(data)
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -144,7 +148,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath
entry := &filer.Entry{
FullPath: util.NewFullPath(string(fullpath), name),
}
- if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/cassandra/cassandra_store_kv.go b/weed/filer/cassandra/cassandra_store_kv.go
index f7668746f..b6238cf0e 100644
--- a/weed/filer/cassandra/cassandra_store_kv.go
+++ b/weed/filer/cassandra/cassandra_store_kv.go
@@ -2,17 +2,60 @@ package cassandra
import (
"context"
+ "fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/gocql/gocql"
)
func (store *CassandraStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- return filer.ErrKvNotImplemented
+ dir, name := genDirAndName(key)
+
+ if err := store.session.Query(
+ "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
+ dir, name, value, 0).Exec(); err != nil {
+ return fmt.Errorf("kv insert: %s", err)
+ }
+
+ return nil
}
-func (store *CassandraStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- return nil, filer.ErrKvNotImplemented
+func (store *CassandraStore) KvGet(ctx context.Context, key []byte) (data []byte, err error) {
+ dir, name := genDirAndName(key)
+
+ if err := store.session.Query(
+ "SELECT meta FROM filemeta WHERE directory=? AND name=?",
+ dir, name).Consistency(gocql.One).Scan(&data); err != nil {
+ if err != gocql.ErrNotFound {
+ return nil, filer.ErrKvNotFound
+ }
+ }
+
+ if len(data) == 0 {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return data, nil
}
func (store *CassandraStore) KvDelete(ctx context.Context, key []byte) (err error) {
- return filer.ErrKvNotImplemented
+ dir, name := genDirAndName(key)
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE directory=? AND name=?",
+ dir, name).Exec(); err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
+
+func genDirAndName(key []byte) (dir string, name string) {
+ for len(key) < 8 {
+ key = append(key, 0)
+ }
+
+ dir = string(key[:8])
+ name = string(key[8:])
+
+ return
}
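
genDirAndName zero-pads short keys so the first 8 bytes always fill the directory column, letting the existing filemeta table double as a KV store. A usage illustration (hypothetical inputs):

    dir, name := genDirAndName([]byte("filer.store.id"))
    // dir = "filer.st" (first 8 bytes), name = "ore.id"

    dir, name = genDirAndName([]byte("abc"))
    // key is padded with five 0x00 bytes:
    // dir = "abc\x00\x00\x00\x00\x00", name = ""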
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
new file mode 100644
index 000000000..ec88e10a5
--- /dev/null
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -0,0 +1,338 @@
+package elastic
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
+ jsoniter "github.com/json-iterator/go"
+ elastic "github.com/olivere/elastic/v7"
+)
+
+var (
+ indexType = "_doc"
+ indexPrefix = ".seaweedfs_"
+ indexKV = ".seaweedfs_kv_entries"
+ kvMappings = ` {
+ "mappings": {
+ "enabled": false,
+ "properties": {
+ "Value":{
+ "type": "binary"
+ }
+ }
+ }
+ }`
+)
+
+type ESEntry struct {
+ ParentId string `json:"ParentId"`
+ Entry *filer.Entry
+}
+
+type ESKVEntry struct {
+ Value []byte `json:"Value"`
+}
+
+func init() {
+ filer.Stores = append(filer.Stores, &ElasticStore{})
+}
+
+type ElasticStore struct {
+ client *elastic.Client
+ maxPageSize int
+}
+
+func (store *ElasticStore) GetName() string {
+ return "elastic7"
+}
+
+func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
+ options := []elastic.ClientOptionFunc{}
+ servers := configuration.GetStringSlice(prefix + "servers")
+ options = append(options, elastic.SetURL(servers...))
+ username := configuration.GetString(prefix + "username")
+ password := configuration.GetString(prefix + "password")
+ if username != "" && password != "" {
+ options = append(options, elastic.SetBasicAuth(username, password))
+ }
+ options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled")))
+ options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled")))
+ store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
+ if store.maxPageSize <= 0 {
+ store.maxPageSize = 10000
+ }
+ glog.Infof("filer store elastic endpoints: %v.", servers)
+ return store.initialize(options)
+}
+
+func (store *ElasticStore) initialize(options []elastic.ClientOptionFunc) (err error) {
+ ctx := context.Background()
+ store.client, err = elastic.NewClient(options...)
+ if err != nil {
+ return fmt.Errorf("init elastic %v.", err)
+ }
+ if ok, err := store.client.IndexExists(indexKV).Do(ctx); err == nil && !ok {
+ _, err = store.client.CreateIndex(indexKV).Body(kvMappings).Do(ctx)
+ if err != nil {
+ return fmt.Errorf("create index(%s) %v.", indexKV, err)
+ }
+ }
+ return nil
+}
+
+func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+ return nil, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ index := getIndex(entry.FullPath)
+ dir, _ := entry.FullPath.DirAndName()
+ id := weed_util.Md5String([]byte(entry.FullPath))
+ esEntry := &ESEntry{
+ ParentId: weed_util.Md5String([]byte(dir)),
+ Entry: entry,
+ }
+ value, err := jsoniter.Marshal(esEntry)
+ if err != nil {
+ glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
+ return fmt.Errorf("insert entry %v.", err)
+ }
+ _, err = store.client.Index().
+ Index(index).
+ Type(indexType).
+ Id(id).
+ BodyJson(string(value)).
+ Do(ctx)
+ if err != nil {
+ glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
+ return fmt.Errorf("insert entry %v.", err)
+ }
+ return nil
+}
+
+func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
+ index := getIndex(fullpath)
+ id := weed_util.Md5String([]byte(fullpath))
+ searchResult, err := store.client.Get().
+ Index(index).
+ Type(indexType).
+ Id(id).
+ Do(ctx)
+ if elastic.IsNotFound(err) {
+ return nil, filer_pb.ErrNotFound
+ }
+ if searchResult != nil && searchResult.Found {
+ esEntry := &ESEntry{
+ ParentId: "",
+ Entry: &filer.Entry{},
+ }
+ err := jsoniter.Unmarshal(searchResult.Source, esEntry)
+ return esEntry.Entry, err
+ }
+ glog.Errorf("find entry(%s),%v.", string(fullpath), err)
+ return nil, filer_pb.ErrNotFound
+}
+
+func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ index := getIndex(fullpath)
+ id := weed_util.Md5String([]byte(fullpath))
+ if strings.Count(string(fullpath), "/") == 1 {
+ return store.deleteIndex(ctx, index)
+ }
+ return store.deleteEntry(ctx, index, id)
+}
+
+func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
+ deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
+ if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
+ return nil
+ }
+ glog.Errorf("delete index(%s) %v.", index, err)
+ return err
+}
+
+func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
+ deleteResult, err := store.client.Delete().
+ Index(index).
+ Type(indexType).
+ Id(id).
+ Do(ctx)
+ if err == nil {
+ if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
+ return nil
+ }
+ }
+ glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
+ return fmt.Errorf("delete entry %v.", err)
+}
+
+func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
+ if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
+ for _, entry := range entries {
+ store.DeleteEntry(ctx, entry.FullPath)
+ }
+ }
+ return nil
+}
+
+func (store *ElasticStore) ListDirectoryEntries(
+ ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
+) (entries []*filer.Entry, err error) {
+ if string(fullpath) == "/" {
+ return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
+ }
+ return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
+}
+
+func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
+ indexResult, err := store.client.CatIndices().Do(ctx)
+ if err != nil {
+ glog.Errorf("list indices %v.", err)
+ return entries, err
+ }
+ for _, index := range indexResult {
+ if index.Index == indexKV {
+ continue
+ }
+ if strings.HasPrefix(index.Index, indexPrefix) {
+ if entry, err := store.FindEntry(ctx,
+ weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
+ fileName := getFileName(entry.FullPath)
+ if fileName == startFileName && !inclusive {
+ continue
+ }
+ limit--
+ if limit < 0 {
+ break
+ }
+ entries = append(entries, entry)
+ }
+ }
+ }
+ return entries, nil
+}
+
+func (store *ElasticStore) listDirectoryEntries(
+ ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
+) (entries []*filer.Entry, err error) {
+ first := true
+ index := getIndex(fullpath)
+ nextStart := ""
+ parentId := weed_util.Md5String([]byte(fullpath))
+ if _, err := store.client.Refresh(index).Do(ctx); err != nil {
+ if elastic.IsNotFound(err) {
+ store.client.CreateIndex(index).Do(ctx)
+ return entries, nil
+ }
+ }
+ for {
+ result := &elastic.SearchResult{}
+ if (startFileName == "" && first) || inclusive {
+ if result, err = store.search(ctx, index, parentId); err != nil {
+ glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
+ return entries, err
+ }
+ } else {
+ fullPath := string(fullpath) + "/" + startFileName
+ if !first {
+ fullPath = nextStart
+ }
+ after := weed_util.Md5String([]byte(fullPath))
+ if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
+ glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
+ return entries, err
+ }
+ }
+ first = false
+ for _, hit := range result.Hits.Hits {
+ esEntry := &ESEntry{
+ ParentId: "",
+ Entry: &filer.Entry{},
+ }
+ if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
+ limit--
+ if limit < 0 {
+ return entries, nil
+ }
+ nextStart = string(esEntry.Entry.FullPath)
+ fileName := getFileName(esEntry.Entry.FullPath)
+ if fileName == startFileName && !inclusive {
+ continue
+ }
+ entries = append(entries, esEntry.Entry)
+ }
+ }
+ if len(result.Hits.Hits) < store.maxPageSize {
+ break
+ }
+ }
+ return entries, nil
+}
+
+func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
+ if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 {
+ return &elastic.SearchResult{
+ Hits: &elastic.SearchHits{
+ Hits: make([]*elastic.SearchHit, 0)},
+ }, nil
+ }
+ queryResult, err := store.client.Search().
+ Index(index).
+ Query(elastic.NewMatchQuery("ParentId", parentId)).
+ Size(store.maxPageSize).
+ Sort("_id", false).
+ Do(ctx)
+ return queryResult, err
+}
+
+func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) {
+ queryResult, err := store.client.Search().
+ Index(index).
+ Query(elastic.NewMatchQuery("ParentId", parentId)).
+ SearchAfter(after).
+ Size(store.maxPageSize).
+ Sort("_id", false).
+ Do(ctx)
+ return queryResult, err
+
+}
+
+func (store *ElasticStore) Shutdown() {
+ store.client.Stop()
+}
+
+func getIndex(fullpath weed_util.FullPath) string {
+ path := strings.Split(string(fullpath), "/")
+ if len(path) > 1 {
+ return indexPrefix + path[1]
+ }
+ return ""
+}
+
+func getFileName(fullpath weed_util.FullPath) string {
+ path := strings.Split(string(fullpath), "/")
+ if len(path) > 1 {
+ return path[len(path)-1]
+ }
+ return ""
+}
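
getIndex maps the first path segment to its own Elasticsearch index, which is why DeleteEntry on a top-level directory (a path containing exactly one "/") can drop a whole index at once. A usage illustration (hypothetical paths):

    getIndex(weed_util.FullPath("/movies/2020/title.mp4"))    // ".seaweedfs_movies"
    getFileName(weed_util.FullPath("/movies/2020/title.mp4")) // "title.mp4"

    // strings.Count("/movies", "/") == 1, so DeleteEntry("/movies")
    // calls deleteIndex(".seaweedfs_movies") instead of deleting one document.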
diff --git a/weed/filer/elastic/v7/elastic_store_kv.go b/weed/filer/elastic/v7/elastic_store_kv.go
new file mode 100644
index 000000000..99c03314e
--- /dev/null
+++ b/weed/filer/elastic/v7/elastic_store_kv.go
@@ -0,0 +1,65 @@
+package elastic
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ jsoniter "github.com/json-iterator/go"
+ elastic "github.com/olivere/elastic/v7"
+)
+
+func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ deleteResult, err := store.client.Delete().
+ Index(indexKV).
+ Type(indexType).
+ Id(string(key)).
+ Do(ctx)
+ if err == nil {
+ if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
+ return nil
+ }
+ }
+ glog.Errorf("delete key(id:%s) %v.", string(key), err)
+ return fmt.Errorf("delete key %v.", err)
+}
+
+func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ searchResult, err := store.client.Get().
+ Index(indexKV).
+ Type(indexType).
+ Id(string(key)).
+ Do(ctx)
+ if elastic.IsNotFound(err) {
+ return value, filer.ErrKvNotFound
+ }
+ if searchResult != nil && searchResult.Found {
+ esEntry := &ESKVEntry{}
+ if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
+ return esEntry.Value, nil
+ }
+ }
+ glog.Errorf("find key(%s),%v.", string(key), err)
+ return value, filer.ErrKvNotFound
+}
+
+func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ esEntry := &ESKVEntry{value}
+ val, err := jsoniter.Marshal(esEntry)
+ if err != nil {
+ glog.Errorf("insert key(%s) %v.", string(key), err)
+ return fmt.Errorf("insert key %v.", err)
+ }
+ _, err = store.client.Index().
+ Index(indexKV).
+ Type(indexType).
+ Id(string(key)).
+ BodyJson(string(val)).
+ Do(ctx)
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+ return nil
+}
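
The KV documents keep each value in a single "binary"-mapped field; both encoding/json and jsoniter marshal a []byte as base64, which is exactly what that mapping expects. A self-contained round-trip sketch:

    package main

    import (
        "fmt"

        jsoniter "github.com/json-iterator/go"
    )

    type ESKVEntry struct {
        Value []byte `json:"Value"`
    }

    func main() {
        doc, _ := jsoniter.Marshal(&ESKVEntry{Value: []byte("hello")})
        fmt.Println(string(doc)) // {"Value":"aGVsbG8="}, base64 as the mapping expects

        var back ESKVEntry
        _ = jsoniter.Unmarshal(doc, &back)
        fmt.Println(string(back.Value)) // hello
    }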
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
index 36db4ac01..634fba1eb 100644
--- a/weed/filer/etcd/etcd_store.go
+++ b/weed/filer/etcd/etcd_store.go
@@ -76,12 +76,16 @@ func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
key := genKey(entry.DirAndName())
- value, err := entry.EncodeAttributesAndChunks()
+ meta, err := entry.EncodeAttributesAndChunks()
if err != nil {
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
- if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
+ if len(entry.Chunks) > 50 {
+ meta = weed_util.MaybeGzipData(meta)
+ }
+
+ if _, err := store.client.Put(ctx, string(key), string(meta)); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
@@ -107,7 +111,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPa
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(resp.Kvs[0].Value))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -163,7 +167,7 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
- if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/etcd/etcd_store_kv.go b/weed/filer/etcd/etcd_store_kv.go
index a803a5834..df252f46c 100644
--- a/weed/filer/etcd/etcd_store_kv.go
+++ b/weed/filer/etcd/etcd_store_kv.go
@@ -19,7 +19,7 @@ func (store *EtcdStore) KvPut(ctx context.Context, key []byte, value []byte) (er
func (store *EtcdStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- resp, err := store.client.Get(ctx, string(key), nil)
+ resp, err := store.client.Get(ctx, string(key))
if err != nil {
return nil, fmt.Errorf("kv get: %v", err)
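
The dropped nil argument is more than cosmetic: clientv3's Get is variadic, roughly Get(ctx context.Context, key string, opts ...OpOption), and OpOption is a function type, so a literal nil compiles but panics when the client applies the option. The corrected call simply omits the option list:

    resp, err := store.client.Get(ctx, string(key))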
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index c45963193..db55eec00 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -226,6 +226,11 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chu
sort.Slice(chunks, func(i, j int) bool {
if chunks[i].Mtime == chunks[j].Mtime {
+ filer_pb.EnsureFid(chunks[i])
+ filer_pb.EnsureFid(chunks[j])
+ if chunks[i].Fid == nil || chunks[j].Fid == nil {
+ return true
+ }
return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
}
return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run
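
filer_pb.EnsureFid back-fills chunk.Fid from the string FileId so the comparator never dereferences a nil Fid; if parsing still fails, the sort falls back to a fixed ordering instead of panicking. A sketch of that guard, assuming filer_pb.ToFileIdObject parses "3,01637037d6"-style ids (the real helper may differ in detail):

    func EnsureFid(chunk *filer_pb.FileChunk) {
        if chunk.Fid != nil || chunk.FileId == "" {
            return
        }
        if fid, err := filer_pb.ToFileIdObject(chunk.FileId); err == nil {
            chunk.Fid = fid // cache the parsed form on the chunk
        }
    }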
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 7a555372f..acbe63486 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -19,6 +19,7 @@ import (
const (
LogFlushInterval = time.Minute
PaginationSize = 1024 * 256
+ FilerStoreId = "filer.store.id"
)
var (
@@ -48,7 +49,6 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
- Signature: util.RandomInt32(),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
@@ -62,9 +62,16 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
func (f *Filer) AggregateFromPeers(self string, filers []string) {
// set peers
- if len(filers) == 0 {
+ found := false
+ for _, peer := range filers {
+ if peer == self {
+ found = true
+ }
+ }
+ if !found {
filers = append(filers, self)
}
+
f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
f.MetaAggregator.StartLoopSubscribe(f, self)
@@ -72,6 +79,27 @@ func (f *Filer) AggregateFromPeers(self string, filers []string) {
func (f *Filer) SetStore(store FilerStore) {
f.Store = NewFilerStoreWrapper(store)
+
+ f.setOrLoadFilerStoreSignature(store)
+
+}
+
+func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
+ storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId))
+ if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 {
+ f.Signature = util.RandomInt32()
+ storeIdBytes = make([]byte, 4)
+ util.Uint32toBytes(storeIdBytes, uint32(f.Signature))
+ if err = store.KvPut(context.Background(), []byte(FilerStoreId), storeIdBytes); err != nil {
+ glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err)
+ }
+ glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature)
+ } else if err == nil && len(storeIdBytes) == 4 {
+ f.Signature = int32(util.BytesToUint32(storeIdBytes))
+ glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature)
+ } else {
+ glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err)
+ }
}
func (f *Filer) GetStore() (store FilerStore) {
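
The signature is persisted as a fixed 4-byte value through the store's own KV interface, so a filer keeps the same identity across restarts and only generates a fresh one on first start. A round-trip sketch, with encoding/binary standing in for util.Uint32toBytes / util.BytesToUint32 (the exact byte order only needs to be self-consistent):

    package main

    import (
        "encoding/binary"
        "fmt"
        "math/rand"
    )

    func main() {
        // first start: nothing stored yet, generate and persist an id
        signature := rand.Int31()
        buf := make([]byte, 4)
        binary.BigEndian.PutUint32(buf, uint32(signature)) // ~ util.Uint32toBytes
        // store.KvPut(ctx, []byte("filer.store.id"), buf)

        // later starts: load the 4 bytes and reuse the identity
        recovered := int32(binary.BigEndian.Uint32(buf)) // ~ util.BytesToUint32
        fmt.Println(recovered == signature)              // true
    }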
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index e2198bd21..379156321 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -27,7 +27,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
if entry.IsDirectory() {
// delete the folder children, not including the folder itself
var dirChunks []*filer_pb.FileChunk
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster)
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures)
if err != nil {
glog.V(0).Infof("delete directory %s: %v", p, err)
return fmt.Errorf("delete directory %s: %v", p, err)
@@ -53,7 +53,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
return nil
}
-func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (chunks []*filer_pb.FileChunk, err error) {
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, err error) {
lastFileName := ""
includeLastFile := false
@@ -73,7 +73,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
lastFileName = sub.Name()
var dirChunks []*filer_pb.FileChunk
if sub.IsDirectory() {
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false)
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil)
chunks = append(chunks, dirChunks...)
} else {
f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
@@ -95,7 +95,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
- f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, nil)
+ f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
return chunks, nil
}
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index e00117382..8719cf5b5 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -30,6 +30,15 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
if strings.HasPrefix(fullpath, SystemLogDir) {
return
}
+ foundSelf := false
+ for _, sig := range signatures {
+ if sig == f.Signature {
+ foundSelf = true
+ }
+ }
+ if !foundSelf {
+ signatures = append(signatures, f.Signature)
+ }
newParentPath := ""
if newEntry != nil {
@@ -41,7 +50,7 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry
DeleteChunks: deleteChunks,
NewParentPath: newParentPath,
IsFromOtherCluster: isFromOtherCluster,
- Signatures: append(signatures, f.Signature),
+ Signatures: signatures,
}
if notification.Queue != nil {
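
Appending f.Signature only when it is absent turns the Signatures slice into a visited set: a change that loops back through replication already carries this filer's signature and is not re-applied, which is what breaks update cycles between filers. The same check in helper form, for illustration:

    func ensureSignature(signatures []int32, self int32) []int32 {
        for _, sig := range signatures {
            if sig == self {
                return signatures // this filer already saw the change
            }
        }
        return append(signatures, self)
    }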
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 518212437..d313b7ba3 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -42,11 +42,6 @@ type FilerStore interface {
Shutdown()
}
-type FilerLocalStore interface {
- UpdateOffset(filer string, lastTsNs int64) error
- ReadOffset(filer string) (lastTsNs int64, err error)
-}
-
type FilerStoreWrapper struct {
ActualStore FilerStore
}
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index eccb760a2..4b8dd5ea9 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -78,6 +78,10 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
+ if len(entry.Chunks) > 50 {
+ value = weed_util.MaybeGzipData(value)
+ }
+
err = store.db.Put(key, value, nil)
if err != nil {
@@ -109,7 +113,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(data)
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -187,7 +191,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath we
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(fullpath), fileName),
}
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/leveldb2/leveldb2_local_store.go b/weed/filer/leveldb2/leveldb2_local_store.go
deleted file mode 100644
index faae25c45..000000000
--- a/weed/filer/leveldb2/leveldb2_local_store.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package leveldb
-
-import (
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- _ = filer.FilerLocalStore(&LevelDB2Store{})
-)
-
-func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error {
-
- value := make([]byte, 8)
- util.Uint64toBytes(value, uint64(lastTsNs))
-
- err := store.dbs[0].Put([]byte("meta"+filer), value, nil)
-
- if err != nil {
- return fmt.Errorf("UpdateOffset %s : %v", filer, err)
- }
-
- println("UpdateOffset", filer, "lastTsNs", lastTsNs)
-
- return nil
-}
-
-func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) {
-
- value, err := store.dbs[0].Get([]byte("meta"+filer), nil)
-
- if err != nil {
- return 0, fmt.Errorf("ReadOffset %s : %v", filer, err)
- }
-
- lastTsNs = int64(util.BytesToUint64(value))
-
- println("ReadOffset", filer, "lastTsNs", lastTsNs)
-
- return
-}
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 7a2bdac2e..2ad0dd648 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -85,6 +85,10 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
+ if len(entry.Chunks) > 50 {
+ value = weed_util.MaybeGzipData(value)
+ }
+
err = store.dbs[partitionId].Put(key, value, nil)
if err != nil {
@@ -117,7 +121,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.Fu
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(data)
+ err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -199,8 +203,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath w
}
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
-
- if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 506f03e4c..4918899ff 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"io"
"sync"
"time"
@@ -45,40 +46,57 @@ func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
}
}
-func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) {
+func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string) {
+
+ /*
+	Each filer reads "filer.store.id", the filer store's signature, when it starts.
+
+	When reading another filer's local meta changes:
+	* if the received change does not already carry this filer's own signature, apply it to the current filer store.
+
+	Upon connecting to other filers, this filer needs to remember their signatures and offsets.
+
+ */
var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
lastPersistTime := time.Now()
- changesSinceLastPersist := 0
lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()
- MaxChangeLimit := 100
+ peerSignature, err := ma.readFilerStoreSignature(peer)
+ for err != nil {
+ glog.V(0).Infof("connecting to peer filer %s: %v", peer, err)
+ time.Sleep(1357 * time.Millisecond)
+ peerSignature, err = ma.readFilerStoreSignature(peer)
+ }
- if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok {
- if self != filer {
+ if peerSignature != f.Signature {
+ if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
+ lastTsNs = prevTsNs
+ }
- if prevTsNs, err := localStore.ReadOffset(filer); err == nil {
- lastTsNs = prevTsNs
+ glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
+ var counter int64
+ var synced bool
+ maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
+ if err := Replay(f.Store.ActualStore, event); err != nil {
+ glog.Errorf("failed to reply metadata change from %v: %v", peer, err)
+ return
}
-
- glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs)
- maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
- if err := Replay(f.Store.ActualStore, event); err != nil {
- glog.Errorf("failed to reply metadata change from %v: %v", filer, err)
- return
- }
- changesSinceLastPersist++
- if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) {
- if err := localStore.UpdateOffset(filer, event.TsNs); err == nil {
- lastPersistTime = time.Now()
- changesSinceLastPersist = 0
- } else {
- glog.V(0).Infof("failed to update offset for %v: %v", filer, err)
+ counter++
+ if lastPersistTime.Add(time.Minute).Before(time.Now()) {
+ if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil {
+ if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
+ glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
+ } else if !synced {
+ synced = true
+ glog.V(0).Infof("synced with %s", peer)
}
+ lastPersistTime = time.Now()
+ counter = 0
+ } else {
+ glog.V(0).Infof("failed to update offset for %v: %v", peer, err)
}
}
- } else {
- glog.V(0).Infof("skipping following self: %v", self)
}
}
@@ -90,7 +108,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
}
dir := event.Directory
// println("received meta change", dir, "size", len(data))
- ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
+ ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, 0)
if maybeReplicateMetadataChange != nil {
maybeReplicateMetadataChange(event)
}
@@ -98,8 +116,10 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
}
for {
- err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "filer:" + self,
PathPrefix: "/",
SinceNs: lastTsNs,
@@ -124,8 +144,66 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin
}
})
if err != nil {
- glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err)
+ glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
time.Sleep(1733 * time.Millisecond)
}
}
}
+
+func (ma *MetaAggregator) readFilerStoreSignature(peer string) (sig int32, err error) {
+ err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ sig = resp.Signature
+ return nil
+ })
+ return
+}
+
+const (
+ MetaOffsetPrefix = "Meta"
+)
+
+func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) (lastTsNs int64, err error) {
+
+ key := []byte(MetaOffsetPrefix + "xxxx")
+ util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
+
+ value, err := f.Store.KvGet(context.Background(), key)
+
+ if err == ErrKvNotFound {
+ glog.Warningf("readOffset %s not found", peer)
+ return 0, nil
+ }
+
+ if err != nil {
+ return 0, fmt.Errorf("readOffset %s : %v", peer, err)
+ }
+
+ lastTsNs = int64(util.BytesToUint64(value))
+
+ glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)
+
+ return
+}
+
+func (ma *MetaAggregator) updateOffset(f *Filer, peer string, peerSignature int32, lastTsNs int64) (err error) {
+
+ key := []byte(MetaOffsetPrefix + "xxxx")
+ util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
+
+ value := make([]byte, 8)
+ util.Uint64toBytes(value, uint64(lastTsNs))
+
+ err = f.Store.KvPut(context.Background(), key, value)
+
+ if err != nil {
+ return fmt.Errorf("updateOffset %s : %v", peer, err)
+ }
+
+ glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)
+
+ return
+}
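
Each peer's replication offset is stored under "Meta" plus the peer's 4-byte signature, with the last-applied timestamp as an 8-byte value. The key layout, with encoding/binary standing in for util's fixed-endian codecs:

    key := []byte("Metaxxxx") // "xxxx" is a 4-byte placeholder, overwritten below
    binary.BigEndian.PutUint32(key[4:], uint32(peerSignature))

    value := make([]byte, 8)
    binary.BigEndian.PutUint64(value, uint64(lastTsNs))
    // KvPut(key, value): one offset per peer *signature*, so pointing this
    // filer at a rebuilt peer (which gets a new signature) restarts the
    // subscription from scratch instead of resuming a stale offset.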
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index 104d1f9e2..b7e855165 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -101,6 +101,10 @@ func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry)
return fmt.Errorf("encode %s: %s", entry.FullPath, err)
}
+ if len(entry.Chunks) > 50 {
+ meta = util.MaybeGzipData(meta)
+ }
+
c := store.connect.Database(store.database).Collection(store.collectionName)
_, err = c.InsertOne(ctx, Model{
@@ -128,7 +132,7 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath
var where = bson.M{"directory": dir, "name": name}
err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
if err != mongo.ErrNoDocuments && err != nil {
- glog.Error("find %s: %v", fullpath, err)
+ glog.Errorf("find %s: %v", fullpath, err)
return nil, filer_pb.ErrNotFound
}
@@ -140,7 +144,7 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks(data.Meta)
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -197,7 +201,7 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath ut
entry := &filer.Entry{
FullPath: util.NewFullPath(string(fullpath), data.Name),
}
- if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil {
+ if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
diff --git a/weed/filer/mongodb/mongodb_store_kv.go b/weed/filer/mongodb/mongodb_store_kv.go
index 09508e691..4aa9c3a33 100644
--- a/weed/filer/mongodb/mongodb_store_kv.go
+++ b/weed/filer/mongodb/mongodb_store_kv.go
@@ -36,7 +36,7 @@ func (store *MongodbStore) KvGet(ctx context.Context, key []byte) (value []byte,
var where = bson.M{"directory": dir, "name": name}
err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
if err != mongo.ErrNoDocuments && err != nil {
- glog.Error("kv get: %v", err)
+ glog.Errorf("kv get: %v", err)
return nil, filer.ErrKvNotFound
}
@@ -54,7 +54,7 @@ func (store *MongodbStore) KvDelete(ctx context.Context, key []byte) (err error)
where := bson.M{"directory": dir, "name": name}
_, err = store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where)
if err != nil {
- return fmt.Errorf("kv delete %s : %v", err)
+ return fmt.Errorf("kv delete: %v", err)
}
return nil
diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go
index 708a67cc3..5bc132980 100644
--- a/weed/filer/mysql/mysql_store.go
+++ b/weed/filer/mysql/mysql_store.go
@@ -47,8 +47,8 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?"
+ store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
+ store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?"
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database)
if interpolateParams {
diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go
index 4544c8416..c41700d17 100644
--- a/weed/filer/postgres/postgres_store.go
+++ b/weed/filer/postgres/postgres_store.go
@@ -46,8 +46,8 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
- store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like CONCAT($4,'%')ORDER BY NAME ASC LIMIT $5"
- store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like CONCAT($4,'%') ORDER BY NAME ASC LIMIT $5"
+ store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
+ store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5"
sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode)
if password != "" {
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
index cc8819019..0de9924a3 100644
--- a/weed/filer/redis/universal_redis_store.go
+++ b/weed/filer/redis/universal_redis_store.go
@@ -40,6 +40,10 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer.
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
+ if len(entry.Chunks) > 50 {
+ value = util.MaybeGzipData(value)
+ }
+
_, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result()
if err != nil {
@@ -76,7 +80,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks([]byte(data))
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index 9e06ff68f..0374314c0 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -38,6 +38,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer
return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
}
+ if len(entry.Chunks) > 50 {
+ value = util.MaybeGzipData(value)
+ }
+
if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
}
@@ -71,7 +75,7 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.
entry = &filer.Entry{
FullPath: fullpath,
}
- err = entry.DecodeAttributesAndChunks([]byte(data))
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
if err != nil {
return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
}
@@ -81,8 +85,12 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util.
func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
- _, err = store.Client.Del(string(fullpath)).Result()
+ _, err = store.Client.Del(genDirectoryListKey(string(fullpath))).Result()
+ if err != nil {
+ return fmt.Errorf("delete dir list %s : %v", fullpath, err)
+ }
+ _, err = store.Client.Del(string(fullpath)).Result()
if err != nil {
return fmt.Errorf("delete %s : %v", fullpath, err)
}
@@ -91,7 +99,7 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti
if name != "" {
_, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result()
if err != nil {
- return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
}
}
@@ -102,14 +110,14 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result()
if err != nil {
- return fmt.Errorf("delete folder %s : %v", fullpath, err)
+ return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err)
}
for _, fileName := range members {
path := util.NewFullPath(string(fullpath), fileName)
_, err = store.Client.Del(string(path)).Result()
if err != nil {
- return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
}
}
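
DeleteEntry now removes the entry's own directory-listing sorted set before the entry key itself, so deleting a directory entry also clears its child index. genDirectoryListKey is not part of this diff; a plausible reconstruction is a plain suffix convention (hypothetical, the actual helper lives elsewhere in redis2):

    // hypothetical reconstruction, not shown in this diff
    const DIR_LIST_MARKER = "\x00"

    func genDirectoryListKey(dir string) string {
        return dir + DIR_LIST_MARKER // keeps listing keys distinct from entry keys
    }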