diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 263 | ||||
| -rw-r--r-- | weed/filesys/dir_link.go | 18 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 38 | ||||
| -rw-r--r-- | weed/filesys/dir_test.go | 34 | ||||
| -rw-r--r-- | weed/filesys/dirty_page.go | 79 | ||||
| -rw-r--r-- | weed/filesys/dirty_page_interval_test.go | 17 | ||||
| -rw-r--r-- | weed/filesys/file.go | 45 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 78 | ||||
| -rw-r--r-- | weed/filesys/fscache.go | 207 | ||||
| -rw-r--r-- | weed/filesys/fscache_test.go | 96 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/cache_config.go | 32 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache.go | 93 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_init.go | 21 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_subscribe.go | 69 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 133 | ||||
| -rw-r--r-- | weed/filesys/wfs_deletion.go | 15 | ||||
| -rw-r--r-- | weed/filesys/xattr.go | 24 |
17 files changed, 965 insertions, 297 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index abe5a21a6..e4260d56f 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -1,6 +1,7 @@ package filesys import ( + "bytes" "context" "os" "strings" @@ -9,14 +10,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) type Dir struct { - Path string - wfs *WFS - entry *filer_pb.Entry + name string + wfs *WFS + entry *filer_pb.Entry + parent *Dir } var _ = fs.Node(&Dir{}) @@ -35,39 +38,37 @@ var _ = fs.NodeForgetter(&Dir{}) func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { - glog.V(3).Infof("dir Attr %s, existing attr: %+v", dir.Path, attr) - // https://github.com/bazil/fuse/issues/196 attr.Valid = time.Second - if dir.Path == dir.wfs.option.FilerMountRootPath { + if dir.FullPath() == dir.wfs.option.FilerMountRootPath { dir.setRootDirAttributes(attr) - glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.Path, attr) + glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.FullPath(), attr) return nil } - if err := dir.maybeLoadEntry(ctx); err != nil { - glog.V(3).Infof("dir Attr %s,err: %+v", dir.Path, err) + if err := dir.maybeLoadEntry(); err != nil { + glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err) return err } - attr.Inode = filer2.FullPath(dir.Path).AsInode() + attr.Inode = util.FullPath(dir.FullPath()).AsInode() attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0) - attr.Ctime = time.Unix(dir.entry.Attributes.Crtime, 0) + attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0) attr.Gid = dir.entry.Attributes.Gid attr.Uid = dir.entry.Attributes.Uid - glog.V(3).Infof("dir Attr %s, attr: %+v", dir.Path, attr) + glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) return nil } func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - glog.V(4).Infof("dir Getxattr %s", dir.Path) + glog.V(4).Infof("dir Getxattr %s", dir.FullPath()) - if err := dir.maybeLoadEntry(ctx); err != nil { + if err := dir.maybeLoadEntry(); err != nil { return err } @@ -88,7 +89,7 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { } func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { - return dir.wfs.getNode(filer2.NewFullPath(dir.Path, name), func() fs.Node { + return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { return &File{ Name: name, dir: dir, @@ -99,17 +100,19 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { }) } -func (dir *Dir) newDirectory(fullpath filer2.FullPath, entry *filer_pb.Entry) fs.Node { - return dir.wfs.getNode(fullpath, func() fs.Node { - return &Dir{Path: string(fullpath), wfs: dir.wfs, entry: entry} +func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { + + return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { + return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} }) + } func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { request := &filer_pb.CreateEntryRequest{ - Directory: dir.Path, + Directory: dir.FullPath(), Entry: &filer_pb.Entry{ Name: req.Name, IsDirectory: req.Mode&os.ModeDir > 0, @@ -126,22 +129,27 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, }, OExcl: req.Flags&fuse.OpenExclusive != 0, } - glog.V(1).Infof("create: %v", req.String()) + glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags) - if err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - if err := filer_pb.CreateEntry(ctx, client, request); err != nil { + if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := filer_pb.CreateEntry(client, request); err != nil { if strings.Contains(err.Error(), "EEXIST") { return fuse.EEXIST } return fuse.EIO } + + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } + return nil }); err != nil { return nil, nil, err } var node fs.Node if request.Entry.IsDirectory { - node = dir.newDirectory(filer2.NewFullPath(dir.Path, req.Name), request.Entry) + node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry) return node, nil, nil } @@ -155,6 +163,8 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { + glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name) + newEntry := &filer_pb.Entry{ Name: req.Name, IsDirectory: true, @@ -167,40 +177,55 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err }, } - err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: dir.Path, + Directory: dir.FullPath(), Entry: newEntry, } glog.V(1).Infof("mkdir: %v", request) - if err := filer_pb.CreateEntry(ctx, client, request); err != nil { - glog.V(0).Infof("mkdir %s/%s: %v", dir.Path, req.Name, err) + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err) return err } + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } + return nil }) if err == nil { - node := dir.newDirectory(filer2.NewFullPath(dir.Path, req.Name), newEntry) + node := dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), newEntry) + return node, nil } + glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err) + return nil, fuse.EIO } func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { - glog.V(4).Infof("dir Lookup %s: %s", dir.Path, req.Name) + glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) - fullFilePath := filer2.NewFullPath(dir.Path, req.Name) + fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) entry := dir.wfs.cacheGet(fullFilePath) + if dir.wfs.option.AsyncMetaDataCaching { + cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT + } + entry = cachedEntry.ToProtoEntry() + } + if entry == nil { // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) - entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath) + entry, err = filer_pb.GetEntry(dir.wfs, fullFilePath) if err != nil { glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) return nil, fuse.ENOENT @@ -221,7 +246,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. resp.Attr.Inode = fullFilePath.AsInode() resp.Attr.Valid = time.Second resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) - resp.Attr.Ctime = time.Unix(entry.Attributes.Crtime, 0) + resp.Attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode) resp.Attr.Gid = entry.Attributes.Gid resp.Attr.Uid = entry.Attributes.Uid @@ -229,18 +254,17 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. return node, nil } - glog.V(1).Infof("not found dir GetEntry %s: %v", fullFilePath, err) + glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err) return nil, fuse.ENOENT } func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - glog.V(3).Infof("dir ReadDirAll %s", dir.Path) + glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath()) cacheTtl := 5 * time.Minute - - readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, filer2.FullPath(dir.Path), "", func(entry *filer_pb.Entry, isLast bool) { - fullpath := filer2.NewFullPath(dir.Path, entry.Name) + processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error { + fullpath := util.NewFullPath(dir.FullPath(), entry.Name) inode := fullpath.AsInode() if entry.IsDirectory { dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir} @@ -250,9 +274,24 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { ret = append(ret, dirent) } dir.wfs.cacheSet(fullpath, entry, cacheTtl) - }) + return nil + } + + if dir.wfs.option.AsyncMetaDataCaching { + listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return nil, fuse.EIO + } + for _, cachedEntry := range listedEntries { + processEachEntryFn(cachedEntry.ToProtoEntry(), false) + } + return + } + + readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn) if readErr != nil { - glog.V(0).Infof("list %s: %v", dir.Path, err) + glog.V(0).Infof("list %s: %v", dir.FullPath(), err) return ret, fuse.EIO } @@ -262,74 +301,70 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { if !req.Dir { - return dir.removeOneFile(ctx, req) + return dir.removeOneFile(req) } - return dir.removeFolder(ctx, req) + return dir.removeFolder(req) } -func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error { +func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { - filePath := filer2.NewFullPath(dir.Path, req.Name) - entry, err := filer2.GetEntry(ctx, dir.wfs, filePath) + filePath := util.NewFullPath(dir.FullPath(), req.Name) + entry, err := filer_pb.GetEntry(dir.wfs, filePath) if err != nil { return err } + if entry == nil { + return nil + } - dir.wfs.deleteFileChunks(ctx, entry.Chunks) + dir.wfs.deleteFileChunks(entry.Chunks) dir.wfs.cacheDelete(filePath) + dir.wfs.fsNodeCache.DeleteFsNode(filePath) - return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.DeleteEntryRequest{ - Directory: dir.Path, - Name: req.Name, - IsDeleteData: false, - } + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) + } - glog.V(3).Infof("remove file: %v", request) - _, err := client.DeleteEntry(ctx, request) - if err != nil { - glog.V(3).Infof("not found remove file %s/%s: %v", dir.Path, req.Name, err) - return fuse.ENOENT - } + glog.V(3).Infof("remove file: %v", req) + err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false) + if err != nil { + glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err) + return fuse.ENOENT + } - return nil - }) + return nil } -func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error { - - dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.Name)) +func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { - return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + t := util.NewFullPath(dir.FullPath(), req.Name) + dir.wfs.cacheDelete(t) + dir.wfs.fsNodeCache.DeleteFsNode(t) - request := &filer_pb.DeleteEntryRequest{ - Directory: dir.Path, - Name: req.Name, - IsDeleteData: true, - } + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.DeleteEntry(context.Background(), t) + } - glog.V(3).Infof("remove directory entry: %v", request) - _, err := client.DeleteEntry(ctx, request) - if err != nil { - glog.V(3).Infof("not found remove %s/%s: %v", dir.Path, req.Name, err) - return fuse.ENOENT - } + glog.V(3).Infof("remove directory entry: %v", req) + err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false) + if err != nil { + glog.V(3).Infof("not found remove %s/%s: %v", dir.FullPath(), req.Name, err) + return fuse.ENOENT + } - return nil - }) + return nil } func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - glog.V(3).Infof("%v dir setattr %+v", dir.Path, req) + glog.V(3).Infof("%v dir setattr %+v", dir.FullPath(), req) - if err := dir.maybeLoadEntry(ctx); err != nil { + if err := dir.maybeLoadEntry(); err != nil { return err } @@ -349,17 +384,17 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus dir.entry.Attributes.Mtime = req.Mtime.Unix() } - dir.wfs.cacheDelete(filer2.FullPath(dir.Path)) + dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) - return dir.saveEntry(ctx) + return dir.saveEntry() } func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { - glog.V(4).Infof("dir Setxattr %s: %s", dir.Path, req.Name) + glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name) - if err := dir.maybeLoadEntry(ctx); err != nil { + if err := dir.maybeLoadEntry(); err != nil { return err } @@ -367,17 +402,17 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { return err } - dir.wfs.cacheDelete(filer2.FullPath(dir.Path)) + dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) - return dir.saveEntry(ctx) + return dir.saveEntry() } func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error { - glog.V(4).Infof("dir Removexattr %s: %s", dir.Path, req.Name) + glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name) - if err := dir.maybeLoadEntry(ctx); err != nil { + if err := dir.maybeLoadEntry(); err != nil { return err } @@ -385,17 +420,17 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e return err } - dir.wfs.cacheDelete(filer2.FullPath(dir.Path)) + dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) - return dir.saveEntry(ctx) + return dir.saveEntry() } func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { - glog.V(4).Infof("dir Listxattr %s", dir.Path) + glog.V(4).Infof("dir Listxattr %s", dir.FullPath()) - if err := dir.maybeLoadEntry(ctx); err != nil { + if err := dir.maybeLoadEntry(); err != nil { return err } @@ -408,15 +443,15 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp } func (dir *Dir) Forget() { - glog.V(3).Infof("Forget dir %s", dir.Path) + glog.V(3).Infof("Forget dir %s", dir.FullPath()) - dir.wfs.forgetNode(filer2.FullPath(dir.Path)) + dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath())) } -func (dir *Dir) maybeLoadEntry(ctx context.Context) error { +func (dir *Dir) maybeLoadEntry() error { if dir.entry == nil { - parentDirPath, name := filer2.FullPath(dir.Path).DirAndName() - entry, err := dir.wfs.maybeLoadEntry(ctx, parentDirPath, name) + parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName() + entry, err := dir.wfs.maybeLoadEntry(parentDirPath, name) if err != nil { return err } @@ -425,11 +460,11 @@ func (dir *Dir) maybeLoadEntry(ctx context.Context) error { return nil } -func (dir *Dir) saveEntry(ctx context.Context) error { +func (dir *Dir) saveEntry() error { - parentDir, name := filer2.FullPath(dir.Path).DirAndName() + parentDir, name := util.FullPath(dir.FullPath()).DirAndName() - return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ Directory: parentDir, @@ -437,12 +472,40 @@ func (dir *Dir) saveEntry(ctx context.Context) error { } glog.V(1).Infof("save dir entry: %v", request) - _, err := client.UpdateEntry(ctx, request) + _, err := client.UpdateEntry(context.Background(), request) if err != nil { glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err) return fuse.EIO } + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } + return nil }) } + +func (dir *Dir) FullPath() string { + var parts []string + for p := dir; p != nil; p = p.parent { + if strings.HasPrefix(p.name, "/") { + if len(p.name) > 1 { + parts = append(parts, p.name[1:]) + } + } else { + parts = append(parts, p.name) + } + } + + if len(parts) == 0 { + return "/" + } + + var buf bytes.Buffer + for i := len(parts) - 1; i >= 0; i-- { + buf.WriteString("/") + buf.WriteString(parts[i]) + } + return buf.String() +} diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index 8b7ec7e89..d1858e99b 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -6,6 +6,7 @@ import ( "syscall" "time" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/fuse" @@ -17,10 +18,10 @@ var _ = fs.NodeReadlinker(&File{}) func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) { - glog.V(3).Infof("Symlink: %v/%v to %v", dir.Path, req.NewName, req.Target) + glog.V(3).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target) request := &filer_pb.CreateEntryRequest{ - Directory: dir.Path, + Directory: dir.FullPath(), Entry: &filer_pb.Entry{ Name: req.NewName, IsDirectory: false, @@ -35,11 +36,16 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, }, } - err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - if err := filer_pb.CreateEntry(ctx, client, request); err != nil { - glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err) + err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err) return fuse.EIO } + + if dir.wfs.option.AsyncMetaDataCaching { + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } + return nil }) @@ -59,7 +65,7 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri return "", fuse.Errno(syscall.EINVAL) } - glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.Path, file.Name, file.entry.Attributes.SymlinkTarget) + glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget) return file.entry.Attributes.SymlinkTarget, nil diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index 4eb3c15b5..ea40f5c31 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -3,9 +3,9 @@ package filesys import ( "context" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) @@ -13,20 +13,24 @@ import ( func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error { newDir := newDirectory.(*Dir) - glog.V(4).Infof("dir Rename %s/%s => %s/%s", dir.Path, req.OldName, newDir.Path, req.NewName) - err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + newPath := util.NewFullPath(newDir.FullPath(), req.NewName) + oldPath := util.NewFullPath(dir.FullPath(), req.OldName) + + glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) + + err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ - OldDirectory: dir.Path, + OldDirectory: dir.FullPath(), OldName: req.OldName, - NewDirectory: newDir.Path, + NewDirectory: newDir.FullPath(), NewName: req.NewName, } - _, err := client.AtomicRenameEntry(ctx, request) + _, err := client.AtomicRenameEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("dir Rename %s/%s => %s/%s : %v", dir.Path, req.OldName, newDir.Path, req.NewName, err) + glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err) return fuse.EIO } @@ -35,28 +39,12 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector }) if err == nil { - newPath := filer2.NewFullPath(newDir.Path, req.NewName) - oldPath := filer2.NewFullPath(dir.Path, req.OldName) dir.wfs.cacheDelete(newPath) dir.wfs.cacheDelete(oldPath) - oldFileNode := dir.wfs.getNode(oldPath, func() fs.Node { - return nil - }) - newDirNode := dir.wfs.getNode(filer2.FullPath(dir.Path), func() fs.Node { - return nil - }) - dir.wfs.forgetNode(newPath) - dir.wfs.forgetNode(oldPath) - if oldFileNode != nil && newDirNode != nil { - oldFile := oldFileNode.(*File) - oldFile.Name = req.NewName - oldFile.dir = newDirNode.(*Dir) - dir.wfs.getNode(newPath, func() fs.Node { - return oldFile - }) + // fmt.Printf("rename path: %v => %v\n", oldPath, newPath) + dir.wfs.fsNodeCache.Move(oldPath, newPath) - } } return err diff --git a/weed/filesys/dir_test.go b/weed/filesys/dir_test.go new file mode 100644 index 000000000..49c76eb5e --- /dev/null +++ b/weed/filesys/dir_test.go @@ -0,0 +1,34 @@ +package filesys + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDirPath(t *testing.T) { + + p := &Dir{name: "/some"} + p = &Dir{name: "path", parent: p} + p = &Dir{name: "to", parent: p} + p = &Dir{name: "a", parent: p} + p = &Dir{name: "file", parent: p} + + assert.Equal(t, "/some/path/to/a/file", p.FullPath()) + + p = &Dir{name: "/some"} + assert.Equal(t, "/some", p.FullPath()) + + p = &Dir{name: "/"} + assert.Equal(t, "/", p.FullPath()) + + p = &Dir{name: "/"} + p = &Dir{name: "path", parent: p} + assert.Equal(t, "/path", p.FullPath()) + + p = &Dir{name: "/"} + p = &Dir{name: "path", parent: p} + p = &Dir{name: "to", parent: p} + assert.Equal(t, "/path/to", p.FullPath()) + +} diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 5ff128323..45224b3e7 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -15,9 +15,11 @@ import ( ) type ContinuousDirtyPages struct { - intervals *ContinuousIntervals - f *File - lock sync.Mutex + intervals *ContinuousIntervals + f *File + lock sync.Mutex + collection string + replication string } func newDirtyPages(file *File) *ContinuousDirtyPages { @@ -32,7 +34,7 @@ func (pages *ContinuousDirtyPages) releaseResource() { var counter = int32(0) -func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { pages.lock.Lock() defer pages.lock.Unlock() @@ -41,7 +43,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { // this is more than what buffer can hold. - return pages.flushAndSave(ctx, offset, data) + return pages.flushAndSave(offset, data) } pages.intervals.AddInterval(data, offset) @@ -50,7 +52,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da var hasSavedData bool if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit { - chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage(ctx) + chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage() if hasSavedData { chunks = append(chunks, chunk) } @@ -59,13 +61,13 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da return } -func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { var chunk *filer_pb.FileChunk var newChunks []*filer_pb.FileChunk // flush existing - if newChunks, err = pages.saveExistingPagesToStorage(ctx); err == nil { + if newChunks, err = pages.saveExistingPagesToStorage(); err == nil { if newChunks != nil { chunks = append(chunks, newChunks...) } @@ -74,35 +76,35 @@ func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int6 } // flush the new page - if chunk, err = pages.saveToStorage(ctx, bytes.NewReader(data), offset, int64(len(data))); err == nil { + if chunk, err = pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))); err == nil { if chunk != nil { - glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId) + glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId) chunks = append(chunks, chunk) } } else { - glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) return } return } -func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunks []*filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) { pages.lock.Lock() defer pages.lock.Unlock() - return pages.saveExistingPagesToStorage(ctx) + return pages.saveExistingPagesToStorage() } -func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (chunks []*filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) { var hasSavedData bool var chunk *filer_pb.FileChunk for { - chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage(ctx) + chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage() if !hasSavedData { return chunks, err } @@ -116,31 +118,35 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Contex } -func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, hasSavedData bool, err error) { +func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *filer_pb.FileChunk, hasSavedData bool, err error) { maxList := pages.intervals.RemoveLargestIntervalLinkedList() if maxList == nil { return nil, false, nil } - chunk, err = pages.saveToStorage(ctx, maxList.ToReader(), maxList.Offset(), maxList.Size()) - if err == nil { - hasSavedData = true - glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId) - } else { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err) - return + for { + chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size()) + if err == nil { + hasSavedData = true + glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId) + return + } else { + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err) + time.Sleep(5 * time.Second) + } } - return } -func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) { +func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) { var fileId, host string var auth security.EncodedJwt - if err := pages.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + dir, _ := pages.f.fullpath().DirAndName() + + if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -148,15 +154,21 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io. Collection: pages.f.wfs.option.Collection, TtlSec: pages.f.wfs.option.TtlSec, DataCenter: pages.f.wfs.option.DataCenter, + ParentPath: dir, } - resp, err := client.AssignVolume(ctx, request) + resp, err := client.AssignVolume(context.Background(), request) if err != nil { glog.V(0).Infof("assign volume failure %v: %v", request, err) return err } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + host = pages.f.wfs.AdjustedUrl(host) + pages.collection, pages.replication = resp.Collection, resp.Replication return nil }); err != nil { @@ -164,7 +176,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io. } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, err := operation.Upload(fileUrl, pages.f.Name, reader, false, "", nil, auth) + uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload data: %v", err) @@ -173,14 +185,9 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io. glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload result: %v", uploadResult.Error) } + pages.f.wfs.chunkCache.SetChunk(fileId, data) - return &filer_pb.FileChunk{ - FileId: fileId, - Offset: offset, - Size: uint64(size), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - }, nil + return uploadResult.ToPbFileChunk(fileId, offset), nil } @@ -197,7 +204,7 @@ func min(x, y int64) int64 { return y } -func (pages *ContinuousDirtyPages) ReadDirtyData(ctx context.Context, data []byte, startOffset int64) (offset int64, size int) { +func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) { pages.lock.Lock() defer pages.lock.Unlock() diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go index 184be2f3b..ab3b37b7c 100644 --- a/weed/filesys/dirty_page_interval_test.go +++ b/weed/filesys/dirty_page_interval_test.go @@ -35,6 +35,23 @@ func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) { c := &ContinuousIntervals{} + // 1, + c.AddInterval(getBytes(1, 1), 0) + // _, 2, + c.AddInterval(getBytes(2, 1), 1) + // _, _, 3, 3, 3 + c.AddInterval(getBytes(3, 3), 2) + // _, _, _, 4, 4, 4 + c.AddInterval(getBytes(4, 3), 3) + + expectedData(t, c, 0, 1, 2, 3, 4, 4, 4) + +} + +func TestContinuousIntervals_RealCase1(t *testing.T) { + + c := &ContinuousIntervals{} + // 25, c.AddInterval(getBytes(25, 1), 0) // _, _, _, _, 23, 23 diff --git a/weed/filesys/file.go b/weed/filesys/file.go index eccef4e58..bafbd7cc8 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -2,6 +2,7 @@ package filesys import ( "context" + "io" "os" "sort" "time" @@ -9,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) @@ -32,10 +34,11 @@ type File struct { entry *filer_pb.Entry entryViewCache []filer2.VisibleInterval isOpen int + reader io.ReaderAt } -func (file *File) fullpath() filer2.FullPath { - return filer2.NewFullPath(file.dir.Path, file.Name) +func (file *File) fullpath() util.FullPath { + return util.NewFullPath(file.dir.FullPath(), file.Name) } func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { @@ -69,7 +72,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { - // glog.V(4).Infof("file Getxattr %s", file.fullpath()) + glog.V(4).Infof("file Getxattr %s", file.fullpath()) if err := file.maybeLoadEntry(ctx); err != nil { return err @@ -119,6 +122,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } file.entry.Chunks = chunks file.entryViewCache = nil + file.reader = nil } file.entry.Attributes.FileSize = req.Size } @@ -148,7 +152,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f file.wfs.cacheDelete(file.fullpath()) - return file.saveEntry(ctx) + return file.saveEntry() } @@ -166,7 +170,7 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error file.wfs.cacheDelete(file.fullpath()) - return file.saveEntry(ctx) + return file.saveEntry() } @@ -184,7 +188,7 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) file.wfs.cacheDelete(file.fullpath()) - return file.saveEntry(ctx) + return file.saveEntry() } @@ -207,22 +211,22 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { // fsync works at OS level // write the file chunks to the filerGrpcAddress - glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req) + glog.V(3).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req) return nil } func (file *File) Forget() { - glog.V(3).Infof("Forget file %s/%s", file.dir.Path, file.Name) - - file.wfs.forgetNode(filer2.NewFullPath(file.dir.Path, file.Name)) - + t := util.NewFullPath(file.dir.FullPath(), file.Name) + glog.V(3).Infof("Forget file %s", t) + file.wfs.fsNodeCache.DeleteFsNode(t) } func (file *File) maybeLoadEntry(ctx context.Context) error { if file.entry == nil || file.isOpen <= 0 { - entry, err := file.wfs.maybeLoadEntry(ctx, file.dir.Path, file.Name) + entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) if err != nil { + glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) return err } if entry != nil { @@ -246,6 +250,8 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { newVisibles = t } + file.reader = nil + glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) file.entry.Chunks = append(file.entry.Chunks, chunks...) @@ -254,23 +260,28 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks) + file.reader = nil } -func (file *File) saveEntry(ctx context.Context) error { - return file.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { +func (file *File) saveEntry() error { + return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ - Directory: file.dir.Path, + Directory: file.dir.FullPath(), Entry: file.entry, } glog.V(1).Infof("save file entry: %v", request) - _, err := client.UpdateEntry(ctx, request) + _, err := client.UpdateEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err) + glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) return fuse.EIO } + if file.wfs.option.AsyncMetaDataCaching { + file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) + } + return nil }) } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index cf253a7ed..372d742ea 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,12 +3,10 @@ package filesys import ( "context" "fmt" - "mime" - "path" + "math" + "net/http" "time" - "github.com/gabriel-vasile/mimetype" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -28,15 +26,20 @@ type FileHandle struct { NodeId fuse.NodeID // file or directory the request is about Uid uint32 // user ID of process making request Gid uint32 // group ID of process making request + } func newFileHandle(file *File, uid, gid uint32) *FileHandle { - return &FileHandle{ + fh := &FileHandle{ f: file, dirtyPages: newDirtyPages(file), Uid: uid, Gid: gid, } + if fh.f.entry != nil { + fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks) + } + return fh } var _ = fs.Handle(&FileHandle{}) @@ -53,9 +56,9 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus buff := make([]byte, req.Size) - totalRead, err := fh.readFromChunks(ctx, buff, req.Offset) + totalRead, err := fh.readFromChunks(buff, req.Offset) if err == nil { - dirtyOffset, dirtySize := fh.readFromDirtyPages(ctx, buff, req.Offset) + dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset) if totalRead+req.Offset < dirtyOffset+int64(dirtySize) { totalRead = dirtyOffset + int64(dirtySize) - req.Offset } @@ -71,11 +74,11 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus return err } -func (fh *FileHandle) readFromDirtyPages(ctx context.Context, buff []byte, startOffset int64) (offset int64, size int) { - return fh.dirtyPages.ReadDirtyData(ctx, buff, startOffset) +func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) { + return fh.dirtyPages.ReadDirtyData(buff, startOffset) } -func (fh *FileHandle) readFromChunks(ctx context.Context, buff []byte, offset int64) (int64, error) { +func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { // this value should come from the filer instead of the old f if len(fh.f.entry.Chunks) == 0 { @@ -85,43 +88,46 @@ func (fh *FileHandle) readFromChunks(ctx context.Context, buff []byte, offset in if fh.f.entryViewCache == nil { fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) + fh.f.reader = nil } - chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, offset, len(buff)) + if fh.f.reader == nil { + chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32) + fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache) + } - totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, offset) + totalRead, err := fh.f.reader.ReadAt(buff, offset) if err != nil { glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) } - return totalRead, err + // glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err) + + return int64(totalRead), err } // Write to the file handle func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { // write the request to volume servers + data := make([]byte, len(req.Data)) + copy(data, req.Data) - fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(req.Data)), int64(fh.f.entry.Attributes.FileSize))) + fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize))) // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data))) - chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data) + chunks, err := fh.dirtyPages.AddPage(req.Offset, data) if err != nil { - glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err) + glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err) return fuse.EIO } - resp.Size = len(req.Data) + resp.Size = len(data) if req.Offset == 0 { // detect mime type - detectedMIME := mimetype.Detect(req.Data) - fh.contentType = detectedMIME.String() - if ext := path.Ext(fh.f.Name); ext != detectedMIME.Extension() { - fh.contentType = mime.TypeByExtension(ext) - } - + fh.contentType = http.DetectContentType(data) fh.dirtyMetadata = true } @@ -145,6 +151,8 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err fh.dirtyPages.releaseResource() fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) } + fh.f.entryViewCache = nil + fh.f.reader = nil return nil } @@ -154,14 +162,14 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { // send the data to the OS glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req) - chunks, err := fh.dirtyPages.FlushToStorage(ctx) + chunks, err := fh.dirtyPages.FlushToStorage() if err != nil { glog.Errorf("flush %s: %v", fh.f.fullpath(), err) return fuse.EIO } - fh.f.addChunks(chunks) if len(chunks) > 0 { + fh.f.addChunks(chunks) fh.dirtyMetadata = true } @@ -169,7 +177,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return nil } - err = fh.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType @@ -177,11 +185,13 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Attributes.Gid = req.Gid fh.f.entry.Attributes.Mtime = time.Now().Unix() fh.f.entry.Attributes.Crtime = time.Now().Unix() - fh.f.entry.Attributes.FileMode = uint32(0777 &^ fh.f.wfs.option.Umask) + fh.f.entry.Attributes.FileMode = uint32(0666 &^ fh.f.wfs.option.Umask) + fh.f.entry.Attributes.Collection = fh.dirtyPages.collection + fh.f.entry.Attributes.Replication = fh.dirtyPages.replication } request := &filer_pb.CreateEntryRequest{ - Directory: fh.f.dir.Path, + Directory: fh.f.dir.FullPath(), Entry: fh.f.entry, } @@ -194,12 +204,16 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Chunks = chunks // fh.f.entryViewCache = nil - if err := filer_pb.CreateEntry(ctx, client, request); err != nil { - glog.Errorf("update fh: %v", err) - return fmt.Errorf("update fh: %v", err) + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) + return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) + } + + if fh.f.wfs.option.AsyncMetaDataCaching { + fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) } - fh.f.wfs.deleteFileChunks(ctx, garbages) + fh.f.wfs.deleteFileChunks(garbages) for i, chunk := range garbages { glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go new file mode 100644 index 000000000..b146f0615 --- /dev/null +++ b/weed/filesys/fscache.go @@ -0,0 +1,207 @@ +package filesys + +import ( + "sync" + + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/fuse/fs" +) + +type FsCache struct { + root *FsNode + sync.RWMutex +} +type FsNode struct { + parent *FsNode + node fs.Node + name string + childrenLock sync.RWMutex + children map[string]*FsNode +} + +func newFsCache(root fs.Node) *FsCache { + return &FsCache{ + root: &FsNode{ + node: root, + }, + } +} + +func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { + + c.RLock() + defer c.RUnlock() + + return c.doGetFsNode(path) +} + +func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node { + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return nil + } + } + return t.node +} + +func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { + + c.Lock() + defer c.Unlock() + + c.doSetFsNode(path, node) +} + +func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) { + t := c.root + for _, p := range path.Split() { + t = t.ensureChild(p) + } + t.node = node +} + +func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node { + + c.Lock() + defer c.Unlock() + + t := c.doGetFsNode(path) + if t != nil { + return t + } + t = genNodeFn() + c.doSetFsNode(path, t) + return t +} + +func (c *FsCache) DeleteFsNode(path util.FullPath) { + + c.Lock() + defer c.Unlock() + + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return + } + } + if t.parent != nil { + t.parent.disconnectChild(t) + } + t.deleteSelf() +} + +// oldPath and newPath are full path including the new name +func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { + + c.Lock() + defer c.Unlock() + + // find old node + src := c.root + for _, p := range oldPath.Split() { + src = src.findChild(p) + if src == nil { + return src + } + } + if src.parent != nil { + src.parent.disconnectChild(src) + } + + // find new node + target := c.root + for _, p := range newPath.Split() { + target = target.ensureChild(p) + } + parent := target.parent + src.name = target.name + if dir, ok := src.node.(*Dir); ok { + dir.name = target.name // target is not Dir, but a shortcut + } + if f, ok := src.node.(*File); ok { + f.Name = target.name + if f.entry != nil { + f.entry.Name = f.Name + } + } + parent.disconnectChild(target) + + target.deleteSelf() + + src.connectToParent(parent) + + return src +} + +func (n *FsNode) connectToParent(parent *FsNode) { + n.parent = parent + oldNode := parent.findChild(n.name) + if oldNode != nil { + oldNode.deleteSelf() + } + if dir, ok := n.node.(*Dir); ok { + dir.parent = parent.node.(*Dir) + } + if f, ok := n.node.(*File); ok { + f.dir = parent.node.(*Dir) + } + n.childrenLock.Lock() + parent.children[n.name] = n + n.childrenLock.Unlock() +} + +func (n *FsNode) findChild(name string) *FsNode { + n.childrenLock.RLock() + defer n.childrenLock.RUnlock() + + child, found := n.children[name] + if found { + return child + } + return nil +} + +func (n *FsNode) ensureChild(name string) *FsNode { + n.childrenLock.Lock() + defer n.childrenLock.Unlock() + + if n.children == nil { + n.children = make(map[string]*FsNode) + } + child, found := n.children[name] + if found { + return child + } + t := &FsNode{ + parent: n, + node: nil, + name: name, + children: nil, + } + n.children[name] = t + return t +} + +func (n *FsNode) disconnectChild(child *FsNode) { + n.childrenLock.Lock() + delete(n.children, child.name) + n.childrenLock.Unlock() + child.parent = nil +} + +func (n *FsNode) deleteSelf() { + n.childrenLock.Lock() + for _, child := range n.children { + child.deleteSelf() + } + n.children = nil + n.childrenLock.Unlock() + + n.node = nil + n.parent = nil + +} diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go new file mode 100644 index 000000000..67f9aacc8 --- /dev/null +++ b/weed/filesys/fscache_test.go @@ -0,0 +1,96 @@ +package filesys + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/util" +) + +func TestPathSplit(t *testing.T) { + parts := util.FullPath("/").Split() + if len(parts) != 0 { + t.Errorf("expecting an empty list, but getting %d", len(parts)) + } + + parts = util.FullPath("/readme.md").Split() + if len(parts) != 1 { + t.Errorf("expecting an empty list, but getting %d", len(parts)) + } + +} + +func TestFsCache(t *testing.T) { + + cache := newFsCache(nil) + + x := cache.GetFsNode(util.FullPath("/y/x")) + if x != nil { + t.Errorf("wrong node!") + } + + p := util.FullPath("/a/b/c") + cache.SetFsNode(p, &File{Name: "cc"}) + tNode := cache.GetFsNode(p) + tFile := tNode.(*File) + if tFile.Name != "cc" { + t.Errorf("expecting a FsNode") + } + + cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) + cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) + cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"}) + cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) + cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) + + b := cache.GetFsNode(util.FullPath("/a/b")) + if b != nil { + t.Errorf("unexpected node!") + } + + a := cache.GetFsNode(util.FullPath("/a")) + if a == nil { + t.Errorf("missing node!") + } + + cache.DeleteFsNode(util.FullPath("/a")) + if b != nil { + t.Errorf("unexpected node!") + } + + a = cache.GetFsNode(util.FullPath("/a")) + if a != nil { + t.Errorf("wrong DeleteFsNode!") + } + + z := cache.GetFsNode(util.FullPath("/z")) + if z == nil { + t.Errorf("missing node!") + } + + y := cache.GetFsNode(util.FullPath("/x/y")) + if y != nil { + t.Errorf("wrong node!") + } + +} + +func TestFsCacheMove(t *testing.T) { + + cache := newFsCache(nil) + + cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) + cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) + cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) + cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) + + cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x")) + + d := cache.GetFsNode(util.FullPath("/z/x/d")) + if d == nil { + t.Errorf("unexpected nil node!") + } + if d.(*File).Name != "dd" { + t.Errorf("unexpected non dd node!") + } + +} diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go new file mode 100644 index 000000000..e6593ebde --- /dev/null +++ b/weed/filesys/meta_cache/cache_config.go @@ -0,0 +1,32 @@ +package meta_cache + +import "github.com/chrislusf/seaweedfs/weed/util" + +var ( + _ = util.Configuration(&cacheConfig{}) +) + +// implementing util.Configuraion +type cacheConfig struct { + dir string +} + +func (c cacheConfig) GetString(key string) string { + return c.dir +} + +func (c cacheConfig) GetBool(key string) bool { + panic("implement me") +} + +func (c cacheConfig) GetInt(key string) int { + panic("implement me") +} + +func (c cacheConfig) GetStringSlice(key string) []string { + panic("implement me") +} + +func (c cacheConfig) SetDefault(key string, value interface{}) { + panic("implement me") +} diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go new file mode 100644 index 000000000..4c9090d42 --- /dev/null +++ b/weed/filesys/meta_cache/meta_cache.go @@ -0,0 +1,93 @@ +package meta_cache + +import ( + "context" + "os" + "sync" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type MetaCache struct { + actualStore filer2.FilerStore + sync.RWMutex +} + +func NewMetaCache(dbFolder string) *MetaCache { + return &MetaCache{ + actualStore: openMetaStore(dbFolder), + } +} + +func openMetaStore(dbFolder string) filer2.FilerStore { + + os.RemoveAll(dbFolder) + os.MkdirAll(dbFolder, 0755) + + store := &leveldb.LevelDBStore{} + config := &cacheConfig{ + dir: dbFolder, + } + + if err := store.Initialize(config, ""); err != nil { + glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) + } + + return store + +} + +func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error { + mc.Lock() + defer mc.Unlock() + return mc.actualStore.InsertEntry(ctx, entry) +} + +func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error { + mc.Lock() + defer mc.Unlock() + if oldPath != "" { + if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil { + return err + } + } + if newEntry != nil { + if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil { + return err + } + } + return nil +} + +func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error { + mc.Lock() + defer mc.Unlock() + return mc.actualStore.UpdateEntry(ctx, entry) +} + +func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) { + mc.RLock() + defer mc.RUnlock() + return mc.actualStore.FindEntry(ctx, fp) +} + +func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { + mc.Lock() + defer mc.Unlock() + return mc.actualStore.DeleteEntry(ctx, fp) +} + +func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) { + mc.RLock() + defer mc.RUnlock() + return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) +} + +func (mc *MetaCache) Shutdown() { + mc.Lock() + defer mc.Unlock() + mc.actualStore.Shutdown() +} diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go new file mode 100644 index 000000000..58bf6862e --- /dev/null +++ b/weed/filesys/meta_cache/meta_cache_init.go @@ -0,0 +1,21 @@ +package meta_cache + +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error { + glog.V(0).Infof("synchronizing meta data ...") + filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) { + entry := filer2.FromPbEntry(string(parentPath), pbEntry) + if err := mc.InsertEntry(context.Background(), entry); err != nil { + glog.V(0).Infof("read %s: %v", entry.FullPath, err) + } + }) + return nil +} diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go new file mode 100644 index 000000000..2e411a48a --- /dev/null +++ b/weed/filesys/meta_cache/meta_cache_subscribe.go @@ -0,0 +1,69 @@ +package meta_cache + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, lastTsNs int64) error { + + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + var oldPath util.FullPath + var newEntry *filer2.Entry + if message.OldEntry != nil { + oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name) + glog.V(4).Infof("deleting %v", oldPath) + } + + if message.NewEntry != nil { + dir := resp.Directory + if message.NewParentPath != "" { + dir = message.NewParentPath + } + key := util.NewFullPath(dir, message.NewEntry.Name) + glog.V(4).Infof("creating %v", key) + newEntry = filer2.FromPbEntry(dir, message.NewEntry) + } + return mc.AtomicUpdateEntry(context.Background(), oldPath, newEntry) + } + + for { + err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ + ClientName: "mount", + PathPrefix: dir, + SinceNs: lastTsNs, + }) + if err != nil { + return fmt.Errorf("subscribe: %v", err) + } + + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + + if err := processEventFn(resp); err != nil { + return fmt.Errorf("process %v: %v", resp, err) + } + lastTsNs = resp.TsNs + } + }) + if err != nil { + glog.V(0).Infof("subscribing filer meta change: %v", err) + time.Sleep(time.Second) + } + } +} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 4807e367b..67dd2a62c 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -5,17 +5,21 @@ import ( "fmt" "math" "os" + "path" "strings" "sync" "time" + "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/karlseguin/ccache" "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) @@ -28,6 +32,8 @@ type Option struct { Replication string TtlSec int32 ChunkSizeLimit int64 + CacheDir string + CacheSizeMB int64 DataCenter string DirListCacheLimit int64 EntryCacheTtl time.Duration @@ -38,6 +44,11 @@ type Option struct { MountMode os.FileMode MountCtime time.Time MountMtime time.Time + + OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers + Cipher bool // whether encrypt data on volume server + AsyncMetaDataCaching bool // whether asynchronously cache meta data + } var _ = fs.FS(&WFS{}) @@ -48,18 +59,18 @@ type WFS struct { listDirectoryEntriesCache *ccache.Cache // contains all open handles, protected by handlesLock - handlesLock sync.Mutex - handles []*FileHandle - pathToHandleIndex map[filer2.FullPath]int + handlesLock sync.Mutex + handles map[uint64]*FileHandle bufPool sync.Pool stats statsCache - // nodes, protected by nodesLock - nodesLock sync.Mutex - nodes map[uint64]fs.Node - root fs.Node + root fs.Node + fsNodeCache *FsCache + + chunkCache *chunk_cache.ChunkCache + metaCache *meta_cache.MetaCache } type statsCache struct { filer_pb.StatisticsResponse @@ -70,16 +81,34 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), - pathToHandleIndex: make(map[filer2.FullPath]int), + handles: make(map[uint64]*FileHandle), bufPool: sync.Pool{ New: func() interface{} { return make([]byte, option.ChunkSizeLimit) }, }, - nodes: make(map[uint64]fs.Node), + } + if option.CacheSizeMB > 0 { + wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB) + grace.OnInterrupt(func() { + wfs.chunkCache.Shutdown() + }) + } + if wfs.option.AsyncMetaDataCaching { + wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) + startTime := time.Now() + if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { + glog.V(0).Infof("failed to init meta cache: %v", err) + } else { + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + grace.OnInterrupt(func() { + wfs.metaCache.Shutdown() + }) + } } - wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs} + wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} + wfs.fsNodeCache = newFsCache(wfs.root) return wfs } @@ -88,23 +117,18 @@ func (wfs *WFS) Root() (fs.Node, error) { return wfs.root, nil } -func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { +var _ = filer_pb.FilerClient(&WFS{}) - err := util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { +func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(ctx2, client) + return fn(client) }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) if err == nil { return nil } - if strings.Contains(err.Error(), "context canceled") { - glog.V(2).Infoln("retry context canceled request...") - return util.WithCachedGrpcClient(context.Background(), func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(ctx2, client) - }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) - } return err } @@ -117,40 +141,27 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand wfs.handlesLock.Lock() defer wfs.handlesLock.Unlock() - index, found := wfs.pathToHandleIndex[fullpath] - if found && wfs.handles[index] != nil { - glog.V(2).Infoln(fullpath, "found fileHandle id", index) - return wfs.handles[index] + inodeId := file.fullpath().AsInode() + existingHandle, found := wfs.handles[inodeId] + if found && existingHandle != nil { + return existingHandle } fileHandle = newFileHandle(file, uid, gid) - for i, h := range wfs.handles { - if h == nil { - wfs.handles[i] = fileHandle - fileHandle.handle = uint64(i) - wfs.pathToHandleIndex[fullpath] = i - glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle) - return - } - } - - wfs.handles = append(wfs.handles, fileHandle) - fileHandle.handle = uint64(len(wfs.handles) - 1) - wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) + wfs.handles[inodeId] = fileHandle + fileHandle.handle = inodeId glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return } -func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) { +func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) { wfs.handlesLock.Lock() defer wfs.handlesLock.Unlock() glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) - delete(wfs.pathToHandleIndex, fullpath) - if int(handleId) < len(wfs.handles) { - wfs.handles[int(handleId)] = nil - } + + delete(wfs.handles, fullpath.AsInode()) return } @@ -162,7 +173,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, @@ -171,7 +182,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. } glog.V(4).Infof("reading filer stats: %+v", request) - resp, err := client.Statistics(ctx, request) + resp, err := client.Statistics(context.Background(), request) if err != nil { glog.V(0).Infof("reading filer stats %v: %v", request, err) return err @@ -217,43 +228,33 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. return nil } -func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry { +func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry { item := wfs.listDirectoryEntriesCache.Get(string(path)) if item != nil && !item.Expired() { return item.Value().(*filer_pb.Entry) } return nil } -func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) { +func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) { if entry == nil { wfs.listDirectoryEntriesCache.Delete(string(path)) } else { wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl) } } -func (wfs *WFS) cacheDelete(path filer2.FullPath) { +func (wfs *WFS) cacheDelete(path util.FullPath) { wfs.listDirectoryEntriesCache.Delete(string(path)) } -func (wfs *WFS) getNode(fullpath filer2.FullPath, fn func() fs.Node) fs.Node { - wfs.nodesLock.Lock() - defer wfs.nodesLock.Unlock() - - node, found := wfs.nodes[fullpath.AsInode()] - if found { - return node +func (wfs *WFS) AdjustedUrl(hostAndPort string) string { + if !wfs.option.OutsideContainerClusterMode { + return hostAndPort } - node = fn() - if node != nil { - wfs.nodes[fullpath.AsInode()] = node + commaIndex := strings.Index(hostAndPort, ":") + if commaIndex < 0 { + return hostAndPort } - return node -} - -func (wfs *WFS) forgetNode(fullpath filer2.FullPath) { - wfs.nodesLock.Lock() - defer wfs.nodesLock.Unlock() - - delete(wfs.nodes, fullpath.AsInode()) + filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":") + return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:]) } diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index cce0c792c..bf21b1808 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -3,14 +3,15 @@ package filesys import ( "context" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "google.golang.org/grpc" ) -func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChunk) { +func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { if len(chunks) == 0 { return } @@ -20,13 +21,13 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu fileIds = append(fileIds, chunk.GetFileIdString()) } - wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds) + wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds) return nil }) } -func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { +func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { var vids []string for _, fileId := range fileIds { @@ -38,7 +39,7 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f m := make(map[string]operation.LookupResult) glog.V(4).Infof("remove file lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: vids, }) if err != nil { @@ -56,7 +57,7 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f } for _, loc := range locations.Locations { lr.Locations = append(lr.Locations, operation.Location{ - Url: loc.Url, + Url: wfs.AdjustedUrl(loc.Url), PublicUrl: loc.PublicUrl, }) } diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 9dfb491fd..7e7b8c60b 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -2,11 +2,10 @@ package filesys import ( "context" - "strings" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" ) @@ -108,25 +107,34 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis } -func (wfs *WFS) maybeLoadEntry(ctx context.Context, dir, name string) (entry *filer_pb.Entry, err error) { +func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) { - fullpath := filer2.NewFullPath(dir, name) + fullpath := util.NewFullPath(dir, name) entry = wfs.cacheGet(fullpath) if entry != nil { return } // glog.V(3).Infof("read entry cache miss %s", fullpath) - err = wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + // read from async meta cache + if wfs.option.AsyncMetaDataCaching { + cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT + } + return cachedEntry.ToProtoEntry(), nil + } + + err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Name: name, Directory: dir, } - resp, err := client.LookupDirectoryEntry(ctx, request) - if err != nil || resp == nil || resp.Entry == nil { - if err == filer2.ErrNotFound || strings.Contains(err.Error(), filer2.ErrNotFound.Error()) { + resp, err := filer_pb.LookupEntry(client, request) + if err != nil { + if err == filer_pb.ErrNotFound { glog.V(3).Infof("file attr read not found file %v: %v", request, err) return fuse.ENOENT } |
