aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/dir.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-12-13 00:22:37 -0800
committerChris Lu <chris.lu@gmail.com>2019-12-13 00:22:37 -0800
commit0fa1269bc77abe30f4d108a88a97e29e1bca3124 (patch)
tree5cc1c65ea9119dc23a5a4ded21e775d62c1bf9f3 /weed/filesys/dir.go
parentd0b423bbc07368bc53a08aec47618924851725a1 (diff)
downloadseaweedfs-0fa1269bc77abe30f4d108a88a97e29e1bca3124.tar.xz
seaweedfs-0fa1269bc77abe30f4d108a88a97e29e1bca3124.zip
filer: streaming file listing
Diffstat (limited to 'weed/filesys/dir.go')
-rw-r--r--weed/filesys/dir.go74
1 files changed, 14 insertions, 60 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index abbc9288e..fd5356ae0 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -212,51 +212,23 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
- err = dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- remaining := dir.wfs.option.DirListingLimit
-
- lastEntryName := ""
-
- for remaining >= 0 {
-
- request := &filer_pb.ListEntriesRequest{
- Directory: dir.Path,
- StartFromFileName: lastEntryName,
- Limit: filer2.PaginationSize,
- }
-
- glog.V(4).Infof("read directory: %v", request)
- resp, err := client.ListEntries(ctx, request)
- if err != nil {
- glog.V(0).Infof("list %s: %v", dir.Path, err)
- return fuse.EIO
- }
-
- cacheTtl := estimatedCacheTtl(len(resp.Entries))
-
- for _, entry := range resp.Entries {
- if entry.IsDirectory {
- dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
- ret = append(ret, dirent)
- } else {
- dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
- ret = append(ret, dirent)
- }
- dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, cacheTtl)
- lastEntryName = entry.Name
- }
-
- remaining -= len(resp.Entries)
-
- if len(resp.Entries) < filer2.PaginationSize {
- break
- }
+ cacheTtl := 3 * time.Second
+ readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, dir.Path, "", func(entry *filer_pb.Entry, isLast bool) {
+ if entry.IsDirectory {
+ dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
+ ret = append(ret, dirent)
+ } else {
+ dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
+ ret = append(ret, dirent)
}
-
- return nil
+ cacheTtl = cacheTtl + 2 * time.Millisecond
+ dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, cacheTtl)
})
+ if readErr != nil {
+ glog.V(0).Infof("list %s: %v", dir.Path, err)
+ return ret, fuse.EIO
+ }
return ret, err
}
@@ -373,21 +345,3 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
})
}
-
-func estimatedCacheTtl(numEntries int) time.Duration {
- if numEntries < 100 {
- // 30 ms per entry
- return 3 * time.Second
- }
- if numEntries < 1000 {
- // 10 ms per entry
- return 10 * time.Second
- }
- if numEntries < 10000 {
- // 10 ms per entry
- return 100 * time.Second
- }
-
- // 2 ms per entry
- return time.Duration(numEntries*2) * time.Millisecond
-}