aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorruitao.liu <ruitao.liu@cloudminds.com>2020-09-04 15:40:13 +0800
committerruitao.liu <ruitao.liu@cloudminds.com>2020-09-04 15:40:13 +0800
commit83080b5e034bdbc0ba58eb410d04fb78bebf08cf (patch)
tree1664d69d5838fbadce0bc6bc128ff7479f7dd3c6
parent0ddcc2a8f5f2b91a78466785bc61ec0e6c6264bc (diff)
downloadseaweedfs-83080b5e034bdbc0ba58eb410d04fb78bebf08cf.tar.xz
seaweedfs-83080b5e034bdbc0ba58eb410d04fb78bebf08cf.zip
ES backended filer support kv ops.
-rw-r--r--go.mod1
-rw-r--r--weed/command/scaffold.go2
-rw-r--r--weed/filer/elastic/v7/elastic_store.go63
3 files changed, 61 insertions, 5 deletions
diff --git a/go.mod b/go.mod
index d404b9d52..98ac2b4e5 100644
--- a/go.mod
+++ b/go.mod
@@ -27,7 +27,6 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/gocql/gocql v0.0.0-20190829130954-e163eff7a8c6
github.com/gogo/protobuf v1.2.2-0.20190730201129-28a6bbf47e48 // indirect
- github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/protobuf v1.4.2
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 479e0665f..68fe8e982 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -177,7 +177,7 @@ database = "seaweedfs"
[elastic7]
enabled = false
servers = "http://localhost:9200"
-# increase the value is recommend, both here and in elastic cluster configuration
+# increase the value is recommend, be sure the value in Elastic is greater or equal here
index.max_result_window = 10000
`
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
index 5c57e352a..d263b5dae 100644
--- a/weed/filer/elastic/v7/elastic_store.go
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -18,6 +18,7 @@ import (
var (
indexType = "_doc"
indexPrefix = ".seaweedfs_"
+ indexKV = ".seaweedfs_kv_entries"
)
type ESEntry struct {
@@ -34,6 +35,11 @@ type ElasticStore struct {
maxPageSize int
}
+type ESKVEntry struct {
+ Key string `json:Key`
+ Value string `json:Value`
+}
+
func (store *ElasticStore) GetName() string {
return "elastic7"
}
@@ -66,15 +72,66 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil
}
+
func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
- return filer.ErrKvNotImplemented
+ id := fmt.Sprintf("%x", md5.Sum(key))
+ deleteResult, err := store.client.Delete().
+ Index(indexKV).
+ Type(indexType).
+ Id(id).
+ Do(context.Background())
+ if err == nil {
+ if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
+ return nil
+ }
+ }
+ glog.Errorf("delete key(id:%s) %v.", string(key), err)
+ return fmt.Errorf("delete key %v.", err)
}
+
func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
- return []byte(""), filer.ErrKvNotImplemented
+ id := fmt.Sprintf("%x", md5.Sum(key))
+ searchResult, err := store.client.Get().
+ Index(indexKV).
+ Type(indexType).
+ Id(id).
+ Do(context.Background())
+ if elastic.IsNotFound(err) {
+ return nil, filer_pb.ErrNotFound
+ }
+ if searchResult != nil && searchResult.Found {
+ esEntry := &ESKVEntry{}
+ if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
+ return []byte(esEntry.Value), nil
+ }
+ }
+ glog.Errorf("find key(%s),%v.", string(key), err)
+ return nil, filer_pb.ErrNotFound
}
+
func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
- return filer.ErrKvNotImplemented
+ id := fmt.Sprintf("%x", md5.Sum(key))
+ esEntry := &ESKVEntry{
+ string(key),
+ string(value),
+ }
+ val, err := jsoniter.Marshal(esEntry)
+ if err != nil {
+ glog.Errorf("insert key(%s) %v.", string(key), err)
+ return fmt.Errorf("insert key %v.", err)
+ }
+ _, err = store.client.Index().
+ Index(indexKV).
+ Type(indexType).
+ Id(id).
+ BodyJson(string(val)).
+ Do(context.Background())
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+ return nil
}
+
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
return nil, filer.ErrUnsupportedListDirectoryPrefixed
}