aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorruitao.liu <ruitao.liu@cloudminds.com>2020-09-04 17:34:26 +0800
committerruitao.liu <ruitao.liu@cloudminds.com>2020-09-04 17:34:26 +0800
commit1384ff9a2f521ddc17ae6ef41d9e4cd8237565f4 (patch)
treec10282037fb3d18667dee8825b3344a5390a46cc
parent83080b5e034bdbc0ba58eb410d04fb78bebf08cf (diff)
downloadseaweedfs-1384ff9a2f521ddc17ae6ef41d9e4cd8237565f4.tar.xz
seaweedfs-1384ff9a2f521ddc17ae6ef41d9e4cd8237565f4.zip
1.split kv in one file.
2.disable query for kv in es index.
-rw-r--r--weed/filer/elastic/v7/elastic_store.go122
-rw-r--r--weed/filer/elastic/v7/elastic_store_kv.go64
2 files changed, 100 insertions, 86 deletions
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
index d263b5dae..6f00c8768 100644
--- a/weed/filer/elastic/v7/elastic_store.go
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -16,9 +16,14 @@ import (
)
var (
- indexType = "_doc"
- indexPrefix = ".seaweedfs_"
- indexKV = ".seaweedfs_kv_entries"
+ indexType = "_doc"
+ indexPrefix = ".seaweedfs_"
+ indexKV = ".seaweedfs_kv_entries"
+ mappingWithoutQuery = ` {
+ "mappings": {
+ "enabled": false
+ }
+}`
)
type ESEntry struct {
@@ -26,6 +31,10 @@ type ESEntry struct {
Entry *filer.Entry
}
+type ESKVEntry struct {
+ Value string `json:Value`
+}
+
func init() {
filer.Stores = append(filer.Stores, &ElasticStore{})
}
@@ -35,11 +44,6 @@ type ElasticStore struct {
maxPageSize int
}
-type ESKVEntry struct {
- Key string `json:Key`
- Value string `json:Value`
-}
-
func (store *ElasticStore) GetName() string {
return "elastic7"
}
@@ -61,6 +65,12 @@ func (store *ElasticStore) Initialize(configuration weed_util.Configuration, pre
if err != nil {
return fmt.Errorf("init elastic %s: %v.", servers, err)
}
+ if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok {
+ _, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background())
+ if err != nil {
+ return fmt.Errorf("create index(%s) %v.", indexKV, err)
+ }
+ }
return nil
}
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
@@ -72,66 +82,6 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil
}
-
-func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
- id := fmt.Sprintf("%x", md5.Sum(key))
- deleteResult, err := store.client.Delete().
- Index(indexKV).
- Type(indexType).
- Id(id).
- Do(context.Background())
- if err == nil {
- if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
- return nil
- }
- }
- glog.Errorf("delete key(id:%s) %v.", string(key), err)
- return fmt.Errorf("delete key %v.", err)
-}
-
-func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- id := fmt.Sprintf("%x", md5.Sum(key))
- searchResult, err := store.client.Get().
- Index(indexKV).
- Type(indexType).
- Id(id).
- Do(context.Background())
- if elastic.IsNotFound(err) {
- return nil, filer_pb.ErrNotFound
- }
- if searchResult != nil && searchResult.Found {
- esEntry := &ESKVEntry{}
- if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
- return []byte(esEntry.Value), nil
- }
- }
- glog.Errorf("find key(%s),%v.", string(key), err)
- return nil, filer_pb.ErrNotFound
-}
-
-func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- id := fmt.Sprintf("%x", md5.Sum(key))
- esEntry := &ESKVEntry{
- string(key),
- string(value),
- }
- val, err := jsoniter.Marshal(esEntry)
- if err != nil {
- glog.Errorf("insert key(%s) %v.", string(key), err)
- return fmt.Errorf("insert key %v.", err)
- }
- _, err = store.client.Index().
- Index(indexKV).
- Type(indexType).
- Id(id).
- BodyJson(string(val)).
- Do(context.Background())
- if err != nil {
- return fmt.Errorf("kv put: %v", err)
- }
- return nil
-}
-
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
return nil, filer.ErrUnsupportedListDirectoryPrefixed
}
@@ -154,7 +104,7 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry)
Type(indexType).
Id(id).
BodyJson(string(value)).
- Do(context.Background())
+ Do(ctx)
if err != nil {
glog.Errorf("insert entry(%s) %v.", string(entry.FullPath), err)
return fmt.Errorf("insert entry %v.", err)
@@ -171,7 +121,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
Index(index).
Type(indexType).
Id(id).
- Do(context.Background())
+ Do(ctx)
if elastic.IsNotFound(err) {
return nil, filer_pb.ErrNotFound
}
@@ -190,24 +140,24 @@ func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
index := getIndex(fullpath)
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
if strings.Count(string(fullpath), "/") == 1 {
- return store.deleteIndex(index)
+ return store.deleteIndex(ctx, index)
}
- return store.deleteEntry(index, id)
+ return store.deleteEntry(ctx, index, id)
}
-func (store *ElasticStore) deleteIndex(index string) (err error) {
- deleteResult, err := store.client.DeleteIndex(index).Do(context.Background())
+func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
+ deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
return nil
}
glog.Errorf("delete index(%s) %v.", index, err)
return err
}
-func (store *ElasticStore) deleteEntry(index, id string) (err error) {
+func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
deleteResult, err := store.client.Delete().
Index(index).
Type(indexType).
Id(id).
- Do(context.Background())
+ Do(ctx)
if err == nil {
if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
return nil
@@ -235,7 +185,7 @@ func (store *ElasticStore) ListDirectoryEntries(
}
func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
- indexResult, err := store.client.CatIndices().Do(context.Background())
+ indexResult, err := store.client.CatIndices().Do(ctx)
if err != nil {
glog.Errorf("list indices %v.", err)
return entries, err
@@ -266,16 +216,16 @@ func (store *ElasticStore) listDirectoryEntries(
index := getIndex(fullpath)
nextStart := ""
parentId := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
- if _, err := store.client.Refresh(index).Do(context.Background()); err != nil {
+ if _, err := store.client.Refresh(index).Do(ctx); err != nil {
if elastic.IsNotFound(err) {
- store.client.CreateIndex(index).Do(context.Background())
+ store.client.CreateIndex(index).Do(ctx)
return entries, nil
}
}
for {
result := &elastic.SearchResult{}
if (startFileName == "" && first) || inclusive {
- if result, err = store.search(index, parentId); err != nil {
+ if result, err = store.search(ctx, index, parentId); err != nil {
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
return entries, err
}
@@ -285,7 +235,7 @@ func (store *ElasticStore) listDirectoryEntries(
fullPath = nextStart
}
after := fmt.Sprintf("%x", md5.Sum([]byte(fullPath)))
- if result, err = store.searchAfter(index, parentId, after); err != nil {
+ if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
return entries, err
}
@@ -316,8 +266,8 @@ func (store *ElasticStore) listDirectoryEntries(
return entries, nil
}
-func (store *ElasticStore) search(index, parentId string) (result *elastic.SearchResult, err error) {
- if count, err := store.client.Count(index).Do(context.Background()); err == nil && count == 0 {
+func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
+ if count, err := store.client.Count(index).Do(ctx); err == nil && count == 0 {
return &elastic.SearchResult{
Hits: &elastic.SearchHits{
Hits: make([]*elastic.SearchHit, 0)},
@@ -328,18 +278,18 @@ func (store *ElasticStore) search(index, parentId string) (result *elastic.Searc
Query(elastic.NewMatchQuery("ParentId", parentId)).
Size(store.maxPageSize).
Sort("_id", false).
- Do(context.Background())
+ Do(ctx)
return queryResult, err
}
-func (store *ElasticStore) searchAfter(index, parentId, after string) (result *elastic.SearchResult, err error) {
+func (store *ElasticStore) searchAfter(ctx context.Context, index, parentId, after string) (result *elastic.SearchResult, err error) {
queryResult, err := store.client.Search().
Index(index).
Query(elastic.NewMatchQuery("ParentId", parentId)).
SearchAfter(after).
Size(store.maxPageSize).
Sort("_id", false).
- Do(context.Background())
+ Do(ctx)
return queryResult, err
}
diff --git a/weed/filer/elastic/v7/elastic_store_kv.go b/weed/filer/elastic/v7/elastic_store_kv.go
new file mode 100644
index 000000000..cfb3fdf8c
--- /dev/null
+++ b/weed/filer/elastic/v7/elastic_store_kv.go
@@ -0,0 +1,64 @@
+package elastic
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ jsoniter "github.com/json-iterator/go"
+ elastic "github.com/olivere/elastic/v7"
+)
+
+func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
+ deleteResult, err := store.client.Delete().
+ Index(indexKV).
+ Type(indexType).
+ Id(string(key)).
+ Do(ctx)
+ if err == nil {
+ if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
+ return nil
+ }
+ }
+ glog.Errorf("delete key(id:%s) %v.", string(key), err)
+ return fmt.Errorf("delete key %v.", err)
+}
+
+func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+ searchResult, err := store.client.Get().
+ Index(indexKV).
+ Type(indexType).
+ Id(string(key)).
+ Do(ctx)
+ if elastic.IsNotFound(err) {
+ return nil, filer_pb.ErrNotFound
+ }
+ if searchResult != nil && searchResult.Found {
+ esEntry := &ESKVEntry{}
+ if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
+ return []byte(esEntry.Value), nil
+ }
+ }
+ glog.Errorf("find key(%s),%v.", string(key), err)
+ return nil, filer_pb.ErrNotFound
+}
+
+func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+ esEntry := &ESKVEntry{string(value)}
+ val, err := jsoniter.Marshal(esEntry)
+ if err != nil {
+ glog.Errorf("insert key(%s) %v.", string(key), err)
+ return fmt.Errorf("insert key %v.", err)
+ }
+ _, err = store.client.Index().
+ Index(indexKV).
+ Type(indexType).
+ Id(string(key)).
+ BodyJson(string(val)).
+ Do(ctx)
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+ return nil
+}