aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dir.go282
-rw-r--r--weed/filesys/dir_link.go40
-rw-r--r--weed/filesys/dir_rename.go17
-rw-r--r--weed/filesys/dir_test.go34
-rw-r--r--weed/filesys/dirty_page.go9
-rw-r--r--weed/filesys/file.go111
-rw-r--r--weed/filesys/filehandle.go124
-rw-r--r--weed/filesys/fscache.go5
-rw-r--r--weed/filesys/wfs.go63
9 files changed, 359 insertions, 326 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 10a0a2b44..6ee20974b 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -24,9 +24,12 @@ type Dir struct {
wfs *WFS
entry *filer_pb.Entry
parent *Dir
+ id uint64
}
var _ = fs.Node(&Dir{})
+
+//var _ = fs.NodeIdentifier(&Dir{})
var _ = fs.NodeCreater(&Dir{})
var _ = fs.NodeMknoder(&Dir{})
var _ = fs.NodeMkdirer(&Dir{})
@@ -42,6 +45,10 @@ var _ = fs.NodeRemovexattrer(&Dir{})
var _ = fs.NodeListxattrer(&Dir{})
var _ = fs.NodeForgetter(&Dir{})
+func (dir *Dir) xId() uint64 {
+ return dir.id
+}
+
func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
// https://github.com/bazil/fuse/issues/196
@@ -53,17 +60,18 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
return nil
}
- if err := dir.maybeLoadEntry(); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err)
return err
}
- // attr.Inode = util.FullPath(dir.FullPath()).AsInode()
- attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
- attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
- attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0)
- attr.Gid = dir.entry.Attributes.Gid
- attr.Uid = dir.entry.Attributes.Uid
+ // attr.Inode = dir.Id()
+ attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir
+ attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
+ attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
+ attr.Gid = entry.Attributes.Gid
+ attr.Uid = entry.Attributes.Uid
glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
@@ -74,16 +82,18 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f
glog.V(4).Infof("dir Getxattr %s", dir.FullPath())
- if err := dir.maybeLoadEntry(); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
- return getxattr(dir.entry, req, resp)
+ return getxattr(entry, req, resp)
}
func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
// attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
attr.Valid = time.Second
+ attr.Inode = 1 // dir.Id()
attr.Uid = dir.wfs.option.MountUid
attr.Gid = dir.wfs.option.MountGid
attr.Mode = dir.wfs.option.MountMode
@@ -102,45 +112,67 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
return nil
}
-func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
- f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
- return &File{
- Name: name,
- dir: dir,
- wfs: dir.wfs,
- entry: entry,
- entryViewCache: nil,
- }
- })
- f.(*File).dir = dir // in case dir node was created later
- return f
+func (dir *Dir) newFile(name string) fs.Node {
+
+ fileFullPath := util.NewFullPath(dir.FullPath(), name)
+ fileId := fileFullPath.AsInode()
+ dir.wfs.handlesLock.Lock()
+ existingHandle, found := dir.wfs.handles[fileId]
+ dir.wfs.handlesLock.Unlock()
+
+ if found {
+ glog.V(4).Infof("newFile found opened file handle: %+v", fileFullPath)
+ return existingHandle.f
+ }
+ return &File{
+ Name: name,
+ dir: dir,
+ wfs: dir.wfs,
+ id: fileId,
+ }
}
-func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
+func (dir *Dir) newDirectory(fullpath util.FullPath) fs.Node {
+
+ return &Dir{name: fullpath.Name(), wfs: dir.wfs, parent: dir, id: fullpath.AsInode()}
- d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
- return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
- })
- d.(*Dir).parent = dir // in case dir node was created later
- return d
}
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
- request, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, req.Flags&fuse.OpenExclusive != 0)
+ exclusive := req.Flags&fuse.OpenExclusive != 0
+ isDirectory := req.Mode&os.ModeDir > 0
- if err != nil {
- return nil, nil, err
+ if exclusive || isDirectory {
+ _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, exclusive)
+ if err != nil {
+ return nil, nil, err
+ }
}
var node fs.Node
- if request.Entry.IsDirectory {
- node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry)
+ if isDirectory {
+ node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name))
return node, nil, nil
}
- node = dir.newFile(req.Name, request.Entry)
+ node = dir.newFile(req.Name)
file := node.(*File)
+ file.entry = &filer_pb.Entry{
+ Name: req.Name,
+ IsDirectory: req.Mode&os.ModeDir > 0,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
+ Uid: req.Uid,
+ Gid: req.Gid,
+ Collection: dir.wfs.option.Collection,
+ Replication: dir.wfs.option.Replication,
+ TtlSec: dir.wfs.option.TtlSec,
+ },
+ }
+ file.dirtyMetadata = true
fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
return file, fh, nil
@@ -148,19 +180,20 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
- request, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false)
+ _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false)
if err != nil {
return nil, err
}
var node fs.Node
- node = dir.newFile(req.Name, request.Entry)
+ node = dir.newFile(req.Name)
return node, nil
}
-func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exlusive bool) (*filer_pb.CreateEntryRequest, error) {
+func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) {
+ dirFullPath := dir.FullPath()
request := &filer_pb.CreateEntryRequest{
- Directory: dir.FullPath(),
+ Directory: dirFullPath,
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: mode&os.ModeDir > 0,
@@ -175,10 +208,10 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex
TtlSec: dir.wfs.option.TtlSec,
},
},
- OExcl: exlusive,
+ OExcl: exclusive,
Signatures: []int32{dir.wfs.signature},
}
- glog.V(1).Infof("create %s/%s", dir.FullPath(), name)
+ glog.V(1).Infof("create %s/%s", dirFullPath, name)
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
@@ -189,11 +222,14 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex
if strings.Contains(err.Error(), "EEXIST") {
return fuse.EEXIST
}
- glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), name, err)
+ glog.V(0).Infof("create %s/%s: %v", dirFullPath, name, err)
return fuse.EIO
}
- dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+ if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
+ glog.Errorf("local InsertEntry dir %s/%s: %v", dirFullPath, name, err)
+ return fuse.EIO
+ }
return nil
})
@@ -216,84 +252,89 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
},
}
+ dirFullPath := dir.FullPath()
+
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(newEntry)
defer dir.wfs.mapPbIdFromFilerToLocal(newEntry)
request := &filer_pb.CreateEntryRequest{
- Directory: dir.FullPath(),
+ Directory: dirFullPath,
Entry: newEntry,
Signatures: []int32{dir.wfs.signature},
}
glog.V(1).Infof("mkdir: %v", request)
if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err)
+ glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
return err
}
- dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+ if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
+ glog.Errorf("local mkdir dir %s/%s: %v", dirFullPath, req.Name, err)
+ return fuse.EIO
+ }
return nil
})
if err == nil {
- node := dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), newEntry)
+ node := dir.newDirectory(util.NewFullPath(dirFullPath, req.Name))
return node, nil
}
- glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err)
+ glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
return nil, fuse.EIO
}
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
- glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
-
- fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
dirPath := util.FullPath(dir.FullPath())
+ glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
+
+ fullFilePath := dirPath.Child(req.Name)
visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
if visitErr != nil {
glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
return nil, fuse.EIO
}
- cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
+ localEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
if cacheErr == filer_pb.ErrNotFound {
return nil, fuse.ENOENT
}
- entry := cachedEntry.ToProtoEntry()
- if entry == nil {
+ if localEntry == nil {
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
- entry, err = filer_pb.GetEntry(dir.wfs, fullFilePath)
+ entry, err := filer_pb.GetEntry(dir.wfs, fullFilePath)
if err != nil {
glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
}
+ localEntry = filer.FromPbEntry(string(dirPath), entry)
} else {
glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
}
- if entry != nil {
- if entry.IsDirectory {
- node = dir.newDirectory(fullFilePath, entry)
+ if localEntry != nil {
+ if localEntry.IsDirectory() {
+ node = dir.newDirectory(fullFilePath)
} else {
- node = dir.newFile(req.Name, entry)
+ node = dir.newFile(req.Name)
}
// resp.EntryValid = time.Second
- // resp.Attr.Inode = fullFilePath.AsInode()
+ resp.Attr.Inode = fullFilePath.AsInode()
resp.Attr.Valid = time.Second
- resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
- resp.Attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
- resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode)
- resp.Attr.Gid = entry.Attributes.Gid
- resp.Attr.Uid = entry.Attributes.Uid
- if entry.HardLinkCounter > 0 {
- resp.Attr.Nlink = uint32(entry.HardLinkCounter)
+ resp.Attr.Mtime = localEntry.Attr.Mtime
+ resp.Attr.Crtime = localEntry.Attr.Crtime
+ resp.Attr.Mode = localEntry.Attr.Mode
+ resp.Attr.Gid = localEntry.Attr.Gid
+ resp.Attr.Uid = localEntry.Attr.Uid
+ if localEntry.HardLinkCounter > 0 {
+ resp.Attr.Nlink = uint32(localEntry.HardLinkCounter)
}
return node, nil
@@ -305,26 +346,25 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
- glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath())
+ dirPath := util.FullPath(dir.FullPath())
+ glog.V(4).Infof("dir ReadDirAll %s", dirPath)
- processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
- if entry.IsDirectory {
- dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
+ processEachEntryFn := func(entry *filer.Entry, isLast bool) {
+ if entry.IsDirectory() {
+ dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir, Inode: dirPath.Child(entry.Name()).AsInode()}
ret = append(ret, dirent)
} else {
- dirent := fuse.Dirent{Name: entry.Name, Type: findFileType(uint16(entry.Attributes.FileMode))}
+ dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode)), Inode: dirPath.Child(entry.Name()).AsInode()}
ret = append(ret, dirent)
}
- return nil
}
- dirPath := util.FullPath(dir.FullPath())
if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fuse.EIO
}
- listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
- processEachEntryFn(entry.ToProtoEntry(), false)
+ listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ processEachEntryFn(entry, false)
return true
})
if listErr != nil {
@@ -366,40 +406,32 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
- filePath := util.NewFullPath(dir.FullPath(), req.Name)
+ dirFullPath := dir.FullPath()
+ filePath := util.NewFullPath(dirFullPath, req.Name)
entry, err := filer_pb.GetEntry(dir.wfs, filePath)
if err != nil {
return err
}
- if entry == nil {
- return nil
- }
// first, ensure the filer store can correctly delete
glog.V(3).Infof("remove file: %v", req)
- isDeleteData := entry.HardLinkCounter <= 1
- err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature})
+ isDeleteData := entry != nil && entry.HardLinkCounter <= 1
+ err = filer_pb.Remove(dir.wfs, dirFullPath, req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature})
if err != nil {
- glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err)
+ glog.V(3).Infof("not found remove file %s: %v", filePath, err)
return fuse.ENOENT
}
// then, delete meta cache and fsNode cache
- dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
-
- // clear entry inside the file
- fsNode := dir.wfs.fsNodeCache.GetFsNode(filePath)
- if fsNode != nil {
- if file, ok := fsNode.(*File); ok {
- file.clearEntry()
- }
+ if err = dir.wfs.metaCache.DeleteEntry(context.Background(), filePath); err != nil {
+ glog.V(3).Infof("local DeleteEntry %s: %v", filePath, err)
+ return fuse.ESTALE
}
- dir.wfs.fsNodeCache.DeleteFsNode(filePath)
// remove current file handle if any
dir.wfs.handlesLock.Lock()
defer dir.wfs.handlesLock.Unlock()
- inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode()
+ inodeId := filePath.AsInode()
delete(dir.wfs.handles, inodeId)
return nil
@@ -408,20 +440,20 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
+ dirFullPath := dir.FullPath()
glog.V(3).Infof("remove directory entry: %v", req)
ignoreRecursiveErr := true // ignore recursion error since the OS should manage it
- err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, ignoreRecursiveErr, false, []int32{dir.wfs.signature})
+ err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, true, true, ignoreRecursiveErr, false, []int32{dir.wfs.signature})
if err != nil {
- glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
+ glog.V(0).Infof("remove %s/%s: %v", dirFullPath, req.Name, err)
if strings.Contains(err.Error(), "non-empty") {
return fuse.EEXIST
}
return fuse.ENOENT
}
- t := util.NewFullPath(dir.FullPath(), req.Name)
+ t := util.NewFullPath(dirFullPath, req.Name)
dir.wfs.metaCache.DeleteEntry(context.Background(), t)
- dir.wfs.fsNodeCache.DeleteFsNode(t)
return nil
@@ -431,27 +463,28 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req)
- if err := dir.maybeLoadEntry(); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
if req.Valid.Mode() {
- dir.entry.Attributes.FileMode = uint32(req.Mode)
+ entry.Attributes.FileMode = uint32(req.Mode)
}
if req.Valid.Uid() {
- dir.entry.Attributes.Uid = req.Uid
+ entry.Attributes.Uid = req.Uid
}
if req.Valid.Gid() {
- dir.entry.Attributes.Gid = req.Gid
+ entry.Attributes.Gid = req.Gid
}
if req.Valid.Mtime() {
- dir.entry.Attributes.Mtime = req.Mtime.Unix()
+ entry.Attributes.Mtime = req.Mtime.Unix()
}
- return dir.saveEntry()
+ return dir.saveEntry(entry)
}
@@ -459,15 +492,16 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name)
- if err := dir.maybeLoadEntry(); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
- if err := setxattr(dir.entry, req); err != nil {
+ if err := setxattr(entry, req); err != nil {
return err
}
- return dir.saveEntry()
+ return dir.saveEntry(entry)
}
@@ -475,15 +509,16 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e
glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name)
- if err := dir.maybeLoadEntry(); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
- if err := removexattr(dir.entry, req); err != nil {
+ if err := removexattr(entry, req); err != nil {
return err
}
- return dir.saveEntry()
+ return dir.saveEntry(entry)
}
@@ -491,11 +526,12 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
glog.V(4).Infof("dir Listxattr %s", dir.FullPath())
- if err := dir.maybeLoadEntry(); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
- if err := listxattr(dir.entry, req, resp); err != nil {
+ if err := listxattr(entry, req, resp); err != nil {
return err
}
@@ -505,34 +541,25 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
func (dir *Dir) Forget() {
glog.V(4).Infof("Forget dir %s", dir.FullPath())
-
- dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
}
-func (dir *Dir) maybeLoadEntry() error {
- if dir.entry == nil {
- parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName()
- entry, err := dir.wfs.maybeLoadEntry(parentDirPath, name)
- if err != nil {
- return err
- }
- dir.entry = entry
- }
- return nil
+func (dir *Dir) maybeLoadEntry() (*filer_pb.Entry, error) {
+ parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName()
+ return dir.wfs.maybeLoadEntry(parentDirPath, name)
}
-func (dir *Dir) saveEntry() error {
+func (dir *Dir) saveEntry(entry *filer_pb.Entry) error {
parentDir, name := util.FullPath(dir.FullPath()).DirAndName()
return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- dir.wfs.mapPbIdFromLocalToFiler(dir.entry)
- defer dir.wfs.mapPbIdFromFilerToLocal(dir.entry)
+ dir.wfs.mapPbIdFromLocalToFiler(entry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(entry)
request := &filer_pb.UpdateEntryRequest{
Directory: parentDir,
- Entry: dir.entry,
+ Entry: entry,
Signatures: []int32{dir.wfs.signature},
}
@@ -543,7 +570,10 @@ func (dir *Dir) saveEntry() error {
return fuse.EIO
}
- dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+ if err := dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
+ glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
+ return fuse.ESTALE
+ }
return nil
})
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index ba3280f03..acdcd2de4 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -31,19 +31,24 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName)
- if _, err := oldFile.maybeLoadEntry(ctx); err != nil {
+ oldEntry, err := oldFile.maybeLoadEntry(ctx)
+ if err != nil {
return nil, err
}
+ if oldEntry == nil {
+ return nil, fuse.EIO
+ }
+
// update old file to hardlink mode
- if len(oldFile.entry.HardLinkId) == 0 {
- oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
- oldFile.entry.HardLinkCounter = 1
+ if len(oldEntry.HardLinkId) == 0 {
+ oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
+ oldEntry.HardLinkCounter = 1
}
- oldFile.entry.HardLinkCounter++
+ oldEntry.HardLinkCounter++
updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
Directory: oldFile.dir.FullPath(),
- Entry: oldFile.entry,
+ Entry: oldEntry,
Signatures: []int32{dir.wfs.signature},
}
@@ -53,17 +58,17 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
Entry: &filer_pb.Entry{
Name: req.NewName,
IsDirectory: false,
- Attributes: oldFile.entry.Attributes,
- Chunks: oldFile.entry.Chunks,
- Extended: oldFile.entry.Extended,
- HardLinkId: oldFile.entry.HardLinkId,
- HardLinkCounter: oldFile.entry.HardLinkCounter,
+ Attributes: oldEntry.Attributes,
+ Chunks: oldEntry.Chunks,
+ Extended: oldEntry.Extended,
+ HardLinkId: oldEntry.HardLinkId,
+ HardLinkCounter: oldEntry.HardLinkCounter,
},
Signatures: []int32{dir.wfs.signature},
}
// apply changes to the filer, and also apply to local metaCache
- err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
@@ -83,12 +88,13 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
return nil
})
+ if err != nil {
+ return nil, fuse.EIO
+ }
+
// create new file node
- newNode := dir.newFile(req.NewName, request.Entry)
+ newNode := dir.newFile(req.NewName)
newFile := newNode.(*File)
- if _, err := newFile.maybeLoadEntry(ctx); err != nil {
- return nil, err
- }
return newFile, err
@@ -130,7 +136,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
return nil
})
- symlink := dir.newFile(req.NewName, request.Entry)
+ symlink := dir.newFile(req.NewName)
return symlink, err
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index d2acad4b2..b07710d17 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -64,19 +64,28 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
return fuse.EIO
}
- // fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
- dir.wfs.fsNodeCache.Move(oldPath, newPath)
+ oldFsNode := NodeWithId(oldPath.AsInode())
+ newFsNode := NodeWithId(newPath.AsInode())
+ dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
+ if file, ok := internalNode.(*File); ok {
+ glog.V(4).Infof("internal node %s", file.Name)
+ file.Name = req.NewName
+ file.id = uint64(newFsNode)
+ }
+ })
// change file handle
dir.wfs.handlesLock.Lock()
defer dir.wfs.handlesLock.Unlock()
inodeId := oldPath.AsInode()
existingHandle, found := dir.wfs.handles[inodeId]
+ glog.V(4).Infof("has open filehandle %s: %v", oldPath, found)
if !found || existingHandle == nil {
- return err
+ return nil
}
+ glog.V(4).Infof("opened filehandle %s => %s", oldPath, newPath)
delete(dir.wfs.handles, inodeId)
dir.wfs.handles[newPath.AsInode()] = existingHandle
- return err
+ return nil
}
diff --git a/weed/filesys/dir_test.go b/weed/filesys/dir_test.go
deleted file mode 100644
index 49c76eb5e..000000000
--- a/weed/filesys/dir_test.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package filesys
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestDirPath(t *testing.T) {
-
- p := &Dir{name: "/some"}
- p = &Dir{name: "path", parent: p}
- p = &Dir{name: "to", parent: p}
- p = &Dir{name: "a", parent: p}
- p = &Dir{name: "file", parent: p}
-
- assert.Equal(t, "/some/path/to/a/file", p.FullPath())
-
- p = &Dir{name: "/some"}
- assert.Equal(t, "/some", p.FullPath())
-
- p = &Dir{name: "/"}
- assert.Equal(t, "/", p.FullPath())
-
- p = &Dir{name: "/"}
- p = &Dir{name: "path", parent: p}
- assert.Equal(t, "/path", p.FullPath())
-
- p = &Dir{name: "/"}
- p = &Dir{name: "path", parent: p}
- p = &Dir{name: "to", parent: p}
- assert.Equal(t, "/path/to", p.FullPath())
-
-}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index f05a3a56a..8888cff96 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -30,7 +30,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
- glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
+ glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
@@ -69,7 +69,12 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
return false
}
- fileSize := int64(pages.f.entry.Attributes.FileSize)
+ entry := pages.f.getEntry()
+ if entry == nil {
+ return false
+ }
+
+ fileSize := int64(entry.Attributes.FileSize)
chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
if chunkSize == 0 {
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index a210c5152..bb57988cd 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -2,7 +2,6 @@ package filesys
import (
"context"
- "io"
"os"
"sort"
"time"
@@ -19,6 +18,7 @@ import (
const blockSize = 512
var _ = fs.Node(&File{})
+var _ = fs.NodeIdentifier(&File{})
var _ = fs.NodeOpener(&File{})
var _ = fs.NodeFsyncer(&File{})
var _ = fs.NodeSetattrer(&File{})
@@ -29,32 +29,37 @@ var _ = fs.NodeListxattrer(&File{})
var _ = fs.NodeForgetter(&File{})
type File struct {
- Name string
- dir *Dir
- wfs *WFS
- entry *filer_pb.Entry
- entryViewCache []filer.VisibleInterval
- isOpen int
- reader io.ReaderAt
- dirtyMetadata bool
+ Name string
+ dir *Dir
+ wfs *WFS
+ entry *filer_pb.Entry
+ isOpen int
+ dirtyMetadata bool
+ id uint64
}
func (file *File) fullpath() util.FullPath {
return util.NewFullPath(file.dir.FullPath(), file.Name)
}
+func (file *File) Id() uint64 {
+ return file.id
+}
+
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
- entry := file.entry
- if file.isOpen <= 0 || entry == nil {
- if entry, err = file.maybeLoadEntry(ctx); err != nil {
- return err
- }
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
+ return err
}
- // attr.Inode = file.fullpath().AsInode()
+ if entry == nil {
+ return fuse.ENOENT
+ }
+
+ attr.Inode = file.Id()
attr.Valid = time.Second
attr.Mode = os.FileMode(entry.Attributes.FileMode)
attr.Size = filer.FileSize(entry)
@@ -106,13 +111,13 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req)
- _, err := file.maybeLoadEntry(ctx)
+ entry, err := file.maybeLoadEntry(ctx)
if err != nil {
return err
}
if file.isOpen > 0 {
file.wfs.handlesLock.Lock()
- fileHandle := file.wfs.handles[file.fullpath().AsInode()]
+ fileHandle := file.wfs.handles[file.Id()]
file.wfs.handlesLock.Unlock()
if fileHandle != nil {
@@ -123,12 +128,12 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
if req.Valid.Size() {
- glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks))
- if req.Size < filer.FileSize(file.entry) {
+ glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks))
+ if req.Size < filer.FileSize(entry) {
// fmt.Printf("truncate %v \n", fullPath)
var chunks []*filer_pb.FileChunk
var truncatedChunks []*filer_pb.FileChunk
- for _, chunk := range file.entry.Chunks {
+ for _, chunk := range entry.Chunks {
int64Size := int64(chunk.Size)
if chunk.Offset+int64Size > int64(req.Size) {
// this chunk is truncated
@@ -143,36 +148,34 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
}
}
}
- file.entry.Chunks = chunks
- file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
- file.reader = nil
+ entry.Chunks = chunks
}
- file.entry.Attributes.FileSize = req.Size
+ entry.Attributes.FileSize = req.Size
file.dirtyMetadata = true
}
if req.Valid.Mode() {
- file.entry.Attributes.FileMode = uint32(req.Mode)
+ entry.Attributes.FileMode = uint32(req.Mode)
file.dirtyMetadata = true
}
if req.Valid.Uid() {
- file.entry.Attributes.Uid = req.Uid
+ entry.Attributes.Uid = req.Uid
file.dirtyMetadata = true
}
if req.Valid.Gid() {
- file.entry.Attributes.Gid = req.Gid
+ entry.Attributes.Gid = req.Gid
file.dirtyMetadata = true
}
if req.Valid.Crtime() {
- file.entry.Attributes.Crtime = req.Crtime.Unix()
+ entry.Attributes.Crtime = req.Crtime.Unix()
file.dirtyMetadata = true
}
if req.Valid.Mtime() {
- file.entry.Attributes.Mtime = req.Mtime.Unix()
+ entry.Attributes.Mtime = req.Mtime.Unix()
file.dirtyMetadata = true
}
@@ -188,7 +191,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
return nil
}
- return file.saveEntry(file.entry)
+ return file.saveEntry(entry)
}
@@ -254,14 +257,20 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
func (file *File) Forget() {
t := util.NewFullPath(file.dir.FullPath(), file.Name)
glog.V(4).Infof("Forget file %s", t)
- file.wfs.fsNodeCache.DeleteFsNode(t)
+ file.wfs.ReleaseHandle(t, fuse.HandleID(t.AsInode()))
}
func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
+
+ file.wfs.handlesLock.Lock()
+ handle, found := file.wfs.handles[file.Id()]
+ file.wfs.handlesLock.Unlock()
entry = file.entry
- if file.isOpen > 0 {
- return entry, nil
+ if found {
+ glog.V(4).Infof("maybeLoadEntry found opened file %s/%s: %v %v", file.dir.FullPath(), file.Name, handle.f.entry, entry)
+ entry = handle.f.entry
}
+
if entry != nil {
if len(entry.HardLinkId) == 0 {
// only always reload hard link
@@ -274,7 +283,7 @@ func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, er
return entry, err
}
if entry != nil {
- file.setEntry(entry)
+ // file.entry = entry
} else {
glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err)
}
@@ -299,8 +308,13 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
}
}
+ entry := file.getEntry()
+ if entry == nil {
+ return
+ }
+
// pick out-of-order chunks from existing chunks
- for _, chunk := range file.entry.Chunks {
+ for _, chunk := range entry.Chunks {
if lessThan(earliestChunk, chunk) {
chunks = append(chunks, chunk)
}
@@ -311,28 +325,9 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
return lessThan(chunks[i], chunks[j])
})
- // add to entry view cache
- for _, chunk := range chunks {
- file.entryViewCache = filer.MergeIntoVisibles(file.entryViewCache, chunk)
- }
-
- file.reader = nil
-
- glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
+ glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
- file.entry.Chunks = append(file.entry.Chunks, newChunks...)
-}
-
-func (file *File) setEntry(entry *filer_pb.Entry) {
- file.entry = entry
- file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
- file.reader = nil
-}
-
-func (file *File) clearEntry() {
- file.entry = nil
- file.entryViewCache = nil
- file.reader = nil
+ entry.Chunks = append(entry.Chunks, newChunks...)
}
func (file *File) saveEntry(entry *filer_pb.Entry) error {
@@ -359,3 +354,7 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
return nil
})
}
+
+func (file *File) getEntry() *filer_pb.Entry {
+ return file.entry
+}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index fb073c9cd..27ffab6e1 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -20,10 +20,12 @@ import (
type FileHandle struct {
// cache file has been written to
- dirtyPages *ContinuousDirtyPages
- contentType string
- handle uint64
- sync.RWMutex
+ dirtyPages *ContinuousDirtyPages
+ entryViewCache []filer.VisibleInterval
+ reader io.ReaderAt
+ contentType string
+ handle uint64
+ sync.Mutex
f *File
RequestId fuse.RequestID // unique ID for request
@@ -40,8 +42,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
Uid: uid,
Gid: gid,
}
- if fh.f.entry != nil {
- fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry)
+ entry := fh.f.getEntry()
+ if entry != nil {
+ entry.Attributes.FileSize = filer.FileSize(entry)
}
return fh
@@ -58,8 +61,8 @@ var _ = fs.HandleReleaser(&FileHandle{})
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
- fh.RLock()
- defer fh.RUnlock()
+ fh.Lock()
+ defer fh.Unlock()
if req.Size <= 0 {
return nil
@@ -104,42 +107,48 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxSto
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
- fileSize := int64(filer.FileSize(fh.f.entry))
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return 0, io.EOF
+ }
+
+ fileSize := int64(filer.FileSize(entry))
+ fileFullPath := fh.f.fullpath()
if fileSize == 0 {
- glog.V(1).Infof("empty fh %v", fh.f.fullpath())
+ glog.V(1).Infof("empty fh %v", fileFullPath)
return 0, io.EOF
}
- if offset+int64(len(buff)) <= int64(len(fh.f.entry.Content)) {
- totalRead := copy(buff, fh.f.entry.Content[offset:])
- glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead)
+ if offset+int64(len(buff)) <= int64(len(entry.Content)) {
+ totalRead := copy(buff, entry.Content[offset:])
+ glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
return int64(totalRead), nil
}
var chunkResolveErr error
- if fh.f.entryViewCache == nil {
- fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
+ if fh.entryViewCache == nil {
+ fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
- fh.f.reader = nil
+ fh.reader = nil
}
- reader := fh.f.reader
+ reader := fh.reader
if reader == nil {
- chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
+ chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64)
reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
}
- fh.f.reader = reader
+ fh.reader = reader
totalRead, err := reader.ReadAt(buff, offset)
if err != nil && err != io.EOF {
- glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
+ glog.Errorf("file handle read %s: %v", fileFullPath, err)
}
- glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
+ glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
@@ -158,8 +167,13 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
copy(data, req.Data)
}
- fh.f.entry.Content = nil
- fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return fuse.EIO
+ }
+
+ entry.Content = nil
+ entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(req.Offset, data)
@@ -179,30 +193,27 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
- glog.V(4).Infof("Release %v fh %d", fh.f.fullpath(), fh.handle)
+ glog.V(4).Infof("Release %v fh %d open=%d", fh.f.fullpath(), fh.handle, fh.f.isOpen)
fh.Lock()
defer fh.Unlock()
+ fh.f.isOpen--
+
if fh.f.isOpen <= 0 {
+ fh.f.entry = nil
+ fh.entryViewCache = nil
+ fh.reader = nil
+
+ fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+ }
+
+ if fh.f.isOpen < 0 {
glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
fh.f.isOpen = 0
return nil
}
- if fh.f.isOpen == 1 {
-
- fh.f.isOpen--
-
- fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
- if closer, ok := fh.f.reader.(io.Closer); ok {
- if closer != nil {
- closer.Close()
- }
- }
- fh.f.reader = nil
- }
-
return nil
}
@@ -242,35 +253,40 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- if fh.f.entry.Attributes != nil {
- fh.f.entry.Attributes.Mime = fh.contentType
- if fh.f.entry.Attributes.Uid == 0 {
- fh.f.entry.Attributes.Uid = header.Uid
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return nil
+ }
+
+ if entry.Attributes != nil {
+ entry.Attributes.Mime = fh.contentType
+ if entry.Attributes.Uid == 0 {
+ entry.Attributes.Uid = header.Uid
}
- if fh.f.entry.Attributes.Gid == 0 {
- fh.f.entry.Attributes.Gid = header.Gid
+ if entry.Attributes.Gid == 0 {
+ entry.Attributes.Gid = header.Gid
}
- if fh.f.entry.Attributes.Crtime == 0 {
- fh.f.entry.Attributes.Crtime = time.Now().Unix()
+ if entry.Attributes.Crtime == 0 {
+ entry.Attributes.Crtime = time.Now().Unix()
}
- fh.f.entry.Attributes.Mtime = time.Now().Unix()
- fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
- fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
- fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
+ entry.Attributes.Mtime = time.Now().Unix()
+ entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
+ entry.Attributes.Collection = fh.dirtyPages.collection
+ entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
Directory: fh.f.dir.FullPath(),
- Entry: fh.f.entry,
+ Entry: entry,
Signatures: []int32{fh.f.wfs.signature},
}
- glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
- for i, chunk := range fh.f.entry.Chunks {
+ glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks))
+ for i, chunk := range entry.Chunks {
glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
@@ -278,7 +294,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
- fh.f.entry.Chunks = append(chunks, manifestChunks...)
+ entry.Chunks = append(chunks, manifestChunks...)
fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
index fdec8253c..6b1012090 100644
--- a/weed/filesys/fscache.go
+++ b/weed/filesys/fscache.go
@@ -124,8 +124,9 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
}
if f, ok := src.node.(*File); ok {
f.Name = target.name
- if f.entry != nil {
- f.entry.Name = f.Name
+ entry := f.getEntry()
+ if entry != nil {
+ entry.Name = f.Name
}
}
parent.disconnectChild(target)
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index c6d9080a1..42816d23d 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -41,7 +41,6 @@ type Option struct {
CacheDir string
CacheSizeMB int64
DataCenter string
- EntryCacheTtl time.Duration
Umask os.FileMode
MountUid uint32
@@ -104,24 +103,16 @@ func NewSeaweedFileSystem(option *Option) *WFS {
}
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
- fsNode := wfs.fsNodeCache.GetFsNode(filePath)
- if fsNode != nil {
- if file, ok := fsNode.(*File); ok {
- if err := wfs.Server.InvalidateNodeData(file); err != nil {
- glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
- }
- file.clearEntry()
- }
+
+ fsNode := NodeWithId(filePath.AsInode())
+ if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
+ glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
}
+
dir, name := filePath.DirAndName()
- parent := wfs.root
- if dir != "/" {
- parent = wfs.fsNodeCache.GetFsNode(util.FullPath(dir))
- }
- if parent != nil {
- if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
- glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
- }
+ parent := NodeWithId(util.FullPath(dir).AsInode())
+ if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
+ glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
}
})
startTime := time.Now()
@@ -130,8 +121,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.metaCache.Shutdown()
})
- entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
- wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
+ wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
wfs.fsNodeCache = newFsCache(wfs.root)
if wfs.option.ConcurrentWriters > 0 {
@@ -150,25 +140,28 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
fullpath := file.fullpath()
glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
- wfs.handlesLock.Lock()
- defer wfs.handlesLock.Unlock()
+ inodeId := file.Id()
- inodeId := file.fullpath().AsInode()
- if file.isOpen > 0 {
- existingHandle, found := wfs.handles[inodeId]
- if found && existingHandle != nil {
- file.isOpen++
- return existingHandle
- }
+ wfs.handlesLock.Lock()
+ existingHandle, found := wfs.handles[inodeId]
+ wfs.handlesLock.Unlock()
+ if found && existingHandle != nil {
+ existingHandle.f.isOpen++
+ glog.V(4).Infof("Acquired Handle %s open %d", fullpath, existingHandle.f.isOpen)
+ return existingHandle
}
+ entry, _ := file.maybeLoadEntry(context.Background())
+ file.entry = entry
fileHandle = newFileHandle(file, uid, gid)
- file.maybeLoadEntry(context.Background())
file.isOpen++
+ wfs.handlesLock.Lock()
wfs.handles[inodeId] = fileHandle
+ wfs.handlesLock.Unlock()
fileHandle.handle = inodeId
+ glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
return
}
@@ -176,9 +169,9 @@ func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
- glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
+ glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
- delete(wfs.handles, fullpath.AsInode())
+ delete(wfs.handles, uint64(handleId))
return
}
@@ -268,3 +261,11 @@ func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
return filer.LookupFn(wfs)
}
+
+type NodeWithId uint64
+func (n NodeWithId) Id() uint64 {
+ return uint64(n)
+}
+func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
+ return nil
+}