diff options
| -rw-r--r-- | k8s/seaweedfs/Chart.yaml | 2 | ||||
| -rw-r--r-- | k8s/seaweedfs/values.yaml | 2 | ||||
| -rw-r--r-- | weed/command/mount.go | 2 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 1 | ||||
| -rw-r--r-- | weed/command/server.go | 3 | ||||
| -rw-r--r-- | weed/command/volume.go | 23 | ||||
| -rw-r--r-- | weed/filesys/dir.go | 90 | ||||
| -rw-r--r-- | weed/filesys/dir_link.go | 4 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 9 | ||||
| -rw-r--r-- | weed/filesys/file.go | 11 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 9 | ||||
| -rw-r--r-- | weed/filesys/fscache.go | 207 | ||||
| -rw-r--r-- | weed/filesys/fscache_test.go | 96 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 52 | ||||
| -rw-r--r-- | weed/filesys/xattr.go | 42 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_rename.go | 4 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 4 | ||||
| -rw-r--r-- | weed/util/constants.go | 2 |
18 files changed, 84 insertions, 479 deletions
diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index 528d1be41..4495526bf 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -version: 1.80 +version: 1.81 diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index 9c8ed42f3..fc416d4ce 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "1.80" + imageTag: "1.81" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/command/mount.go b/weed/command/mount.go index 21c8e7744..efa4650ab 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -20,7 +20,6 @@ type MountOptions struct { umaskString *string nonempty *bool outsideContainerClusterMode *bool - asyncMetaDataCaching *bool } var ( @@ -48,7 +47,6 @@ func init() { mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system") - mountOptions.asyncMetaDataCaching = cmdMount.Flag.Bool("asyncMetaDataCaching", true, "async meta data caching. this feature will be permanent and this option will be removed.") } var cmdMount = &Command{ diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index c95626651..b6715fc30 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -167,7 +167,6 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { MountMtime: time.Now(), Umask: umask, OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode, - AsyncMetaDataCaching: *mountOptions.asyncMetaDataCaching, Cipher: cipher, }) diff --git a/weed/command/server.go b/weed/command/server.go index da74f4760..0af583a7f 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -63,6 +63,8 @@ var ( isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") serverWhiteList []string + + False = false ) func init() { @@ -95,6 +97,7 @@ func init() { serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + serverOptions.v.pprof = &False s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name, {bucket}.{domainName}") diff --git a/weed/command/volume.go b/weed/command/volume.go index 89cf930f2..d0fdd2ed1 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -3,6 +3,7 @@ package command import ( "fmt" "net/http" + httppprof "net/http/pprof" "os" "runtime" "runtime/pprof" @@ -10,10 +11,11 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/spf13/viper" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util/httpdown" @@ -40,7 +42,6 @@ type VolumeServerOptions struct { publicUrl *string bindIp *string masters *string - // pulseSeconds *int idleConnectionTimeout *int dataCenter *string rack *string @@ -53,6 +54,8 @@ type VolumeServerOptions struct { compactionMBPerSecond *int fileSizeLimitMB *int minFreeSpacePercent []float32 + pprof *bool + // pulseSeconds *int } func init() { @@ -74,6 +77,7 @@ func init() { v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") + v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") } var cmdVolume = &Command{ @@ -96,7 +100,12 @@ func runVolume(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) runtime.GOMAXPROCS(runtime.NumCPU()) - grace.SetupProfiling(*v.cpuProfile, *v.memProfile) + + // If --pprof is set we assume the caller wants to be able to collect + // cpu and memory profiles via go tool pprof + if !*v.pprof { + grace.SetupProfiling(*v.cpuProfile, *v.memProfile) + } v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, *minFreeSpacePercent) @@ -157,6 +166,14 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v publicVolumeMux = http.NewServeMux() } + if *v.pprof { + volumeMux.HandleFunc("/debug/pprof/", httppprof.Index) + volumeMux.HandleFunc("/debug/pprof/cmdline", httppprof.Cmdline) + volumeMux.HandleFunc("/debug/pprof/profile", httppprof.Profile) + volumeMux.HandleFunc("/debug/pprof/symbol", httppprof.Symbol) + volumeMux.HandleFunc("/debug/pprof/trace", httppprof.Trace) + } + volumeNeedleMapKind := storage.NeedleMapInMemory switch *v.indexType { case "leveldb": diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index e4260d56f..4e164726d 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -89,22 +89,18 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { } func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node { - return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node { - return &File{ - Name: name, - dir: dir, - wfs: dir.wfs, - entry: entry, - entryViewCache: nil, - } - }) + return &File{ + Name: name, + dir: dir, + wfs: dir.wfs, + entry: entry, + entryViewCache: nil, + } } func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node { - return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node { - return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} - }) + return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir} } @@ -139,9 +135,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, return fuse.EIO } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }); err != nil { @@ -190,9 +184,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err return err } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }) @@ -213,15 +205,12 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) - entry := dir.wfs.cacheGet(fullFilePath) - if dir.wfs.option.AsyncMetaDataCaching { - cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - entry = cachedEntry.ToProtoEntry() + cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT } + entry := cachedEntry.ToProtoEntry() if entry == nil { // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath) @@ -230,7 +219,6 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err) return nil, fuse.ENOENT } - dir.wfs.cacheSet(fullFilePath, entry, 5*time.Minute) } else { glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) } @@ -262,7 +250,6 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath()) - cacheTtl := 5 * time.Minute processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error { fullpath := util.NewFullPath(dir.FullPath(), entry.Name) inode := fullpath.AsInode() @@ -273,29 +260,19 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File} ret = append(ret, dirent) } - dir.wfs.cacheSet(fullpath, entry, cacheTtl) return nil } - if dir.wfs.option.AsyncMetaDataCaching { - listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) - if listErr != nil { - glog.Errorf("list meta cache: %v", listErr) - return nil, fuse.EIO - } - for _, cachedEntry := range listedEntries { - processEachEntryFn(cachedEntry.ToProtoEntry(), false) - } - return + listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit)) + if listErr != nil { + glog.Errorf("list meta cache: %v", listErr) + return nil, fuse.EIO } - - readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn) - if readErr != nil { - glog.V(0).Infof("list %s: %v", dir.FullPath(), err) - return ret, fuse.EIO + for _, cachedEntry := range listedEntries { + processEachEntryFn(cachedEntry.ToProtoEntry(), false) } + return - return ret, err } func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error { @@ -321,12 +298,7 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { dir.wfs.deleteFileChunks(entry.Chunks) - dir.wfs.cacheDelete(filePath) - dir.wfs.fsNodeCache.DeleteFsNode(filePath) - - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) - } + dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) glog.V(3).Infof("remove file: %v", req) err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false) @@ -342,12 +314,8 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { t := util.NewFullPath(dir.FullPath(), req.Name) - dir.wfs.cacheDelete(t) - dir.wfs.fsNodeCache.DeleteFsNode(t) - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.DeleteEntry(context.Background(), t) - } + dir.wfs.metaCache.DeleteEntry(context.Background(), t) glog.V(3).Infof("remove directory entry: %v", req) err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false) @@ -384,8 +352,6 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus dir.entry.Attributes.Mtime = req.Mtime.Unix() } - dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) - return dir.saveEntry() } @@ -402,8 +368,6 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error { return err } - dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) - return dir.saveEntry() } @@ -420,8 +384,6 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e return err } - dir.wfs.cacheDelete(util.FullPath(dir.FullPath())) - return dir.saveEntry() } @@ -444,8 +406,6 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp func (dir *Dir) Forget() { glog.V(3).Infof("Forget dir %s", dir.FullPath()) - - dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath())) } func (dir *Dir) maybeLoadEntry() error { @@ -478,9 +438,7 @@ func (dir *Dir) saveEntry() error { return fuse.EIO } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index d1858e99b..4990e743c 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -42,9 +42,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, return fuse.EIO } - if dir.wfs.option.AsyncMetaDataCaching { - dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }) diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index ea40f5c31..92d667c57 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -38,14 +38,5 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector }) - if err == nil { - dir.wfs.cacheDelete(newPath) - dir.wfs.cacheDelete(oldPath) - - // fmt.Printf("rename path: %v => %v\n", oldPath, newPath) - dir.wfs.fsNodeCache.Move(oldPath, newPath) - - } - return err } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index bafbd7cc8..2932d4910 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -150,8 +150,6 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return nil } - file.wfs.cacheDelete(file.fullpath()) - return file.saveEntry() } @@ -168,8 +166,6 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error return err } - file.wfs.cacheDelete(file.fullpath()) - return file.saveEntry() } @@ -186,8 +182,6 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) return err } - file.wfs.cacheDelete(file.fullpath()) - return file.saveEntry() } @@ -219,7 +213,6 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { func (file *File) Forget() { t := util.NewFullPath(file.dir.FullPath(), file.Name) glog.V(3).Infof("Forget file %s", t) - file.wfs.fsNodeCache.DeleteFsNode(t) } func (file *File) maybeLoadEntry(ctx context.Context) error { @@ -278,9 +271,7 @@ func (file *File) saveEntry() error { return fuse.EIO } - if file.wfs.option.AsyncMetaDataCaching { - file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) return nil }) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 372d742ea..3386256f9 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,6 +3,7 @@ package filesys import ( "context" "fmt" + "io" "math" "net/http" "time" @@ -98,6 +99,10 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { totalRead, err := fh.f.reader.ReadAt(buff, offset) + if err == io.EOF { + err = nil + } + if err != nil { glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) } @@ -209,9 +214,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) } - if fh.f.wfs.option.AsyncMetaDataCaching { - fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - } + fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) fh.f.wfs.deleteFileChunks(garbages) for i, chunk := range garbages { diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go deleted file mode 100644 index b146f0615..000000000 --- a/weed/filesys/fscache.go +++ /dev/null @@ -1,207 +0,0 @@ -package filesys - -import ( - "sync" - - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse/fs" -) - -type FsCache struct { - root *FsNode - sync.RWMutex -} -type FsNode struct { - parent *FsNode - node fs.Node - name string - childrenLock sync.RWMutex - children map[string]*FsNode -} - -func newFsCache(root fs.Node) *FsCache { - return &FsCache{ - root: &FsNode{ - node: root, - }, - } -} - -func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { - - c.RLock() - defer c.RUnlock() - - return c.doGetFsNode(path) -} - -func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node { - t := c.root - for _, p := range path.Split() { - t = t.findChild(p) - if t == nil { - return nil - } - } - return t.node -} - -func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { - - c.Lock() - defer c.Unlock() - - c.doSetFsNode(path, node) -} - -func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) { - t := c.root - for _, p := range path.Split() { - t = t.ensureChild(p) - } - t.node = node -} - -func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node { - - c.Lock() - defer c.Unlock() - - t := c.doGetFsNode(path) - if t != nil { - return t - } - t = genNodeFn() - c.doSetFsNode(path, t) - return t -} - -func (c *FsCache) DeleteFsNode(path util.FullPath) { - - c.Lock() - defer c.Unlock() - - t := c.root - for _, p := range path.Split() { - t = t.findChild(p) - if t == nil { - return - } - } - if t.parent != nil { - t.parent.disconnectChild(t) - } - t.deleteSelf() -} - -// oldPath and newPath are full path including the new name -func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { - - c.Lock() - defer c.Unlock() - - // find old node - src := c.root - for _, p := range oldPath.Split() { - src = src.findChild(p) - if src == nil { - return src - } - } - if src.parent != nil { - src.parent.disconnectChild(src) - } - - // find new node - target := c.root - for _, p := range newPath.Split() { - target = target.ensureChild(p) - } - parent := target.parent - src.name = target.name - if dir, ok := src.node.(*Dir); ok { - dir.name = target.name // target is not Dir, but a shortcut - } - if f, ok := src.node.(*File); ok { - f.Name = target.name - if f.entry != nil { - f.entry.Name = f.Name - } - } - parent.disconnectChild(target) - - target.deleteSelf() - - src.connectToParent(parent) - - return src -} - -func (n *FsNode) connectToParent(parent *FsNode) { - n.parent = parent - oldNode := parent.findChild(n.name) - if oldNode != nil { - oldNode.deleteSelf() - } - if dir, ok := n.node.(*Dir); ok { - dir.parent = parent.node.(*Dir) - } - if f, ok := n.node.(*File); ok { - f.dir = parent.node.(*Dir) - } - n.childrenLock.Lock() - parent.children[n.name] = n - n.childrenLock.Unlock() -} - -func (n *FsNode) findChild(name string) *FsNode { - n.childrenLock.RLock() - defer n.childrenLock.RUnlock() - - child, found := n.children[name] - if found { - return child - } - return nil -} - -func (n *FsNode) ensureChild(name string) *FsNode { - n.childrenLock.Lock() - defer n.childrenLock.Unlock() - - if n.children == nil { - n.children = make(map[string]*FsNode) - } - child, found := n.children[name] - if found { - return child - } - t := &FsNode{ - parent: n, - node: nil, - name: name, - children: nil, - } - n.children[name] = t - return t -} - -func (n *FsNode) disconnectChild(child *FsNode) { - n.childrenLock.Lock() - delete(n.children, child.name) - n.childrenLock.Unlock() - child.parent = nil -} - -func (n *FsNode) deleteSelf() { - n.childrenLock.Lock() - for _, child := range n.children { - child.deleteSelf() - } - n.children = nil - n.childrenLock.Unlock() - - n.node = nil - n.parent = nil - -} diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go deleted file mode 100644 index 67f9aacc8..000000000 --- a/weed/filesys/fscache_test.go +++ /dev/null @@ -1,96 +0,0 @@ -package filesys - -import ( - "testing" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -func TestPathSplit(t *testing.T) { - parts := util.FullPath("/").Split() - if len(parts) != 0 { - t.Errorf("expecting an empty list, but getting %d", len(parts)) - } - - parts = util.FullPath("/readme.md").Split() - if len(parts) != 1 { - t.Errorf("expecting an empty list, but getting %d", len(parts)) - } - -} - -func TestFsCache(t *testing.T) { - - cache := newFsCache(nil) - - x := cache.GetFsNode(util.FullPath("/y/x")) - if x != nil { - t.Errorf("wrong node!") - } - - p := util.FullPath("/a/b/c") - cache.SetFsNode(p, &File{Name: "cc"}) - tNode := cache.GetFsNode(p) - tFile := tNode.(*File) - if tFile.Name != "cc" { - t.Errorf("expecting a FsNode") - } - - cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) - cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) - cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"}) - cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) - cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) - - b := cache.GetFsNode(util.FullPath("/a/b")) - if b != nil { - t.Errorf("unexpected node!") - } - - a := cache.GetFsNode(util.FullPath("/a")) - if a == nil { - t.Errorf("missing node!") - } - - cache.DeleteFsNode(util.FullPath("/a")) - if b != nil { - t.Errorf("unexpected node!") - } - - a = cache.GetFsNode(util.FullPath("/a")) - if a != nil { - t.Errorf("wrong DeleteFsNode!") - } - - z := cache.GetFsNode(util.FullPath("/z")) - if z == nil { - t.Errorf("missing node!") - } - - y := cache.GetFsNode(util.FullPath("/x/y")) - if y != nil { - t.Errorf("wrong node!") - } - -} - -func TestFsCacheMove(t *testing.T) { - - cache := newFsCache(nil) - - cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) - cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) - cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) - cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) - - cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x")) - - d := cache.GetFsNode(util.FullPath("/z/x/d")) - if d == nil { - t.Errorf("unexpected nil node!") - } - if d.(*File).Name != "dd" { - t.Errorf("unexpected non dd node!") - } - -} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 2b0ef64c2..ee4dcc916 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -10,18 +10,19 @@ import ( "sync" "time" - "github.com/chrislusf/seaweedfs/weed/util/grace" - "github.com/karlseguin/ccache" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) type Option struct { @@ -47,7 +48,6 @@ type Option struct { OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers Cipher bool // whether encrypt data on volume server - AsyncMetaDataCaching bool // whether asynchronously cache meta data } @@ -56,7 +56,6 @@ var _ = fs.FSStatfser(&WFS{}) type WFS struct { option *Option - listDirectoryEntriesCache *ccache.Cache // contains all open handles, protected by handlesLock handlesLock sync.Mutex @@ -67,7 +66,6 @@ type WFS struct { stats statsCache root fs.Node - fsNodeCache *FsCache chunkCache *chunk_cache.ChunkCache metaCache *meta_cache.MetaCache @@ -80,7 +78,6 @@ type statsCache struct { func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, - listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), handles: make(map[uint64]*FileHandle), bufPool: sync.Pool{ New: func() interface{} { @@ -95,21 +92,18 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.chunkCache.Shutdown() }) } - if wfs.option.AsyncMetaDataCaching { - wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) - startTime := time.Now() - if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { - glog.V(0).Infof("failed to init meta cache: %v", err) - } else { - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) - grace.OnInterrupt(func() { - wfs.metaCache.Shutdown() - }) - } + wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) + startTime := time.Now() + if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { + glog.V(0).Infof("failed to init meta cache: %v", err) + } else { + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + grace.OnInterrupt(func() { + wfs.metaCache.Shutdown() + }) } wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} - wfs.fsNodeCache = newFsCache(wfs.root) return wfs } @@ -229,24 +223,6 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. return nil } -func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry { - item := wfs.listDirectoryEntriesCache.Get(string(path)) - if item != nil && !item.Expired() { - return item.Value().(*filer_pb.Entry) - } - return nil -} -func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) { - if entry == nil { - wfs.listDirectoryEntriesCache.Delete(string(path)) - } else { - wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl) - } -} -func (wfs *WFS) cacheDelete(path util.FullPath) { - wfs.listDirectoryEntriesCache.Delete(string(path)) -} - func (wfs *WFS) AdjustedUrl(hostAndPort string) string { if !wfs.option.OutsideContainerClusterMode { return hostAndPort diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 7e7b8c60b..940979457 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -3,10 +3,10 @@ package filesys import ( "context" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/seaweedfs/fuse" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/seaweedfs/fuse" ) func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { @@ -110,43 +110,13 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) { fullpath := util.NewFullPath(dir, name) - entry = wfs.cacheGet(fullpath) - if entry != nil { - return - } // glog.V(3).Infof("read entry cache miss %s", fullpath) // read from async meta cache - if wfs.option.AsyncMetaDataCaching { - cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) - if cacheErr == filer_pb.ErrNotFound { - return nil, fuse.ENOENT - } - return cachedEntry.ToProtoEntry(), nil + cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT } + return cachedEntry.ToProtoEntry(), nil - err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.LookupDirectoryEntryRequest{ - Name: name, - Directory: dir, - } - - resp, err := filer_pb.LookupEntry(client, request) - if err != nil { - if err == filer_pb.ErrNotFound { - glog.V(3).Infof("file attr read not found file %v: %v", request, err) - return fuse.ENOENT - } - glog.V(3).Infof("attr read %v: %v", request, err) - return fuse.EIO - } - - entry = resp.Entry - wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl) - - return nil - }) - - return } diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 71e2c7d17..7029c3342 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -32,11 +32,11 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom moveErr := fs.moveEntry(ctx, oldParent, oldEntry, util.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events) if moveErr != nil { fs.filer.RollbackTransaction(ctx) - return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, err) + return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) } else { if commitError := fs.filer.CommitTransaction(ctx); commitError != nil { fs.filer.RollbackTransaction(ctx) - return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, err) + return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError) } } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 37c4afd5c..d86664542 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -487,6 +487,10 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize)) f.off += int64(readSize) + if err == io.EOF { + err = nil + } + if err != nil { glog.Errorf("file read %s: %v", f.name, err) } diff --git a/weed/util/constants.go b/weed/util/constants.go index f2585fda0..6e9b83a0b 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 80) + VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 81) COMMIT = "" ) |
