aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/weedfs_dir_read.go
diff options
context:
space:
mode:
authorRobert Coelho <me@iscoelho.com>2022-05-24 12:16:58 -0400
committerRobert Coelho <me@iscoelho.com>2022-05-24 12:16:58 -0400
commitcb422d96f7976c35838d6d392c23931b58f7b723 (patch)
tree96f01f58663c34237046c037fd2cd92bd9d76a9e /weed/mount/weedfs_dir_read.go
parent05602167f4b91b972bd6ac8d0b741bd5579e6d0a (diff)
downloadseaweedfs-cb422d96f7976c35838d6d392c23931b58f7b723.tar.xz
seaweedfs-cb422d96f7976c35838d6d392c23931b58f7b723.zip
mount: rewrite ReadDir to respect input.Offset to fix partial results
Diffstat (limited to 'weed/mount/weedfs_dir_read.go')
-rw-r--r--weed/mount/weedfs_dir_read.go124
1 files changed, 71 insertions, 53 deletions
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go
index a1b4ac0d5..6250283ca 100644
--- a/weed/mount/weedfs_dir_read.go
+++ b/weed/mount/weedfs_dir_read.go
@@ -12,9 +12,22 @@ import (
type DirectoryHandleId uint64
+const (
+ directoryStreamBaseOffset = 2 // . & ..
+)
+
type DirectoryHandle struct {
- isFinished bool
- lastEntryName string
+ isFinished bool
+ entryStream []*filer.Entry
+ entryStreamOffset uint64
+}
+
+func (dh *DirectoryHandle) reset() {
+ *dh = DirectoryHandle{
+ isFinished: false,
+ entryStream: []*filer.Entry{},
+ entryStreamOffset: directoryStreamBaseOffset,
+ }
}
type DirectoryHandleToInode struct {
@@ -37,10 +50,8 @@ func (wfs *WFS) AcquireDirectoryHandle() (DirectoryHandleId, *DirectoryHandle) {
wfs.dhmap.Lock()
defer wfs.dhmap.Unlock()
- dh := &DirectoryHandle{
- isFinished: false,
- lastEntryName: "",
- }
+ dh := new(DirectoryHandle)
+ dh.reset()
wfs.dhmap.dir2inode[DirectoryHandleId(fh)] = dh
return DirectoryHandleId(fh), dh
}
@@ -51,11 +62,8 @@ func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle {
if dh, found := wfs.dhmap.dir2inode[dhid]; found {
return dh
}
- dh := &DirectoryHandle{
- isFinished: false,
- lastEntryName: "",
- }
-
+ dh := new(DirectoryHandle)
+ dh.reset()
wfs.dhmap.dir2inode[dhid] = dh
return dh
}
@@ -130,13 +138,12 @@ func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fus
}
func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status {
-
dh := wfs.GetDirectoryHandle(DirectoryHandleId(input.Fh))
if input.Offset == 0 {
- dh.isFinished = false
- dh.lastEntryName = ""
- } else {
- if dh.isFinished {
+ dh.reset()
+ } else if dh.isFinished && input.Offset >= directoryStreamBaseOffset {
+ entryCurrentIndex := input.Offset - dh.entryStreamOffset
+ if uint64(len(dh.entryStream)) <= entryCurrentIndex {
return fuse.OK
}
}
@@ -148,29 +155,17 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
var dirEntry fuse.DirEntry
- if input.Offset == 0 {
- if !isPlusMode {
- out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
- out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
- } else {
- out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
- out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
- }
- }
-
- processEachEntryFn := func(entry *filer.Entry, isLast bool) bool {
+ processEachEntryFn := func(entry *filer.Entry) bool {
dirEntry.Name = entry.Name()
dirEntry.Mode = toSyscallMode(entry.Mode)
+ inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode)
+ dirEntry.Ino = inode
if !isPlusMode {
- inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, false)
- dirEntry.Ino = inode
if !out.AddDirEntry(dirEntry) {
isEarlyTerminated = true
return false
}
} else {
- inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, true)
- dirEntry.Ino = inode
entryOut := out.AddDirLookupEntry(dirEntry)
if entryOut == nil {
isEarlyTerminated = true
@@ -182,37 +177,60 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
wfs.outputFilerEntry(entryOut, inode, entry)
}
- dh.lastEntryName = entry.Name()
return true
}
- entryChan := make(chan *filer.Entry, 128)
- var err error
- go func() {
- if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, entryChan); err != nil {
- glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ if input.Offset < directoryStreamBaseOffset {
+ if !isPlusMode {
+ if input.Offset == 0 {
+ out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
+ }
+ out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
+ } else {
+ if input.Offset == 0 {
+ out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
+ }
+ out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
}
- close(entryChan)
- }()
- hasData := false
- for entry := range entryChan {
- hasData = true
- processEachEntryFn(entry, false)
- }
- if err != nil {
- return fuse.EIO
+ input.Offset = directoryStreamBaseOffset
}
- if !hasData {
- listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
- return processEachEntryFn(entry, false)
- })
- if listErr != nil {
- glog.Errorf("list meta cache: %v", listErr)
- return fuse.EIO
+ var lastEntryName string
+ if input.Offset >= directoryStreamBaseOffset {
+ if input.Offset > directoryStreamBaseOffset {
+ entryPreviousIndex := (input.Offset - dh.entryStreamOffset) - 1
+ if uint64(len(dh.entryStream)) > entryPreviousIndex {
+ lastEntryName = dh.entryStream[entryPreviousIndex].Name()
+ dh.entryStream = dh.entryStream[entryPreviousIndex:]
+ dh.entryStreamOffset = input.Offset - 1
+ }
+ }
+ entryCurrentIndex := input.Offset - dh.entryStreamOffset
+ for uint64(len(dh.entryStream)) > entryCurrentIndex {
+ entry := dh.entryStream[entryCurrentIndex]
+ if processEachEntryFn(entry) {
+ lastEntryName = entry.Name()
+ entryCurrentIndex++
+ } else {
+ // early terminated
+ return fuse.OK
+ }
}
}
+ var err error
+ if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, nil); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ }
+ listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ dh.entryStream = append(dh.entryStream, entry)
+ return processEachEntryFn(entry)
+ })
+ if listErr != nil {
+ glog.Errorf("list meta cache: %v", listErr)
+ return fuse.EIO
+ }
+
if !isEarlyTerminated {
dh.isFinished = true
}