diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 43 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 18 | ||||
| -rw-r--r-- | weed/filesys/file.go | 21 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 25 | ||||
| -rw-r--r-- | weed/filesys/xattr.go | 16 |
5 files changed, 65 insertions, 58 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index ac6291319..076252051 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -9,7 +9,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) @@ -51,7 +50,7 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { return err } - attr.Inode = uint64(util.HashStringToLong(dir.Path)) + attr.Inode = filer2.FullPath(dir.Path).AsInode() attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0) attr.Ctime = time.Unix(dir.entry.Attributes.Crtime, 0) @@ -75,7 +74,7 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f } func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { - attr.Inode = uint64(util.HashStringToLong(dir.Path)) + attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode() attr.Valid = time.Hour attr.Uid = dir.wfs.option.MountUid attr.Gid = dir.wfs.option.MountGid @@ -181,13 +180,8 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. glog.V(4).Infof("dir Lookup %s: %s", dir.Path, req.Name) - var entry *filer_pb.Entry - fullFilePath := string(filer2.NewFullPath(dir.Path, req.Name)) - - item := dir.wfs.listDirectoryEntriesCache.Get(fullFilePath) - if item != nil && !item.Expired() { - entry = item.Value().(*filer_pb.Entry) - } + fullFilePath := filer2.NewFullPath(dir.Path, req.Name) + entry := dir.wfs.cacheGet(fullFilePath) if entry == nil { glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) @@ -196,22 +190,20 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) return nil, fuse.ENOENT } - if entry != nil { - dir.wfs.listDirectoryEntriesCache.Set(fullFilePath, entry, 5*time.Minute) - } + dir.wfs.cacheSet(fullFilePath, entry, 5*time.Minute) } else { glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) } if entry != nil { if entry.IsDirectory { - node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, entry: entry} + node = &Dir{Path: string(fullFilePath), wfs: dir.wfs, entry: entry} } else { node = dir.newFile(req.Name, entry) } resp.EntryValid = time.Second - resp.Attr.Inode = uint64(util.HashStringToLong(fullFilePath)) + resp.Attr.Inode = fullFilePath.AsInode() resp.Attr.Valid = time.Second resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) resp.Attr.Ctime = time.Unix(entry.Attributes.Crtime, 0) @@ -232,9 +224,9 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { cacheTtl := 5 * time.Minute - readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, dir.Path, "", func(entry *filer_pb.Entry, isLast bool) { - fullpath := string(filer2.NewFullPath(dir.Path, entry.Name)) - inode := uint64(util.HashStringToLong(fullpath)) + readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, filer2.FullPath(dir.Path), "", func(entry *filer_pb.Entry, isLast bool) { + fullpath := filer2.NewFullPath(dir.Path, entry.Name) + inode := fullpath.AsInode() if entry.IsDirectory { dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir} ret = append(ret, dirent) @@ -242,7 +234,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File} ret = append(ret, dirent) } - dir.wfs.listDirectoryEntriesCache.Set(fullpath, entry, cacheTtl) + dir.wfs.cacheSet(fullpath, entry, cacheTtl) }) if readErr != nil { glog.V(0).Infof("list %s: %v", dir.Path, err) @@ -264,14 +256,15 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error { - entry, err := filer2.GetEntry(ctx, dir.wfs, path.Join(dir.Path, req.Name)) + filePath := filer2.NewFullPath(dir.Path, req.Name) + entry, err := filer2.GetEntry(ctx, dir.wfs, filePath) if err != nil { return err } dir.wfs.deleteFileChunks(ctx, entry.Chunks) - dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name)) + dir.wfs.cacheDelete(filePath) return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { @@ -295,7 +288,7 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error { - dir.wfs.listDirectoryEntriesCache.Delete(string(filer2.NewFullPath(dir.Path, req.Name))) + dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.Name)) return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { @@ -340,7 +333,7 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus dir.entry.Attributes.Mtime = req.Mtime.Unix() } - dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) + dir.wfs.cacheDelete(filer2.FullPath(dir.Path)) return dir.saveEntry(ctx) @@ -358,7 +351,7 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { return err } - dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) + dir.wfs.cacheDelete(filer2.FullPath(dir.Path)) return dir.saveEntry(ctx) @@ -376,7 +369,7 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e return err } - dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) + dir.wfs.cacheDelete(filer2.FullPath(dir.Path)) return dir.saveEntry(ctx) diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 8309b238a..6b68e4ee9 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -2,7 +2,6 @@ package filesys import ( "context" - "fmt" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -16,7 +15,10 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector newDir := newDirectory.(*Dir) glog.V(4).Infof("dir Rename %s/%s => %s/%s", dir.Path, req.OldName, newDir.Path, req.NewName) - err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + dir.wfs.cacheDelete(filer2.NewFullPath(newDir.Path, req.NewName)) + dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.OldName)) + + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: dir.Path, @@ -27,19 +29,11 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector _, err := client.AtomicRenameEntry(ctx, request) if err != nil { - return fmt.Errorf("renaming %s/%s => %s/%s: %v", dir.Path, req.OldName, newDir.Path, req.NewName, err) + glog.V(0).Infof("dir Rename %s/%s => %s/%s : %v", dir.Path, req.OldName, newDir.Path, req.NewName, err) + return fuse.EIO } return nil }) - - if err == nil { - oldpath := string(filer2.NewFullPath(dir.Path, req.OldName)) - newpath := string(filer2.NewFullPath(newDir.Path, req.NewName)) - dir.wfs.listDirectoryEntriesCache.Delete(oldpath) - dir.wfs.listDirectoryEntriesCache.Delete(newpath) - } - - return err } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index ca0550a87..7e562eabc 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -3,14 +3,12 @@ package filesys import ( "context" "os" - "path/filepath" "sort" "time" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) @@ -35,19 +33,22 @@ type File struct { isOpen bool } -func (file *File) fullpath() string { - return filepath.Join(file.dir.Path, file.Name) +func (file *File) fullpath() filer2.FullPath { + return filer2.NewFullPath(file.dir.Path, file.Name) } func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr) - if err := file.maybeLoadEntry(ctx); err != nil { - return err + if !file.isOpen { + if err := file.maybeLoadEntry(ctx); err != nil { + return err + } } - attr.Inode = uint64(util.HashStringToLong(file.fullpath())) + attr.Inode = file.fullpath().AsInode() + attr.Valid = time.Second attr.Mode = os.FileMode(file.entry.Attributes.FileMode) attr.Size = filer2.TotalSize(file.entry.Chunks) if file.isOpen { @@ -132,7 +133,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return nil } - file.wfs.listDirectoryEntriesCache.Delete(file.fullpath()) + file.wfs.cacheDelete(file.fullpath()) return file.saveEntry(ctx) @@ -150,7 +151,7 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error return err } - file.wfs.listDirectoryEntriesCache.Delete(file.fullpath()) + file.wfs.cacheDelete(file.fullpath()) return file.saveEntry(ctx) @@ -168,7 +169,7 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) return err } - file.wfs.listDirectoryEntriesCache.Delete(file.fullpath()) + file.wfs.cacheDelete(file.fullpath()) return file.saveEntry(ctx) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index e924783ec..d3cc6329d 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -11,6 +11,7 @@ import ( "github.com/karlseguin/ccache" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -47,7 +48,7 @@ type WFS struct { // contains all open handles handles []*FileHandle - pathToHandleIndex map[string]int + pathToHandleIndex map[filer2.FullPath]int pathToHandleLock sync.Mutex bufPool sync.Pool @@ -62,7 +63,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), - pathToHandleIndex: make(map[string]int), + pathToHandleIndex: make(map[filer2.FullPath]int), bufPool: sync.Pool{ New: func() interface{} { return make([]byte, option.ChunkSizeLimit) @@ -117,7 +118,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand return } -func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) { +func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) { wfs.pathToHandleLock.Lock() defer wfs.pathToHandleLock.Unlock() @@ -191,3 +192,21 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. return nil } + +func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry { + item := wfs.listDirectoryEntriesCache.Get(string(path)) + if item != nil && !item.Expired() { + return item.Value().(*filer_pb.Entry) + } + return nil +} +func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) { + if entry == nil { + wfs.listDirectoryEntriesCache.Delete(string(path)) + }else{ + wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl) + } +} +func (wfs *WFS) cacheDelete(path filer2.FullPath) { + wfs.listDirectoryEntriesCache.Delete(string(path)) +} diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index a81f74638..52a447d95 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -2,6 +2,7 @@ package filesys import ( "context" + "strings" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -109,13 +110,12 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis func (wfs *WFS) maybeLoadEntry(ctx context.Context, dir, name string) (entry *filer_pb.Entry, err error) { - fullpath := string(filer2.NewFullPath(dir, name)) - item := wfs.listDirectoryEntriesCache.Get(fullpath) - if item != nil && !item.Expired() { - entry = item.Value().(*filer_pb.Entry) + fullpath := filer2.NewFullPath(dir, name) + entry = wfs.cacheGet(fullpath) + if entry != nil { return } - // glog.V(3).Infof("read entry cache miss %s", fullpath) + glog.V(3).Infof("read entry cache miss %s", fullpath) err = wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { @@ -126,16 +126,16 @@ func (wfs *WFS) maybeLoadEntry(ctx context.Context, dir, name string) (entry *fi resp, err := client.LookupDirectoryEntry(ctx, request) if err != nil || resp == nil || resp.Entry == nil { - if err == filer2.ErrNotFound { + if err == filer2.ErrNotFound || strings.Contains(err.Error(), filer2.ErrNotFound.Error()) { glog.V(3).Infof("file attr read not found file %v: %v", request, err) return fuse.ENOENT } glog.V(3).Infof("file attr read file %v: %v", request, err) - return fuse.ENOENT + return fuse.EIO } entry = resp.Entry - wfs.listDirectoryEntriesCache.Set(fullpath, entry, wfs.option.EntryCacheTtl) + wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl) return nil }) |
