diff options
Diffstat (limited to 'weed/filesys/dir.go')
| -rw-r--r-- | weed/filesys/dir.go | 551 |
1 files changed, 367 insertions, 184 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 7b24a1ec5..6ee20974b 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -1,27 +1,39 @@ package filesys import ( + "bytes" "context" + "math" "os" - "path" + "strings" + "syscall" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) type Dir struct { - Path string - wfs *WFS - entry *filer_pb.Entry + name string + wfs *WFS + entry *filer_pb.Entry + parent *Dir + id uint64 } var _ = fs.Node(&Dir{}) + +//var _ = fs.NodeIdentifier(&Dir{}) var _ = fs.NodeCreater(&Dir{}) +var _ = fs.NodeMknoder(&Dir{}) var _ = fs.NodeMkdirer(&Dir{}) +var _ = fs.NodeFsyncer(&Dir{}) var _ = fs.NodeRequestLookuper(&Dir{}) var _ = fs.HandleReadDirAller(&Dir{}) var _ = fs.NodeRemover(&Dir{}) @@ -31,44 +43,57 @@ var _ = fs.NodeGetxattrer(&Dir{}) var _ = fs.NodeSetxattrer(&Dir{}) var _ = fs.NodeRemovexattrer(&Dir{}) var _ = fs.NodeListxattrer(&Dir{}) +var _ = fs.NodeForgetter(&Dir{}) -func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { +func (dir *Dir) xId() uint64 { + return dir.id +} - glog.V(3).Infof("dir Attr %s", dir.Path) +func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { // https://github.com/bazil/fuse/issues/196 attr.Valid = time.Second - if dir.Path == dir.wfs.option.FilerMountRootPath { + if dir.FullPath() == dir.wfs.option.FilerMountRootPath { dir.setRootDirAttributes(attr) + glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.FullPath(), attr) return nil } - if err := dir.maybeLoadEntry(ctx); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { + glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err) return err } - attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir - attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0) - attr.Ctime = time.Unix(dir.entry.Attributes.Crtime, 0) - attr.Gid = dir.entry.Attributes.Gid - attr.Uid = dir.entry.Attributes.Uid + // attr.Inode = dir.Id() + attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir + attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) + attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) + attr.Gid = entry.Attributes.Gid + attr.Uid = entry.Attributes.Uid + + glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) return nil } func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - glog.V(4).Infof("dir Getxattr %s", dir.Path) + glog.V(4).Infof("dir Getxattr %s", dir.FullPath()) - if err := dir.maybeLoadEntry(ctx); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { return err } - return getxattr(dir.entry, req, resp) + return getxattr(entry, req, resp) } func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { + // attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode() + attr.Valid = time.Second + attr.Inode = 1 // dir.Id() attr.Uid = dir.wfs.option.MountUid attr.Gid = dir.wfs.option.MountGid attr.Mode = dir.wfs.option.MountMode @@ -76,84 +101,178 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { attr.Ctime = dir.wfs.option.MountCtime attr.Mtime = dir.wfs.option.MountMtime attr.Atime = dir.wfs.option.MountMtime + attr.BlockSize = blockSize +} + +func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { + // fsync works at OS level + // write the file chunks to the filerGrpcAddress + glog.V(3).Infof("dir %s fsync %+v", dir.FullPath(), req) + + return nil } -func (dir *Dir) newFile(name string, entry *filer_pb.Entry) *File { +func (dir *Dir) newFile(name string) fs.Node { + + fileFullPath := util.NewFullPath(dir.FullPath(), name) + fileId := fileFullPath.AsInode() + dir.wfs.handlesLock.Lock() + existingHandle, found := dir.wfs.handles[fileId] + dir.wfs.handlesLock.Unlock() + + if found { + glog.V(4).Infof("newFile found opened file handle: %+v", fileFullPath) + return existingHandle.f + } return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - entry: entry, - entryViewCache: nil, + Name: name, + dir: dir, + wfs: dir.wfs, + id: fileId, } } +func (dir *Dir) newDirectory(fullpath util.FullPath) fs.Node { + + return &Dir{name: fullpath.Name(), wfs: dir.wfs, parent: dir, id: fullpath.AsInode()} + +} + func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { + exclusive := req.Flags&fuse.OpenExclusive != 0 + isDirectory := req.Mode&os.ModeDir > 0 + + if exclusive || isDirectory { + _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, exclusive) + if err != nil { + return nil, nil, err + } + } + var node fs.Node + if isDirectory { + node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name)) + return node, nil, nil + } + + node = dir.newFile(req.Name) + file := node.(*File) + file.entry = &filer_pb.Entry{ + Name: req.Name, + IsDirectory: req.Mode&os.ModeDir > 0, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), + Uid: req.Uid, + Gid: req.Gid, + Collection: dir.wfs.option.Collection, + Replication: dir.wfs.option.Replication, + TtlSec: dir.wfs.option.TtlSec, + }, + } + file.dirtyMetadata = true + fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) + return file, fh, nil + +} + +func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) { + + _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false) + + if err != nil { + return nil, err + } + var node fs.Node + node = dir.newFile(req.Name) + return node, nil +} + +func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) { + dirFullPath := dir.FullPath() request := &filer_pb.CreateEntryRequest{ - Directory: dir.Path, + Directory: dirFullPath, Entry: &filer_pb.Entry{ - Name: req.Name, - IsDirectory: req.Mode&os.ModeDir > 0, + Name: name, + IsDirectory: mode&os.ModeDir > 0, Attributes: &filer_pb.FuseAttributes{ Mtime: time.Now().Unix(), Crtime: time.Now().Unix(), - FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), - Uid: req.Uid, - Gid: req.Gid, + FileMode: uint32(mode &^ dir.wfs.option.Umask), + Uid: uid, + Gid: gid, Collection: dir.wfs.option.Collection, Replication: dir.wfs.option.Replication, TtlSec: dir.wfs.option.TtlSec, }, }, + OExcl: exclusive, + Signatures: []int32{dir.wfs.signature}, } - glog.V(1).Infof("create: %v", request) + glog.V(1).Infof("create %s/%s", dirFullPath, name) - if request.Entry.IsDirectory { - if err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - if _, err := client.CreateEntry(ctx, request); err != nil { - glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err) - return fuse.EIO + err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + dir.wfs.mapPbIdFromLocalToFiler(request.Entry) + defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) + + if err := filer_pb.CreateEntry(client, request); err != nil { + if strings.Contains(err.Error(), "EEXIST") { + return fuse.EEXIST } - return nil - }); err != nil { - return nil, nil, err + glog.V(0).Infof("create %s/%s: %v", dirFullPath, name, err) + return fuse.EIO } - } - file := dir.newFile(req.Name, request.Entry) - if !request.Entry.IsDirectory { - file.isOpen = true - } - fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) - fh.dirtyMetadata = true - return file, fh, nil + if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { + glog.Errorf("local InsertEntry dir %s/%s: %v", dirFullPath, name, err) + return fuse.EIO + } + return nil + }) + return request, err } func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { - err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name) + + newEntry := &filer_pb.Entry{ + Name: req.Name, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), + Uid: req.Uid, + Gid: req.Gid, + }, + } + + dirFullPath := dir.FullPath() + + err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + dir.wfs.mapPbIdFromLocalToFiler(newEntry) + defer dir.wfs.mapPbIdFromFilerToLocal(newEntry) request := &filer_pb.CreateEntryRequest{ - Directory: dir.Path, - Entry: &filer_pb.Entry{ - Name: req.Name, - IsDirectory: true, - Attributes: &filer_pb.FuseAttributes{ - Mtime: time.Now().Unix(), - Crtime: time.Now().Unix(), - FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), - Uid: req.Uid, - Gid: req.Gid, - }, - }, + Directory: dirFullPath, + Entry: newEntry, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("mkdir: %v", request) - if _, err := client.CreateEntry(ctx, request); err != nil { - glog.V(0).Infof("mkdir %s/%s: %v", dir.Path, req.Name, err) + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err) + return err + } + + if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { + glog.Errorf("local mkdir dir %s/%s: %v", dirFullPath, req.Name, err) return fuse.EIO } @@ -161,221 +280,258 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err }) if err == nil { - node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs} + node := dir.newDirectory(util.NewFullPath(dirFullPath, req.Name)) + return node, nil } - return nil, err + glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err) + + return nil, fuse.EIO } func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { - glog.V(4).Infof("dir Lookup %s: %s", dir.Path, req.Name) + dirPath := util.FullPath(dir.FullPath()) + glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String()) - var entry *filer_pb.Entry - fullFilePath := path.Join(dir.Path, req.Name) - - item := dir.wfs.listDirectoryEntriesCache.Get(fullFilePath) - if item != nil && !item.Expired() { - entry = item.Value().(*filer_pb.Entry) + fullFilePath := dirPath.Child(req.Name) + visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) + if visitErr != nil { + glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) + return nil, fuse.EIO + } + localEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT } - if entry == nil { - glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) - entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath) + if localEntry == nil { + // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) + entry, err := filer_pb.GetEntry(dir.wfs, fullFilePath) if err != nil { - return nil, err - } - if entry != nil { - dir.wfs.listDirectoryEntriesCache.Set(fullFilePath, entry, 5*time.Minute) + glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) + return nil, fuse.ENOENT } + localEntry = filer.FromPbEntry(string(dirPath), entry) } else { glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) } - if entry != nil { - if entry.IsDirectory { - node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, entry: entry} + if localEntry != nil { + if localEntry.IsDirectory() { + node = dir.newDirectory(fullFilePath) } else { - node = dir.newFile(req.Name, entry) + node = dir.newFile(req.Name) } - resp.EntryValid = time.Duration(0) - resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) - resp.Attr.Ctime = time.Unix(entry.Attributes.Crtime, 0) - resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode) - resp.Attr.Gid = entry.Attributes.Gid - resp.Attr.Uid = entry.Attributes.Uid + // resp.EntryValid = time.Second + resp.Attr.Inode = fullFilePath.AsInode() + resp.Attr.Valid = time.Second + resp.Attr.Mtime = localEntry.Attr.Mtime + resp.Attr.Crtime = localEntry.Attr.Crtime + resp.Attr.Mode = localEntry.Attr.Mode + resp.Attr.Gid = localEntry.Attr.Gid + resp.Attr.Uid = localEntry.Attr.Uid + if localEntry.HardLinkCounter > 0 { + resp.Attr.Nlink = uint32(localEntry.HardLinkCounter) + } return node, nil } + glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err) return nil, fuse.ENOENT } func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - glog.V(3).Infof("dir ReadDirAll %s", dir.Path) + dirPath := util.FullPath(dir.FullPath()) + glog.V(4).Infof("dir ReadDirAll %s", dirPath) - cacheTtl := 5 * time.Minute - - readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, dir.Path, "", func(entry *filer_pb.Entry, isLast bool) { - if entry.IsDirectory { - dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir} + processEachEntryFn := func(entry *filer.Entry, isLast bool) { + if entry.IsDirectory() { + dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir, Inode: dirPath.Child(entry.Name()).AsInode()} ret = append(ret, dirent) } else { - dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File} + dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode)), Inode: dirPath.Child(entry.Name()).AsInode()} ret = append(ret, dirent) } - dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, cacheTtl) + } + + if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil { + glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + return nil, fuse.EIO + } + listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + processEachEntryFn(entry, false) + return true }) - if readErr != nil { - glog.V(0).Infof("list %s: %v", dir.Path, err) - return ret, fuse.EIO + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return nil, fuse.EIO } + return +} - return ret, err +func findFileType(mode uint16) fuse.DirentType { + switch mode & (syscall.S_IFMT & 0xffff) { + case syscall.S_IFSOCK: + return fuse.DT_Socket + case syscall.S_IFLNK: + return fuse.DT_Link + case syscall.S_IFREG: + return fuse.DT_File + case syscall.S_IFBLK: + return fuse.DT_Block + case syscall.S_IFDIR: + return fuse.DT_Dir + case syscall.S_IFCHR: + return fuse.DT_Char + case syscall.S_IFIFO: + return fuse.DT_FIFO + } + return fuse.DT_File } func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { if !req.Dir { - return dir.removeOneFile(ctx, req) + return dir.removeOneFile(req) } - return dir.removeFolder(ctx, req) + return dir.removeFolder(req) } -func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error { +func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { - entry, err := filer2.GetEntry(ctx, dir.wfs, path.Join(dir.Path, req.Name)) + dirFullPath := dir.FullPath() + filePath := util.NewFullPath(dirFullPath, req.Name) + entry, err := filer_pb.GetEntry(dir.wfs, filePath) if err != nil { return err } - dir.wfs.deleteFileChunks(ctx, entry.Chunks) - - dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name)) - - return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + // first, ensure the filer store can correctly delete + glog.V(3).Infof("remove file: %v", req) + isDeleteData := entry != nil && entry.HardLinkCounter <= 1 + err = filer_pb.Remove(dir.wfs, dirFullPath, req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature}) + if err != nil { + glog.V(3).Infof("not found remove file %s: %v", filePath, err) + return fuse.ENOENT + } - request := &filer_pb.DeleteEntryRequest{ - Directory: dir.Path, - Name: req.Name, - IsDeleteData: false, - } + // then, delete meta cache and fsNode cache + if err = dir.wfs.metaCache.DeleteEntry(context.Background(), filePath); err != nil { + glog.V(3).Infof("local DeleteEntry %s: %v", filePath, err) + return fuse.ESTALE + } - glog.V(3).Infof("remove file: %v", request) - _, err := client.DeleteEntry(ctx, request) - if err != nil { - glog.V(3).Infof("remove file %s/%s: %v", dir.Path, req.Name, err) - return fuse.ENOENT - } + // remove current file handle if any + dir.wfs.handlesLock.Lock() + defer dir.wfs.handlesLock.Unlock() + inodeId := filePath.AsInode() + delete(dir.wfs.handles, inodeId) - return nil - }) + return nil } -func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error { - - dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name)) +func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { - return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.DeleteEntryRequest{ - Directory: dir.Path, - Name: req.Name, - IsDeleteData: true, + dirFullPath := dir.FullPath() + glog.V(3).Infof("remove directory entry: %v", req) + ignoreRecursiveErr := true // ignore recursion error since the OS should manage it + err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, true, true, ignoreRecursiveErr, false, []int32{dir.wfs.signature}) + if err != nil { + glog.V(0).Infof("remove %s/%s: %v", dirFullPath, req.Name, err) + if strings.Contains(err.Error(), "non-empty") { + return fuse.EEXIST } + return fuse.ENOENT + } - glog.V(3).Infof("remove directory entry: %v", request) - _, err := client.DeleteEntry(ctx, request) - if err != nil { - glog.V(3).Infof("remove %s/%s: %v", dir.Path, req.Name, err) - return fuse.ENOENT - } + t := util.NewFullPath(dirFullPath, req.Name) + dir.wfs.metaCache.DeleteEntry(context.Background(), t) - return nil - }) + return nil } func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - if err := dir.maybeLoadEntry(ctx); err != nil { + glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req) + + entry, err := dir.maybeLoadEntry() + if err != nil { return err } - glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle) if req.Valid.Mode() { - dir.entry.Attributes.FileMode = uint32(req.Mode) + entry.Attributes.FileMode = uint32(req.Mode) } if req.Valid.Uid() { - dir.entry.Attributes.Uid = req.Uid + entry.Attributes.Uid = req.Uid } if req.Valid.Gid() { - dir.entry.Attributes.Gid = req.Gid + entry.Attributes.Gid = req.Gid } if req.Valid.Mtime() { - dir.entry.Attributes.Mtime = req.Mtime.Unix() + entry.Attributes.Mtime = req.Mtime.Unix() } - dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) - - return dir.saveEntry(ctx) + return dir.saveEntry(entry) } func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { - glog.V(4).Infof("dir Setxattr %s: %s", dir.Path, req.Name) + glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name) - if err := dir.maybeLoadEntry(ctx); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { return err } - if err := setxattr(dir.entry, req); err != nil { + if err := setxattr(entry, req); err != nil { return err } - dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) - - return dir.saveEntry(ctx) + return dir.saveEntry(entry) } func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { - glog.V(4).Infof("dir Removexattr %s: %s", dir.Path, req.Name) + glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name) - if err := dir.maybeLoadEntry(ctx); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { return err } - if err := removexattr(dir.entry, req); err != nil { + if err := removexattr(entry, req); err != nil { return err } - dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) - - return dir.saveEntry(ctx) + return dir.saveEntry(entry) } func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - glog.V(4).Infof("dir Listxattr %s", dir.Path) + glog.V(4).Infof("dir Listxattr %s", dir.FullPath()) - if err := dir.maybeLoadEntry(ctx); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { return err } - if err := listxattr(dir.entry, req, resp); err != nil { + if err := listxattr(entry, req, resp); err != nil { return err } @@ -383,39 +539,66 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp } -func (dir *Dir) maybeLoadEntry(ctx context.Context) error { - if dir.entry == nil { - parentDirPath, name := filer2.FullPath(dir.Path).DirAndName() - entry, err := dir.wfs.maybeLoadEntry(ctx, parentDirPath, name) - if err != nil { - return err - } - if entry == nil { - return fuse.ENOENT - } - dir.entry = entry - } - return nil +func (dir *Dir) Forget() { + glog.V(4).Infof("Forget dir %s", dir.FullPath()) } -func (dir *Dir) saveEntry(ctx context.Context) error { +func (dir *Dir) maybeLoadEntry() (*filer_pb.Entry, error) { + parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName() + return dir.wfs.maybeLoadEntry(parentDirPath, name) +} + +func (dir *Dir) saveEntry(entry *filer_pb.Entry) error { - parentDir, name := filer2.FullPath(dir.Path).DirAndName() + parentDir, name := util.FullPath(dir.FullPath()).DirAndName() - return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + dir.wfs.mapPbIdFromLocalToFiler(entry) + defer dir.wfs.mapPbIdFromFilerToLocal(entry) request := &filer_pb.UpdateEntryRequest{ - Directory: parentDir, - Entry: dir.entry, + Directory: parentDir, + Entry: entry, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("save dir entry: %v", request) - _, err := client.UpdateEntry(ctx, request) + _, err := client.UpdateEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err) + glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err) return fuse.EIO } + if err := dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { + glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err) + return fuse.ESTALE + } + return nil }) } + +func (dir *Dir) FullPath() string { + var parts []string + for p := dir; p != nil; p = p.parent { + if strings.HasPrefix(p.name, "/") { + if len(p.name) > 1 { + parts = append(parts, p.name[1:]) + } + } else { + parts = append(parts, p.name) + } + } + + if len(parts) == 0 { + return "/" + } + + var buf bytes.Buffer + for i := len(parts) - 1; i >= 0; i-- { + buf.WriteString("/") + buf.WriteString(parts[i]) + } + return buf.String() +} |
