diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 282 | ||||
| -rw-r--r-- | weed/filesys/dir_link.go | 40 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 17 | ||||
| -rw-r--r-- | weed/filesys/dir_test.go | 34 | ||||
| -rw-r--r-- | weed/filesys/dirty_page.go | 9 | ||||
| -rw-r--r-- | weed/filesys/file.go | 111 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 124 | ||||
| -rw-r--r-- | weed/filesys/fscache.go | 5 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 63 |
9 files changed, 359 insertions, 326 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 10a0a2b44..6ee20974b 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -24,9 +24,12 @@ type Dir struct { wfs *WFS entry *filer_pb.Entry parent *Dir + id uint64 } var _ = fs.Node(&Dir{}) + +//var _ = fs.NodeIdentifier(&Dir{}) var _ = fs.NodeCreater(&Dir{}) var _ = fs.NodeMknoder(&Dir{}) var _ = fs.NodeMkdirer(&Dir{}) @@ -42,6 +45,10 @@ var _ = fs.NodeRemovexattrer(&Dir{}) var _ = fs.NodeListxattrer(&Dir{}) var _ = fs.NodeForgetter(&Dir{}) +func (dir *Dir) xId() uint64 { + return dir.id +} + func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { // https://github.com/bazil/fuse/issues/196 @@ -53,17 +60,18 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { return nil } - if err := dir.maybeLoadEntry(); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err) return err } - // attr.Inode = util.FullPath(dir.FullPath()).AsInode() - attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir - attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0) - attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0) - attr.Gid = dir.entry.Attributes.Gid - attr.Uid = dir.entry.Attributes.Uid + // attr.Inode = dir.Id() + attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir + attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) + attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) + attr.Gid = entry.Attributes.Gid + attr.Uid = entry.Attributes.Uid glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) @@ -74,16 +82,18 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f glog.V(4).Infof("dir Getxattr %s", dir.FullPath()) - if err := dir.maybeLoadEntry(); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { return err } - return getxattr(dir.entry, req, resp) + return getxattr(entry, req, resp) } func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { // attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode() attr.Valid = time.Second + attr.Inode = 1 // dir.Id() attr.Uid = dir.wfs.option.MountUid attr.Gid = dir.wfs.option.MountGid attr.Mode = dir.wfs.option.MountMode @@ -102,45 +112,67 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { return nil } -func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { - f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { - return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - entry: entry, - entryViewCache: nil, - } - }) - f.(*File).dir = dir // in case dir node was created later - return f +func (dir *Dir) newFile(name string) fs.Node { + + fileFullPath := util.NewFullPath(dir.FullPath(), name) + fileId := fileFullPath.AsInode() + dir.wfs.handlesLock.Lock() + existingHandle, found := dir.wfs.handles[fileId] + dir.wfs.handlesLock.Unlock() + + if found { + glog.V(4).Infof("newFile found opened file handle: %+v", fileFullPath) + return existingHandle.f + } + return &File{ + Name: name, + dir: dir, + wfs: dir.wfs, + id: fileId, + } } -func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { +func (dir *Dir) newDirectory(fullpath util.FullPath) fs.Node { + + return &Dir{name: fullpath.Name(), wfs: dir.wfs, parent: dir, id: fullpath.AsInode()} - d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { - return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} - }) - d.(*Dir).parent = dir // in case dir node was created later - return d } func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) { - request, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, req.Flags&fuse.OpenExclusive != 0) + exclusive := req.Flags&fuse.OpenExclusive != 0 + isDirectory := req.Mode&os.ModeDir > 0 - if err != nil { - return nil, nil, err + if exclusive || isDirectory { + _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, exclusive) + if err != nil { + return nil, nil, err + } } var node fs.Node - if request.Entry.IsDirectory { - node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry) + if isDirectory { + node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name)) return node, nil, nil } - node = dir.newFile(req.Name, request.Entry) + node = dir.newFile(req.Name) file := node.(*File) + file.entry = &filer_pb.Entry{ + Name: req.Name, + IsDirectory: req.Mode&os.ModeDir > 0, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(req.Mode &^ dir.wfs.option.Umask), + Uid: req.Uid, + Gid: req.Gid, + Collection: dir.wfs.option.Collection, + Replication: dir.wfs.option.Replication, + TtlSec: dir.wfs.option.TtlSec, + }, + } + file.dirtyMetadata = true fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) return file, fh, nil @@ -148,19 +180,20 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) { - request, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false) + _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false) if err != nil { return nil, err } var node fs.Node - node = dir.newFile(req.Name, request.Entry) + node = dir.newFile(req.Name) return node, nil } -func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exlusive bool) (*filer_pb.CreateEntryRequest, error) { +func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) { + dirFullPath := dir.FullPath() request := &filer_pb.CreateEntryRequest{ - Directory: dir.FullPath(), + Directory: dirFullPath, Entry: &filer_pb.Entry{ Name: name, IsDirectory: mode&os.ModeDir > 0, @@ -175,10 +208,10 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex TtlSec: dir.wfs.option.TtlSec, }, }, - OExcl: exlusive, + OExcl: exclusive, Signatures: []int32{dir.wfs.signature}, } - glog.V(1).Infof("create %s/%s", dir.FullPath(), name) + glog.V(1).Infof("create %s/%s", dirFullPath, name) err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { @@ -189,11 +222,14 @@ func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, ex if strings.Contains(err.Error(), "EEXIST") { return fuse.EEXIST } - glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), name, err) + glog.V(0).Infof("create %s/%s: %v", dirFullPath, name, err) return fuse.EIO } - dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) + if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { + glog.Errorf("local InsertEntry dir %s/%s: %v", dirFullPath, name, err) + return fuse.EIO + } return nil }) @@ -216,84 +252,89 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err }, } + dirFullPath := dir.FullPath() + err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(newEntry) defer dir.wfs.mapPbIdFromFilerToLocal(newEntry) request := &filer_pb.CreateEntryRequest{ - Directory: dir.FullPath(), + Directory: dirFullPath, Entry: newEntry, Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("mkdir: %v", request) if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err) + glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err) return err } - dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) + if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { + glog.Errorf("local mkdir dir %s/%s: %v", dirFullPath, req.Name, err) + return fuse.EIO + } return nil }) if err == nil { - node := dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), newEntry) + node := dir.newDirectory(util.NewFullPath(dirFullPath, req.Name)) return node, nil } - glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err) + glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err) return nil, fuse.EIO } func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { - glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) - - fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) dirPath := util.FullPath(dir.FullPath()) + glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String()) + + fullFilePath := dirPath.Child(req.Name) visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) if visitErr != nil { glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) return nil, fuse.EIO } - cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) + localEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) if cacheErr == filer_pb.ErrNotFound { return nil, fuse.ENOENT } - entry := cachedEntry.ToProtoEntry() - if entry == nil { + if localEntry == nil { // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) - entry, err = filer_pb.GetEntry(dir.wfs, fullFilePath) + entry, err := filer_pb.GetEntry(dir.wfs, fullFilePath) if err != nil { glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) return nil, fuse.ENOENT } + localEntry = filer.FromPbEntry(string(dirPath), entry) } else { glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) } - if entry != nil { - if entry.IsDirectory { - node = dir.newDirectory(fullFilePath, entry) + if localEntry != nil { + if localEntry.IsDirectory() { + node = dir.newDirectory(fullFilePath) } else { - node = dir.newFile(req.Name, entry) + node = dir.newFile(req.Name) } // resp.EntryValid = time.Second - // resp.Attr.Inode = fullFilePath.AsInode() + resp.Attr.Inode = fullFilePath.AsInode() resp.Attr.Valid = time.Second - resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) - resp.Attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) - resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode) - resp.Attr.Gid = entry.Attributes.Gid - resp.Attr.Uid = entry.Attributes.Uid - if entry.HardLinkCounter > 0 { - resp.Attr.Nlink = uint32(entry.HardLinkCounter) + resp.Attr.Mtime = localEntry.Attr.Mtime + resp.Attr.Crtime = localEntry.Attr.Crtime + resp.Attr.Mode = localEntry.Attr.Mode + resp.Attr.Gid = localEntry.Attr.Gid + resp.Attr.Uid = localEntry.Attr.Uid + if localEntry.HardLinkCounter > 0 { + resp.Attr.Nlink = uint32(localEntry.HardLinkCounter) } return node, nil @@ -305,26 +346,25 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath()) + dirPath := util.FullPath(dir.FullPath()) + glog.V(4).Infof("dir ReadDirAll %s", dirPath) - processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error { - if entry.IsDirectory { - dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir} + processEachEntryFn := func(entry *filer.Entry, isLast bool) { + if entry.IsDirectory() { + dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir, Inode: dirPath.Child(entry.Name()).AsInode()} ret = append(ret, dirent) } else { - dirent := fuse.Dirent{Name: entry.Name, Type: findFileType(uint16(entry.Attributes.FileMode))} + dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode)), Inode: dirPath.Child(entry.Name()).AsInode()} ret = append(ret, dirent) } - return nil } - dirPath := util.FullPath(dir.FullPath()) if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil { glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) return nil, fuse.EIO } - listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { - processEachEntryFn(entry.ToProtoEntry(), false) + listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool { + processEachEntryFn(entry, false) return true }) if listErr != nil { @@ -366,40 +406,32 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { - filePath := util.NewFullPath(dir.FullPath(), req.Name) + dirFullPath := dir.FullPath() + filePath := util.NewFullPath(dirFullPath, req.Name) entry, err := filer_pb.GetEntry(dir.wfs, filePath) if err != nil { return err } - if entry == nil { - return nil - } // first, ensure the filer store can correctly delete glog.V(3).Infof("remove file: %v", req) - isDeleteData := entry.HardLinkCounter <= 1 - err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature}) + isDeleteData := entry != nil && entry.HardLinkCounter <= 1 + err = filer_pb.Remove(dir.wfs, dirFullPath, req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature}) if err != nil { - glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err) + glog.V(3).Infof("not found remove file %s: %v", filePath, err) return fuse.ENOENT } // then, delete meta cache and fsNode cache - dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) - - // clear entry inside the file - fsNode := dir.wfs.fsNodeCache.GetFsNode(filePath) - if fsNode != nil { - if file, ok := fsNode.(*File); ok { - file.clearEntry() - } + if err = dir.wfs.metaCache.DeleteEntry(context.Background(), filePath); err != nil { + glog.V(3).Infof("local DeleteEntry %s: %v", filePath, err) + return fuse.ESTALE } - dir.wfs.fsNodeCache.DeleteFsNode(filePath) // remove current file handle if any dir.wfs.handlesLock.Lock() defer dir.wfs.handlesLock.Unlock() - inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode() + inodeId := filePath.AsInode() delete(dir.wfs.handles, inodeId) return nil @@ -408,20 +440,20 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { + dirFullPath := dir.FullPath() glog.V(3).Infof("remove directory entry: %v", req) ignoreRecursiveErr := true // ignore recursion error since the OS should manage it - err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, ignoreRecursiveErr, false, []int32{dir.wfs.signature}) + err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, true, true, ignoreRecursiveErr, false, []int32{dir.wfs.signature}) if err != nil { - glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err) + glog.V(0).Infof("remove %s/%s: %v", dirFullPath, req.Name, err) if strings.Contains(err.Error(), "non-empty") { return fuse.EEXIST } return fuse.ENOENT } - t := util.NewFullPath(dir.FullPath(), req.Name) + t := util.NewFullPath(dirFullPath, req.Name) dir.wfs.metaCache.DeleteEntry(context.Background(), t) - dir.wfs.fsNodeCache.DeleteFsNode(t) return nil @@ -431,27 +463,28 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req) - if err := dir.maybeLoadEntry(); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { return err } if req.Valid.Mode() { - dir.entry.Attributes.FileMode = uint32(req.Mode) + entry.Attributes.FileMode = uint32(req.Mode) } if req.Valid.Uid() { - dir.entry.Attributes.Uid = req.Uid + entry.Attributes.Uid = req.Uid } if req.Valid.Gid() { - dir.entry.Attributes.Gid = req.Gid + entry.Attributes.Gid = req.Gid } if req.Valid.Mtime() { - dir.entry.Attributes.Mtime = req.Mtime.Unix() + entry.Attributes.Mtime = req.Mtime.Unix() } - return dir.saveEntry() + return dir.saveEntry(entry) } @@ -459,15 +492,16 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name) - if err := dir.maybeLoadEntry(); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { return err } - if err := setxattr(dir.entry, req); err != nil { + if err := setxattr(entry, req); err != nil { return err } - return dir.saveEntry() + return dir.saveEntry(entry) } @@ -475,15 +509,16 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name) - if err := dir.maybeLoadEntry(); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { return err } - if err := removexattr(dir.entry, req); err != nil { + if err := removexattr(entry, req); err != nil { return err } - return dir.saveEntry() + return dir.saveEntry(entry) } @@ -491,11 +526,12 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp glog.V(4).Infof("dir Listxattr %s", dir.FullPath()) - if err := dir.maybeLoadEntry(); err != nil { + entry, err := dir.maybeLoadEntry() + if err != nil { return err } - if err := listxattr(dir.entry, req, resp); err != nil { + if err := listxattr(entry, req, resp); err != nil { return err } @@ -505,34 +541,25 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp func (dir *Dir) Forget() { glog.V(4).Infof("Forget dir %s", dir.FullPath()) - - dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath())) } -func (dir *Dir) maybeLoadEntry() error { - if dir.entry == nil { - parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName() - entry, err := dir.wfs.maybeLoadEntry(parentDirPath, name) - if err != nil { - return err - } - dir.entry = entry - } - return nil +func (dir *Dir) maybeLoadEntry() (*filer_pb.Entry, error) { + parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName() + return dir.wfs.maybeLoadEntry(parentDirPath, name) } -func (dir *Dir) saveEntry() error { +func (dir *Dir) saveEntry(entry *filer_pb.Entry) error { parentDir, name := util.FullPath(dir.FullPath()).DirAndName() return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - dir.wfs.mapPbIdFromLocalToFiler(dir.entry) - defer dir.wfs.mapPbIdFromFilerToLocal(dir.entry) + dir.wfs.mapPbIdFromLocalToFiler(entry) + defer dir.wfs.mapPbIdFromFilerToLocal(entry) request := &filer_pb.UpdateEntryRequest{ Directory: parentDir, - Entry: dir.entry, + Entry: entry, Signatures: []int32{dir.wfs.signature}, } @@ -543,7 +570,10 @@ func (dir *Dir) saveEntry() error { return fuse.EIO } - dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) + if err := dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil { + glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err) + return fuse.ESTALE + } return nil }) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index ba3280f03..acdcd2de4 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -31,19 +31,24 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName) - if _, err := oldFile.maybeLoadEntry(ctx); err != nil { + oldEntry, err := oldFile.maybeLoadEntry(ctx) + if err != nil { return nil, err } + if oldEntry == nil { + return nil, fuse.EIO + } + // update old file to hardlink mode - if len(oldFile.entry.HardLinkId) == 0 { - oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER) - oldFile.entry.HardLinkCounter = 1 + if len(oldEntry.HardLinkId) == 0 { + oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER) + oldEntry.HardLinkCounter = 1 } - oldFile.entry.HardLinkCounter++ + oldEntry.HardLinkCounter++ updateOldEntryRequest := &filer_pb.UpdateEntryRequest{ Directory: oldFile.dir.FullPath(), - Entry: oldFile.entry, + Entry: oldEntry, Signatures: []int32{dir.wfs.signature}, } @@ -53,17 +58,17 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f Entry: &filer_pb.Entry{ Name: req.NewName, IsDirectory: false, - Attributes: oldFile.entry.Attributes, - Chunks: oldFile.entry.Chunks, - Extended: oldFile.entry.Extended, - HardLinkId: oldFile.entry.HardLinkId, - HardLinkCounter: oldFile.entry.HardLinkCounter, + Attributes: oldEntry.Attributes, + Chunks: oldEntry.Chunks, + Extended: oldEntry.Extended, + HardLinkId: oldEntry.HardLinkId, + HardLinkCounter: oldEntry.HardLinkCounter, }, Signatures: []int32{dir.wfs.signature}, } // apply changes to the filer, and also apply to local metaCache - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { dir.wfs.mapPbIdFromLocalToFiler(request.Entry) defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) @@ -83,12 +88,13 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f return nil }) + if err != nil { + return nil, fuse.EIO + } + // create new file node - newNode := dir.newFile(req.NewName, request.Entry) + newNode := dir.newFile(req.NewName) newFile := newNode.(*File) - if _, err := newFile.maybeLoadEntry(ctx); err != nil { - return nil, err - } return newFile, err @@ -130,7 +136,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, return nil }) - symlink := dir.newFile(req.NewName, request.Entry) + symlink := dir.newFile(req.NewName) return symlink, err diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index d2acad4b2..b07710d17 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -64,19 +64,28 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector return fuse.EIO } - // fmt.Printf("rename path: %v => %v\n", oldPath, newPath) - dir.wfs.fsNodeCache.Move(oldPath, newPath) + oldFsNode := NodeWithId(oldPath.AsInode()) + newFsNode := NodeWithId(newPath.AsInode()) + dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) { + if file, ok := internalNode.(*File); ok { + glog.V(4).Infof("internal node %s", file.Name) + file.Name = req.NewName + file.id = uint64(newFsNode) + } + }) // change file handle dir.wfs.handlesLock.Lock() defer dir.wfs.handlesLock.Unlock() inodeId := oldPath.AsInode() existingHandle, found := dir.wfs.handles[inodeId] + glog.V(4).Infof("has open filehandle %s: %v", oldPath, found) if !found || existingHandle == nil { - return err + return nil } + glog.V(4).Infof("opened filehandle %s => %s", oldPath, newPath) delete(dir.wfs.handles, inodeId) dir.wfs.handles[newPath.AsInode()] = existingHandle - return err + return nil } diff --git a/weed/filesys/dir_test.go b/weed/filesys/dir_test.go deleted file mode 100644 index 49c76eb5e..000000000 --- a/weed/filesys/dir_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package filesys - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestDirPath(t *testing.T) { - - p := &Dir{name: "/some"} - p = &Dir{name: "path", parent: p} - p = &Dir{name: "to", parent: p} - p = &Dir{name: "a", parent: p} - p = &Dir{name: "file", parent: p} - - assert.Equal(t, "/some/path/to/a/file", p.FullPath()) - - p = &Dir{name: "/some"} - assert.Equal(t, "/some", p.FullPath()) - - p = &Dir{name: "/"} - assert.Equal(t, "/", p.FullPath()) - - p = &Dir{name: "/"} - p = &Dir{name: "path", parent: p} - assert.Equal(t, "/path", p.FullPath()) - - p = &Dir{name: "/"} - p = &Dir{name: "path", parent: p} - p = &Dir{name: "to", parent: p} - assert.Equal(t, "/path/to", p.FullPath()) - -} diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index f05a3a56a..8888cff96 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -30,7 +30,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages { func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { - glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize) + glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { // this is more than what buffer can hold. @@ -69,7 +69,12 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD return false } - fileSize := int64(pages.f.entry.Attributes.FileSize) + entry := pages.f.getEntry() + if entry == nil { + return false + } + + fileSize := int64(entry.Attributes.FileSize) chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) if chunkSize == 0 { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index a210c5152..bb57988cd 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -2,7 +2,6 @@ package filesys import ( "context" - "io" "os" "sort" "time" @@ -19,6 +18,7 @@ import ( const blockSize = 512 var _ = fs.Node(&File{}) +var _ = fs.NodeIdentifier(&File{}) var _ = fs.NodeOpener(&File{}) var _ = fs.NodeFsyncer(&File{}) var _ = fs.NodeSetattrer(&File{}) @@ -29,32 +29,37 @@ var _ = fs.NodeListxattrer(&File{}) var _ = fs.NodeForgetter(&File{}) type File struct { - Name string - dir *Dir - wfs *WFS - entry *filer_pb.Entry - entryViewCache []filer.VisibleInterval - isOpen int - reader io.ReaderAt - dirtyMetadata bool + Name string + dir *Dir + wfs *WFS + entry *filer_pb.Entry + isOpen int + dirtyMetadata bool + id uint64 } func (file *File) fullpath() util.FullPath { return util.NewFullPath(file.dir.FullPath(), file.Name) } +func (file *File) Id() uint64 { + return file.id +} + func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) { glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr) - entry := file.entry - if file.isOpen <= 0 || entry == nil { - if entry, err = file.maybeLoadEntry(ctx); err != nil { - return err - } + entry, err := file.maybeLoadEntry(ctx) + if err != nil { + return err } - // attr.Inode = file.fullpath().AsInode() + if entry == nil { + return fuse.ENOENT + } + + attr.Inode = file.Id() attr.Valid = time.Second attr.Mode = os.FileMode(entry.Attributes.FileMode) attr.Size = filer.FileSize(entry) @@ -106,13 +111,13 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req) - _, err := file.maybeLoadEntry(ctx) + entry, err := file.maybeLoadEntry(ctx) if err != nil { return err } if file.isOpen > 0 { file.wfs.handlesLock.Lock() - fileHandle := file.wfs.handles[file.fullpath().AsInode()] + fileHandle := file.wfs.handles[file.Id()] file.wfs.handlesLock.Unlock() if fileHandle != nil { @@ -123,12 +128,12 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f if req.Valid.Size() { - glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks)) - if req.Size < filer.FileSize(file.entry) { + glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks)) + if req.Size < filer.FileSize(entry) { // fmt.Printf("truncate %v \n", fullPath) var chunks []*filer_pb.FileChunk var truncatedChunks []*filer_pb.FileChunk - for _, chunk := range file.entry.Chunks { + for _, chunk := range entry.Chunks { int64Size := int64(chunk.Size) if chunk.Offset+int64Size > int64(req.Size) { // this chunk is truncated @@ -143,36 +148,34 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } } } - file.entry.Chunks = chunks - file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks) - file.reader = nil + entry.Chunks = chunks } - file.entry.Attributes.FileSize = req.Size + entry.Attributes.FileSize = req.Size file.dirtyMetadata = true } if req.Valid.Mode() { - file.entry.Attributes.FileMode = uint32(req.Mode) + entry.Attributes.FileMode = uint32(req.Mode) file.dirtyMetadata = true } if req.Valid.Uid() { - file.entry.Attributes.Uid = req.Uid + entry.Attributes.Uid = req.Uid file.dirtyMetadata = true } if req.Valid.Gid() { - file.entry.Attributes.Gid = req.Gid + entry.Attributes.Gid = req.Gid file.dirtyMetadata = true } if req.Valid.Crtime() { - file.entry.Attributes.Crtime = req.Crtime.Unix() + entry.Attributes.Crtime = req.Crtime.Unix() file.dirtyMetadata = true } if req.Valid.Mtime() { - file.entry.Attributes.Mtime = req.Mtime.Unix() + entry.Attributes.Mtime = req.Mtime.Unix() file.dirtyMetadata = true } @@ -188,7 +191,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return nil } - return file.saveEntry(file.entry) + return file.saveEntry(entry) } @@ -254,14 +257,20 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { func (file *File) Forget() { t := util.NewFullPath(file.dir.FullPath(), file.Name) glog.V(4).Infof("Forget file %s", t) - file.wfs.fsNodeCache.DeleteFsNode(t) + file.wfs.ReleaseHandle(t, fuse.HandleID(t.AsInode())) } func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) { + + file.wfs.handlesLock.Lock() + handle, found := file.wfs.handles[file.Id()] + file.wfs.handlesLock.Unlock() entry = file.entry - if file.isOpen > 0 { - return entry, nil + if found { + glog.V(4).Infof("maybeLoadEntry found opened file %s/%s: %v %v", file.dir.FullPath(), file.Name, handle.f.entry, entry) + entry = handle.f.entry } + if entry != nil { if len(entry.HardLinkId) == 0 { // only always reload hard link @@ -274,7 +283,7 @@ func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, er return entry, err } if entry != nil { - file.setEntry(entry) + // file.entry = entry } else { glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err) } @@ -299,8 +308,13 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { } } + entry := file.getEntry() + if entry == nil { + return + } + // pick out-of-order chunks from existing chunks - for _, chunk := range file.entry.Chunks { + for _, chunk := range entry.Chunks { if lessThan(earliestChunk, chunk) { chunks = append(chunks, chunk) } @@ -311,28 +325,9 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { return lessThan(chunks[i], chunks[j]) }) - // add to entry view cache - for _, chunk := range chunks { - file.entryViewCache = filer.MergeIntoVisibles(file.entryViewCache, chunk) - } - - file.reader = nil - - glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) + glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks)) - file.entry.Chunks = append(file.entry.Chunks, newChunks...) -} - -func (file *File) setEntry(entry *filer_pb.Entry) { - file.entry = entry - file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks) - file.reader = nil -} - -func (file *File) clearEntry() { - file.entry = nil - file.entryViewCache = nil - file.reader = nil + entry.Chunks = append(entry.Chunks, newChunks...) } func (file *File) saveEntry(entry *filer_pb.Entry) error { @@ -359,3 +354,7 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error { return nil }) } + +func (file *File) getEntry() *filer_pb.Entry { + return file.entry +} diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index fb073c9cd..27ffab6e1 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -20,10 +20,12 @@ import ( type FileHandle struct { // cache file has been written to - dirtyPages *ContinuousDirtyPages - contentType string - handle uint64 - sync.RWMutex + dirtyPages *ContinuousDirtyPages + entryViewCache []filer.VisibleInterval + reader io.ReaderAt + contentType string + handle uint64 + sync.Mutex f *File RequestId fuse.RequestID // unique ID for request @@ -40,8 +42,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle { Uid: uid, Gid: gid, } - if fh.f.entry != nil { - fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry) + entry := fh.f.getEntry() + if entry != nil { + entry.Attributes.FileSize = filer.FileSize(entry) } return fh @@ -58,8 +61,8 @@ var _ = fs.HandleReleaser(&FileHandle{}) func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) - fh.RLock() - defer fh.RUnlock() + fh.Lock() + defer fh.Unlock() if req.Size <= 0 { return nil @@ -104,42 +107,48 @@ func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxSto func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { - fileSize := int64(filer.FileSize(fh.f.entry)) + entry := fh.f.getEntry() + if entry == nil { + return 0, io.EOF + } + + fileSize := int64(filer.FileSize(entry)) + fileFullPath := fh.f.fullpath() if fileSize == 0 { - glog.V(1).Infof("empty fh %v", fh.f.fullpath()) + glog.V(1).Infof("empty fh %v", fileFullPath) return 0, io.EOF } - if offset+int64(len(buff)) <= int64(len(fh.f.entry.Content)) { - totalRead := copy(buff, fh.f.entry.Content[offset:]) - glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead) + if offset+int64(len(buff)) <= int64(len(entry.Content)) { + totalRead := copy(buff, entry.Content[offset:]) + glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead) return int64(totalRead), nil } var chunkResolveErr error - if fh.f.entryViewCache == nil { - fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks) + if fh.entryViewCache == nil { + fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks) if chunkResolveErr != nil { return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) } - fh.f.reader = nil + fh.reader = nil } - reader := fh.f.reader + reader := fh.reader if reader == nil { - chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64) + chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64) reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize) } - fh.f.reader = reader + fh.reader = reader totalRead, err := reader.ReadAt(buff, offset) if err != nil && err != io.EOF { - glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) + glog.Errorf("file handle read %s: %v", fileFullPath, err) } - glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err) + glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err) return int64(totalRead), err } @@ -158,8 +167,13 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f copy(data, req.Data) } - fh.f.entry.Content = nil - fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize))) + entry := fh.f.getEntry() + if entry == nil { + return fuse.EIO + } + + entry.Content = nil + entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize))) glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) fh.dirtyPages.AddPage(req.Offset, data) @@ -179,30 +193,27 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error { - glog.V(4).Infof("Release %v fh %d", fh.f.fullpath(), fh.handle) + glog.V(4).Infof("Release %v fh %d open=%d", fh.f.fullpath(), fh.handle, fh.f.isOpen) fh.Lock() defer fh.Unlock() + fh.f.isOpen-- + if fh.f.isOpen <= 0 { + fh.f.entry = nil + fh.entryViewCache = nil + fh.reader = nil + + fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) + } + + if fh.f.isOpen < 0 { glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0) fh.f.isOpen = 0 return nil } - if fh.f.isOpen == 1 { - - fh.f.isOpen-- - - fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) - if closer, ok := fh.f.reader.(io.Closer); ok { - if closer != nil { - closer.Close() - } - } - fh.f.reader = nil - } - return nil } @@ -242,35 +253,40 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - if fh.f.entry.Attributes != nil { - fh.f.entry.Attributes.Mime = fh.contentType - if fh.f.entry.Attributes.Uid == 0 { - fh.f.entry.Attributes.Uid = header.Uid + entry := fh.f.getEntry() + if entry == nil { + return nil + } + + if entry.Attributes != nil { + entry.Attributes.Mime = fh.contentType + if entry.Attributes.Uid == 0 { + entry.Attributes.Uid = header.Uid } - if fh.f.entry.Attributes.Gid == 0 { - fh.f.entry.Attributes.Gid = header.Gid + if entry.Attributes.Gid == 0 { + entry.Attributes.Gid = header.Gid } - if fh.f.entry.Attributes.Crtime == 0 { - fh.f.entry.Attributes.Crtime = time.Now().Unix() + if entry.Attributes.Crtime == 0 { + entry.Attributes.Crtime = time.Now().Unix() } - fh.f.entry.Attributes.Mtime = time.Now().Unix() - fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask) - fh.f.entry.Attributes.Collection = fh.dirtyPages.collection - fh.f.entry.Attributes.Replication = fh.dirtyPages.replication + entry.Attributes.Mtime = time.Now().Unix() + entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask) + entry.Attributes.Collection = fh.dirtyPages.collection + entry.Attributes.Replication = fh.dirtyPages.replication } request := &filer_pb.CreateEntryRequest{ Directory: fh.f.dir.FullPath(), - Entry: fh.f.entry, + Entry: entry, Signatures: []int32{fh.f.wfs.signature}, } - glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks)) - for i, chunk := range fh.f.entry.Chunks { + glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks)) + for i, chunk := range entry.Chunks { glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) } - manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks) + manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks) chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks) chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) @@ -278,7 +294,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", manifestErr) } - fh.f.entry.Chunks = append(chunks, manifestChunks...) + entry.Chunks = append(chunks, manifestChunks...) fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry) defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry) diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go index fdec8253c..6b1012090 100644 --- a/weed/filesys/fscache.go +++ b/weed/filesys/fscache.go @@ -124,8 +124,9 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { } if f, ok := src.node.(*File); ok { f.Name = target.name - if f.entry != nil { - f.entry.Name = f.Name + entry := f.getEntry() + if entry != nil { + entry.Name = f.Name } } parent.disconnectChild(target) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index c6d9080a1..42816d23d 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -41,7 +41,6 @@ type Option struct { CacheDir string CacheSizeMB int64 DataCenter string - EntryCacheTtl time.Duration Umask os.FileMode MountUid uint32 @@ -104,24 +103,16 @@ func NewSeaweedFileSystem(option *Option) *WFS { } wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) { - fsNode := wfs.fsNodeCache.GetFsNode(filePath) - if fsNode != nil { - if file, ok := fsNode.(*File); ok { - if err := wfs.Server.InvalidateNodeData(file); err != nil { - glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err) - } - file.clearEntry() - } + + fsNode := NodeWithId(filePath.AsInode()) + if err := wfs.Server.InvalidateNodeData(fsNode); err != nil { + glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err) } + dir, name := filePath.DirAndName() - parent := wfs.root - if dir != "/" { - parent = wfs.fsNodeCache.GetFsNode(util.FullPath(dir)) - } - if parent != nil { - if err := wfs.Server.InvalidateEntry(parent, name); err != nil { - glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err) - } + parent := NodeWithId(util.FullPath(dir).AsInode()) + if err := wfs.Server.InvalidateEntry(parent, name); err != nil { + glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err) } }) startTime := time.Now() @@ -130,8 +121,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.metaCache.Shutdown() }) - entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath)) - wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry} + wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} wfs.fsNodeCache = newFsCache(wfs.root) if wfs.option.ConcurrentWriters > 0 { @@ -150,25 +140,28 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand fullpath := file.fullpath() glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid) - wfs.handlesLock.Lock() - defer wfs.handlesLock.Unlock() + inodeId := file.Id() - inodeId := file.fullpath().AsInode() - if file.isOpen > 0 { - existingHandle, found := wfs.handles[inodeId] - if found && existingHandle != nil { - file.isOpen++ - return existingHandle - } + wfs.handlesLock.Lock() + existingHandle, found := wfs.handles[inodeId] + wfs.handlesLock.Unlock() + if found && existingHandle != nil { + existingHandle.f.isOpen++ + glog.V(4).Infof("Acquired Handle %s open %d", fullpath, existingHandle.f.isOpen) + return existingHandle } + entry, _ := file.maybeLoadEntry(context.Background()) + file.entry = entry fileHandle = newFileHandle(file, uid, gid) - file.maybeLoadEntry(context.Background()) file.isOpen++ + wfs.handlesLock.Lock() wfs.handles[inodeId] = fileHandle + wfs.handlesLock.Unlock() fileHandle.handle = inodeId + glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen) return } @@ -176,9 +169,9 @@ func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) { wfs.handlesLock.Lock() defer wfs.handlesLock.Unlock() - glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) + glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles)) - delete(wfs.handles, fullpath.AsInode()) + delete(wfs.handles, uint64(handleId)) return } @@ -268,3 +261,11 @@ func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { return filer.LookupFn(wfs) } + +type NodeWithId uint64 +func (n NodeWithId) Id() uint64 { + return uint64(n) +} +func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error { + return nil +} |
