author     shibinbin <shibinbin@megvii.com>  2020-06-04 17:24:18 +0800
committer  shibinbin <shibinbin@megvii.com>  2020-06-04 17:24:18 +0800
commit     40334bc28d3fa694ce59b4e65077efb845264d20 (patch)
tree       a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/filesys
parent     d892cad15d748327c2b7c649f6398ff35d8dce0b (diff)
parent     fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff)
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/dir.go                              263
-rw-r--r--  weed/filesys/dir_link.go                          18
-rw-r--r--  weed/filesys/dir_rename.go                        38
-rw-r--r--  weed/filesys/dir_test.go                          34
-rw-r--r--  weed/filesys/dirty_page.go                        79
-rw-r--r--  weed/filesys/dirty_page_interval_test.go          17
-rw-r--r--  weed/filesys/file.go                              45
-rw-r--r--  weed/filesys/filehandle.go                        78
-rw-r--r--  weed/filesys/fscache.go                          207
-rw-r--r--  weed/filesys/fscache_test.go                      96
-rw-r--r--  weed/filesys/meta_cache/cache_config.go           32
-rw-r--r--  weed/filesys/meta_cache/meta_cache.go             93
-rw-r--r--  weed/filesys/meta_cache/meta_cache_init.go        21
-rw-r--r--  weed/filesys/meta_cache/meta_cache_subscribe.go   69
-rw-r--r--  weed/filesys/wfs.go                              133
-rw-r--r--  weed/filesys/wfs_deletion.go                      15
-rw-r--r--  weed/filesys/xattr.go                             24
17 files changed, 965 insertions, 297 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index abe5a21a6..e4260d56f 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -1,6 +1,7 @@
package filesys
import (
+ "bytes"
"context"
"os"
"strings"
@@ -9,14 +10,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
type Dir struct {
- Path string
- wfs *WFS
- entry *filer_pb.Entry
+ name string
+ wfs *WFS
+ entry *filer_pb.Entry
+ parent *Dir
}
var _ = fs.Node(&Dir{})
@@ -35,39 +38,37 @@ var _ = fs.NodeForgetter(&Dir{})
func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
- glog.V(3).Infof("dir Attr %s, existing attr: %+v", dir.Path, attr)
-
// https://github.com/bazil/fuse/issues/196
attr.Valid = time.Second
- if dir.Path == dir.wfs.option.FilerMountRootPath {
+ if dir.FullPath() == dir.wfs.option.FilerMountRootPath {
dir.setRootDirAttributes(attr)
- glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.Path, attr)
+ glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.FullPath(), attr)
return nil
}
- if err := dir.maybeLoadEntry(ctx); err != nil {
- glog.V(3).Infof("dir Attr %s,err: %+v", dir.Path, err)
+ if err := dir.maybeLoadEntry(); err != nil {
+ glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err)
return err
}
- attr.Inode = filer2.FullPath(dir.Path).AsInode()
+ attr.Inode = util.FullPath(dir.FullPath()).AsInode()
attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
- attr.Ctime = time.Unix(dir.entry.Attributes.Crtime, 0)
+ attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0)
attr.Gid = dir.entry.Attributes.Gid
attr.Uid = dir.entry.Attributes.Uid
- glog.V(3).Infof("dir Attr %s, attr: %+v", dir.Path, attr)
+ glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
return nil
}
func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
- glog.V(4).Infof("dir Getxattr %s", dir.Path)
+ glog.V(4).Infof("dir Getxattr %s", dir.FullPath())
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ if err := dir.maybeLoadEntry(); err != nil {
return err
}
@@ -88,7 +89,7 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
}
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
- return dir.wfs.getNode(filer2.NewFullPath(dir.Path, name), func() fs.Node {
+ return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
return &File{
Name: name,
dir: dir,
@@ -99,17 +100,19 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
})
}
-func (dir *Dir) newDirectory(fullpath filer2.FullPath, entry *filer_pb.Entry) fs.Node {
- return dir.wfs.getNode(fullpath, func() fs.Node {
- return &Dir{Path: string(fullpath), wfs: dir.wfs, entry: entry}
+func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
+
+ return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
+ return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
})
+
}
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
request := &filer_pb.CreateEntryRequest{
- Directory: dir.Path,
+ Directory: dir.FullPath(),
Entry: &filer_pb.Entry{
Name: req.Name,
IsDirectory: req.Mode&os.ModeDir > 0,
@@ -126,22 +129,27 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
},
OExcl: req.Flags&fuse.OpenExclusive != 0,
}
- glog.V(1).Infof("create: %v", req.String())
+ glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags)
- if err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
- if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
+ if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
if strings.Contains(err.Error(), "EEXIST") {
return fuse.EEXIST
}
return fuse.EIO
}
+
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
+
return nil
}); err != nil {
return nil, nil, err
}
var node fs.Node
if request.Entry.IsDirectory {
- node = dir.newDirectory(filer2.NewFullPath(dir.Path, req.Name), request.Entry)
+ node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry)
return node, nil, nil
}
@@ -155,6 +163,8 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
+ glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name)
+
newEntry := &filer_pb.Entry{
Name: req.Name,
IsDirectory: true,
@@ -167,40 +177,55 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
},
}
- err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
- Directory: dir.Path,
+ Directory: dir.FullPath(),
Entry: newEntry,
}
glog.V(1).Infof("mkdir: %v", request)
- if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
- glog.V(0).Infof("mkdir %s/%s: %v", dir.Path, req.Name, err)
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err)
return err
}
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
+
return nil
})
if err == nil {
- node := dir.newDirectory(filer2.NewFullPath(dir.Path, req.Name), newEntry)
+ node := dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), newEntry)
+
return node, nil
}
+ glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err)
+
return nil, fuse.EIO
}
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
- glog.V(4).Infof("dir Lookup %s: %s", dir.Path, req.Name)
+ glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
- fullFilePath := filer2.NewFullPath(dir.Path, req.Name)
+ fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
entry := dir.wfs.cacheGet(fullFilePath)
+ if dir.wfs.option.AsyncMetaDataCaching {
+ cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
+ }
+ entry = cachedEntry.ToProtoEntry()
+ }
+
if entry == nil {
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
- entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath)
+ entry, err = filer_pb.GetEntry(dir.wfs, fullFilePath)
if err != nil {
glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
@@ -221,7 +246,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
resp.Attr.Inode = fullFilePath.AsInode()
resp.Attr.Valid = time.Second
resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
- resp.Attr.Ctime = time.Unix(entry.Attributes.Crtime, 0)
+ resp.Attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode)
resp.Attr.Gid = entry.Attributes.Gid
resp.Attr.Uid = entry.Attributes.Uid
@@ -229,18 +254,17 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
return node, nil
}
- glog.V(1).Infof("not found dir GetEntry %s: %v", fullFilePath, err)
+ glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
}
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
- glog.V(3).Infof("dir ReadDirAll %s", dir.Path)
+ glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
cacheTtl := 5 * time.Minute
-
- readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, filer2.FullPath(dir.Path), "", func(entry *filer_pb.Entry, isLast bool) {
- fullpath := filer2.NewFullPath(dir.Path, entry.Name)
+ processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
+ fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
inode := fullpath.AsInode()
if entry.IsDirectory {
dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir}
@@ -250,9 +274,24 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
ret = append(ret, dirent)
}
dir.wfs.cacheSet(fullpath, entry, cacheTtl)
- })
+ return nil
+ }
+
+ if dir.wfs.option.AsyncMetaDataCaching {
+ listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
+ if listErr != nil {
+ glog.Errorf("list meta cache: %v", listErr)
+ return nil, fuse.EIO
+ }
+ for _, cachedEntry := range listedEntries {
+ processEachEntryFn(cachedEntry.ToProtoEntry(), false)
+ }
+ return
+ }
+
+ readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn)
if readErr != nil {
- glog.V(0).Infof("list %s: %v", dir.Path, err)
+ glog.V(0).Infof("list %s: %v", dir.FullPath(), err)
return ret, fuse.EIO
}
@@ -262,74 +301,70 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
if !req.Dir {
- return dir.removeOneFile(ctx, req)
+ return dir.removeOneFile(req)
}
- return dir.removeFolder(ctx, req)
+ return dir.removeFolder(req)
}
-func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error {
+func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
- filePath := filer2.NewFullPath(dir.Path, req.Name)
- entry, err := filer2.GetEntry(ctx, dir.wfs, filePath)
+ filePath := util.NewFullPath(dir.FullPath(), req.Name)
+ entry, err := filer_pb.GetEntry(dir.wfs, filePath)
if err != nil {
return err
}
+ if entry == nil {
+ return nil
+ }
- dir.wfs.deleteFileChunks(ctx, entry.Chunks)
+ dir.wfs.deleteFileChunks(entry.Chunks)
dir.wfs.cacheDelete(filePath)
+ dir.wfs.fsNodeCache.DeleteFsNode(filePath)
- return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.DeleteEntryRequest{
- Directory: dir.Path,
- Name: req.Name,
- IsDeleteData: false,
- }
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+ }
- glog.V(3).Infof("remove file: %v", request)
- _, err := client.DeleteEntry(ctx, request)
- if err != nil {
- glog.V(3).Infof("not found remove file %s/%s: %v", dir.Path, req.Name, err)
- return fuse.ENOENT
- }
+ glog.V(3).Infof("remove file: %v", req)
+ err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false)
+ if err != nil {
+ glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err)
+ return fuse.ENOENT
+ }
- return nil
- })
+ return nil
}
-func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error {
-
- dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.Name))
+func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
- return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ t := util.NewFullPath(dir.FullPath(), req.Name)
+ dir.wfs.cacheDelete(t)
+ dir.wfs.fsNodeCache.DeleteFsNode(t)
- request := &filer_pb.DeleteEntryRequest{
- Directory: dir.Path,
- Name: req.Name,
- IsDeleteData: true,
- }
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.DeleteEntry(context.Background(), t)
+ }
- glog.V(3).Infof("remove directory entry: %v", request)
- _, err := client.DeleteEntry(ctx, request)
- if err != nil {
- glog.V(3).Infof("not found remove %s/%s: %v", dir.Path, req.Name, err)
- return fuse.ENOENT
- }
+ glog.V(3).Infof("remove directory entry: %v", req)
+ err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false)
+ if err != nil {
+ glog.V(3).Infof("not found remove %s/%s: %v", dir.FullPath(), req.Name, err)
+ return fuse.ENOENT
+ }
- return nil
- })
+ return nil
}
func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- glog.V(3).Infof("%v dir setattr %+v", dir.Path, req)
+ glog.V(3).Infof("%v dir setattr %+v", dir.FullPath(), req)
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ if err := dir.maybeLoadEntry(); err != nil {
return err
}
@@ -349,17 +384,17 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
dir.entry.Attributes.Mtime = req.Mtime.Unix()
}
- dir.wfs.cacheDelete(filer2.FullPath(dir.Path))
+ dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
- return dir.saveEntry(ctx)
+ return dir.saveEntry()
}
func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
- glog.V(4).Infof("dir Setxattr %s: %s", dir.Path, req.Name)
+ glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name)
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ if err := dir.maybeLoadEntry(); err != nil {
return err
}
@@ -367,17 +402,17 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
return err
}
- dir.wfs.cacheDelete(filer2.FullPath(dir.Path))
+ dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
- return dir.saveEntry(ctx)
+ return dir.saveEntry()
}
func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
- glog.V(4).Infof("dir Removexattr %s: %s", dir.Path, req.Name)
+ glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name)
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ if err := dir.maybeLoadEntry(); err != nil {
return err
}
@@ -385,17 +420,17 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e
return err
}
- dir.wfs.cacheDelete(filer2.FullPath(dir.Path))
+ dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
- return dir.saveEntry(ctx)
+ return dir.saveEntry()
}
func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
- glog.V(4).Infof("dir Listxattr %s", dir.Path)
+ glog.V(4).Infof("dir Listxattr %s", dir.FullPath())
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ if err := dir.maybeLoadEntry(); err != nil {
return err
}
@@ -408,15 +443,15 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
}
func (dir *Dir) Forget() {
- glog.V(3).Infof("Forget dir %s", dir.Path)
+ glog.V(3).Infof("Forget dir %s", dir.FullPath())
- dir.wfs.forgetNode(filer2.FullPath(dir.Path))
+ dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
}
-func (dir *Dir) maybeLoadEntry(ctx context.Context) error {
+func (dir *Dir) maybeLoadEntry() error {
if dir.entry == nil {
- parentDirPath, name := filer2.FullPath(dir.Path).DirAndName()
- entry, err := dir.wfs.maybeLoadEntry(ctx, parentDirPath, name)
+ parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName()
+ entry, err := dir.wfs.maybeLoadEntry(parentDirPath, name)
if err != nil {
return err
}
@@ -425,11 +460,11 @@ func (dir *Dir) maybeLoadEntry(ctx context.Context) error {
return nil
}
-func (dir *Dir) saveEntry(ctx context.Context) error {
+func (dir *Dir) saveEntry() error {
- parentDir, name := filer2.FullPath(dir.Path).DirAndName()
+ parentDir, name := util.FullPath(dir.FullPath()).DirAndName()
- return dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: parentDir,
@@ -437,12 +472,40 @@ func (dir *Dir) saveEntry(ctx context.Context) error {
}
glog.V(1).Infof("save dir entry: %v", request)
- _, err := client.UpdateEntry(ctx, request)
+ _, err := client.UpdateEntry(context.Background(), request)
if err != nil {
glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err)
return fuse.EIO
}
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
+
return nil
})
}
+
+func (dir *Dir) FullPath() string {
+ var parts []string
+ for p := dir; p != nil; p = p.parent {
+ if strings.HasPrefix(p.name, "/") {
+ if len(p.name) > 1 {
+ parts = append(parts, p.name[1:])
+ }
+ } else {
+ parts = append(parts, p.name)
+ }
+ }
+
+ if len(parts) == 0 {
+ return "/"
+ }
+
+ var buf bytes.Buffer
+ for i := len(parts) - 1; i >= 0; i-- {
+ buf.WriteString("/")
+ buf.WriteString(parts[i])
+ }
+ return buf.String()
+}
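
Note on the new FullPath(): Dir no longer stores its absolute path; it keeps only its own name plus a parent pointer, and FullPath() rebuilds the path by walking up to the root, handling a root whose name itself starts with "/". A minimal standalone sketch of that walk (dirNode and fullPath are illustrative stand-ins, not the repo's types):

package main

import (
	"bytes"
	"fmt"
	"strings"
)

// dirNode mirrors the layout the diff introduces: a name plus a parent link.
type dirNode struct {
	name   string
	parent *dirNode
}

// fullPath walks parent pointers collecting names, then joins them
// back-to-front; a root named "/" (or "/mnt") contributes correctly.
func (d *dirNode) fullPath() string {
	var parts []string
	for p := d; p != nil; p = p.parent {
		if strings.HasPrefix(p.name, "/") {
			if len(p.name) > 1 {
				parts = append(parts, p.name[1:])
			}
		} else {
			parts = append(parts, p.name)
		}
	}
	if len(parts) == 0 {
		return "/" // a lone root named "/"
	}
	var buf bytes.Buffer
	for i := len(parts) - 1; i >= 0; i-- {
		buf.WriteString("/")
		buf.WriteString(parts[i])
	}
	return buf.String()
}

func main() {
	root := &dirNode{name: "/"}
	a := &dirNode{name: "a", parent: root}
	b := &dirNode{name: "b", parent: a}
	fmt.Println(b.fullPath()) // prints /a/b
}

The trade-off: path lookup becomes O(depth), but renaming a directory no longer leaves stale absolute paths in its descendants, which is what let the manual node rewiring in dir_rename.go below be deleted.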
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 8b7ec7e89..d1858e99b 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -6,6 +6,7 @@ import (
"syscall"
"time"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
@@ -17,10 +18,10 @@ var _ = fs.NodeReadlinker(&File{})
func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) {
- glog.V(3).Infof("Symlink: %v/%v to %v", dir.Path, req.NewName, req.Target)
+ glog.V(3).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
request := &filer_pb.CreateEntryRequest{
- Directory: dir.Path,
+ Directory: dir.FullPath(),
Entry: &filer_pb.Entry{
Name: req.NewName,
IsDirectory: false,
@@ -35,11 +36,16 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
},
}
- err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
- if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
- glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err)
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err)
return fuse.EIO
}
+
+ if dir.wfs.option.AsyncMetaDataCaching {
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
+
return nil
})
@@ -59,7 +65,7 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri
return "", fuse.Errno(syscall.EINVAL)
}
- glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.Path, file.Name, file.entry.Attributes.SymlinkTarget)
+ glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget)
return file.entry.Attributes.SymlinkTarget, nil
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index 4eb3c15b5..ea40f5c31 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -3,9 +3,9 @@ package filesys
import (
"context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
@@ -13,20 +13,24 @@ import (
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
newDir := newDirectory.(*Dir)
- glog.V(4).Infof("dir Rename %s/%s => %s/%s", dir.Path, req.OldName, newDir.Path, req.NewName)
- err := dir.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ newPath := util.NewFullPath(newDir.FullPath(), req.NewName)
+ oldPath := util.NewFullPath(dir.FullPath(), req.OldName)
+
+ glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
+
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AtomicRenameEntryRequest{
- OldDirectory: dir.Path,
+ OldDirectory: dir.FullPath(),
OldName: req.OldName,
- NewDirectory: newDir.Path,
+ NewDirectory: newDir.FullPath(),
NewName: req.NewName,
}
- _, err := client.AtomicRenameEntry(ctx, request)
+ _, err := client.AtomicRenameEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("dir Rename %s/%s => %s/%s : %v", dir.Path, req.OldName, newDir.Path, req.NewName, err)
+ glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
return fuse.EIO
}
@@ -35,28 +39,12 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
})
if err == nil {
- newPath := filer2.NewFullPath(newDir.Path, req.NewName)
- oldPath := filer2.NewFullPath(dir.Path, req.OldName)
dir.wfs.cacheDelete(newPath)
dir.wfs.cacheDelete(oldPath)
- oldFileNode := dir.wfs.getNode(oldPath, func() fs.Node {
- return nil
- })
- newDirNode := dir.wfs.getNode(filer2.FullPath(dir.Path), func() fs.Node {
- return nil
- })
- dir.wfs.forgetNode(newPath)
- dir.wfs.forgetNode(oldPath)
- if oldFileNode != nil && newDirNode != nil {
- oldFile := oldFileNode.(*File)
- oldFile.Name = req.NewName
- oldFile.dir = newDirNode.(*Dir)
- dir.wfs.getNode(newPath, func() fs.Node {
- return oldFile
- })
+ // fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
+ dir.wfs.fsNodeCache.Move(oldPath, newPath)
- }
}
return err
diff --git a/weed/filesys/dir_test.go b/weed/filesys/dir_test.go
new file mode 100644
index 000000000..49c76eb5e
--- /dev/null
+++ b/weed/filesys/dir_test.go
@@ -0,0 +1,34 @@
+package filesys
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestDirPath(t *testing.T) {
+
+ p := &Dir{name: "/some"}
+ p = &Dir{name: "path", parent: p}
+ p = &Dir{name: "to", parent: p}
+ p = &Dir{name: "a", parent: p}
+ p = &Dir{name: "file", parent: p}
+
+ assert.Equal(t, "/some/path/to/a/file", p.FullPath())
+
+ p = &Dir{name: "/some"}
+ assert.Equal(t, "/some", p.FullPath())
+
+ p = &Dir{name: "/"}
+ assert.Equal(t, "/", p.FullPath())
+
+ p = &Dir{name: "/"}
+ p = &Dir{name: "path", parent: p}
+ assert.Equal(t, "/path", p.FullPath())
+
+ p = &Dir{name: "/"}
+ p = &Dir{name: "path", parent: p}
+ p = &Dir{name: "to", parent: p}
+ assert.Equal(t, "/path/to", p.FullPath())
+
+}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 5ff128323..45224b3e7 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -15,9 +15,11 @@ import (
)
type ContinuousDirtyPages struct {
- intervals *ContinuousIntervals
- f *File
- lock sync.Mutex
+ intervals *ContinuousIntervals
+ f *File
+ lock sync.Mutex
+ collection string
+ replication string
}
func newDirtyPages(file *File) *ContinuousDirtyPages {
@@ -32,7 +34,7 @@ func (pages *ContinuousDirtyPages) releaseResource() {
var counter = int32(0)
-func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
pages.lock.Lock()
defer pages.lock.Unlock()
@@ -41,7 +43,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
- return pages.flushAndSave(ctx, offset, data)
+ return pages.flushAndSave(offset, data)
}
pages.intervals.AddInterval(data, offset)
@@ -50,7 +52,7 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
var hasSavedData bool
if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit {
- chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage(ctx)
+ chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage()
if hasSavedData {
chunks = append(chunks, chunk)
}
@@ -59,13 +61,13 @@ func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, da
return
}
-func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
var chunk *filer_pb.FileChunk
var newChunks []*filer_pb.FileChunk
// flush existing
- if newChunks, err = pages.saveExistingPagesToStorage(ctx); err == nil {
+ if newChunks, err = pages.saveExistingPagesToStorage(); err == nil {
if newChunks != nil {
chunks = append(chunks, newChunks...)
}
@@ -74,35 +76,35 @@ func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int6
}
// flush the new page
- if chunk, err = pages.saveToStorage(ctx, bytes.NewReader(data), offset, int64(len(data))); err == nil {
+ if chunk, err = pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))); err == nil {
if chunk != nil {
- glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
+ glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
chunks = append(chunks, chunk)
}
} else {
- glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
+ glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
return
}
return
}
-func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunks []*filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) {
pages.lock.Lock()
defer pages.lock.Unlock()
- return pages.saveExistingPagesToStorage(ctx)
+ return pages.saveExistingPagesToStorage()
}
-func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (chunks []*filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) {
var hasSavedData bool
var chunk *filer_pb.FileChunk
for {
- chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage(ctx)
+ chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage()
if !hasSavedData {
return chunks, err
}
@@ -116,31 +118,35 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Contex
}
-func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, hasSavedData bool, err error) {
+func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *filer_pb.FileChunk, hasSavedData bool, err error) {
maxList := pages.intervals.RemoveLargestIntervalLinkedList()
if maxList == nil {
return nil, false, nil
}
- chunk, err = pages.saveToStorage(ctx, maxList.ToReader(), maxList.Offset(), maxList.Size())
- if err == nil {
- hasSavedData = true
- glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
- } else {
- glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
- return
+ for {
+ chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
+ if err == nil {
+ hasSavedData = true
+ glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
+ return
+ } else {
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
+ time.Sleep(5 * time.Second)
+ }
}
- return
}
-func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
+func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
var fileId, host string
var auth security.EncodedJwt
- if err := pages.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ dir, _ := pages.f.fullpath().DirAndName()
+
+ if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -148,15 +154,21 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io.
Collection: pages.f.wfs.option.Collection,
TtlSec: pages.f.wfs.option.TtlSec,
DataCenter: pages.f.wfs.option.DataCenter,
+ ParentPath: dir,
}
- resp, err := client.AssignVolume(ctx, request)
+ resp, err := client.AssignVolume(context.Background(), request)
if err != nil {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ host = pages.f.wfs.AdjustedUrl(host)
+ pages.collection, pages.replication = resp.Collection, resp.Replication
return nil
}); err != nil {
@@ -164,7 +176,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io.
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, err := operation.Upload(fileUrl, pages.f.Name, reader, false, "", nil, auth)
+ uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
return nil, fmt.Errorf("upload data: %v", err)
@@ -173,14 +185,9 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, reader io.
glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
+ pages.f.wfs.chunkCache.SetChunk(fileId, data)
- return &filer_pb.FileChunk{
- FileId: fileId,
- Offset: offset,
- Size: uint64(size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- }, nil
+ return uploadResult.ToPbFileChunk(fileId, offset), nil
}
@@ -197,7 +204,7 @@ func min(x, y int64) int64 {
return y
}
-func (pages *ContinuousDirtyPages) ReadDirtyData(ctx context.Context, data []byte, startOffset int64) (offset int64, size int) {
+func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) {
pages.lock.Lock()
defer pages.lock.Unlock()
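
The most consequential change in dirty_page.go is in saveExistingLargestPageToStorage: an upload failure no longer aborts with an error; the code now retries every five seconds until the chunk is saved, while still holding pages.lock. A reduced sketch of that keep-retrying shape (names are illustrative only):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryForever keeps invoking save until it succeeds, pausing between
// attempts; this is the same shape as the new loop in the diff above.
func retryForever(save func() error, pause time.Duration) {
	for {
		err := save()
		if err == nil {
			return
		}
		fmt.Printf("save failed, retrying in %v: %v\n", pause, err)
		time.Sleep(pause)
	}
}

func main() {
	attempts := 0
	retryForever(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient upload error")
		}
		return nil
	}, 10*time.Millisecond)
	fmt.Printf("saved after %d attempts\n", attempts)
}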
diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go
index 184be2f3b..ab3b37b7c 100644
--- a/weed/filesys/dirty_page_interval_test.go
+++ b/weed/filesys/dirty_page_interval_test.go
@@ -35,6 +35,23 @@ func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) {
c := &ContinuousIntervals{}
+ // 1,
+ c.AddInterval(getBytes(1, 1), 0)
+ // _, 2,
+ c.AddInterval(getBytes(2, 1), 1)
+ // _, _, 3, 3, 3
+ c.AddInterval(getBytes(3, 3), 2)
+ // _, _, _, 4, 4, 4
+ c.AddInterval(getBytes(4, 3), 3)
+
+ expectedData(t, c, 0, 1, 2, 3, 4, 4, 4)
+
+}
+
+func TestContinuousIntervals_RealCase1(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
// 25,
c.AddInterval(getBytes(25, 1), 0)
// _, _, _, _, 23, 23
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index eccef4e58..bafbd7cc8 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -2,6 +2,7 @@ package filesys
import (
"context"
+ "io"
"os"
"sort"
"time"
@@ -9,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
@@ -32,10 +34,11 @@ type File struct {
entry *filer_pb.Entry
entryViewCache []filer2.VisibleInterval
isOpen int
+ reader io.ReaderAt
}
-func (file *File) fullpath() filer2.FullPath {
- return filer2.NewFullPath(file.dir.Path, file.Name)
+func (file *File) fullpath() util.FullPath {
+ return util.NewFullPath(file.dir.FullPath(), file.Name)
}
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
@@ -69,7 +72,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
- // glog.V(4).Infof("file Getxattr %s", file.fullpath())
+ glog.V(4).Infof("file Getxattr %s", file.fullpath())
if err := file.maybeLoadEntry(ctx); err != nil {
return err
@@ -119,6 +122,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
}
file.entry.Chunks = chunks
file.entryViewCache = nil
+ file.reader = nil
}
file.entry.Attributes.FileSize = req.Size
}
@@ -148,7 +152,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.wfs.cacheDelete(file.fullpath())
- return file.saveEntry(ctx)
+ return file.saveEntry()
}
@@ -166,7 +170,7 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error
file.wfs.cacheDelete(file.fullpath())
- return file.saveEntry(ctx)
+ return file.saveEntry()
}
@@ -184,7 +188,7 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest)
file.wfs.cacheDelete(file.fullpath())
- return file.saveEntry(ctx)
+ return file.saveEntry()
}
@@ -207,22 +211,22 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// fsync works at OS level
// write the file chunks to the filerGrpcAddress
- glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
+ glog.V(3).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req)
return nil
}
func (file *File) Forget() {
- glog.V(3).Infof("Forget file %s/%s", file.dir.Path, file.Name)
-
- file.wfs.forgetNode(filer2.NewFullPath(file.dir.Path, file.Name))
-
+ t := util.NewFullPath(file.dir.FullPath(), file.Name)
+ glog.V(3).Infof("Forget file %s", t)
+ file.wfs.fsNodeCache.DeleteFsNode(t)
}
func (file *File) maybeLoadEntry(ctx context.Context) error {
if file.entry == nil || file.isOpen <= 0 {
- entry, err := file.wfs.maybeLoadEntry(ctx, file.dir.Path, file.Name)
+ entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
if err != nil {
+ glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
return err
}
if entry != nil {
@@ -246,6 +250,8 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
newVisibles = t
}
+ file.reader = nil
+
glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
file.entry.Chunks = append(file.entry.Chunks, chunks...)
@@ -254,23 +260,28 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
+ file.reader = nil
}
-func (file *File) saveEntry(ctx context.Context) error {
- return file.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+func (file *File) saveEntry() error {
+ return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
- Directory: file.dir.Path,
+ Directory: file.dir.FullPath(),
Entry: file.entry,
}
glog.V(1).Infof("save file entry: %v", request)
- _, err := client.UpdateEntry(ctx, request)
+ _, err := client.UpdateEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err)
+ glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
return fuse.EIO
}
+ if file.wfs.option.AsyncMetaDataCaching {
+ file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+ }
+
return nil
})
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index cf253a7ed..372d742ea 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -3,12 +3,10 @@ package filesys
import (
"context"
"fmt"
- "mime"
- "path"
+ "math"
+ "net/http"
"time"
- "github.com/gabriel-vasile/mimetype"
-
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -28,15 +26,20 @@ type FileHandle struct {
NodeId fuse.NodeID // file or directory the request is about
Uid uint32 // user ID of process making request
Gid uint32 // group ID of process making request
+
}
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
- return &FileHandle{
+ fh := &FileHandle{
f: file,
dirtyPages: newDirtyPages(file),
Uid: uid,
Gid: gid,
}
+ if fh.f.entry != nil {
+ fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks)
+ }
+ return fh
}
var _ = fs.Handle(&FileHandle{})
@@ -53,9 +56,9 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
buff := make([]byte, req.Size)
- totalRead, err := fh.readFromChunks(ctx, buff, req.Offset)
+ totalRead, err := fh.readFromChunks(buff, req.Offset)
if err == nil {
- dirtyOffset, dirtySize := fh.readFromDirtyPages(ctx, buff, req.Offset)
+ dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset)
if totalRead+req.Offset < dirtyOffset+int64(dirtySize) {
totalRead = dirtyOffset + int64(dirtySize) - req.Offset
}
@@ -71,11 +74,11 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
return err
}
-func (fh *FileHandle) readFromDirtyPages(ctx context.Context, buff []byte, startOffset int64) (offset int64, size int) {
- return fh.dirtyPages.ReadDirtyData(ctx, buff, startOffset)
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) {
+ return fh.dirtyPages.ReadDirtyData(buff, startOffset)
}
-func (fh *FileHandle) readFromChunks(ctx context.Context, buff []byte, offset int64) (int64, error) {
+func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
// this value should come from the filer instead of the old f
if len(fh.f.entry.Chunks) == 0 {
@@ -85,43 +88,46 @@ func (fh *FileHandle) readFromChunks(ctx context.Context, buff []byte, offset in
if fh.f.entryViewCache == nil {
fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
+ fh.f.reader = nil
}
- chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, offset, len(buff))
+ if fh.f.reader == nil {
+ chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32)
+ fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache)
+ }
- totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, offset)
+ totalRead, err := fh.f.reader.ReadAt(buff, offset)
if err != nil {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}
- return totalRead, err
+ // glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
+
+ return int64(totalRead), err
}
// Write to the file handle
func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
// write the request to volume servers
+ data := make([]byte, len(req.Data))
+ copy(data, req.Data)
- fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(req.Data)), int64(fh.f.entry.Attributes.FileSize)))
+ fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
// glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
- chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
+ chunks, err := fh.dirtyPages.AddPage(req.Offset, data)
if err != nil {
- glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err)
+ glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err)
return fuse.EIO
}
- resp.Size = len(req.Data)
+ resp.Size = len(data)
if req.Offset == 0 {
// detect mime type
- detectedMIME := mimetype.Detect(req.Data)
- fh.contentType = detectedMIME.String()
- if ext := path.Ext(fh.f.Name); ext != detectedMIME.Extension() {
- fh.contentType = mime.TypeByExtension(ext)
- }
-
+ fh.contentType = http.DetectContentType(data)
fh.dirtyMetadata = true
}
@@ -145,6 +151,8 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
fh.dirtyPages.releaseResource()
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
}
+ fh.f.entryViewCache = nil
+ fh.f.reader = nil
return nil
}
@@ -154,14 +162,14 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
// send the data to the OS
glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req)
- chunks, err := fh.dirtyPages.FlushToStorage(ctx)
+ chunks, err := fh.dirtyPages.FlushToStorage()
if err != nil {
glog.Errorf("flush %s: %v", fh.f.fullpath(), err)
return fuse.EIO
}
- fh.f.addChunks(chunks)
if len(chunks) > 0 {
+ fh.f.addChunks(chunks)
fh.dirtyMetadata = true
}
@@ -169,7 +177,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return nil
}
- err = fh.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if fh.f.entry.Attributes != nil {
fh.f.entry.Attributes.Mime = fh.contentType
@@ -177,11 +185,13 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
fh.f.entry.Attributes.Gid = req.Gid
fh.f.entry.Attributes.Mtime = time.Now().Unix()
fh.f.entry.Attributes.Crtime = time.Now().Unix()
- fh.f.entry.Attributes.FileMode = uint32(0777 &^ fh.f.wfs.option.Umask)
+ fh.f.entry.Attributes.FileMode = uint32(0666 &^ fh.f.wfs.option.Umask)
+ fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
+ fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
- Directory: fh.f.dir.Path,
+ Directory: fh.f.dir.FullPath(),
Entry: fh.f.entry,
}
@@ -194,12 +204,16 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
fh.f.entry.Chunks = chunks
// fh.f.entryViewCache = nil
- if err := filer_pb.CreateEntry(ctx, client, request); err != nil {
- glog.Errorf("update fh: %v", err)
- return fmt.Errorf("update fh: %v", err)
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
+ return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
+ }
+
+ if fh.f.wfs.option.AsyncMetaDataCaching {
+ fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
}
- fh.f.wfs.deleteFileChunks(ctx, garbages)
+ fh.f.wfs.deleteFileChunks(garbages)
for i, chunk := range garbages {
glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
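
A subtle fix in the Write path above: req.Data is now copied into a fresh slice before being queued, since the FUSE library may reuse its request buffers, and retaining req.Data directly would risk a later request overwriting bytes still waiting to be flushed. A minimal illustration of the defensive copy (not the repo's code):

package main

import "fmt"

var pending [][]byte

// enqueue detaches the data from the caller-owned buffer before queueing it.
func enqueue(data []byte) {
	buf := make([]byte, len(data))
	copy(buf, data)
	pending = append(pending, buf)
}

func main() {
	reused := []byte("first")
	enqueue(reused)
	copy(reused, "XXXXX")           // the caller reuses the buffer afterwards
	fmt.Println(string(pending[0])) // still prints "first"
}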
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
new file mode 100644
index 000000000..b146f0615
--- /dev/null
+++ b/weed/filesys/fscache.go
@@ -0,0 +1,207 @@
+package filesys
+
+import (
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/seaweedfs/fuse/fs"
+)
+
+type FsCache struct {
+ root *FsNode
+ sync.RWMutex
+}
+type FsNode struct {
+ parent *FsNode
+ node fs.Node
+ name string
+ childrenLock sync.RWMutex
+ children map[string]*FsNode
+}
+
+func newFsCache(root fs.Node) *FsCache {
+ return &FsCache{
+ root: &FsNode{
+ node: root,
+ },
+ }
+}
+
+func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
+
+ c.RLock()
+ defer c.RUnlock()
+
+ return c.doGetFsNode(path)
+}
+
+func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node {
+ t := c.root
+ for _, p := range path.Split() {
+ t = t.findChild(p)
+ if t == nil {
+ return nil
+ }
+ }
+ return t.node
+}
+
+func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
+
+ c.Lock()
+ defer c.Unlock()
+
+ c.doSetFsNode(path, node)
+}
+
+func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) {
+ t := c.root
+ for _, p := range path.Split() {
+ t = t.ensureChild(p)
+ }
+ t.node = node
+}
+
+func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
+
+ c.Lock()
+ defer c.Unlock()
+
+ t := c.doGetFsNode(path)
+ if t != nil {
+ return t
+ }
+ t = genNodeFn()
+ c.doSetFsNode(path, t)
+ return t
+}
+
+func (c *FsCache) DeleteFsNode(path util.FullPath) {
+
+ c.Lock()
+ defer c.Unlock()
+
+ t := c.root
+ for _, p := range path.Split() {
+ t = t.findChild(p)
+ if t == nil {
+ return
+ }
+ }
+ if t.parent != nil {
+ t.parent.disconnectChild(t)
+ }
+ t.deleteSelf()
+}
+
+// oldPath and newPath are both full paths, each already including the entry name
+func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
+
+ c.Lock()
+ defer c.Unlock()
+
+ // find old node
+ src := c.root
+ for _, p := range oldPath.Split() {
+ src = src.findChild(p)
+ if src == nil {
+ return src
+ }
+ }
+ if src.parent != nil {
+ src.parent.disconnectChild(src)
+ }
+
+ // find new node
+ target := c.root
+ for _, p := range newPath.Split() {
+ target = target.ensureChild(p)
+ }
+ parent := target.parent
+ src.name = target.name
+ if dir, ok := src.node.(*Dir); ok {
+ dir.name = target.name // target.node is not a *Dir, just a placeholder carrying the new name
+ }
+ if f, ok := src.node.(*File); ok {
+ f.Name = target.name
+ if f.entry != nil {
+ f.entry.Name = f.Name
+ }
+ }
+ parent.disconnectChild(target)
+
+ target.deleteSelf()
+
+ src.connectToParent(parent)
+
+ return src
+}
+
+func (n *FsNode) connectToParent(parent *FsNode) {
+ n.parent = parent
+ oldNode := parent.findChild(n.name)
+ if oldNode != nil {
+ oldNode.deleteSelf()
+ }
+ if dir, ok := n.node.(*Dir); ok {
+ dir.parent = parent.node.(*Dir)
+ }
+ if f, ok := n.node.(*File); ok {
+ f.dir = parent.node.(*Dir)
+ }
+ parent.childrenLock.Lock() // guard the parent's children map; locking n.childrenLock would protect the wrong node
+ parent.children[n.name] = n
+ parent.childrenLock.Unlock()
+}
+
+func (n *FsNode) findChild(name string) *FsNode {
+ n.childrenLock.RLock()
+ defer n.childrenLock.RUnlock()
+
+ child, found := n.children[name]
+ if found {
+ return child
+ }
+ return nil
+}
+
+func (n *FsNode) ensureChild(name string) *FsNode {
+ n.childrenLock.Lock()
+ defer n.childrenLock.Unlock()
+
+ if n.children == nil {
+ n.children = make(map[string]*FsNode)
+ }
+ child, found := n.children[name]
+ if found {
+ return child
+ }
+ t := &FsNode{
+ parent: n,
+ node: nil,
+ name: name,
+ children: nil,
+ }
+ n.children[name] = t
+ return t
+}
+
+func (n *FsNode) disconnectChild(child *FsNode) {
+ n.childrenLock.Lock()
+ delete(n.children, child.name)
+ n.childrenLock.Unlock()
+ child.parent = nil
+}
+
+func (n *FsNode) deleteSelf() {
+ n.childrenLock.Lock()
+ for _, child := range n.children {
+ child.deleteSelf()
+ }
+ n.children = nil
+ n.childrenLock.Unlock()
+
+ n.node = nil
+ n.parent = nil
+
+}
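
fscache.go replaces the previous flat inode-keyed node map in WFS with a trie keyed by path components, so a whole subtree can be detached or moved as a unit on rename. A stripped-down version of the component-by-component walk (trieNode and its methods are stand-ins, not FsCache itself):

package main

import (
	"fmt"
	"strings"
)

type trieNode struct {
	value    interface{}
	children map[string]*trieNode
}

// split turns "/a/b/c" into ["a" "b" "c"]; the root "/" yields nothing.
func split(path string) []string {
	if path == "/" {
		return nil
	}
	return strings.Split(strings.TrimPrefix(path, "/"), "/")
}

// set walks down the trie, creating placeholder nodes as needed.
func (t *trieNode) set(path string, v interface{}) {
	n := t
	for _, p := range split(path) {
		if n.children == nil {
			n.children = make(map[string]*trieNode)
		}
		child, ok := n.children[p]
		if !ok {
			child = &trieNode{}
			n.children[p] = child
		}
		n = child
	}
	n.value = v
}

// get returns nil both for missing paths and for bare placeholders.
func (t *trieNode) get(path string) interface{} {
	n := t
	for _, p := range split(path) {
		n = n.children[p]
		if n == nil {
			return nil
		}
	}
	return n.value
}

func main() {
	root := &trieNode{}
	root.set("/a/b/c", "cc")
	fmt.Println(root.get("/a/b/c")) // cc
	fmt.Println(root.get("/a/b"))   // <nil>: intermediate placeholder only
}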
diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go
new file mode 100644
index 000000000..67f9aacc8
--- /dev/null
+++ b/weed/filesys/fscache_test.go
@@ -0,0 +1,96 @@
+package filesys
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestPathSplit(t *testing.T) {
+ parts := util.FullPath("/").Split()
+ if len(parts) != 0 {
+ t.Errorf("expecting an empty list, but getting %d", len(parts))
+ }
+
+ parts = util.FullPath("/readme.md").Split()
+ if len(parts) != 1 {
+ t.Errorf("expecting an empty list, but getting %d", len(parts))
+ }
+
+}
+
+func TestFsCache(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ x := cache.GetFsNode(util.FullPath("/y/x"))
+ if x != nil {
+ t.Errorf("wrong node!")
+ }
+
+ p := util.FullPath("/a/b/c")
+ cache.SetFsNode(p, &File{Name: "cc"})
+ tNode := cache.GetFsNode(p)
+ tFile := tNode.(*File)
+ if tFile.Name != "cc" {
+ t.Errorf("expecting a FsNode")
+ }
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+ cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
+ cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
+ cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
+
+ b := cache.GetFsNode(util.FullPath("/a/b"))
+ if b != nil {
+ t.Errorf("unexpected node!")
+ }
+
+ a := cache.GetFsNode(util.FullPath("/a"))
+ if a == nil {
+ t.Errorf("missing node!")
+ }
+
+ cache.DeleteFsNode(util.FullPath("/a"))
+ if b != nil {
+ t.Errorf("unexpected node!")
+ }
+
+ a = cache.GetFsNode(util.FullPath("/a"))
+ if a != nil {
+ t.Errorf("wrong DeleteFsNode!")
+ }
+
+ z := cache.GetFsNode(util.FullPath("/z"))
+ if z == nil {
+ t.Errorf("missing node!")
+ }
+
+ y := cache.GetFsNode(util.FullPath("/x/y"))
+ if y != nil {
+ t.Errorf("wrong node!")
+ }
+
+}
+
+func TestFsCacheMove(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+ cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
+ cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
+
+ cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))
+
+ d := cache.GetFsNode(util.FullPath("/z/x/d"))
+ if d == nil {
+ t.Errorf("unexpected nil node!")
+ }
+ if d.(*File).Name != "dd" {
+ t.Errorf("unexpected non dd node!")
+ }
+
+}
diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go
new file mode 100644
index 000000000..e6593ebde
--- /dev/null
+++ b/weed/filesys/meta_cache/cache_config.go
@@ -0,0 +1,32 @@
+package meta_cache
+
+import "github.com/chrislusf/seaweedfs/weed/util"
+
+var (
+ _ = util.Configuration(&cacheConfig{})
+)
+
+// implementing util.Configuration
+type cacheConfig struct {
+ dir string
+}
+
+func (c cacheConfig) GetString(key string) string {
+ return c.dir
+}
+
+func (c cacheConfig) GetBool(key string) bool {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetInt(key string) int {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetStringSlice(key string) []string {
+ panic("implement me")
+}
+
+func (c cacheConfig) SetDefault(key string, value interface{}) {
+ panic("implement me")
+}
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
new file mode 100644
index 000000000..4c9090d42
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -0,0 +1,93 @@
+package meta_cache
+
+import (
+ "context"
+ "os"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MetaCache struct {
+ actualStore filer2.FilerStore
+ sync.RWMutex
+}
+
+func NewMetaCache(dbFolder string) *MetaCache {
+ return &MetaCache{
+ actualStore: openMetaStore(dbFolder),
+ }
+}
+
+func openMetaStore(dbFolder string) filer2.FilerStore {
+
+ os.RemoveAll(dbFolder)
+ os.MkdirAll(dbFolder, 0755)
+
+ store := &leveldb.LevelDBStore{}
+ config := &cacheConfig{
+ dir: dbFolder,
+ }
+
+ if err := store.Initialize(config, ""); err != nil {
+ glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
+ }
+
+ return store
+
+}
+
+func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.actualStore.InsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error {
+ mc.Lock()
+ defer mc.Unlock()
+ if oldPath != "" {
+ if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
+ return err
+ }
+ }
+ if newEntry != nil {
+ if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.actualStore.UpdateEntry(ctx, entry)
+}
+
+func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) {
+ mc.RLock()
+ defer mc.RUnlock()
+ return mc.actualStore.FindEntry(ctx, fp)
+}
+
+func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.actualStore.DeleteEntry(ctx, fp)
+}
+
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) {
+ mc.RLock()
+ defer mc.RUnlock()
+ return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+}
+
+func (mc *MetaCache) Shutdown() {
+ mc.Lock()
+ defer mc.Unlock()
+ mc.actualStore.Shutdown()
+}
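
MetaCache serializes every operation on the wrapped LevelDB store behind a single RWMutex, and AtomicUpdateEntry performs the delete-then-insert of a rename under one writer lock, so concurrent readers never observe the entry at both paths or at neither. A sketch of that pattern over a plain map (the store type here is illustrative, not the repo's FilerStore):

package main

import (
	"fmt"
	"sync"
)

type store struct {
	mu      sync.RWMutex
	entries map[string]string
}

// atomicUpdate removes the old path and writes the new one under one lock.
func (s *store) atomicUpdate(oldPath, newPath, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if oldPath != "" {
		delete(s.entries, oldPath)
	}
	if newPath != "" {
		s.entries[newPath] = value
	}
}

func (s *store) find(path string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.entries[path]
	return v, ok
}

func main() {
	s := &store{entries: map[string]string{"/a/old.txt": "data"}}
	s.atomicUpdate("/a/old.txt", "/a/new.txt", "data")
	_, ok := s.find("/a/old.txt")
	fmt.Println(ok) // false: the old path is gone
	v, _ := s.find("/a/new.txt")
	fmt.Println(v) // data
}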
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
new file mode 100644
index 000000000..58bf6862e
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -0,0 +1,21 @@
+package meta_cache
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error {
+ glog.V(0).Infof("synchronizing meta data ...")
+ filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) {
+ entry := filer2.FromPbEntry(string(parentPath), pbEntry)
+ if err := mc.InsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ }
+ })
+ return nil
+}
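
InitMetaCache primes the local store by traversing the remote filer tree breadth-first and inserting each entry. A reduced sketch of that traversal over an in-memory listing (the tree map and traverseBfs here are illustrative; the real code uses filer_pb.TraverseBfs against the filer):

package main

import "fmt"

// tree maps each directory to its children, standing in for filer listings.
var tree = map[string][]string{
	"/":    {"/a", "/b"},
	"/a":   {"/a/x"},
	"/b":   {},
	"/a/x": {},
}

// traverseBfs visits paths level by level using a simple queue.
func traverseBfs(root string, visit func(path string)) {
	queue := []string{root}
	for len(queue) > 0 {
		p := queue[0]
		queue = queue[1:]
		visit(p)
		queue = append(queue, tree[p]...)
	}
}

func main() {
	traverseBfs("/", func(p string) { fmt.Println("insert", p) })
}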
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
new file mode 100644
index 000000000..2e411a48a
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -0,0 +1,69 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ var oldPath util.FullPath
+ var newEntry *filer2.Entry
+ if message.OldEntry != nil {
+ oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ glog.V(4).Infof("deleting %v", oldPath)
+ }
+
+ if message.NewEntry != nil {
+ dir := resp.Directory
+ if message.NewParentPath != "" {
+ dir = message.NewParentPath
+ }
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ glog.V(4).Infof("creating %v", key)
+ newEntry = filer2.FromPbEntry(dir, message.NewEntry)
+ }
+ return mc.AtomicUpdateEntry(context.Background(), oldPath, newEntry)
+ }
+
+ for {
+ err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ ClientName: "mount",
+ PathPrefix: dir,
+ SinceNs: lastTsNs,
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ return fmt.Errorf("process %v: %v", resp, err)
+ }
+ lastTsNs = resp.TsNs
+ }
+ })
+ if err != nil {
+ glog.V(0).Infof("subscribing filer meta change: %v", err)
+ time.Sleep(time.Second)
+ }
+ }
+}
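
SubscribeMetaEvents keeps a durable cursor: lastTsNs advances only after an event is applied, so when the stream drops, the loop resubscribes from the last applied event rather than replaying from the beginning. The same checkpoint-and-resume shape, reduced to a stand-in without gRPC (all names illustrative):

package main

import (
	"fmt"
	"time"
)

// subscribe simulates one streaming session: it delivers three events
// newer than sinceNs, then fails as if the stream broke.
func subscribe(sinceNs int64, onEvent func(tsNs int64) error) error {
	for ts := sinceNs + 1; ts <= sinceNs+3; ts++ {
		if err := onEvent(ts); err != nil {
			return err
		}
	}
	return fmt.Errorf("stream closed")
}

func main() {
	lastTsNs := int64(0)
	for round := 0; round < 2; round++ { // the real loop runs forever
		err := subscribe(lastTsNs, func(tsNs int64) error {
			fmt.Println("apply event", tsNs)
			lastTsNs = tsNs // checkpoint after each applied event
			return nil
		})
		fmt.Println("resubscribing after:", err)
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("resumed through event", lastTsNs)
}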
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 4807e367b..67dd2a62c 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -5,17 +5,21 @@ import (
"fmt"
"math"
"os"
+ "path"
"strings"
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/karlseguin/ccache"
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
@@ -28,6 +32,8 @@ type Option struct {
Replication string
TtlSec int32
ChunkSizeLimit int64
+ CacheDir string
+ CacheSizeMB int64
DataCenter string
DirListCacheLimit int64
EntryCacheTtl time.Duration
@@ -38,6 +44,11 @@ type Option struct {
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
+
+ OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
+ Cipher bool // whether to encrypt data on the volume server
+ AsyncMetaDataCaching bool // whether to cache metadata asynchronously
+
}
var _ = fs.FS(&WFS{})
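
The new Option fields gate features in NewSeaweedFileSystem below: CacheSizeMB > 0 enables the on-disk chunk cache under CacheDir, and AsyncMetaDataCaching enables the local meta store. An illustrative literal (pre-existing fields elided, values invented):

	opt := &Option{
		CacheDir:    "/var/cache/seaweedfs", // also hosts the "meta" subdirectory
		CacheSizeMB: 1024,                   // 0 disables the local chunk cache

		OutsideContainerClusterMode: true,  // rewrite volume URLs via AdjustedUrl
		Cipher:                      false, // leave volume data unencrypted
		AsyncMetaDataCaching:        true,  // keep a subscribed local meta store
	}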
@@ -48,18 +59,18 @@ type WFS struct {
listDirectoryEntriesCache *ccache.Cache
// contains all open handles, protected by handlesLock
- handlesLock sync.Mutex
- handles []*FileHandle
- pathToHandleIndex map[filer2.FullPath]int
+ handlesLock sync.Mutex
+ handles map[uint64]*FileHandle
bufPool sync.Pool
stats statsCache
- // nodes, protected by nodesLock
- nodesLock sync.Mutex
- nodes map[uint64]fs.Node
- root fs.Node
+ root fs.Node
+ fsNodeCache *FsCache
+
+ chunkCache *chunk_cache.ChunkCache
+ metaCache *meta_cache.MetaCache
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -70,16 +81,34 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
option: option,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
- pathToHandleIndex: make(map[filer2.FullPath]int),
+ handles: make(map[uint64]*FileHandle),
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
},
},
- nodes: make(map[uint64]fs.Node),
+ }
+ if option.CacheSizeMB > 0 {
+ wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
+ grace.OnInterrupt(func() {
+ wfs.chunkCache.Shutdown()
+ })
+ }
+ if wfs.option.AsyncMetaDataCaching {
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
+ startTime := time.Now()
+ if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
+ glog.V(0).Infof("failed to init meta cache: %v", err)
+ } else {
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
+ grace.OnInterrupt(func() {
+ wfs.metaCache.Shutdown()
+ })
+ }
}
- wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}
+ wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
+ wfs.fsNodeCache = newFsCache(wfs.root)
return wfs
}
@@ -88,23 +117,18 @@ func (wfs *WFS) Root() (fs.Node, error) {
return wfs.root, nil
}
-func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error {
+var _ = filer_pb.FilerClient(&WFS{})
- err := util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
+func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(ctx2, client)
+ return fn(client)
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
if err == nil {
return nil
}
- if strings.Contains(err.Error(), "context canceled") {
- glog.V(2).Infoln("retry context canceled request...")
- return util.WithCachedGrpcClient(context.Background(), func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(ctx2, client)
- }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
- }
return err
}
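
With the context parameter gone from WithFilerClient, each RPC builds its own context at the call site, and the old retry-on-"context canceled" branch is obsolete because no caller-supplied context can be canceled mid-call anymore. The new call shape, sketched against the Statistics RPC that Statfs uses below:

	err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		request := &filer_pb.StatisticsRequest{Collection: wfs.option.Collection}
		_, err := client.Statistics(context.Background(), request)
		return err
	})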
@@ -117,40 +141,27 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
- index, found := wfs.pathToHandleIndex[fullpath]
- if found && wfs.handles[index] != nil {
- glog.V(2).Infoln(fullpath, "found fileHandle id", index)
- return wfs.handles[index]
+ inodeId := file.fullpath().AsInode()
+ existingHandle, found := wfs.handles[inodeId]
+ if found && existingHandle != nil {
+ return existingHandle
}
fileHandle = newFileHandle(file, uid, gid)
- for i, h := range wfs.handles {
- if h == nil {
- wfs.handles[i] = fileHandle
- fileHandle.handle = uint64(i)
- wfs.pathToHandleIndex[fullpath] = i
- glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle)
- return
- }
- }
-
- wfs.handles = append(wfs.handles, fileHandle)
- fileHandle.handle = uint64(len(wfs.handles) - 1)
- wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
+ wfs.handles[inodeId] = fileHandle
+ fileHandle.handle = inodeId
glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
return
}
-func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) {
+func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
- delete(wfs.pathToHandleIndex, fullpath)
- if int(handleId) < len(wfs.handles) {
- wfs.handles[int(handleId)] = nil
- }
+
+ delete(wfs.handles, fullpath.AsInode())
return
}
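
Handles are now keyed by the path's inode hash rather than an index into a grow-only slice: the handle id is stable across opens, acquire and release are O(1), and a second open of the same path reuses the first handle. A condensed sketch of the bookkeeping (FileHandle internals elided):

	type handleTable struct {
		sync.Mutex
		handles map[uint64]*FileHandle
	}

	func (t *handleTable) acquire(path util.FullPath, mk func() *FileHandle) *FileHandle {
		t.Lock()
		defer t.Unlock()
		id := path.AsInode() // the handle id doubles as the inode id
		if h, ok := t.handles[id]; ok && h != nil {
			return h // reopening the same path shares the handle
		}
		h := mk()
		h.handle = id
		t.handles[id] = h
		return h
	}

	func (t *handleTable) release(path util.FullPath) {
		t.Lock()
		defer t.Unlock()
		delete(t.handles, path.AsInode())
	}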
@@ -162,7 +173,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
if wfs.stats.lastChecked < time.Now().Unix()-20 {
- err := wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
Collection: wfs.option.Collection,
@@ -171,7 +182,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
}
glog.V(4).Infof("reading filer stats: %+v", request)
- resp, err := client.Statistics(ctx, request)
+ resp, err := client.Statistics(context.Background(), request)
if err != nil {
glog.V(0).Infof("reading filer stats %v: %v", request, err)
return err
@@ -217,43 +228,33 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
return nil
}
-func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry {
+func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
item := wfs.listDirectoryEntriesCache.Get(string(path))
if item != nil && !item.Expired() {
return item.Value().(*filer_pb.Entry)
}
return nil
}
-func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
+func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
if entry == nil {
wfs.listDirectoryEntriesCache.Delete(string(path))
} else {
wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
}
}
-func (wfs *WFS) cacheDelete(path filer2.FullPath) {
+func (wfs *WFS) cacheDelete(path util.FullPath) {
wfs.listDirectoryEntriesCache.Delete(string(path))
}
-func (wfs *WFS) getNode(fullpath filer2.FullPath, fn func() fs.Node) fs.Node {
- wfs.nodesLock.Lock()
- defer wfs.nodesLock.Unlock()
-
- node, found := wfs.nodes[fullpath.AsInode()]
- if found {
- return node
+func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
+ if !wfs.option.OutsideContainerClusterMode {
+ return hostAndPort
}
- node = fn()
- if node != nil {
- wfs.nodes[fullpath.AsInode()] = node
+ colonIndex := strings.Index(hostAndPort, ":")
+ if colonIndex < 0 {
+ return hostAndPort
}
- return node
-}
-
-func (wfs *WFS) forgetNode(fullpath filer2.FullPath) {
- wfs.nodesLock.Lock()
- defer wfs.nodesLock.Unlock()
-
- delete(wfs.nodes, fullpath.AsInode())
+ filerColonIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
+ return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerColonIndex], hostAndPort[colonIndex+1:])
}
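
AdjustedUrl only rewrites anything in OutsideContainerClusterMode: volume servers advertise in-cluster addresses that an external mount cannot reach, so the host part is swapped for the filer's reachable host while the volume server's port is kept. A sketch of the behavior as a test (package filesys assumed, addresses invented):

	func TestAdjustedUrl(t *testing.T) {
		wfs := &WFS{option: &Option{
			FilerGrpcAddress:            "filer.example.com:18888",
			OutsideContainerClusterMode: true,
		}}
		// in-cluster host replaced, volume server port preserved
		if got := wfs.AdjustedUrl("10.0.0.5:8080"); got != "filer.example.com:8080" {
			t.Errorf("unexpected url: %s", got)
		}
		// no port in the input means no rewrite
		if got := wfs.AdjustedUrl("10.0.0.5"); got != "10.0.0.5" {
			t.Errorf("unexpected url: %s", got)
		}
	}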
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index cce0c792c..bf21b1808 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -3,14 +3,15 @@ package filesys
import (
"context"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "google.golang.org/grpc"
)
-func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
+func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
if len(chunks) == 0 {
return
}
@@ -20,13 +21,13 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu
fileIds = append(fileIds, chunk.GetFileIdString())
}
- wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
- deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds)
+ wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds)
return nil
})
}
-func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
+func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
var vids []string
for _, fileId := range fileIds {
@@ -38,7 +39,7 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f
m := make(map[string]operation.LookupResult)
glog.V(4).Infof("remove file lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: vids,
})
if err != nil {
@@ -56,7 +57,7 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f
}
for _, loc := range locations.Locations {
lr.Locations = append(lr.Locations, operation.Location{
- Url: loc.Url,
+ Url: wfs.AdjustedUrl(loc.Url),
PublicUrl: loc.PublicUrl,
})
}
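
deleteFileIds resolves volume locations before fanning out deletions, and the volume id it looks up comes from the file id itself: SeaweedFS file ids have the form "<volumeId>,<needleId+cookie>", so everything before the comma names the volume. A sketch of that extraction (the tree's own helper may differ in detail):

	// volumeId("3,01637037d6") == "3"
	func volumeId(fileId string) string {
		if i := strings.Index(fileId, ","); i > 0 {
			return fileId[:i]
		}
		return fileId
	}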
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
index 9dfb491fd..7e7b8c60b 100644
--- a/weed/filesys/xattr.go
+++ b/weed/filesys/xattr.go
@@ -2,11 +2,10 @@ package filesys
import (
"context"
- "strings"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
)
@@ -108,25 +107,34 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis
}
-func (wfs *WFS) maybeLoadEntry(ctx context.Context, dir, name string) (entry *filer_pb.Entry, err error) {
+func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
- fullpath := filer2.NewFullPath(dir, name)
+ fullpath := util.NewFullPath(dir, name)
entry = wfs.cacheGet(fullpath)
if entry != nil {
return
}
// glog.V(3).Infof("read entry cache miss %s", fullpath)
- err = wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
+ // read from async meta cache
+ if wfs.option.AsyncMetaDataCaching {
+ cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
+ }
+ if cacheErr != nil {
+ return nil, cacheErr
+ }
+ return cachedEntry.ToProtoEntry(), nil
+ }
+
+ err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Name: name,
Directory: dir,
}
- resp, err := client.LookupDirectoryEntry(ctx, request)
- if err != nil || resp == nil || resp.Entry == nil {
- if err == filer2.ErrNotFound || strings.Contains(err.Error(), filer2.ErrNotFound.Error()) {
+ resp, err := filer_pb.LookupEntry(client, request)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
glog.V(3).Infof("file attr read not found file %v: %v", request, err)
return fuse.ENOENT
}
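
maybeLoadEntry now resolves in layers: the in-memory TTL cache first, then the local meta store when AsyncMetaDataCaching is on, and only then a filer RPC, with filer_pb.ErrNotFound mapped to fuse.ENOENT at each layer. A compressed sketch of that flow, with hypothetical fromCache, fromLocal, and fromFiler helpers:

	func lookup(fullpath util.FullPath) (*filer_pb.Entry, error) {
		if e := fromCache(fullpath); e != nil {
			return e, nil // in-memory TTL cache hit
		}
		if asyncMetaDataCaching {
			e, err := fromLocal(fullpath) // subscribed on-disk meta store
			if err == filer_pb.ErrNotFound {
				return nil, fuse.ENOENT
			}
			if err != nil {
				return nil, err
			}
			return e.ToProtoEntry(), nil
		}
		return fromFiler(fullpath) // authoritative gRPC lookup
	}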