aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/elastic/v7/elastic_store.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/elastic/v7/elastic_store.go')
-rw-r--r--weed/filer/elastic/v7/elastic_store.go101
1 files changed, 35 insertions, 66 deletions
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
index ec88e10a5..a16e5ebca 100644
--- a/weed/filer/elastic/v7/elastic_store.go
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -96,12 +96,12 @@ func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil
}
-func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
- index := getIndex(entry.FullPath)
+ index := getIndex(entry.FullPath, false)
dir, _ := entry.FullPath.DirAndName()
id := weed_util.Md5String([]byte(entry.FullPath))
esEntry := &ESEntry{
@@ -131,7 +131,7 @@ func (store *ElasticStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
}
func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
- index := getIndex(fullpath)
+ index := getIndex(fullpath, false)
id := weed_util.Md5String([]byte(fullpath))
searchResult, err := store.client.Get().
Index(index).
@@ -154,7 +154,7 @@ func (store *ElasticStore) FindEntry(ctx context.Context, fullpath weed_util.Ful
}
func (store *ElasticStore) DeleteEntry(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- index := getIndex(fullpath)
+ index := getIndex(fullpath, false)
id := weed_util.Md5String([]byte(fullpath))
if strings.Count(string(fullpath), "/") == 1 {
return store.deleteIndex(ctx, index)
@@ -187,62 +187,30 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
}
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
- for _, entry := range entries {
- store.DeleteEntry(ctx, entry.FullPath)
+ _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
+ if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
+ glog.Errorf("elastic delete %s: %v.", entry.FullPath, err)
+ return false
}
- }
- return nil
+ return true
+ })
+ return
}
-func (store *ElasticStore) ListDirectoryEntries(
- ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
-) (entries []*filer.Entry, err error) {
- if string(fullpath) == "/" {
- return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit)
- }
- return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit)
-}
-
-func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) {
- indexResult, err := store.client.CatIndices().Do(ctx)
- if err != nil {
- glog.Errorf("list indices %v.", err)
- return entries, err
- }
- for _, index := range indexResult {
- if index.Index == indexKV {
- continue
- }
- if strings.HasPrefix(index.Index, indexPrefix) {
- if entry, err := store.FindEntry(ctx,
- weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
- fileName := getFileName(entry.FullPath)
- if fileName == startFileName && !inclusive {
- continue
- }
- limit--
- if limit < 0 {
- break
- }
- entries = append(entries, entry)
- }
- }
- }
- return entries, nil
+func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
}
func (store *ElasticStore) listDirectoryEntries(
- ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int,
-) (entries []*filer.Entry, err error) {
+ ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
first := true
- index := getIndex(fullpath)
+ index := getIndex(fullpath, true)
nextStart := ""
parentId := weed_util.Md5String([]byte(fullpath))
- if _, err := store.client.Refresh(index).Do(ctx); err != nil {
+ if _, err = store.client.Refresh(index).Do(ctx); err != nil {
if elastic.IsNotFound(err) {
store.client.CreateIndex(index).Do(ctx)
- return entries, nil
+ return
}
}
for {
@@ -250,7 +218,7 @@ func (store *ElasticStore) listDirectoryEntries(
if (startFileName == "" && first) || inclusive {
if result, err = store.search(ctx, index, parentId); err != nil {
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
- return entries, err
+ return
}
} else {
fullPath := string(fullpath) + "/" + startFileName
@@ -260,7 +228,7 @@ func (store *ElasticStore) listDirectoryEntries(
after := weed_util.Md5String([]byte(fullPath))
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
- return entries, err
+ return
}
}
first = false
@@ -272,21 +240,24 @@ func (store *ElasticStore) listDirectoryEntries(
if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
limit--
if limit < 0 {
- return entries, nil
+ return lastFileName, nil
}
nextStart = string(esEntry.Entry.FullPath)
- fileName := getFileName(esEntry.Entry.FullPath)
+ fileName := esEntry.Entry.FullPath.Name()
if fileName == startFileName && !inclusive {
continue
}
- entries = append(entries, esEntry.Entry)
+ if !eachEntryFunc(esEntry.Entry) {
+ break
+ }
+ lastFileName = fileName
}
}
if len(result.Hits.Hits) < store.maxPageSize {
break
}
}
- return entries, nil
+ return
}
func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
@@ -321,18 +292,16 @@ func (store *ElasticStore) Shutdown() {
store.client.Stop()
}
-func getIndex(fullpath weed_util.FullPath) string {
+func getIndex(fullpath weed_util.FullPath, isDirectory bool) string {
path := strings.Split(string(fullpath), "/")
- if len(path) > 1 {
- return indexPrefix + path[1]
+ if isDirectory && len(path) >= 2 {
+ return indexPrefix + strings.ToLower(path[1])
}
- return ""
-}
-
-func getFileName(fullpath weed_util.FullPath) string {
- path := strings.Split(string(fullpath), "/")
- if len(path) > 1 {
- return path[len(path)-1]
+ if len(path) > 2 {
+ return indexPrefix + strings.ToLower(path[1])
+ }
+ if len(path) == 2 {
+ return indexPrefix
}
return ""
}