diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-09-17 10:29:32 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-09-17 10:29:32 +0800 |
| commit | f71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591 (patch) | |
| tree | e0e98625edaab040eed21fd1a8b277f2c0546ad3 /weed/filer | |
| parent | 23baa3c36ce468a36d89abae59f4411cdc446043 (diff) | |
| parent | 5eee4983f36f55a2a01381e8af278b28919dbe90 (diff) | |
| download | seaweedfs-f71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591.tar.xz seaweedfs-f71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591.zip | |
Merge pull request #15 from chrislusf/master
sync
Diffstat (limited to 'weed/filer')
22 files changed, 676 insertions, 121 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go index 368bec973..7c95ffb57 100644 --- a/weed/filer/abstract_sql/abstract_sql_store.go +++ b/weed/filer/abstract_sql/abstract_sql_store.go @@ -67,15 +67,21 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Ent return fmt.Errorf("encode %s: %s", entry.FullPath, err) } + if len(entry.Chunks) > 50 { + meta = util.MaybeGzipData(meta) + } + res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta) - if err != nil { - if !strings.Contains(strings.ToLower(err.Error()), "duplicate") { - return fmt.Errorf("kv insert: %s", err) - } + if err == nil { + return + } + + if !strings.Contains(strings.ToLower(err.Error()), "duplicate") { + return fmt.Errorf("kv insert: %s", err) } // now the insert failed possibly due to duplication constraints - glog.V(1).Infof("insert %s falls back to update: %s", entry.FullPath, err) + glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err) res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) if err != nil { @@ -126,7 +132,7 @@ func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.Full entry := &filer.Entry{ FullPath: fullpath, } - if err := entry.DecodeAttributesAndChunks(data); err != nil { + if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -171,7 +177,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, sqlText = store.SqlListInclusive } - rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix, limit) + rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), prefix+"%", limit) if err != nil { return nil, fmt.Errorf("list %s : %v", fullpath, err) } @@ -188,7 +194,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, entry := &filer.Entry{ FullPath: util.NewFullPath(string(fullpath), name), } - if err = entry.DecodeAttributesAndChunks(data); err != nil { + if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil { glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err) return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err) } diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go index fd161b1f1..250db629a 100644 --- a/weed/filer/cassandra/cassandra_store.go +++ b/weed/filer/cassandra/cassandra_store.go @@ -60,6 +60,10 @@ func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry return fmt.Errorf("encode %s: %s", entry.FullPath, err) } + if len(entry.Chunks) > 50 { + meta = util.MaybeGzipData(meta) + } + if err := store.session.Query( "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ", dir, name, meta, entry.TtlSec).Exec(); err != nil { @@ -93,7 +97,7 @@ func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPa entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(data) + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -144,7 +148,7 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath entry := &filer.Entry{ FullPath: util.NewFullPath(string(fullpath), name), } - if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/cassandra/cassandra_store_kv.go b/weed/filer/cassandra/cassandra_store_kv.go index f7668746f..b6238cf0e 100644 --- a/weed/filer/cassandra/cassandra_store_kv.go +++ b/weed/filer/cassandra/cassandra_store_kv.go @@ -2,17 +2,60 @@ package cassandra import ( "context" + "fmt" "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/gocql/gocql" ) func (store *CassandraStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { - return filer.ErrKvNotImplemented + dir, name := genDirAndName(key) + + if err := store.session.Query( + "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ", + dir, name, value, 0).Exec(); err != nil { + return fmt.Errorf("kv insert: %s", err) + } + + return nil } -func (store *CassandraStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - return nil, filer.ErrKvNotImplemented +func (store *CassandraStore) KvGet(ctx context.Context, key []byte) (data []byte, err error) { + dir, name := genDirAndName(key) + + if err := store.session.Query( + "SELECT meta FROM filemeta WHERE directory=? AND name=?", + dir, name).Consistency(gocql.One).Scan(&data); err != nil { + if err != gocql.ErrNotFound { + return nil, filer.ErrKvNotFound + } + } + + if len(data) == 0 { + return nil, filer.ErrKvNotFound + } + + return data, nil } func (store *CassandraStore) KvDelete(ctx context.Context, key []byte) (err error) { - return filer.ErrKvNotImplemented + dir, name := genDirAndName(key) + + if err := store.session.Query( + "DELETE FROM filemeta WHERE directory=? AND name=?", + dir, name).Exec(); err != nil { + return fmt.Errorf("kv delete: %v", err) + } + + return nil +} + +func genDirAndName(key []byte) (dir string, name string) { + for len(key) < 8 { + key = append(key, 0) + } + + dir = string(key[:8]) + name = string(key[8:]) + + return } diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go new file mode 100644 index 000000000..ec88e10a5 --- /dev/null +++ b/weed/filer/elastic/v7/elastic_store.go @@ -0,0 +1,338 @@ +package elastic + +import ( + "context" + "fmt" + "math" + "strings" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + weed_util "github.com/chrislusf/seaweedfs/weed/util" + jsoniter "github.com/json-iterator/go" + elastic "github.com/olivere/elastic/v7" +) + +var ( + indexType = "_doc" + indexPrefix = ".seaweedfs_" + indexKV = ".seaweedfs_kv_entries" + kvMappings = ` { + "mappings": { + "enabled": false, + "properties": { + "Value":{ + "type": "binary" + } + } + } + }` +) + +type ESEntry struct { + ParentId string `json:"ParentId"` + Entry *filer.Entry +} + +type ESKVEntry struct { + Value []byte `json:"Value"` +} + +func init() { + filer.Stores = append(filer.Stores, &ElasticStore{}) +} + +type ElasticStore struct { + client *elastic.Client + maxPageSize int +} + +func (store *ElasticStore) GetName() string { + return "elastic7" +} + +func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) { + options := []elastic.ClientOptionFunc{} + servers := configuration.GetStringSlice(prefix + "servers") + options = append(options, elastic.SetURL(servers...)) + username := configuration.GetString(prefix + "username") + password := configuration.GetString(prefix + "password") + if username != "" && password != "" { + options = append(options, elastic.SetBasicAuth(username, password)) + } + options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled"))) + options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled"))) + store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window") + if store.maxPageSize <= 0 { + store.maxPageSize = 10000 + } + glog.Infof("filer store elastic endpoints: %v.", servers) + return store.initialize(options) +} + +func (store *ElasticStore) initialize(options []elastic.ClientOptionFunc) (err error) { + ctx := context.Background() + store.client, err = elastic.NewClient(options...) + if err != nil { + return fmt.Errorf("init elastic %v.", err) + } + if ok, err := store.client.IndexExists(indexKV).Do(ctx); err == nil && !ok { + _, err = store.client.CreateIndex(indexKV).Body(kvMappings).Do(ctx) + if err != nil { + return fmt.Errorf("create index(%s) %v.", indexKV, err) + } + } + return nil +} + +func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (store *ElasticStore) CommitTransaction(ctx context.Context) error { + return nil +} +func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { + return nil +} + +func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { + return nil, filer.ErrUnsupportedListDirectoryPrefixed +} + +func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { + index := getIndex(entry.FullPath) + dir, _ := entry.FullPath.DirAndName() + id := weed_util.Md5String([]byte(entry.FullPath)) + esEntry := &ESEntry{ + ParentId: weed_util.Md5String([]byte(dir)), + Entry: entry, + } + value, err := jsoniter.Marshal(esEntry) + if err != nil { + glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err) + return fmt.Errorf("insert entry %v.", err) + } + _, err = store.client.Index(). + Index(index). + Type(indexType). + Id(id). + BodyJson(string(value)). + Do(ctx) + if err != nil { + glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err) + return fmt.Errorf("insert entry %v.", err) + } + return nil +} + +func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) { + return store.InsertEntry(ctx, entry) +} + +func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) { + index := getIndex(fullpath) + id := weed_util.Md5String([]byte(fullpath)) + searchResult, err := store.client.Get(). + Index(index). + Type(indexType). + Id(id). + Do(ctx) + if elastic.IsNotFound(err) { + return nil, filer_pb.ErrNotFound + } + if searchResult != nil && searchResult.Found { + esEntry := &ESEntry{ + ParentId: "", + Entry: &filer.Entry{}, + } + err := jsoniter.Unmarshal(searchResult.Source, esEntry) + return esEntry.Entry, err + } + glog.Errorf("find entry(%s),%v.", string(fullpath), err) + return nil, filer_pb.ErrNotFound +} + +func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) { + index := getIndex(fullpath) + id := weed_util.Md5String([]byte(fullpath)) + if strings.Count(string(fullpath), "/") == 1 { + return store.deleteIndex(ctx, index) + } + return store.deleteEntry(ctx, index, id) +} + +func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) { + deleteResult, err := store.client.DeleteIndex(index).Do(ctx) + if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) { + return nil + } + glog.Errorf("delete index(%s) %v.", index, err) + return err +} + +func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) { + deleteResult, err := store.client.Delete(). + Index(index). + Type(indexType). + Id(id). + Do(ctx) + if err == nil { + if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { + return nil + } + } + glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err) + return fmt.Errorf("delete entry %v.", err) +} + +func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { + if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil { + for _, entry := range entries { + store.DeleteEntry(ctx, entry.FullPath) + } + } + return nil +} + +func (store *ElasticStore) ListDirectoryEntries( + ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, +) (entries []*filer.Entry, err error) { + if string(fullpath) == "/" { + return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit) + } + return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit) +} + +func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { + indexResult, err := store.client.CatIndices().Do(ctx) + if err != nil { + glog.Errorf("list indices %v.", err) + return entries, err + } + for _, index := range indexResult { + if index.Index == indexKV { + continue + } + if strings.HasPrefix(index.Index, indexPrefix) { + if entry, err := store.FindEntry(ctx, + weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil { + fileName := getFileName(entry.FullPath) + if fileName == startFileName && !inclusive { + continue + } + limit-- + if limit < 0 { + break + } + entries = append(entries, entry) + } + } + } + return entries, nil +} + +func (store *ElasticStore) listDirectoryEntries( + ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, +) (entries []*filer.Entry, err error) { + first := true + index := getIndex(fullpath) + nextStart := "" + parentId := weed_util.Md5String([]byte(fullpath)) + if _, err := store.client.Refresh(index).Do(ctx); err != nil { + if elastic.IsNotFound(err) { + store.client.CreateIndex(index).Do(ctx) + return entries, nil + } + } + for { + result := &elastic.SearchResult{} + if (startFileName == "" && first) || inclusive { + if result, err = store.search(ctx, index, parentId); err != nil { + glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) + return entries, err + } + } else { + fullPath := string(fullpath) + "/" + startFileName + if !first { + fullPath = nextStart + } + after := weed_util.Md5String([]byte(fullPath)) + if result, err = store.searchAfter(ctx, index, parentId, after); err != nil { + glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) + return entries, err + } + } + first = false + for _, hit := range result.Hits.Hits { + esEntry := &ESEntry{ + ParentId: "", + Entry: &filer.Entry{}, + } + if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil { + limit-- + if limit < 0 { + return entries, nil + } + nextStart = string(esEntry.Entry.FullPath) + fileName := getFileName(esEntry.Entry.FullPath) + if fileName == startFileName && !inclusive { + continue + } + entries = append(entries, esEntry.Entry) + } + } + if len(result.Hits.Hits) < store.maxPageSize { + break + } + } + return entries, nil +} + +func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) { + if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 { + return &elastic.SearchResult{ + Hits: &elastic.SearchHits{ + Hits: make([]*elastic.SearchHit, 0)}, + }, nil + } + queryResult, err := store.client.Search(). + Index(index). + Query(elastic.NewMatchQuery("ParentId", parentId)). + Size(store.maxPageSize). + Sort("_id", false). + Do(ctx) + return queryResult, err +} + +func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) { + queryResult, err := store.client.Search(). + Index(index). + Query(elastic.NewMatchQuery("ParentId", parentId)). + SearchAfter(after). + Size(store.maxPageSize). + Sort("_id", false). + Do(ctx) + return queryResult, err + +} + +func (store *ElasticStore) Shutdown() { + store.client.Stop() +} + +func getIndex(fullpath weed_util.FullPath) string { + path := strings.Split(string(fullpath), "/") + if len(path) > 1 { + return indexPrefix + path[1] + } + return "" +} + +func getFileName(fullpath weed_util.FullPath) string { + path := strings.Split(string(fullpath), "/") + if len(path) > 1 { + return path[len(path)-1] + } + return "" +} diff --git a/weed/filer/elastic/v7/elastic_store_kv.go b/weed/filer/elastic/v7/elastic_store_kv.go new file mode 100644 index 000000000..99c03314e --- /dev/null +++ b/weed/filer/elastic/v7/elastic_store_kv.go @@ -0,0 +1,65 @@ +package elastic + +import ( + "context" + "fmt" + + "github.com/chrislusf/seaweedfs/weed/filer" + + "github.com/chrislusf/seaweedfs/weed/glog" + jsoniter "github.com/json-iterator/go" + elastic "github.com/olivere/elastic/v7" +) + +func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) { + deleteResult, err := store.client.Delete(). + Index(indexKV). + Type(indexType). + Id(string(key)). + Do(ctx) + if err == nil { + if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" { + return nil + } + } + glog.Errorf("delete key(id:%s) %v.", string(key), err) + return fmt.Errorf("delete key %v.", err) +} + +func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { + searchResult, err := store.client.Get(). + Index(indexKV). + Type(indexType). + Id(string(key)). + Do(ctx) + if elastic.IsNotFound(err) { + return value, filer.ErrKvNotFound + } + if searchResult != nil && searchResult.Found { + esEntry := &ESKVEntry{} + if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil { + return esEntry.Value, nil + } + } + glog.Errorf("find key(%s),%v.", string(key), err) + return value, filer.ErrKvNotFound +} + +func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) { + esEntry := &ESKVEntry{value} + val, err := jsoniter.Marshal(esEntry) + if err != nil { + glog.Errorf("insert key(%s) %v.", string(key), err) + return fmt.Errorf("insert key %v.", err) + } + _, err = store.client.Index(). + Index(indexKV). + Type(indexType). + Id(string(key)). + BodyJson(string(val)). + Do(ctx) + if err != nil { + return fmt.Errorf("kv put: %v", err) + } + return nil +} diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go index 36db4ac01..634fba1eb 100644 --- a/weed/filer/etcd/etcd_store.go +++ b/weed/filer/etcd/etcd_store.go @@ -76,12 +76,16 @@ func (store *EtcdStore) RollbackTransaction(ctx context.Context) error { func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { key := genKey(entry.DirAndName()) - value, err := entry.EncodeAttributesAndChunks() + meta, err := entry.EncodeAttributesAndChunks() if err != nil { return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } - if _, err := store.client.Put(ctx, string(key), string(value)); err != nil { + if len(entry.Chunks) > 50 { + meta = weed_util.MaybeGzipData(meta) + } + + if _, err := store.client.Put(ctx, string(key), string(meta)); err != nil { return fmt.Errorf("persisting %s : %v", entry.FullPath, err) } @@ -107,7 +111,7 @@ func (store *EtcdStore) FindEntry(ctx context.Context, fullpath weed_util.FullPa entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value) + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(resp.Kvs[0].Value)) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -163,7 +167,7 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, fullpath weed_ entry := &filer.Entry{ FullPath: weed_util.NewFullPath(string(fullpath), fileName), } - if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(kv.Value)); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/etcd/etcd_store_kv.go b/weed/filer/etcd/etcd_store_kv.go index a803a5834..df252f46c 100644 --- a/weed/filer/etcd/etcd_store_kv.go +++ b/weed/filer/etcd/etcd_store_kv.go @@ -19,7 +19,7 @@ func (store *EtcdStore) KvPut(ctx context.Context, key []byte, value []byte) (er func (store *EtcdStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) { - resp, err := store.client.Get(ctx, string(key), nil) + resp, err := store.client.Get(ctx, string(key)) if err != nil { return nil, fmt.Errorf("kv get: %v", err) diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index c45963193..db55eec00 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -226,6 +226,11 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chu sort.Slice(chunks, func(i, j int) bool { if chunks[i].Mtime == chunks[j].Mtime { + filer_pb.EnsureFid(chunks[i]) + filer_pb.EnsureFid(chunks[j]) + if chunks[i].Fid == nil || chunks[j].Fid == nil { + return true + } return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey } return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 7a555372f..acbe63486 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -19,6 +19,7 @@ import ( const ( LogFlushInterval = time.Minute PaginationSize = 1024 * 256 + FilerStoreId = "filer.store.id" ) var ( @@ -48,7 +49,6 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, - Signature: util.RandomInt32(), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection @@ -62,9 +62,16 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, func (f *Filer) AggregateFromPeers(self string, filers []string) { // set peers - if len(filers) == 0 { + found := false + for _, peer := range filers { + if peer == self { + found = true + } + } + if !found { filers = append(filers, self) } + f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) f.MetaAggregator.StartLoopSubscribe(f, self) @@ -72,6 +79,27 @@ func (f *Filer) AggregateFromPeers(self string, filers []string) { func (f *Filer) SetStore(store FilerStore) { f.Store = NewFilerStoreWrapper(store) + + f.setOrLoadFilerStoreSignature(store) + +} + +func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) { + storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId)) + if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 { + f.Signature = util.RandomInt32() + storeIdBytes = make([]byte, 4) + util.Uint32toBytes(storeIdBytes, uint32(f.Signature)) + if err = store.KvPut(context.Background(), []byte(FilerStoreId), storeIdBytes); err != nil { + glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err) + } + glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature) + } else if err == nil && len(storeIdBytes) == 4 { + f.Signature = int32(util.BytesToUint32(storeIdBytes)) + glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature) + } else { + glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err) + } } func (f *Filer) GetStore() (store FilerStore) { diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index e2198bd21..379156321 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -27,7 +27,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR if entry.IsDirectory() { // delete the folder children, not including the folder itself var dirChunks []*filer_pb.FileChunk - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster) + dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures) if err != nil { glog.V(0).Infof("delete directory %s: %v", p, err) return fmt.Errorf("delete directory %s: %v", p, err) @@ -53,7 +53,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR return nil } -func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (chunks []*filer_pb.FileChunk, err error) { +func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, err error) { lastFileName := "" includeLastFile := false @@ -73,7 +73,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry lastFileName = sub.Name() var dirChunks []*filer_pb.FileChunk if sub.IsDirectory() { - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false) + dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil) chunks = append(chunks, dirChunks...) } else { f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil) @@ -95,7 +95,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) } - f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, nil) + f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) return chunks, nil } diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index e00117382..8719cf5b5 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -30,6 +30,15 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry if strings.HasPrefix(fullpath, SystemLogDir) { return } + foundSelf := false + for _, sig := range signatures { + if sig == f.Signature { + foundSelf = true + } + } + if !foundSelf { + signatures = append(signatures, f.Signature) + } newParentPath := "" if newEntry != nil { @@ -41,7 +50,7 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry DeleteChunks: deleteChunks, NewParentPath: newParentPath, IsFromOtherCluster: isFromOtherCluster, - Signatures: append(signatures, f.Signature), + Signatures: signatures, } if notification.Queue != nil { diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index 518212437..d313b7ba3 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -42,11 +42,6 @@ type FilerStore interface { Shutdown() } -type FilerLocalStore interface { - UpdateOffset(filer string, lastTsNs int64) error - ReadOffset(filer string) (lastTsNs int64, err error) -} - type FilerStoreWrapper struct { ActualStore FilerStore } diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go index eccb760a2..4b8dd5ea9 100644 --- a/weed/filer/leveldb/leveldb_store.go +++ b/weed/filer/leveldb/leveldb_store.go @@ -78,6 +78,10 @@ func (store *LevelDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } + if len(entry.Chunks) > 50 { + value = weed_util.MaybeGzipData(value) + } + err = store.db.Put(key, value, nil) if err != nil { @@ -109,7 +113,7 @@ func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.Ful entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(data) + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData((data))) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -187,7 +191,7 @@ func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath we entry := &filer.Entry{ FullPath: weed_util.NewFullPath(string(fullpath), fileName), } - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/leveldb2/leveldb2_local_store.go b/weed/filer/leveldb2/leveldb2_local_store.go deleted file mode 100644 index faae25c45..000000000 --- a/weed/filer/leveldb2/leveldb2_local_store.go +++ /dev/null @@ -1,43 +0,0 @@ -package leveldb - -import ( - "fmt" - - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - _ = filer.FilerLocalStore(&LevelDB2Store{}) -) - -func (store *LevelDB2Store) UpdateOffset(filer string, lastTsNs int64) error { - - value := make([]byte, 8) - util.Uint64toBytes(value, uint64(lastTsNs)) - - err := store.dbs[0].Put([]byte("meta"+filer), value, nil) - - if err != nil { - return fmt.Errorf("UpdateOffset %s : %v", filer, err) - } - - println("UpdateOffset", filer, "lastTsNs", lastTsNs) - - return nil -} - -func (store *LevelDB2Store) ReadOffset(filer string) (lastTsNs int64, err error) { - - value, err := store.dbs[0].Get([]byte("meta"+filer), nil) - - if err != nil { - return 0, fmt.Errorf("ReadOffset %s : %v", filer, err) - } - - lastTsNs = int64(util.BytesToUint64(value)) - - println("ReadOffset", filer, "lastTsNs", lastTsNs) - - return -} diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go index 7a2bdac2e..2ad0dd648 100644 --- a/weed/filer/leveldb2/leveldb2_store.go +++ b/weed/filer/leveldb2/leveldb2_store.go @@ -85,6 +85,10 @@ func (store *LevelDB2Store) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } + if len(entry.Chunks) > 50 { + value = weed_util.MaybeGzipData(value) + } + err = store.dbs[partitionId].Put(key, value, nil) if err != nil { @@ -117,7 +121,7 @@ func (store *LevelDB2Store) FindEntry(ctx context.Context, fullpath weed_util.Fu entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(data) + err = entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(data)) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -199,8 +203,7 @@ func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath w } // println("list", entry.FullPath, "chunks", len(entry.Chunks)) - - if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(weed_util.MaybeDecompressData(iter.Value())); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 506f03e4c..4918899ff 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "io" "sync" "time" @@ -45,40 +46,57 @@ func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) { } } -func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer string) { +func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string) { + + /* + Each filer reads the "filer.store.id", which is the store's signature when filer starts. + + When reading from other filers' local meta changes: + * if the received change does not contain signature from self, apply the change to current filer store. + + Upon connecting to other filers, need to remember their signature and their offsets. + + */ var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) lastPersistTime := time.Now() - changesSinceLastPersist := 0 lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano() - MaxChangeLimit := 100 + peerSignature, err := ma.readFilerStoreSignature(peer) + for err != nil { + glog.V(0).Infof("connecting to peer filer %s: %v", peer, err) + time.Sleep(1357 * time.Millisecond) + peerSignature, err = ma.readFilerStoreSignature(peer) + } - if localStore, ok := f.Store.ActualStore.(FilerLocalStore); ok { - if self != filer { + if peerSignature != f.Signature { + if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil { + lastTsNs = prevTsNs + } - if prevTsNs, err := localStore.ReadOffset(filer); err == nil { - lastTsNs = prevTsNs + glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs) + var counter int64 + var synced bool + maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { + if err := Replay(f.Store.ActualStore, event); err != nil { + glog.Errorf("failed to reply metadata change from %v: %v", peer, err) + return } - - glog.V(0).Infof("follow filer: %v, last %v (%d)", filer, time.Unix(0, lastTsNs), lastTsNs) - maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) { - if err := Replay(f.Store.ActualStore, event); err != nil { - glog.Errorf("failed to reply metadata change from %v: %v", filer, err) - return - } - changesSinceLastPersist++ - if changesSinceLastPersist >= MaxChangeLimit || lastPersistTime.Add(time.Minute).Before(time.Now()) { - if err := localStore.UpdateOffset(filer, event.TsNs); err == nil { - lastPersistTime = time.Now() - changesSinceLastPersist = 0 - } else { - glog.V(0).Infof("failed to update offset for %v: %v", filer, err) + counter++ + if lastPersistTime.Add(time.Minute).Before(time.Now()) { + if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil { + if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() { + glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0) + } else if !synced { + synced = true + glog.V(0).Infof("synced with %s", peer) } + lastPersistTime = time.Now() + counter = 0 + } else { + glog.V(0).Infof("failed to update offset for %v: %v", peer, err) } } - } else { - glog.V(0).Infof("skipping following self: %v", self) } } @@ -90,7 +108,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin } dir := event.Directory // println("received meta change", dir, "size", len(data)) - ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) + ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, 0) if maybeReplicateMetadataChange != nil { maybeReplicateMetadataChange(event) } @@ -98,8 +116,10 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin } for { - err := pb.WithFilerClient(filer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ + err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ ClientName: "filer:" + self, PathPrefix: "/", SinceNs: lastTsNs, @@ -124,8 +144,66 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin } }) if err != nil { - glog.V(0).Infof("subscribing remote %s meta change: %v", filer, err) + glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err) time.Sleep(1733 * time.Millisecond) } } } + +func (ma *MetaAggregator) readFilerStoreSignature(peer string) (sig int32, err error) { + err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + sig = resp.Signature + return nil + }) + return +} + +const ( + MetaOffsetPrefix = "Meta" +) + +func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) (lastTsNs int64, err error) { + + key := []byte(MetaOffsetPrefix + "xxxx") + util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) + + value, err := f.Store.KvGet(context.Background(), key) + + if err == ErrKvNotFound { + glog.Warningf("readOffset %s not found", peer) + return 0, nil + } + + if err != nil { + return 0, fmt.Errorf("readOffset %s : %v", peer, err) + } + + lastTsNs = int64(util.BytesToUint64(value)) + + glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs) + + return +} + +func (ma *MetaAggregator) updateOffset(f *Filer, peer string, peerSignature int32, lastTsNs int64) (err error) { + + key := []byte(MetaOffsetPrefix + "xxxx") + util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) + + value := make([]byte, 8) + util.Uint64toBytes(value, uint64(lastTsNs)) + + err = f.Store.KvPut(context.Background(), key, value) + + if err != nil { + return fmt.Errorf("updateOffset %s : %v", peer, err) + } + + glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs) + + return +} diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go index 104d1f9e2..b7e855165 100644 --- a/weed/filer/mongodb/mongodb_store.go +++ b/weed/filer/mongodb/mongodb_store.go @@ -101,6 +101,10 @@ func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) return fmt.Errorf("encode %s: %s", entry.FullPath, err) } + if len(entry.Chunks) > 50 { + meta = util.MaybeGzipData(meta) + } + c := store.connect.Database(store.database).Collection(store.collectionName) _, err = c.InsertOne(ctx, Model{ @@ -128,7 +132,7 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath var where = bson.M{"directory": dir, "name": name} err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data) if err != mongo.ErrNoDocuments && err != nil { - glog.Error("find %s: %v", fullpath, err) + glog.Errorf("find %s: %v", fullpath, err) return nil, filer_pb.ErrNotFound } @@ -140,7 +144,7 @@ func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks(data.Meta) + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -197,7 +201,7 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, fullpath ut entry := &filer.Entry{ FullPath: util.NewFullPath(string(fullpath), data.Name), } - if decodeErr := entry.DecodeAttributesAndChunks(data.Meta); decodeErr != nil { + if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil { err = decodeErr glog.V(0).Infof("list %s : %v", entry.FullPath, err) break diff --git a/weed/filer/mongodb/mongodb_store_kv.go b/weed/filer/mongodb/mongodb_store_kv.go index 09508e691..4aa9c3a33 100644 --- a/weed/filer/mongodb/mongodb_store_kv.go +++ b/weed/filer/mongodb/mongodb_store_kv.go @@ -36,7 +36,7 @@ func (store *MongodbStore) KvGet(ctx context.Context, key []byte) (value []byte, var where = bson.M{"directory": dir, "name": name} err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data) if err != mongo.ErrNoDocuments && err != nil { - glog.Error("kv get: %v", err) + glog.Errorf("kv get: %v", err) return nil, filer.ErrKvNotFound } @@ -54,7 +54,7 @@ func (store *MongodbStore) KvDelete(ctx context.Context, key []byte) (err error) where := bson.M{"directory": dir, "name": name} _, err = store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where) if err != nil { - return fmt.Errorf("kv delete %s : %v", err) + return fmt.Errorf("kv delete: %v", err) } return nil diff --git a/weed/filer/mysql/mysql_store.go b/weed/filer/mysql/mysql_store.go index 708a67cc3..5bc132980 100644 --- a/weed/filer/mysql/mysql_store.go +++ b/weed/filer/mysql/mysql_store.go @@ -47,8 +47,8 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?" store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?" store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?" - store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?" - store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like CONCAT(?,'%') ORDER BY NAME ASC LIMIT ?" + store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?" + store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? AND name like ? ORDER BY NAME ASC LIMIT ?" sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, user, password, hostname, port, database) if interpolateParams { diff --git a/weed/filer/postgres/postgres_store.go b/weed/filer/postgres/postgres_store.go index 4544c8416..c41700d17 100644 --- a/weed/filer/postgres/postgres_store.go +++ b/weed/filer/postgres/postgres_store.go @@ -46,8 +46,8 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3" store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2" - store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like CONCAT($4,'%')ORDER BY NAME ASC LIMIT $5" - store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like CONCAT($4,'%') ORDER BY NAME ASC LIMIT $5" + store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5" + store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 AND name like $4 ORDER BY NAME ASC LIMIT $5" sqlUrl := fmt.Sprintf(CONNECTION_URL_PATTERN, hostname, port, user, sslmode) if password != "" { diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go index cc8819019..0de9924a3 100644 --- a/weed/filer/redis/universal_redis_store.go +++ b/weed/filer/redis/universal_redis_store.go @@ -40,6 +40,10 @@ func (store *UniversalRedisStore) InsertEntry(ctx context.Context, entry *filer. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } + if len(entry.Chunks) > 50 { + value = util.MaybeGzipData(value) + } + _, err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Result() if err != nil { @@ -76,7 +80,7 @@ func (store *UniversalRedisStore) FindEntry(ctx context.Context, fullpath util.F entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks([]byte(data)) + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go index 9e06ff68f..0374314c0 100644 --- a/weed/filer/redis2/universal_redis_store.go +++ b/weed/filer/redis2/universal_redis_store.go @@ -38,6 +38,10 @@ func (store *UniversalRedis2Store) InsertEntry(ctx context.Context, entry *filer return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err) } + if len(entry.Chunks) > 50 { + value = util.MaybeGzipData(value) + } + if err = store.Client.Set(string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil { return fmt.Errorf("persisting %s : %v", entry.FullPath, err) } @@ -71,7 +75,7 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util. entry = &filer.Entry{ FullPath: fullpath, } - err = entry.DecodeAttributesAndChunks([]byte(data)) + err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data))) if err != nil { return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err) } @@ -81,8 +85,12 @@ func (store *UniversalRedis2Store) FindEntry(ctx context.Context, fullpath util. func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) { - _, err = store.Client.Del(string(fullpath)).Result() + _, err = store.Client.Del(genDirectoryListKey(string(fullpath))).Result() + if err != nil { + return fmt.Errorf("delete dir list %s : %v", fullpath, err) + } + _, err = store.Client.Del(string(fullpath)).Result() if err != nil { return fmt.Errorf("delete %s : %v", fullpath, err) } @@ -91,7 +99,7 @@ func (store *UniversalRedis2Store) DeleteEntry(ctx context.Context, fullpath uti if name != "" { _, err = store.Client.ZRem(genDirectoryListKey(dir), name).Result() if err != nil { - return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err) } } @@ -102,14 +110,14 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful members, err := store.Client.ZRange(genDirectoryListKey(string(fullpath)), 0, -1).Result() if err != nil { - return fmt.Errorf("delete folder %s : %v", fullpath, err) + return fmt.Errorf("DeleteFolderChildren %s : %v", fullpath, err) } for _, fileName := range members { path := util.NewFullPath(string(fullpath), fileName) _, err = store.Client.Del(string(path)).Result() if err != nil { - return fmt.Errorf("delete %s in parent dir: %v", fullpath, err) + return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err) } } |
