aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mount/weedfs_dir_read.go50
1 files changed, 32 insertions, 18 deletions
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go
index ebf0d9191..c1f522a38 100644
--- a/weed/mount/weedfs_dir_read.go
+++ b/weed/mount/weedfs_dir_read.go
@@ -18,22 +18,28 @@ const (
directoryStreamBaseOffset = 2 // . & ..
)
+// DirectoryHandle represents an open directory handle.
+// It maintains state for directory listing pagination and is protected by a mutex
+// to handle concurrent readdir operations from NFS-Ganesha and other multi-threaded clients.
type DirectoryHandle struct {
+ sync.Mutex
isFinished bool
entryStream []*filer.Entry
entryStreamOffset uint64
}
func (dh *DirectoryHandle) reset() {
- *dh = DirectoryHandle{
- isFinished: false,
- entryStream: []*filer.Entry{},
- entryStreamOffset: directoryStreamBaseOffset,
+ dh.isFinished = false
+ // Nil out pointers to allow garbage collection of old entries,
+ // then reuse the slice's capacity to avoid re-allocations.
+ for i := range dh.entryStream {
+ dh.entryStream[i] = nil
}
+ dh.entryStream = dh.entryStream[:0]
+ dh.entryStreamOffset = directoryStreamBaseOffset
}
type DirectoryHandleToInode struct {
- // shares the file handle id sequencer with FileHandleToInode{nextFh}
sync.Mutex
dir2inode map[DirectoryHandleId]*DirectoryHandle
}
@@ -45,14 +51,14 @@ func NewDirectoryHandleToInode() *DirectoryHandleToInode {
}
func (wfs *WFS) AcquireDirectoryHandle() (DirectoryHandleId, *DirectoryHandle) {
- fh := FileHandleId(util.RandomUint64())
+ fh := DirectoryHandleId(util.RandomUint64())
wfs.dhMap.Lock()
defer wfs.dhMap.Unlock()
- dh := new(DirectoryHandle)
+ dh := &DirectoryHandle{}
dh.reset()
- wfs.dhMap.dir2inode[DirectoryHandleId(fh)] = dh
- return DirectoryHandleId(fh), dh
+ wfs.dhMap.dir2inode[fh] = dh
+ return fh, dh
}
func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle {
@@ -61,7 +67,7 @@ func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle {
if dh, found := wfs.dhMap.dir2inode[dhid]; found {
return dh
}
- dh := new(DirectoryHandle)
+ dh := &DirectoryHandle{}
dh.reset()
wfs.dhMap.dir2inode[dhid] = dh
return dh
@@ -137,7 +143,13 @@ func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fus
}
func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status {
+ // Get the directory handle and lock it for the duration of this operation.
+ // This serializes concurrent readdir calls on the same handle, fixing the
+ // race condition that caused hangs with NFS-Ganesha.
dh := wfs.GetDirectoryHandle(DirectoryHandleId(input.Fh))
+ dh.Lock()
+ defer dh.Unlock()
+
if input.Offset == 0 {
dh.reset()
} else if dh.isFinished && input.Offset >= dh.entryStreamOffset {
@@ -154,7 +166,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
var dirEntry fuse.DirEntry
- processEachEntryFn := func(entry *filer.Entry) (bool, error) {
+ processEachEntryFn := func(entry *filer.Entry) bool {
dirEntry.Name = entry.Name()
dirEntry.Mode = toSyscallMode(entry.Mode)
inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode)
@@ -162,13 +174,13 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
if !isPlusMode {
if !out.AddDirEntry(dirEntry) {
isEarlyTerminated = true
- return false, nil
+ return false
}
} else {
entryOut := out.AddDirLookupEntry(dirEntry)
if entryOut == nil {
isEarlyTerminated = true
- return false, nil
+ return false
}
if fh, found := wfs.fhMap.FindFileHandle(inode); found {
glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name))
@@ -176,7 +188,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
wfs.outputFilerEntry(entryOut, inode, entry)
}
- return true, nil
+ return true
}
if input.Offset < directoryStreamBaseOffset {
@@ -207,7 +219,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
entryCurrentIndex := input.Offset - dh.entryStreamOffset
for uint64(len(dh.entryStream)) > entryCurrentIndex {
entry := dh.entryStream[entryCurrentIndex]
- if process, _ := processEachEntryFn(entry); process {
+ if processEachEntryFn(entry) {
lastEntryName = entry.Name()
entryCurrentIndex++
} else {
@@ -217,14 +229,16 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
}
- var err error
- if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
+ if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return fuse.EIO
}
listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) {
dh.entryStream = append(dh.entryStream, entry)
- return processEachEntryFn(entry)
+ if !processEachEntryFn(entry) {
+ return false, nil
+ }
+ return true, nil
})
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)