diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 173 | ||||
| -rw-r--r-- | weed/filesys/dir_link.go | 93 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 51 | ||||
| -rw-r--r-- | weed/filesys/dirty_page.go | 210 | ||||
| -rw-r--r-- | weed/filesys/dirty_page_interval.go | 31 | ||||
| -rw-r--r-- | weed/filesys/dirty_page_interval_test.go | 24 | ||||
| -rw-r--r-- | weed/filesys/file.go | 120 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 215 | ||||
| -rw-r--r-- | weed/filesys/fscache.go | 13 | ||||
| -rw-r--r-- | weed/filesys/fscache_test.go | 19 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/id_mapper.go | 101 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache.go | 99 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_init.go | 41 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_subscribe.go | 39 | ||||
| -rw-r--r-- | weed/filesys/unimplemented.go | 22 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 130 | ||||
| -rw-r--r-- | weed/filesys/wfs_deletion.go | 19 | ||||
| -rw-r--r-- | weed/filesys/wfs_filer_client.go | 31 | ||||
| -rw-r--r-- | weed/filesys/wfs_write.go | 71 | ||||
| -rw-r--r-- | weed/filesys/xattr.go | 45 |
20 files changed, 1004 insertions, 543 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index e4260d56f..ae2ae3418 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -3,16 +3,19 @@ package filesys import ( "bytes" "context" + "math" "os" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) type Dir struct { @@ -25,6 +28,7 @@ type Dir struct { var _ = fs.Node(&Dir{}) var _ = fs.NodeCreater(&Dir{}) var _ = fs.NodeMkdirer(&Dir{}) +var _ = fs.NodeFsyncer(&Dir{}) var _ = fs.NodeRequestLookuper(&Dir{}) var _ = fs.HandleReadDirAller(&Dir{}) var _ = fs.NodeRemover(&Dir{}) @@ -88,8 +92,16 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { attr.BlockSize = 1024 * 1024 } +func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { + // fsync works at OS level + // write the file chunks to the filerGrpcAddress + glog.V(3).Infof("dir %s fsync %+v", dir.FullPath(), req) + + return nil +} + func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { - return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { + f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { return &File{ Name: name, dir: dir, @@ -98,14 +110,17 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { entryViewCache: nil, } }) + f.(*File).dir = dir // in case dir node was created later + return f } func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { - return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { + d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} }) - + d.(*Dir).parent = dir // in case dir node was created later + return d } func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, @@ -127,21 +142,25 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, TtlSec: dir.wfs.option.TtlSec, }, }, - OExcl: req.Flags&fuse.OpenExclusive != 0, + OExcl: req.Flags&fuse.OpenExclusive != 0, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags) if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + dir.wfs.mapPbIdFromLocalToFiler(request.Entry) + defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) + if err := filer_pb.CreateEntry(client, request); err != nil { if strings.Contains(err.Error(), "EEXIST") { return fuse.EEXIST } + glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), req.Name, err) return fuse.EIO } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) return nil }); err != nil { @@ -155,7 +174,6 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, node = dir.newFile(req.Name, request.Entry) file := node.(*File) - file.isOpen++ fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) return file, fh, nil @@ -179,9 +197,13 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + dir.wfs.mapPbIdFromLocalToFiler(newEntry) + defer dir.wfs.mapPbIdFromFilerToLocal(newEntry) + request := &filer_pb.CreateEntryRequest{ - Directory: dir.FullPath(), - Entry: newEntry, + Directory: dir.FullPath(), + Entry: newEntry, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("mkdir: %v", request) @@ -190,9 +212,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err return err } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) return nil }) @@ -213,15 +233,17 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) - entry := dir.wfs.cacheGet(fullFilePath) - - if dir.wfs.option.AsyncMetaDataCaching { - cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - entry = cachedEntry.ToProtoEntry() + dirPath := util.FullPath(dir.FullPath()) + visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) + if visitErr != nil { + glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) + return nil, fuse.EIO } + cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT + } + entry := cachedEntry.ToProtoEntry() if entry == nil { // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) @@ -230,7 +252,6 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) return nil, fuse.ENOENT } - dir.wfs.cacheSet(fullFilePath, entry, 5*time.Minute) } else { glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) } @@ -250,6 +271,9 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode) resp.Attr.Gid = entry.Attributes.Gid resp.Attr.Uid = entry.Attributes.Uid + if entry.HardLinkCounter > 0 { + resp.Attr.Nlink = uint32(entry.HardLinkCounter) + } return node, nil } @@ -260,9 +284,8 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath()) + glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath()) - cacheTtl := 5 * time.Minute processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error { fullpath := util.NewFullPath(dir.FullPath(), entry.Name) inode := fullpath.AsInode() @@ -273,29 +296,23 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File} ret = append(ret, dirent) } - dir.wfs.cacheSet(fullpath, entry, cacheTtl) return nil } - if dir.wfs.option.AsyncMetaDataCaching { - listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) - if listErr != nil { - glog.Errorf("list meta cache: %v", listErr) - return nil, fuse.EIO - } - for _, cachedEntry := range listedEntries { - processEachEntryFn(cachedEntry.ToProtoEntry(), false) - } - return + dirPath := util.FullPath(dir.FullPath()) + if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil { + glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + return nil, fuse.EIO } - - readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn) - if readErr != nil { - glog.V(0).Infof("list %s: %v", dir.FullPath(), err) - return ret, fuse.EIO + listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(math.MaxInt32)) + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return nil, fuse.EIO } - - return ret, err + for _, cachedEntry := range listedEntries { + processEachEntryFn(cachedEntry.ToProtoEntry(), false) + } + return } func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { @@ -319,50 +336,52 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { return nil } - dir.wfs.deleteFileChunks(entry.Chunks) - - dir.wfs.cacheDelete(filePath) - dir.wfs.fsNodeCache.DeleteFsNode(filePath) - - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) - } - + // first, ensure the filer store can correctly delete glog.V(3).Infof("remove file: %v", req) - err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false) + isDeleteData := entry.HardLinkCounter <= 1 + err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature}) if err != nil { glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err) return fuse.ENOENT } + // then, delete meta cache and fsNode cache + dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) + dir.wfs.fsNodeCache.DeleteFsNode(filePath) + + // delete the chunks last + if isDeleteData { + dir.wfs.deleteFileChunks(entry.Chunks) + } + return nil } func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { - t := util.NewFullPath(dir.FullPath(), req.Name) - dir.wfs.cacheDelete(t) - dir.wfs.fsNodeCache.DeleteFsNode(t) - - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.DeleteEntry(context.Background(), t) - } - glog.V(3).Infof("remove directory entry: %v", req) - err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false) + ignoreRecursiveErr := true // ignore recursion error since the OS should manage it + err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, ignoreRecursiveErr, false, []int32{dir.wfs.signature}) if err != nil { - glog.V(3).Infof("not found remove %s/%s: %v", dir.FullPath(), req.Name, err) + glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err) + if strings.Contains(err.Error(), "non-empty") { + return fuse.EEXIST + } return fuse.ENOENT } + t := util.NewFullPath(dir.FullPath(), req.Name) + dir.wfs.metaCache.DeleteEntry(context.Background(), t) + dir.wfs.fsNodeCache.DeleteFsNode(t) + return nil } func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - glog.V(3).Infof("%v dir setattr %+v", dir.FullPath(), req) + glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req) if err := dir.maybeLoadEntry(); err != nil { return err @@ -384,8 +403,6 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus dir.entry.Attributes.Mtime = req.Mtime.Unix() } - dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) - return dir.saveEntry() } @@ -402,8 +419,6 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { return err } - dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) - return dir.saveEntry() } @@ -420,8 +435,6 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e return err } - dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) - return dir.saveEntry() } @@ -443,7 +456,7 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp } func (dir *Dir) Forget() { - glog.V(3).Infof("Forget dir %s", dir.FullPath()) + glog.V(4).Infof("Forget dir %s", dir.FullPath()) dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath())) } @@ -466,21 +479,23 @@ func (dir *Dir) saveEntry() error { return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + dir.wfs.mapPbIdFromLocalToFiler(dir.entry) + defer dir.wfs.mapPbIdFromFilerToLocal(dir.entry) + request := &filer_pb.UpdateEntryRequest{ - Directory: parentDir, - Entry: dir.entry, + Directory: parentDir, + Entry: dir.entry, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("save dir entry: %v", request) _, err := client.UpdateEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err) + glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err) return fuse.EIO } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) return nil }) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index d1858e99b..f6bc41b56 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -2,23 +2,101 @@ package filesys import ( "context" + "github.com/chrislusf/seaweedfs/weed/util" "os" "syscall" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) +var _ = fs.NodeLinker(&Dir{}) var _ = fs.NodeSymlinker(&Dir{}) var _ = fs.NodeReadlinker(&File{}) +const ( + HARD_LINK_MARKER = '\x01' +) + +func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (fs.Node, error) { + + oldFile, ok := old.(*File) + if !ok { + glog.Errorf("old node is not a file: %+v", old) + } + + glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName) + + if err := oldFile.maybeLoadEntry(ctx); err != nil { + return nil, err + } + + // update old file to hardlink mode + if len(oldFile.entry.HardLinkId) == 0 { + oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER) + oldFile.entry.HardLinkCounter = 1 + } + oldFile.entry.HardLinkCounter++ + updateOldEntryRequest := &filer_pb.UpdateEntryRequest{ + Directory: oldFile.dir.FullPath(), + Entry: oldFile.entry, + Signatures: []int32{dir.wfs.signature}, + } + + // CreateLink 1.2 : update new file to hardlink mode + request := &filer_pb.CreateEntryRequest{ + Directory: dir.FullPath(), + Entry: &filer_pb.Entry{ + Name: req.NewName, + IsDirectory: false, + Attributes: oldFile.entry.Attributes, + Chunks: oldFile.entry.Chunks, + Extended: oldFile.entry.Extended, + HardLinkId: oldFile.entry.HardLinkId, + HardLinkCounter: oldFile.entry.HardLinkCounter, + }, + Signatures: []int32{dir.wfs.signature}, + } + + // apply changes to the filer, and also apply to local metaCache + err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + dir.wfs.mapPbIdFromLocalToFiler(request.Entry) + defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) + + if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil { + glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err) + return fuse.EIO + } + dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry)) + + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err) + return fuse.EIO + } + dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) + + return nil + }) + + // create new file node + newNode := dir.newFile(req.NewName, request.Entry) + newFile := newNode.(*File) + if err := newFile.maybeLoadEntry(ctx); err != nil { + return nil, err + } + + return newFile, err + +} + func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) { - glog.V(3).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target) + glog.V(4).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target) request := &filer_pb.CreateEntryRequest{ Directory: dir.FullPath(), @@ -34,17 +112,20 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, SymlinkTarget: req.Target, }, }, + Signatures: []int32{dir.wfs.signature}, } err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + dir.wfs.mapPbIdFromLocalToFiler(request.Entry) + defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry) + if err := filer_pb.CreateEntry(client, request); err != nil { glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err) return fuse.EIO } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) return nil }) @@ -65,7 +146,7 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri return "", fuse.Errno(syscall.EINVAL) } - glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget) + glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget) return file.entry.Attributes.SymlinkTarget, nil diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index ea40f5c31..3f73d0eb6 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -3,11 +3,12 @@ package filesys import ( "context" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error { @@ -19,7 +20,17 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath) - err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + // find local old entry + oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath) + if err != nil { + glog.Errorf("dir Rename can not find source %s : %v", oldPath, err) + return fuse.ENOENT + } + + // update remote filer + err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: dir.FullPath(), @@ -28,24 +39,44 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector NewName: req.NewName, } - _, err := client.AtomicRenameEntry(context.Background(), request) + _, err := client.AtomicRenameEntry(ctx, request) if err != nil { - glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err) + glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err) return fuse.EIO } return nil }) + if err != nil { + glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err) + return fuse.EIO + } - if err == nil { - dir.wfs.cacheDelete(newPath) - dir.wfs.cacheDelete(oldPath) + // TODO: replicate renaming logic on filer + if err := dir.wfs.metaCache.DeleteEntry(context.Background(), oldPath); err != nil { + glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err) + return fuse.EIO + } + oldEntry.FullPath = newPath + if err := dir.wfs.metaCache.InsertEntry(context.Background(), oldEntry); err != nil { + glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err) + return fuse.EIO + } - // fmt.Printf("rename path: %v => %v\n", oldPath, newPath) - dir.wfs.fsNodeCache.Move(oldPath, newPath) + // fmt.Printf("rename path: %v => %v\n", oldPath, newPath) + dir.wfs.fsNodeCache.Move(oldPath, newPath) + // change file handle + dir.wfs.handlesLock.Lock() + defer dir.wfs.handlesLock.Unlock() + inodeId := oldPath.AsInode() + existingHandle, found := dir.wfs.handles[inodeId] + if !found || existingHandle == nil { + return err } + delete(dir.wfs.handles, inodeId) + dir.wfs.handles[newPath.AsInode()] = existingHandle return err } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 45224b3e7..dd0c48796 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -2,193 +2,126 @@ package filesys import ( "bytes" - "context" - "fmt" "io" + "runtime" "sync" "time" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +var ( + concurrentWriterLimit = runtime.NumCPU() + concurrentWriters = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit) ) type ContinuousDirtyPages struct { - intervals *ContinuousIntervals - f *File - lock sync.Mutex - collection string - replication string + intervals *ContinuousIntervals + f *File + writeWaitGroup sync.WaitGroup + chunkSaveErrChan chan error + chunkSaveErrChanClosed bool + lastErr error + lock sync.Mutex + collection string + replication string } func newDirtyPages(file *File) *ContinuousDirtyPages { - return &ContinuousDirtyPages{ - intervals: &ContinuousIntervals{}, - f: file, + dirtyPages := &ContinuousDirtyPages{ + intervals: &ContinuousIntervals{}, + f: file, + chunkSaveErrChan: make(chan error, concurrentWriterLimit), } + go func() { + for t := range dirtyPages.chunkSaveErrChan { + if t != nil { + dirtyPages.lastErr = t + } + } + }() + return dirtyPages } -func (pages *ContinuousDirtyPages) releaseResource() { -} - -var counter = int32(0) - -func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { - pages.lock.Lock() - defer pages.lock.Unlock() - - glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) + glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize) if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { // this is more than what buffer can hold. - return pages.flushAndSave(offset, data) + pages.flushAndSave(offset, data) } pages.intervals.AddInterval(data, offset) - var chunk *filer_pb.FileChunk - var hasSavedData bool - - if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit { - chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage() - if hasSavedData { - chunks = append(chunks, chunk) - } + if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit { + pages.saveExistingLargestPageToStorage() } return } -func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { - - var chunk *filer_pb.FileChunk - var newChunks []*filer_pb.FileChunk +func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) { // flush existing - if newChunks, err = pages.saveExistingPagesToStorage(); err == nil { - if newChunks != nil { - chunks = append(chunks, newChunks...) - } - } else { - return - } + pages.saveExistingPagesToStorage() // flush the new page - if chunk, err = pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))); err == nil { - if chunk != nil { - glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId) - chunks = append(chunks, chunk) - } - } else { - glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) - return - } + pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))) return } -func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) { - - pages.lock.Lock() - defer pages.lock.Unlock() - - return pages.saveExistingPagesToStorage() -} - -func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) { - - var hasSavedData bool - var chunk *filer_pb.FileChunk - - for { - - chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage() - if !hasSavedData { - return chunks, err - } - - if err == nil { - chunks = append(chunks, chunk) - } else { - return - } +func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() { + for pages.saveExistingLargestPageToStorage() { } - } -func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *filer_pb.FileChunk, hasSavedData bool, err error) { +func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedData bool) { maxList := pages.intervals.RemoveLargestIntervalLinkedList() if maxList == nil { - return nil, false, nil + return false } - for { - chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size()) - if err == nil { - hasSavedData = true - glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId) - return - } else { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err) - time.Sleep(5 * time.Second) - } - } + fileSize := int64(pages.f.entry.Attributes.FileSize) -} + chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) + if chunkSize == 0 { + return false + } -func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) { + pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) - var fileId, host string - var auth security.EncodedJwt + return true +} - dir, _ := pages.f.fullpath().DirAndName() +func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { - if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if pages.chunkSaveErrChanClosed { + pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit) + pages.chunkSaveErrChanClosed = false + } - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: pages.f.wfs.option.Replication, - Collection: pages.f.wfs.option.Collection, - TtlSec: pages.f.wfs.option.TtlSec, - DataCenter: pages.f.wfs.option.DataCenter, - ParentPath: dir, - } + mtime := time.Now().UnixNano() + pages.writeWaitGroup.Add(1) + go func() { + defer pages.writeWaitGroup.Done() - resp, err := client.AssignVolume(context.Background(), request) + reader = io.LimitReader(reader, size) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) - return err - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) + pages.chunkSaveErrChan <- err + return } - - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) - host = pages.f.wfs.AdjustedUrl(host) - pages.collection, pages.replication = resp.Collection, resp.Replication - - return nil - }); err != nil { - return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err) - } - - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth) - if err != nil { - glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) - return nil, fmt.Errorf("upload data: %v", err) - } - if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err) - return nil, fmt.Errorf("upload result: %v", uploadResult.Error) - } - pages.f.wfs.chunkCache.SetChunk(fileId, data) - - return uploadResult.ToPbFileChunk(fileId, offset), nil - + chunk.Mtime = mtime + pages.collection, pages.replication = collection, replication + pages.f.addChunks([]*filer_pb.FileChunk{chunk}) + glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) + }() } func max(x, y int64) int64 { @@ -204,11 +137,6 @@ func min(x, y int64) int64 { return y } -func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) { - - pages.lock.Lock() - defer pages.lock.Unlock() - - return pages.intervals.ReadData(data, startOffset) - +func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + return pages.intervals.ReadDataAt(data, startOffset) } diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go index ec94c6df1..1404bf78c 100644 --- a/weed/filesys/dirty_page_interval.go +++ b/weed/filesys/dirty_page_interval.go @@ -3,7 +3,8 @@ package filesys import ( "bytes" "io" - "math" + + "github.com/chrislusf/seaweedfs/weed/util" ) type IntervalNode struct { @@ -91,6 +92,15 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))} + // append to the tail and return + if len(c.lists) == 1 { + lastSpan := c.lists[0] + if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset { + lastSpan.addNodeToTail(interval) + return + } + } + var newLists []*IntervalLinkedList for _, list := range c.lists { // if list is to the left of new interval, add to the new list @@ -186,35 +196,28 @@ func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) { } -func (c *ContinuousIntervals) ReadData(data []byte, startOffset int64) (offset int64, size int) { - var minOffset int64 = math.MaxInt64 - var maxStop int64 +func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) { for _, list := range c.lists { start := max(startOffset, list.Offset()) stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) - if start <= stop { + if start < stop { list.ReadData(data[start-startOffset:], start, stop) - minOffset = min(minOffset, start) maxStop = max(maxStop, stop) } } - - if minOffset == math.MaxInt64 { - return 0, 0 - } - - offset = minOffset - size = int(maxStop - offset) return } func (l *IntervalLinkedList) ToReader() io.Reader { var readers []io.Reader t := l.Head - readers = append(readers, bytes.NewReader(t.Data)) + readers = append(readers, util.NewBytesReader(t.Data)) for t.Next != nil { t = t.Next readers = append(readers, bytes.NewReader(t.Data)) } + if len(readers) == 1 { + return readers[0] + } return io.MultiReader(readers...) } diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go index ab3b37b7c..d02ad27fd 100644 --- a/weed/filesys/dirty_page_interval_test.go +++ b/weed/filesys/dirty_page_interval_test.go @@ -2,6 +2,7 @@ package filesys import ( "bytes" + "math/rand" "testing" ) @@ -66,6 +67,29 @@ func TestContinuousIntervals_RealCase1(t *testing.T) { } +func TestRandomWrites(t *testing.T) { + + c := &ContinuousIntervals{} + + data := make([]byte, 1024) + + for i := 0; i < 1024; i++ { + + start, stop := rand.Intn(len(data)), rand.Intn(len(data)) + if start > stop { + start, stop = stop, start + } + + rand.Read(data[start : stop+1]) + + c.AddInterval(data[start:stop+1], int64(start)) + + expectedData(t, c, 0, data...) + + } + +} + func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) { start, stop := int64(offset), int64(offset+len(data)) for _, list := range c.lists { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index bafbd7cc8..7aa1016d7 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -7,12 +7,13 @@ import ( "sort" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) const blockSize = 512 @@ -32,9 +33,10 @@ type File struct { dir *Dir wfs *WFS entry *filer_pb.Entry - entryViewCache []filer2.VisibleInterval + entryViewCache []filer.VisibleInterval isOpen int reader io.ReaderAt + dirtyMetadata bool } func (file *File) fullpath() util.FullPath { @@ -54,7 +56,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { attr.Inode = file.fullpath().AsInode() attr.Valid = time.Second attr.Mode = os.FileMode(file.entry.Attributes.FileMode) - attr.Size = filer2.TotalSize(file.entry.Chunks) + attr.Size = filer.FileSize(file.entry) if file.isOpen > 0 { attr.Size = file.entry.Attributes.FileSize glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size) @@ -65,6 +67,9 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { attr.Uid = file.entry.Attributes.Uid attr.Blocks = attr.Size/blockSize + 1 attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit) + if file.entry.HardLinkCounter > 0 { + attr.Nlink = uint32(file.entry.HardLinkCounter) + } return nil @@ -85,13 +90,11 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op glog.V(4).Infof("file %v open %+v", file.fullpath(), req) - file.isOpen++ - handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid) resp.Handle = fuse.HandleID(handle.handle) - glog.V(3).Infof("%v file open handle id = %d", file.fullpath(), handle.handle) + glog.V(4).Infof("%v file open handle id = %d", file.fullpath(), handle.handle) return handle, nil @@ -99,58 +102,89 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes) + glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req) if err := file.maybeLoadEntry(ctx); err != nil { return err } + if file.isOpen > 0 { + file.wfs.handlesLock.Lock() + fileHandle := file.wfs.handles[file.fullpath().AsInode()] + file.wfs.handlesLock.Unlock() + + if fileHandle != nil { + fileHandle.Lock() + defer fileHandle.Unlock() + } + } if req.Valid.Size() { - glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size) - if req.Size < filer2.TotalSize(file.entry.Chunks) { + glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks)) + if req.Size < filer.FileSize(file.entry) { // fmt.Printf("truncate %v \n", fullPath) var chunks []*filer_pb.FileChunk + var truncatedChunks []*filer_pb.FileChunk for _, chunk := range file.entry.Chunks { int64Size := int64(chunk.Size) if chunk.Offset+int64Size > int64(req.Size) { + // this chunk is truncated int64Size = int64(req.Size) - chunk.Offset - } - if int64Size > 0 { - chunks = append(chunks, chunk) + if int64Size > 0 { + chunks = append(chunks, chunk) + glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size) + chunk.Size = uint64(int64Size) + } else { + glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString()) + truncatedChunks = append(truncatedChunks, chunk) + } } } file.entry.Chunks = chunks file.entryViewCache = nil file.reader = nil + file.wfs.deleteFileChunks(truncatedChunks) } file.entry.Attributes.FileSize = req.Size + file.dirtyMetadata = true } + if req.Valid.Mode() { file.entry.Attributes.FileMode = uint32(req.Mode) + file.dirtyMetadata = true } if req.Valid.Uid() { file.entry.Attributes.Uid = req.Uid + file.dirtyMetadata = true } if req.Valid.Gid() { file.entry.Attributes.Gid = req.Gid + file.dirtyMetadata = true } if req.Valid.Crtime() { file.entry.Attributes.Crtime = req.Crtime.Unix() + file.dirtyMetadata = true } if req.Valid.Mtime() { file.entry.Attributes.Mtime = req.Mtime.Unix() + file.dirtyMetadata = true + } + + if req.Valid.Handle() { + // fmt.Printf("file handle => %d\n", req.Handle) } if file.isOpen > 0 { return nil } - file.wfs.cacheDelete(file.fullpath()) + if !file.dirtyMetadata { + return nil + } return file.saveEntry() @@ -168,8 +202,6 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error return err } - file.wfs.cacheDelete(file.fullpath()) - return file.saveEntry() } @@ -186,8 +218,6 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) return err } - file.wfs.cacheDelete(file.fullpath()) - return file.saveEntry() } @@ -211,27 +241,28 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { // fsync works at OS level // write the file chunks to the filerGrpcAddress - glog.V(3).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req) + glog.V(4).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req) return nil } func (file *File) Forget() { t := util.NewFullPath(file.dir.FullPath(), file.Name) - glog.V(3).Infof("Forget file %s", t) + glog.V(4).Infof("Forget file %s", t) file.wfs.fsNodeCache.DeleteFsNode(t) } func (file *File) maybeLoadEntry(ctx context.Context) error { - if file.entry == nil || file.isOpen <= 0 { - entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) - if err != nil { - glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) - return err - } - if entry != nil { - file.setEntry(entry) - } + if (file.entry != nil && len(file.entry.HardLinkId) != 0) || file.isOpen > 0 { + return nil + } + entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) + if err != nil { + glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) + return err + } + if entry != nil { + file.setEntry(entry) } return nil } @@ -239,48 +270,49 @@ func (file *File) maybeLoadEntry(ctx context.Context) error { func (file *File) addChunks(chunks []*filer_pb.FileChunk) { sort.Slice(chunks, func(i, j int) bool { + if chunks[i].Mtime == chunks[j].Mtime { + return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey + } return chunks[i].Mtime < chunks[j].Mtime }) - var newVisibles []filer2.VisibleInterval for _, chunk := range chunks { - newVisibles = filer2.MergeIntoVisibles(file.entryViewCache, newVisibles, chunk) - t := file.entryViewCache[:0] - file.entryViewCache = newVisibles - newVisibles = t + file.entryViewCache = filer.MergeIntoVisibles(file.entryViewCache, chunk) } file.reader = nil - glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) + glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) file.entry.Chunks = append(file.entry.Chunks, chunks...) } func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry - file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks) + file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), file.entry.Chunks) file.reader = nil } func (file *File) saveEntry() error { return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + file.wfs.mapPbIdFromLocalToFiler(file.entry) + defer file.wfs.mapPbIdFromFilerToLocal(file.entry) + request := &filer_pb.UpdateEntryRequest{ - Directory: file.dir.FullPath(), - Entry: file.entry, + Directory: file.dir.FullPath(), + Entry: file.entry, + Signatures: []int32{file.wfs.signature}, } - glog.V(1).Infof("save file entry: %v", request) + glog.V(4).Infof("save file entry: %v", request) _, err := client.UpdateEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) + glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) return fuse.EIO } - if file.wfs.option.AsyncMetaDataCaching { - file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + file.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) return nil }) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index c6637259d..54bde3494 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -6,21 +6,24 @@ import ( "io" "math" "net/http" + "os" + "sync" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) type FileHandle struct { // cache file has been written to - dirtyPages *ContinuousDirtyPages - contentType string - dirtyMetadata bool - handle uint64 + dirtyPages *ContinuousDirtyPages + contentType string + handle uint64 + sync.RWMutex f *File RequestId fuse.RequestID // unique ID for request @@ -38,8 +41,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle { Gid: gid, } if fh.f.entry != nil { - fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks) + fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry) } + return fh } @@ -53,61 +57,80 @@ var _ = fs.HandleReleaser(&FileHandle{}) func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size)) + glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) + fh.RLock() + defer fh.RUnlock() + + if req.Size <= 0 { + return nil + } - buff := make([]byte, req.Size) + buff := resp.Data[:cap(resp.Data)] + if req.Size > cap(resp.Data) { + // should not happen + buff = make([]byte, req.Size) + } totalRead, err := fh.readFromChunks(buff, req.Offset) if err == nil { - dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset) - if totalRead+req.Offset < dirtyOffset+int64(dirtySize) { - totalRead = dirtyOffset + int64(dirtySize) - req.Offset - } + maxStop := fh.readFromDirtyPages(buff, req.Offset) + totalRead = max(maxStop-req.Offset, totalRead) } - resp.Data = buff[:totalRead] + if err == io.EOF { + err = nil + } if err != nil { - glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) + glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err) return fuse.EIO } + if totalRead > int64(len(buff)) { + glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead) + totalRead = min(int64(len(buff)), totalRead) + } + // resp.Data = buff[:totalRead] + resp.Data = buff + return err } -func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) { - return fh.dirtyPages.ReadDirtyData(buff, startOffset) +func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { + maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) + return } func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { - // this value should come from the filer instead of the old f - if len(fh.f.entry.Chunks) == 0 { + fileSize := int64(filer.FileSize(fh.f.entry)) + + if fileSize == 0 { glog.V(1).Infof("empty fh %v", fh.f.fullpath()) - return 0, nil + return 0, io.EOF } + var chunkResolveErr error if fh.f.entryViewCache == nil { - fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) + fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks) + if chunkResolveErr != nil { + return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) + } fh.f.reader = nil } if fh.f.reader == nil { - chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32) - fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache) + chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64) + fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize) } totalRead, err := fh.f.reader.ReadAt(buff, offset) - if err == io.EOF { - err = nil - } - - if err != nil { + if err != nil && err != io.EOF { glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) } - // glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err) + glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err) return int64(totalRead), err } @@ -115,119 +138,147 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { // Write to the file handle func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { + fh.Lock() + defer fh.Unlock() + // write the request to volume servers - data := make([]byte, len(req.Data)) - copy(data, req.Data) + data := req.Data + if len(data) <= 512 { + // fuse message cacheable size + data = make([]byte, len(req.Data)) + copy(data, req.Data) + } fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize))) - // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data))) + glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) - chunks, err := fh.dirtyPages.AddPage(req.Offset, data) - if err != nil { - glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err) - return fuse.EIO - } + fh.dirtyPages.AddPage(req.Offset, data) resp.Size = len(data) if req.Offset == 0 { // detect mime type fh.contentType = http.DetectContentType(data) - fh.dirtyMetadata = true + fh.f.dirtyMetadata = true } - if len(chunks) > 0 { - - fh.f.addChunks(chunks) - - fh.dirtyMetadata = true - } + fh.f.dirtyMetadata = true return nil } func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error { - glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle) + glog.V(4).Infof("Release %v fh %d", fh.f.fullpath(), fh.handle) + + fh.Lock() + defer fh.Unlock() fh.f.isOpen-- - if fh.f.isOpen <= 0 { - fh.dirtyPages.releaseResource() + if fh.f.isOpen < 0 { + glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0) + fh.f.isOpen = 0 + return nil + } + + if fh.f.isOpen == 0 { + if err := fh.doFlush(ctx, req.Header); err != nil { + glog.Errorf("Release doFlush %s: %v", fh.f.Name, err) + } fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) } - fh.f.entryViewCache = nil - fh.f.reader = nil + + // stop the goroutine + if !fh.dirtyPages.chunkSaveErrChanClosed { + fh.dirtyPages.chunkSaveErrChanClosed = true + close(fh.dirtyPages.chunkSaveErrChan) + } return nil } func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { - // fflush works at fh level + + fh.Lock() + defer fh.Unlock() + + return fh.doFlush(ctx, req.Header) +} + +func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { + // flush works at fh level // send the data to the OS - glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req) + glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle) - chunks, err := fh.dirtyPages.FlushToStorage() - if err != nil { - glog.Errorf("flush %s: %v", fh.f.fullpath(), err) - return fuse.EIO - } + fh.dirtyPages.saveExistingPagesToStorage() + + fh.dirtyPages.writeWaitGroup.Wait() - if len(chunks) > 0 { - fh.f.addChunks(chunks) - fh.dirtyMetadata = true + if fh.dirtyPages.lastErr != nil { + return fh.dirtyPages.lastErr } - if !fh.dirtyMetadata { + if !fh.f.dirtyMetadata { return nil } - err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType - fh.f.entry.Attributes.Uid = req.Uid - fh.f.entry.Attributes.Gid = req.Gid + if fh.f.entry.Attributes.Uid == 0 { + fh.f.entry.Attributes.Uid = header.Uid + } + if fh.f.entry.Attributes.Gid == 0 { + fh.f.entry.Attributes.Gid = header.Gid + } + if fh.f.entry.Attributes.Crtime == 0 { + fh.f.entry.Attributes.Crtime = time.Now().Unix() + } fh.f.entry.Attributes.Mtime = time.Now().Unix() - fh.f.entry.Attributes.Crtime = time.Now().Unix() - fh.f.entry.Attributes.FileMode = uint32(0666 &^ fh.f.wfs.option.Umask) + fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask) fh.f.entry.Attributes.Collection = fh.dirtyPages.collection fh.f.entry.Attributes.Replication = fh.dirtyPages.replication } request := &filer_pb.CreateEntryRequest{ - Directory: fh.f.dir.FullPath(), - Entry: fh.f.entry, + Directory: fh.f.dir.FullPath(), + Entry: fh.f.entry, + Signatures: []int32{fh.f.wfs.signature}, } - glog.V(3).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks)) + glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks)) for i, chunk := range fh.f.entry.Chunks { - glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) } - chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks) - fh.f.entry.Chunks = chunks - // fh.f.entryViewCache = nil + manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks) + + chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks) + chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) + if manifestErr != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", manifestErr) + } + fh.f.entry.Chunks = append(chunks, manifestChunks...) + fh.f.entryViewCache = nil + + fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry) + defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry) if err := filer_pb.CreateEntry(client, request); err != nil { glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) } - if fh.f.wfs.option.AsyncMetaDataCaching { - fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } - - fh.f.wfs.deleteFileChunks(garbages) - for i, chunk := range garbages { - glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) - } + fh.f.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) return nil }) if err == nil { - fh.dirtyMetadata = false + fh.f.dirtyMetadata = false } if err != nil { diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go index b146f0615..fdec8253c 100644 --- a/weed/filesys/fscache.go +++ b/weed/filesys/fscache.go @@ -3,8 +3,9 @@ package filesys import ( "sync" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse/fs" + + "github.com/chrislusf/seaweedfs/weed/util" ) type FsCache struct { @@ -118,7 +119,6 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { target = target.ensureChild(p) } parent := target.parent - src.name = target.name if dir, ok := src.node.(*Dir); ok { dir.name = target.name // target is not Dir, but a shortcut } @@ -132,6 +132,7 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { target.deleteSelf() + src.name = target.name src.connectToParent(parent) return src @@ -144,10 +145,14 @@ func (n *FsNode) connectToParent(parent *FsNode) { oldNode.deleteSelf() } if dir, ok := n.node.(*Dir); ok { - dir.parent = parent.node.(*Dir) + if parent.node != nil { + dir.parent = parent.node.(*Dir) + } } if f, ok := n.node.(*File); ok { - f.dir = parent.node.(*Dir) + if parent.node != nil { + f.dir = parent.node.(*Dir) + } } n.childrenLock.Lock() parent.children[n.name] = n diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go index 67f9aacc8..1152eb32e 100644 --- a/weed/filesys/fscache_test.go +++ b/weed/filesys/fscache_test.go @@ -94,3 +94,22 @@ func TestFsCacheMove(t *testing.T) { } } + +func TestFsCacheMove2(t *testing.T) { + + cache := newFsCache(nil) + + cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) + cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) + + cache.Move(util.FullPath("/a/b/d"), util.FullPath("/a/b/e")) + + d := cache.GetFsNode(util.FullPath("/a/b/e")) + if d == nil { + t.Errorf("unexpected nil node!") + } + if d.(*File).Name != "e" { + t.Errorf("unexpected node!") + } + +} diff --git a/weed/filesys/meta_cache/id_mapper.go b/weed/filesys/meta_cache/id_mapper.go new file mode 100644 index 000000000..4a2179f31 --- /dev/null +++ b/weed/filesys/meta_cache/id_mapper.go @@ -0,0 +1,101 @@ +package meta_cache + +import ( + "fmt" + "strconv" + "strings" +) + +type UidGidMapper struct { + uidMapper *IdMapper + gidMapper *IdMapper +} + +type IdMapper struct { + localToFiler map[uint32]uint32 + filerToLocal map[uint32]uint32 +} + +// UidGidMapper translates local uid/gid to filer uid/gid +// The local storage always persists the same as the filer. +// The local->filer translation happens when updating the filer first and later saving to meta_cache. +// And filer->local happens when reading from the meta_cache. +func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) { + uidMapper, err := newIdMapper(uidPairsStr) + if err != nil { + return nil, err + } + gidMapper, err := newIdMapper(gidPairStr) + if err != nil { + return nil, err + } + + return &UidGidMapper{ + uidMapper: uidMapper, + gidMapper: gidMapper, + }, nil +} + +func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) { + return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid) +} +func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) { + return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid) +} + +func (m *IdMapper) LocalToFiler(id uint32) uint32 { + value, found := m.localToFiler[id] + if found { + return value + } + return id +} +func (m *IdMapper) FilerToLocal(id uint32) uint32 { + value, found := m.filerToLocal[id] + if found { + return value + } + return id +} + +func newIdMapper(pairsStr string) (*IdMapper, error) { + + localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr) + if err != nil { + return nil, err + } + + return &IdMapper{ + localToFiler: localToFiler, + filerToLocal: filerToLocal, + }, nil + +} + +func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) { + + if pairsStr == "" { + return + } + + localToFiler = make(map[uint32]uint32) + filerToLocal = make(map[uint32]uint32) + for _, pairStr := range strings.Split(pairsStr, ",") { + pair := strings.Split(pairStr, ":") + localUidStr, filerUidStr := pair[0], pair[1] + localUid, localUidErr := strconv.Atoi(localUidStr) + if localUidErr != nil { + err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr) + return + } + filerUid, filerUidErr := strconv.Atoi(filerUidStr) + if filerUidErr != nil { + err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr) + return + } + localToFiler[uint32(localUid)] = uint32(filerUid) + filerToLocal[uint32(filerUid)] = uint32(localUid) + } + + return +} diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index 4c9090d42..4b282253d 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -2,27 +2,38 @@ package meta_cache import ( "context" + "fmt" "os" "sync" - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/filer/leveldb" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" ) +// need to have logic similar to FilerStoreWrapper +// e.g. fill fileId field for chunks + type MetaCache struct { - actualStore filer2.FilerStore + localStore filer.VirtualFilerStore sync.RWMutex + visitedBoundary *bounded_tree.BoundedTree + uidGidMapper *UidGidMapper + invalidateFunc func(util.FullPath) } -func NewMetaCache(dbFolder string) *MetaCache { +func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache { return &MetaCache{ - actualStore: openMetaStore(dbFolder), + localStore: openMetaStore(dbFolder), + visitedBoundary: bounded_tree.NewBoundedTree(baseDir), + uidGidMapper: uidGidMapper, + invalidateFunc: invalidateFunc, } } -func openMetaStore(dbFolder string) filer2.FilerStore { +func openMetaStore(dbFolder string) filer.VirtualFilerStore { os.RemoveAll(dbFolder) os.MkdirAll(dbFolder, 0755) @@ -36,58 +47,100 @@ func openMetaStore(dbFolder string) filer2.FilerStore { glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) } - return store + return filer.NewFilerStoreWrapper(store) } -func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error { +func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error { mc.Lock() defer mc.Unlock() - return mc.actualStore.InsertEntry(ctx, entry) + return mc.doInsertEntry(ctx, entry) +} + +func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error { + return mc.localStore.InsertEntry(ctx, entry) } -func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error { +func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { mc.Lock() defer mc.Unlock() - if oldPath != "" { - if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil { - return err + + oldDir, _ := oldPath.DirAndName() + if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) { + if oldPath != "" { + if newEntry != nil && oldPath == newEntry.FullPath { + // skip the unnecessary deletion + // leave the update to the following InsertEntry operation + } else { + glog.V(3).Infof("DeleteEntry %s/%s", oldPath, oldPath.Name()) + if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil { + return err + } + } } + } else { + // println("unknown old directory:", oldDir) } + if newEntry != nil { - if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil { - return err + newDir, _ := newEntry.DirAndName() + if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) { + glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name()) + if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil { + return err + } } } return nil } -func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error { +func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error { mc.Lock() defer mc.Unlock() - return mc.actualStore.UpdateEntry(ctx, entry) + return mc.localStore.UpdateEntry(ctx, entry) } -func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) { +func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) { mc.RLock() defer mc.RUnlock() - return mc.actualStore.FindEntry(ctx, fp) + entry, err = mc.localStore.FindEntry(ctx, fp) + if err != nil { + return nil, err + } + mc.mapIdFromFilerToLocal(entry) + return } func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { mc.Lock() defer mc.Unlock() - return mc.actualStore.DeleteEntry(ctx, fp) + return mc.localStore.DeleteEntry(ctx, fp) } -func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) { +func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) { mc.RLock() defer mc.RUnlock() - return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + + if !mc.visitedBoundary.HasVisited(dirPath) { + return nil, fmt.Errorf("unsynchronized dir: %v", dirPath) + } + + entries, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + for _, entry := range entries { + mc.mapIdFromFilerToLocal(entry) + } + return entries, err } func (mc *MetaCache) Shutdown() { mc.Lock() defer mc.Unlock() - mc.actualStore.Shutdown() + mc.localStore.Shutdown() +} + +func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) { + entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid) } diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go index 58bf6862e..f42d61230 100644 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ b/weed/filesys/meta_cache/meta_cache_init.go @@ -2,20 +2,45 @@ package meta_cache import ( "context" + "fmt" + "strings" + "time" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) -func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error { - glog.V(0).Infof("synchronizing meta data ...") - filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) { - entry := filer2.FromPbEntry(string(parentPath), pbEntry) - if err := mc.InsertEntry(context.Background(), entry); err != nil { - glog.V(0).Infof("read %s: %v", entry.FullPath, err) +func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error { + + return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) { + + glog.V(4).Infof("ReadDirAllEntries %s ...", path) + + for waitTime := time.Second; waitTime < filer.ReadWaitTime; waitTime += waitTime / 2 { + err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error { + entry := filer.FromPbEntry(string(dirPath), pbEntry) + if err := mc.doInsertEntry(context.Background(), entry); err != nil { + glog.V(0).Infof("read %s: %v", entry.FullPath, err) + return err + } + if entry.IsDirectory() { + childDirectories = append(childDirectories, entry.Name()) + } + return nil + }) + if err == nil { + break + } + if strings.Contains(err.Error(), "transport: ") { + glog.V(0).Infof("ReadDirAllEntries %s: %v. Retry in %v", path, err, waitTime) + time.Sleep(waitTime) + continue + } + err = fmt.Errorf("list %s: %v", dirPath, err) + break } + return }) - return nil } diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go index 2e411a48a..f9973f436 100644 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ b/weed/filesys/meta_cache/meta_cache_subscribe.go @@ -6,41 +6,58 @@ import ( "io" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) -func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, lastTsNs int64) error { +func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification + + for _, sig := range message.Signatures { + if sig == selfSignature && selfSignature != 0 { + return nil + } + } + + dir := resp.Directory var oldPath util.FullPath - var newEntry *filer2.Entry + var newEntry *filer.Entry if message.OldEntry != nil { - oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name) + oldPath = util.NewFullPath(dir, message.OldEntry.Name) glog.V(4).Infof("deleting %v", oldPath) } if message.NewEntry != nil { - dir := resp.Directory if message.NewParentPath != "" { dir = message.NewParentPath } key := util.NewFullPath(dir, message.NewEntry.Name) glog.V(4).Infof("creating %v", key) - newEntry = filer2.FromPbEntry(dir, message.NewEntry) + newEntry = filer.FromPbEntry(dir, message.NewEntry) } - return mc.AtomicUpdateEntry(context.Background(), oldPath, newEntry) + err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) + if err == nil && message.OldEntry != nil && message.NewEntry != nil { + key := util.NewFullPath(dir, message.NewEntry.Name) + mc.invalidateFunc(key) + } + + return err + } for { err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ ClientName: "mount", PathPrefix: dir, SinceNs: lastTsNs, + Signature: selfSignature, }) if err != nil { return fmt.Errorf("subscribe: %v", err) @@ -56,14 +73,14 @@ func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, } if err := processEventFn(resp); err != nil { - return fmt.Errorf("process %v: %v", resp, err) + glog.Fatalf("process %v: %v", resp, err) } lastTsNs = resp.TsNs } }) if err != nil { - glog.V(0).Infof("subscribing filer meta change: %v", err) - time.Sleep(time.Second) + glog.Errorf("subscribing filer meta change: %v", err) } + time.Sleep(time.Second) } } diff --git a/weed/filesys/unimplemented.go b/weed/filesys/unimplemented.go new file mode 100644 index 000000000..5c2dcf0e1 --- /dev/null +++ b/weed/filesys/unimplemented.go @@ -0,0 +1,22 @@ +package filesys + +import ( + "context" + + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" +) + +// https://github.com/bazil/fuse/issues/130 + +var _ = fs.NodeAccesser(&Dir{}) + +func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error { + return fuse.ENOSYS +} + +var _ = fs.NodeAccesser(&File{}) + +func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error { + return fuse.ENOSYS +} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 2b0ef64c2..759e21b15 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -6,22 +6,21 @@ import ( "math" "os" "path" - "strings" "sync" "time" - "github.com/chrislusf/seaweedfs/weed/util/grace" - "github.com/karlseguin/ccache" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) type Option struct { @@ -35,7 +34,6 @@ type Option struct { CacheDir string CacheSizeMB int64 DataCenter string - DirListCacheLimit int64 EntryCacheTtl time.Duration Umask os.FileMode @@ -47,16 +45,14 @@ type Option struct { OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers Cipher bool // whether encrypt data on volume server - AsyncMetaDataCaching bool // whether asynchronously cache meta data - + UidGidMapper *meta_cache.UidGidMapper } var _ = fs.FS(&WFS{}) var _ = fs.FSStatfser(&WFS{}) type WFS struct { - option *Option - listDirectoryEntriesCache *ccache.Cache + option *Option // contains all open handles, protected by handlesLock handlesLock sync.Mutex @@ -69,8 +65,9 @@ type WFS struct { root fs.Node fsNodeCache *FsCache - chunkCache *chunk_cache.ChunkCache + chunkCache *chunk_cache.TieredChunkCache metaCache *meta_cache.MetaCache + signature int32 } type statsCache struct { filer_pb.StatisticsResponse @@ -79,36 +76,38 @@ type statsCache struct { func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ - option: option, - listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), - handles: make(map[uint64]*FileHandle), + option: option, + handles: make(map[uint64]*FileHandle), bufPool: sync.Pool{ New: func() interface{} { return make([]byte, option.ChunkSizeLimit) }, }, + signature: util.RandomInt32(), } + cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] + cacheDir := path.Join(option.CacheDir, cacheUniqueId) if option.CacheSizeMB > 0 { - os.MkdirAll(option.CacheDir, 0755) - wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB) - grace.OnInterrupt(func() { - wfs.chunkCache.Shutdown() - }) - } - if wfs.option.AsyncMetaDataCaching { - wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) - startTime := time.Now() - if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { - glog.V(0).Infof("failed to init meta cache: %v", err) - } else { - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) - grace.OnInterrupt(func() { - wfs.metaCache.Shutdown() - }) - } + os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask) + wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024) } - wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} + wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) { + fsNode := wfs.fsNodeCache.GetFsNode(filePath) + if fsNode != nil { + if file, ok := fsNode.(*File); ok { + file.entry = nil + } + } + }) + startTime := time.Now() + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + grace.OnInterrupt(func() { + wfs.metaCache.Shutdown() + }) + + entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath)) + wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry} wfs.fsNodeCache = newFsCache(wfs.root) return wfs @@ -118,40 +117,29 @@ func (wfs *WFS) Root() (fs.Node, error) { return wfs.root, nil } -var _ = filer_pb.FilerClient(&WFS{}) - -func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - - err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) - - if err == nil { - return nil - } - return err - -} - func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { fullpath := file.fullpath() - glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid) + glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid) wfs.handlesLock.Lock() defer wfs.handlesLock.Unlock() inodeId := file.fullpath().AsInode() - existingHandle, found := wfs.handles[inodeId] - if found && existingHandle != nil { - return existingHandle + if file.isOpen > 0 { + existingHandle, found := wfs.handles[inodeId] + if found && existingHandle != nil { + file.isOpen++ + return existingHandle + } } fileHandle = newFileHandle(file, uid, gid) + file.maybeLoadEntry(context.Background()) + file.isOpen++ + wfs.handles[inodeId] = fileHandle fileHandle.handle = inodeId - glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return } @@ -229,33 +217,15 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. return nil } -func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry { - item := wfs.listDirectoryEntriesCache.Get(string(path)) - if item != nil && !item.Expired() { - return item.Value().(*filer_pb.Entry) +func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) { + if entry.Attributes == nil { + return } - return nil + entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid) } -func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) { - if entry == nil { - wfs.listDirectoryEntriesCache.Delete(string(path)) - } else { - wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl) +func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { + if entry.Attributes == nil { + return } -} -func (wfs *WFS) cacheDelete(path util.FullPath) { - wfs.listDirectoryEntriesCache.Delete(string(path)) -} - -func (wfs *WFS) AdjustedUrl(hostAndPort string) string { - if !wfs.option.OutsideContainerClusterMode { - return hostAndPort - } - commaIndex := strings.Index(hostAndPort, ":") - if commaIndex < 0 { - return hostAndPort - } - filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":") - return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:]) - + entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid) } diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index bf21b1808..a245b6795 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -5,7 +5,7 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -18,6 +18,17 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { var fileIds []string for _, chunk := range chunks { + if !chunk.IsChunkManifest { + fileIds = append(fileIds, chunk.GetFileIdString()) + continue + } + dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk) + if manifestResolveErr != nil { + glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) + } + for _, dChunk := range dataChunks { + fileIds = append(fileIds, dChunk.GetFileIdString()) + } fileIds = append(fileIds, chunk.GetFileIdString()) } @@ -31,14 +42,14 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se var vids []string for _, fileId := range fileIds { - vids = append(vids, filer2.VolumeId(fileId)) + vids = append(vids, filer.VolumeId(fileId)) } lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { m := make(map[string]operation.LookupResult) - glog.V(4).Infof("remove file lookup volume id locations: %v", vids) + glog.V(4).Infof("deleteFileIds lookup volume id locations: %v", vids) resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: vids, }) @@ -57,7 +68,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se } for _, loc := range locations.Locations { lr.Locations = append(lr.Locations, operation.Location{ - Url: wfs.AdjustedUrl(loc.Url), + Url: wfs.AdjustedUrl(loc), PublicUrl: loc.PublicUrl, }) } diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go new file mode 100644 index 000000000..096ee555f --- /dev/null +++ b/weed/filesys/wfs_filer_client.go @@ -0,0 +1,31 @@ +package filesys + +import ( + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +var _ = filer_pb.FilerClient(&WFS{}) + +func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) + + if err == nil { + return nil + } + return err + +} + +func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { + if wfs.option.OutsideContainerClusterMode { + return location.PublicUrl + } + return location.Url +} diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go new file mode 100644 index 000000000..83e40e7f5 --- /dev/null +++ b/weed/filesys/wfs_write.go @@ -0,0 +1,71 @@ +package filesys + +import ( + "context" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { + + return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { + var fileId, host string + var auth security.EncodedJwt + + if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: wfs.option.Replication, + Collection: wfs.option.Collection, + TtlSec: wfs.option.TtlSec, + DataCenter: wfs.option.DataCenter, + Path: string(fullPath), + } + + resp, err := client.AssignVolume(context.Background(), request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } + + fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth) + loc := &filer_pb.Location{ + Url: resp.Url, + PublicUrl: resp.PublicUrl, + } + host = wfs.AdjustedUrl(loc) + collection, replication = resp.Collection, resp.Replication + + return nil + }); err != nil { + return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) + if err != nil { + glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) + return nil, "", "", fmt.Errorf("upload data: %v", err) + } + if uploadResult.Error != "" { + glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) + return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error) + } + + wfs.chunkCache.SetChunk(fileId, data) + + chunk = uploadResult.ToPbFileChunk(fileId, offset) + return chunk, collection, replication, nil + } +} diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 7e7b8c60b..92e43b675 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -3,10 +3,11 @@ package filesys import ( "context" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/seaweedfs/fuse" + + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse" ) func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { @@ -110,43 +111,13 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) { fullpath := util.NewFullPath(dir, name) - entry = wfs.cacheGet(fullpath) - if entry != nil { - return - } // glog.V(3).Infof("read entry cache miss %s", fullpath) // read from async meta cache - if wfs.option.AsyncMetaDataCaching { - cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - return cachedEntry.ToProtoEntry(), nil + meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) + cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT } - - err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Name: name, - Directory: dir, - } - - resp, err := filer_pb.LookupEntry(client, request) - if err != nil { - if err == filer_pb.ErrNotFound { - glog.V(3).Infof("file attr read not found file %v: %v", request, err) - return fuse.ENOENT - } - glog.V(3).Infof("attr read %v: %v", request, err) - return fuse.EIO - } - - entry = resp.Entry - wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl) - - return nil - }) - - return + return cachedEntry.ToProtoEntry(), cacheErr } |
