aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-01-15 23:56:24 -0800
committerChris Lu <chris.lu@gmail.com>2021-01-15 23:56:24 -0800
commita4063a5437f0554962bcdadefc96c9131f8c0395 (patch)
treed5dc7e0937d00fd0b3586917656becdac2f8e16f
parent01dc8a43ba007d21f97c40271b366e538f752227 (diff)
downloadseaweedfs-a4063a5437f0554962bcdadefc96c9131f8c0395.tar.xz
seaweedfs-a4063a5437f0554962bcdadefc96c9131f8c0395.zip
add stream list directory entries
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store.go23
-rw-r--r--weed/filer/cassandra/cassandra_store.go18
-rw-r--r--weed/filer/elastic/v7/elastic_store.go58
-rw-r--r--weed/filer/etcd/etcd_store.go16
-rw-r--r--weed/filer/filer.go17
-rw-r--r--weed/filer/filer_search.go67
-rw-r--r--weed/filer/filerstore.go6
-rw-r--r--weed/filer/filerstore_translate_path.go24
-rw-r--r--weed/filer/filerstore_wrapper.go60
-rw-r--r--weed/filer/hbase/hbase_store.go21
-rw-r--r--weed/filer/leveldb/leveldb_store.go14
-rw-r--r--weed/filer/leveldb2/leveldb2_store.go14
-rw-r--r--weed/filer/leveldb3/leveldb3_store.go16
-rw-r--r--weed/filer/mongodb/mongodb_store.go21
-rw-r--r--weed/filer/redis/universal_redis_store.go18
-rw-r--r--weed/filer/redis2/universal_redis_store.go22
-rw-r--r--weed/filer/rocksdb/rocksdb_store.go26
-rw-r--r--weed/filesys/dir.go8
-rw-r--r--weed/filesys/meta_cache/meta_cache.go16
-rw-r--r--weed/server/filer_grpc_server.go42
20 files changed, 266 insertions, 241 deletions
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index bd1ac9734..f5dcf6e03 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -172,7 +172,7 @@ func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpat
return nil
}
-func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
sqlText := store.SqlListExclusive
if includeStartFile {
sqlText = store.SqlListInclusive
@@ -180,7 +180,7 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(dirPath)), startFileName, string(dirPath), prefix+"%", limit+1)
if err != nil {
- return nil, false, fmt.Errorf("list %s : %v", dirPath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
defer rows.Close()
@@ -189,30 +189,29 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
var data []byte
if err = rows.Scan(&name, &data); err != nil {
glog.V(0).Infof("scan %s : %v", dirPath, err)
- return nil, false, fmt.Errorf("scan %s: %v", dirPath, err)
+ return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
}
+ lastFileName = name
entry := &filer.Entry{
FullPath: util.NewFullPath(string(dirPath), name),
}
if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
- return nil, false, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
+ return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}
- entries = append(entries, entry)
- }
+ if !eachEntryFunc(entry) {
+ break
+ }
- hasMore = int64(len(entries)) == limit+1
- if hasMore {
- entries = entries[:limit]
}
- return entries, hasMore, nil
+ return lastFileName, nil
}
-func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
}
func (store *AbstractSqlStore) Shutdown() {
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index 06fb3af46..fd2ce91a6 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -168,11 +168,11 @@ func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath
return nil
}
-func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
- return nil, false, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
if _, ok := store.isSuperLargeDirectory(string(dirPath)); ok {
return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
@@ -190,23 +190,21 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath u
entry := &filer.Entry{
FullPath: util.NewFullPath(string(dirPath), name),
}
+ lastFileName = name
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
if err := iter.Close(); err != nil {
glog.V(0).Infof("list iterator close: %v", err)
}
- hasMore = int64(len(entries)) == limit+1
- if hasMore {
- entries = entries[:limit]
- }
-
- return entries, hasMore, err
+ return lastFileName, err
}
func (store *CassandraStore) Shutdown() {
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
index 0e055e1fc..1e7f55599 100644
--- a/weed/filer/elastic/v7/elastic_store.go
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -96,8 +96,8 @@ func (store *ElasticStore) RollbackTransaction(ctx context.Context) error {
return nil
}
-func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
- return nil, false, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *ElasticStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
func (store *ElasticStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
@@ -187,26 +187,28 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
}
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- if entries, _, err := store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32); err == nil {
- for _, entry := range entries {
- store.DeleteEntry(ctx, entry.FullPath)
+ _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
+ if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
+ glog.Errorf("elastic delete %s: %v.", entry.FullPath, err)
+ return false
}
- }
- return nil
+ return true
+ })
+ return
}
-func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *ElasticStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
if string(dirPath) == "/" {
- return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit)
+ return store.listRootDirectoryEntries(ctx, startFileName, includeStartFile, limit, eachEntryFunc)
}
- return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ return store.listDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
}
-func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
indexResult, err := store.client.CatIndices().Do(ctx)
if err != nil {
glog.Errorf("list indices %v.", err)
- return entries, false, err
+ return
}
for _, index := range indexResult {
if index.Index == indexKV {
@@ -216,32 +218,33 @@ func (store *ElasticStore) listRootDirectoryEntries(ctx context.Context, startFi
if entry, err := store.FindEntry(ctx,
weed_util.FullPath("/"+strings.Replace(index.Index, indexPrefix, "", 1))); err == nil {
fileName := getFileName(entry.FullPath)
+ lastFileName = fileName
if fileName == startFileName && !inclusive {
continue
}
limit--
if limit < 0 {
- hasMore = true
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
}
- return entries, hasMore, nil
+ return
}
func (store *ElasticStore) listDirectoryEntries(
- ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64,
-) (entries []*filer.Entry, hasMore bool, err error) {
+ ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
first := true
index := getIndex(fullpath)
nextStart := ""
parentId := weed_util.Md5String([]byte(fullpath))
- if _, err := store.client.Refresh(index).Do(ctx); err != nil {
+ if _, err = store.client.Refresh(index).Do(ctx); err != nil {
if elastic.IsNotFound(err) {
store.client.CreateIndex(index).Do(ctx)
- return entries, hasMore, nil
+ return
}
}
for {
@@ -249,7 +252,7 @@ func (store *ElasticStore) listDirectoryEntries(
if (startFileName == "" && first) || inclusive {
if result, err = store.search(ctx, index, parentId); err != nil {
glog.Errorf("search (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
- return entries, hasMore, err
+ return
}
} else {
fullPath := string(fullpath) + "/" + startFileName
@@ -259,7 +262,7 @@ func (store *ElasticStore) listDirectoryEntries(
after := weed_util.Md5String([]byte(fullPath))
if result, err = store.searchAfter(ctx, index, parentId, after); err != nil {
glog.Errorf("searchAfter (%s,%s,%t,%d) %v.", string(fullpath), startFileName, inclusive, limit, err)
- return entries, hasMore, err
+ return
}
}
first = false
@@ -271,22 +274,21 @@ func (store *ElasticStore) listDirectoryEntries(
if err := jsoniter.Unmarshal(hit.Source, esEntry); err == nil {
limit--
if limit < 0 {
- hasMore = true
- return entries, hasMore, nil
+ return lastFileName, nil
}
nextStart = string(esEntry.Entry.FullPath)
fileName := getFileName(esEntry.Entry.FullPath)
+ lastFileName = fileName
if fileName == startFileName && !inclusive {
continue
}
- entries = append(entries, esEntry.Entry)
+ if !eachEntryFunc(esEntry.Entry) {
+ break
+ }
}
}
- if len(result.Hits.Hits) < store.maxPageSize {
- break
- }
}
- return entries, hasMore, nil
+ return
}
func (store *ElasticStore) search(ctx context.Context, index, parentId string) (result *elastic.SearchResult, err error) {
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
index b7acc2049..8159c634d 100644
--- a/weed/filer/etcd/etcd_store.go
+++ b/weed/filer/etcd/etcd_store.go
@@ -139,17 +139,17 @@ func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath weed_
return nil
}
-func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
- return nil, false, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, "")
resp, err := store.client.Get(ctx, string(directoryPrefix),
clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
if err != nil {
- return nil, false, fmt.Errorf("list %s : %v", dirPath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
for _, kv := range resp.Kvs {
@@ -160,9 +160,9 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_u
if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
- hasMore = true
break
}
entry := &filer.Entry{
@@ -173,10 +173,12 @@ func (store *EtcdStore) ListDirectoryEntries(ctx context.Context, dirPath weed_u
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
- return entries, hasMore, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string) (key []byte) {
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index dd0fcf2cf..e59887763 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -281,22 +281,19 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
}
-func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string) (entries []*Entry, hasMore bool, expiredCount int64, lastFileName string, err error) {
- listedEntries, listHasMore, listErr := f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix)
- hasMore = listHasMore
- if listErr != nil {
- return listedEntries, hasMore, expiredCount, "", listErr
- }
- for _, entry := range listedEntries {
- lastFileName = entry.Name()
+func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
+ lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
if entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
f.Store.DeleteOneEntry(ctx, entry)
expiredCount++
- continue
+ return true
}
}
- entries = append(entries, entry)
+ return eachEntryFunc(entry)
+ })
+ if err != nil {
+ return expiredCount, lastFileName, err
}
return
}
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index 0fa57dd06..8c9688ceb 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -32,49 +32,76 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
var missedCount int64
var lastFileName string
- entries, hasMore, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern)
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit+1, prefix, restNamePattern, func(entry *Entry) bool {
+ entries = append(entries, entry)
+ return true
+ })
for missedCount > 0 && err == nil {
- var makeupEntries []*Entry
- makeupEntries, hasMore, missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern)
- for _, entry := range makeupEntries {
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount+1, prefix, restNamePattern, func(entry *Entry) bool {
entries = append(entries, entry)
- }
+ return true
+ })
+ }
+
+ hasMore = int64(len(entries)) >= limit+1
+ if hasMore {
+ entries = entries[:limit]
}
return entries, hasMore, err
}
-func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string) (matchedEntries []*Entry, hasMore bool, missedCount int64, lastFileName string, err error) {
- var foundEntries []*Entry
+// For now, prefix and namePattern are mutually exclusive
+func (f *Filer) StreamListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, namePattern string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
+ if strings.HasSuffix(string(p), "/") && len(p) > 1 {
+ p = p[0 : len(p)-1]
+ }
- foundEntries, hasMore, lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix)
- if err != nil {
- return
+ prefixInNamePattern, restNamePattern := splitPattern(namePattern)
+ if prefixInNamePattern != "" {
+ prefix = prefixInNamePattern
+ }
+ var missedCount int64
+
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, startFileName, inclusive, limit, prefix, restNamePattern, eachEntryFunc)
+
+ for missedCount > 0 && err == nil {
+ missedCount, lastFileName, err = f.doListPatternMatchedEntries(ctx, p, lastFileName, false, missedCount, prefix, restNamePattern, eachEntryFunc)
}
+
+ return
+}
+
+func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix, restNamePattern string, eachEntryFunc ListEachEntryFunc) (missedCount int64, lastFileName string, err error) {
+
if len(restNamePattern) == 0 {
- return foundEntries, false, 0, lastFileName, nil
+ lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc)
+ return 0, lastFileName, err
}
- for _, entry := range foundEntries {
+
+ lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
nameToTest := strings.ToLower(entry.Name())
if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && matched {
- matchedEntries = append(matchedEntries, entry)
+ if !eachEntryFunc(entry) {
+ return false
+ }
} else {
missedCount++
}
+ return true
+ })
+ if err != nil {
+ return
}
return
}
-func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string) (entries []*Entry, hasMore bool, lastFileName string, err error) {
- var makeupEntries []*Entry
+func (f *Filer) doListValidEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
var expiredCount int64
- entries, hasMore, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix)
+ expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, startFileName, inclusive, limit, prefix, eachEntryFunc)
for expiredCount > 0 && err == nil {
- makeupEntries, hasMore, expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix)
- if err == nil {
- entries = append(entries, makeupEntries...)
- }
+ expiredCount, lastFileName, err = f.doListDirectoryEntries(ctx, p, lastFileName, false, expiredCount, prefix, eachEntryFunc)
}
return
}
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index a28c4d01b..8955a25c7 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -13,6 +13,8 @@ var (
ErrKvNotFound = errors.New("kv: not found")
)
+type ListEachEntryFunc func(entry *Entry) bool
+
type FilerStore interface {
// GetName gets the name to locate the configuration in filer.toml file
GetName() string
@@ -24,8 +26,8 @@ type FilerStore interface {
FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, util.FullPath) (err error)
DeleteFolderChildren(context.Context, util.FullPath) (err error)
- ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*Entry, bool, error)
- ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) ([]*Entry, bool, error)
+ ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
+ ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error
diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go
index 0e2ddd914..00bf82ed4 100644
--- a/weed/filer/filerstore_translate_path.go
+++ b/weed/filer/filerstore_translate_path.go
@@ -106,32 +106,24 @@ func (t *FilerStorePathTranlator) DeleteFolderChildren(ctx context.Context, fp u
return t.actualStore.DeleteFolderChildren(ctx, newFullPath)
}
-func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*Entry, bool, error) {
+func (t *FilerStorePathTranlator) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
- entries, hasMore, err := t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, hasMore, err
- }
- for _, entry := range entries {
+ return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
- }
- return entries, hasMore, err
+ return eachEntryFunc(entry)
+ })
}
-func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) ([]*Entry, bool, error) {
+func (t *FilerStorePathTranlator) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) {
newFullPath := t.translatePath(dirPath)
- entries, hasMore, err := t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix)
- if err != nil {
- return nil, hasMore, err
- }
- for _, entry := range entries {
+ return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
- }
- return entries, hasMore, nil
+ return eachEntryFunc(entry)
+ })
}
func (t *FilerStorePathTranlator) BeginTransaction(ctx context.Context) (context.Context, error) {
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index 2bb7793f0..64baac371 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -194,7 +194,7 @@ func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.
return actualStore.DeleteFolderChildren(ctx, fp)
}
-func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*Entry, bool, error) {
+func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "list").Inc()
start := time.Now()
@@ -203,18 +203,14 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
}()
glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
- entries, hasMore, err := actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, hasMore, err
- }
- for _, entry := range entries {
+ return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.Chunks)
- }
- return entries, hasMore, err
+ return eachEntryFunc(entry)
+ })
}
-func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) ([]*Entry, bool, error) {
+func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "prefixList").Inc()
start := time.Now()
@@ -222,48 +218,52 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
}()
glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
- entries, hasMore, err := actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
+ lastFileName, err = actualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, eachEntryFunc)
if err == ErrUnsupportedListDirectoryPrefixed {
- entries, hasMore, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
- }
- if err != nil {
- return nil, hasMore, err
- }
- for _, entry := range entries {
- fsw.maybeReadHardLink(ctx, entry)
- filer_pb.AfterEntryDeserialization(entry.Chunks)
+ lastFileName, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
+ fsw.maybeReadHardLink(ctx, entry)
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return eachEntryFunc(entry)
+ })
}
- return entries, hasMore, nil
+ return lastFileName, err
}
-func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*Entry, hasMore bool, err error) {
+func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (lastFileName string, err error) {
actualStore := fsw.getActualStore(dirPath + "/")
- entries, hasMore, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, hasMore, err
- }
if prefix == "" {
+ return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, eachEntryFunc)
+ }
+
+ var notPrefixed []*Entry
+ lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
+ notPrefixed = append(notPrefixed, entry)
+ return true
+ })
+ if err != nil {
return
}
count := int64(0)
- var lastFileName string
- notPrefixed := entries
- entries = nil
for count < limit && len(notPrefixed) > 0 {
for _, entry := range notPrefixed {
- lastFileName = entry.Name()
if strings.HasPrefix(entry.Name(), prefix) {
count++
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ return
+ }
if count >= limit {
break
}
}
}
if count < limit {
- notPrefixed, hasMore, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
+ notPrefixed = notPrefixed[:0]
+ _, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
+ notPrefixed = append(notPrefixed, entry)
+ return true
+ })
if err != nil {
return
}
diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go
index c93374b15..2e4491515 100644
--- a/weed/filer/hbase/hbase_store.go
+++ b/weed/filer/hbase/hbase_store.go
@@ -148,20 +148,18 @@ func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.Ful
return
}
-func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*filer.Entry, bool, error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) ([]*filer.Entry, bool, error) {
+func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
expectedPrefix := []byte(dirPath.Child(prefix))
scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
if err != nil {
- return nil, false, err
+ return lastFileName, err
}
- var hasMore bool
- var entries []*filer.Entry
scanner := store.Client.Scan(scan)
defer scanner.Close()
for {
@@ -170,7 +168,7 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
break
}
if err != nil {
- return entries, hasMore, err
+ return lastFileName, err
}
if len(res.Cells) == 0 {
continue
@@ -187,6 +185,8 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
continue
}
+ lastFileName = fileName
+
value := cell.Value
if fileName == startFileName && !includeStartFile {
@@ -195,7 +195,6 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
limit--
if limit < 0 {
- hasMore = true
break
}
entry := &filer.Entry{
@@ -206,10 +205,12 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
- return entries, hasMore, nil
+ return lastFileName, nil
}
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index c968ca2da..f0ae64769 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -162,11 +162,11 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil
}
-func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
lastFileStart := directoryPrefix
@@ -187,9 +187,9 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
- hasMore = true
break
}
entry := &filer.Entry{
@@ -200,11 +200,13 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
iter.Release()
- return entries, hasMore, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string) (key []byte) {
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index cc0d67eac..965721460 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -171,11 +171,11 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath w
return nil
}
-func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix, partitionId := genDirectoryKeyPrefix(dirPath, prefix, store.dbCount)
lastFileStart := directoryPrefix
@@ -196,9 +196,9 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di
if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
- hasMore = true
break
}
entry := &filer.Entry{
@@ -211,11 +211,13 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
iter.Release()
- return entries, hasMore, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string, dbCount int) (key []byte, partitionId int) {
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index afbcabaa6..24e00edc7 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -286,15 +286,15 @@ func (store *LevelDB3Store) DeleteFolderChildren(ctx context.Context, fullpath w
return nil
}
-func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+func (store *LevelDB3Store) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
db, _, shortPath, err := store.findDB(dirPath, true)
if err != nil {
- return nil, false, fmt.Errorf("findDB %s : %v", dirPath, err)
+ return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
}
directoryPrefix := genDirectoryKeyPrefix(shortPath, prefix)
@@ -316,9 +316,9 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di
if fileName == startFileName && !includeStartFile {
continue
}
+ lastFileName = fileName
limit--
if limit < 0 {
- hasMore = true
break
}
entry := &filer.Entry{
@@ -331,11 +331,13 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
iter.Release()
- return entries, hasMore, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string) (key []byte) {
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index 2e9556f1f..1ef5056f4 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -178,11 +178,11 @@ func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath ut
return nil
}
-func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
- return nil, false, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
var where = bson.M{"directory": string(dirPath), "name": bson.M{"$gt": startFileName}}
if includeStartFile {
@@ -190,38 +190,37 @@ func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath uti
"$gte": startFileName,
}
}
- optLimit := int64(limit + 1)
+ optLimit := int64(limit)
opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}}
cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts)
for cur.Next(ctx) {
var data Model
err := cur.Decode(&data)
if err != nil && err != mongo.ErrNoDocuments {
- return nil, false, err
+ return lastFileName, err
}
entry := &filer.Entry{
FullPath: util.NewFullPath(string(dirPath), data.Name),
}
+ lastFileName = data.Name
if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil {
err = decodeErr
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
break
}
- entries = append(entries, entry)
- }
+ if !eachEntryFunc(entry) {
+ break
+ }
- hasMore = int64(len(entries)) == limit+1
- if hasMore {
- entries = entries[:limit]
}
if err := cur.Close(ctx); err != nil {
glog.V(0).Infof("list iterator close: %v", err)
}
- return entries, hasMore, err
+ return lastFileName, err
}
func (store *MongodbStore) Shutdown() {
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
index 8399b4e99..30d11a7f4 100644
--- a/weed/filer/redis/universal_redis_store.go
+++ b/weed/filer/redis/universal_redis_store.go
@@ -125,16 +125,16 @@ func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, full
return nil
}
-func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
- return nil, false, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *UniversalRedisStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))
members, err := store.Client.SMembers(ctx, dirListKey).Result()
if err != nil {
- return nil, false, fmt.Errorf("list %s : %v", dirPath, err)
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// skip
@@ -160,15 +160,15 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP
})
// limit
- if limit < int64(len(entries)) {
+ if limit < int64(len(members)) {
members = members[:limit]
- hasMore = true
}
// fetch entry meta
for _, fileName := range members {
path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
if err == filer_pb.ErrNotFound {
@@ -182,11 +182,13 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP
continue
}
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
- return entries, hasMore, err
+ return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index 7b4e9d325..aab3d1f4a 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -149,11 +149,11 @@ func (store *UniversalRedis2Store) DeleteFolderChildren(ctx context.Context, ful
return nil
}
-func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
- return nil, false, filer.ErrUnsupportedListDirectoryPrefixed
+func (store *UniversalRedis2Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
}
-func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
dirListKey := genDirectoryListKey(string(dirPath))
start := int64(0)
@@ -163,20 +163,16 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir
start++
}
}
- members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1+1).Result()
+ members, err := store.Client.ZRange(ctx, dirListKey, start, start+int64(limit)-1).Result()
if err != nil {
- return nil, false, fmt.Errorf("list %s : %v", dirPath, err)
- }
-
- hasMore = int64(len(entries)) == limit+1
- if hasMore {
- members = members[:len(members)-1]
+ return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
}
// fetch entry meta
for _, fileName := range members {
path := util.NewFullPath(string(dirPath), fileName)
entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
if err != nil {
glog.V(0).Infof("list %s : %v", path, err)
if err == filer_pb.ErrNotFound {
@@ -190,11 +186,13 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir
continue
}
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ break
+ }
}
}
- return entries, hasMore, err
+ return lastFileName, err
}
func genDirectoryListKey(dir string) (dirList string) {
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
index 98023e82e..70c301725 100644
--- a/weed/filer/rocksdb/rocksdb_store.go
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -158,7 +158,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
iter := store.db.NewIterator(ro)
defer iter.Close()
- _, err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
+ err = enumerate(iter, directoryPrefix, nil, false, -1, func(key, value []byte) bool {
batch.Delete(key)
return true
})
@@ -175,7 +175,7 @@ func (store *RocksDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil
}
-func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int64, fn func(key, value []byte) bool) (hasMore bool, err error) {
+func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey bool, limit int64, fn func(key, value []byte) bool) (err error) {
if len(lastKey) == 0 {
iter.Seek(prefix)
@@ -196,7 +196,6 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey
if limit > 0 {
i++
if i > limit {
- hasMore = true
break
}
}
@@ -216,16 +215,16 @@ func enumerate(iter *gorocksdb.Iterator, prefix, lastKey []byte, includeLastKey
}
if err := iter.Err(); err != nil {
- return hasMore, fmt.Errorf("prefix scan iterator: %v", err)
+ return fmt.Errorf("prefix scan iterator: %v", err)
}
- return hasMore, nil
+ return nil
}
-func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64) (entries []*filer.Entry, hasMore bool, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "")
+func (store *RocksDBStore) ListDirectoryEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
-func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string) (entries []*filer.Entry, hasMore bool, err error) {
+func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath weed_util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
directoryPrefix := genDirectoryKeyPrefix(dirPath, prefix)
lastFileStart := directoryPrefix
@@ -239,7 +238,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
iter := store.db.NewIterator(ro)
defer iter.Close()
- hasMore, err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, func(key, value []byte) bool {
+ err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, func(key, value []byte) bool {
fileName := getNameFromKey(key)
if fileName == "" {
return true
@@ -247,6 +246,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
entry := &filer.Entry{
FullPath: weed_util.NewFullPath(string(dirPath), fileName),
}
+ lastFileName = fileName
// println("list", entry.FullPath, "chunks", len(entry.Chunks))
if decodeErr := entry.DecodeAttributesAndChunks(value); decodeErr != nil {
@@ -254,14 +254,16 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
glog.V(0).Infof("list %s : %v", entry.FullPath, err)
return false
}
- entries = append(entries, entry)
+ if !eachEntryFunc(entry) {
+ return false
+ }
return true
})
if err != nil {
- return entries, false, fmt.Errorf("prefix list %s : %v", dirPath, err)
+ return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
}
- return entries, false, err
+ return lastFileName, err
}
func genKey(dirPath, fileName string) (key []byte) {
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index f2fbd99d3..2cc332635 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -319,14 +319,14 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fuse.EIO
}
- listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32))
+ listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ processEachEntryFn(entry.ToProtoEntry(), false)
+ return true
+ })
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
return nil, fuse.EIO
}
- for _, cachedEntry := range listedEntries {
- processEachEntryFn(cachedEntry.ToProtoEntry(), false)
- }
return
}
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index c16c4a938..f4e4d7d6e 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -117,22 +117,22 @@ func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err err
return mc.localStore.DeleteEntry(ctx, fp)
}
-func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64) ([]*filer.Entry, error) {
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
mc.RLock()
defer mc.RUnlock()
if !mc.visitedBoundary.HasVisited(dirPath) {
- return nil, fmt.Errorf("unsynchronized dir: %v", dirPath)
+ return fmt.Errorf("unsynchronized dir: %v", dirPath)
}
- entries, _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
+ _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
mc.mapIdFromFilerToLocal(entry)
+ return eachEntryFunc(entry)
+ })
+ if err != nil {
+ return err
}
- return entries, err
+ return err
}
func (mc *MetaCache) Shutdown() {
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 5cdf44e96..b0563d8bd 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -44,7 +44,7 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
}, nil
}
-func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error {
+func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) {
glog.V(4).Infof("ListEntries %v", req)
@@ -60,23 +60,12 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
+ var listErr error
for limit > 0 {
- entries, hasMore, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "")
-
- if err != nil {
- return err
- }
- if len(entries) == 0 {
- return nil
- }
-
- includeLastFile = false
-
- for _, entry := range entries {
-
- lastFileName = entry.Name()
-
- if err := stream.Send(&filer_pb.ListEntriesResponse{
+ var hasEntries bool
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", func(entry *filer.Entry) bool {
+ hasEntries = true
+ if err = stream.Send(&filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{
Name: entry.Name(),
IsDirectory: entry.IsDirectory(),
@@ -88,19 +77,28 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
Content: entry.Content,
},
}); err != nil {
- return err
+ return false
}
limit--
if limit == 0 {
- return nil
+ return false
}
- }
+ return true
+ })
- if !hasMore {
- break
+ if listErr != nil {
+ return listErr
+ }
+ if err != nil {
+ return err
+ }
+ if !hasEntries {
+ return nil
}
+ includeLastFile = false
+
}
return nil