aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/weedfs_dir_read.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mount/weedfs_dir_read.go')
-rw-r--r--weed/mount/weedfs_dir_read.go221
1 files changed, 221 insertions, 0 deletions
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go
new file mode 100644
index 000000000..4862167dc
--- /dev/null
+++ b/weed/mount/weedfs_dir_read.go
@@ -0,0 +1,221 @@
+package mount
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
+ "github.com/hanwen/go-fuse/v2/fuse"
+ "math"
+ "sync"
+)
+
+type DirectoryHandleId uint64
+
+type DirectoryHandle struct {
+ isFinished bool
+ lastEntryName string
+}
+
+type DirectoryHandleToInode struct {
+ // shares the file handle id sequencer with FileHandleToInode{nextFh}
+ sync.Mutex
+ dir2inode map[DirectoryHandleId]*DirectoryHandle
+}
+
+func NewDirectoryHandleToInode() *DirectoryHandleToInode {
+ return &DirectoryHandleToInode{
+ dir2inode: make(map[DirectoryHandleId]*DirectoryHandle),
+ }
+}
+
+func (wfs *WFS) AcquireDirectoryHandle() (DirectoryHandleId, *DirectoryHandle) {
+ wfs.fhmap.Lock()
+ fh := wfs.fhmap.nextFh
+ wfs.fhmap.nextFh++
+ wfs.fhmap.Unlock()
+
+ wfs.dhmap.Lock()
+ defer wfs.dhmap.Unlock()
+ dh := &DirectoryHandle{
+ isFinished: false,
+ lastEntryName: "",
+ }
+ wfs.dhmap.dir2inode[DirectoryHandleId(fh)] = dh
+ return DirectoryHandleId(fh), dh
+}
+
+func (wfs *WFS) GetDirectoryHandle(dhid DirectoryHandleId) *DirectoryHandle {
+ wfs.dhmap.Lock()
+ defer wfs.dhmap.Unlock()
+ if dh, found := wfs.dhmap.dir2inode[dhid]; found {
+ return dh
+ }
+ dh := &DirectoryHandle{
+ isFinished: false,
+ lastEntryName: "",
+ }
+
+ wfs.dhmap.dir2inode[dhid] = dh
+ return dh
+}
+
+func (wfs *WFS) ReleaseDirectoryHandle(dhid DirectoryHandleId) {
+ wfs.dhmap.Lock()
+ defer wfs.dhmap.Unlock()
+ delete(wfs.dhmap.dir2inode, dhid)
+}
+
+// Directory handling
+
+/** Open directory
+ *
+ * Unless the 'default_permissions' mount option is given,
+ * this method should check if opendir is permitted for this
+ * directory. Optionally opendir may also return an arbitrary
+ * filehandle in the fuse_file_info structure, which will be
+ * passed to readdir, releasedir and fsyncdir.
+ */
+func (wfs *WFS) OpenDir(cancel <-chan struct{}, input *fuse.OpenIn, out *fuse.OpenOut) (code fuse.Status) {
+ if !wfs.inodeToPath.HasInode(input.NodeId) {
+ return fuse.ENOENT
+ }
+ dhid, _ := wfs.AcquireDirectoryHandle()
+ out.Fh = uint64(dhid)
+ return fuse.OK
+}
+
+/** Release directory
+ *
+ * If the directory has been removed after the call to opendir, the
+ * path parameter will be NULL.
+ */
+func (wfs *WFS) ReleaseDir(input *fuse.ReleaseIn) {
+ wfs.ReleaseDirectoryHandle(DirectoryHandleId(input.Fh))
+}
+
+/** Synchronize directory contents
+ *
+ * If the directory has been removed after the call to opendir, the
+ * path parameter will be NULL.
+ *
+ * If the datasync parameter is non-zero, then only the user data
+ * should be flushed, not the meta data
+ */
+func (wfs *WFS) FsyncDir(cancel <-chan struct{}, input *fuse.FsyncIn) (code fuse.Status) {
+ return fuse.OK
+}
+
+/** Read directory
+ *
+ * The filesystem may choose between two modes of operation:
+ *
+ * 1) The readdir implementation ignores the offset parameter, and
+ * passes zero to the filler function's offset. The filler
+ * function will not return '1' (unless an error happens), so the
+ * whole directory is read in a single readdir operation.
+ *
+ * 2) The readdir implementation keeps track of the offsets of the
+ * directory entries. It uses the offset parameter and always
+ * passes non-zero offset to the filler function. When the buffer
+ * is full (or an error happens) the filler function will return
+ * '1'.
+ */
+func (wfs *WFS) ReadDir(cancel <-chan struct{}, input *fuse.ReadIn, out *fuse.DirEntryList) (code fuse.Status) {
+ return wfs.doReadDirectory(input, out, false)
+}
+
+func (wfs *WFS) ReadDirPlus(cancel <-chan struct{}, input *fuse.ReadIn, out *fuse.DirEntryList) (code fuse.Status) {
+ return wfs.doReadDirectory(input, out, true)
+}
+
+func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPlusMode bool) fuse.Status {
+
+ dh := wfs.GetDirectoryHandle(DirectoryHandleId(input.Fh))
+ if dh.isFinished {
+ if input.Offset == 0 {
+ dh.isFinished = false
+ dh.lastEntryName = ""
+ } else {
+ return fuse.OK
+ }
+ }
+
+ isEarlyTerminated := false
+ dirPath, code := wfs.inodeToPath.GetPath(input.NodeId)
+ if code != fuse.OK {
+ return code
+ }
+
+ var dirEntry fuse.DirEntry
+ if input.Offset == 0 {
+ if !isPlusMode {
+ out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
+ out.AddDirEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
+ } else {
+ out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: "."})
+ out.AddDirLookupEntry(fuse.DirEntry{Mode: fuse.S_IFDIR, Name: ".."})
+ }
+ }
+
+ processEachEntryFn := func(entry *filer.Entry, isLast bool) bool {
+ dirEntry.Name = entry.Name()
+ dirEntry.Mode = toSyscallMode(entry.Mode)
+ if !isPlusMode {
+ inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Mode, false, entry.Inode, false)
+ dirEntry.Ino = inode
+ if !out.AddDirEntry(dirEntry) {
+ isEarlyTerminated = true
+ return false
+ }
+ } else {
+ inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Mode, false, entry.Inode, true)
+ dirEntry.Ino = inode
+ entryOut := out.AddDirLookupEntry(dirEntry)
+ if entryOut == nil {
+ isEarlyTerminated = true
+ return false
+ }
+ if fh, found := wfs.fhmap.FindFileHandle(inode); found {
+ glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name))
+ entry = filer.FromPbEntry(string(dirPath), fh.entry)
+ }
+ wfs.outputFilerEntry(entryOut, inode, entry)
+ }
+ dh.lastEntryName = entry.Name()
+ return true
+ }
+
+ entryChan := make(chan *filer.Entry, 128)
+ var err error
+ go func() {
+ if err = meta_cache.EnsureVisited(wfs.metaCache, wfs, dirPath, entryChan); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ }
+ close(entryChan)
+ }()
+ hasData := false
+ for entry := range entryChan {
+ hasData = true
+ processEachEntryFn(entry, false)
+ }
+ if err != nil {
+ return fuse.EIO
+ }
+
+ if !hasData {
+ listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, dh.lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ return processEachEntryFn(entry, false)
+ })
+ if listErr != nil {
+ glog.Errorf("list meta cache: %v", listErr)
+ return fuse.EIO
+ }
+ }
+
+ if !isEarlyTerminated {
+ dh.isFinished = true
+ }
+
+ return fuse.OK
+}