aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/weedfs_dir_read.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2022-05-24 11:10:13 -0700
committerGitHub <noreply@github.com>2022-05-24 11:10:13 -0700
commitdf50afc64270f1728a349d28f16dd5cab9eb5073 (patch)
treef208b710984346a3c424819cbe84997d0d1d67e6 /weed/mount/weedfs_dir_read.go
parenta67ad1e9e5fcb53224bb0753347ae4388d412d42 (diff)
parent0e6e72d46263a7041606a28edbd8285b4bbf900c (diff)
downloadseaweedfs-df50afc64270f1728a349d28f16dd5cab9eb5073.tar.xz
seaweedfs-df50afc64270f1728a349d28f16dd5cab9eb5073.zip
Merge pull request #3102 from coelho/fix-mount-readdir-partial
mount: fix partial ReadDir results
Diffstat (limited to 'weed/mount/weedfs_dir_read.go')
-rw-r--r--weed/mount/weedfs_dir_read.go125
1 files changed, 72 insertions, 53 deletions
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go
index a1b4ac0d5..1baa68fa2 100644
--- a/weed/mount/weedfs_dir_read.go
+++ b/weed/mount/weedfs_dir_read.go
@@ -12,9 +12,22 @@ import (
type DirectoryHandleId uint64
+const (
+ directoryStreamBaseOffset = 2 // . & ..
+)
+
type DirectoryHandle struct {
- isFinished bool
- lastEntryName string
+ isFinished bool
+ entryStream []*filer.Entry
+ entryStreamOffset uint64
+}
+
+func (dh *DirectoryHandle) reset() {
+ *dh = DirectoryHandle{
+ isFinished: false,
+ entryStream: []*filer.Entry{},
+ entryStreamOffset: directoryStreamBaseOffset,
+ }
}
type DirectoryHandleToInode struct {
@@ -37,10 +50,8 @@ func (wfs *WFS) AcquireDirectoryHandle() (DirectoryHandleId, *DirectoryHandle) {
wfs.dhmap.Lock()
defer wfs.dhmap.Unlock()
- dh := &DirectoryHandle{
- isFinished: false,
- lastEntryName: "",
- }
+ dh := new(DirectoryHandle)
+ dh.reset()
wfs.dhmap.dir2inode[DirectoryHandleId(fh)] = dh
return DirectoryHandleId(fh), dh
}
@@ -51,11 +62,8 @@ func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle {
if dh, found := wfs.dhmap.dir2inode[dhid]; found {
return dh
}
- dh := &DirectoryHandle{
- isFinished: false,
- lastEntryName: "",
- }
-
+ dh := new(DirectoryHandle)
+ dh.reset()
wfs.dhmap.dir2inode[dhid] = dh
return dh
}
@@ -130,13 +138,12 @@ func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fus
}
func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status {
-
dh := wfs.GetDirectoryHandle(DirectoryHandleId(input.Fh))
if input.Offset == 0 {
- dh.isFinished = false
- dh.lastEntryName = ""
- } else {
- if dh.isFinished {
+ dh.reset()
+ } else if dh.isFinished && input.Offset >= dh.entryStreamOffset {
+ entryCurrentIndex := input.Offset - dh.entryStreamOffset
+ if uint64(len(dh.entryStream)) <= entryCurrentIndex {
return fuse.OK
}
}
@@ -148,29 +155,17 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
var dirEntry fuse.DirEntry
- if input.Offset == 0 {
- if !isPlusMode {
- out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
- out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
- } else {
- out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
- out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
- }
- }
-
- processEachEntryFn := func(entry *filer.Entry, isLast bool) bool {
+ processEachEntryFn := func(entry *filer.Entry) bool {
dirEntry.Name = entry.Name()
dirEntry.Mode = toSyscallMode(entry.Mode)
+ inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode)
+ dirEntry.Ino = inode
if !isPlusMode {
- inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, false)
- dirEntry.Ino = inode
if !out.AddDirEntry(dirEntry) {
isEarlyTerminated = true
return false
}
} else {
- inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, true)
- dirEntry.Ino = inode
entryOut := out.AddDirLookupEntry(dirEntry)
if entryOut == nil {
isEarlyTerminated = true
@@ -182,35 +177,59 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
wfs.outputFilerEntry(entryOut, inode, entry)
}
- dh.lastEntryName = entry.Name()
return true
}
- entryChan := make(chan *filer.Entry, 128)
- var err error
- go func() {
- if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, entryChan); err != nil {
- glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ if input.Offset < directoryStreamBaseOffset {
+ if !isPlusMode {
+ if input.Offset == 0 {
+ out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
+ }
+ out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
+ } else {
+ if input.Offset == 0 {
+ out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
+ }
+ out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
}
- close(entryChan)
- }()
- hasData := false
- for entry := range entryChan {
- hasData = true
- processEachEntryFn(entry, false)
- }
- if err != nil {
- return fuse.EIO
+ input.Offset = directoryStreamBaseOffset
}
- if !hasData {
- listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
- return processEachEntryFn(entry, false)
- })
- if listErr != nil {
- glog.Errorf("list meta cache: %v", listErr)
- return fuse.EIO
+ var lastEntryName string
+ if input.Offset >= dh.entryStreamOffset {
+ if input.Offset > dh.entryStreamOffset {
+ entryPreviousIndex := (input.Offset - dh.entryStreamOffset) - 1
+ if uint64(len(dh.entryStream)) > entryPreviousIndex {
+ lastEntryName = dh.entryStream[entryPreviousIndex].Name()
+ dh.entryStream = dh.entryStream[entryPreviousIndex:]
+ dh.entryStreamOffset = input.Offset - 1
+ }
}
+ entryCurrentIndex := input.Offset - dh.entryStreamOffset
+ for uint64(len(dh.entryStream)) > entryCurrentIndex {
+ entry := dh.entryStream[entryCurrentIndex]
+ if processEachEntryFn(entry) {
+ lastEntryName = entry.Name()
+ entryCurrentIndex++
+ } else {
+ // early terminated
+ return fuse.OK
+ }
+ }
+ }
+
+ var err error
+ if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, nil); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ return fuse.EIO
+ }
+ listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ dh.entryStream = append(dh.entryStream, entry)
+ return processEachEntryFn(entry)
+ })
+ if listErr != nil {
+ glog.Errorf("list meta cache: %v", listErr)
+ return fuse.EIO
}
if !isEarlyTerminated {