aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorruitao.liu <ruitao.liu@cloudminds.com>2020-09-10 14:22:07 +0800
committerruitao.liu <ruitao.liu@cloudminds.com>2020-09-10 14:22:07 +0800
commit3f7fbfddca8614fc0d5bb31960993599b5c44bca (patch)
treeaca29e242c134420b62d34d13c4bdb0a7c8b1f9a
parentdaf0a449f7424d4a8252673509af5afd0b9bd8ec (diff)
downloadseaweedfs-3f7fbfddca8614fc0d5bb31960993599b5c44bca.tar.xz
seaweedfs-3f7fbfddca8614fc0d5bb31960993599b5c44bca.zip
add more basic elastic options.
-rw-r--r--weed/command/scaffold.go6
-rw-r--r--weed/filer/elastic/v7/elastic_store.go50
2 files changed, 40 insertions, 16 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 68fe8e982..7ced118ca 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -176,7 +176,11 @@ database = "seaweedfs"
[elastic7]
enabled = false
-servers = "http://localhost:9200"
+servers = "http://localhost1:9200,http://localhost2:9200,http://localhost3:9200"
+username = ""
+password = ""
+sniff_enabled = false
+healthcheck_enabled = false
# increase the value is recommend, be sure the value in Elastic is greater or equal here
index.max_result_window = 10000
`
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
index 29e9689f4..f720fdea0 100644
--- a/weed/filer/elastic/v7/elastic_store.go
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -47,23 +47,12 @@ type ElasticStore struct {
func (store *ElasticStore) GetName() string {
return "elastic7"
}
+
func (store *ElasticStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
- servers := configuration.GetString(prefix + "servers")
- if servers == "" {
- return fmt.Errorf("error elastic endpoints.")
- }
- store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
- if store.maxPageSize <= 0 {
- store.maxPageSize = 10000
- }
- glog.Infof("filer store elastic endpoints: %s, index.max_result_window:%d", servers, store.maxPageSize)
- store.client, err = elastic.NewClient(
- elastic.SetSniff(false),
- elastic.SetHealthcheck(false),
- elastic.SetURL(servers),
- )
+ options := store.initialize(configuration, prefix)
+ store.client, err = elastic.NewClient(options...)
if err != nil {
- return fmt.Errorf("init elastic %s: %v.", servers, err)
+ return fmt.Errorf("init elastic %v.", err)
}
if ok, err := store.client.IndexExists(indexKV).Do(context.Background()); err == nil && !ok {
_, err = store.client.CreateIndex(indexKV).Body(mappingWithoutQuery).Do(context.Background())
@@ -73,6 +62,30 @@ func (store *ElasticStore) Initialize(configuration weed_util.Configuration, pre
}
return nil
}
+
+func (store *ElasticStore) initialize(configuration weed_util.Configuration, prefix string) (options []elastic.ClientOptionFunc) {
+ configuration.SetDefault(prefix+"servers", "http://localhost:9200")
+ servers := configuration.GetString(prefix + "servers")
+ url := make([]string, 0)
+ for _, v := range strings.Split(servers, ",") {
+ url = append(url, v)
+ }
+ options = append(options, elastic.SetURL(url...))
+ username := configuration.GetString(prefix + "username")
+ password := configuration.GetString(prefix + "password")
+ if username != "" && password != "" {
+ options = append(options, elastic.SetBasicAuth(username, password))
+ }
+ options = append(options, elastic.SetSniff(configuration.GetBool(prefix+"sniff_enabled")))
+ options = append(options, elastic.SetHealthcheck(configuration.GetBool(prefix+"healthcheck_enabled")))
+ store.maxPageSize = configuration.GetInt(prefix + "index.max_result_window")
+ if store.maxPageSize <= 0 {
+ store.maxPageSize = 10000
+ }
+ glog.Infof("filer store elastic endpoints: %s.", servers)
+ return options
+}
+
func (store *ElasticStore) BeginTransaction(ctx context.Context) (context.Context, error) {
return ctx, nil
}
@@ -82,6 +95,7 @@ func (store *ElasticStore) CommitTransaction(ctx context.Context) error {
func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil
}
+
func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
return nil, filer.ErrUnsupportedListDirectoryPrefixed
}
@@ -111,9 +125,11 @@ func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry)
}
return nil
}
+
func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
return store.InsertEntry(ctx, entry)
}
+
func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
index := getIndex(fullpath)
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
@@ -136,6 +152,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
glog.Errorf("find entry(%s),%v.", string(fullpath), err)
return nil, filer_pb.ErrNotFound
}
+
func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
index := getIndex(fullpath)
id := fmt.Sprintf("%x", md5.Sum([]byte(fullpath)))
@@ -144,6 +161,7 @@ func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.F
}
return store.deleteEntry(ctx, index, id)
}
+
func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err error) {
deleteResult, err := store.client.DeleteIndex(index).Do(ctx)
if elastic.IsNotFound(err) || (err == nil && deleteResult.Acknowledged) {
@@ -152,6 +170,7 @@ func (store *ElasticStore) deleteIndex(ctx context.Context, index string) (err e
glog.Errorf("delete index(%s) %v.", index, err)
return err
}
+
func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (err error) {
deleteResult, err := store.client.Delete().
Index(index).
@@ -166,6 +185,7 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
glog.Errorf("delete entry(index:%s,_id:%s) %v.", index, id, err)
return fmt.Errorf("delete entry %v.", err)
}
+
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
for _, entry := range entries {