diff options
| -rw-r--r-- | weed/mount/weedfs_dir_read.go | 50 |
1 files changed, 32 insertions, 18 deletions
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go index ebf0d9191..c1f522a38 100644 --- a/weed/mount/weedfs_dir_read.go +++ b/weed/mount/weedfs_dir_read.go @@ -18,22 +18,28 @@ const ( directoryStreamBaseOffset = 2 // . & .. ) +// DirectoryHandle represents an open directory handle. +// It maintains state for directory listing pagination and is protected by a mutex +// to handle concurrent readdir operations from NFS-Ganesha and other multi-threaded clients. type DirectoryHandle struct { + sync.Mutex isFinished bool entryStream []*filer.Entry entryStreamOffset uint64 } func (dh *DirectoryHandle) reset() { - *dh = DirectoryHandle{ - isFinished: false, - entryStream: []*filer.Entry{}, - entryStreamOffset: directoryStreamBaseOffset, + dh.isFinished = false + // Nil out pointers to allow garbage collection of old entries, + // then reuse the slice's capacity to avoid re-allocations. + for i := range dh.entryStream { + dh.entryStream[i] = nil } + dh.entryStream = dh.entryStream[:0] + dh.entryStreamOffset = directoryStreamBaseOffset } type DirectoryHandleToInode struct { - // shares the file handle id sequencer with FileHandleToInode{nextFh} sync.Mutex dir2inode map[DirectoryHandleId]*DirectoryHandle } @@ -45,14 +51,14 @@ func NewDirectoryHandleToInode() *DirectoryHandleToInode { } func (wfs *WFS) AcquireDirectoryHandle() (DirectoryHandleId, *DirectoryHandle) { - fh := FileHandleId(util.RandomUint64()) + fh := DirectoryHandleId(util.RandomUint64()) wfs.dhMap.Lock() defer wfs.dhMap.Unlock() - dh := new(DirectoryHandle) + dh := &DirectoryHandle{} dh.reset() - wfs.dhMap.dir2inode[DirectoryHandleId(fh)] = dh - return DirectoryHandleId(fh), dh + wfs.dhMap.dir2inode[fh] = dh + return fh, dh } func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle { @@ -61,7 +67,7 @@ func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle { if dh, found := wfs.dhMap.dir2inode[dhid]; found { return dh } - dh := new(DirectoryHandle) + dh := &DirectoryHandle{} dh.reset() wfs.dhMap.dir2inode[dhid] = dh return dh @@ -137,7 +143,13 @@ func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fus } func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status { + // Get the directory handle and lock it for the duration of this operation. + // This serializes concurrent readdir calls on the same handle, fixing the + // race condition that caused hangs with NFS-Ganesha. dh := wfs.GetDirectoryHandle(DirectoryHandleId(input.Fh)) + dh.Lock() + defer dh.Unlock() + if input.Offset == 0 { dh.reset() } else if dh.isFinished && input.Offset >= dh.entryStreamOffset { @@ -154,7 +166,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } var dirEntry fuse.DirEntry - processEachEntryFn := func(entry *filer.Entry) (bool, error) { + processEachEntryFn := func(entry *filer.Entry) bool { dirEntry.Name = entry.Name() dirEntry.Mode = toSyscallMode(entry.Mode) inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode) @@ -162,13 +174,13 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl if !isPlusMode { if !out.AddDirEntry(dirEntry) { isEarlyTerminated = true - return false, nil + return false } } else { entryOut := out.AddDirLookupEntry(dirEntry) if entryOut == nil { isEarlyTerminated = true - return false, nil + return false } if fh, found := wfs.fhMap.FindFileHandle(inode); found { glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name)) @@ -176,7 +188,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } wfs.outputFilerEntry(entryOut, inode, entry) } - return true, nil + return true } if input.Offset < directoryStreamBaseOffset { @@ -207,7 +219,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl entryCurrentIndex := input.Offset - dh.entryStreamOffset for uint64(len(dh.entryStream)) > entryCurrentIndex { entry := dh.entryStream[entryCurrentIndex] - if process, _ := processEachEntryFn(entry); process { + if processEachEntryFn(entry) { lastEntryName = entry.Name() entryCurrentIndex++ } else { @@ -217,14 +229,16 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl } } - var err error - if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil { + if err := meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath); err != nil { glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) return fuse.EIO } listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) { dh.entryStream = append(dh.entryStream, entry) - return processEachEntryFn(entry) + if !processEachEntryFn(entry) { + return false, nil + } + return true, nil }) if listErr != nil { glog.Errorf("list meta cache: %v", listErr) |
