diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 248 | ||||
| -rw-r--r-- | weed/filesys/dir_link.go | 4 | ||||
| -rw-r--r-- | weed/filesys/dirty_page.go | 2 | ||||
| -rw-r--r-- | weed/filesys/file.go | 144 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 13 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 10 | ||||
| -rw-r--r-- | weed/filesys/xattr.go | 142 |
7 files changed, 394 insertions, 169 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 0e9e92e16..7b24a1ec5 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -14,9 +14,9 @@ import ( ) type Dir struct { - Path string - wfs *WFS - attributes *filer_pb.FuseAttributes + Path string + wfs *WFS + entry *filer_pb.Entry } var _ = fs.Node(&Dir{}) @@ -27,9 +27,15 @@ var _ = fs.HandleReadDirAller(&Dir{}) var _ = fs.NodeRemover(&Dir{}) var _ = fs.NodeRenamer(&Dir{}) var _ = fs.NodeSetattrer(&Dir{}) +var _ = fs.NodeGetxattrer(&Dir{}) +var _ = fs.NodeSetxattrer(&Dir{}) +var _ = fs.NodeRemovexattrer(&Dir{}) +var _ = fs.NodeListxattrer(&Dir{}) func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { + glog.V(3).Infof("dir Attr %s", dir.Path) + // https://github.com/bazil/fuse/issues/196 attr.Valid = time.Second @@ -38,36 +44,28 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { return nil } - item := dir.wfs.listDirectoryEntriesCache.Get(dir.Path) - if item != nil && !item.Expired() { - entry := item.Value().(*filer_pb.Entry) - - attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) - attr.Ctime = time.Unix(entry.Attributes.Crtime, 0) - attr.Mode = os.FileMode(entry.Attributes.FileMode) - attr.Gid = entry.Attributes.Gid - attr.Uid = entry.Attributes.Uid - - return nil - } - - entry, err := filer2.GetEntry(ctx, dir.wfs, dir.Path) - if err != nil { - glog.V(2).Infof("read dir %s attr: %v, error: %v", dir.Path, dir.attributes, err) + if err := dir.maybeLoadEntry(ctx); err != nil { return err } - dir.attributes = entry.Attributes - glog.V(2).Infof("dir %s: %v perm: %v", dir.Path, dir.attributes, os.FileMode(dir.attributes.FileMode)) + attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir + attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0) + attr.Ctime = time.Unix(dir.entry.Attributes.Crtime, 0) + attr.Gid = dir.entry.Attributes.Gid + attr.Uid = dir.entry.Attributes.Uid + + return nil +} - attr.Mode = os.FileMode(dir.attributes.FileMode) | os.ModeDir +func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - attr.Mtime = time.Unix(dir.attributes.Mtime, 0) - attr.Ctime = time.Unix(dir.attributes.Crtime, 0) - attr.Gid = dir.attributes.Gid - attr.Uid = dir.attributes.Uid + glog.V(4).Infof("dir Getxattr %s", dir.Path) - return nil + if err := dir.maybeLoadEntry(ctx); err != nil { + return err + } + + return getxattr(dir.entry, req, resp) } func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { @@ -101,7 +99,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, Attributes: &filer_pb.FuseAttributes{ Mtime: time.Now().Unix(), Crtime: time.Now().Unix(), - FileMode: uint32(req.Mode), + FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), Uid: req.Uid, Gid: req.Gid, Collection: dir.wfs.option.Collection, @@ -146,7 +144,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err Attributes: &filer_pb.FuseAttributes{ Mtime: time.Now().Unix(), Crtime: time.Now().Unix(), - FileMode: uint32(req.Mode), + FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), Uid: req.Uid, Gid: req.Gid, }, @@ -172,6 +170,8 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { + glog.V(4).Infof("dir Lookup %s: %s", dir.Path, req.Name) + var entry *filer_pb.Entry fullFilePath := path.Join(dir.Path, req.Name) @@ -181,15 +181,21 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. } if entry == nil { + glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath) if err != nil { return nil, err } + if entry != nil { + dir.wfs.listDirectoryEntriesCache.Set(fullFilePath, entry, 5*time.Minute) + } + } else { + glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) } if entry != nil { if entry.IsDirectory { - node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, attributes: entry.Attributes} + node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, entry: entry} } else { node = dir.newFile(req.Name, entry) } @@ -209,52 +215,24 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - err = dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - paginationLimit := 1024 - remaining := dir.wfs.option.DirListingLimit - - lastEntryName := "" - - for remaining >= 0 { - - request := &filer_pb.ListEntriesRequest{ - Directory: dir.Path, - StartFromFileName: lastEntryName, - Limit: uint32(paginationLimit), - } - - glog.V(4).Infof("read directory: %v", request) - resp, err := client.ListEntries(ctx, request) - if err != nil { - glog.V(0).Infof("list %s: %v", dir.Path, err) - return fuse.EIO - } - - cacheTtl := estimatedCacheTtl(len(resp.Entries)) - - for _, entry := range resp.Entries { - if entry.IsDirectory { - dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir} - ret = append(ret, dirent) - } else { - dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File} - ret = append(ret, dirent) - } - dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, cacheTtl) - lastEntryName = entry.Name - } - - remaining -= len(resp.Entries) + glog.V(3).Infof("dir ReadDirAll %s", dir.Path) - if len(resp.Entries) < paginationLimit { - break - } + cacheTtl := 5 * time.Minute + readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, dir.Path, "", func(entry *filer_pb.Entry, isLast bool) { + if entry.IsDirectory { + dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir} + ret = append(ret, dirent) + } else { + dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File} + ret = append(ret, dirent) } - - return nil + dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, cacheTtl) }) + if readErr != nil { + glog.V(0).Infof("list %s: %v", dir.Path, err) + return ret, fuse.EIO + } return ret, err } @@ -278,6 +256,8 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro dir.wfs.deleteFileChunks(ctx, entry.Chunks) + dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name)) + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ @@ -293,8 +273,6 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro return fuse.ENOENT } - dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name)) - return nil }) @@ -302,6 +280,8 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error { + dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name)) + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.DeleteEntryRequest{ @@ -317,8 +297,6 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error return fuse.ENOENT } - dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name)) - return nil }) @@ -326,66 +304,118 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - if dir.attributes == nil { - return nil + if err := dir.maybeLoadEntry(ctx); err != nil { + return err } glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle) if req.Valid.Mode() { - dir.attributes.FileMode = uint32(req.Mode) + dir.entry.Attributes.FileMode = uint32(req.Mode) } if req.Valid.Uid() { - dir.attributes.Uid = req.Uid + dir.entry.Attributes.Uid = req.Uid } if req.Valid.Gid() { - dir.attributes.Gid = req.Gid + dir.entry.Attributes.Gid = req.Gid } if req.Valid.Mtime() { - dir.attributes.Mtime = req.Mtime.Unix() + dir.entry.Attributes.Mtime = req.Mtime.Unix() + } + + dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) + + return dir.saveEntry(ctx) + +} + +func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { + + glog.V(4).Infof("dir Setxattr %s: %s", dir.Path, req.Name) + + if err := dir.maybeLoadEntry(ctx); err != nil { + return err + } + + if err := setxattr(dir.entry, req); err != nil { + return err + } + + dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) + + return dir.saveEntry(ctx) + +} + +func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { + + glog.V(4).Infof("dir Removexattr %s: %s", dir.Path, req.Name) + + if err := dir.maybeLoadEntry(ctx); err != nil { + return err + } + + if err := removexattr(dir.entry, req); err != nil { + return err + } + + dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) + + return dir.saveEntry(ctx) + +} + +func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { + + glog.V(4).Infof("dir Listxattr %s", dir.Path) + + if err := dir.maybeLoadEntry(ctx); err != nil { + return err + } + + if err := listxattr(dir.entry, req, resp); err != nil { + return err } + return nil + +} + +func (dir *Dir) maybeLoadEntry(ctx context.Context) error { + if dir.entry == nil { + parentDirPath, name := filer2.FullPath(dir.Path).DirAndName() + entry, err := dir.wfs.maybeLoadEntry(ctx, parentDirPath, name) + if err != nil { + return err + } + if entry == nil { + return fuse.ENOENT + } + dir.entry = entry + } + return nil +} + +func (dir *Dir) saveEntry(ctx context.Context) error { + parentDir, name := filer2.FullPath(dir.Path).DirAndName() + return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: parentDir, - Entry: &filer_pb.Entry{ - Name: name, - Attributes: dir.attributes, - }, + Entry: dir.entry, } - glog.V(1).Infof("set attr directory entry: %v", request) + glog.V(1).Infof("save dir entry: %v", request) _, err := client.UpdateEntry(ctx, request) if err != nil { - glog.V(0).Infof("UpdateEntry %s: %v", dir.Path, err) + glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err) return fuse.EIO } - dir.wfs.listDirectoryEntriesCache.Delete(dir.Path) - return nil }) - -} - -func estimatedCacheTtl(numEntries int) time.Duration { - if numEntries < 100 { - // 30 ms per entry - return 3 * time.Second - } - if numEntries < 1000 { - // 10 ms per entry - return 10 * time.Second - } - if numEntries < 10000 { - // 10 ms per entry - return 100 * time.Second - } - - // 2 ms per entry - return time.Duration(numEntries*2) * time.Millisecond } diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index 92cf04d58..8e60872d3 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -27,7 +27,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, Attributes: &filer_pb.FuseAttributes{ Mtime: time.Now().Unix(), Crtime: time.Now().Unix(), - FileMode: uint32(os.FileMode(0755) | os.ModeSymlink), + FileMode: uint32((os.FileMode(0777) | os.ModeSymlink) &^ dir.wfs.option.Umask), Uid: req.Uid, Gid: req.Gid, SymlinkTarget: req.Target, @@ -51,7 +51,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) { - if err := file.maybeLoadAttributes(ctx); err != nil { + if err := file.maybeLoadEntry(ctx); err != nil { return "", err } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index baee412b2..35d8f249a 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -192,7 +192,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) bufReader := bytes.NewReader(buf) - uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, auth) + uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload data: %v", err) diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 1b359ebbe..afe78ee0f 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -20,6 +20,10 @@ var _ = fs.Node(&File{}) var _ = fs.NodeOpener(&File{}) var _ = fs.NodeFsyncer(&File{}) var _ = fs.NodeSetattrer(&File{}) +var _ = fs.NodeGetxattrer(&File{}) +var _ = fs.NodeSetxattrer(&File{}) +var _ = fs.NodeRemovexattrer(&File{}) +var _ = fs.NodeListxattrer(&File{}) type File struct { Name string @@ -36,7 +40,9 @@ func (file *File) fullpath() string { func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { - if err := file.maybeLoadAttributes(ctx); err != nil { + glog.V(4).Infof("file Attr %s", file.fullpath()) + + if err := file.maybeLoadEntry(ctx); err != nil { return err } @@ -52,9 +58,20 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { } +func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { + + glog.V(4).Infof("file Getxattr %s", file.fullpath()) + + if err := file.maybeLoadEntry(ctx); err != nil { + return err + } + + return getxattr(file.entry, req, resp) +} + func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { - glog.V(3).Infof("%v file open %+v", file.fullpath(), req) + glog.V(4).Infof("file %v open %+v", file.fullpath(), req) file.isOpen = true @@ -70,7 +87,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - if err := file.maybeLoadAttributes(ctx); err != nil { + if err := file.maybeLoadEntry(ctx); err != nil { return err } @@ -109,66 +126,80 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return nil } - return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + file.wfs.listDirectoryEntriesCache.Delete(file.fullpath()) - request := &filer_pb.UpdateEntryRequest{ - Directory: file.dir.Path, - Entry: file.entry, - } + return file.saveEntry(ctx) - glog.V(1).Infof("set attr file entry: %v", request) - _, err := client.UpdateEntry(ctx, request) - if err != nil { - glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err) - return fuse.EIO - } +} - return nil - }) +func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { + + glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name) + + if err := file.maybeLoadEntry(ctx); err != nil { + return err + } + + if err := setxattr(file.entry, req); err != nil { + return err + } + + file.wfs.listDirectoryEntriesCache.Delete(file.fullpath()) + + return file.saveEntry(ctx) } -func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { - // fsync works at OS level - // write the file chunks to the filerGrpcAddress - glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req) +func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { + + glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name) + + if err := file.maybeLoadEntry(ctx); err != nil { + return err + } + + if err := removexattr(file.entry, req); err != nil { + return err + } + + file.wfs.listDirectoryEntriesCache.Delete(file.fullpath()) + + return file.saveEntry(ctx) - return nil } -func (file *File) maybeLoadAttributes(ctx context.Context) error { - if file.entry == nil || !file.isOpen { - item := file.wfs.listDirectoryEntriesCache.Get(file.fullpath()) - if item != nil && !item.Expired() { - entry := item.Value().(*filer_pb.Entry) - file.setEntry(entry) - // glog.V(1).Infof("file attr read cached %v attributes", file.Name) - } else { - err := file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { +func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - request := &filer_pb.LookupDirectoryEntryRequest{ - Name: file.Name, - Directory: file.dir.Path, - } + glog.V(4).Infof("file Listxattr %s", file.fullpath()) - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil { - glog.V(3).Infof("file attr read file %v: %v", request, err) - return fuse.ENOENT - } + if err := file.maybeLoadEntry(ctx); err != nil { + return err + } + + if err := listxattr(file.entry, req, resp); err != nil { + return err + } - file.setEntry(resp.Entry) + return nil - glog.V(3).Infof("file attr %v %+v: %d", file.fullpath(), file.entry.Attributes, filer2.TotalSize(file.entry.Chunks)) +} - // file.wfs.listDirectoryEntriesCache.Set(file.fullpath(), file.entry, file.wfs.option.EntryCacheTtl) +func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { + // fsync works at OS level + // write the file chunks to the filerGrpcAddress + glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req) - return nil - }) + return nil +} - if err != nil { - return err - } +func (file *File) maybeLoadEntry(ctx context.Context) error { + if file.entry == nil || !file.isOpen { + entry, err := file.wfs.maybeLoadEntry(ctx, file.dir.Path, file.Name) + if err != nil { + return err + } + if entry != nil { + file.setEntry(entry) } } return nil @@ -203,3 +234,22 @@ func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks) } + +func (file *File) saveEntry(ctx context.Context) error { + return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.UpdateEntryRequest{ + Directory: file.dir.Path, + Entry: file.entry, + } + + glog.V(1).Infof("save file entry: %v", request) + _, err := client.UpdateEntry(ctx, request) + if err != nil { + glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err) + return fuse.EIO + } + + return nil + }) +} diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index ceec50e13..101f5c056 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -92,18 +92,19 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f if req.Offset == 0 { // detect mime type - var possibleExt string - fh.contentType, possibleExt = mimetype.Detect(req.Data) - if ext := path.Ext(fh.f.Name); ext != possibleExt { + detectedMIME := mimetype.Detect(req.Data) + fh.contentType = detectedMIME.String() + if ext := path.Ext(fh.f.Name); ext != detectedMIME.Extension() { fh.contentType = mime.TypeByExtension(ext) } fh.dirtyMetadata = true } - fh.f.addChunks(chunks) - if len(chunks) > 0 { + + fh.f.addChunks(chunks) + fh.dirtyMetadata = true } @@ -148,7 +149,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Attributes.Gid = req.Gid fh.f.entry.Attributes.Mtime = time.Now().Unix() fh.f.entry.Attributes.Crtime = time.Now().Unix() - fh.f.entry.Attributes.FileMode = uint32(0770) + fh.f.entry.Attributes.FileMode = uint32(0777 &^ fh.f.wfs.option.Umask) } request := &filer_pb.CreateEntryRequest{ diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 9018c36ed..e924783ec 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -8,13 +8,14 @@ import ( "sync" "time" + "github.com/karlseguin/ccache" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/karlseguin/ccache" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" - "google.golang.org/grpc" ) type Option struct { @@ -26,8 +27,9 @@ type Option struct { TtlSec int32 ChunkSizeLimit int64 DataCenter string - DirListingLimit int + DirListCacheLimit int64 EntryCacheTtl time.Duration + Umask os.FileMode MountUid uint32 MountGid uint32 @@ -59,7 +61,7 @@ type statsCache struct { func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, - listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024 * 8).ItemsToPrune(100)), + listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), pathToHandleIndex: make(map[string]int), bufPool: sync.Pool{ New: func() interface{} { diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go new file mode 100644 index 000000000..3c0ba164a --- /dev/null +++ b/weed/filesys/xattr.go @@ -0,0 +1,142 @@ +package filesys + +import ( + "context" + "path/filepath" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/fuse" +) + +func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { + + if entry == nil { + return fuse.ErrNoXattr + } + if entry.Extended == nil { + return fuse.ErrNoXattr + } + data, found := entry.Extended[req.Name] + if !found { + return fuse.ErrNoXattr + } + if req.Position < uint32(len(data)) { + size := req.Size + if req.Position+size >= uint32(len(data)) { + size = uint32(len(data)) - req.Position + } + if size == 0 { + resp.Xattr = data[req.Position:] + } else { + resp.Xattr = data[req.Position : req.Position+size] + } + } + + return nil + +} + +func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error { + + if entry == nil { + return fuse.EIO + } + + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + data, _ := entry.Extended[req.Name] + + newData := make([]byte, int(req.Position)+len(req.Xattr)) + + copy(newData, data) + + copy(newData[int(req.Position):], req.Xattr) + + entry.Extended[req.Name] = newData + + return nil + +} + +func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error { + + if entry == nil { + return fuse.ErrNoXattr + } + + if entry.Extended == nil { + return fuse.ErrNoXattr + } + + _, found := entry.Extended[req.Name] + + if !found { + return fuse.ErrNoXattr + } + + delete(entry.Extended, req.Name) + + return nil + +} + +func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { + + if entry == nil { + return fuse.EIO + } + + for k := range entry.Extended { + resp.Append(k) + } + + size := req.Size + if req.Position+size >= uint32(len(resp.Xattr)) { + size = uint32(len(resp.Xattr)) - req.Position + } + + if size == 0 { + resp.Xattr = resp.Xattr[req.Position:] + } else { + resp.Xattr = resp.Xattr[req.Position : req.Position+size] + } + + return nil + +} + +func (wfs *WFS) maybeLoadEntry(ctx context.Context, dir, name string) (entry *filer_pb.Entry, err error) { + + fullpath := filepath.Join(dir, name) + item := wfs.listDirectoryEntriesCache.Get(fullpath) + if item != nil && !item.Expired() { + entry = item.Value().(*filer_pb.Entry) + return + } + glog.V(3).Infof("read entry cache miss %s", fullpath) + + err = wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.LookupDirectoryEntryRequest{ + Name: name, + Directory: dir, + } + + resp, err := client.LookupDirectoryEntry(ctx, request) + if err != nil { + glog.V(3).Infof("file attr read file %v: %v", request, err) + return fuse.ENOENT + } + + entry = resp.Entry + if entry != nil { + wfs.listDirectoryEntriesCache.Set(fullpath, entry, wfs.option.EntryCacheTtl) + } + + return nil + }) + + return +} |
