aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorBruce Zou <gift_secondst@msn.com>2025-12-10 13:13:29 +0800
committerGitHub <noreply@github.com>2025-12-09 21:13:29 -0800
commit4bc2b6d62b70cbc322706191e0e98228af4729e3 (patch)
treeb7f3f1442612723120b38a747600b955bff0022e /weed
parentd970c15d71be190e994f22bda5da461c5327131d (diff)
downloadseaweedfs-4bc2b6d62b70cbc322706191e0e98228af4729e3.tar.xz
seaweedfs-4bc2b6d62b70cbc322706191e0e98228af4729e3.zip
fix nfs list with prefix batch scan (#7694)
* fix nfs list with prefix batch scan * remove else branch
Diffstat (limited to 'weed')
-rw-r--r--weed/mount/weedfs_dir_read.go99
1 files changed, 69 insertions, 30 deletions
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go
index 22af4cc5c..492b5422d 100644
--- a/weed/mount/weedfs_dir_read.go
+++ b/weed/mount/weedfs_dir_read.go
@@ -2,7 +2,6 @@ package mount
import (
"context"
- "math"
"sync"
"github.com/hanwen/go-fuse/v2/fuse"
@@ -16,6 +15,7 @@ type DirectoryHandleId uint64
const (
directoryStreamBaseOffset = 2 // . & ..
+ batchSize = 1000
)
// DirectoryHandle represents an open directory handle.
@@ -166,11 +166,17 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
var dirEntry fuse.DirEntry
- processEachEntryFn := func(entry *filer.Entry) bool {
+
+ // index is the position in entryStream, used to calculate the offset for next readdir
+ processEachEntryFn := func(entry *filer.Entry, index int64) bool {
dirEntry.Name = entry.Name()
dirEntry.Mode = toSyscallMode(entry.Mode)
inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, false)
dirEntry.Ino = inode
+
+ // Set Off to the next offset so client can resume from correct position
+ dirEntry.Off = dh.entryStreamOffset + uint64(index) + 1
+
if !isPlusMode {
if !out.AddDirEntry(dirEntry) {
isEarlyTerminated = true
@@ -195,59 +201,92 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
if input.Offset < directoryStreamBaseOffset {
if !isPlusMode {
if input.Offset == 0 {
- out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
+ out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".", Off: 1})
}
- out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
+ out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "..", Off: 2})
} else {
if input.Offset == 0 {
- out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
+ out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".", Off: 1})
}
- out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
+ out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "..", Off: 2})
}
input.Offset = directoryStreamBaseOffset
}
var lastEntryName string
+
+ // Read from cache first, then load next batch if needed
if input.Offset >= dh.entryStreamOffset {
+ // Handle case: new handle with non-zero offset but empty cache
+ // This happens when NFS-Ganesha opens multiple directory handles
+ if len(dh.entryStream) == 0 && input.Offset > dh.entryStreamOffset {
+ skipCount := int64(input.Offset - dh.entryStreamOffset)
+
+ if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ return fuse.EIO
+ }
+
+ // Load entries from beginning to fill cache up to the requested offset
+ loadErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, skipCount+int64(batchSize), func(entry *filer.Entry) (bool, error) {
+ dh.entryStream = append(dh.entryStream, entry)
+ return true, nil
+ })
+ if loadErr != nil {
+ glog.Errorf("list meta cache: %v", loadErr)
+ return fuse.EIO
+ }
+ }
+
if input.Offset > dh.entryStreamOffset {
entryPreviousIndex := (input.Offset - dh.entryStreamOffset) - 1
if uint64(len(dh.entryStream)) > entryPreviousIndex {
lastEntryName = dh.entryStream[entryPreviousIndex].Name()
- dh.entryStream = dh.entryStream[entryPreviousIndex:]
- dh.entryStreamOffset = input.Offset - 1
}
}
- entryCurrentIndex := input.Offset - dh.entryStreamOffset
- for uint64(len(dh.entryStream)) > entryCurrentIndex {
+
+ entryCurrentIndex := int64(input.Offset - dh.entryStreamOffset)
+ for int64(len(dh.entryStream)) > entryCurrentIndex {
entry := dh.entryStream[entryCurrentIndex]
- if processEachEntryFn(entry) {
+ if processEachEntryFn(entry, entryCurrentIndex) {
lastEntryName = entry.Name()
entryCurrentIndex++
} else {
- // early terminated
return fuse.OK
}
}
- }
- if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
- glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
- return fuse.EIO
- }
- listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) {
- dh.entryStream = append(dh.entryStream, entry)
- if !processEachEntryFn(entry) {
- return false, nil
- }
- return true, nil
- })
- if listErr != nil {
- glog.Errorf("list meta cache: %v", listErr)
- return fuse.EIO
- }
+ // Cache exhausted, load next batch
+ if !isEarlyTerminated {
+ if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ return fuse.EIO
+ }
+
+ // Batch loading: fetch batchSize entries starting from lastEntryName
+ loadedCount := 0
+ bufferFull := false
+ loadErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(batchSize), func(entry *filer.Entry) (bool, error) {
+ currentIndex := int64(len(dh.entryStream))
+ dh.entryStream = append(dh.entryStream, entry)
+ loadedCount++
+ if !processEachEntryFn(entry, currentIndex) {
+ bufferFull = true
+ return false, nil
+ }
+ return true, nil
+ })
+ if loadErr != nil {
+ glog.Errorf("list meta cache: %v", loadErr)
+ return fuse.EIO
+ }
- if !isEarlyTerminated {
- dh.isFinished = true
+ // Mark finished only when loading completed normally (not buffer full)
+ // and we got fewer entries than requested
+ if !bufferFull && loadedCount < batchSize {
+ dh.isFinished = true
+ }
+ }
}
return fuse.OK