Diffstat (limited to 'weed/filesys')
-rw-r--r--   weed/filesys/dir.go                              | 546
-rw-r--r--   weed/filesys/dir_link.go                         |  20
-rw-r--r--   weed/filesys/dir_rename.go                       | 115
-rw-r--r--   weed/filesys/dir_test.go                         |  34
-rw-r--r--   weed/filesys/dirty_page.go                       | 194
-rw-r--r--   weed/filesys/dirty_page_interval.go              | 220
-rw-r--r--   weed/filesys/dirty_page_interval_test.go         |  89
-rw-r--r--   weed/filesys/file.go                             | 206
-rw-r--r--   weed/filesys/filehandle.go                       | 243
-rw-r--r--   weed/filesys/fscache.go                          | 207
-rw-r--r--   weed/filesys/fscache_test.go                     |  96
-rw-r--r--   weed/filesys/meta_cache/cache_config.go          |  32
-rw-r--r--   weed/filesys/meta_cache/meta_cache.go            | 106
-rw-r--r--   weed/filesys/meta_cache/meta_cache_init.go       |  47
-rw-r--r--   weed/filesys/meta_cache/meta_cache_subscribe.go  |  69
-rw-r--r--   weed/filesys/unimplemented.go                    |  20
-rw-r--r--   weed/filesys/wfs.go                              | 136
-rw-r--r--   weed/filesys/wfs_deletion.go                     |  89
-rw-r--r--   weed/filesys/wfs_filer_client.go                 |  40
-rw-r--r--   weed/filesys/xattr.go                            | 123
20 files changed, 1877 insertions(+), 755 deletions(-)
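The heart of this change set is the new ContinuousIntervals structure (dirty_page_interval.go below), which replaces the single dirty-page byte buffer with a set of interval linked lists: later writes win on overlap, and adjacent runs merge into one list. A minimal sketch of the API, mirroring the cases in the new dirty_page_interval_test.go; this example itself is not part of the commit and assumes it sits in the same filesys package:

package filesys

import (
	"fmt"
	"io/ioutil"
)

// Later writes win on overlap; adjacent runs merge into one linked list.
func ExampleContinuousIntervals() {
	c := &ContinuousIntervals{}

	c.AddInterval([]byte{25, 25, 25, 25, 25}, 0) // fill [0,5)
	c.AddInterval([]byte{23, 23}, 2)             // overwrite [2,4)

	buf := make([]byte, 5)
	offset, size := c.ReadData(buf, 0)
	fmt.Println(offset, size, buf)

	// Flushing drains the largest contiguous run as one reader.
	list := c.RemoveLargestIntervalLinkedList()
	data, _ := ioutil.ReadAll(list.ToReader())
	fmt.Println(list.Offset(), data)
	// Output:
	// 0 5 [25 25 23 23 25]
	// 0 [25 25 23 23 25]
}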
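Also visible below in file.go: Setattr no longer drops every chunk when a file is truncated, it keeps any chunk that still holds bytes below the new size. A standalone re-statement of that decision logic, using a hypothetical Chunk stand-in for the filer_pb message (not part of the commit):

package main

import "fmt"

type Chunk struct {
	Offset int64
	Size   uint64
}

// keepAfterTruncate mirrors the chunk-trimming loop in the new Setattr:
// a chunk survives if it still has bytes below the truncated size.
func keepAfterTruncate(chunks []Chunk, newSize uint64) (kept []Chunk) {
	for _, chunk := range chunks {
		int64Size := int64(chunk.Size)
		if chunk.Offset+int64Size > int64(newSize) {
			int64Size = int64(newSize) - chunk.Offset
		}
		if int64Size > 0 {
			kept = append(kept, chunk)
		}
	}
	return
}

func main() {
	chunks := []Chunk{{0, 8}, {8, 8}, {16, 8}}
	// Truncating to 12 keeps the first two chunks and drops the third.
	fmt.Println(keepAfterTruncate(chunks, 12)) // [{0 8} {8 8}]
}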
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index fae289217..9ef74a95c 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -1,23 +1,27 @@
 package filesys
 
 import (
+	"bytes"
 	"context"
 	"os"
-	"path"
-	"path/filepath"
+	"strings"
 	"time"
 
+	"github.com/seaweedfs/fuse"
+	"github.com/seaweedfs/fuse/fs"
+
+	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/seaweedfs/fuse"
-	"github.com/seaweedfs/fuse/fs"
+	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
 type Dir struct {
-	Path       string
-	wfs        *WFS
-	attributes *filer_pb.FuseAttributes
+	name   string
+	wfs    *WFS
+	entry  *filer_pb.Entry
+	parent *Dir
 }
 
 var _ = fs.Node(&Dir{})
@@ -28,99 +32,96 @@ var _ = fs.HandleReadDirAller(&Dir{})
 var _ = fs.NodeRemover(&Dir{})
 var _ = fs.NodeRenamer(&Dir{})
 var _ = fs.NodeSetattrer(&Dir{})
+var _ = fs.NodeGetxattrer(&Dir{})
+var _ = fs.NodeSetxattrer(&Dir{})
+var _ = fs.NodeRemovexattrer(&Dir{})
+var _ = fs.NodeListxattrer(&Dir{})
+var _ = fs.NodeForgetter(&Dir{})
 
-func (dir *Dir) Attr(context context.Context, attr *fuse.Attr) error {
+func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
 
 	// https://github.com/bazil/fuse/issues/196
 	attr.Valid = time.Second
 
-	if dir.Path == dir.wfs.option.FilerMountRootPath {
-		attr.Uid = dir.wfs.option.MountUid
-		attr.Gid = dir.wfs.option.MountGid
-		attr.Mode = dir.wfs.option.MountMode
+	if dir.FullPath() == dir.wfs.option.FilerMountRootPath {
+		dir.setRootDirAttributes(attr)
+		glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.FullPath(), attr)
 		return nil
 	}
 
-	item := dir.wfs.listDirectoryEntriesCache.Get(dir.Path)
-	if item != nil && !item.Expired() {
-		entry := item.Value().(*filer_pb.Entry)
-
-		attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
-		attr.Ctime = time.Unix(entry.Attributes.Crtime, 0)
-		attr.Mode = os.FileMode(entry.Attributes.FileMode)
-		attr.Gid = entry.Attributes.Gid
-		attr.Uid = entry.Attributes.Uid
-
-		return nil
+	if err := dir.maybeLoadEntry(); err != nil {
+		glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err)
+		return err
 	}
 
-	parent, name := filepath.Split(dir.Path)
+	attr.Inode = util.FullPath(dir.FullPath()).AsInode()
+	attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
+	attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
+	attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0)
+	attr.Gid = dir.entry.Attributes.Gid
+	attr.Uid = dir.entry.Attributes.Uid
 
-	err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
-		request := &filer_pb.LookupDirectoryEntryRequest{
-			Directory: parent,
-			Name:      name,
-		}
+	glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
 
-		glog.V(1).Infof("read dir %s attr: %v", dir.Path, request)
-		resp, err := client.LookupDirectoryEntry(context, request)
-		if err != nil {
-			if err == filer2.ErrNotFound {
-				return nil
-			}
-			glog.V(0).Infof("read dir %s attr %v: %v", dir.Path, request, err)
-			return err
-		}
-
-		if resp.Entry != nil {
-			dir.attributes = resp.Entry.Attributes
-		}
+	return nil
+}
 
-		// dir.wfs.listDirectoryEntriesCache.Set(dir.Path, resp.Entry, dir.wfs.option.EntryCacheTtl)
+func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
 
-		return nil
-	})
+	glog.V(4).Infof("dir Getxattr %s", dir.FullPath())
 
-	if err != nil {
+	if err := dir.maybeLoadEntry(); err != nil {
 		return err
 	}
 
-	// glog.V(1).Infof("dir %s: %v", dir.Path, attributes)
-	// glog.V(1).Infof("dir %s permission: %v", dir.Path, os.FileMode(attributes.FileMode))
-
-	attr.Mode = os.FileMode(dir.attributes.FileMode) | os.ModeDir
+	return getxattr(dir.entry, req, resp)
+}
 
-	attr.Mtime = time.Unix(dir.attributes.Mtime, 0)
-	attr.Ctime = time.Unix(dir.attributes.Crtime, 0)
-	attr.Gid = dir.attributes.Gid
-	attr.Uid = dir.attributes.Uid
+func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
+	attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
+	attr.Valid = time.Hour
+	attr.Uid = dir.wfs.option.MountUid
+	attr.Gid = dir.wfs.option.MountGid
+	attr.Mode = dir.wfs.option.MountMode
+	attr.Crtime = dir.wfs.option.MountCtime
+	attr.Ctime = dir.wfs.option.MountCtime
+	attr.Mtime = dir.wfs.option.MountMtime
+	attr.Atime = dir.wfs.option.MountMtime
+	attr.BlockSize = 1024 * 1024
+}
 
-	return nil
+func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
+	return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
+		return &File{
+			Name:           name,
+			dir:            dir,
+			wfs:            dir.wfs,
+			entry:          entry,
+			entryViewCache: nil,
+		}
+	})
 }
 
-func (dir *Dir) newFile(name string, entry *filer_pb.Entry) *File {
-	return &File{
-		Name:           name,
-		dir:            dir,
-		wfs:            dir.wfs,
-		entry:          entry,
-		entryViewCache: nil,
-	}
+func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
+
+	return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
+		return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
+	})
+
 }
 
 func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
 	resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
 
 	request := &filer_pb.CreateEntryRequest{
-		Directory: dir.Path,
+		Directory: dir.FullPath(),
 		Entry: &filer_pb.Entry{
 			Name:        req.Name,
 			IsDirectory: req.Mode&os.ModeDir > 0,
 			Attributes: &filer_pb.FuseAttributes{
 				Mtime:       time.Now().Unix(),
 				Crtime:      time.Now().Unix(),
-				FileMode:    uint32(req.Mode),
+				FileMode:    uint32(req.Mode &^ dir.wfs.option.Umask),
 				Uid:         req.Uid,
 				Gid:         req.Gid,
 				Collection:  dir.wfs.option.Collection,
@@ -128,109 +129,119 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
 				TtlSec:      dir.wfs.option.TtlSec,
 			},
 		},
+		OExcl: req.Flags&fuse.OpenExclusive != 0,
 	}
-	glog.V(1).Infof("create: %v", request)
+	glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags)
 
-	if request.Entry.IsDirectory {
-		if err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-			if _, err := client.CreateEntry(ctx, request); err != nil {
-				glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err)
-				return fuse.EIO
+	if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+		if err := filer_pb.CreateEntry(client, request); err != nil {
+			if strings.Contains(err.Error(), "EEXIST") {
+				return fuse.EEXIST
 			}
-			return nil
-		}); err != nil {
-			return nil, nil, err
+			return fuse.EIO
 		}
-	}
 
-	file := dir.newFile(req.Name, request.Entry)
-	if !request.Entry.IsDirectory {
-		file.isOpen = true
+		dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+
+		return nil
+	}); err != nil {
+		return nil, nil, err
 	}
+
+	var node fs.Node
+	if request.Entry.IsDirectory {
+		node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry)
+		return node, nil, nil
+	}
+
+	node = dir.newFile(req.Name, request.Entry)
+	file := node.(*File)
+	file.isOpen++
 	fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
-	fh.dirtyMetadata = true
 	return file, fh, nil
 }
 
 func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
 
-	err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name)
+
+	newEntry := &filer_pb.Entry{
+		Name:        req.Name,
+		IsDirectory: true,
+		Attributes: &filer_pb.FuseAttributes{
+			Mtime:    time.Now().Unix(),
+			Crtime:   time.Now().Unix(),
+			FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
+			Uid:      req.Uid,
+			Gid:      req.Gid,
+		},
+	}
+
+	err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.CreateEntryRequest{
-			Directory: dir.Path,
-			Entry: &filer_pb.Entry{
-				Name:        req.Name,
-				IsDirectory: true,
-				Attributes: &filer_pb.FuseAttributes{
-					Mtime:    time.Now().Unix(),
-					Crtime:   time.Now().Unix(),
-					FileMode: uint32(req.Mode),
-					Uid:      req.Uid,
-					Gid:      req.Gid,
-				},
-			},
+			Directory: dir.FullPath(),
+			Entry:     newEntry,
 		}
 
 		glog.V(1).Infof("mkdir: %v", request)
-		if _, err := client.CreateEntry(ctx, request); err != nil {
-			glog.V(0).Infof("mkdir %s/%s: %v", dir.Path, req.Name, err)
-			return fuse.EIO
+		if err := filer_pb.CreateEntry(client, request); err != nil {
+			glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err)
+			return err
 		}
 
+		dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+
 		return nil
 	})
 
 	if err == nil {
-		node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs}
+		node := dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), newEntry)
+
 		return node, nil
 	}
 
-	return nil, err
+	glog.V(0).Infof("mkdir %s/%s: %v", dir.FullPath(), req.Name, err)
+
+	return nil, fuse.EIO
 }
 
 func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
 
-	var entry *filer_pb.Entry
+	glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
 
-	item := dir.wfs.listDirectoryEntriesCache.Get(path.Join(dir.Path, req.Name))
-	if item != nil && !item.Expired() {
-		entry = item.Value().(*filer_pb.Entry)
+	fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
+	dirPath := util.FullPath(dir.FullPath())
+	meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath))
+	cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
+	if cacheErr == filer_pb.ErrNotFound {
+		return nil, fuse.ENOENT
 	}
+	entry := cachedEntry.ToProtoEntry()
 
 	if entry == nil {
-		err = dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
-			request := &filer_pb.LookupDirectoryEntryRequest{
-				Directory: dir.Path,
-				Name:      req.Name,
-			}
-
-			glog.V(4).Infof("lookup directory entry: %v", request)
-			resp, err := client.LookupDirectoryEntry(ctx, request)
-			if err != nil {
-				// glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err)
-				return fuse.ENOENT
-			}
-
-			entry = resp.Entry
-
-			// dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, dir.wfs.option.EntryCacheTtl)
-
-			return nil
-		})
+		// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
+		entry, err = filer_pb.GetEntry(dir.wfs, fullFilePath)
+		if err != nil {
+			glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
+			return nil, fuse.ENOENT
+		}
+	} else {
+		glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
 	}
 
 	if entry != nil {
 		if entry.IsDirectory {
-			node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, attributes: entry.Attributes}
+			node = dir.newDirectory(fullFilePath, entry)
 		} else {
 			node = dir.newFile(req.Name, entry)
 		}
 
-		resp.EntryValid = time.Duration(0)
+		// resp.EntryValid = time.Second
+		resp.Attr.Inode = fullFilePath.AsInode()
+		resp.Attr.Valid = time.Second
 		resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
-		resp.Attr.Ctime = time.Unix(entry.Attributes.Crtime, 0)
+		resp.Attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
 		resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode)
 		resp.Attr.Gid = entry.Attributes.Gid
 		resp.Attr.Uid = entry.Attributes.Uid
@@ -238,203 +249,234 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
 		return node, nil
 	}
 
+	glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err)
 	return nil, fuse.ENOENT
 }
 
 func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
 
-	err = dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
 
-		paginationLimit := 1024
-		remaining := dir.wfs.option.DirListingLimit
+	processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
+		fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
+		inode := fullpath.AsInode()
+		if entry.IsDirectory {
+			dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir}
+			ret = append(ret, dirent)
+		} else {
+			dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File}
+			ret = append(ret, dirent)
+		}
+		return nil
+	}
 
-		lastEntryName := ""
+	dirPath := util.FullPath(dir.FullPath())
+	meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
+	listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
+	if listErr != nil {
+		glog.Errorf("list meta cache: %v", listErr)
+		return nil, fuse.EIO
+	}
+	for _, cachedEntry := range listedEntries {
+		processEachEntryFn(cachedEntry.ToProtoEntry(), false)
+	}
+	return
+}
 
-		for remaining >= 0 {
+func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
 
-			request := &filer_pb.ListEntriesRequest{
-				Directory:         dir.Path,
-				StartFromFileName: lastEntryName,
-				Limit:             uint32(paginationLimit),
-			}
+	if !req.Dir {
+		return dir.removeOneFile(req)
+	}
 
-			glog.V(4).Infof("read directory: %v", request)
-			resp, err := client.ListEntries(ctx, request)
-			if err != nil {
-				glog.V(0).Infof("list %s: %v", dir.Path, err)
-				return fuse.EIO
-			}
+	return dir.removeFolder(req)
 
-			cacheTtl := estimatedCacheTtl(len(resp.Entries))
-
-			for _, entry := range resp.Entries {
-				if entry.IsDirectory {
-					dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
-					ret = append(ret, dirent)
-				} else {
-					dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
-					ret = append(ret, dirent)
-				}
-				dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, cacheTtl)
-				lastEntryName = entry.Name
-			}
+}
 
-			remaining -= len(resp.Entries)
+func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
 
-			if len(resp.Entries) < paginationLimit {
-				break
-			}
+	filePath := util.NewFullPath(dir.FullPath(), req.Name)
+	entry, err := filer_pb.GetEntry(dir.wfs, filePath)
+	if err != nil {
+		return err
+	}
+	if entry == nil {
+		return nil
+	}
 
-		}
+	dir.wfs.deleteFileChunks(entry.Chunks)
 
-		return nil
-	})
+	dir.wfs.fsNodeCache.DeleteFsNode(filePath)
+
+	dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+
+	glog.V(3).Infof("remove file: %v", req)
+	err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false, false)
+	if err != nil {
+		glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err)
+		return fuse.ENOENT
+	}
+
+	return nil
 
-	return ret, err
 }
 
-func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
+func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
 
-	if !req.Dir {
-		return dir.removeOneFile(ctx, req)
+	t := util.NewFullPath(dir.FullPath(), req.Name)
+	dir.wfs.fsNodeCache.DeleteFsNode(t)
+
+	dir.wfs.metaCache.DeleteEntry(context.Background(), t)
+
+	glog.V(3).Infof("remove directory entry: %v", req)
+	err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false)
+	if err != nil {
+		glog.V(3).Infof("not found remove %s/%s: %v", dir.FullPath(), req.Name, err)
+		return fuse.ENOENT
	}
 
-	return dir.removeFolder(ctx, req)
+	return nil
 
 }
 
-func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error {
+func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
 
-	var entry *filer_pb.Entry
-	err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	glog.V(3).Infof("%v dir setattr %+v", dir.FullPath(), req)
 
-		request := &filer_pb.LookupDirectoryEntryRequest{
-			Directory: dir.Path,
-			Name:      req.Name,
-		}
+	if err := dir.maybeLoadEntry(); err != nil {
+		return err
+	}
 
-		glog.V(4).Infof("lookup to-be-removed entry: %v", request)
-		resp, err := client.LookupDirectoryEntry(ctx, request)
-		if err != nil {
-			// glog.V(0).Infof("lookup %s/%s: %v", dir.Path, name, err)
-			return fuse.ENOENT
-		}
+	if req.Valid.Mode() {
+		dir.entry.Attributes.FileMode = uint32(req.Mode)
+	}
 
-		entry = resp.Entry
+	if req.Valid.Uid() {
+		dir.entry.Attributes.Uid = req.Uid
+	}
 
-		return nil
-	})
+	if req.Valid.Gid() {
+		dir.entry.Attributes.Gid = req.Gid
+	}
 
-	if err != nil {
-		return err
+	if req.Valid.Mtime() {
+		dir.entry.Attributes.Mtime = req.Mtime.Unix()
 	}
 
-	dir.wfs.deleteFileChunks(entry.Chunks)
+	return dir.saveEntry()
 
-	return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+}
 
-		request := &filer_pb.DeleteEntryRequest{
-			Directory:    dir.Path,
-			Name:         req.Name,
-			IsDeleteData: false,
-		}
+func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
 
-		glog.V(3).Infof("remove file: %v", request)
-		_, err := client.DeleteEntry(ctx, request)
-		if err != nil {
-			glog.V(3).Infof("remove file %s/%s: %v", dir.Path, req.Name, err)
-			return fuse.ENOENT
-		}
+	glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name)
 
-		dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name))
+	if err := dir.maybeLoadEntry(); err != nil {
		return err
+	}
 
-		return nil
-	})
+	if err := setxattr(dir.entry, req); err != nil {
+		return err
+	}
 
-}
+	return dir.saveEntry()
 
-func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error {
+}
 
-	return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
 
-		request := &filer_pb.DeleteEntryRequest{
-			Directory:    dir.Path,
-			Name:         req.Name,
-			IsDeleteData: true,
-		}
+	glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name)
 
-		glog.V(3).Infof("remove directory entry: %v", request)
-		_, err := client.DeleteEntry(ctx, request)
-		if err != nil {
-			glog.V(3).Infof("remove %s/%s: %v", dir.Path, req.Name, err)
-			return fuse.ENOENT
-		}
+	if err := dir.maybeLoadEntry(); err != nil {
		return err
+	}
 
-		dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name))
+	if err := removexattr(dir.entry, req); err != nil {
+		return err
+	}
 
-		return nil
-	})
+	return dir.saveEntry()
 
 }
 
-func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
+func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
 
-	glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle)
-	if req.Valid.Mode() {
-		dir.attributes.FileMode = uint32(req.Mode)
-	}
+	glog.V(4).Infof("dir Listxattr %s", dir.FullPath())
 
-	if req.Valid.Uid() {
-		dir.attributes.Uid = req.Uid
+	if err := dir.maybeLoadEntry(); err != nil {
		return err
 	}
 
-	if req.Valid.Gid() {
-		dir.attributes.Gid = req.Gid
+	if err := listxattr(dir.entry, req, resp); err != nil {
+		return err
 	}
 
-	if req.Valid.Mtime() {
-		dir.attributes.Mtime = req.Mtime.Unix()
+	return nil
+
+}
+
+func (dir *Dir) Forget() {
+	glog.V(3).Infof("Forget dir %s", dir.FullPath())
+
+	dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
+}
+
+func (dir *Dir) maybeLoadEntry() error {
+	if dir.entry == nil {
+		parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName()
+		entry, err := dir.wfs.maybeLoadEntry(parentDirPath, name)
+		if err != nil {
+			return err
+		}
+		dir.entry = entry
 	}
+	return nil
+}
 
-	parentDir, name := filer2.FullPath(dir.Path).DirAndName()
-	return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+func (dir *Dir) saveEntry() error {
+
+	parentDir, name := util.FullPath(dir.FullPath()).DirAndName()
+
+	return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.UpdateEntryRequest{
 			Directory: parentDir,
-			Entry: &filer_pb.Entry{
-				Name:       name,
-				Attributes: dir.attributes,
-			},
+			Entry:     dir.entry,
 		}
 
-		glog.V(1).Infof("set attr directory entry: %v", request)
-		_, err := client.UpdateEntry(ctx, request)
+		glog.V(1).Infof("save dir entry: %v", request)
+		_, err := client.UpdateEntry(context.Background(), request)
 		if err != nil {
-			glog.V(0).Infof("UpdateEntry %s: %v", dir.Path, err)
+			glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err)
 			return fuse.EIO
 		}
 
-		dir.wfs.listDirectoryEntriesCache.Delete(dir.Path)
+		dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
 
 		return nil
 	})
-
 }
 
-func estimatedCacheTtl(numEntries int) time.Duration {
-	if numEntries < 100 {
-		// 30 ms per entry
-		return 3 * time.Second
-	}
-	if numEntries < 1000 {
-		// 10 ms per entry
-		return 10 * time.Second
+func (dir *Dir) FullPath() string {
+	var parts []string
+	for p := dir; p != nil; p = p.parent {
+		if strings.HasPrefix(p.name, "/") {
+			if len(p.name) > 1 {
+				parts = append(parts, p.name[1:])
+			}
+		} else {
+			parts = append(parts, p.name)
+		}
 	}
-	if numEntries < 10000 {
-		// 10 ms per entry
-		return 100 * time.Second
+
+	if len(parts) == 0 {
+		return "/"
 	}
-	// 2 ms per entry
-	return time.Duration(numEntries*2) * time.Millisecond
+
+	var buf bytes.Buffer
+	for i := len(parts) - 1; i >= 0; i-- {
+		buf.WriteString("/")
+		buf.WriteString(parts[i])
+	}
+	return buf.String()
 }
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 3b3735369..4990e743c 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -6,6 +6,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/fuse"
@@ -17,17 +18,17 @@ var _ = fs.NodeReadlinker(&File{})
 
 func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) {
 
-	glog.V(3).Infof("Symlink: %v/%v to %v", dir.Path, req.NewName, req.Target)
+	glog.V(3).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
 
 	request := &filer_pb.CreateEntryRequest{
-		Directory: dir.Path,
+		Directory: dir.FullPath(),
 		Entry: &filer_pb.Entry{
 			Name:        req.NewName,
 			IsDirectory: false,
 			Attributes: &filer_pb.FuseAttributes{
 				Mtime:         time.Now().Unix(),
 				Crtime:        time.Now().Unix(),
-				FileMode:      uint32(os.FileMode(0755) | os.ModeSymlink),
+				FileMode:      uint32((os.FileMode(0777) | os.ModeSymlink) &^ dir.wfs.option.Umask),
 				Uid:           req.Uid,
 				Gid:           req.Gid,
 				SymlinkTarget: req.Target,
@@ -35,11 +36,14 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
 		},
 	}
 
-	err := dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-		if _, err := client.CreateEntry(ctx, request); err != nil {
-			glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err)
+	err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+		if err := filer_pb.CreateEntry(client, request); err != nil {
+			glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err)
 			return fuse.EIO
 		}
+
+		dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+
 		return nil
 	})
 
@@ -51,7 +55,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
 
 func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) {
 
-	if err := file.maybeLoadAttributes(ctx); err != nil {
+	if err := file.maybeLoadEntry(ctx); err != nil {
 		return "", err
 	}
 
@@ -59,7 +63,7 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri
 		return "", fuse.Errno(syscall.EINVAL)
 	}
 
-	glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.Path, file.Name, file.entry.Attributes.SymlinkTarget)
+	glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget)
 
 	return file.entry.Attributes.SymlinkTarget, nil
 
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index d29281f35..0f7f131b1 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -2,118 +2,49 @@ package filesys
 
 import (
 	"context"
-	"github.com/seaweedfs/fuse"
-	"github.com/seaweedfs/fuse/fs"
-	"math"
-	"path/filepath"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/seaweedfs/fuse"
+	"github.com/seaweedfs/fuse/fs"
 )
 
 func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
 
 	newDir := newDirectory.(*Dir)
-	return dir.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
-		// find existing entry
-		request := &filer_pb.LookupDirectoryEntryRequest{
-			Directory: dir.Path,
-			Name:      req.OldName,
+	newPath := util.NewFullPath(newDir.FullPath(), req.NewName)
+	oldPath := util.NewFullPath(dir.FullPath(), req.OldName)
+
+	glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
+
+	err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+		request := &filer_pb.AtomicRenameEntryRequest{
+			OldDirectory: dir.FullPath(),
+			OldName:      req.OldName,
+			NewDirectory: newDir.FullPath(),
+			NewName:      req.NewName,
 		}
 
-		glog.V(4).Infof("find existing directory entry: %v", request)
-		resp, err := client.LookupDirectoryEntry(ctx, request)
+		_, err := client.AtomicRenameEntry(context.Background(), request)
 		if err != nil {
-			glog.V(3).Infof("renaming find %s/%s: %v", dir.Path, req.OldName, err)
-			return fuse.ENOENT
+			glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
+			return fuse.EIO
 		}
 
-		entry := resp.Entry
+		return nil
 
-		glog.V(4).Infof("found existing directory entry resp: %+v", resp)
-
-		return moveEntry(ctx, client, dir.Path, entry, newDir.Path, req.NewName)
 	})
-}
-
-func moveEntry(ctx context.Context, client filer_pb.SeaweedFilerClient, oldParent string, entry *filer_pb.Entry, newParent, newName string) error {
-	if entry.IsDirectory {
-		currentDirPath := filepath.Join(oldParent, entry.Name)
 
+	if err == nil {
 
-		lastFileName := ""
-		includeLastFile := false
-		limit := math.MaxInt32
-		for limit > 0 {
-			request := &filer_pb.ListEntriesRequest{
-				Directory:          currentDirPath,
-				StartFromFileName:  lastFileName,
-				InclusiveStartFrom: includeLastFile,
-				Limit:              1024,
-			}
-			glog.V(4).Infof("read directory: %v", request)
-			resp, err := client.ListEntries(ctx, request)
-			if err != nil {
-				glog.V(0).Infof("list %s: %v", oldParent, err)
-				return fuse.EIO
-			}
-			if len(resp.Entries) == 0 {
-				break
-			}
-
-			for _, item := range resp.Entries {
-				lastFileName = item.Name
-				err := moveEntry(ctx, client, currentDirPath, item, filepath.Join(newParent, newName), item.Name)
-				if err != nil {
-					return err
-				}
-				limit--
-			}
-			if len(resp.Entries) < 1024 {
-				break
-			}
-		}
+		// fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
+		dir.wfs.fsNodeCache.Move(oldPath, newPath)
+		delete(dir.wfs.handles, oldPath.AsInode())
 	}
 
-	// add to new directory
-	{
-		request := &filer_pb.CreateEntryRequest{
-			Directory: newParent,
-			Entry: &filer_pb.Entry{
-				Name:        newName,
-				IsDirectory: entry.IsDirectory,
-				Attributes:  entry.Attributes,
-				Chunks:      entry.Chunks,
-			},
-		}
-
-		glog.V(1).Infof("create new entry: %v", request)
-		if _, err := client.CreateEntry(ctx, request); err != nil {
-			glog.V(0).Infof("renaming create %s/%s: %v", newParent, newName, err)
-			return fuse.EIO
-		}
-	}
-
-	// delete old entry
-	{
-		request := &filer_pb.DeleteEntryRequest{
-			Directory:    oldParent,
-			Name:         entry.Name,
-			IsDeleteData: false,
-		}
-
-		glog.V(1).Infof("remove old entry: %v", request)
-		_, err := client.DeleteEntry(ctx, request)
-		if err != nil {
-			glog.V(0).Infof("renaming delete %s/%s: %v", oldParent, entry.Name, err)
-			return fuse.EIO
-		}
-
-	}
-
-	return nil
-
+	return err
 }
diff --git a/weed/filesys/dir_test.go b/weed/filesys/dir_test.go
new file mode 100644
index 000000000..49c76eb5e
--- /dev/null
+++ b/weed/filesys/dir_test.go
@@ -0,0 +1,34 @@
+package filesys
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDirPath(t *testing.T) {
+
+	p := &Dir{name: "/some"}
+	p = &Dir{name: "path", parent: p}
+	p = &Dir{name: "to", parent: p}
+	p = &Dir{name: "a", parent: p}
+	p = &Dir{name: "file", parent: p}
+
+	assert.Equal(t, "/some/path/to/a/file", p.FullPath())
+
+	p = &Dir{name: "/some"}
+	assert.Equal(t, "/some", p.FullPath())
+
+	p = &Dir{name: "/"}
+	assert.Equal(t, "/", p.FullPath())
+
+	p = &Dir{name: "/"}
+	p = &Dir{name: "path", parent: p}
+	assert.Equal(t, "/path", p.FullPath())
+
+	p = &Dir{name: "/"}
+	p = &Dir{name: "path", parent: p}
+	p = &Dir{name: "to", parent: p}
+	assert.Equal(t, "/path/to", p.FullPath())
+
+}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 696296e62..45224b3e7 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -4,168 +4,149 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"sync/atomic"
+	"io"
+	"sync"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "sync" + "github.com/chrislusf/seaweedfs/weed/security" ) type ContinuousDirtyPages struct { - hasData bool - Offset int64 - Size int64 - Data []byte - f *File - lock sync.Mutex + intervals *ContinuousIntervals + f *File + lock sync.Mutex + collection string + replication string } func newDirtyPages(file *File) *ContinuousDirtyPages { return &ContinuousDirtyPages{ - Data: nil, - f: file, + intervals: &ContinuousIntervals{}, + f: file, } } func (pages *ContinuousDirtyPages) releaseResource() { - if pages.Data != nil { - pages.f.wfs.bufPool.Put(pages.Data) - pages.Data = nil - atomic.AddInt32(&counter, -1) - glog.V(3).Infof("%s/%s releasing resource %d", pages.f.dir.Path, pages.f.Name, counter) - } } var counter = int32(0) -func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { pages.lock.Lock() defer pages.lock.Unlock() - var chunk *filer_pb.FileChunk + glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data))) if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { // this is more than what buffer can hold. - return pages.flushAndSave(ctx, offset, data) + return pages.flushAndSave(offset, data) } - if pages.Data == nil { - pages.Data = pages.f.wfs.bufPool.Get().([]byte) - atomic.AddInt32(&counter, 1) - glog.V(3).Infof("%s/%s acquire resource %d", pages.f.dir.Path, pages.f.Name, counter) - } - - if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) || - pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) { - // if the data is out of range, - // or buffer is full if adding new data, - // flush current buffer and add new data - - // println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size) + pages.intervals.AddInterval(data, offset) - if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { - if chunk != nil { - glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) - chunks = append(chunks, chunk) - } - } else { - glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) - return - } - pages.Offset = offset - copy(pages.Data, data) - pages.Size = int64(len(data)) - return - } + var chunk *filer_pb.FileChunk + var hasSavedData bool - if offset != pages.Offset+pages.Size { - // when this happens, debug shows the data overlapping with existing data is empty - // the data is not just append - if offset == pages.Offset && int(pages.Size) < len(data) { - // glog.V(2).Infof("pages[%d,%d) pages.Data len=%v, data len=%d, pages.Size=%d", pages.Offset, pages.Offset+pages.Size, len(pages.Data), len(data), pages.Size) - copy(pages.Data[pages.Size:], data[pages.Size:]) - } else { - if pages.Size != 0 { - glog.V(1).Infof("%s/%s add page: pages [%d, %d) write [%d, %d)", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Offset+pages.Size, offset, offset+int64(len(data))) - } - return pages.flushAndSave(ctx, offset, data) + if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit { + chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage() + if hasSavedData { + chunks = append(chunks, chunk) } - } else { - copy(pages.Data[offset-pages.Offset:], data) } 
- pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset) - return } -func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { var chunk *filer_pb.FileChunk + var newChunks []*filer_pb.FileChunk // flush existing - if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { - if chunk != nil { - glog.V(4).Infof("%s/%s flush existing [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) - chunks = append(chunks, chunk) + if newChunks, err = pages.saveExistingPagesToStorage(); err == nil { + if newChunks != nil { + chunks = append(chunks, newChunks...) } } else { - glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) return } - pages.Size = 0 - pages.Offset = 0 // flush the new page - if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil { + if chunk, err = pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))); err == nil { if chunk != nil { - glog.V(4).Infof("%s/%s flush big request [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) + glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId) chunks = append(chunks, chunk) } } else { - glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err) return } return } -func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) { +func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) { pages.lock.Lock() defer pages.lock.Unlock() - if pages.Size == 0 { - return nil, nil - } + return pages.saveExistingPagesToStorage() +} - if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil { - pages.Size = 0 - pages.Offset = 0 - if chunk != nil { - glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size)) +func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) { + + var hasSavedData bool + var chunk *filer_pb.FileChunk + + for { + + chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage() + if !hasSavedData { + return chunks, err + } + + if err == nil { + chunks = append(chunks, chunk) + } else { + return } } - return + } -func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) { +func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *filer_pb.FileChunk, hasSavedData bool, err error) { - if pages.Size == 0 { - return nil, nil + maxList := pages.intervals.RemoveLargestIntervalLinkedList() + if maxList == nil { + return nil, false, nil + } + + for { + chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size()) + if err == nil { + hasSavedData = true + glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId) + return + } else { + glog.V(0).Infof("%s 
saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err) + time.Sleep(5 * time.Second) + } } - return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset) } -func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) { +func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) { var fileId, host string + var auth security.EncodedJwt + + dir, _ := pages.f.fullpath().DirAndName() - if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -173,15 +154,21 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte Collection: pages.f.wfs.option.Collection, TtlSec: pages.f.wfs.option.TtlSec, DataCenter: pages.f.wfs.option.DataCenter, + ParentPath: dir, } - resp, err := client.AssignVolume(ctx, request) + resp, err := client.AssignVolume(context.Background(), request) if err != nil { glog.V(0).Infof("assign volume failure %v: %v", request, err) return err } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } - fileId, host = resp.FileId, resp.Url + fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + host = pages.f.wfs.AdjustedUrl(host) + pages.collection, pages.replication = resp.Collection, resp.Replication return nil }); err != nil { @@ -189,8 +176,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - bufReader := bytes.NewReader(buf) - uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "") + uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload data: %v", err) @@ -199,14 +185,9 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload result: %v", uploadResult.Error) } + pages.f.wfs.chunkCache.SetChunk(fileId, data) - return &filer_pb.FileChunk{ - FileId: fileId, - Offset: offset, - Size: uint64(len(buf)), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - }, nil + return uploadResult.ToPbFileChunk(fileId, offset), nil } @@ -216,3 +197,18 @@ func max(x, y int64) int64 { } return y } +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} + +func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) { + + pages.lock.Lock() + defer pages.lock.Unlock() + + return pages.intervals.ReadData(data, startOffset) + +} diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go new file mode 100644 index 000000000..ec94c6df1 --- /dev/null +++ b/weed/filesys/dirty_page_interval.go @@ -0,0 +1,220 @@ +package filesys + +import ( + "bytes" + "io" + "math" +) + +type IntervalNode struct { + Data []byte + Offset int64 + Size int64 + Next *IntervalNode +} + +type IntervalLinkedList struct { + Head *IntervalNode + Tail *IntervalNode +} + +type ContinuousIntervals struct { + lists 
[]*IntervalLinkedList +} + +func (list *IntervalLinkedList) Offset() int64 { + return list.Head.Offset +} +func (list *IntervalLinkedList) Size() int64 { + return list.Tail.Offset + list.Tail.Size - list.Head.Offset +} +func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) { + // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size) + list.Tail.Next = node + list.Tail = node +} +func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) { + // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size) + node.Next = list.Head + list.Head = node +} + +func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) { + t := list.Head + for { + + nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) + if nodeStart < nodeStop { + // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop) + copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset]) + } + + if t.Next == nil { + break + } + t = t.Next + } +} + +func (c *ContinuousIntervals) TotalSize() (total int64) { + for _, list := range c.lists { + total += list.Size() + } + return +} + +func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList { + var nodes []*IntervalNode + for t := list.Head; t != nil; t = t.Next { + nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) + if nodeStart >= nodeStop { + // skip non overlapping IntervalNode + continue + } + nodes = append(nodes, &IntervalNode{ + Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset], + Offset: nodeStart, + Size: nodeStop - nodeStart, + Next: nil, + }) + } + for i := 1; i < len(nodes); i++ { + nodes[i-1].Next = nodes[i] + } + return &IntervalLinkedList{ + Head: nodes[0], + Tail: nodes[len(nodes)-1], + } +} + +func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { + + interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))} + + var newLists []*IntervalLinkedList + for _, list := range c.lists { + // if list is to the left of new interval, add to the new list + if list.Tail.Offset+list.Tail.Size <= interval.Offset { + newLists = append(newLists, list) + } + // if list is to the right of new interval, add to the new list + if interval.Offset+interval.Size <= list.Head.Offset { + newLists = append(newLists, list) + } + // if new interval overwrite the right part of the list + if list.Head.Offset < interval.Offset && interval.Offset < list.Tail.Offset+list.Tail.Size { + // create a new list of the left part of existing list + newLists = append(newLists, subList(list, list.Offset(), interval.Offset)) + } + // if new interval overwrite the left part of the list + if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size < list.Tail.Offset+list.Tail.Size { + // create a new list of the right part of existing list + newLists = append(newLists, subList(list, interval.Offset+interval.Size, list.Tail.Offset+list.Tail.Size)) + } + // skip anything that is fully overwritten by the new interval + } + + c.lists = newLists + // add the new interval to the lists, connecting neighbor lists + var prevList, nextList *IntervalLinkedList + + for _, 
list := range c.lists { + if list.Head.Offset == interval.Offset+interval.Size { + nextList = list + break + } + } + + for _, list := range c.lists { + if list.Head.Offset+list.Size() == offset { + list.addNodeToTail(interval) + prevList = list + break + } + } + + if prevList != nil && nextList != nil { + // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size) + prevList.Tail.Next = nextList.Head + prevList.Tail = nextList.Tail + c.removeList(nextList) + } else if nextList != nil { + // add to head was not done when checking + nextList.addNodeToHead(interval) + } + if prevList == nil && nextList == nil { + c.lists = append(c.lists, &IntervalLinkedList{ + Head: interval, + Tail: interval, + }) + } + + return +} + +func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList { + var maxSize int64 + maxIndex := -1 + for k, list := range c.lists { + if maxSize <= list.Size() { + maxSize = list.Size() + maxIndex = k + } + } + if maxSize <= 0 { + return nil + } + + t := c.lists[maxIndex] + c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...) + return t + +} + +func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) { + index := -1 + for k, list := range c.lists { + if list.Offset() == target.Offset() { + index = k + } + } + if index < 0 { + return + } + + c.lists = append(c.lists[0:index], c.lists[index+1:]...) + +} + +func (c *ContinuousIntervals) ReadData(data []byte, startOffset int64) (offset int64, size int) { + var minOffset int64 = math.MaxInt64 + var maxStop int64 + for _, list := range c.lists { + start := max(startOffset, list.Offset()) + stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) + if start <= stop { + list.ReadData(data[start-startOffset:], start, stop) + minOffset = min(minOffset, start) + maxStop = max(maxStop, stop) + } + } + + if minOffset == math.MaxInt64 { + return 0, 0 + } + + offset = minOffset + size = int(maxStop - offset) + return +} + +func (l *IntervalLinkedList) ToReader() io.Reader { + var readers []io.Reader + t := l.Head + readers = append(readers, bytes.NewReader(t.Data)) + for t.Next != nil { + t = t.Next + readers = append(readers, bytes.NewReader(t.Data)) + } + return io.MultiReader(readers...) 
+}
diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go
new file mode 100644
index 000000000..ab3b37b7c
--- /dev/null
+++ b/weed/filesys/dirty_page_interval_test.go
@@ -0,0 +1,89 @@
+package filesys
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestContinuousIntervals_AddIntervalAppend(t *testing.T) {
+
+	c := &ContinuousIntervals{}
+
+	// 25, 25, 25
+	c.AddInterval(getBytes(25, 3), 0)
+	// _, _, 23, 23, 23, 23
+	c.AddInterval(getBytes(23, 4), 2)
+
+	expectedData(t, c, 0, 25, 25, 23, 23, 23, 23)
+
+}
+
+func TestContinuousIntervals_AddIntervalInnerOverwrite(t *testing.T) {
+
+	c := &ContinuousIntervals{}
+
+	// 25, 25, 25, 25, 25
+	c.AddInterval(getBytes(25, 5), 0)
+	// _, _, 23, 23
+	c.AddInterval(getBytes(23, 2), 2)
+
+	expectedData(t, c, 0, 25, 25, 23, 23, 25)
+
+}
+
+func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) {
+
+	c := &ContinuousIntervals{}
+
+	// 1,
+	c.AddInterval(getBytes(1, 1), 0)
+	// _, 2,
+	c.AddInterval(getBytes(2, 1), 1)
+	// _, _, 3, 3, 3
+	c.AddInterval(getBytes(3, 3), 2)
+	// _, _, _, 4, 4, 4
+	c.AddInterval(getBytes(4, 3), 3)
+
+	expectedData(t, c, 0, 1, 2, 3, 4, 4, 4)
+
+}
+
+func TestContinuousIntervals_RealCase1(t *testing.T) {
+
+	c := &ContinuousIntervals{}
+
+	// 25,
+	c.AddInterval(getBytes(25, 1), 0)
+	// _, _, _, _, 23, 23
+	c.AddInterval(getBytes(23, 2), 4)
+	// _, _, _, 24, 24, 24, 24
+	c.AddInterval(getBytes(24, 4), 3)
+
+	// _, 22, 22
+	c.AddInterval(getBytes(22, 2), 1)
+
+	expectedData(t, c, 0, 25, 22, 22, 24, 24, 24, 24)
+
+}
+
+func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) {
+	start, stop := int64(offset), int64(offset+len(data))
+	for _, list := range c.lists {
+		nodeStart, nodeStop := max(start, list.Head.Offset), min(stop, list.Head.Offset+list.Size())
+		if nodeStart < nodeStop {
+			buf := make([]byte, nodeStop-nodeStart)
+			list.ReadData(buf, nodeStart, nodeStop)
+			if bytes.Compare(buf, data[nodeStart-start:nodeStop-start]) != 0 {
+				t.Errorf("expected %v actual %v", data[nodeStart-start:nodeStop-start], buf)
+			}
+		}
+	}
+}
+
+func getBytes(content byte, length int) []byte {
+	data := make([]byte, length)
+	for i := 0; i < length; i++ {
+		data[i] = content
+	}
+	return data
+}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 4bb169a33..4a6bc9a8a 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -2,14 +2,15 @@ package filesys
 
 import (
 	"context"
+	"io"
 	"os"
-	"path/filepath"
 	"sort"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/seaweedfs/fuse"
 	"github.com/seaweedfs/fuse/fs"
 )
@@ -20,6 +21,11 @@ var _ = fs.Node(&File{})
 var _ = fs.NodeOpener(&File{})
 var _ = fs.NodeFsyncer(&File{})
 var _ = fs.NodeSetattrer(&File{})
+var _ = fs.NodeGetxattrer(&File{})
+var _ = fs.NodeSetxattrer(&File{})
+var _ = fs.NodeRemovexattrer(&File{})
+var _ = fs.NodeListxattrer(&File{})
+var _ = fs.NodeForgetter(&File{})
 
 type File struct {
 	Name           string
@@ -27,21 +33,33 @@ type File struct {
 	wfs            *WFS
 	entry          *filer_pb.Entry
 	entryViewCache []filer2.VisibleInterval
-	isOpen         bool
+	isOpen         int
+	reader         io.ReaderAt
 }
 
-func (file *File) fullpath() string {
-	return filepath.Join(file.dir.Path, file.Name)
+func (file *File) fullpath() util.FullPath {
+	return util.NewFullPath(file.dir.FullPath(), file.Name)
 }
 
 func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
 
-	if err := file.maybeLoadAttributes(ctx); err != nil {
-		return err
+	glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
+
+	if file.isOpen <= 0 {
+		if err := file.maybeLoadEntry(ctx); err != nil {
+			return err
+		}
 	}
 
+	attr.Inode = file.fullpath().AsInode()
+	attr.Valid = time.Second
 	attr.Mode = os.FileMode(file.entry.Attributes.FileMode)
 	attr.Size = filer2.TotalSize(file.entry.Chunks)
+	if file.isOpen > 0 {
+		attr.Size = file.entry.Attributes.FileSize
+		glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
+	}
+	attr.Crtime = time.Unix(file.entry.Attributes.Crtime, 0)
 	attr.Mtime = time.Unix(file.entry.Attributes.Mtime, 0)
 	attr.Gid = file.entry.Attributes.Gid
 	attr.Uid = file.entry.Attributes.Uid
@@ -52,11 +70,22 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
 
 }
 
+func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
+
+	glog.V(4).Infof("file Getxattr %s", file.fullpath())
+
+	if err := file.maybeLoadEntry(ctx); err != nil {
+		return err
+	}
+
+	return getxattr(file.entry, req, resp)
+}
+
 func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
 
-	glog.V(3).Infof("%v file open %+v", file.fullpath(), req)
+	glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
 
-	file.isOpen = true
+	file.isOpen++
 
 	handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
 
@@ -70,22 +99,30 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
 
 func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
 
-	if err := file.maybeLoadAttributes(ctx); err != nil {
-		return err
-	}
+	glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes)
 
-	if file.isOpen {
-		return nil
+	if err := file.maybeLoadEntry(ctx); err != nil {
+		return err
 	}
 
-	glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes)
 	if req.Valid.Size() {
 
 		glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size)
-		if req.Size == 0 {
+		if req.Size < filer2.TotalSize(file.entry.Chunks) {
 			// fmt.Printf("truncate %v \n", fullPath)
-			file.entry.Chunks = nil
+			var chunks []*filer_pb.FileChunk
+			for _, chunk := range file.entry.Chunks {
+				int64Size := int64(chunk.Size)
+				if chunk.Offset+int64Size > int64(req.Size) {
+					int64Size = int64(req.Size) - chunk.Offset
+				}
+				if int64Size > 0 {
+					chunks = append(chunks, chunk)
+				}
+			}
+			file.entry.Chunks = chunks
 			file.entryViewCache = nil
+			file.reader = nil
 		}
 		file.entry.Attributes.FileSize = req.Size
 	}
@@ -109,75 +146,88 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
 		file.entry.Attributes.Mtime = req.Mtime.Unix()
 	}
 
-	return file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	if file.isOpen > 0 {
+		return nil
+	}
 
-		request := &filer_pb.UpdateEntryRequest{
-			Directory: file.dir.Path,
-			Entry:     file.entry,
-		}
+	return file.saveEntry()
 
-		glog.V(1).Infof("set attr file entry: %v", request)
-		_, err := client.UpdateEntry(ctx, request)
-		if err != nil {
-			glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err)
-			return fuse.EIO
-		}
+}
 
-		return nil
-	})
+func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
 
-}
+	glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name)
 
-func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
-	// fsync works at OS level
-	// write the file chunks to the filerGrpcAddress
-	glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
+	if err := file.maybeLoadEntry(ctx); err != nil {
		return err
+	}
+
+	if err := setxattr(file.entry, req); err != nil {
+		return err
+	}
+
+	return file.saveEntry()
 
-	return nil
 }
 
-func (file *File) maybeLoadAttributes(ctx context.Context) error {
-	if file.entry == nil || !file.isOpen {
-		item := file.wfs.listDirectoryEntriesCache.Get(file.fullpath())
-		if item != nil && !item.Expired() {
-			entry := item.Value().(*filer_pb.Entry)
-			file.setEntry(entry)
-			// glog.V(1).Infof("file attr read cached %v attributes", file.Name)
-		} else {
-			err := file.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
 
-				request := &filer_pb.LookupDirectoryEntryRequest{
-					Name:      file.Name,
-					Directory: file.dir.Path,
-				}
+	glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name)
 
-				resp, err := client.LookupDirectoryEntry(ctx, request)
-				if err != nil {
-					glog.V(3).Infof("file attr read file %v: %v", request, err)
-					return fuse.ENOENT
-				}
+	if err := file.maybeLoadEntry(ctx); err != nil {
		return err
+	}
 
-				file.setEntry(resp.Entry)
+	if err := removexattr(file.entry, req); err != nil {
+		return err
+	}
 
-				glog.V(3).Infof("file attr %v %+v: %d", file.fullpath(), file.entry.Attributes, filer2.TotalSize(file.entry.Chunks))
+	return file.saveEntry()
 
-				// file.wfs.listDirectoryEntriesCache.Set(file.fullpath(), file.entry, file.wfs.option.EntryCacheTtl)
+}
 
-				return nil
-			})
+func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
 
-			if err != nil {
-				return err
-			}
-		}
+	glog.V(4).Infof("file Listxattr %s", file.fullpath())
+
+	if err := file.maybeLoadEntry(ctx); err != nil {
		return err
+	}
+
+	if err := listxattr(file.entry, req, resp); err != nil {
+		return err
 	}
 
+	return nil
+
+}
+
+func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
+	// fsync works at OS level
+	// write the file chunks to the filerGrpcAddress
+	glog.V(3).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req)
+
+	return nil
+}
+
+func (file *File) Forget() {
+	t := util.NewFullPath(file.dir.FullPath(), file.Name)
+	glog.V(3).Infof("Forget file %s", t)
+
+	file.wfs.fsNodeCache.DeleteFsNode(t)
 }
 
-func (file *File) addChunk(chunk *filer_pb.FileChunk) {
-	if chunk != nil {
-		file.addChunks([]*filer_pb.FileChunk{chunk})
+func (file *File) maybeLoadEntry(ctx context.Context) error {
+	if file.entry == nil || file.isOpen <= 0 {
+		entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
+		if err != nil {
+			glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
+			return err
+		}
+		if entry != nil {
+			file.setEntry(entry)
+		}
 	}
+	return nil
 }
 
 func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
@@ -194,10 +244,36 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
 		newVisibles = t
 	}
 
+	file.reader = nil
+
+	glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
+
 	file.entry.Chunks = append(file.entry.Chunks, chunks...)
 }
 
 func (file *File) setEntry(entry *filer_pb.Entry) {
 	file.entry = entry
 	file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
+	file.reader = nil
+}
+
+func (file *File) saveEntry() error {
+	return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+		request := &filer_pb.UpdateEntryRequest{
+			Directory: file.dir.FullPath(),
+			Entry:     file.entry,
+		}
+
+		glog.V(1).Infof("save file entry: %v", request)
+		_, err := client.UpdateEntry(context.Background(), request)
+		if err != nil {
+			glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
+			return fuse.EIO
+		}
+
+		file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
+
+		return nil
+	})
 }
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 0f6ca1164..9b9df916c 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -3,17 +3,17 @@ package filesys
 
 import (
 	"context"
 	"fmt"
+	"io"
+	"math"
+	"net/http"
+	"os"
+	"time"
+
 	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/seaweedfs/fuse"
 	"github.com/seaweedfs/fuse/fs"
-	"net/http"
-	"strings"
-	"sync"
-	"time"
 )
 
 type FileHandle struct {
@@ -28,15 +28,20 @@ type FileHandle struct {
 	NodeId fuse.NodeID // file or directory the request is about
 	Uid    uint32      // user ID of process making request
 	Gid    uint32      // group ID of process making request
+
 }
 
 func newFileHandle(file *File, uid, gid uint32) *FileHandle {
-	return &FileHandle{
+	fh := &FileHandle{
 		f:          file,
 		dirtyPages: newDirtyPages(file),
 		Uid:        uid,
 		Gid:        gid,
 	}
+	if fh.f.entry != nil {
+		fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks)
+	}
+	return fh
 }
 
 var _ = fs.Handle(&FileHandle{})
@@ -51,115 +56,91 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
 
 	glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
 
-	// this value should come from the filer instead of the old f
-	if len(fh.f.entry.Chunks) == 0 {
-		glog.V(1).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name)
-		return nil
-	}
-
 	buff := make([]byte, req.Size)
 
-	if fh.f.entryViewCache == nil {
-		fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
-	}
-
-	chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size)
-
-	var vids []string
-	for _, chunkView := range chunkViews {
-		vids = append(vids, volumeId(chunkView.FileId))
-	}
-
-	vid2Locations := make(map[string]*filer_pb.Locations)
-
-	err := fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
-		glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
-		resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
-			VolumeIds: vids,
-		})
-		if err != nil {
-			return err
+	totalRead, err := fh.readFromChunks(buff, req.Offset)
+	if err == nil {
+		dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset)
+		if totalRead+req.Offset < dirtyOffset+int64(dirtySize) {
+			totalRead = dirtyOffset + int64(dirtySize) - req.Offset
 		}
+	}
 
-		vid2Locations = resp.LocationsMap
-
-		return nil
-	})
+	resp.Data = buff[:totalRead]
 
 	if err != nil {
-		glog.V(4).Infof("%v/%v read fh lookup volume ids: %v", fh.f.dir.Path, fh.f.Name, err)
-		return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
+		glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
+		return fuse.EIO
 	}
 
-	var totalRead int64
-	var wg sync.WaitGroup
-	for _, chunkView := range chunkViews {
-		wg.Add(1)
-		go func(chunkView *filer2.ChunkView) {
-			defer wg.Done()
+	return err
+}
 
-			glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) {
+	return fh.dirtyPages.ReadDirtyData(buff, startOffset)
+}
 
-			locations := vid2Locations[volumeId(chunkView.FileId)]
-			if locations == nil || len(locations.Locations) == 0 {
-				glog.V(0).Infof("failed to locate %s", chunkView.FileId)
-				err = fmt.Errorf("failed to locate %s", chunkView.FileId)
-				return
-			}
+func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
 
-			var n int64
-			n, err = util.ReadUrl(
-				fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
-				chunkView.Offset,
-				int(chunkView.Size),
-				buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)],
-				!chunkView.IsFullChunk)
+	// this value should come from the filer instead of the old f
+	if len(fh.f.entry.Chunks) == 0 {
+		glog.V(1).Infof("empty fh %v", fh.f.fullpath())
+		return 0, nil
+	}
 
-			if err != nil {
+	if fh.f.entryViewCache == nil {
+		fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
+		fh.f.reader = nil
+	}
 
-				glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.f.dir.Path, fh.f.Name, locations.Locations[0].Url, chunkView.FileId, n, err)
+	if fh.f.reader == nil {
+		chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32)
+		fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache)
+	}
 
-				err = fmt.Errorf("failed to read http://%s/%s: %v",
-					locations.Locations[0].Url, chunkView.FileId, err)
-				return
-			}
+	totalRead, err := fh.f.reader.ReadAt(buff, offset)
 
-			glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
-			totalRead += n
+	if err == io.EOF {
+		err = nil
+	}
 
-		}(chunkView)
+	if err != nil {
+		glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
 	}
-	wg.Wait()
 
-	resp.Data = buff[:totalRead]
+	// glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
 
-	return err
+	return int64(totalRead), err
 }
 
 // Write to the file handle
 func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
 
 	// write the request to volume servers
+	data := make([]byte, len(req.Data))
+	copy(data, req.Data)
 
-	glog.V(4).Infof("%+v/%v write fh %d: [%d,%d)", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)))
+	fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
+	// glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
 
-	chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
+	chunks, err := fh.dirtyPages.AddPage(req.Offset, data)
 	if err != nil {
-		glog.Errorf("%+v/%v write fh %d: [%d,%d): %v", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err)
-		return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
+		glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err)
+		return fuse.EIO
 	}
 
-	resp.Size = len(req.Data)
+	resp.Size = len(data)
 
 	if req.Offset == 0
{ - fh.contentType = http.DetectContentType(req.Data) + // detect mime type + fh.contentType = http.DetectContentType(data) fh.dirtyMetadata = true } - fh.f.addChunks(chunks) - if len(chunks) > 0 { + + fh.f.addChunks(chunks) + fh.dirtyMetadata = true } @@ -170,11 +151,14 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle) - fh.dirtyPages.releaseResource() - - fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) + fh.f.isOpen-- - fh.f.isOpen = false + if fh.f.isOpen <= 0 { + fh.dirtyPages.releaseResource() + fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) + } + fh.f.entryViewCache = nil + fh.f.reader = nil return nil } @@ -184,19 +168,22 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { // send the data to the OS glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req) - chunk, err := fh.dirtyPages.FlushToStorage(ctx) + chunks, err := fh.dirtyPages.FlushToStorage() if err != nil { - glog.Errorf("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err) - return fmt.Errorf("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err) + glog.Errorf("flush %s: %v", fh.f.fullpath(), err) + return fuse.EIO } - fh.f.addChunk(chunk) + if len(chunks) > 0 { + fh.f.addChunks(chunks) + fh.dirtyMetadata = true + } if !fh.dirtyMetadata { return nil } - return fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType @@ -204,78 +191,48 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Attributes.Gid = req.Gid fh.f.entry.Attributes.Mtime = time.Now().Unix() fh.f.entry.Attributes.Crtime = time.Now().Unix() - fh.f.entry.Attributes.FileMode = uint32(0770) + fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask) + fh.f.entry.Attributes.Collection = fh.dirtyPages.collection + fh.f.entry.Attributes.Replication = fh.dirtyPages.replication } request := &filer_pb.CreateEntryRequest{ - Directory: fh.f.dir.Path, + Directory: fh.f.dir.FullPath(), Entry: fh.f.entry, } - //glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks)) - //for i, chunk := range fh.f.entry.Chunks { - // glog.V(4).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) - //} + glog.V(3).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks)) + for i, chunk := range fh.f.entry.Chunks { + glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks) fh.f.entry.Chunks = chunks // fh.f.entryViewCache = nil - fh.f.wfs.deleteFileChunks(garbages) - if _, err := client.CreateEntry(ctx, request); err != nil { - return fmt.Errorf("update fh: %v", err) + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) + return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) } - return nil - }) -} - -func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, fileIds []string) error { - - var vids []string - for _, fileId := range fileIds { - vids = append(vids, volumeId(fileId)) - } - - lookupFunc := func(vids 
[]string) (map[string]operation.LookupResult, error) { + fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - m := make(map[string]operation.LookupResult) - - glog.V(4).Infof("remove file lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return m, err + fh.f.wfs.deleteFileChunks(garbages) + for i, chunk := range garbages { + glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } - for _, vid := range vids { - lr := operation.LookupResult{ - VolumeId: vid, - Locations: nil, - } - locations := resp.LocationsMap[vid] - for _, loc := range locations.Locations { - lr.Locations = append(lr.Locations, operation.Location{ - Url: loc.Url, - PublicUrl: loc.PublicUrl, - }) - } - m[vid] = lr - } + return nil + }) - return m, err + if err == nil { + fh.dirtyMetadata = false } - _, err := operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) - - return err -} - -func volumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] + if err != nil { + glog.Errorf("%v fh %d flush: %v", fh.f.fullpath(), fh.handle, err) + return fuse.EIO } - return fileId + + return nil } diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go new file mode 100644 index 000000000..b146f0615 --- /dev/null +++ b/weed/filesys/fscache.go @@ -0,0 +1,207 @@ +package filesys + +import ( + "sync" + + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/fuse/fs" +) + +type FsCache struct { + root *FsNode + sync.RWMutex +} +type FsNode struct { + parent *FsNode + node fs.Node + name string + childrenLock sync.RWMutex + children map[string]*FsNode +} + +func newFsCache(root fs.Node) *FsCache { + return &FsCache{ + root: &FsNode{ + node: root, + }, + } +} + +func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { + + c.RLock() + defer c.RUnlock() + + return c.doGetFsNode(path) +} + +func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node { + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return nil + } + } + return t.node +} + +func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { + + c.Lock() + defer c.Unlock() + + c.doSetFsNode(path, node) +} + +func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) { + t := c.root + for _, p := range path.Split() { + t = t.ensureChild(p) + } + t.node = node +} + +func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node { + + c.Lock() + defer c.Unlock() + + t := c.doGetFsNode(path) + if t != nil { + return t + } + t = genNodeFn() + c.doSetFsNode(path, t) + return t +} + +func (c *FsCache) DeleteFsNode(path util.FullPath) { + + c.Lock() + defer c.Unlock() + + t := c.root + for _, p := range path.Split() { + t = t.findChild(p) + if t == nil { + return + } + } + if t.parent != nil { + t.parent.disconnectChild(t) + } + t.deleteSelf() +} + +// oldPath and newPath are full path including the new name +func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { + + c.Lock() + defer c.Unlock() + + // find old node + src := c.root + for _, p := range oldPath.Split() { + src = src.findChild(p) + if src == nil { + return src + } + } + if src.parent != nil { + src.parent.disconnectChild(src) + } + + // find new node + target := c.root + for _, p := range 
newPath.Split() {
+ target = target.ensureChild(p)
+ }
+ parent := target.parent
+ src.name = target.name
+ if dir, ok := src.node.(*Dir); ok {
+ dir.name = target.name // target may not hold a *Dir; it only carries the new name
+ }
+ if f, ok := src.node.(*File); ok {
+ f.Name = target.name
+ if f.entry != nil {
+ f.entry.Name = f.Name
+ }
+ }
+ parent.disconnectChild(target)
+
+ target.deleteSelf()
+
+ src.connectToParent(parent)
+
+ return src
+}
+
+func (n *FsNode) connectToParent(parent *FsNode) {
+ n.parent = parent
+ oldNode := parent.findChild(n.name)
+ if oldNode != nil {
+ oldNode.deleteSelf()
+ }
+ if dir, ok := n.node.(*Dir); ok {
+ dir.parent = parent.node.(*Dir)
+ }
+ if f, ok := n.node.(*File); ok {
+ f.dir = parent.node.(*Dir)
+ }
+ // lock the parent's children map, which is the map being mutated here
+ parent.childrenLock.Lock()
+ parent.children[n.name] = n
+ parent.childrenLock.Unlock()
+}
+
+func (n *FsNode) findChild(name string) *FsNode {
+ n.childrenLock.RLock()
+ defer n.childrenLock.RUnlock()
+
+ child, found := n.children[name]
+ if found {
+ return child
+ }
+ return nil
+}
+
+func (n *FsNode) ensureChild(name string) *FsNode {
+ n.childrenLock.Lock()
+ defer n.childrenLock.Unlock()
+
+ if n.children == nil {
+ n.children = make(map[string]*FsNode)
+ }
+ child, found := n.children[name]
+ if found {
+ return child
+ }
+ t := &FsNode{
+ parent: n,
+ node: nil,
+ name: name,
+ children: nil,
+ }
+ n.children[name] = t
+ return t
+}
+
+func (n *FsNode) disconnectChild(child *FsNode) {
+ n.childrenLock.Lock()
+ delete(n.children, child.name)
+ n.childrenLock.Unlock()
+ child.parent = nil
+}
+
+func (n *FsNode) deleteSelf() {
+ n.childrenLock.Lock()
+ for _, child := range n.children {
+ child.deleteSelf()
+ }
+ n.children = nil
+ n.childrenLock.Unlock()
+
+ n.node = nil
+ n.parent = nil
+
+}
diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go
new file mode 100644
index 000000000..67f9aacc8
--- /dev/null
+++ b/weed/filesys/fscache_test.go
@@ -0,0 +1,96 @@
+package filesys
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestPathSplit(t *testing.T) {
+ parts := util.FullPath("/").Split()
+ if len(parts) != 0 {
+ t.Errorf("expecting an empty list, but getting %d", len(parts))
+ }
+
+ parts = util.FullPath("/readme.md").Split()
+ if len(parts) != 1 {
+ t.Errorf("expecting a list with 1 item, but getting %d", len(parts))
+ }
+
+}
+
+func TestFsCache(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ x := cache.GetFsNode(util.FullPath("/y/x"))
+ if x != nil {
+ t.Errorf("wrong node!")
+ }
+
+ p := util.FullPath("/a/b/c")
+ cache.SetFsNode(p, &File{Name: "cc"})
+ tNode := cache.GetFsNode(p)
+ tFile := tNode.(*File)
+ if tFile.Name != "cc" {
+ t.Errorf("expecting a FsNode")
+ }
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+ cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
+ cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
+ cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
+
+ b := cache.GetFsNode(util.FullPath("/a/b"))
+ if b != nil {
+ t.Errorf("unexpected node!")
+ }
+
+ a := cache.GetFsNode(util.FullPath("/a"))
+ if a == nil {
+ t.Errorf("missing node!")
+ }
+
+ cache.DeleteFsNode(util.FullPath("/a"))
+ b = cache.GetFsNode(util.FullPath("/a/b"))
+ if b != nil {
+ t.Errorf("unexpected node!")
+ }
+
+ a = cache.GetFsNode(util.FullPath("/a"))
+ if a != nil {
+ t.Errorf("wrong DeleteFsNode!")
+ }
+
+ z := cache.GetFsNode(util.FullPath("/z"))
+ if z == nil {
+ t.Errorf("missing node!")
+ }
+
+ y := cache.GetFsNode(util.FullPath("/x/y"))
+ if y != nil {
+ t.Errorf("wrong
node!") + } + +} + +func TestFsCacheMove(t *testing.T) { + + cache := newFsCache(nil) + + cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"}) + cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"}) + cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"}) + cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"}) + + cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x")) + + d := cache.GetFsNode(util.FullPath("/z/x/d")) + if d == nil { + t.Errorf("unexpected nil node!") + } + if d.(*File).Name != "dd" { + t.Errorf("unexpected non dd node!") + } + +} diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go new file mode 100644 index 000000000..e6593ebde --- /dev/null +++ b/weed/filesys/meta_cache/cache_config.go @@ -0,0 +1,32 @@ +package meta_cache + +import "github.com/chrislusf/seaweedfs/weed/util" + +var ( + _ = util.Configuration(&cacheConfig{}) +) + +// implementing util.Configuraion +type cacheConfig struct { + dir string +} + +func (c cacheConfig) GetString(key string) string { + return c.dir +} + +func (c cacheConfig) GetBool(key string) bool { + panic("implement me") +} + +func (c cacheConfig) GetInt(key string) int { + panic("implement me") +} + +func (c cacheConfig) GetStringSlice(key string) []string { + panic("implement me") +} + +func (c cacheConfig) SetDefault(key string, value interface{}) { + panic("implement me") +} diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go new file mode 100644 index 000000000..3b04040a5 --- /dev/null +++ b/weed/filesys/meta_cache/meta_cache.go @@ -0,0 +1,106 @@ +package meta_cache + +import ( + "context" + "os" + "sync" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" +) + +type MetaCache struct { + actualStore filer2.FilerStore + sync.RWMutex + visitedBoundary *bounded_tree.BoundedTree +} + +func NewMetaCache(dbFolder string) *MetaCache { + return &MetaCache{ + actualStore: openMetaStore(dbFolder), + visitedBoundary: bounded_tree.NewBoundedTree(), + } +} + +func openMetaStore(dbFolder string) filer2.FilerStore { + + os.RemoveAll(dbFolder) + os.MkdirAll(dbFolder, 0755) + + store := &leveldb.LevelDBStore{} + config := &cacheConfig{ + dir: dbFolder, + } + + if err := store.Initialize(config, ""); err != nil { + glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err) + } + + return store + +} + +func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error { + mc.Lock() + defer mc.Unlock() + return mc.actualStore.InsertEntry(ctx, entry) +} + +func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error { + mc.Lock() + defer mc.Unlock() + + oldDir, _ := oldPath.DirAndName() + if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) { + if oldPath != "" { + if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil { + return err + } + } + }else{ + // println("unknown old directory:", oldDir) + } + + if newEntry != nil { + newDir, _ := newEntry.DirAndName() + if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) { + if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil { + return err + } + } + } + return nil +} + +func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error { + mc.Lock() + defer mc.Unlock() + return 
mc.actualStore.UpdateEntry(ctx, entry)
+}
+
+func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) {
+ mc.RLock()
+ defer mc.RUnlock()
+ return mc.actualStore.FindEntry(ctx, fp)
+}
+
+func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.actualStore.DeleteEntry(ctx, fp)
+}
+
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) {
+ mc.RLock()
+ defer mc.RUnlock()
+ return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+}
+
+func (mc *MetaCache) Shutdown() {
+ mc.Lock()
+ defer mc.Unlock()
+ mc.actualStore.Shutdown()
+}
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
new file mode 100644
index 000000000..1fbc3e532
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -0,0 +1,47 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error {
+ // the up-front BFS synchronization below is disabled for now;
+ // directory entries are loaded lazily through EnsureVisited instead
+ return nil
+ glog.V(0).Infof("synchronizing metadata ...")
+ filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) {
+ entry := filer2.FromPbEntry(string(parentPath), pbEntry)
+ if err := mc.InsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ }
+ })
+ return nil
+}
+
+func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) {
+
+ mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
+
+ glog.V(2).Infof("ReadDirAllEntries %s ...", path)
+
+ // list the directory currently being visited (path), not the original
+ // target dirPath, so that ancestor directories are cached correctly
+ err = filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer2.FromPbEntry(string(path), pbEntry)
+ if err := mc.InsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ return err
+ }
+ if entry.IsDirectory() {
+ childDirectories = append(childDirectories, entry.Name())
+ }
+ return nil
+ })
+ if err != nil {
+ err = fmt.Errorf("list %s: %v", path, err)
+ }
+ return
+ })
}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
new file mode 100644
index 000000000..2e411a48a
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -0,0 +1,69 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ var oldPath util.FullPath
+ var newEntry *filer2.Entry
+ if message.OldEntry != nil {
+ oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ glog.V(4).Infof("deleting %v", oldPath)
+ }
+
+ if message.NewEntry != nil {
+ dir := resp.Directory
+ if message.NewParentPath != "" {
+ dir = 
message.NewParentPath + } + key := util.NewFullPath(dir, message.NewEntry.Name) + glog.V(4).Infof("creating %v", key) + newEntry = filer2.FromPbEntry(dir, message.NewEntry) + } + return mc.AtomicUpdateEntry(context.Background(), oldPath, newEntry) + } + + for { + err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ + ClientName: "mount", + PathPrefix: dir, + SinceNs: lastTsNs, + }) + if err != nil { + return fmt.Errorf("subscribe: %v", err) + } + + for { + resp, listenErr := stream.Recv() + if listenErr == io.EOF { + return nil + } + if listenErr != nil { + return listenErr + } + + if err := processEventFn(resp); err != nil { + return fmt.Errorf("process %v: %v", resp, err) + } + lastTsNs = resp.TsNs + } + }) + if err != nil { + glog.V(0).Infof("subscribing filer meta change: %v", err) + time.Sleep(time.Second) + } + } +} diff --git a/weed/filesys/unimplemented.go b/weed/filesys/unimplemented.go new file mode 100644 index 000000000..1f4fe554d --- /dev/null +++ b/weed/filesys/unimplemented.go @@ -0,0 +1,20 @@ +package filesys + +import ( + "context" + + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" +) + +// https://github.com/bazil/fuse/issues/130 + +var _ = fs.NodeAccesser(&Dir{}) +func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error { + return fuse.ENOSYS +} + +var _ = fs.NodeAccesser(&File{}) +func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error { + return fuse.ENOSYS +} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 969514a06..8dffa6555 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -5,32 +5,48 @@ import ( "fmt" "math" "os" + "path" "sync" "time" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/karlseguin/ccache" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" ) type Option struct { FilerGrpcAddress string + GrpcDialOption grpc.DialOption FilerMountRootPath string Collection string Replication string TtlSec int32 ChunkSizeLimit int64 + CacheDir string + CacheSizeMB int64 DataCenter string - DirListingLimit int + DirListCacheLimit int64 EntryCacheTtl time.Duration + Umask os.FileMode + + MountUid uint32 + MountGid uint32 + MountMode os.FileMode + MountCtime time.Time + MountMtime time.Time + + OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers + Cipher bool // whether encrypt data on volume server - MountUid uint32 - MountGid uint32 - MountMode os.FileMode } var _ = fs.FS(&WFS{}) @@ -38,17 +54,20 @@ var _ = fs.FSStatfser(&WFS{}) type WFS struct { option *Option - listDirectoryEntriesCache *ccache.Cache - // contains all open handles - handles []*FileHandle - pathToHandleIndex map[string]int - pathToHandleLock sync.Mutex - bufPool sync.Pool + // contains all open handles, protected by handlesLock + handlesLock sync.Mutex + handles map[uint64]*FileHandle - fileIdsDeletionChan chan []string + bufPool sync.Pool stats statsCache + + root fs.Node + fsNodeCache *FsCache + + chunkCache *chunk_cache.ChunkCache + metaCache *meta_cache.MetaCache } type 
statsCache struct { filer_pb.StatisticsResponse @@ -58,74 +77,73 @@ type statsCache struct { func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, - listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024 * 8).ItemsToPrune(100)), - pathToHandleIndex: make(map[string]int), + handles: make(map[uint64]*FileHandle), bufPool: sync.Pool{ New: func() interface{} { return make([]byte, option.ChunkSizeLimit) }, }, - fileIdsDeletionChan: make(chan []string, 32), + } + cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress))[0:4] + cacheDir := path.Join(option.CacheDir, cacheUniqueId) + if option.CacheSizeMB > 0 { + os.MkdirAll(cacheDir, 0755) + wfs.chunkCache = chunk_cache.NewChunkCache(256, cacheDir, option.CacheSizeMB) + grace.OnInterrupt(func() { + wfs.chunkCache.Shutdown() + }) + } + + wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta")) + startTime := time.Now() + if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { + glog.V(0).Infof("failed to init meta cache: %v", err) + } else { + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + grace.OnInterrupt(func() { + wfs.metaCache.Shutdown() + }) } - go wfs.loopProcessingDeletion() + wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} + wfs.fsNodeCache = newFsCache(wfs.root) return wfs } func (wfs *WFS) Root() (fs.Node, error) { - return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil -} - -func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, wfs.option.FilerGrpcAddress) - + return wfs.root, nil } func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { - wfs.pathToHandleLock.Lock() - defer wfs.pathToHandleLock.Unlock() fullpath := file.fullpath() + glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid) - index, found := wfs.pathToHandleIndex[fullpath] - if found && wfs.handles[index] != nil { - glog.V(2).Infoln(fullpath, "found fileHandle id", index) - return wfs.handles[index] - } + wfs.handlesLock.Lock() + defer wfs.handlesLock.Unlock() - fileHandle = newFileHandle(file, uid, gid) - for i, h := range wfs.handles { - if h == nil { - wfs.handles[i] = fileHandle - fileHandle.handle = uint64(i) - wfs.pathToHandleIndex[fullpath] = i - glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle) - return - } + inodeId := file.fullpath().AsInode() + existingHandle, found := wfs.handles[inodeId] + if found && existingHandle != nil { + return existingHandle } - wfs.handles = append(wfs.handles, fileHandle) - fileHandle.handle = uint64(len(wfs.handles) - 1) - glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle) - wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) + fileHandle = newFileHandle(file, uid, gid) + wfs.handles[inodeId] = fileHandle + fileHandle.handle = inodeId + glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return } -func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) { - wfs.pathToHandleLock.Lock() - defer wfs.pathToHandleLock.Unlock() +func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) { + wfs.handlesLock.Lock() + defer wfs.handlesLock.Unlock() - glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, 
len(wfs.handles)) - delete(wfs.pathToHandleIndex, fullpath) - if int(handleId) < len(wfs.handles) { - wfs.handles[int(handleId)] = nil - } + glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) + + delete(wfs.handles, fullpath.AsInode()) return } @@ -137,7 +155,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, @@ -146,7 +164,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. } glog.V(4).Infof("reading filer stats: %+v", request) - resp, err := client.Statistics(ctx, request) + resp, err := client.Statistics(context.Background(), request) if err != nil { glog.V(0).Infof("reading filer stats %v: %v", request, err) return err diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index f58ef24f4..bf21b1808 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -2,39 +2,15 @@ package filesys import ( "context" - "time" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) -func (wfs *WFS) loopProcessingDeletion() { - - ticker := time.NewTicker(2 * time.Second) - - wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var fileIds []string - for { - select { - case fids := <-wfs.fileIdsDeletionChan: - fileIds = append(fileIds, fids...) 
- if len(fileIds) >= 1024 { - glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - deleteFileIds(context.Background(), client, fileIds) - fileIds = fileIds[:0] - } - case <-ticker.C: - if len(fileIds) > 0 { - glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - deleteFileIds(context.Background(), client, fileIds) - fileIds = fileIds[:0] - } - } - } - }) - -} - func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { if len(chunks) == 0 { return @@ -42,17 +18,56 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { var fileIds []string for _, chunk := range chunks { - fileIds = append(fileIds, chunk.FileId) + fileIds = append(fileIds, chunk.GetFileIdString()) } - var async = false - if async { - wfs.fileIdsDeletionChan <- fileIds - return - } - - wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - deleteFileIds(context.Background(), client, fileIds) + wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds) return nil }) } + +func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { + + var vids []string + for _, fileId := range fileIds { + vids = append(vids, filer2.VolumeId(fileId)) + } + + lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { + + m := make(map[string]operation.LookupResult) + + glog.V(4).Infof("remove file lookup volume id locations: %v", vids) + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: vids, + }) + if err != nil { + return m, err + } + + for _, vid := range vids { + lr := operation.LookupResult{ + VolumeId: vid, + Locations: nil, + } + locations, found := resp.LocationsMap[vid] + if !found { + continue + } + for _, loc := range locations.Locations { + lr.Locations = append(lr.Locations, operation.Location{ + Url: wfs.AdjustedUrl(loc.Url), + PublicUrl: loc.PublicUrl, + }) + } + m[vid] = lr + } + + return m, err + } + + _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) + + return err +} diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go new file mode 100644 index 000000000..736df3588 --- /dev/null +++ b/weed/filesys/wfs_filer_client.go @@ -0,0 +1,40 @@ +package filesys + +import ( + "fmt" + "strings" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +var _ = filer_pb.FilerClient(&WFS{}) + +func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) + + if err == nil { + return nil + } + return err + +} + +func (wfs *WFS) AdjustedUrl(hostAndPort string) string { + if !wfs.option.OutsideContainerClusterMode { + return hostAndPort + } + commaIndex := strings.Index(hostAndPort, ":") + if commaIndex < 0 { + return hostAndPort + } + filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":") + return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:]) + +} diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go new file mode 100644 index 000000000..091a70fa3 --- /dev/null +++ b/weed/filesys/xattr.go @@ -0,0 +1,123 @@ +package filesys + +import ( + "context" + + 
"github.com/seaweedfs/fuse" + + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error { + + if entry == nil { + return fuse.ErrNoXattr + } + if entry.Extended == nil { + return fuse.ErrNoXattr + } + data, found := entry.Extended[req.Name] + if !found { + return fuse.ErrNoXattr + } + if req.Position < uint32(len(data)) { + size := req.Size + if req.Position+size >= uint32(len(data)) { + size = uint32(len(data)) - req.Position + } + if size == 0 { + resp.Xattr = data[req.Position:] + } else { + resp.Xattr = data[req.Position : req.Position+size] + } + } + + return nil + +} + +func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error { + + if entry == nil { + return fuse.EIO + } + + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + data, _ := entry.Extended[req.Name] + + newData := make([]byte, int(req.Position)+len(req.Xattr)) + + copy(newData, data) + + copy(newData[int(req.Position):], req.Xattr) + + entry.Extended[req.Name] = newData + + return nil + +} + +func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error { + + if entry == nil { + return fuse.ErrNoXattr + } + + if entry.Extended == nil { + return fuse.ErrNoXattr + } + + _, found := entry.Extended[req.Name] + + if !found { + return fuse.ErrNoXattr + } + + delete(entry.Extended, req.Name) + + return nil + +} + +func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error { + + if entry == nil { + return fuse.EIO + } + + for k := range entry.Extended { + resp.Append(k) + } + + size := req.Size + if req.Position+size >= uint32(len(resp.Xattr)) { + size = uint32(len(resp.Xattr)) - req.Position + } + + if size == 0 { + resp.Xattr = resp.Xattr[req.Position:] + } else { + resp.Xattr = resp.Xattr[req.Position : req.Position+size] + } + + return nil + +} + +func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) { + + fullpath := util.NewFullPath(dir, name) + // glog.V(3).Infof("read entry cache miss %s", fullpath) + + // read from async meta cache + meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) + cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) + if cacheErr == filer_pb.ErrNotFound { + return nil, fuse.ENOENT + } + return cachedEntry.ToProtoEntry(), nil +} |
