Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/dir.go                              618
-rw-r--r--  weed/filesys/dir_link.go                         160
-rw-r--r--  weed/filesys/dir_rename.go                       174
-rw-r--r--  weed/filesys/dirty_page_interval.go              222
-rw-r--r--  weed/filesys/dirty_page_interval_test.go         113
-rw-r--r--  weed/filesys/dirty_pages.go                       10
-rw-r--r--  weed/filesys/dirty_pages_continuous.go           160
-rw-r--r--  weed/filesys/dirty_pages_temp_file.go            157
-rw-r--r--  weed/filesys/dirty_pages_temp_interval.go        289
-rw-r--r--  weed/filesys/file.go                             389
-rw-r--r--  weed/filesys/filehandle.go                       336
-rw-r--r--  weed/filesys/fscache.go                          213
-rw-r--r--  weed/filesys/fscache_test.go                     115
-rw-r--r--  weed/filesys/meta_cache/cache_config.go           32
-rw-r--r--  weed/filesys/meta_cache/id_mapper.go             101
-rw-r--r--  weed/filesys/meta_cache/meta_cache.go            146
-rw-r--r--  weed/filesys/meta_cache/meta_cache_init.go        47
-rw-r--r--  weed/filesys/meta_cache/meta_cache_subscribe.go   68
-rw-r--r--  weed/filesys/unimplemented.go                     22
-rw-r--r--  weed/filesys/wfs.go                              303
-rw-r--r--  weed/filesys/wfs_filer_client.go                  51
-rw-r--r--  weed/filesys/wfs_write.go                         78
-rw-r--r--  weed/filesys/xattr.go                            138
23 files changed, 0 insertions, 3942 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
deleted file mode 100644
index 9a791e013..000000000
--- a/weed/filesys/dir.go
+++ /dev/null
@@ -1,618 +0,0 @@
-package filesys
-
-import (
- "bytes"
- "context"
- "math"
- "os"
- "strings"
- "syscall"
- "time"
-
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type Dir struct {
- name string
- wfs *WFS
- entry *filer_pb.Entry
- parent *Dir
- id uint64
-}
-
-var _ = fs.Node(&Dir{})
-
-var _ = fs.NodeIdentifier(&Dir{})
-var _ = fs.NodeCreater(&Dir{})
-var _ = fs.NodeMknoder(&Dir{})
-var _ = fs.NodeMkdirer(&Dir{})
-var _ = fs.NodeFsyncer(&Dir{})
-var _ = fs.NodeRequestLookuper(&Dir{})
-var _ = fs.HandleReadDirAller(&Dir{})
-var _ = fs.NodeRemover(&Dir{})
-var _ = fs.NodeRenamer(&Dir{})
-var _ = fs.NodeSetattrer(&Dir{})
-var _ = fs.NodeGetxattrer(&Dir{})
-var _ = fs.NodeSetxattrer(&Dir{})
-var _ = fs.NodeRemovexattrer(&Dir{})
-var _ = fs.NodeListxattrer(&Dir{})
-var _ = fs.NodeForgetter(&Dir{})
-
-func (dir *Dir) Id() uint64 {
- if dir.parent == nil {
- return 1
- }
- return dir.id
-}
-
-func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
-
- entry, err := dir.maybeLoadEntry()
- if err != nil {
- glog.V(3).Infof("dir Attr %s, err: %+v", dir.FullPath(), err)
- return err
- }
-
- // https://github.com/bazil/fuse/issues/196
- attr.Valid = time.Second
- attr.Inode = dir.Id()
- attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir
- attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
- attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
- attr.Ctime = time.Unix(entry.Attributes.Crtime, 0)
- attr.Atime = time.Unix(entry.Attributes.Mtime, 0)
- attr.Gid = entry.Attributes.Gid
- attr.Uid = entry.Attributes.Uid
-
- if dir.FullPath() == dir.wfs.option.FilerMountRootPath {
- attr.BlockSize = blockSize
- }
-
- glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
-
- return nil
-}
-
-func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
-
- glog.V(4).Infof("dir Getxattr %s", dir.FullPath())
-
- entry, err := dir.maybeLoadEntry()
- if err != nil {
- return err
- }
-
- return getxattr(entry, req, resp)
-}
-
-func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
-	// fsync works at the OS level
- // write the file chunks to the filerGrpcAddress
- glog.V(3).Infof("dir %s fsync %+v", dir.FullPath(), req)
-
- return nil
-}
-
-func (dir *Dir) newFile(name string) fs.Node {
-
- fileFullPath := util.NewFullPath(dir.FullPath(), name)
- fileId := fileFullPath.AsInode()
- dir.wfs.handlesLock.Lock()
- existingHandle, found := dir.wfs.handles[fileId]
- dir.wfs.handlesLock.Unlock()
-
- if found {
- glog.V(4).Infof("newFile found opened file handle: %+v", fileFullPath)
- return existingHandle.f
- }
- return &File{
- Name: name,
- dir: dir,
- wfs: dir.wfs,
- id: fileId,
- }
-}
-
-func (dir *Dir) newDirectory(fullpath util.FullPath) fs.Node {
-
- return &Dir{name: fullpath.Name(), wfs: dir.wfs, parent: dir, id: fullpath.AsInode()}
-
-}
-
-func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
- resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
-
- exclusive := req.Flags&fuse.OpenExclusive != 0
- isDirectory := req.Mode&os.ModeDir > 0
-
- if exclusive || isDirectory {
- _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, exclusive)
- if err != nil {
- return nil, nil, err
- }
- }
- var node fs.Node
- if isDirectory {
- node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name))
- return node, node, nil
- }
-
- node = dir.newFile(req.Name)
- file := node.(*File)
- file.entry = &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: req.Mode&os.ModeDir > 0,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
- Uid: req.Uid,
- Gid: req.Gid,
- Collection: dir.wfs.option.Collection,
- Replication: dir.wfs.option.Replication,
- TtlSec: dir.wfs.option.TtlSec,
- },
- }
- file.dirtyMetadata = true
- fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0)
- return file, fh, nil
-
-}
-
-func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
-
- _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false)
-
- if err != nil {
- return nil, err
- }
- var node fs.Node
- node = dir.newFile(req.Name)
- return node, nil
-}
-
-func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) {
- dirFullPath := dir.FullPath()
- request := &filer_pb.CreateEntryRequest{
- Directory: dirFullPath,
- Entry: &filer_pb.Entry{
- Name: name,
- IsDirectory: mode&os.ModeDir > 0,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(mode &^ dir.wfs.option.Umask),
- Uid: uid,
- Gid: gid,
- Collection: dir.wfs.option.Collection,
- Replication: dir.wfs.option.Replication,
- TtlSec: dir.wfs.option.TtlSec,
- },
- },
- OExcl: exclusive,
- Signatures: []int32{dir.wfs.signature},
- }
- glog.V(1).Infof("create %s/%s", dirFullPath, name)
-
- err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
- defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
-
- if err := filer_pb.CreateEntry(client, request); err != nil {
- if strings.Contains(err.Error(), "EEXIST") {
- return fuse.EEXIST
- }
- glog.V(0).Infof("create %s/%s: %v", dirFullPath, name, err)
- return fuse.EIO
- }
-
- if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
- glog.Errorf("local InsertEntry dir %s/%s: %v", dirFullPath, name, err)
- return fuse.EIO
- }
-
- return nil
- })
- return request, err
-}
-
-func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
-
- glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name)
-
- newEntry := &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: true,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
- Uid: req.Uid,
- Gid: req.Gid,
- },
- }
-
- dirFullPath := dir.FullPath()
-
- err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- dir.wfs.mapPbIdFromLocalToFiler(newEntry)
- defer dir.wfs.mapPbIdFromFilerToLocal(newEntry)
-
- request := &filer_pb.CreateEntryRequest{
- Directory: dirFullPath,
- Entry: newEntry,
- Signatures: []int32{dir.wfs.signature},
- }
-
- glog.V(1).Infof("mkdir: %v", request)
- if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
- return err
- }
-
- if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
- glog.Errorf("local mkdir dir %s/%s: %v", dirFullPath, req.Name, err)
- return fuse.EIO
- }
-
- return nil
- })
-
- if err == nil {
- node := dir.newDirectory(util.NewFullPath(dirFullPath, req.Name))
-
- return node, nil
- }
-
- glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
-
- return nil, fuse.EIO
-}
-
-func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
-
- dirPath := util.FullPath(dir.FullPath())
- // glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
-
- fullFilePath := dirPath.Child(req.Name)
- visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
- if visitErr != nil {
- glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
- return nil, fuse.EIO
- }
- localEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
- if cacheErr == filer_pb.ErrNotFound {
- return nil, fuse.ENOENT
- }
-
- if localEntry == nil {
- // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
- entry, err := filer_pb.GetEntry(dir.wfs, fullFilePath)
- if err != nil {
- glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
- return nil, fuse.ENOENT
- }
- localEntry = filer.FromPbEntry(string(dirPath), entry)
- } else {
- glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
- }
-
- if localEntry != nil {
- if localEntry.IsDirectory() {
- node = dir.newDirectory(fullFilePath)
- } else {
- node = dir.newFile(req.Name)
- }
-
- // resp.EntryValid = time.Second
- resp.Attr.Inode = fullFilePath.AsInode()
- resp.Attr.Valid = time.Second
- resp.Attr.Size = localEntry.FileSize
- resp.Attr.Mtime = localEntry.Attr.Mtime
- resp.Attr.Crtime = localEntry.Attr.Crtime
- resp.Attr.Mode = localEntry.Attr.Mode
- resp.Attr.Gid = localEntry.Attr.Gid
- resp.Attr.Uid = localEntry.Attr.Uid
- if localEntry.HardLinkCounter > 0 {
- resp.Attr.Nlink = uint32(localEntry.HardLinkCounter)
- }
-
- return node, nil
- }
-
- glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err)
- return nil, fuse.ENOENT
-}
-
-func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
-
- dirPath := util.FullPath(dir.FullPath())
- glog.V(4).Infof("dir ReadDirAll %s", dirPath)
-
- processEachEntryFn := func(entry *filer.Entry, isLast bool) {
- if entry.IsDirectory() {
- dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir, Inode: dirPath.Child(entry.Name()).AsInode()}
- ret = append(ret, dirent)
- } else {
- dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode)), Inode: dirPath.Child(entry.Name()).AsInode()}
- ret = append(ret, dirent)
- }
- }
-
- if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
- glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
- return nil, fuse.EIO
- }
- listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
- processEachEntryFn(entry, false)
- return true
- })
- if listErr != nil {
- glog.Errorf("list meta cache: %v", listErr)
- return nil, fuse.EIO
- }
-
-	// append the . and .. directory entries
- ret = append(ret, fuse.Dirent{
- Inode: dirPath.AsInode(),
- Name: ".",
- Type: fuse.DT_Dir,
- })
-
- // return the correct parent inode for the mount root
- var inode uint64
- if string(dirPath) == dir.wfs.option.FilerMountRootPath {
- inode = dir.wfs.option.MountParentInode
- } else {
- inode = util.FullPath(dir.parent.FullPath()).AsInode()
- }
-
- ret = append(ret, fuse.Dirent{
- Inode: inode,
- Name: "..",
- Type: fuse.DT_Dir,
- })
-
- return
-}
-
-func findFileType(mode uint16) fuse.DirentType {
- switch mode & (syscall.S_IFMT & 0xffff) {
- case syscall.S_IFSOCK:
- return fuse.DT_Socket
- case syscall.S_IFLNK:
- return fuse.DT_Link
- case syscall.S_IFREG:
- return fuse.DT_File
- case syscall.S_IFBLK:
- return fuse.DT_Block
- case syscall.S_IFDIR:
- return fuse.DT_Dir
- case syscall.S_IFCHR:
- return fuse.DT_Char
- case syscall.S_IFIFO:
- return fuse.DT_FIFO
- }
- return fuse.DT_File
-}
-
-func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
-
- if !req.Dir {
- return dir.removeOneFile(req)
- }
-
- return dir.removeFolder(req)
-
-}
-
-func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
-
- dirFullPath := dir.FullPath()
- filePath := util.NewFullPath(dirFullPath, req.Name)
- entry, err := filer_pb.GetEntry(dir.wfs, filePath)
- if err != nil {
- return err
- }
-
- // first, ensure the filer store can correctly delete
- glog.V(3).Infof("remove file: %v", req)
- isDeleteData := entry != nil && entry.HardLinkCounter <= 1
- err = filer_pb.Remove(dir.wfs, dirFullPath, req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature})
- if err != nil {
- glog.V(3).Infof("not found remove file %s: %v", filePath, err)
- return fuse.ENOENT
- }
-
- // then, delete meta cache and fsNode cache
- if err = dir.wfs.metaCache.DeleteEntry(context.Background(), filePath); err != nil {
- glog.V(3).Infof("local DeleteEntry %s: %v", filePath, err)
- return fuse.ESTALE
- }
-
- // remove current file handle if any
- dir.wfs.handlesLock.Lock()
- defer dir.wfs.handlesLock.Unlock()
- inodeId := filePath.AsInode()
- if fh, ok := dir.wfs.handles[inodeId]; ok {
- delete(dir.wfs.handles, inodeId)
- fh.isDeleted = true
- }
-
- return nil
-
-}
-
-func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
-
- dirFullPath := dir.FullPath()
- glog.V(3).Infof("remove directory entry: %v", req)
- ignoreRecursiveErr := true // ignore recursion error since the OS should manage it
- err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, true, true, ignoreRecursiveErr, false, []int32{dir.wfs.signature})
- if err != nil {
- glog.V(0).Infof("remove %s/%s: %v", dirFullPath, req.Name, err)
- if strings.Contains(err.Error(), "non-empty") {
- return fuse.EEXIST
- }
- return fuse.ENOENT
- }
-
- t := util.NewFullPath(dirFullPath, req.Name)
- dir.wfs.metaCache.DeleteEntry(context.Background(), t)
-
- return nil
-
-}
-
-func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
-
- glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req)
-
- entry, err := dir.maybeLoadEntry()
- if err != nil {
- return err
- }
-
- if req.Valid.Mode() {
- entry.Attributes.FileMode = uint32(req.Mode)
- }
-
- if req.Valid.Uid() {
- entry.Attributes.Uid = req.Uid
- }
-
- if req.Valid.Gid() {
- entry.Attributes.Gid = req.Gid
- }
-
- if req.Valid.Mtime() {
- entry.Attributes.Mtime = req.Mtime.Unix()
- }
-
- return dir.saveEntry(entry)
-
-}
-
-func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
-
- glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name)
-
- entry, err := dir.maybeLoadEntry()
- if err != nil {
- return err
- }
-
- if err := setxattr(entry, req); err != nil {
- return err
- }
-
- return dir.saveEntry(entry)
-
-}
-
-func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
-
- glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name)
-
- entry, err := dir.maybeLoadEntry()
- if err != nil {
- return err
- }
-
- if err := removexattr(entry, req); err != nil {
- return err
- }
-
- return dir.saveEntry(entry)
-
-}
-
-func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
-
- glog.V(4).Infof("dir Listxattr %s", dir.FullPath())
-
- entry, err := dir.maybeLoadEntry()
- if err != nil {
- return err
- }
-
- if err := listxattr(entry, req, resp); err != nil {
- return err
- }
-
- return nil
-
-}
-
-func (dir *Dir) Forget() {
- glog.V(4).Infof("Forget dir %s", dir.FullPath())
-}
-
-func (dir *Dir) maybeLoadEntry() (*filer_pb.Entry, error) {
- parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName()
- return dir.wfs.maybeLoadEntry(parentDirPath, name)
-}
-
-func (dir *Dir) saveEntry(entry *filer_pb.Entry) error {
-
- parentDir, name := util.FullPath(dir.FullPath()).DirAndName()
-
- return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- dir.wfs.mapPbIdFromLocalToFiler(entry)
- defer dir.wfs.mapPbIdFromFilerToLocal(entry)
-
- request := &filer_pb.UpdateEntryRequest{
- Directory: parentDir,
- Entry: entry,
- Signatures: []int32{dir.wfs.signature},
- }
-
- glog.V(1).Infof("save dir entry: %v", request)
- _, err := client.UpdateEntry(context.Background(), request)
- if err != nil {
- glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
- return fuse.EIO
- }
-
- if err := dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
- glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
- return fuse.ESTALE
- }
-
- return nil
- })
-}
-
-func (dir *Dir) FullPath() string {
- var parts []string
- for p := dir; p != nil; p = p.parent {
- if strings.HasPrefix(p.name, "/") {
- if len(p.name) > 1 {
- parts = append(parts, p.name[1:])
- }
- } else {
- parts = append(parts, p.name)
- }
- }
-
- if len(parts) == 0 {
- return "/"
- }
-
- var buf bytes.Buffer
- for i := len(parts) - 1; i >= 0; i-- {
- buf.WriteString("/")
- buf.WriteString(parts[i])
- }
- return buf.String()
-}
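
The FullPath method deleted above rebuilds a directory's absolute path by walking parent pointers up to the mount root and reversing the collected names; a root Dir stores a "/"-prefixed name that contributes nothing unless it is longer than "/". A minimal standalone sketch of the same walk-and-reverse assembly, using a hypothetical simplified node type in place of *Dir:

package main

import (
	"bytes"
	"fmt"
	"strings"
)

// node is a hypothetical stand-in for *Dir: a name plus a parent pointer.
type node struct {
	name   string
	parent *node
}

// fullPath mirrors dir.FullPath: collect names root-ward, then emit them
// in reverse with "/" separators, treating a "/"-prefixed name as the root.
func fullPath(n *node) string {
	var parts []string
	for p := n; p != nil; p = p.parent {
		if strings.HasPrefix(p.name, "/") {
			if len(p.name) > 1 {
				parts = append(parts, p.name[1:])
			}
		} else {
			parts = append(parts, p.name)
		}
	}
	if len(parts) == 0 {
		return "/"
	}
	var buf bytes.Buffer
	for i := len(parts) - 1; i >= 0; i-- {
		buf.WriteString("/")
		buf.WriteString(parts[i])
	}
	return buf.String()
}

func main() {
	root := &node{name: "/"}
	docs := &node{name: "docs", parent: root}
	fmt.Println(fullPath(&node{name: "a.txt", parent: docs})) // /docs/a.txt
}
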
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
deleted file mode 100644
index acdcd2de4..000000000
--- a/weed/filesys/dir_link.go
+++ /dev/null
@@ -1,160 +0,0 @@
-package filesys
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/util"
- "os"
- "syscall"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
-)
-
-var _ = fs.NodeLinker(&Dir{})
-var _ = fs.NodeSymlinker(&Dir{})
-var _ = fs.NodeReadlinker(&File{})
-
-const (
- HARD_LINK_MARKER = '\x01'
-)
-
-func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (fs.Node, error) {
-
- oldFile, ok := old.(*File)
- if !ok {
- glog.Errorf("old node is not a file: %+v", old)
- }
-
- glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName)
-
- oldEntry, err := oldFile.maybeLoadEntry(ctx)
- if err != nil {
- return nil, err
- }
-
- if oldEntry == nil {
- return nil, fuse.EIO
- }
-
- // update old file to hardlink mode
- if len(oldEntry.HardLinkId) == 0 {
- oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
- oldEntry.HardLinkCounter = 1
- }
- oldEntry.HardLinkCounter++
- updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
- Directory: oldFile.dir.FullPath(),
- Entry: oldEntry,
- Signatures: []int32{dir.wfs.signature},
- }
-
- // CreateLink 1.2 : update new file to hardlink mode
- request := &filer_pb.CreateEntryRequest{
- Directory: dir.FullPath(),
- Entry: &filer_pb.Entry{
- Name: req.NewName,
- IsDirectory: false,
- Attributes: oldEntry.Attributes,
- Chunks: oldEntry.Chunks,
- Extended: oldEntry.Extended,
- HardLinkId: oldEntry.HardLinkId,
- HardLinkCounter: oldEntry.HardLinkCounter,
- },
- Signatures: []int32{dir.wfs.signature},
- }
-
- // apply changes to the filer, and also apply to local metaCache
- err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
- defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
-
- if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil {
- glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
- return fuse.EIO
- }
- dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry))
-
- if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
- return fuse.EIO
- }
- dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
-
- return nil
- })
-
- if err != nil {
- return nil, fuse.EIO
- }
-
- // create new file node
- newNode := dir.newFile(req.NewName)
- newFile := newNode.(*File)
-
- return newFile, err
-
-}
-
-func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) {
-
- glog.V(4).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
-
- request := &filer_pb.CreateEntryRequest{
- Directory: dir.FullPath(),
- Entry: &filer_pb.Entry{
- Name: req.NewName,
- IsDirectory: false,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32((os.FileMode(0777) | os.ModeSymlink) &^ dir.wfs.option.Umask),
- Uid: req.Uid,
- Gid: req.Gid,
- SymlinkTarget: req.Target,
- },
- },
- Signatures: []int32{dir.wfs.signature},
- }
-
- err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
- defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
-
- if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err)
- return fuse.EIO
- }
-
- dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
-
- return nil
- })
-
- symlink := dir.newFile(req.NewName)
-
- return symlink, err
-
-}
-
-func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) {
-
- entry, err := file.maybeLoadEntry(ctx)
- if err != nil {
- return "", err
- }
-
- if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 {
- return "", fuse.Errno(syscall.EINVAL)
- }
-
- glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget)
-
- return entry.Attributes.SymlinkTarget, nil
-
-}
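
Dir.Link above promotes a plain file to hard-link mode on its first extra link: it mints a random 16-byte id terminated by HARD_LINK_MARKER, starts the counter at 1 for the existing name, then increments it for every new name. A minimal sketch of that bookkeeping, with a simplified entry type standing in for filer_pb.Entry:

package main

import (
	"crypto/rand"
	"fmt"
)

const hardLinkMarker = '\x01'

// entry is a simplified stand-in for filer_pb.Entry's hard-link fields.
type entry struct {
	HardLinkId      []byte
	HardLinkCounter int32
}

// addLink mirrors the bookkeeping in Dir.Link: initialize the id and
// counter on the first extra link, then increment for every new name.
func addLink(e *entry) {
	if len(e.HardLinkId) == 0 {
		id := make([]byte, 16)
		rand.Read(id) // util.RandomBytes(16) in the original
		e.HardLinkId = append(id, hardLinkMarker)
		e.HardLinkCounter = 1
	}
	e.HardLinkCounter++
}

func main() {
	e := &entry{}
	addLink(e) // first ln: counter covers the original name plus the new one
	addLink(e) // second ln
	fmt.Println(len(e.HardLinkId), e.HardLinkCounter) // 17 3
}
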
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
deleted file mode 100644
index dd76577b0..000000000
--- a/weed/filesys/dir_rename.go
+++ /dev/null
@@ -1,174 +0,0 @@
-package filesys
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "math"
-
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
-
- newDir := newDirectory.(*Dir)
-
- newPath := util.NewFullPath(newDir.FullPath(), req.NewName)
- oldPath := util.NewFullPath(dir.FullPath(), req.OldName)
-
- glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
-
- // find local old entry
- oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath)
- if err != nil {
- glog.Errorf("dir Rename can not find source %s : %v", oldPath, err)
- return fuse.ENOENT
- }
-
- // update remote filer
- err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- request := &filer_pb.AtomicRenameEntryRequest{
- OldDirectory: dir.FullPath(),
- OldName: req.OldName,
- NewDirectory: newDir.FullPath(),
- NewName: req.NewName,
- Signatures: []int32{dir.wfs.signature},
- }
-
- _, err := client.AtomicRenameEntry(ctx, request)
- if err != nil {
- glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
- return fuse.EXDEV
- }
-
- return nil
-
- })
- if err != nil {
- glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
- return fuse.EIO
- }
-
- err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName)
- if err != nil {
- glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err)
- return fuse.EIO
- }
-
- return nil
-}
-
-func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
-
- oldName := entry.Name()
-
- oldPath := oldParent.Child(oldName)
- newPath := newParent.Child(newName)
- if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
-
- oldFsNode := NodeWithId(oldPath.AsInode())
- newFsNode := NodeWithId(newPath.AsInode())
- newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode()))
- var newDir *Dir
- if found {
- newDir = newDirNode.(*Dir)
- }
- dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
- if file, ok := internalNode.(*File); ok {
- glog.V(4).Infof("internal file node %s", oldParent.Child(oldName))
- file.Name = newName
- file.id = uint64(newFsNode)
- if found {
- file.dir = newDir
- }
- }
- if dir, ok := internalNode.(*Dir); ok {
- glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName))
- dir.name = newName
- dir.id = uint64(newFsNode)
- if found {
- dir.parent = newDir
- }
- }
- })
-
- // change file handle
- inodeId := oldPath.AsInode()
- dir.wfs.handlesLock.Lock()
-	if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle != nil {
- glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath)
- delete(dir.wfs.handles, inodeId)
- dir.wfs.handles[newPath.AsInode()] = existingHandle
- }
- dir.wfs.handlesLock.Unlock()
-
- if entry.IsDirectory() {
- if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil {
- return err
- }
- }
- return nil
- }); err != nil {
- return fmt.Errorf("fail to move %s => %s: %v", oldPath, newPath, err)
- }
-
- return nil
-}
-
-func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error {
-
- currentDirPath := oldParent.Child(oldName)
- newDirPath := newParent.Child(newName)
-
- glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath)
-
- var moveErr error
- listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool {
- moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
- if moveErr != nil {
- return false
- }
- return true
- })
- if listErr != nil {
- return listErr
- }
- if moveErr != nil {
- return moveErr
- }
-
- return nil
-}
-
-func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
-
- newPath := newParent.Child(newName)
- oldPath := oldParent.Child(entry.Name())
-
- entry.FullPath = newPath
- if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil {
- glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
- return fuse.EIO
- }
-
- if moveFolderSubEntries != nil {
- if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
- return moveChildrenErr
- }
- }
-
- if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil {
- glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
- return fuse.EIO
- }
-
- return nil
-}
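
The local half of Rename above is deliberately ordered: moveSelfEntry inserts the entry at its new path, moveFolderSubEntries relocates any children, and only then is the old path deleted, so an interrupted move never strands children without a parent entry. A toy sketch of that insert-before-delete ordering over a hypothetical map-backed cache:

package main

import (
	"fmt"
	"strings"
)

// cache is a hypothetical map-backed stand-in for the local meta cache.
type cache map[string]bool

// move mirrors moveSelfEntry's ordering for each affected path:
// insert the entry under its new path before deleting the old one.
func (c cache) move(oldPath, newPath string) {
	var moved []string
	for p := range c {
		if p == oldPath || strings.HasPrefix(p, oldPath+"/") {
			moved = append(moved, p)
		}
	}
	for _, p := range moved {
		c[newPath+p[len(oldPath):]] = true // insert the new entry first
		delete(c, p)                       // delete the old entry last
	}
}

func main() {
	c := cache{"/a": true, "/a/x": true, "/a/x/y": true}
	c.move("/a", "/b")
	fmt.Println(c) // map[/b:true /b/x:true /b/x/y:true]
}
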
diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go
deleted file mode 100644
index 304793340..000000000
--- a/weed/filesys/dirty_page_interval.go
+++ /dev/null
@@ -1,222 +0,0 @@
-package filesys
-
-import (
- "io"
-
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type IntervalNode struct {
- Data []byte
- Offset int64
- Size int64
- Next *IntervalNode
-}
-
-type IntervalLinkedList struct {
- Head *IntervalNode
- Tail *IntervalNode
-}
-
-type ContinuousIntervals struct {
- lists []*IntervalLinkedList
-}
-
-func (list *IntervalLinkedList) Offset() int64 {
- return list.Head.Offset
-}
-func (list *IntervalLinkedList) Size() int64 {
- return list.Tail.Offset + list.Tail.Size - list.Head.Offset
-}
-func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) {
- // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
- list.Tail.Next = node
- list.Tail = node
-}
-func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) {
- // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
- node.Next = list.Head
- list.Head = node
-}
-
-func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) {
- t := list.Head
- for {
-
- nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
- if nodeStart < nodeStop {
- // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop)
- copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset])
- }
-
- if t.Next == nil {
- break
- }
- t = t.Next
- }
-}
-
-func (c *ContinuousIntervals) TotalSize() (total int64) {
- for _, list := range c.lists {
- total += list.Size()
- }
- return
-}
-
-func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
- var nodes []*IntervalNode
- for t := list.Head; t != nil; t = t.Next {
- nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
- if nodeStart >= nodeStop {
-			// skip any non-overlapping IntervalNode
- continue
- }
- nodes = append(nodes, &IntervalNode{
- Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset],
- Offset: nodeStart,
- Size: nodeStop - nodeStart,
- Next: nil,
- })
- }
- for i := 1; i < len(nodes); i++ {
- nodes[i-1].Next = nodes[i]
- }
- return &IntervalLinkedList{
- Head: nodes[0],
- Tail: nodes[len(nodes)-1],
- }
-}
-
-func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
-
- interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))}
-
- // append to the tail and return
- if len(c.lists) == 1 {
- lastSpan := c.lists[0]
- if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset {
- lastSpan.addNodeToTail(interval)
- return
- }
- }
-
- var newLists []*IntervalLinkedList
- for _, list := range c.lists {
-		// if the list is entirely to the left of the new interval, keep it in the new lists
- if list.Tail.Offset+list.Tail.Size <= interval.Offset {
- newLists = append(newLists, list)
- }
-		// if the list is entirely to the right of the new interval, keep it in the new lists
- if interval.Offset+interval.Size <= list.Head.Offset {
- newLists = append(newLists, list)
- }
-		// if the new interval overwrites the right part of the list
- if list.Head.Offset < interval.Offset && interval.Offset < list.Tail.Offset+list.Tail.Size {
- // create a new list of the left part of existing list
- newLists = append(newLists, subList(list, list.Offset(), interval.Offset))
- }
-		// if the new interval overwrites the left part of the list
- if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size < list.Tail.Offset+list.Tail.Size {
- // create a new list of the right part of existing list
- newLists = append(newLists, subList(list, interval.Offset+interval.Size, list.Tail.Offset+list.Tail.Size))
- }
- // skip anything that is fully overwritten by the new interval
- }
-
- c.lists = newLists
- // add the new interval to the lists, connecting neighbor lists
- var prevList, nextList *IntervalLinkedList
-
- for _, list := range c.lists {
- if list.Head.Offset == interval.Offset+interval.Size {
- nextList = list
- break
- }
- }
-
- for _, list := range c.lists {
- if list.Head.Offset+list.Size() == offset {
- list.addNodeToTail(interval)
- prevList = list
- break
- }
- }
-
- if prevList != nil && nextList != nil {
- // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size)
- prevList.Tail.Next = nextList.Head
- prevList.Tail = nextList.Tail
- c.removeList(nextList)
- } else if nextList != nil {
-		// adding to the head was not done during the scan above, so do it here
- nextList.addNodeToHead(interval)
- }
- if prevList == nil && nextList == nil {
- c.lists = append(c.lists, &IntervalLinkedList{
- Head: interval,
- Tail: interval,
- })
- }
-
- return
-}
-
-func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList {
- var maxSize int64
- maxIndex := -1
- for k, list := range c.lists {
- if maxSize <= list.Size() {
- maxSize = list.Size()
- maxIndex = k
- }
- }
- if maxSize <= 0 {
- return nil
- }
-
- t := c.lists[maxIndex]
- c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...)
- return t
-
-}
-
-func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) {
- index := -1
- for k, list := range c.lists {
- if list.Offset() == target.Offset() {
- index = k
- }
- }
- if index < 0 {
- return
- }
-
- c.lists = append(c.lists[0:index], c.lists[index+1:]...)
-
-}
-
-func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
- for _, list := range c.lists {
- start := max(startOffset, list.Offset())
- stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
- if start < stop {
- list.ReadData(data[start-startOffset:], start, stop)
- maxStop = max(maxStop, stop)
- }
- }
- return
-}
-
-func (l *IntervalLinkedList) ToReader() io.Reader {
- var readers []io.Reader
- t := l.Head
- readers = append(readers, util.NewBytesReader(t.Data))
- for t.Next != nil {
- t = t.Next
- readers = append(readers, util.NewBytesReader(t.Data))
- }
- if len(readers) == 1 {
- return readers[0]
- }
- return io.MultiReader(readers...)
-}
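
ReadData, subList, and ReadDataAt above all rely on one clipping identity: the overlap of a request [start, stop) with a node [offset, offset+size) is [max(start, offset), min(stop, offset+size)), and the node contributes only when that range is non-empty. A minimal sketch of the clip, using Go 1.21's built-in min/max in place of the file's own helpers:

package main

import "fmt"

// overlap clips a request [start, stop) against a node occupying
// [offset, offset+size), exactly as ReadData and subList do;
// ok is false when the two ranges are disjoint.
func overlap(start, stop, offset, size int64) (lo, hi int64, ok bool) {
	lo, hi = max(start, offset), min(stop, offset+size)
	return lo, hi, lo < hi
}

func main() {
	fmt.Println(overlap(2, 6, 0, 5))  // 2 5 true: partial overlap
	fmt.Println(overlap(2, 6, 10, 4)) // 10 6 false: disjoint
}
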
diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go
deleted file mode 100644
index d02ad27fd..000000000
--- a/weed/filesys/dirty_page_interval_test.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package filesys
-
-import (
- "bytes"
- "math/rand"
- "testing"
-)
-
-func TestContinuousIntervals_AddIntervalAppend(t *testing.T) {
-
- c := &ContinuousIntervals{}
-
- // 25, 25, 25
- c.AddInterval(getBytes(25, 3), 0)
- // _, _, 23, 23, 23, 23
- c.AddInterval(getBytes(23, 4), 2)
-
- expectedData(t, c, 0, 25, 25, 23, 23, 23, 23)
-
-}
-
-func TestContinuousIntervals_AddIntervalInnerOverwrite(t *testing.T) {
-
- c := &ContinuousIntervals{}
-
- // 25, 25, 25, 25, 25
- c.AddInterval(getBytes(25, 5), 0)
- // _, _, 23, 23
- c.AddInterval(getBytes(23, 2), 2)
-
- expectedData(t, c, 0, 25, 25, 23, 23, 25)
-
-}
-
-func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) {
-
- c := &ContinuousIntervals{}
-
- // 1,
- c.AddInterval(getBytes(1, 1), 0)
- // _, 2,
- c.AddInterval(getBytes(2, 1), 1)
- // _, _, 3, 3, 3
- c.AddInterval(getBytes(3, 3), 2)
- // _, _, _, 4, 4, 4
- c.AddInterval(getBytes(4, 3), 3)
-
- expectedData(t, c, 0, 1, 2, 3, 4, 4, 4)
-
-}
-
-func TestContinuousIntervals_RealCase1(t *testing.T) {
-
- c := &ContinuousIntervals{}
-
- // 25,
- c.AddInterval(getBytes(25, 1), 0)
- // _, _, _, _, 23, 23
- c.AddInterval(getBytes(23, 2), 4)
- // _, _, _, 24, 24, 24, 24
- c.AddInterval(getBytes(24, 4), 3)
-
- // _, 22, 22
- c.AddInterval(getBytes(22, 2), 1)
-
- expectedData(t, c, 0, 25, 22, 22, 24, 24, 24, 24)
-
-}
-
-func TestRandomWrites(t *testing.T) {
-
- c := &ContinuousIntervals{}
-
- data := make([]byte, 1024)
-
- for i := 0; i < 1024; i++ {
-
- start, stop := rand.Intn(len(data)), rand.Intn(len(data))
- if start > stop {
- start, stop = stop, start
- }
-
- rand.Read(data[start : stop+1])
-
- c.AddInterval(data[start:stop+1], int64(start))
-
- expectedData(t, c, 0, data...)
-
- }
-
-}
-
-func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) {
- start, stop := int64(offset), int64(offset+len(data))
- for _, list := range c.lists {
- nodeStart, nodeStop := max(start, list.Head.Offset), min(stop, list.Head.Offset+list.Size())
- if nodeStart < nodeStop {
- buf := make([]byte, nodeStop-nodeStart)
- list.ReadData(buf, nodeStart, nodeStop)
- if bytes.Compare(buf, data[nodeStart-start:nodeStop-start]) != 0 {
- t.Errorf("expected %v actual %v", data[nodeStart-start:nodeStop-start], buf)
- }
- }
- }
-}
-
-func getBytes(content byte, length int) []byte {
- data := make([]byte, length)
- for i := 0; i < length; i++ {
- data[i] = content
- }
- return data
-}
diff --git a/weed/filesys/dirty_pages.go b/weed/filesys/dirty_pages.go
deleted file mode 100644
index 8505323ef..000000000
--- a/weed/filesys/dirty_pages.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package filesys
-
-type DirtyPages interface {
- AddPage(offset int64, data []byte)
- FlushData() error
- ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
- GetStorageOptions() (collection, replication string)
- SetWriteOnly(writeOnly bool)
- GetWriteOnly() (writeOnly bool)
-}
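
This interface is what file handles write through; the two implementations that follow buffer in memory and in a temp file respectively. A hedged sketch of the call pattern, with a toy in-memory implementation standing in for either real one (the actual call sites live in filehandle.go, part of this same deletion):

package main

import "fmt"

// dirtyPages is a trimmed copy of the DirtyPages interface above.
type dirtyPages interface {
	AddPage(offset int64, data []byte)
	FlushData() error
	ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
}

// toyPages is an illustrative flat-buffer implementation, far simpler
// than the interval-tracking ones in this package.
type toyPages struct{ buf []byte }

func (p *toyPages) AddPage(offset int64, data []byte) {
	if need := offset + int64(len(data)); need > int64(len(p.buf)) {
		p.buf = append(p.buf, make([]byte, need-int64(len(p.buf)))...)
	}
	copy(p.buf[offset:], data)
}

func (p *toyPages) FlushData() error { return nil } // real ones upload chunks here

func (p *toyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
	if startOffset >= int64(len(p.buf)) {
		return 0
	}
	n := copy(data, p.buf[startOffset:])
	return startOffset + int64(n)
}

func main() {
	var pages dirtyPages = &toyPages{}
	pages.AddPage(0, []byte("hello"))  // buffer a write at offset 0
	pages.AddPage(5, []byte(" world")) // adjacent write extends it
	out := make([]byte, 11)
	stop := pages.ReadDirtyDataAt(out, 0) // serve reads from dirty data
	fmt.Println(string(out[:stop]), pages.FlushData()) // hello world <nil>
}
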
diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go
deleted file mode 100644
index b7514a2eb..000000000
--- a/weed/filesys/dirty_pages_continuous.go
+++ /dev/null
@@ -1,160 +0,0 @@
-package filesys
-
-import (
- "bytes"
- "fmt"
- "io"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-type ContinuousDirtyPages struct {
- intervals *ContinuousIntervals
- f *File
- writeOnly bool
- writeWaitGroup sync.WaitGroup
- chunkAddLock sync.Mutex
- lastErr error
- collection string
- replication string
-}
-
-func newContinuousDirtyPages(file *File, writeOnly bool) *ContinuousDirtyPages {
- dirtyPages := &ContinuousDirtyPages{
- intervals: &ContinuousIntervals{},
- f: file,
- writeOnly: writeOnly,
- }
- return dirtyPages
-}
-
-func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
-
- glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
-
-	if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
-		// this is more than the buffer can hold.
-		pages.flushAndSave(offset, data)
-		// flushAndSave already persisted this write; do not buffer it again
-		return
-	}
-
- pages.intervals.AddInterval(data, offset)
-
- if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit {
- pages.saveExistingLargestPageToStorage()
- }
-
- return
-}
-
-func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) {
-
- // flush existing
- pages.saveExistingPagesToStorage()
-
- // flush the new page
- pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data)))
-
- return
-}
-
-func (pages *ContinuousDirtyPages) FlushData() error {
-
- pages.saveExistingPagesToStorage()
- pages.writeWaitGroup.Wait()
- if pages.lastErr != nil {
- return fmt.Errorf("flush data: %v", pages.lastErr)
- }
- return nil
-}
-
-func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() {
- for pages.saveExistingLargestPageToStorage() {
- }
-}
-
-func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedData bool) {
-
- maxList := pages.intervals.RemoveLargestIntervalLinkedList()
- if maxList == nil {
- return false
- }
-
- entry := pages.f.getEntry()
- if entry == nil {
- return false
- }
-
- fileSize := int64(entry.Attributes.FileSize)
-
- chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
- if chunkSize == 0 {
- return false
- }
-
- pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
-
- return true
-}
-
-func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
-
- mtime := time.Now().UnixNano()
- pages.writeWaitGroup.Add(1)
- writer := func() {
- defer pages.writeWaitGroup.Done()
-
- reader = io.LimitReader(reader, size)
- chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
- if err != nil {
- glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
- pages.lastErr = err
- return
- }
- chunk.Mtime = mtime
- pages.collection, pages.replication = collection, replication
- pages.chunkAddLock.Lock()
- defer pages.chunkAddLock.Unlock()
- pages.f.addChunks([]*filer_pb.FileChunk{chunk})
- glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
- }
-
- if pages.f.wfs.concurrentWriters != nil {
- pages.f.wfs.concurrentWriters.Execute(writer)
- } else {
- go writer()
- }
-}
-
-func max(x, y int64) int64 {
- if x > y {
- return x
- }
- return y
-}
-func min(x, y int64) int64 {
- if x < y {
- return x
- }
- return y
-}
-
-func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
- return pages.intervals.ReadDataAt(data, startOffset)
-}
-
-func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) {
- return pages.collection, pages.replication
-}
-
-func (pages *ContinuousDirtyPages) SetWriteOnly(writeOnly bool) {
- if pages.writeOnly {
- pages.writeOnly = writeOnly
- }
-}
-
-func (pages *ContinuousDirtyPages) GetWriteOnly() (writeOnly bool) {
- return pages.writeOnly
-}
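
AddPage above encodes a simple policy: a single write larger than ChunkSizeLimit is flushed straight through, and once buffered data reaches the limit, the largest contiguous run is uploaded first since it yields the biggest chunk. A rough sketch of just that decision, with a hypothetical limit:

package main

import "fmt"

const chunkSizeLimit = 8 // hypothetical; the real limit comes from wfs.option

// plan mirrors ContinuousDirtyPages.AddPage's decisions: oversized writes
// bypass the buffer; otherwise flush once the buffer crosses the limit.
func plan(writeSize, buffered int64) string {
	if writeSize > chunkSizeLimit {
		return "flush existing pages, then save this write directly"
	}
	if buffered+writeSize >= chunkSizeLimit {
		return "buffer it, then upload the largest contiguous run"
	}
	return "just buffer it"
}

func main() {
	fmt.Println(plan(16, 0)) // oversized write
	fmt.Println(plan(4, 5))  // buffer crosses the limit
	fmt.Println(plan(2, 1))  // stays buffered
}
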
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
deleted file mode 100644
index 9fa7c0c8e..000000000
--- a/weed/filesys/dirty_pages_temp_file.go
+++ /dev/null
@@ -1,157 +0,0 @@
-package filesys
-
-import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "io"
- "os"
- "sync"
- "time"
-)
-
-type TempFileDirtyPages struct {
- f *File
- tf *os.File
- writtenIntervals *WrittenContinuousIntervals
- writeOnly bool
- writeWaitGroup sync.WaitGroup
- pageAddLock sync.Mutex
- chunkAddLock sync.Mutex
- lastErr error
- collection string
- replication string
-}
-
-func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages {
-
- tempFile := &TempFileDirtyPages{
- f: file,
- writeOnly: writeOnly,
- writtenIntervals: &WrittenContinuousIntervals{},
- }
-
- return tempFile
-}
-
-func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
-
- pages.pageAddLock.Lock()
- defer pages.pageAddLock.Unlock()
-
- if pages.tf == nil {
- tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "")
- if err != nil {
- glog.Errorf("create temp file: %v", err)
- pages.lastErr = err
- return
- }
- pages.tf = tf
- pages.writtenIntervals.tempFile = tf
- pages.writtenIntervals.lastOffset = 0
- }
-
- writtenOffset := pages.writtenIntervals.lastOffset
- dataSize := int64(len(data))
-
- // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
-
- if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
- pages.lastErr = err
- } else {
- pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
- pages.writtenIntervals.lastOffset += dataSize
- }
-
- // pages.writtenIntervals.debug()
-
- return
-}
-
-func (pages *TempFileDirtyPages) FlushData() error {
-
- pages.saveExistingPagesToStorage()
- pages.writeWaitGroup.Wait()
- if pages.lastErr != nil {
- return fmt.Errorf("flush data: %v", pages.lastErr)
- }
- pages.pageAddLock.Lock()
- defer pages.pageAddLock.Unlock()
- if pages.tf != nil {
-
- pages.writtenIntervals.tempFile = nil
- pages.writtenIntervals.lists = nil
-
- pages.tf.Close()
- os.Remove(pages.tf.Name())
- pages.tf = nil
- }
- return nil
-}
-
-func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
-
- pageSize := pages.f.wfs.option.ChunkSizeLimit
-
- // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
-
- for _, list := range pages.writtenIntervals.lists {
- listStopOffset := list.Offset() + list.Size()
- for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
- start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
- if start >= stop {
- continue
- }
- // glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
- pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
- }
- }
-
-}
-
-func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
-
- mtime := time.Now().UnixNano()
- pages.writeWaitGroup.Add(1)
- writer := func() {
- defer pages.writeWaitGroup.Done()
-
- reader = io.LimitReader(reader, size)
- chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
- if err != nil {
- glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
- pages.lastErr = err
- return
- }
- chunk.Mtime = mtime
- pages.collection, pages.replication = collection, replication
- pages.chunkAddLock.Lock()
- defer pages.chunkAddLock.Unlock()
- pages.f.addChunks([]*filer_pb.FileChunk{chunk})
- glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
- }
-
- if pages.f.wfs.concurrentWriters != nil {
- pages.f.wfs.concurrentWriters.Execute(writer)
- } else {
- go writer()
- }
-}
-
-func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
- return pages.writtenIntervals.ReadDataAt(data, startOffset)
-}
-
-func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
- return pages.collection, pages.replication
-}
-
-func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) {
- if pages.writeOnly {
- pages.writeOnly = writeOnly
- }
-}
-
-func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) {
- return pages.writeOnly
-}
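
saveExistingPagesToStorage above cuts each contiguous run into ChunkSizeLimit-sized uploads aligned to absolute page boundaries (the loop steps from offset 0, not from the run's start), clipping the first and last slice to the run. A minimal sketch of that slicing arithmetic, assuming Go 1.21's built-in min/max:

package main

import "fmt"

// slices enumerates the [start, stop) upload ranges for one contiguous
// run [offset, offset+size), stepping from 0 in pageSize increments as
// the loop in saveExistingPagesToStorage does.
func slices(offset, size, pageSize int64) (out [][2]int64) {
	stopOffset := offset + size
	for uploaded := int64(0); uploaded < stopOffset; uploaded += pageSize {
		start, stop := max(offset, uploaded), min(stopOffset, uploaded+pageSize)
		if start < stop {
			out = append(out, [2]int64{start, stop})
		}
	}
	return
}

func main() {
	// a run [3,13) with page size 4 yields boundary-aligned slices
	fmt.Println(slices(3, 10, 4)) // [[3 4] [4 8] [8 12] [12 13]]
}
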
diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/dirty_pages_temp_interval.go
deleted file mode 100644
index 42c4b5a3b..000000000
--- a/weed/filesys/dirty_pages_temp_interval.go
+++ /dev/null
@@ -1,289 +0,0 @@
-package filesys
-
-import (
- "io"
- "log"
- "os"
-)
-
-type WrittenIntervalNode struct {
- DataOffset int64
- TempOffset int64
- Size int64
- Next *WrittenIntervalNode
-}
-
-type WrittenIntervalLinkedList struct {
- tempFile *os.File
- Head *WrittenIntervalNode
- Tail *WrittenIntervalNode
-}
-
-type WrittenContinuousIntervals struct {
- tempFile *os.File
- lastOffset int64
- lists []*WrittenIntervalLinkedList
-}
-
-func (list *WrittenIntervalLinkedList) Offset() int64 {
- return list.Head.DataOffset
-}
-func (list *WrittenIntervalLinkedList) Size() int64 {
- return list.Tail.DataOffset + list.Tail.Size - list.Head.DataOffset
-}
-func (list *WrittenIntervalLinkedList) addNodeToTail(node *WrittenIntervalNode) {
- // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
- if list.Tail.TempOffset+list.Tail.Size == node.TempOffset {
- // already connected
- list.Tail.Size += node.Size
- } else {
- list.Tail.Next = node
- list.Tail = node
- }
-}
-func (list *WrittenIntervalLinkedList) addNodeToHead(node *WrittenIntervalNode) {
- // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
- node.Next = list.Head
- list.Head = node
-}
-
-func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
- t := list.Head
- for {
-
- nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
- if nodeStart < nodeStop {
- // glog.V(4).Infof("copying start=%d stop=%d t=[%d,%d) => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.DataOffset, t.DataOffset+t.Size, len(buf), nodeStart, nodeStop)
- list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset+nodeStart-t.DataOffset)
- }
-
- if t.Next == nil {
- break
- }
- t = t.Next
- }
-}
-
-func (c *WrittenContinuousIntervals) TotalSize() (total int64) {
- for _, list := range c.lists {
- total += list.Size()
- }
- return
-}
-
-func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenIntervalLinkedList {
- var nodes []*WrittenIntervalNode
- for t := list.Head; t != nil; t = t.Next {
- nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
- if nodeStart >= nodeStop {
-			// skip any non-overlapping WrittenIntervalNode
- continue
- }
- nodes = append(nodes, &WrittenIntervalNode{
- TempOffset: t.TempOffset + nodeStart - t.DataOffset,
- DataOffset: nodeStart,
- Size: nodeStop - nodeStart,
- Next: nil,
- })
- }
- for i := 1; i < len(nodes); i++ {
- nodes[i-1].Next = nodes[i]
- }
- return &WrittenIntervalLinkedList{
- tempFile: list.tempFile,
- Head: nodes[0],
- Tail: nodes[len(nodes)-1],
- }
-}
-
-func (c *WrittenContinuousIntervals) debug() {
- log.Printf("++")
- for _, l := range c.lists {
- log.Printf("++++")
- for t := l.Head; ; t = t.Next {
- log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
- if t.Next == nil {
- break
- }
- }
- log.Printf("----")
- }
- log.Printf("--")
-}
-
-func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {
-
- interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
-
- // append to the tail and return
- if len(c.lists) == 1 {
- lastSpan := c.lists[0]
- if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset {
- lastSpan.addNodeToTail(interval)
- return
- }
- }
-
- var newLists []*WrittenIntervalLinkedList
- for _, list := range c.lists {
-		// if the list is entirely to the left of the new interval, keep it in the new lists
- if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset {
- newLists = append(newLists, list)
- }
-		// if the list is entirely to the right of the new interval, keep it in the new lists
- if interval.DataOffset+interval.Size <= list.Head.DataOffset {
- newLists = append(newLists, list)
- }
-		// if the new interval overwrites the right part of the list
- if list.Head.DataOffset < interval.DataOffset && interval.DataOffset < list.Tail.DataOffset+list.Tail.Size {
- // create a new list of the left part of existing list
- newLists = append(newLists, list.subList(list.Offset(), interval.DataOffset))
- }
-		// if the new interval overwrites the left part of the list
- if list.Head.DataOffset < interval.DataOffset+interval.Size && interval.DataOffset+interval.Size < list.Tail.DataOffset+list.Tail.Size {
- // create a new list of the right part of existing list
- newLists = append(newLists, list.subList(interval.DataOffset+interval.Size, list.Tail.DataOffset+list.Tail.Size))
- }
- // skip anything that is fully overwritten by the new interval
- }
-
- c.lists = newLists
- // add the new interval to the lists, connecting neighbor lists
- var prevList, nextList *WrittenIntervalLinkedList
-
- for _, list := range c.lists {
- if list.Head.DataOffset == interval.DataOffset+interval.Size {
- nextList = list
- break
- }
- }
-
- for _, list := range c.lists {
- if list.Head.DataOffset+list.Size() == dataOffset {
- list.addNodeToTail(interval)
- prevList = list
- break
- }
- }
-
- if prevList != nil && nextList != nil {
- // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size)
- prevList.Tail.Next = nextList.Head
- prevList.Tail = nextList.Tail
- c.removeList(nextList)
- } else if nextList != nil {
-		// adding to the head was not done during the scan above, so do it here
- nextList.addNodeToHead(interval)
- }
- if prevList == nil && nextList == nil {
- c.lists = append(c.lists, &WrittenIntervalLinkedList{
- tempFile: c.tempFile,
- Head: interval,
- Tail: interval,
- })
- }
-
- return
-}
-
-func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList {
- var maxSize int64
- maxIndex := -1
- for k, list := range c.lists {
- if maxSize <= list.Size() {
- maxSize = list.Size()
- maxIndex = k
- }
- }
- if maxSize <= 0 {
- return nil
- }
-
- t := c.lists[maxIndex]
- t.tempFile = c.tempFile
- c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...)
- return t
-
-}
-
-func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) {
- index := -1
- for k, list := range c.lists {
- if list.Offset() == target.Offset() {
- index = k
- }
- }
- if index < 0 {
- return
- }
-
- c.lists = append(c.lists[0:index], c.lists[index+1:]...)
-
-}
-
-func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
- for _, list := range c.lists {
- start := max(startOffset, list.Offset())
- stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
- if start < stop {
- list.ReadData(data[start-startOffset:], start, stop)
- maxStop = max(maxStop, stop)
- }
- }
- return
-}
-
-func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader {
- // TODO: optimize this to avoid another loop
- var readers []io.Reader
- for t := l.Head; ; t = t.Next {
- startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
- if startOffset < stopOffset {
- // glog.V(4).Infof("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
- readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
- }
- if t.Next == nil {
- break
- }
- }
- if len(readers) == 1 {
- return readers[0]
- }
- return io.MultiReader(readers...)
-}
-
-type FileSectionReader struct {
- file *os.File
- tempStartOffset int64
- Offset int64
- dataStart int64
- dataStop int64
-}
-
-var _ = io.Reader(&FileSectionReader{})
-
-func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
- return &FileSectionReader{
- file: tempfile,
- tempStartOffset: offset,
- Offset: offset,
- dataStart: dataOffset,
- dataStop: dataOffset + size,
- }
-}
-
-func (f *FileSectionReader) Read(p []byte) (n int, err error) {
- remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset)
- if remaining <= 0 {
- return 0, io.EOF
- }
- dataLen := min(remaining, int64(len(p)))
- // glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart)
- n, err = f.file.ReadAt(p[:dataLen], f.Offset)
- if n > 0 {
- f.Offset += int64(n)
- } else {
- err = io.EOF
- }
- return
-}
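
FileSectionReader above is a hand-rolled bounded reader over one window of the temp file; the standard library's io.NewSectionReader provides the same behavior over any io.ReaderAt. A small sketch of the equivalent windowed read:

package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	tf, err := os.CreateTemp("", "dirtypages")
	if err != nil {
		panic(err)
	}
	defer os.Remove(tf.Name())
	tf.WriteString("0123456789")

	// Read the window [3, 3+4) of the temp file, as FileSectionReader
	// does for one written interval.
	sec := io.NewSectionReader(tf, 3, 4)
	data, _ := io.ReadAll(sec)
	fmt.Println(string(data)) // 3456
}
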
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
deleted file mode 100644
index b990b20d1..000000000
--- a/weed/filesys/file.go
+++ /dev/null
@@ -1,389 +0,0 @@
-package filesys
-
-import (
- "context"
- "os"
- "sort"
- "time"
-
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-const blockSize = 512
-
-var _ = fs.Node(&File{})
-var _ = fs.NodeIdentifier(&File{})
-var _ = fs.NodeOpener(&File{})
-var _ = fs.NodeFsyncer(&File{})
-var _ = fs.NodeSetattrer(&File{})
-var _ = fs.NodeGetxattrer(&File{})
-var _ = fs.NodeSetxattrer(&File{})
-var _ = fs.NodeRemovexattrer(&File{})
-var _ = fs.NodeListxattrer(&File{})
-var _ = fs.NodeForgetter(&File{})
-
-type File struct {
- Name string
- dir *Dir
- wfs *WFS
- entry *filer_pb.Entry
- isOpen int
- dirtyMetadata bool
- id uint64
-}
-
-func (file *File) fullpath() util.FullPath {
- return util.NewFullPath(file.dir.FullPath(), file.Name)
-}
-
-func (file *File) Id() uint64 {
- return file.id
-}
-
-func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
-
- glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
-
- entry, err := file.maybeLoadEntry(ctx)
- if err != nil {
- return err
- }
-
- if entry == nil {
- return fuse.ENOENT
- }
-
- attr.Inode = file.Id()
- attr.Valid = time.Second
- attr.Mode = os.FileMode(entry.Attributes.FileMode)
- attr.Size = filer.FileSize(entry)
- if file.isOpen > 0 {
- attr.Size = entry.Attributes.FileSize
- glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
- }
- attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
- attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
- attr.Gid = entry.Attributes.Gid
- attr.Uid = entry.Attributes.Uid
- attr.Blocks = attr.Size/blockSize + 1
- attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit)
- if entry.HardLinkCounter > 0 {
- attr.Nlink = uint32(entry.HardLinkCounter)
- }
-
- return nil
-
-}
-
-func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
-
- // glog.V(4).Infof("file Getxattr %s", file.fullpath())
-
- entry, err := file.maybeLoadEntry(ctx)
- if err != nil {
- return err
- }
-
- return getxattr(entry, req, resp)
-}
-
-func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
-
- glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
-
- handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0)
-
- resp.Handle = fuse.HandleID(handle.handle)
-
- glog.V(4).Infof("%v file open handle id = %d", file.fullpath(), handle.handle)
-
- return handle, nil
-
-}
-
-func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
-
- glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req)
-
- entry, err := file.maybeLoadEntry(ctx)
- if err != nil {
- return err
- }
-
- if req.Valid.Size() {
-
- glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks))
- if req.Size < filer.FileSize(entry) {
- // fmt.Printf("truncate %v \n", fullPath)
- var chunks []*filer_pb.FileChunk
- var truncatedChunks []*filer_pb.FileChunk
- for _, chunk := range entry.Chunks {
- int64Size := int64(chunk.Size)
- if chunk.Offset+int64Size > int64(req.Size) {
- // this chunk extends past the new size: shrink it, or drop it entirely
- int64Size = int64(req.Size) - chunk.Offset
- if int64Size > 0 {
- chunks = append(chunks, chunk)
- glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size)
- chunk.Size = uint64(int64Size)
- } else {
- glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString())
- truncatedChunks = append(truncatedChunks, chunk)
- }
- } else {
- // this chunk lies entirely below the new size: keep it unchanged
- chunks = append(chunks, chunk)
- }
- }
- entry.Chunks = chunks
- }
- entry.Attributes.FileSize = req.Size
- file.dirtyMetadata = true
- }
-
- if req.Valid.Mode() && entry.Attributes.FileMode != uint32(req.Mode) {
- entry.Attributes.FileMode = uint32(req.Mode)
- file.dirtyMetadata = true
- }
-
- if req.Valid.Uid() && entry.Attributes.Uid != req.Uid {
- entry.Attributes.Uid = req.Uid
- file.dirtyMetadata = true
- }
-
- if req.Valid.Gid() && entry.Attributes.Gid != req.Gid {
- entry.Attributes.Gid = req.Gid
- file.dirtyMetadata = true
- }
-
- if req.Valid.Crtime() {
- entry.Attributes.Crtime = req.Crtime.Unix()
- file.dirtyMetadata = true
- }
-
- if req.Valid.Mtime() && entry.Attributes.Mtime != req.Mtime.Unix() {
- entry.Attributes.Mtime = req.Mtime.Unix()
- file.dirtyMetadata = true
- }
-
- if req.Valid.Handle() {
- // fmt.Printf("file handle => %d\n", req.Handle)
- }
-
- if file.isOpen > 0 {
- return nil
- }
-
- if !file.dirtyMetadata {
- return nil
- }
-
- return file.saveEntry(entry)
-
-}
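
The shrink branch above is pure offset arithmetic: a chunk ending past the new size is shortened to newSize-Offset, and a chunk starting at or past it is dropped. The same rule as a self-contained sketch, with a stand-in struct instead of filer_pb.FileChunk:

package main

import "fmt"

// chunk stands in for filer_pb.FileChunk: just an offset and a size.
type chunk struct {
	Offset int64
	Size   uint64
}

// truncateChunks keeps chunks below newSize, shortens the chunk that
// straddles it, and drops chunks that start at or beyond it.
func truncateChunks(in []chunk, newSize int64) (kept []chunk) {
	for _, c := range in {
		if c.Offset+int64(c.Size) > newSize {
			remaining := newSize - c.Offset
			if remaining <= 0 {
				continue // the whole chunk is past the new size
			}
			c.Size = uint64(remaining)
		}
		kept = append(kept, c)
	}
	return
}

func main() {
	chunks := []chunk{{0, 100}, {100, 100}, {200, 100}}
	fmt.Println(truncateChunks(chunks, 150)) // [{0 100} {100 50}]
}
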
-
-func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
-
- glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name)
-
- entry, err := file.maybeLoadEntry(ctx)
- if err != nil {
- return err
- }
-
- if err := setxattr(entry, req); err != nil {
- return err
- }
- file.dirtyMetadata = true
-
- if file.isOpen > 0 {
- return nil
- }
-
- return file.saveEntry(entry)
-
-}
-
-func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
-
- glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name)
-
- entry, err := file.maybeLoadEntry(ctx)
- if err != nil {
- return err
- }
-
- if err := removexattr(entry, req); err != nil {
- return err
- }
- file.dirtyMetadata = true
-
- if file.isOpen > 0 {
- return nil
- }
-
- return file.saveEntry(entry)
-
-}
-
-func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
-
- glog.V(4).Infof("file Listxattr %s", file.fullpath())
-
- entry, err := file.maybeLoadEntry(ctx)
- if err != nil {
- return err
- }
-
- if err := listxattr(entry, req, resp); err != nil {
- return err
- }
-
- return nil
-
-}
-
-func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
- // fsync works at the OS level
- // the dirty chunks are actually written to the filer in Flush
- glog.V(4).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req)
-
- return nil
-}
-
-func (file *File) Forget() {
- t := util.NewFullPath(file.dir.FullPath(), file.Name)
- glog.V(4).Infof("Forget file %s", t)
- file.wfs.ReleaseHandle(t, fuse.HandleID(t.AsInode()))
-}
-
-func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
-
- file.wfs.handlesLock.Lock()
- handle, found := file.wfs.handles[file.Id()]
- file.wfs.handlesLock.Unlock()
- entry = file.entry
- if found {
- // glog.V(4).Infof("maybeLoadEntry found opened file %s/%s", file.dir.FullPath(), file.Name)
- entry = handle.f.entry
- }
-
- if entry != nil {
- if len(entry.HardLinkId) == 0 {
- // only hard links always need to be reloaded
- return entry, nil
- }
- }
- entry, err = file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
- if err != nil {
- glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
- return entry, err
- }
- if entry != nil {
- // file.entry = entry
- } else {
- glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err)
- }
- return entry, nil
-}
-
-func lessThan(a, b *filer_pb.FileChunk) bool {
- if a.Mtime == b.Mtime {
- return a.Fid.FileKey < b.Fid.FileKey
- }
- return a.Mtime < b.Mtime
-}
-
-func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
-
- // find the earliest incoming chunk
- newChunks := chunks
- earliestChunk := newChunks[0]
- for i := 1; i < len(newChunks); i++ {
- if lessThan(newChunks[i], earliestChunk) {
- earliestChunk = newChunks[i]
- }
- }
-
- entry := file.getEntry()
- if entry == nil {
- return
- }
-
- // pick out-of-order chunks from existing chunks
- for _, chunk := range entry.Chunks {
- if lessThan(earliestChunk, chunk) {
- chunks = append(chunks, chunk)
- }
- }
-
- // sort the incoming chunks together with the picked-out existing ones
- sort.Slice(chunks, func(i, j int) bool {
- return lessThan(chunks[i], chunks[j])
- })
-
- glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
-
- entry.Chunks = append(entry.Chunks, newChunks...)
-}
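
For clarity, the (Mtime, FileKey) ordering that lessThan defines, demonstrated stand-alone with made-up values: later mtimes sort last, and the file key only breaks ties within one mtime.

package main

import (
	"fmt"
	"sort"
)

// ck stands in for filer_pb.FileChunk, keeping only the two fields
// that lessThan compares.
type ck struct {
	Mtime   int64
	FileKey uint64
}

func less(a, b ck) bool {
	if a.Mtime == b.Mtime {
		return a.FileKey < b.FileKey
	}
	return a.Mtime < b.Mtime
}

func main() {
	chunks := []ck{{Mtime: 2, FileKey: 7}, {Mtime: 1, FileKey: 9}, {Mtime: 2, FileKey: 3}}
	sort.Slice(chunks, func(i, j int) bool { return less(chunks[i], chunks[j]) })
	fmt.Println(chunks) // [{1 9} {2 3} {2 7}]
}
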
-
-func (file *File) saveEntry(entry *filer_pb.Entry) error {
- return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- file.wfs.mapPbIdFromLocalToFiler(entry)
- defer file.wfs.mapPbIdFromFilerToLocal(entry)
-
- request := &filer_pb.CreateEntryRequest{
- Directory: file.dir.FullPath(),
- Entry: entry,
- Signatures: []int32{file.wfs.signature},
- }
-
- glog.V(4).Infof("save file entry: %v", request)
- _, err := client.CreateEntry(context.Background(), request)
- if err != nil {
- glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
- return fuse.EIO
- }
-
- file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
-
- file.dirtyMetadata = false
-
- return nil
- })
-}
-
-func (file *File) getEntry() *filer_pb.Entry {
- return file.entry
-}
-
-func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {
- err := file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.DownloadToLocalRequest{
- Directory: file.dir.FullPath(),
- Name: entry.Name,
- }
-
- glog.V(4).Infof("download entry: %v", request)
- resp, err := client.DownloadToLocal(context.Background(), request)
- if err != nil {
- glog.Errorf("DownloadToLocal file %s/%s: %v", file.dir.FullPath(), file.Name, err)
- return fuse.EIO
- }
-
- entry = resp.Entry
-
- file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry))
-
- file.dirtyMetadata = false
-
- return nil
- })
-
- return entry, err
-}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
deleted file mode 100644
index 34affddb9..000000000
--- a/weed/filesys/filehandle.go
+++ /dev/null
@@ -1,336 +0,0 @@
-package filesys
-
-import (
- "context"
- "fmt"
- "io"
- "math"
- "net/http"
- "os"
- "sync"
- "time"
-
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-type FileHandle struct {
- // cache file has been written to
- dirtyPages DirtyPages
- entryViewCache []filer.VisibleInterval
- reader io.ReaderAt
- contentType string
- handle uint64
- sync.Mutex
-
- f *File
- RequestId fuse.RequestID // unique ID for request
- NodeId fuse.NodeID // file or directory the request is about
- Uid uint32 // user ID of process making request
- Gid uint32 // group ID of process making request
- writeOnly bool
- isDeleted bool
-}
-
-func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
- fh := &FileHandle{
- f: file,
- // dirtyPages: newContinuousDirtyPages(file, writeOnly),
- dirtyPages: newTempFileDirtyPages(file, writeOnly),
- Uid: uid,
- Gid: gid,
- }
- entry := fh.f.getEntry()
- if entry != nil {
- entry.Attributes.FileSize = filer.FileSize(entry)
- }
-
- return fh
-}
-
-var _ = fs.Handle(&FileHandle{})
-
-// var _ = fs.HandleReadAller(&FileHandle{})
-var _ = fs.HandleReader(&FileHandle{})
-var _ = fs.HandleFlusher(&FileHandle{})
-var _ = fs.HandleWriter(&FileHandle{})
-var _ = fs.HandleReleaser(&FileHandle{})
-
-func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
-
- glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
- fh.Lock()
- defer fh.Unlock()
-
- if req.Size <= 0 {
- return nil
- }
-
- buff := resp.Data[:cap(resp.Data)]
- if req.Size > cap(resp.Data) {
- // should not happen
- buff = make([]byte, req.Size)
- }
-
- totalRead, err := fh.readFromChunks(buff, req.Offset)
- if err == nil || err == io.EOF {
- maxStop := fh.readFromDirtyPages(buff, req.Offset)
- totalRead = max(maxStop-req.Offset, totalRead)
- }
-
- if err == io.EOF {
- err = nil
- }
-
- if err != nil {
- glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err)
- return fuse.EIO
- }
-
- if totalRead > int64(len(buff)) {
- glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead)
- totalRead = min(int64(len(buff)), totalRead)
- }
- resp.Data = buff[:totalRead]
-
- return nil
-}
-
-func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
- maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
- return
-}
-
-func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
-
- entry := fh.f.getEntry()
- if entry == nil {
- return 0, io.EOF
- }
-
- if entry.IsInRemoteOnly() {
- glog.V(4).Infof("download remote entry %s", fh.f.fullpath())
- newEntry, err := fh.f.downloadRemoteEntry(entry)
- if err != nil {
- glog.V(1).Infof("download remote entry %s: %v", fh.f.fullpath(), err)
- return 0, err
- }
- entry = newEntry
- }
-
- fileSize := int64(filer.FileSize(entry))
- fileFullPath := fh.f.fullpath()
-
- if fileSize == 0 {
- glog.V(1).Infof("empty fh %v", fileFullPath)
- return 0, io.EOF
- }
-
- if offset+int64(len(buff)) <= int64(len(entry.Content)) {
- totalRead := copy(buff, entry.Content[offset:])
- glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
- return int64(totalRead), nil
- }
-
- var chunkResolveErr error
- if fh.entryViewCache == nil {
- fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64)
- if chunkResolveErr != nil {
- return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
- }
- fh.reader = nil
- }
-
- reader := fh.reader
- if reader == nil {
- chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64)
- reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
- }
- fh.reader = reader
-
- totalRead, err := reader.ReadAt(buff, offset)
-
- if err != nil && err != io.EOF {
- glog.Errorf("file handle read %s: %v", fileFullPath, err)
- }
-
- // glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
-
- return int64(totalRead), err
-}
-
-// Write to the file handle
-func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
-
- fh.Lock()
- defer fh.Unlock()
-
- // write the request to volume servers
- data := req.Data
- if len(data) <= 512 {
- // the fuse library reuses buffers for small messages, so copy the data before retaining it
- data = make([]byte, len(req.Data))
- copy(data, req.Data)
- }
-
- entry := fh.f.getEntry()
- if entry == nil {
- return fuse.EIO
- }
-
- entry.Content = nil
- entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
- // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
-
- fh.dirtyPages.AddPage(req.Offset, data)
-
- resp.Size = len(data)
-
- if req.Offset == 0 {
- // detect mime type
- fh.contentType = http.DetectContentType(data)
- }
-
- fh.f.dirtyMetadata = true
-
- return nil
-}
-
-func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
-
- glog.V(4).Infof("Release %v fh %d open=%d", fh.f.fullpath(), fh.handle, fh.f.isOpen)
-
- fh.Lock()
- defer fh.Unlock()
-
- fh.f.wfs.handlesLock.Lock()
- fh.f.isOpen--
- fh.f.wfs.handlesLock.Unlock()
-
- if fh.f.isOpen <= 0 {
- fh.f.entry = nil
- fh.entryViewCache = nil
- fh.reader = nil
-
- fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
- }
-
- if fh.f.isOpen < 0 {
- glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
- fh.f.isOpen = 0
- return nil
- }
-
- return nil
-}
-
-func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
-
- glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle)
-
- if fh.isDeleted {
- glog.V(4).Infof("Flush %v fh %d skip deleted", fh.f.fullpath(), fh.handle)
- return nil
- }
-
- fh.Lock()
- defer fh.Unlock()
-
- if err := fh.doFlush(ctx, req.Header); err != nil {
- glog.Errorf("Flush doFlush %s: %v", fh.f.Name, err)
- return err
- }
-
- glog.V(4).Infof("Flush %v fh %d success", fh.f.fullpath(), fh.handle)
- return nil
-}
-
-func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
- // flush works at the file-handle level
- // send the dirty data to the filer and volume servers
- glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle)
-
- if err := fh.dirtyPages.FlushData(); err != nil {
- glog.Errorf("%v doFlush: %v", fh.f.fullpath(), err)
- return fuse.EIO
- }
-
- if !fh.f.dirtyMetadata {
- return nil
- }
-
- err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- entry := fh.f.getEntry()
- if entry == nil {
- return nil
- }
-
- if entry.Attributes != nil {
- entry.Attributes.Mime = fh.contentType
- if entry.Attributes.Uid == 0 {
- entry.Attributes.Uid = header.Uid
- }
- if entry.Attributes.Gid == 0 {
- entry.Attributes.Gid = header.Gid
- }
- if entry.Attributes.Crtime == 0 {
- entry.Attributes.Crtime = time.Now().Unix()
- }
- entry.Attributes.Mtime = time.Now().Unix()
- entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
- entry.Attributes.Collection, entry.Attributes.Replication = fh.dirtyPages.GetStorageOptions()
- }
-
- request := &filer_pb.CreateEntryRequest{
- Directory: fh.f.dir.FullPath(),
- Entry: entry,
- Signatures: []int32{fh.f.wfs.signature},
- }
-
- glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks))
- for i, chunk := range entry.Chunks {
- glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
- }
-
- manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
-
- chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
- chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath(), fh.dirtyPages.GetWriteOnly()), chunks)
- if manifestErr != nil {
- // manifestization failed, but the chunks themselves are intact, just not folded into manifests
- glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
- }
- entry.Chunks = append(chunks, manifestChunks...)
-
- fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
- defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
-
- if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
- return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
- }
-
- fh.f.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
-
- return nil
- })
-
- if err == nil {
- fh.f.dirtyMetadata = false
- }
-
- if err != nil {
- glog.Errorf("%v fh %d flush: %v", fh.f.fullpath(), fh.handle, err)
- return fuse.EIO
- }
-
- return nil
-}
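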
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
deleted file mode 100644
index 6b1012090..000000000
--- a/weed/filesys/fscache.go
+++ /dev/null
@@ -1,213 +0,0 @@
-package filesys
-
-import (
- "sync"
-
- "github.com/seaweedfs/fuse/fs"
-
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type FsCache struct {
- root *FsNode
- sync.RWMutex
-}
-type FsNode struct {
- parent *FsNode
- node fs.Node
- name string
- childrenLock sync.RWMutex
- children map[string]*FsNode
-}
-
-func newFsCache(root fs.Node) *FsCache {
- return &FsCache{
- root: &FsNode{
- node: root,
- },
- }
-}
-
-func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
-
- c.RLock()
- defer c.RUnlock()
-
- return c.doGetFsNode(path)
-}
-
-func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node {
- t := c.root
- for _, p := range path.Split() {
- t = t.findChild(p)
- if t == nil {
- return nil
- }
- }
- return t.node
-}
-
-func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
-
- c.Lock()
- defer c.Unlock()
-
- c.doSetFsNode(path, node)
-}
-
-func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) {
- t := c.root
- for _, p := range path.Split() {
- t = t.ensureChild(p)
- }
- t.node = node
-}
-
-func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
-
- c.Lock()
- defer c.Unlock()
-
- t := c.doGetFsNode(path)
- if t != nil {
- return t
- }
- t = genNodeFn()
- c.doSetFsNode(path, t)
- return t
-}
-
-func (c *FsCache) DeleteFsNode(path util.FullPath) {
-
- c.Lock()
- defer c.Unlock()
-
- t := c.root
- for _, p := range path.Split() {
- t = t.findChild(p)
- if t == nil {
- return
- }
- }
- if t.parent != nil {
- t.parent.disconnectChild(t)
- }
- t.deleteSelf()
-}
-
-// oldPath and newPath are full path including the new name
-func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
-
- c.Lock()
- defer c.Unlock()
-
- // find old node
- src := c.root
- for _, p := range oldPath.Split() {
- src = src.findChild(p)
- if src == nil {
- return src
- }
- }
- if src.parent != nil {
- src.parent.disconnectChild(src)
- }
-
- // find new node
- target := c.root
- for _, p := range newPath.Split() {
- target = target.ensureChild(p)
- }
- parent := target.parent
- if dir, ok := src.node.(*Dir); ok {
- dir.name = target.name // target is only a placeholder FsNode, not a Dir
- }
- if f, ok := src.node.(*File); ok {
- f.Name = target.name
- entry := f.getEntry()
- if entry != nil {
- entry.Name = f.Name
- }
- }
- parent.disconnectChild(target)
-
- target.deleteSelf()
-
- src.name = target.name
- src.connectToParent(parent)
-
- return src
-}
-
-func (n *FsNode) connectToParent(parent *FsNode) {
- n.parent = parent
- oldNode := parent.findChild(n.name)
- if oldNode != nil {
- oldNode.deleteSelf()
- }
- if dir, ok := n.node.(*Dir); ok {
- if parent.node != nil {
- dir.parent = parent.node.(*Dir)
- }
- }
- if f, ok := n.node.(*File); ok {
- if parent.node != nil {
- f.dir = parent.node.(*Dir)
- }
- }
- parent.childrenLock.Lock()
- parent.children[n.name] = n
- parent.childrenLock.Unlock()
-}
-
-func (n *FsNode) findChild(name string) *FsNode {
- n.childrenLock.RLock()
- defer n.childrenLock.RUnlock()
-
- child, found := n.children[name]
- if found {
- return child
- }
- return nil
-}
-
-func (n *FsNode) ensureChild(name string) *FsNode {
- n.childrenLock.Lock()
- defer n.childrenLock.Unlock()
-
- if n.children == nil {
- n.children = make(map[string]*FsNode)
- }
- child, found := n.children[name]
- if found {
- return child
- }
- t := &FsNode{
- parent: n,
- node: nil,
- name: name,
- children: nil,
- }
- n.children[name] = t
- return t
-}
-
-func (n *FsNode) disconnectChild(child *FsNode) {
- n.childrenLock.Lock()
- delete(n.children, child.name)
- n.childrenLock.Unlock()
- child.parent = nil
-}
-
-func (n *FsNode) deleteSelf() {
- n.childrenLock.Lock()
- for _, child := range n.children {
- child.deleteSelf()
- }
- n.children = nil
- n.childrenLock.Unlock()
-
- n.node = nil
- n.parent = nil
-
-}
diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go
deleted file mode 100644
index 1152eb32e..000000000
--- a/weed/filesys/fscache_test.go
+++ /dev/null
@@ -1,115 +0,0 @@
-package filesys
-
-import (
- "testing"
-
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func TestPathSplit(t *testing.T) {
- parts := util.FullPath("/").Split()
- if len(parts) != 0 {
- t.Errorf("expecting an empty list, but getting %d", len(parts))
- }
-
- parts = util.FullPath("/readme.md").Split()
- if len(parts) != 1 {
- t.Errorf("expecting an empty list, but getting %d", len(parts))
- }
-
-}
-
-func TestFsCache(t *testing.T) {
-
- cache := newFsCache(nil)
-
- x := cache.GetFsNode(util.FullPath("/y/x"))
- if x != nil {
- t.Errorf("wrong node!")
- }
-
- p := util.FullPath("/a/b/c")
- cache.SetFsNode(p, &File{Name: "cc"})
- tNode := cache.GetFsNode(p)
- tFile := tNode.(*File)
- if tFile.Name != "cc" {
- t.Errorf("expecting a FsNode")
- }
-
- cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
- cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
- cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
- cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
- cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
-
- b := cache.GetFsNode(util.FullPath("/a/b"))
- if b != nil {
- t.Errorf("unexpected node!")
- }
-
- a := cache.GetFsNode(util.FullPath("/a"))
- if a == nil {
- t.Errorf("missing node!")
- }
-
- cache.DeleteFsNode(util.FullPath("/a"))
- b = cache.GetFsNode(util.FullPath("/a/b"))
- if b != nil {
- t.Errorf("unexpected node!")
- }
-
- a = cache.GetFsNode(util.FullPath("/a"))
- if a != nil {
- t.Errorf("wrong DeleteFsNode!")
- }
-
- z := cache.GetFsNode(util.FullPath("/z"))
- if z == nil {
- t.Errorf("missing node!")
- }
-
- y := cache.GetFsNode(util.FullPath("/x/y"))
- if y != nil {
- t.Errorf("wrong node!")
- }
-
-}
-
-func TestFsCacheMove(t *testing.T) {
-
- cache := newFsCache(nil)
-
- cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
- cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
- cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
- cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
-
- cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))
-
- d := cache.GetFsNode(util.FullPath("/z/x/d"))
- if d == nil {
- t.Errorf("unexpected nil node!")
- }
- if d.(*File).Name != "dd" {
- t.Errorf("unexpected non dd node!")
- }
-
-}
-
-func TestFsCacheMove2(t *testing.T) {
-
- cache := newFsCache(nil)
-
- cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
- cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
-
- cache.Move(util.FullPath("/a/b/d"), util.FullPath("/a/b/e"))
-
- d := cache.GetFsNode(util.FullPath("/a/b/e"))
- if d == nil {
- t.Errorf("unexpected nil node!")
- }
- if d.(*File).Name != "e" {
- t.Errorf("unexpected node!")
- }
-
-}
diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go
deleted file mode 100644
index e6593ebde..000000000
--- a/weed/filesys/meta_cache/cache_config.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package meta_cache
-
-import "github.com/chrislusf/seaweedfs/weed/util"
-
-var (
- _ = util.Configuration(&cacheConfig{})
-)
-
-// implementing util.Configuration
-type cacheConfig struct {
- dir string
-}
-
-func (c cacheConfig) GetString(key string) string {
- return c.dir
-}
-
-func (c cacheConfig) GetBool(key string) bool {
- panic("implement me")
-}
-
-func (c cacheConfig) GetInt(key string) int {
- panic("implement me")
-}
-
-func (c cacheConfig) GetStringSlice(key string) []string {
- panic("implement me")
-}
-
-func (c cacheConfig) SetDefault(key string, value interface{}) {
- panic("implement me")
-}
diff --git a/weed/filesys/meta_cache/id_mapper.go b/weed/filesys/meta_cache/id_mapper.go
deleted file mode 100644
index 4a2179f31..000000000
--- a/weed/filesys/meta_cache/id_mapper.go
+++ /dev/null
@@ -1,101 +0,0 @@
-package meta_cache
-
-import (
- "fmt"
- "strconv"
- "strings"
-)
-
-type UidGidMapper struct {
- uidMapper *IdMapper
- gidMapper *IdMapper
-}
-
-type IdMapper struct {
- localToFiler map[uint32]uint32
- filerToLocal map[uint32]uint32
-}
-
-// UidGidMapper translates local uid/gid to filer uid/gid.
-// The local storage always persists the same values as the filer.
-// The local->filer translation happens when first updating the filer, before saving to the meta_cache.
-// The filer->local translation happens when reading from the meta_cache.
-func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
- uidMapper, err := newIdMapper(uidPairsStr)
- if err != nil {
- return nil, err
- }
- gidMapper, err := newIdMapper(gidPairStr)
- if err != nil {
- return nil, err
- }
-
- return &UidGidMapper{
- uidMapper: uidMapper,
- gidMapper: gidMapper,
- }, nil
-}
-
-func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) {
- return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
-}
-func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) {
- return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
-}
-
-func (m *IdMapper) LocalToFiler(id uint32) uint32 {
- value, found := m.localToFiler[id]
- if found {
- return value
- }
- return id
-}
-func (m *IdMapper) FilerToLocal(id uint32) uint32 {
- value, found := m.filerToLocal[id]
- if found {
- return value
- }
- return id
-}
-
-func newIdMapper(pairsStr string) (*IdMapper, error) {
-
- localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
- if err != nil {
- return nil, err
- }
-
- return &IdMapper{
- localToFiler: localToFiler,
- filerToLocal: filerToLocal,
- }, nil
-
-}
-
-func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
-
- if pairsStr == "" {
- return
- }
-
- localToFiler = make(map[uint32]uint32)
- filerToLocal = make(map[uint32]uint32)
- for _, pairStr := range strings.Split(pairsStr, ",") {
- pair := strings.Split(pairStr, ":")
- if len(pair) != 2 {
- err = fmt.Errorf("failed to parse pair %s: expected local:filer format", pairStr)
- return
- }
- localUidStr, filerUidStr := pair[0], pair[1]
- localUid, localUidErr := strconv.Atoi(localUidStr)
- if localUidErr != nil {
- err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr)
- return
- }
- filerUid, filerUidErr := strconv.Atoi(filerUidStr)
- if filerUidErr != nil {
- err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
- return
- }
- localToFiler[uint32(localUid)] = uint32(filerUid)
- filerToLocal[uint32(filerUid)] = uint32(localUid)
- }
-
- return
-}
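
The accepted pair syntax is a comma-separated list of local:filer ids, e.g. "1000:999,1001:998" (values hypothetical). A stand-alone sketch of the parse-and-lookup round trip, with unmapped ids passing through unchanged as in IdMapper:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// two mappings: local uid 1000 <-> filer uid 999, 1001 <-> 998
	pairsStr := "1000:999,1001:998"

	localToFiler := make(map[uint32]uint32)
	filerToLocal := make(map[uint32]uint32)
	for _, pairStr := range strings.Split(pairsStr, ",") {
		pair := strings.SplitN(pairStr, ":", 2)
		local, _ := strconv.Atoi(pair[0])
		filer, _ := strconv.Atoi(pair[1])
		localToFiler[uint32(local)] = uint32(filer)
		filerToLocal[uint32(filer)] = uint32(local)
	}

	// unmapped ids pass through unchanged
	lookup := func(m map[uint32]uint32, id uint32) uint32 {
		if v, ok := m[id]; ok {
			return v
		}
		return id
	}
	fmt.Println(lookup(localToFiler, 1000)) // 999
	fmt.Println(lookup(filerToLocal, 42))   // 42 (identity)
}
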
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
deleted file mode 100644
index 69d1655ee..000000000
--- a/weed/filesys/meta_cache/meta_cache.go
+++ /dev/null
@@ -1,146 +0,0 @@
-package meta_cache
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
- "os"
-)
-
-// need to have logic similar to FilerStoreWrapper
-// e.g. fill fileId field for chunks
-
-type MetaCache struct {
- localStore filer.VirtualFilerStore
- // sync.RWMutex
- visitedBoundary *bounded_tree.BoundedTree
- uidGidMapper *UidGidMapper
- invalidateFunc func(util.FullPath)
-}
-
-func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache {
- return &MetaCache{
- localStore: openMetaStore(dbFolder),
- visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
- uidGidMapper: uidGidMapper,
- invalidateFunc: invalidateFunc,
- }
-}
-
-func openMetaStore(dbFolder string) filer.VirtualFilerStore {
-
- os.RemoveAll(dbFolder)
- os.MkdirAll(dbFolder, 0755)
-
- store := &leveldb.LevelDBStore{}
- config := &cacheConfig{
- dir: dbFolder,
- }
-
- if err := store.Initialize(config, ""); err != nil {
- glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
- }
-
- return filer.NewFilerStoreWrapper(store)
-
-}
-
-func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
- //mc.Lock()
- //defer mc.Unlock()
- return mc.doInsertEntry(ctx, entry)
-}
-
-func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
- return mc.localStore.InsertEntry(ctx, entry)
-}
-
-func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
- //mc.Lock()
- //defer mc.Unlock()
-
- oldDir, _ := oldPath.DirAndName()
- if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
- if oldPath != "" {
- if newEntry != nil && oldPath == newEntry.FullPath {
- // skip the unnecessary deletion
- // leave the update to the following InsertEntry operation
- } else {
- glog.V(3).Infof("DeleteEntry %s", oldPath)
- if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
- return err
- }
- }
- }
- } else {
- // println("unknown old directory:", oldDir)
- }
-
- if newEntry != nil {
- newDir, _ := newEntry.DirAndName()
- if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
- glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
- if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
- return err
- }
- }
- }
- return nil
-}
-
-func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
- //mc.Lock()
- //defer mc.Unlock()
- return mc.localStore.UpdateEntry(ctx, entry)
-}
-
-func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
- //mc.RLock()
- //defer mc.RUnlock()
- entry, err = mc.localStore.FindEntry(ctx, fp)
- if err != nil {
- return nil, err
- }
- mc.mapIdFromFilerToLocal(entry)
- return
-}
-
-func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
- //mc.Lock()
- //defer mc.Unlock()
- return mc.localStore.DeleteEntry(ctx, fp)
-}
-
-func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
- //mc.RLock()
- //defer mc.RUnlock()
-
- if !mc.visitedBoundary.HasVisited(dirPath) {
- return fmt.Errorf("unsynchronized dir: %v", dirPath)
- }
-
- _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
- mc.mapIdFromFilerToLocal(entry)
- return eachEntryFunc(entry)
- })
- return err
-}
-
-func (mc *MetaCache) Shutdown() {
- //mc.Lock()
- //defer mc.Unlock()
- mc.localStore.Shutdown()
-}
-
-func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
- entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
-}
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
deleted file mode 100644
index 07098bf6b..000000000
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package meta_cache
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
-
- return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
-
- glog.V(4).Infof("ReadDirAllEntries %s ...", path)
-
- util.Retry("ReadDirAllEntries", func() error {
- err = filer_pb.ReadDirAllEntries(client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
- entry := filer.FromPbEntry(string(path), pbEntry)
- if IsHiddenSystemEntry(string(path), entry.Name()) {
- return nil
- }
- if err := mc.doInsertEntry(context.Background(), entry); err != nil {
- glog.V(0).Infof("read %s: %v", entry.FullPath, err)
- return err
- }
- if entry.IsDirectory() {
- childDirectories = append(childDirectories, entry.Name())
- }
- return nil
- })
- return err
- })
-
- if err != nil {
- err = fmt.Errorf("list %s: %v", path, err)
- }
-
- return
- })
-}
-
-func IsHiddenSystemEntry(dir, name string) bool {
- return dir == "/" && (name == "topics" || name == "etc")
-}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
deleted file mode 100644
index 2099cf1f8..000000000
--- a/weed/filesys/meta_cache/meta_cache_subscribe.go
+++ /dev/null
@@ -1,68 +0,0 @@
-package meta_cache
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
-
- processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
- message := resp.EventNotification
-
- for _, sig := range message.Signatures {
- if sig == selfSignature && selfSignature != 0 {
- return nil
- }
- }
-
- dir := resp.Directory
- var oldPath util.FullPath
- var newEntry *filer.Entry
- if message.OldEntry != nil {
- oldPath = util.NewFullPath(dir, message.OldEntry.Name)
- glog.V(4).Infof("deleting %v", oldPath)
- }
-
- if message.NewEntry != nil {
- if message.NewParentPath != "" {
- dir = message.NewParentPath
- }
- key := util.NewFullPath(dir, message.NewEntry.Name)
- glog.V(4).Infof("creating %v", key)
- newEntry = filer.FromPbEntry(dir, message.NewEntry)
- }
- err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
- if err == nil {
- if message.OldEntry != nil && message.NewEntry != nil {
- oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
- mc.invalidateFunc(oldKey)
- if message.OldEntry.Name != message.NewEntry.Name {
- newKey := util.NewFullPath(dir, message.NewEntry.Name)
- mc.invalidateFunc(newKey)
- }
- } else if message.OldEntry == nil && message.NewEntry != nil {
- // newly created entry, no need to invalidate
- } else if message.OldEntry != nil && message.NewEntry == nil {
- oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
- mc.invalidateFunc(oldKey)
- }
- }
-
- return err
-
- }
-
- util.RetryForever("followMetaUpdates", func() error {
- return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true)
- }, func(err error) bool {
- glog.Errorf("follow metadata updates: %v", err)
- return true
- })
-
- return nil
-}
diff --git a/weed/filesys/unimplemented.go b/weed/filesys/unimplemented.go
deleted file mode 100644
index 5c2dcf0e1..000000000
--- a/weed/filesys/unimplemented.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package filesys
-
-import (
- "context"
-
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
-)
-
-// https://github.com/bazil/fuse/issues/130
-
-var _ = fs.NodeAccesser(&Dir{})
-
-func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error {
- return fuse.ENOSYS
-}
-
-var _ = fs.NodeAccesser(&File{})
-
-func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error {
- return fuse.ENOSYS
-}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
deleted file mode 100644
index 84d4bdfa2..000000000
--- a/weed/filesys/wfs.go
+++ /dev/null
@@ -1,303 +0,0 @@
-package filesys
-
-import (
- "context"
- "fmt"
- "math"
- "math/rand"
- "os"
- "path"
- "path/filepath"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/util/grace"
-
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
-
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
-)
-
-type Option struct {
- MountDirectory string
- FilerAddresses []string
- filerIndex int
- FilerGrpcAddresses []string
- GrpcDialOption grpc.DialOption
- FilerMountRootPath string
- Collection string
- Replication string
- TtlSec int32
- DiskType types.DiskType
- ChunkSizeLimit int64
- ConcurrentWriters int
- CacheDir string
- CacheSizeMB int64
- DataCenter string
- Umask os.FileMode
-
- MountUid uint32
- MountGid uint32
- MountMode os.FileMode
- MountCtime time.Time
- MountMtime time.Time
- MountParentInode uint64
-
- VolumeServerAccess string // how to access volume servers
- Cipher bool // whether encrypt data on volume server
- UidGidMapper *meta_cache.UidGidMapper
-
- uniqueCacheDir string
- uniqueCacheTempPageDir string
-}
-
-var _ = fs.FS(&WFS{})
-var _ = fs.FSStatfser(&WFS{})
-
-type WFS struct {
- option *Option
-
- // contains all open handles, protected by handlesLock
- handlesLock sync.Mutex
- handles map[uint64]*FileHandle
-
- bufPool sync.Pool
-
- stats statsCache
-
- root fs.Node
- fsNodeCache *FsCache
-
- chunkCache *chunk_cache.TieredChunkCache
- metaCache *meta_cache.MetaCache
- signature int32
-
- // throttle writers
- concurrentWriters *util.LimitedConcurrentExecutor
- Server *fs.Server
-}
-type statsCache struct {
- filer_pb.StatisticsResponse
- lastChecked int64 // unix time in seconds
-}
-
-func NewSeaweedFileSystem(option *Option) *WFS {
- wfs := &WFS{
- option: option,
- handles: make(map[uint64]*FileHandle),
- bufPool: sync.Pool{
- New: func() interface{} {
- return make([]byte, option.ChunkSizeLimit)
- },
- },
- signature: util.RandomInt32(),
- }
- wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
- wfs.option.setupUniqueCacheDirectory()
- if option.CacheSizeMB > 0 {
- wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
- }
-
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
-
- fsNode := NodeWithId(filePath.AsInode())
- if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
- glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
- }
-
- dir, name := filePath.DirAndName()
- parent := NodeWithId(util.FullPath(dir).AsInode())
- if dir == option.FilerMountRootPath {
- parent = NodeWithId(1)
- }
- if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
- glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
- }
- })
- grace.OnInterrupt(func() {
- wfs.metaCache.Shutdown()
- })
-
- wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1}
- wfs.fsNodeCache = newFsCache(wfs.root)
-
- if wfs.option.ConcurrentWriters > 0 {
- wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
- }
-
- return wfs
-}
-
-func (wfs *WFS) StartBackgroundTasks() {
- startTime := time.Now()
- go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
-}
-
-func (wfs *WFS) Root() (fs.Node, error) {
- return wfs.root, nil
-}
-
-func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) {
-
- fullpath := file.fullpath()
- glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
-
- inodeId := file.Id()
-
- wfs.handlesLock.Lock()
- existingHandle, found := wfs.handles[inodeId]
- if found && existingHandle != nil && existingHandle.f.isOpen > 0 {
- existingHandle.f.isOpen++
- wfs.handlesLock.Unlock()
- existingHandle.dirtyPages.SetWriteOnly(writeOnly)
- glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen)
- return existingHandle
- }
- wfs.handlesLock.Unlock()
-
- entry, _ := file.maybeLoadEntry(context.Background())
- file.entry = entry
- fileHandle = newFileHandle(file, uid, gid, writeOnly)
-
- wfs.handlesLock.Lock()
- file.isOpen++
- wfs.handles[inodeId] = fileHandle
- wfs.handlesLock.Unlock()
- fileHandle.handle = inodeId
-
- glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
- return
-}
-
-func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
- wfs.handlesLock.Lock()
- defer wfs.handlesLock.Unlock()
-
- glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
-
- delete(wfs.handles, uint64(handleId))
-}
-
-// Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
-func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
-
- glog.V(4).Infof("reading fs stats: %+v", req)
-
- if wfs.stats.lastChecked < time.Now().Unix()-20 {
-
- err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.StatisticsRequest{
- Collection: wfs.option.Collection,
- Replication: wfs.option.Replication,
- Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
- DiskType: string(wfs.option.DiskType),
- }
-
- glog.V(4).Infof("reading filer stats: %+v", request)
- resp, err := client.Statistics(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("reading filer stats %v: %v", request, err)
- return err
- }
- glog.V(4).Infof("read filer stats: %+v", resp)
-
- wfs.stats.TotalSize = resp.TotalSize
- wfs.stats.UsedSize = resp.UsedSize
- wfs.stats.FileCount = resp.FileCount
- wfs.stats.lastChecked = time.Now().Unix()
-
- return nil
- })
- if err != nil {
- glog.V(0).Infof("filer Statistics: %v", err)
- return err
- }
- }
-
- totalDiskSize := wfs.stats.TotalSize
- usedDiskSize := wfs.stats.UsedSize
- actualFileCount := wfs.stats.FileCount
-
- // Compute the total number of available blocks
- resp.Blocks = totalDiskSize / blockSize
-
- // Compute the number of used blocks
- numBlocks := uint64(usedDiskSize / blockSize)
-
- // Report the number of free and available blocks for the block size
- resp.Bfree = resp.Blocks - numBlocks
- resp.Bavail = resp.Blocks - numBlocks
- resp.Bsize = uint32(blockSize)
-
- // Report the total number of possible files in the file system (and those free)
- resp.Files = math.MaxInt64
- resp.Ffree = math.MaxInt64 - actualFileCount
-
- // Report the maximum length of a name and the minimum fragment size
- resp.Namelen = 1024
- resp.Frsize = uint32(blockSize)
-
- return nil
-}
-
-func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
- if entry.Attributes == nil {
- return
- }
- entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
-}
-func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
- if entry.Attributes == nil {
- return
- }
- entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
-}
-
-func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
- if wfs.option.VolumeServerAccess == "filerProxy" {
- return func(fileId string) (targetUrls []string, err error) {
- return []string{"http://" + wfs.getCurrentFiler() + "/?proxyChunkId=" + fileId}, nil
- }
- }
- return filer.LookupFn(wfs)
-}
-func (wfs *WFS) getCurrentFiler() string {
- return wfs.option.FilerAddresses[wfs.option.filerIndex]
-}
-
-func (option *Option) setupUniqueCacheDirectory() {
- cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddresses[0] + option.FilerMountRootPath + util.Version()))[0:8]
- option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
- option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")
- os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
-}
-
-func (option *Option) getTempFilePageDir() string {
- return option.uniqueCacheTempPageDir
-}
-func (option *Option) getUniqueCacheDir() string {
- return option.uniqueCacheDir
-}
-
-type NodeWithId uint64
-
-func (n NodeWithId) Id() uint64 {
- return uint64(n)
-}
-func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
- return nil
-}
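
The cache layout derived in setupUniqueCacheDirectory boils down to: hash the mount identity, keep the first 8 hex characters, and nest the temp-page dir under it, so remounting the same filer path reuses the same cache. A sketch with hypothetical identity values (util.Md5String is assumed to return lowercase hex):

package main

import (
	"crypto/md5"
	"fmt"
	"path"
	"path/filepath"
)

func main() {
	// hypothetical mount identity: mount dir + first filer gRPC
	// address + mount root path + version string
	id := "/mnt/weed" + "localhost:18888" + "/" + "30GB 2.62"
	sum := md5.Sum([]byte(id))
	cacheUniqueId := fmt.Sprintf("%x", sum)[:8]

	uniqueCacheDir := path.Join("/var/cache/seaweedfs", cacheUniqueId)
	uniqueCacheTempPageDir := filepath.Join(uniqueCacheDir, "sw")
	fmt.Println(uniqueCacheDir, uniqueCacheTempPageDir)
}
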
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
deleted file mode 100644
index 95ebdb9b8..000000000
--- a/weed/filesys/wfs_filer_client.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package filesys
-
-import (
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-var _ = filer_pb.FilerClient(&WFS{})
-
-func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
-
- return util.Retry("filer grpc", func() error {
-
- i := wfs.option.filerIndex
- n := len(wfs.option.FilerGrpcAddresses)
- for x := 0; x < n; x++ {
-
- filerGrpcAddress := wfs.option.FilerGrpcAddresses[i]
- err = pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, filerGrpcAddress, wfs.option.GrpcDialOption)
-
- if err != nil {
- glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
- } else {
- wfs.option.filerIndex = i
- return nil
- }
-
- i++
- if i >= n {
- i = 0
- }
-
- }
- return err
- })
-
-}
-
-func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
- if wfs.option.VolumeServerAccess == "publicUrl" {
- return location.PublicUrl
- }
- return location.Url
-}
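
The retry loop in WithFilerClient walks the filer list with a wrapping index, starting from the last address that worked and remembering the first one that answers. The same failover pattern as a self-contained sketch (addresses made up):

package main

import "fmt"

// tryEach starts at the remembered index, tries every address once,
// and returns the index of the first success so the next call can
// start there.
func tryEach(addrs []string, start int, try func(string) error) (int, error) {
	i := start
	var err error
	for x := 0; x < len(addrs); x++ {
		if err = try(addrs[i]); err == nil {
			return i, nil
		}
		i++
		if i >= len(addrs) {
			i = 0
		}
	}
	return start, err
}

func main() {
	addrs := []string{"filer1:18888", "filer2:18888"}
	idx, err := tryEach(addrs, 1, func(a string) error {
		if a == "filer1:18888" {
			return nil // pretend only filer1 is reachable
		}
		return fmt.Errorf("%s unreachable", a)
	})
	fmt.Println(idx, err) // 0 <nil>
}
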
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
deleted file mode 100644
index 42c13cfd0..000000000
--- a/weed/filesys/wfs_write.go
+++ /dev/null
@@ -1,78 +0,0 @@
-package filesys
-
-import (
- "context"
- "fmt"
- "io"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.SaveDataAsChunkFunctionType {
-
- return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
- var fileId, host string
- var auth security.EncodedJwt
-
- if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- return util.Retry("assignVolume", func() error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: wfs.option.Replication,
- Collection: wfs.option.Collection,
- TtlSec: wfs.option.TtlSec,
- DiskType: string(wfs.option.DiskType),
- DataCenter: wfs.option.DataCenter,
- Path: string(fullPath),
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
- loc := &filer_pb.Location{
- Url: resp.Url,
- PublicUrl: resp.PublicUrl,
- }
- host = wfs.AdjustedUrl(loc)
- collection, replication = resp.Collection, resp.Replication
-
- return nil
- })
- }); err != nil {
- return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
- }
-
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- if wfs.option.VolumeServerAccess == "filerProxy" {
- fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.getCurrentFiler(), fileId)
- }
- uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
- if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
- return nil, "", "", fmt.Errorf("upload data: %v", err)
- }
- if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
- return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
- }
-
- if !writeOnly {
- wfs.chunkCache.SetChunk(fileId, data)
- }
-
- chunk = uploadResult.ToPbFileChunk(fileId, offset)
- return chunk, collection, replication, nil
- }
-}
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
deleted file mode 100644
index 4b2ee0064..000000000
--- a/weed/filesys/xattr.go
+++ /dev/null
@@ -1,138 +0,0 @@
-package filesys
-
-import (
- "context"
-
- "github.com/seaweedfs/fuse"
-
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
-
- if entry == nil {
- return fuse.ErrNoXattr
- }
- if entry.Extended == nil {
- return fuse.ErrNoXattr
- }
- data, found := entry.Extended[req.Name]
- if !found {
- return fuse.ErrNoXattr
- }
- if req.Position < uint32(len(data)) {
- size := req.Size
- if req.Position+size >= uint32(len(data)) {
- size = uint32(len(data)) - req.Position
- }
- if size == 0 {
- resp.Xattr = data[req.Position:]
- } else {
- resp.Xattr = data[req.Position : req.Position+size]
- }
- }
-
- return nil
-
-}
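
The Position/Size handling above implements partial xattr reads (the Position field is used on macOS): size 0 means read to the end, and oversized requests are clamped at the end of the value. The slicing rule isolated as a worked example:

package main

import "fmt"

// slice mirrors getxattr's positional read over an attribute value.
func slice(data []byte, position, size uint32) []byte {
	if position >= uint32(len(data)) {
		return nil
	}
	if position+size >= uint32(len(data)) {
		size = uint32(len(data)) - position
	}
	if size == 0 {
		return data[position:]
	}
	return data[position : position+size]
}

func main() {
	v := []byte("abcdef")
	fmt.Printf("%s %s %s\n",
		slice(v, 0, 0),  // abcdef (size 0 reads to the end)
		slice(v, 2, 3),  // cde
		slice(v, 4, 10)) // ef (clamped at the end)
}
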
-
-func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error {
-
- if entry == nil {
- return fuse.EIO
- }
-
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
- data := entry.Extended[req.Name]
-
- newData := make([]byte, int(req.Position)+len(req.Xattr))
-
- copy(newData, data)
-
- copy(newData[int(req.Position):], req.Xattr)
-
- entry.Extended[req.Name] = newData
-
- return nil
-
-}
-
-func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error {
-
- if entry == nil {
- return fuse.ErrNoXattr
- }
-
- if entry.Extended == nil {
- return fuse.ErrNoXattr
- }
-
- _, found := entry.Extended[req.Name]
-
- if !found {
- return fuse.ErrNoXattr
- }
-
- delete(entry.Extended, req.Name)
-
- return nil
-
-}
-
-func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
-
- if entry == nil {
- return fuse.EIO
- }
-
- for k := range entry.Extended {
- resp.Append(k)
- }
-
- if req.Position >= uint32(len(resp.Xattr)) {
- resp.Xattr = nil
- return nil
- }
- size := req.Size
- if req.Position+size >= uint32(len(resp.Xattr)) {
- size = uint32(len(resp.Xattr)) - req.Position
- }
-
- if size == 0 {
- resp.Xattr = resp.Xattr[req.Position:]
- } else {
- resp.Xattr = resp.Xattr[req.Position : req.Position+size]
- }
-
- return nil
-
-}
-
-func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
-
- fullpath := util.NewFullPath(dir, name)
- // glog.V(3).Infof("read entry cache miss %s", fullpath)
-
- // return a valid entry for the mount root
- if string(fullpath) == wfs.option.FilerMountRootPath {
- return &filer_pb.Entry{
- Name: name,
- IsDirectory: true,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: wfs.option.MountMtime.Unix(),
- FileMode: uint32(wfs.option.MountMode),
- Uid: wfs.option.MountUid,
- Gid: wfs.option.MountGid,
- Crtime: wfs.option.MountCtime.Unix(),
- },
- }, nil
- }
-
- // read from async meta cache
- meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
- cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
- if cacheErr == filer_pb.ErrNotFound {
- return nil, fuse.ENOENT
- }
- return cachedEntry.ToProtoEntry(), cacheErr
-}