diff options
Diffstat (limited to 'weed/filer/elastic/v7/elastic_store.go')
| -rw-r--r-- | weed/filer/elastic/v7/elastic_store.go | 60 |
1 files changed, 31 insertions, 29 deletions
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go index ec88e10a5..1e7f55599 100644 --- a/weed/filer/elastic/v7/elastic_store.go +++ b/weed/filer/elastic/v7/elastic_store.go @@ -96,8 +96,8 @@ func (store *ElasticStore) RollbackTransaction(ctx context.Context) error { return nil } -func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) { - return nil, filer.ErrUnsupportedListDirectoryPrefixed +func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed } func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) { @@ -187,28 +187,28 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e } func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) { - if entries, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil { - for _, entry := range entries { - store.DeleteEntry(ctx, entry.FullPath) + _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool { + if err := store.DeleteEntry(ctx, entry.FullPath); err != nil { + glog.Errorf("elastic delete %s: %v.", entry.FullPath, err) + return false } - } - return nil + return true + }) + return } -func (store *ElasticStore) ListDirectoryEntries( - ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, -) (entries []*filer.Entry, err error) { - if string(fullpath) == "/" { - return store.listRootDirectoryEntries(ctx, startFileName, inclusive, limit) +func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { + if string(dirPath) == "/" { + return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit, eachEntryFunc) } - return store.listDirectoryEntries(ctx, fullpath, startFileName, inclusive, limit) + return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc) } -func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int) (entries []*filer.Entry, err error) { +func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { indexResult, err := store.client.CatIndices().Do(ctx) if err != nil { glog.Errorf("list indices %v.", err) - return entries, err + return } for _, index := range indexResult { if index.Index == indexKV { @@ -218,6 +218,7 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi if entry, err := store.FindEntry(ctx, weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil { fileName := getFileName(entry.FullPath) + lastFileName = fileName if fileName == startFileName && !inclusive { continue } @@ -225,24 +226,25 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi if limit < 0 { break } - entries = append(entries, entry) + if !eachEntryFunc(entry) { + break + } } } } - return entries, nil + return } func (store *ElasticStore) listDirectoryEntries( - ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, -) (entries []*filer.Entry, err error) { + ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) { first := true index := getIndex(fullpath) nextStart := "" parentId := weed_util.Md5String([]byte(fullpath)) - if _, err := store.client.Refresh(index).Do(ctx); err != nil { + if _, err = store.client.Refresh(index).Do(ctx); err != nil { if elastic.IsNotFound(err) { store.client.CreateIndex(index).Do(ctx) - return entries, nil + return } } for { @@ -250,7 +252,7 @@ func (store *ElasticStore) listDirectoryEntries( if (startFileName == "" && first) || inclusive { if result, err = store.search(ctx, index, parentId); err != nil { glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) - return entries, err + return } } else { fullPath := string(fullpath) + "/" + startFileName @@ -260,7 +262,7 @@ func (store *ElasticStore) listDirectoryEntries( after := weed_util.Md5String([]byte(fullPath)) if result, err = store.searchAfter(ctx, index, parentId, after); err != nil { glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err) - return entries, err + return } } first = false @@ -272,21 +274,21 @@ func (store *ElasticStore) listDirectoryEntries( if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil { limit-- if limit < 0 { - return entries, nil + return lastFileName, nil } nextStart = string(esEntry.Entry.FullPath) fileName := getFileName(esEntry.Entry.FullPath) + lastFileName = fileName if fileName == startFileName && !inclusive { continue } - entries = append(entries, esEntry.Entry) + if !eachEntryFunc(esEntry.Entry) { + break + } } } - if len(result.Hits.Hits) < store.maxPageSize { - break - } } - return entries, nil + return } func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) { |
