author    bingoohuang <bingoo.huang@gmail.com>    2021-04-26 17:19:35 +0800
committer bingoohuang <bingoo.huang@gmail.com>    2021-04-26 17:19:35 +0800
commit    d861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree      301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/filesys
parent    70da715d8d917527291b35fb069fac077d17b868 (diff)
parent    4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/dir.go                             | 551
-rw-r--r--  weed/filesys/dir_link.go                        | 114
-rw-r--r--  weed/filesys/dir_rename.go                      |  70
-rw-r--r--  weed/filesys/dirty_page.go                      | 225
-rw-r--r--  weed/filesys/dirty_page_interval.go             | 223
-rw-r--r--  weed/filesys/dirty_page_interval_test.go        | 113
-rw-r--r--  weed/filesys/file.go                            | 279
-rw-r--r--  weed/filesys/filehandle.go                      | 297
-rw-r--r--  weed/filesys/fscache.go                         | 213
-rw-r--r--  weed/filesys/fscache_test.go                    | 115
-rw-r--r--  weed/filesys/meta_cache/cache_config.go         |  32
-rw-r--r--  weed/filesys/meta_cache/id_mapper.go            | 101
-rw-r--r--  weed/filesys/meta_cache/meta_cache.go           | 152
-rw-r--r--  weed/filesys/meta_cache/meta_cache_init.go      |  47
-rw-r--r--  weed/filesys/meta_cache/meta_cache_subscribe.go |  86
-rw-r--r--  weed/filesys/unimplemented.go                   |  22
-rw-r--r--  weed/filesys/wfs.go                             | 186
-rw-r--r--  weed/filesys/wfs_deletion.go                    |  69
-rw-r--r--  weed/filesys/wfs_filer_client.go                |  34
-rw-r--r--  weed/filesys/wfs_write.go                       |  75
-rw-r--r--  weed/filesys/xattr.go                           |  47
21 files changed, 2374 insertions(+), 677 deletions(-)
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 7b24a1ec5..6ee20974b 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -1,27 +1,39 @@
package filesys
import (
+ "bytes"
"context"
+ "math"
"os"
- "path"
+ "strings"
+ "syscall"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type Dir struct {
- Path string
- wfs *WFS
- entry *filer_pb.Entry
+ name string
+ wfs *WFS
+ entry *filer_pb.Entry
+ parent *Dir
+ id uint64
}
var _ = fs.Node(&Dir{})
+
+//var _ = fs.NodeIdentifier(&Dir{})
var _ = fs.NodeCreater(&Dir{})
+var _ = fs.NodeMknoder(&Dir{})
var _ = fs.NodeMkdirer(&Dir{})
+var _ = fs.NodeFsyncer(&Dir{})
var _ = fs.NodeRequestLookuper(&Dir{})
var _ = fs.HandleReadDirAller(&Dir{})
var _ = fs.NodeRemover(&Dir{})
@@ -31,44 +43,57 @@ var _ = fs.NodeGetxattrer(&Dir{})
var _ = fs.NodeSetxattrer(&Dir{})
var _ = fs.NodeRemovexattrer(&Dir{})
var _ = fs.NodeListxattrer(&Dir{})
+var _ = fs.NodeForgetter(&Dir{})
-func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
+func (dir *Dir) xId() uint64 {
+ return dir.id
+}
- glog.V(3).Infof("dir Attr %s", dir.Path)
+func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
// https://github.com/bazil/fuse/issues/196
attr.Valid = time.Second
- if dir.Path == dir.wfs.option.FilerMountRootPath {
+ if dir.FullPath() == dir.wfs.option.FilerMountRootPath {
dir.setRootDirAttributes(attr)
+ glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.FullPath(), attr)
return nil
}
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
+ glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err)
return err
}
- attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
- attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
- attr.Ctime = time.Unix(dir.entry.Attributes.Crtime, 0)
- attr.Gid = dir.entry.Attributes.Gid
- attr.Uid = dir.entry.Attributes.Uid
+ // attr.Inode = dir.Id()
+ attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir
+ attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
+ attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
+ attr.Gid = entry.Attributes.Gid
+ attr.Uid = entry.Attributes.Uid
+
+ glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
return nil
}
func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
- glog.V(4).Infof("dir Getxattr %s", dir.Path)
+ glog.V(4).Infof("dir Getxattr %s", dir.FullPath())
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
- return getxattr(dir.entry, req, resp)
+ return getxattr(entry, req, resp)
}
func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
+ // attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
+ attr.Valid = time.Second
+ attr.Inode = 1 // dir.Id()
attr.Uid = dir.wfs.option.MountUid
attr.Gid = dir.wfs.option.MountGid
attr.Mode = dir.wfs.option.MountMode
@@ -76,84 +101,178 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
attr.Ctime = dir.wfs.option.MountCtime
attr.Mtime = dir.wfs.option.MountMtime
attr.Atime = dir.wfs.option.MountMtime
+ attr.BlockSize = blockSize
+}
+
+func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
+ // fsync works at OS level
+ // write the file chunks to the filerGrpcAddress
+ glog.V(3).Infof("dir %s fsync %+v", dir.FullPath(), req)
+
+ return nil
}
-func (dir *Dir) newFile(name string, entry *filer_pb.Entry) *File {
+func (dir *Dir) newFile(name string) fs.Node {
+
+ fileFullPath := util.NewFullPath(dir.FullPath(), name)
+ fileId := fileFullPath.AsInode()
+ dir.wfs.handlesLock.Lock()
+ existingHandle, found := dir.wfs.handles[fileId]
+ dir.wfs.handlesLock.Unlock()
+
+ if found {
+ glog.V(4).Infof("newFile found opened file handle: %+v", fileFullPath)
+ return existingHandle.f
+ }
return &File{
- Name: name,
- dir: dir,
- wfs: dir.wfs,
- entry: entry,
- entryViewCache: nil,
+ Name: name,
+ dir: dir,
+ wfs: dir.wfs,
+ id: fileId,
}
}
+func (dir *Dir) newDirectory(fullpath util.FullPath) fs.Node {
+
+ return &Dir{name: fullpath.Name(), wfs: dir.wfs, parent: dir, id: fullpath.AsInode()}
+
+}
+
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
+ exclusive := req.Flags&fuse.OpenExclusive != 0
+ isDirectory := req.Mode&os.ModeDir > 0
+
+ if exclusive || isDirectory {
+ _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, exclusive)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
+ var node fs.Node
+ if isDirectory {
+ node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name))
+ return node, nil, nil
+ }
+
+ node = dir.newFile(req.Name)
+ file := node.(*File)
+ file.entry = &filer_pb.Entry{
+ Name: req.Name,
+ IsDirectory: req.Mode&os.ModeDir > 0,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
+ Uid: req.Uid,
+ Gid: req.Gid,
+ Collection: dir.wfs.option.Collection,
+ Replication: dir.wfs.option.Replication,
+ TtlSec: dir.wfs.option.TtlSec,
+ },
+ }
+ file.dirtyMetadata = true
+ fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
+ return file, fh, nil
+
+}
+
+func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
+
+ _, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false)
+
+ if err != nil {
+ return nil, err
+ }
+ var node fs.Node
+ node = dir.newFile(req.Name)
+ return node, nil
+}
+
+func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exclusive bool) (*filer_pb.CreateEntryRequest, error) {
+ dirFullPath := dir.FullPath()
request := &filer_pb.CreateEntryRequest{
- Directory: dir.Path,
+ Directory: dirFullPath,
Entry: &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: req.Mode&os.ModeDir > 0,
+ Name: name,
+ IsDirectory: mode&os.ModeDir > 0,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
- FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
- Uid: req.Uid,
- Gid: req.Gid,
+ FileMode: uint32(mode &^ dir.wfs.option.Umask),
+ Uid: uid,
+ Gid: gid,
Collection: dir.wfs.option.Collection,
Replication: dir.wfs.option.Replication,
TtlSec: dir.wfs.option.TtlSec,
},
},
+ OExcl: exclusive,
+ Signatures: []int32{dir.wfs.signature},
}
- glog.V(1).Infof("create: %v", request)
+ glog.V(1).Infof("create %s/%s", dirFullPath, name)
- if request.Entry.IsDirectory {
- if err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- if _, err := client.CreateEntry(ctx, request); err != nil {
- glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err)
- return fuse.EIO
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ if strings.Contains(err.Error(), "EEXIST") {
+ return fuse.EEXIST
}
- return nil
- }); err != nil {
- return nil, nil, err
+ glog.V(0).Infof("create %s/%s: %v", dirFullPath, name, err)
+ return fuse.EIO
}
- }
- file := dir.newFile(req.Name, request.Entry)
- if !request.Entry.IsDirectory {
- file.isOpen = true
- }
- fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
- fh.dirtyMetadata = true
- return file, fh, nil
+ if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
+ glog.Errorf("local InsertEntry dir %s/%s: %v", dirFullPath, name, err)
+ return fuse.EIO
+ }
+ return nil
+ })
+ return request, err
}
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
- err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name)
+
+ newEntry := &filer_pb.Entry{
+ Name: req.Name,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
+ Uid: req.Uid,
+ Gid: req.Gid,
+ },
+ }
+
+ dirFullPath := dir.FullPath()
+
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir.wfs.mapPbIdFromLocalToFiler(newEntry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(newEntry)
request := &filer_pb.CreateEntryRequest{
- Directory: dir.Path,
- Entry: &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: true,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
- Uid: req.Uid,
- Gid: req.Gid,
- },
- },
+ Directory: dirFullPath,
+ Entry: newEntry,
+ Signatures: []int32{dir.wfs.signature},
}
glog.V(1).Infof("mkdir: %v", request)
- if _, err := client.CreateEntry(ctx, request); err != nil {
- glog.V(0).Infof("mkdir %s/%s: %v", dir.Path, req.Name, err)
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
+ return err
+ }
+
+ if err := dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
+ glog.Errorf("local mkdir dir %s/%s: %v", dirFullPath, req.Name, err)
return fuse.EIO
}
@@ -161,221 +280,258 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
})
if err == nil {
- node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs}
+ node := dir.newDirectory(util.NewFullPath(dirFullPath, req.Name))
+
return node, nil
}
- return nil, err
+ glog.V(0).Infof("mkdir %s/%s: %v", dirFullPath, req.Name, err)
+
+ return nil, fuse.EIO
}
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
- glog.V(4).Infof("dir Lookup %s: %s", dir.Path, req.Name)
+ dirPath := util.FullPath(dir.FullPath())
+ glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
- var entry *filer_pb.Entry
- fullFilePath := path.Join(dir.Path, req.Name)
-
- item := dir.wfs.listDirectoryEntriesCache.Get(fullFilePath)
- if item != nil && !item.Expired() {
- entry = item.Value().(*filer_pb.Entry)
+ fullFilePath := dirPath.Child(req.Name)
+ visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
+ if visitErr != nil {
+ glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
+ return nil, fuse.EIO
+ }
+ localEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
}
- if entry == nil {
- glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
- entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath)
+ if localEntry == nil {
+ // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
+ entry, err := filer_pb.GetEntry(dir.wfs, fullFilePath)
if err != nil {
- return nil, err
- }
- if entry != nil {
- dir.wfs.listDirectoryEntriesCache.Set(fullFilePath, entry, 5*time.Minute)
+ glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
+ return nil, fuse.ENOENT
}
+ localEntry = filer.FromPbEntry(string(dirPath), entry)
} else {
glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
}
- if entry != nil {
- if entry.IsDirectory {
- node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, entry: entry}
+ if localEntry != nil {
+ if localEntry.IsDirectory() {
+ node = dir.newDirectory(fullFilePath)
} else {
- node = dir.newFile(req.Name, entry)
+ node = dir.newFile(req.Name)
}
- resp.EntryValid = time.Duration(0)
- resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
- resp.Attr.Ctime = time.Unix(entry.Attributes.Crtime, 0)
- resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode)
- resp.Attr.Gid = entry.Attributes.Gid
- resp.Attr.Uid = entry.Attributes.Uid
+ // resp.EntryValid = time.Second
+ resp.Attr.Inode = fullFilePath.AsInode()
+ resp.Attr.Valid = time.Second
+ resp.Attr.Mtime = localEntry.Attr.Mtime
+ resp.Attr.Crtime = localEntry.Attr.Crtime
+ resp.Attr.Mode = localEntry.Attr.Mode
+ resp.Attr.Gid = localEntry.Attr.Gid
+ resp.Attr.Uid = localEntry.Attr.Uid
+ if localEntry.HardLinkCounter > 0 {
+ resp.Attr.Nlink = uint32(localEntry.HardLinkCounter)
+ }
return node, nil
}
+ glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
}
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
- glog.V(3).Infof("dir ReadDirAll %s", dir.Path)
+ dirPath := util.FullPath(dir.FullPath())
+ glog.V(4).Infof("dir ReadDirAll %s", dirPath)
- cacheTtl := 5 * time.Minute
-
- readErr := filer2.ReadDirAllEntries(ctx, dir.wfs, dir.Path, "", func(entry *filer_pb.Entry, isLast bool) {
- if entry.IsDirectory {
- dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
+ processEachEntryFn := func(entry *filer.Entry, isLast bool) {
+ if entry.IsDirectory() {
+ dirent := fuse.Dirent{Name: entry.Name(), Type: fuse.DT_Dir, Inode: dirPath.Child(entry.Name()).AsInode()}
ret = append(ret, dirent)
} else {
- dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
+ dirent := fuse.Dirent{Name: entry.Name(), Type: findFileType(uint16(entry.Attr.Mode)), Inode: dirPath.Child(entry.Name()).AsInode()}
ret = append(ret, dirent)
}
- dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, cacheTtl)
+ }
+
+ if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ return nil, fuse.EIO
+ }
+ listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ processEachEntryFn(entry, false)
+ return true
})
- if readErr != nil {
- glog.V(0).Infof("list %s: %v", dir.Path, err)
- return ret, fuse.EIO
+ if listErr != nil {
+ glog.Errorf("list meta cache: %v", listErr)
+ return nil, fuse.EIO
}
+ return
+}
- return ret, err
+func findFileType(mode uint16) fuse.DirentType {
+ switch mode & (syscall.S_IFMT & 0xffff) {
+ case syscall.S_IFSOCK:
+ return fuse.DT_Socket
+ case syscall.S_IFLNK:
+ return fuse.DT_Link
+ case syscall.S_IFREG:
+ return fuse.DT_File
+ case syscall.S_IFBLK:
+ return fuse.DT_Block
+ case syscall.S_IFDIR:
+ return fuse.DT_Dir
+ case syscall.S_IFCHR:
+ return fuse.DT_Char
+ case syscall.S_IFIFO:
+ return fuse.DT_FIFO
+ }
+ return fuse.DT_File
}
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
if !req.Dir {
- return dir.removeOneFile(ctx, req)
+ return dir.removeOneFile(req)
}
- return dir.removeFolder(ctx, req)
+ return dir.removeFolder(req)
}
-func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error {
+func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
- entry, err := filer2.GetEntry(ctx, dir.wfs, path.Join(dir.Path, req.Name))
+ dirFullPath := dir.FullPath()
+ filePath := util.NewFullPath(dirFullPath, req.Name)
+ entry, err := filer_pb.GetEntry(dir.wfs, filePath)
if err != nil {
return err
}
- dir.wfs.deleteFileChunks(ctx, entry.Chunks)
-
- dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name))
-
- return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ // first, ensure the filer store can correctly delete
+ glog.V(3).Infof("remove file: %v", req)
+ isDeleteData := entry != nil && entry.HardLinkCounter <= 1
+ err = filer_pb.Remove(dir.wfs, dirFullPath, req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature})
+ if err != nil {
+ glog.V(3).Infof("not found remove file %s: %v", filePath, err)
+ return fuse.ENOENT
+ }
- request := &filer_pb.DeleteEntryRequest{
- Directory: dir.Path,
- Name: req.Name,
- IsDeleteData: false,
- }
+ // then, delete meta cache and fsNode cache
+ if err = dir.wfs.metaCache.DeleteEntry(context.Background(), filePath); err != nil {
+ glog.V(3).Infof("local DeleteEntry %s: %v", filePath, err)
+ return fuse.ESTALE
+ }
- glog.V(3).Infof("remove file: %v", request)
- _, err := client.DeleteEntry(ctx, request)
- if err != nil {
- glog.V(3).Infof("remove file %s/%s: %v", dir.Path, req.Name, err)
- return fuse.ENOENT
- }
+ // remove current file handle if any
+ dir.wfs.handlesLock.Lock()
+ defer dir.wfs.handlesLock.Unlock()
+ inodeId := filePath.AsInode()
+ delete(dir.wfs.handles, inodeId)
- return nil
- })
+ return nil
}
-func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error {
-
- dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name))
+func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
- return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.DeleteEntryRequest{
- Directory: dir.Path,
- Name: req.Name,
- IsDeleteData: true,
+ dirFullPath := dir.FullPath()
+ glog.V(3).Infof("remove directory entry: %v", req)
+ ignoreRecursiveErr := true // ignore recursion error since the OS should manage it
+ err := filer_pb.Remove(dir.wfs, dirFullPath, req.Name, true, true, ignoreRecursiveErr, false, []int32{dir.wfs.signature})
+ if err != nil {
+ glog.V(0).Infof("remove %s/%s: %v", dirFullPath, req.Name, err)
+ if strings.Contains(err.Error(), "non-empty") {
+ return fuse.EEXIST
}
+ return fuse.ENOENT
+ }
- glog.V(3).Infof("remove directory entry: %v", request)
- _, err := client.DeleteEntry(ctx, request)
- if err != nil {
- glog.V(3).Infof("remove %s/%s: %v", dir.Path, req.Name, err)
- return fuse.ENOENT
- }
+ t := util.NewFullPath(dirFullPath, req.Name)
+ dir.wfs.metaCache.DeleteEntry(context.Background(), t)
- return nil
- })
+ return nil
}
func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req)
+
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
- glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle)
if req.Valid.Mode() {
- dir.entry.Attributes.FileMode = uint32(req.Mode)
+ entry.Attributes.FileMode = uint32(req.Mode)
}
if req.Valid.Uid() {
- dir.entry.Attributes.Uid = req.Uid
+ entry.Attributes.Uid = req.Uid
}
if req.Valid.Gid() {
- dir.entry.Attributes.Gid = req.Gid
+ entry.Attributes.Gid = req.Gid
}
if req.Valid.Mtime() {
- dir.entry.Attributes.Mtime = req.Mtime.Unix()
+ entry.Attributes.Mtime = req.Mtime.Unix()
}
- dir.wfs.listDirectoryEntriesCache.Delete(dir.Path)
-
- return dir.saveEntry(ctx)
+ return dir.saveEntry(entry)
}
func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
- glog.V(4).Infof("dir Setxattr %s: %s", dir.Path, req.Name)
+ glog.V(4).Infof("dir Setxattr %s: %s", dir.FullPath(), req.Name)
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
- if err := setxattr(dir.entry, req); err != nil {
+ if err := setxattr(entry, req); err != nil {
return err
}
- dir.wfs.listDirectoryEntriesCache.Delete(dir.Path)
-
- return dir.saveEntry(ctx)
+ return dir.saveEntry(entry)
}
func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
- glog.V(4).Infof("dir Removexattr %s: %s", dir.Path, req.Name)
+ glog.V(4).Infof("dir Removexattr %s: %s", dir.FullPath(), req.Name)
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
- if err := removexattr(dir.entry, req); err != nil {
+ if err := removexattr(entry, req); err != nil {
return err
}
- dir.wfs.listDirectoryEntriesCache.Delete(dir.Path)
-
- return dir.saveEntry(ctx)
+ return dir.saveEntry(entry)
}
func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
- glog.V(4).Infof("dir Listxattr %s", dir.Path)
+ glog.V(4).Infof("dir Listxattr %s", dir.FullPath())
- if err := dir.maybeLoadEntry(ctx); err != nil {
+ entry, err := dir.maybeLoadEntry()
+ if err != nil {
return err
}
- if err := listxattr(dir.entry, req, resp); err != nil {
+ if err := listxattr(entry, req, resp); err != nil {
return err
}
@@ -383,39 +539,66 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
}
-func (dir *Dir) maybeLoadEntry(ctx context.Context) error {
- if dir.entry == nil {
- parentDirPath, name := filer2.FullPath(dir.Path).DirAndName()
- entry, err := dir.wfs.maybeLoadEntry(ctx, parentDirPath, name)
- if err != nil {
- return err
- }
- if entry == nil {
- return fuse.ENOENT
- }
- dir.entry = entry
- }
- return nil
+func (dir *Dir) Forget() {
+ glog.V(4).Infof("Forget dir %s", dir.FullPath())
}
-func (dir *Dir) saveEntry(ctx context.Context) error {
+func (dir *Dir) maybeLoadEntry() (*filer_pb.Entry, error) {
+ parentDirPath, name := util.FullPath(dir.FullPath()).DirAndName()
+ return dir.wfs.maybeLoadEntry(parentDirPath, name)
+}
+
+func (dir *Dir) saveEntry(entry *filer_pb.Entry) error {
- parentDir, name := filer2.FullPath(dir.Path).DirAndName()
+ parentDir, name := util.FullPath(dir.FullPath()).DirAndName()
- return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir.wfs.mapPbIdFromLocalToFiler(entry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(entry)
request := &filer_pb.UpdateEntryRequest{
- Directory: parentDir,
- Entry: dir.entry,
+ Directory: parentDir,
+ Entry: entry,
+ Signatures: []int32{dir.wfs.signature},
}
glog.V(1).Infof("save dir entry: %v", request)
- _, err := client.UpdateEntry(ctx, request)
+ _, err := client.UpdateEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err)
+ glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
return fuse.EIO
}
+ if err := dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)); err != nil {
+ glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
+ return fuse.ESTALE
+ }
+
return nil
})
}
+
+func (dir *Dir) FullPath() string {
+ var parts []string
+ for p := dir; p != nil; p = p.parent {
+ if strings.HasPrefix(p.name, "/") {
+ if len(p.name) > 1 {
+ parts = append(parts, p.name[1:])
+ }
+ } else {
+ parts = append(parts, p.name)
+ }
+ }
+
+ if len(parts) == 0 {
+ return "/"
+ }
+
+ var buf bytes.Buffer
+ for i := len(parts) - 1; i >= 0; i-- {
+ buf.WriteString("/")
+ buf.WriteString(parts[i])
+ }
+ return buf.String()
+}
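
The rewritten Dir above drops the absolute Path field in favor of a short name plus a parent pointer; the full path is reassembled on demand by FullPath() and hashed into a stable inode via util.FullPath.AsInode. A minimal standalone sketch of that pattern (the FNV-1a hash is an assumption for illustration; this diff does not show AsInode's actual hash):

package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// node mirrors the new Dir shape: a short name plus a parent pointer,
// with the absolute path rebuilt on demand instead of stored.
type node struct {
	name   string
	parent *node
}

// fullPath walks the parent chain and joins the segments, like the
// Dir.FullPath added in this commit.
func (n *node) fullPath() string {
	var parts []string
	for p := n; p != nil; p = p.parent {
		parts = append(parts, strings.TrimPrefix(p.name, "/"))
	}
	var b strings.Builder
	for i := len(parts) - 1; i >= 0; i-- {
		if parts[i] == "" {
			continue // the root contributes no segment
		}
		b.WriteString("/")
		b.WriteString(parts[i])
	}
	if b.Len() == 0 {
		return "/"
	}
	return b.String()
}

// asInode derives a stable id from the path; FNV-1a is a stand-in for
// whatever util.FullPath.AsInode really uses.
func asInode(path string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(path))
	return h.Sum64()
}

func main() {
	root := &node{name: "/"}
	docs := &node{name: "docs", parent: root}
	fmt.Println(docs.fullPath(), asInode(docs.fullPath())) // /docs <hash>
}
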
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 8e60872d3..acdcd2de4 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -2,25 +2,110 @@ package filesys
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/util"
"os"
"syscall"
"time"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
+var _ = fs.NodeLinker(&Dir{})
var _ = fs.NodeSymlinker(&Dir{})
var _ = fs.NodeReadlinker(&File{})
+const (
+ HARD_LINK_MARKER = '\x01'
+)
+
+func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (fs.Node, error) {
+
+ oldFile, ok := old.(*File)
+ if !ok {
+ glog.Errorf("old node is not a file: %+v", old)
+ }
+
+ glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName)
+
+ oldEntry, err := oldFile.maybeLoadEntry(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ if oldEntry == nil {
+ return nil, fuse.EIO
+ }
+
+ // update old file to hardlink mode
+ if len(oldEntry.HardLinkId) == 0 {
+ oldEntry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
+ oldEntry.HardLinkCounter = 1
+ }
+ oldEntry.HardLinkCounter++
+ updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
+ Directory: oldFile.dir.FullPath(),
+ Entry: oldEntry,
+ Signatures: []int32{dir.wfs.signature},
+ }
+
+ // CreateLink 1.2 : update new file to hardlink mode
+ request := &filer_pb.CreateEntryRequest{
+ Directory: dir.FullPath(),
+ Entry: &filer_pb.Entry{
+ Name: req.NewName,
+ IsDirectory: false,
+ Attributes: oldEntry.Attributes,
+ Chunks: oldEntry.Chunks,
+ Extended: oldEntry.Extended,
+ HardLinkId: oldEntry.HardLinkId,
+ HardLinkCounter: oldEntry.HardLinkCounter,
+ },
+ Signatures: []int32{dir.wfs.signature},
+ }
+
+ // apply changes to the filer, and also apply to local metaCache
+ err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
+ if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil {
+ glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
+ return fuse.EIO
+ }
+ dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry))
+
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
+ return fuse.EIO
+ }
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, fuse.EIO
+ }
+
+ // create new file node
+ newNode := dir.newFile(req.NewName)
+ newFile := newNode.(*File)
+
+ return newFile, err
+
+}
+
func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) {
- glog.V(3).Infof("Symlink: %v/%v to %v", dir.Path, req.NewName, req.Target)
+ glog.V(4).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
request := &filer_pb.CreateEntryRequest{
- Directory: dir.Path,
+ Directory: dir.FullPath(),
Entry: &filer_pb.Entry{
Name: req.NewName,
IsDirectory: false,
@@ -33,17 +118,25 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
SymlinkTarget: req.Target,
},
},
+ Signatures: []int32{dir.wfs.signature},
}
- err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- if _, err := client.CreateEntry(ctx, request); err != nil {
- glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err)
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err)
return fuse.EIO
}
+
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+
return nil
})
- symlink := dir.newFile(req.NewName, request.Entry)
+ symlink := dir.newFile(req.NewName)
return symlink, err
@@ -51,16 +144,17 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) {
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return "", err
}
- if os.FileMode(file.entry.Attributes.FileMode)&os.ModeSymlink == 0 {
+ if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 {
return "", fuse.Errno(syscall.EINVAL)
}
- glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.Path, file.Name, file.entry.Attributes.SymlinkTarget)
+ glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget)
- return file.entry.Attributes.SymlinkTarget, nil
+ return entry.Attributes.SymlinkTarget, nil
}
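
Dir.Link above lazily upgrades a plain entry to hard-link mode: the first link attaches a random 16-byte HardLinkId terminated by HARD_LINK_MARKER and starts the counter at 1 before incrementing, so the counter ends at 2 (the original name plus the new one). A condensed sketch of just that bookkeeping (entry fields simplified from filer_pb.Entry):

package main

import (
	"crypto/rand"
	"fmt"
)

const hardLinkMarker = '\x01' // mirrors HARD_LINK_MARKER above

// entry keeps only the two hard-link fields of filer_pb.Entry.
type entry struct {
	hardLinkId      []byte
	hardLinkCounter int32
}

// link applies the same bookkeeping as Dir.Link: on the first hard
// link, assign a random marked id and start the counter at 1, then
// bump it for the new name.
func link(e *entry) error {
	if len(e.hardLinkId) == 0 {
		id := make([]byte, 16)
		if _, err := rand.Read(id); err != nil { // the commit uses util.RandomBytes(16)
			return err
		}
		e.hardLinkId = append(id, hardLinkMarker)
		e.hardLinkCounter = 1
	}
	e.hardLinkCounter++
	return nil
}

func main() {
	var e entry
	link(&e)                                          // counter becomes 2
	fmt.Println(len(e.hardLinkId), e.hardLinkCounter) // 17 2
}
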
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index e72a15758..b07710d17 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -2,32 +2,90 @@ package filesys
import (
"context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
newDir := newDirectory.(*Dir)
- return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ newPath := util.NewFullPath(newDir.FullPath(), req.NewName)
+ oldPath := util.NewFullPath(dir.FullPath(), req.OldName)
+
+ glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
+
+ // find local old entry
+ oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath)
+ if err != nil {
+ glog.Errorf("dir Rename can not find source %s : %v", oldPath, err)
+ return fuse.ENOENT
+ }
+
+ // update remote filer
+ err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
request := &filer_pb.AtomicRenameEntryRequest{
- OldDirectory: dir.Path,
+ OldDirectory: dir.FullPath(),
OldName: req.OldName,
- NewDirectory: newDir.Path,
+ NewDirectory: newDir.FullPath(),
NewName: req.NewName,
}
_, err := client.AtomicRenameEntry(ctx, request)
if err != nil {
- return fmt.Errorf("renaming %s/%s => %s/%s: %v", dir.Path, req.OldName, newDir.Path, req.NewName, err)
+ glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
+ return fuse.EXDEV
}
return nil
})
+ if err != nil {
+ glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
+ return fuse.EIO
+ }
+
+ // TODO: replicate renaming logic on filer
+ if err := dir.wfs.metaCache.DeleteEntry(context.Background(), oldPath); err != nil {
+ glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
+ return fuse.EIO
+ }
+ oldEntry.FullPath = newPath
+ if err := dir.wfs.metaCache.InsertEntry(context.Background(), oldEntry); err != nil {
+ glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
+ return fuse.EIO
+ }
+
+ oldFsNode := NodeWithId(oldPath.AsInode())
+ newFsNode := NodeWithId(newPath.AsInode())
+ dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
+ if file, ok := internalNode.(*File); ok {
+ glog.V(4).Infof("internal node %s", file.Name)
+ file.Name = req.NewName
+ file.id = uint64(newFsNode)
+ }
+ })
+
+ // change file handle
+ dir.wfs.handlesLock.Lock()
+ defer dir.wfs.handlesLock.Unlock()
+ inodeId := oldPath.AsInode()
+ existingHandle, found := dir.wfs.handles[inodeId]
+ glog.V(4).Infof("has open filehandle %s: %v", oldPath, found)
+ if !found || existingHandle == nil {
+ return nil
+ }
+ glog.V(4).Infof("opened filehandle %s => %s", oldPath, newPath)
+ delete(dir.wfs.handles, inodeId)
+ dir.wfs.handles[newPath.AsInode()] = existingHandle
+ return nil
}
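
Because inode numbers are derived from paths, Rename must also re-key any open file handle from the old inode to the new one while holding handlesLock, as the tail of the function above does. The same move in isolation (the table and field names here are simplified stand-ins):

package main

import (
	"fmt"
	"sync"
)

type handle struct{ name string }

type table struct {
	mu      sync.Mutex
	handles map[uint64]*handle
}

// rekey mirrors the tail of Dir.Rename: if the old inode has an open
// handle, move it under the new inode so in-flight writes keep flowing.
func (t *table) rekey(oldInode, newInode uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	h, found := t.handles[oldInode]
	if !found || h == nil {
		return
	}
	delete(t.handles, oldInode)
	t.handles[newInode] = h
}

func main() {
	t := &table{handles: map[uint64]*handle{1: {name: "a.txt"}}}
	t.rekey(1, 2)
	fmt.Println(t.handles[2].name) // a.txt
}
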
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 35d8f249a..8888cff96 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -2,214 +2,117 @@ package filesys
import (
"bytes"
- "context"
- "fmt"
+ "io"
"sync"
- "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
)
type ContinuousDirtyPages struct {
- hasData bool
- Offset int64
- Size int64
- Data []byte
- f *File
- lock sync.Mutex
+ intervals *ContinuousIntervals
+ f *File
+ writeWaitGroup sync.WaitGroup
+ chunkAddLock sync.Mutex
+ lastErr error
+ collection string
+ replication string
}
func newDirtyPages(file *File) *ContinuousDirtyPages {
- return &ContinuousDirtyPages{
- Data: nil,
- f: file,
+ dirtyPages := &ContinuousDirtyPages{
+ intervals: &ContinuousIntervals{},
+ f: file,
}
+ return dirtyPages
}
-func (pages *ContinuousDirtyPages) releaseResource() {
- if pages.Data != nil {
- pages.f.wfs.bufPool.Put(pages.Data)
- pages.Data = nil
- atomic.AddInt32(&counter, -1)
- glog.V(3).Infof("%s/%s releasing resource %d", pages.f.dir.Path, pages.f.Name, counter)
- }
-}
-
-var counter = int32(0)
+func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
-func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
-
- pages.lock.Lock()
- defer pages.lock.Unlock()
-
- var chunk *filer_pb.FileChunk
+ glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
- return pages.flushAndSave(ctx, offset, data)
+ pages.flushAndSave(offset, data)
}
- if pages.Data == nil {
- pages.Data = pages.f.wfs.bufPool.Get().([]byte)
- atomic.AddInt32(&counter, 1)
- glog.V(3).Infof("%s/%s acquire resource %d", pages.f.dir.Path, pages.f.Name, counter)
- }
+ pages.intervals.AddInterval(data, offset)
- if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
- pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) {
- // if the data is out of range,
- // or buffer is full if adding new data,
- // flush current buffer and add new data
-
- // println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size)
-
- if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
- chunks = append(chunks, chunk)
- }
- } else {
- glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
- return
- }
- pages.Offset = offset
- copy(pages.Data, data)
- pages.Size = int64(len(data))
- return
+ if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit {
+ pages.saveExistingLargestPageToStorage()
}
- if offset != pages.Offset+pages.Size {
- // when this happens, debug shows the data overlapping with existing data is empty
- // the data is not just append
- if offset == pages.Offset && int(pages.Size) < len(data) {
- // glog.V(2).Infof("pages[%d,%d) pages.Data len=%v, data len=%d, pages.Size=%d", pages.Offset, pages.Offset+pages.Size, len(pages.Data), len(data), pages.Size)
- copy(pages.Data[pages.Size:], data[pages.Size:])
- } else {
- if pages.Size != 0 {
- glog.V(1).Infof("%s/%s add page: pages [%d, %d) write [%d, %d)", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Offset+pages.Size, offset, offset+int64(len(data)))
- }
- return pages.flushAndSave(ctx, offset, data)
- }
- } else {
- copy(pages.Data[offset-pages.Offset:], data)
- }
-
- pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
-
return
}
-func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
-
- var chunk *filer_pb.FileChunk
+func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) {
// flush existing
- if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush existing [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
- chunks = append(chunks, chunk)
- }
- } else {
- glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
- return
- }
- pages.Size = 0
- pages.Offset = 0
+ pages.saveExistingPagesToStorage()
// flush the new page
- if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
- chunks = append(chunks, chunk)
- }
- } else {
- glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
- return
- }
+ pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data)))
return
}
-func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() {
+ for pages.saveExistingLargestPageToStorage() {
+ }
+}
- pages.lock.Lock()
- defer pages.lock.Unlock()
+func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedData bool) {
- if pages.Size == 0 {
- return nil, nil
+ maxList := pages.intervals.RemoveLargestIntervalLinkedList()
+ if maxList == nil {
+ return false
}
- if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
- pages.Size = 0
- pages.Offset = 0
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
- }
+ entry := pages.f.getEntry()
+ if entry == nil {
+ return false
}
- return
-}
-func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
+ fileSize := int64(entry.Attributes.FileSize)
- if pages.Size == 0 {
- return nil, nil
+ chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
+ if chunkSize == 0 {
+ return false
}
- return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
-}
-
-func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
+ pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
- var fileId, host string
- var auth security.EncodedJwt
+ return true
+}
- if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: pages.f.wfs.option.Replication,
- Collection: pages.f.wfs.option.Collection,
- TtlSec: pages.f.wfs.option.TtlSec,
- DataCenter: pages.f.wfs.option.DataCenter,
- }
+ mtime := time.Now().UnixNano()
+ pages.writeWaitGroup.Add(1)
+ writer := func() {
+ defer pages.writeWaitGroup.Done()
- resp, err := client.AssignVolume(ctx, request)
+ reader = io.LimitReader(reader, size)
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
+ pages.lastErr = err
+ return
}
-
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
-
- return nil
- }); err != nil {
- return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ chunk.Mtime = mtime
+ pages.collection, pages.replication = collection, replication
+ pages.chunkAddLock.Lock()
+ defer pages.chunkAddLock.Unlock()
+ pages.f.addChunks([]*filer_pb.FileChunk{chunk})
+ glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
}
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- bufReader := bytes.NewReader(buf)
- uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "", nil, auth)
- if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload data: %v", err)
- }
- if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
+ if pages.f.wfs.concurrentWriters != nil {
+ pages.f.wfs.concurrentWriters.Execute(writer)
+ } else {
+ go writer()
}
-
- return &filer_pb.FileChunk{
- FileId: fileId,
- Offset: offset,
- Size: uint64(len(buf)),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- }, nil
-
}
func max(x, y int64) int64 {
@@ -218,3 +121,13 @@ func max(x, y int64) int64 {
}
return y
}
+func min(x, y int64) int64 {
+ if x < y {
+ return x
+ }
+ return y
+}
+
+func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+ return pages.intervals.ReadDataAt(data, startOffset)
+}
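
saveToStorage above no longer uploads inline: each flush becomes a writer closure tracked by writeWaitGroup (or handed to the shared concurrentWriters pool), and any failure is parked in lastErr for a later flush to report. A stripped-down sketch of that pattern (saveChunk stands in for wfs.saveDataAsChunk, and the mutex around lastErr is an addition for this sketch):

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

type dirtyPages struct {
	wg      sync.WaitGroup
	mu      sync.Mutex
	lastErr error
}

// saveAsync mirrors saveToStorage: cap the reader at size, run the
// upload in the background, and remember any failure in lastErr.
func (p *dirtyPages) saveAsync(r io.Reader, size int64, saveChunk func(io.Reader) error) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		if err := saveChunk(io.LimitReader(r, size)); err != nil {
			p.mu.Lock()
			p.lastErr = err
			p.mu.Unlock()
		}
	}()
}

func main() {
	p := &dirtyPages{}
	p.saveAsync(bytes.NewReader([]byte("hello")), 5, func(r io.Reader) error {
		data, _ := io.ReadAll(r)
		fmt.Printf("uploaded %d bytes\n", len(data))
		return nil
	})
	p.wg.Wait()
	fmt.Println("lastErr:", p.lastErr)
}
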
diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go
new file mode 100644
index 000000000..1404bf78c
--- /dev/null
+++ b/weed/filesys/dirty_page_interval.go
@@ -0,0 +1,223 @@
+package filesys
+
+import (
+ "bytes"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type IntervalNode struct {
+ Data []byte
+ Offset int64
+ Size int64
+ Next *IntervalNode
+}
+
+type IntervalLinkedList struct {
+ Head *IntervalNode
+ Tail *IntervalNode
+}
+
+type ContinuousIntervals struct {
+ lists []*IntervalLinkedList
+}
+
+func (list *IntervalLinkedList) Offset() int64 {
+ return list.Head.Offset
+}
+func (list *IntervalLinkedList) Size() int64 {
+ return list.Tail.Offset + list.Tail.Size - list.Head.Offset
+}
+func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) {
+ // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
+ list.Tail.Next = node
+ list.Tail = node
+}
+func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) {
+ // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
+ node.Next = list.Head
+ list.Head = node
+}
+
+func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) {
+ t := list.Head
+ for {
+
+ nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
+ if nodeStart < nodeStop {
+ // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop)
+ copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset])
+ }
+
+ if t.Next == nil {
+ break
+ }
+ t = t.Next
+ }
+}
+
+func (c *ContinuousIntervals) TotalSize() (total int64) {
+ for _, list := range c.lists {
+ total += list.Size()
+ }
+ return
+}
+
+func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
+ var nodes []*IntervalNode
+ for t := list.Head; t != nil; t = t.Next {
+ nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
+ if nodeStart >= nodeStop {
+ // skip non overlapping IntervalNode
+ continue
+ }
+ nodes = append(nodes, &IntervalNode{
+ Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset],
+ Offset: nodeStart,
+ Size: nodeStop - nodeStart,
+ Next: nil,
+ })
+ }
+ for i := 1; i < len(nodes); i++ {
+ nodes[i-1].Next = nodes[i]
+ }
+ return &IntervalLinkedList{
+ Head: nodes[0],
+ Tail: nodes[len(nodes)-1],
+ }
+}
+
+func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
+
+ interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))}
+
+ // append to the tail and return
+ if len(c.lists) == 1 {
+ lastSpan := c.lists[0]
+ if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset {
+ lastSpan.addNodeToTail(interval)
+ return
+ }
+ }
+
+ var newLists []*IntervalLinkedList
+ for _, list := range c.lists {
+ // if list is to the left of new interval, add to the new list
+ if list.Tail.Offset+list.Tail.Size <= interval.Offset {
+ newLists = append(newLists, list)
+ }
+ // if list is to the right of new interval, add to the new list
+ if interval.Offset+interval.Size <= list.Head.Offset {
+ newLists = append(newLists, list)
+ }
+ // if new interval overwrite the right part of the list
+ if list.Head.Offset < interval.Offset && interval.Offset < list.Tail.Offset+list.Tail.Size {
+ // create a new list of the left part of existing list
+ newLists = append(newLists, subList(list, list.Offset(), interval.Offset))
+ }
+ // if new interval overwrite the left part of the list
+ if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size < list.Tail.Offset+list.Tail.Size {
+ // create a new list of the right part of existing list
+ newLists = append(newLists, subList(list, interval.Offset+interval.Size, list.Tail.Offset+list.Tail.Size))
+ }
+ // skip anything that is fully overwritten by the new interval
+ }
+
+ c.lists = newLists
+ // add the new interval to the lists, connecting neighbor lists
+ var prevList, nextList *IntervalLinkedList
+
+ for _, list := range c.lists {
+ if list.Head.Offset == interval.Offset+interval.Size {
+ nextList = list
+ break
+ }
+ }
+
+ for _, list := range c.lists {
+ if list.Head.Offset+list.Size() == offset {
+ list.addNodeToTail(interval)
+ prevList = list
+ break
+ }
+ }
+
+ if prevList != nil && nextList != nil {
+ // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size)
+ prevList.Tail.Next = nextList.Head
+ prevList.Tail = nextList.Tail
+ c.removeList(nextList)
+ } else if nextList != nil {
+ // add to head was not done when checking
+ nextList.addNodeToHead(interval)
+ }
+ if prevList == nil && nextList == nil {
+ c.lists = append(c.lists, &IntervalLinkedList{
+ Head: interval,
+ Tail: interval,
+ })
+ }
+
+ return
+}
+
+func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList {
+ var maxSize int64
+ maxIndex := -1
+ for k, list := range c.lists {
+ if maxSize <= list.Size() {
+ maxSize = list.Size()
+ maxIndex = k
+ }
+ }
+ if maxSize <= 0 {
+ return nil
+ }
+
+ t := c.lists[maxIndex]
+ c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...)
+ return t
+
+}
+
+func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) {
+ index := -1
+ for k, list := range c.lists {
+ if list.Offset() == target.Offset() {
+ index = k
+ }
+ }
+ if index < 0 {
+ return
+ }
+
+ c.lists = append(c.lists[0:index], c.lists[index+1:]...)
+
+}
+
+func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
+ for _, list := range c.lists {
+ start := max(startOffset, list.Offset())
+ stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
+ if start < stop {
+ list.ReadData(data[start-startOffset:], start, stop)
+ maxStop = max(maxStop, stop)
+ }
+ }
+ return
+}
+
+func (l *IntervalLinkedList) ToReader() io.Reader {
+ var readers []io.Reader
+ t := l.Head
+ readers = append(readers, util.NewBytesReader(t.Data))
+ for t.Next != nil {
+ t = t.Next
+ readers = append(readers, bytes.NewReader(t.Data))
+ }
+ if len(readers) == 1 {
+ return readers[0]
+ }
+ return io.MultiReader(readers...)
+}
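
ContinuousIntervals keeps the newest bytes wherever writes overlap: AddInterval splits any list it partially covers and stitches adjacent lists back together. A short in-package usage sketch of the types defined above (the test file below exercises the same behavior more thoroughly):

// a minimal in-package usage sketch
func exampleIntervals() {
	c := &ContinuousIntervals{}
	c.AddInterval([]byte{1, 1, 1, 1}, 0) // write [0,4)
	c.AddInterval([]byte{2, 2}, 1)       // newer write overlapping [1,3)

	buf := make([]byte, 4)
	c.ReadDataAt(buf, 0) // buf now holds [1 2 2 1]: the middle bytes were replaced
}
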
diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go
new file mode 100644
index 000000000..d02ad27fd
--- /dev/null
+++ b/weed/filesys/dirty_page_interval_test.go
@@ -0,0 +1,113 @@
+package filesys
+
+import (
+ "bytes"
+ "math/rand"
+ "testing"
+)
+
+func TestContinuousIntervals_AddIntervalAppend(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
+ // 25, 25, 25
+ c.AddInterval(getBytes(25, 3), 0)
+ // _, _, 23, 23, 23, 23
+ c.AddInterval(getBytes(23, 4), 2)
+
+ expectedData(t, c, 0, 25, 25, 23, 23, 23, 23)
+
+}
+
+func TestContinuousIntervals_AddIntervalInnerOverwrite(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
+ // 25, 25, 25, 25, 25
+ c.AddInterval(getBytes(25, 5), 0)
+ // _, _, 23, 23
+ c.AddInterval(getBytes(23, 2), 2)
+
+ expectedData(t, c, 0, 25, 25, 23, 23, 25)
+
+}
+
+func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
+ // 1,
+ c.AddInterval(getBytes(1, 1), 0)
+ // _, 2,
+ c.AddInterval(getBytes(2, 1), 1)
+ // _, _, 3, 3, 3
+ c.AddInterval(getBytes(3, 3), 2)
+ // _, _, _, 4, 4, 4
+ c.AddInterval(getBytes(4, 3), 3)
+
+ expectedData(t, c, 0, 1, 2, 3, 4, 4, 4)
+
+}
+
+func TestContinuousIntervals_RealCase1(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
+ // 25,
+ c.AddInterval(getBytes(25, 1), 0)
+ // _, _, _, _, 23, 23
+ c.AddInterval(getBytes(23, 2), 4)
+ // _, _, _, 24, 24, 24, 24
+ c.AddInterval(getBytes(24, 4), 3)
+
+ // _, 22, 22
+ c.AddInterval(getBytes(22, 2), 1)
+
+ expectedData(t, c, 0, 25, 22, 22, 24, 24, 24, 24)
+
+}
+
+func TestRandomWrites(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
+ data := make([]byte, 1024)
+
+ for i := 0; i < 1024; i++ {
+
+ start, stop := rand.Intn(len(data)), rand.Intn(len(data))
+ if start > stop {
+ start, stop = stop, start
+ }
+
+ rand.Read(data[start : stop+1])
+
+ c.AddInterval(data[start:stop+1], int64(start))
+
+ expectedData(t, c, 0, data...)
+
+ }
+
+}
+
+func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) {
+ start, stop := int64(offset), int64(offset+len(data))
+ for _, list := range c.lists {
+ nodeStart, nodeStop := max(start, list.Head.Offset), min(stop, list.Head.Offset+list.Size())
+ if nodeStart < nodeStop {
+ buf := make([]byte, nodeStop-nodeStart)
+ list.ReadData(buf, nodeStart, nodeStop)
+ if bytes.Compare(buf, data[nodeStart-start:nodeStop-start]) != 0 {
+ t.Errorf("expected %v actual %v", data[nodeStart-start:nodeStop-start], buf)
+ }
+ }
+ }
+}
+
+func getBytes(content byte, length int) []byte {
+ data := make([]byte, length)
+ for i := 0; i < length; i++ {
+ data[i] = content
+ }
+ return data
+}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index afe78ee0f..bb57988cd 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -3,20 +3,22 @@ package filesys
import (
"context"
"os"
- "path/filepath"
"sort"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
const blockSize = 512
var _ = fs.Node(&File{})
+var _ = fs.NodeIdentifier(&File{})
var _ = fs.NodeOpener(&File{})
var _ = fs.NodeFsyncer(&File{})
var _ = fs.NodeSetattrer(&File{})
@@ -24,35 +26,56 @@ var _ = fs.NodeGetxattrer(&File{})
var _ = fs.NodeSetxattrer(&File{})
var _ = fs.NodeRemovexattrer(&File{})
var _ = fs.NodeListxattrer(&File{})
+var _ = fs.NodeForgetter(&File{})
type File struct {
- Name string
- dir *Dir
- wfs *WFS
- entry *filer_pb.Entry
- entryViewCache []filer2.VisibleInterval
- isOpen bool
+ Name string
+ dir *Dir
+ wfs *WFS
+ entry *filer_pb.Entry
+ isOpen int
+ dirtyMetadata bool
+ id uint64
+}
+
+func (file *File) fullpath() util.FullPath {
+ return util.NewFullPath(file.dir.FullPath(), file.Name)
}
-func (file *File) fullpath() string {
- return filepath.Join(file.dir.Path, file.Name)
+func (file *File) Id() uint64 {
+ return file.id
}
-func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
+func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
- glog.V(4).Infof("file Attr %s", file.fullpath())
+ glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
- attr.Mode = os.FileMode(file.entry.Attributes.FileMode)
- attr.Size = filer2.TotalSize(file.entry.Chunks)
- attr.Mtime = time.Unix(file.entry.Attributes.Mtime, 0)
- attr.Gid = file.entry.Attributes.Gid
- attr.Uid = file.entry.Attributes.Uid
+ if entry == nil {
+ return fuse.ENOENT
+ }
+
+ attr.Inode = file.Id()
+ attr.Valid = time.Second
+ attr.Mode = os.FileMode(entry.Attributes.FileMode)
+ attr.Size = filer.FileSize(entry)
+ if file.isOpen > 0 {
+ attr.Size = entry.Attributes.FileSize
+ glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
+ }
+ attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
+ attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
+ attr.Gid = entry.Attributes.Gid
+ attr.Uid = entry.Attributes.Uid
attr.Blocks = attr.Size/blockSize + 1
attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit)
+ if entry.HardLinkCounter > 0 {
+ attr.Nlink = uint32(entry.HardLinkCounter)
+ }
return nil
@@ -62,24 +85,23 @@ func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp
glog.V(4).Infof("file Getxattr %s", file.fullpath())
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
- return getxattr(file.entry, req, resp)
+ return getxattr(entry, req, resp)
}
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
- file.isOpen = true
-
handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
resp.Handle = fuse.HandleID(handle.handle)
- glog.V(3).Infof("%v file open handle id = %d", file.fullpath(), handle.handle)
+ glog.V(4).Infof("%v file open handle id = %d", file.fullpath(), handle.handle)
return handle, nil
@@ -87,48 +109,89 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- if err := file.maybeLoadEntry(ctx); err != nil {
+ glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req)
+
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
+ if file.isOpen > 0 {
+ file.wfs.handlesLock.Lock()
+ fileHandle := file.wfs.handles[file.Id()]
+ file.wfs.handlesLock.Unlock()
+
+ if fileHandle != nil {
+ fileHandle.Lock()
+ defer fileHandle.Unlock()
+ }
+ }
- glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes)
if req.Valid.Size() {
- glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size)
- if req.Size == 0 {
+ glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(entry.Chunks))
+ if req.Size < filer.FileSize(entry) {
// fmt.Printf("truncate %v \n", fullPath)
- file.entry.Chunks = nil
- file.entryViewCache = nil
+ var chunks []*filer_pb.FileChunk
+ var truncatedChunks []*filer_pb.FileChunk
+ for _, chunk := range entry.Chunks {
+ int64Size := int64(chunk.Size)
+ if chunk.Offset+int64Size > int64(req.Size) {
+ // this chunk is truncated
+ int64Size = int64(req.Size) - chunk.Offset
+ if int64Size > 0 {
+ chunks = append(chunks, chunk)
+ glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size)
+ chunk.Size = uint64(int64Size)
+ } else {
+ glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString())
+ truncatedChunks = append(truncatedChunks, chunk)
+ }
+ }
+ }
+ entry.Chunks = chunks
}
- file.entry.Attributes.FileSize = req.Size
+ entry.Attributes.FileSize = req.Size
+ file.dirtyMetadata = true
}
+
if req.Valid.Mode() {
- file.entry.Attributes.FileMode = uint32(req.Mode)
+ entry.Attributes.FileMode = uint32(req.Mode)
+ file.dirtyMetadata = true
}
if req.Valid.Uid() {
- file.entry.Attributes.Uid = req.Uid
+ entry.Attributes.Uid = req.Uid
+ file.dirtyMetadata = true
}
if req.Valid.Gid() {
- file.entry.Attributes.Gid = req.Gid
+ entry.Attributes.Gid = req.Gid
+ file.dirtyMetadata = true
}
if req.Valid.Crtime() {
- file.entry.Attributes.Crtime = req.Crtime.Unix()
+ entry.Attributes.Crtime = req.Crtime.Unix()
+ file.dirtyMetadata = true
}
if req.Valid.Mtime() {
- file.entry.Attributes.Mtime = req.Mtime.Unix()
+ entry.Attributes.Mtime = req.Mtime.Unix()
+ file.dirtyMetadata = true
+ }
+
+ if req.Valid.Handle() {
+ // fmt.Printf("file handle => %d\n", req.Handle)
}
- if file.isOpen {
+ if file.isOpen > 0 {
return nil
}
- file.wfs.listDirectoryEntriesCache.Delete(file.fullpath())
+ if !file.dirtyMetadata {
+ return nil
+ }
- return file.saveEntry(ctx)
+ return file.saveEntry(entry)
}
@@ -136,17 +199,16 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error
glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name)
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
- if err := setxattr(file.entry, req); err != nil {
+ if err := setxattr(entry, req); err != nil {
return err
}
- file.wfs.listDirectoryEntriesCache.Delete(file.fullpath())
-
- return file.saveEntry(ctx)
+ return file.saveEntry(entry)
}
@@ -154,17 +216,16 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest)
glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name)
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
- if err := removexattr(file.entry, req); err != nil {
+ if err := removexattr(entry, req); err != nil {
return err
}
- file.wfs.listDirectoryEntriesCache.Delete(file.fullpath())
-
- return file.saveEntry(ctx)
+ return file.saveEntry(entry)
}
@@ -172,11 +233,12 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res
glog.V(4).Infof("file Listxattr %s", file.fullpath())
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
- if err := listxattr(file.entry, req, resp); err != nil {
+ if err := listxattr(entry, req, resp); err != nil {
return err
}
@@ -187,69 +249,112 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// fsync works at OS level
// write the file chunks to the filerGrpcAddress
- glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
+ glog.V(4).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req)
return nil
}
-func (file *File) maybeLoadEntry(ctx context.Context) error {
- if file.entry == nil || !file.isOpen {
- entry, err := file.wfs.maybeLoadEntry(ctx, file.dir.Path, file.Name)
- if err != nil {
- return err
- }
- if entry != nil {
- file.setEntry(entry)
+func (file *File) Forget() {
+ t := util.NewFullPath(file.dir.FullPath(), file.Name)
+ glog.V(4).Infof("Forget file %s", t)
+ file.wfs.ReleaseHandle(t, fuse.HandleID(t.AsInode()))
+}
+
+func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
+
+ file.wfs.handlesLock.Lock()
+ handle, found := file.wfs.handles[file.Id()]
+ file.wfs.handlesLock.Unlock()
+ entry = file.entry
+ if found {
+ glog.V(4).Infof("maybeLoadEntry found opened file %s/%s: %v %v", file.dir.FullPath(), file.Name, handle.f.entry, entry)
+ entry = handle.f.entry
+ }
+
+ if entry != nil {
+ if len(entry.HardLinkId) == 0 {
+ // hard-linked entries must always be reloaded; anything else can be reused
+ return entry, nil
}
}
- return nil
+ entry, err = file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
+ if err != nil {
+ glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
+ return entry, err
+ }
+ if entry != nil {
+ // file.entry = entry
+ } else {
+ glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err)
+ }
+ return entry, nil
}
-func (file *File) addChunk(chunk *filer_pb.FileChunk) {
- if chunk != nil {
- file.addChunks([]*filer_pb.FileChunk{chunk})
+func lessThan(a, b *filer_pb.FileChunk) bool {
+ if a.Mtime == b.Mtime {
+ return a.Fid.FileKey < b.Fid.FileKey
}
+ return a.Mtime < b.Mtime
}
func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
- sort.Slice(chunks, func(i, j int) bool {
- return chunks[i].Mtime < chunks[j].Mtime
- })
+ // find the earliest incoming chunk
+ newChunks := chunks
+ earliestChunk := newChunks[0]
+ for i := 1; i < len(newChunks); i++ {
+ if lessThan(newChunks[i], earliestChunk) {
+ earliestChunk = newChunks[i]
+ }
+ }
- var newVisibles []filer2.VisibleInterval
- for _, chunk := range chunks {
- newVisibles = filer2.MergeIntoVisibles(file.entryViewCache, newVisibles, chunk)
- t := file.entryViewCache[:0]
- file.entryViewCache = newVisibles
- newVisibles = t
+ entry := file.getEntry()
+ if entry == nil {
+ return
}
- glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
+ // pick out-of-order chunks from existing chunks
+ for _, chunk := range entry.Chunks {
+ if lessThan(earliestChunk, chunk) {
+ chunks = append(chunks, chunk)
+ }
+ }
- file.entry.Chunks = append(file.entry.Chunks, chunks...)
-}
+ // sort incoming chunks
+ sort.Slice(chunks, func(i, j int) bool {
+ return lessThan(chunks[i], chunks[j])
+ })
-func (file *File) setEntry(entry *filer_pb.Entry) {
- file.entry = entry
- file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
+ glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(entry.Chunks), len(chunks))
+
+ entry.Chunks = append(entry.Chunks, newChunks...)
}
-func (file *File) saveEntry(ctx context.Context) error {
- return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+func (file *File) saveEntry(entry *filer_pb.Entry) error {
+ return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ file.wfs.mapPbIdFromLocalToFiler(entry)
+ defer file.wfs.mapPbIdFromFilerToLocal(entry)
request := &filer_pb.UpdateEntryRequest{
- Directory: file.dir.Path,
- Entry: file.entry,
+ Directory: file.dir.FullPath(),
+ Entry: entry,
+ Signatures: []int32{file.wfs.signature},
}
- glog.V(1).Infof("save file entry: %v", request)
- _, err := client.UpdateEntry(ctx, request)
+ glog.V(4).Infof("save file entry: %v", request)
+ _, err := client.UpdateEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err)
+ glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
return fuse.EIO
}
+ file.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+
return nil
})
}
+
+func (file *File) getEntry() *filer_pb.Entry {
+ return file.entry
+}
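
For readers following the new truncation path in Setattr: the shrink logic can be exercised on its own. Below is a minimal, illustrative sketch using a simplified chunk struct in place of filer_pb.FileChunk (truncateChunks and its fields are hypothetical, not part of the repository):

package main

import "fmt"

// chunk is a simplified stand-in for filer_pb.FileChunk.
type chunk struct {
	FileId string
	Offset int64
	Size   uint64
}

// truncateChunks keeps chunk data below newSize: chunks fully below are kept,
// the chunk straddling the boundary is shrunk, and chunks fully above are
// dropped, mirroring the Setattr size-shrink path above.
func truncateChunks(chunks []chunk, newSize int64) (kept, dropped []chunk) {
	for _, c := range chunks {
		if c.Offset+int64(c.Size) <= newSize {
			kept = append(kept, c) // fully below the new size
			continue
		}
		remaining := newSize - c.Offset
		if remaining > 0 {
			c.Size = uint64(remaining) // straddles the boundary: shrink
			kept = append(kept, c)
		} else {
			dropped = append(dropped, c) // fully above the new size
		}
	}
	return
}

func main() {
	chunks := []chunk{{"a", 0, 10}, {"b", 10, 10}, {"c", 20, 10}}
	kept, dropped := truncateChunks(chunks, 15)
	fmt.Println(kept)    // [{a 0 10} {b 10 5}]
	fmt.Println(dropped) // [{c 20 10}]
}
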
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 101f5c056..27ffab6e1 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -3,39 +3,51 @@ package filesys
import (
"context"
"fmt"
- "mime"
- "path"
+ "io"
+ "math"
+ "net/http"
+ "os"
+ "sync"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/gabriel-vasile/mimetype"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
type FileHandle struct {
// cache file has been written to
- dirtyPages *ContinuousDirtyPages
- contentType string
- dirtyMetadata bool
- handle uint64
+ dirtyPages *ContinuousDirtyPages
+ entryViewCache []filer.VisibleInterval
+ reader io.ReaderAt
+ contentType string
+ handle uint64
+ sync.Mutex
f *File
RequestId fuse.RequestID // unique ID for request
NodeId fuse.NodeID // file or directory the request is about
Uid uint32 // user ID of process making request
Gid uint32 // group ID of process making request
+
}
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
- return &FileHandle{
+ fh := &FileHandle{
f: file,
dirtyPages: newDirtyPages(file),
Uid: uid,
Gid: gid,
}
+ entry := fh.f.getEntry()
+ if entry != nil {
+ entry.Attributes.FileSize = filer.FileSize(entry)
+ }
+
+ return fh
}
var _ = fs.Handle(&FileHandle{})
@@ -48,134 +60,263 @@ var _ = fs.HandleReleaser(&FileHandle{})
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
- glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
+ glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
+ fh.Lock()
+ defer fh.Unlock()
- // this value should come from the filer instead of the old f
- if len(fh.f.entry.Chunks) == 0 {
- glog.V(1).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name)
+ if req.Size <= 0 {
return nil
}
- buff := make([]byte, req.Size)
-
- if fh.f.entryViewCache == nil {
- fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
+ buff := resp.Data[:cap(resp.Data)]
+ if req.Size > cap(resp.Data) {
+ // should not happen
+ buff = make([]byte, req.Size)
}
- chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size)
-
- totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, req.Offset)
+ totalRead, err := fh.readFromChunks(buff, req.Offset)
+ if err == nil || err == io.EOF {
+ maxStop := fh.readFromDirtyPages(buff, req.Offset)
+ totalRead = max(maxStop-req.Offset, totalRead)
+ }
- resp.Data = buff[:totalRead]
+ if err == io.EOF {
+ err = nil
+ }
if err != nil {
- glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
+ glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err)
+ return fuse.EIO
+ }
+
+ if totalRead > int64(len(buff)) {
+ glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead)
+ totalRead = min(int64(len(buff)), totalRead)
+ }
+ if err == nil {
+ resp.Data = buff[:totalRead]
}
return err
}
-// Write to the file handle
-func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
+ maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
+ return
+}
- // write the request to volume servers
+func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
- glog.V(4).Infof("%+v/%v write fh %d: [%d,%d)", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)))
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return 0, io.EOF
+ }
- chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
- if err != nil {
- glog.Errorf("%+v/%v write fh %d: [%d,%d): %v", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err)
- return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
+ fileSize := int64(filer.FileSize(entry))
+ fileFullPath := fh.f.fullpath()
+
+ if fileSize == 0 {
+ glog.V(1).Infof("empty fh %v", fileFullPath)
+ return 0, io.EOF
}
- resp.Size = len(req.Data)
+ if offset+int64(len(buff)) <= int64(len(entry.Content)) {
+ totalRead := copy(buff, entry.Content[offset:])
+ glog.V(4).Infof("file handle read cached %s [%d,%d] %d", fileFullPath, offset, offset+int64(totalRead), totalRead)
+ return int64(totalRead), nil
+ }
- if req.Offset == 0 {
- // detect mime type
- detectedMIME := mimetype.Detect(req.Data)
- fh.contentType = detectedMIME.String()
- if ext := path.Ext(fh.f.Name); ext != detectedMIME.Extension() {
- fh.contentType = mime.TypeByExtension(ext)
+ var chunkResolveErr error
+ if fh.entryViewCache == nil {
+ fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
+ if chunkResolveErr != nil {
+ return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
+ fh.reader = nil
+ }
+
+ reader := fh.reader
+ if reader == nil {
+ chunkViews := filer.ViewFromVisibleIntervals(fh.entryViewCache, 0, math.MaxInt64)
+ reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
+ }
+ fh.reader = reader
- fh.dirtyMetadata = true
+ totalRead, err := reader.ReadAt(buff, offset)
+
+ if err != nil && err != io.EOF {
+ glog.Errorf("file handle read %s: %v", fileFullPath, err)
}
- if len(chunks) > 0 {
+ glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
+
+ return int64(totalRead), err
+}
- fh.f.addChunks(chunks)
+// Write to the file handle
+func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+
+ fh.Lock()
+ defer fh.Unlock()
+
+ // write the request to volume servers
+ data := req.Data
+ if len(data) <= 512 {
+ // small writes may share the reusable fuse message buffer; copy to own the data
+ data = make([]byte, len(req.Data))
+ copy(data, req.Data)
+ }
- fh.dirtyMetadata = true
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return fuse.EIO
}
+ entry.Content = nil
+ entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
+ glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
+
+ fh.dirtyPages.AddPage(req.Offset, data)
+
+ resp.Size = len(data)
+
+ if req.Offset == 0 {
+ // detect mime type
+ fh.contentType = http.DetectContentType(data)
+ fh.f.dirtyMetadata = true
+ }
+
+ fh.f.dirtyMetadata = true
+
return nil
}
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
- glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle)
+ glog.V(4).Infof("Release %v fh %d open=%d", fh.f.fullpath(), fh.handle, fh.f.isOpen)
+
+ fh.Lock()
+ defer fh.Unlock()
- fh.dirtyPages.releaseResource()
+ fh.f.isOpen--
- fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+ if fh.f.isOpen <= 0 {
+ fh.f.entry = nil
+ fh.entryViewCache = nil
+ fh.reader = nil
- fh.f.isOpen = false
+ fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+ }
+
+ if fh.f.isOpen < 0 {
+ glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
+ fh.f.isOpen = 0
+ return nil
+ }
return nil
}
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
- // fflush works at fh level
- // send the data to the OS
- glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req)
- chunk, err := fh.dirtyPages.FlushToStorage(ctx)
- if err != nil {
- glog.Errorf("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err)
- return fmt.Errorf("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err)
+ glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle)
+
+ fh.Lock()
+ defer fh.Unlock()
+
+ if err := fh.doFlush(ctx, req.Header); err != nil {
+ glog.Errorf("Flush doFlush %s: %v", fh.f.Name, err)
+ return err
}
- fh.f.addChunk(chunk)
+ glog.V(4).Infof("Flush %v fh %d success", fh.f.fullpath(), fh.handle)
+ return nil
+}
+
+func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
+ // flush works at the file handle level
+ // persist the dirty pages to the volume servers and update the filer entry
+ glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle)
+
+ fh.dirtyPages.saveExistingPagesToStorage()
+
+ fh.dirtyPages.writeWaitGroup.Wait()
- if !fh.dirtyMetadata {
+ if fh.dirtyPages.lastErr != nil {
+ glog.Errorf("%v doFlush last err: %v", fh.f.fullpath(), fh.dirtyPages.lastErr)
+ return fuse.EIO
+ }
+
+ if !fh.f.dirtyMetadata {
return nil
}
- return fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ entry := fh.f.getEntry()
+ if entry == nil {
+ return nil
+ }
- if fh.f.entry.Attributes != nil {
- fh.f.entry.Attributes.Mime = fh.contentType
- fh.f.entry.Attributes.Uid = req.Uid
- fh.f.entry.Attributes.Gid = req.Gid
- fh.f.entry.Attributes.Mtime = time.Now().Unix()
- fh.f.entry.Attributes.Crtime = time.Now().Unix()
- fh.f.entry.Attributes.FileMode = uint32(0777 &^ fh.f.wfs.option.Umask)
+ if entry.Attributes != nil {
+ entry.Attributes.Mime = fh.contentType
+ if entry.Attributes.Uid == 0 {
+ entry.Attributes.Uid = header.Uid
+ }
+ if entry.Attributes.Gid == 0 {
+ entry.Attributes.Gid = header.Gid
+ }
+ if entry.Attributes.Crtime == 0 {
+ entry.Attributes.Crtime = time.Now().Unix()
+ }
+ entry.Attributes.Mtime = time.Now().Unix()
+ entry.Attributes.FileMode = uint32(os.FileMode(entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
+ entry.Attributes.Collection = fh.dirtyPages.collection
+ entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
- Directory: fh.f.dir.Path,
- Entry: fh.f.entry,
+ Directory: fh.f.dir.FullPath(),
+ Entry: entry,
+ Signatures: []int32{fh.f.wfs.signature},
}
- glog.V(3).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks))
- for i, chunk := range fh.f.entry.Chunks {
- glog.V(3).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(entry.Chunks))
+ for i, chunk := range entry.Chunks {
+ glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
- fh.f.entry.Chunks = chunks
- // fh.f.entryViewCache = nil
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
- if _, err := client.CreateEntry(ctx, request); err != nil {
- glog.Errorf("update fh: %v", err)
- return fmt.Errorf("update fh: %v", err)
+ chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
+ chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
+ if manifestErr != nil {
+ // not ideal, but the entry still works with unmanifested chunks
+ glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
+ entry.Chunks = append(chunks, manifestChunks...)
- fh.f.wfs.deleteFileChunks(ctx, garbages)
- for i, chunk := range garbages {
- glog.V(3).Infof("garbage %s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
+ return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
}
+ fh.f.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+
return nil
})
+
+ if err == nil {
+ fh.f.dirtyMetadata = false
+ }
+
+ if err != nil {
+ glog.Errorf("%v fh %d flush: %v", fh.f.fullpath(), fh.handle, err)
+ return fuse.EIO
+ }
+
+ return nil
}
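
Read now layers three sources: the inlined entry.Content, the chunk reader, and unflushed dirty pages, with dirty data winning over chunk data. A sketch of just the overlay step, assuming dirty intervals can be modeled as plain (offset, data) pairs (dirtyRange and overlayDirty are illustrative, not the real ContinuousDirtyPages API):

package main

import "fmt"

// dirtyRange is a hypothetical stand-in for one interval of unflushed data.
type dirtyRange struct {
	offset int64
	data   []byte
}

// overlayDirty copies any dirty bytes that intersect buff (which starts at
// startOffset in the file) over the chunk data already read into buff, and
// returns the highest file offset written, like readFromDirtyPages above.
func overlayDirty(buff []byte, startOffset int64, dirty []dirtyRange) (maxStop int64) {
	end := startOffset + int64(len(buff))
	for _, d := range dirty {
		dStart, dEnd := d.offset, d.offset+int64(len(d.data))
		if dEnd <= startOffset || dStart >= end {
			continue // no overlap with the requested window
		}
		from := max64(dStart, startOffset)
		to := min64(dEnd, end)
		copy(buff[from-startOffset:to-startOffset], d.data[from-dStart:to-dStart])
		if to > maxStop {
			maxStop = to
		}
	}
	return
}

func max64(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

func min64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	buff := []byte("aaaaaaaaaa") // pretend chunk data for file range [10,20)
	stop := overlayDirty(buff, 10, []dirtyRange{{offset: 13, data: []byte("BBB")}})
	fmt.Println(string(buff), stop) // aaaBBBaaaa 16
}

The caller then takes totalRead = max(maxStop-offset, totalRead), exactly as Read does above.
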
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
new file mode 100644
index 000000000..6b1012090
--- /dev/null
+++ b/weed/filesys/fscache.go
@@ -0,0 +1,213 @@
+package filesys
+
+import (
+ "sync"
+
+ "github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type FsCache struct {
+ root *FsNode
+ sync.RWMutex
+}
+type FsNode struct {
+ parent *FsNode
+ node fs.Node
+ name string
+ childrenLock sync.RWMutex
+ children map[string]*FsNode
+}
+
+func newFsCache(root fs.Node) *FsCache {
+ return &FsCache{
+ root: &FsNode{
+ node: root,
+ },
+ }
+}
+
+func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
+
+ c.RLock()
+ defer c.RUnlock()
+
+ return c.doGetFsNode(path)
+}
+
+func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node {
+ t := c.root
+ for _, p := range path.Split() {
+ t = t.findChild(p)
+ if t == nil {
+ return nil
+ }
+ }
+ return t.node
+}
+
+func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
+
+ c.Lock()
+ defer c.Unlock()
+
+ c.doSetFsNode(path, node)
+}
+
+func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) {
+ t := c.root
+ for _, p := range path.Split() {
+ t = t.ensureChild(p)
+ }
+ t.node = node
+}
+
+func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
+
+ c.Lock()
+ defer c.Unlock()
+
+ t := c.doGetFsNode(path)
+ if t != nil {
+ return t
+ }
+ t = genNodeFn()
+ c.doSetFsNode(path, t)
+ return t
+}
+
+func (c *FsCache) DeleteFsNode(path util.FullPath) {
+
+ c.Lock()
+ defer c.Unlock()
+
+ t := c.root
+ for _, p := range path.Split() {
+ t = t.findChild(p)
+ if t == nil {
+ return
+ }
+ }
+ if t.parent != nil {
+ t.parent.disconnectChild(t)
+ }
+ t.deleteSelf()
+}
+
+// oldPath and newPath are full path including the new name
+func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
+
+ c.Lock()
+ defer c.Unlock()
+
+ // find old node
+ src := c.root
+ for _, p := range oldPath.Split() {
+ src = src.findChild(p)
+ if src == nil {
+ return src
+ }
+ }
+ if src.parent != nil {
+ src.parent.disconnectChild(src)
+ }
+
+ // find new node
+ target := c.root
+ for _, p := range newPath.Split() {
+ target = target.ensureChild(p)
+ }
+ parent := target.parent
+ if dir, ok := src.node.(*Dir); ok {
+ dir.name = target.name // target node is only a name carrier here, not a real Dir
+ }
+ if f, ok := src.node.(*File); ok {
+ f.Name = target.name
+ entry := f.getEntry()
+ if entry != nil {
+ entry.Name = f.Name
+ }
+ }
+ parent.disconnectChild(target)
+
+ target.deleteSelf()
+
+ src.name = target.name
+ src.connectToParent(parent)
+
+ return src
+}
+
+func (n *FsNode) connectToParent(parent *FsNode) {
+ n.parent = parent
+ oldNode := parent.findChild(n.name)
+ if oldNode != nil {
+ oldNode.deleteSelf()
+ }
+ if dir, ok := n.node.(*Dir); ok {
+ if parent.node != nil {
+ dir.parent = parent.node.(*Dir)
+ }
+ }
+ if f, ok := n.node.(*File); ok {
+ if parent.node != nil {
+ f.dir = parent.node.(*Dir)
+ }
+ }
+ parent.childrenLock.Lock()
+ parent.children[n.name] = n
+ parent.childrenLock.Unlock()
+}
+
+func (n *FsNode) findChild(name string) *FsNode {
+ n.childrenLock.RLock()
+ defer n.childrenLock.RUnlock()
+
+ child, found := n.children[name]
+ if found {
+ return child
+ }
+ return nil
+}
+
+func (n *FsNode) ensureChild(name string) *FsNode {
+ n.childrenLock.Lock()
+ defer n.childrenLock.Unlock()
+
+ if n.children == nil {
+ n.children = make(map[string]*FsNode)
+ }
+ child, found := n.children[name]
+ if found {
+ return child
+ }
+ t := &FsNode{
+ parent: n,
+ node: nil,
+ name: name,
+ children: nil,
+ }
+ n.children[name] = t
+ return t
+}
+
+func (n *FsNode) disconnectChild(child *FsNode) {
+ n.childrenLock.Lock()
+ delete(n.children, child.name)
+ n.childrenLock.Unlock()
+ child.parent = nil
+}
+
+func (n *FsNode) deleteSelf() {
+ n.childrenLock.Lock()
+ for _, child := range n.children {
+ child.deleteSelf()
+ }
+ n.children = nil
+ n.childrenLock.Unlock()
+
+ n.node = nil
+ n.parent = nil
+
+}
diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go
new file mode 100644
index 000000000..1152eb32e
--- /dev/null
+++ b/weed/filesys/fscache_test.go
@@ -0,0 +1,116 @@
+package filesys
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func TestPathSplit(t *testing.T) {
+ parts := util.FullPath("/").Split()
+ if len(parts) != 0 {
+ t.Errorf("expecting an empty list, but getting %d", len(parts))
+ }
+
+ parts = util.FullPath("/readme.md").Split()
+ if len(parts) != 1 {
+ t.Errorf("expecting an empty list, but getting %d", len(parts))
+ }
+
+}
+
+func TestFsCache(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ x := cache.GetFsNode(util.FullPath("/y/x"))
+ if x != nil {
+ t.Errorf("wrong node!")
+ }
+
+ p := util.FullPath("/a/b/c")
+ cache.SetFsNode(p, &File{Name: "cc"})
+ tNode := cache.GetFsNode(p)
+ tFile := tNode.(*File)
+ if tFile.Name != "cc" {
+ t.Errorf("expecting a FsNode")
+ }
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+ cache.SetFsNode(util.FullPath("/a/b/f"), &File{Name: "ff"})
+ cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
+ cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
+
+ b := cache.GetFsNode(util.FullPath("/a/b"))
+ if b != nil {
+ t.Errorf("unexpected node!")
+ }
+
+ a := cache.GetFsNode(util.FullPath("/a"))
+ if a == nil {
+ t.Errorf("missing node!")
+ }
+
+ cache.DeleteFsNode(util.FullPath("/a"))
+ b = cache.GetFsNode(util.FullPath("/a/b"))
+ if b != nil {
+ t.Errorf("unexpected node under deleted directory!")
+ }
+
+ a = cache.GetFsNode(util.FullPath("/a"))
+ if a != nil {
+ t.Errorf("wrong DeleteFsNode!")
+ }
+
+ z := cache.GetFsNode(util.FullPath("/z"))
+ if z == nil {
+ t.Errorf("missing node!")
+ }
+
+ y := cache.GetFsNode(util.FullPath("/x/y"))
+ if y != nil {
+ t.Errorf("wrong node!")
+ }
+
+}
+
+func TestFsCacheMove(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+ cache.SetFsNode(util.FullPath("/z"), &File{Name: "zz"})
+ cache.SetFsNode(util.FullPath("/a"), &File{Name: "aa"})
+
+ cache.Move(util.FullPath("/a/b"), util.FullPath("/z/x"))
+
+ d := cache.GetFsNode(util.FullPath("/z/x/d"))
+ if d == nil {
+ t.Errorf("unexpected nil node!")
+ }
+ if d.(*File).Name != "dd" {
+ t.Errorf("unexpected non dd node!")
+ }
+
+}
+
+func TestFsCacheMove2(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+
+ cache.Move(util.FullPath("/a/b/d"), util.FullPath("/a/b/e"))
+
+ d := cache.GetFsNode(util.FullPath("/a/b/e"))
+ if d == nil {
+ t.Errorf("unexpected nil node!")
+ }
+ if d.(*File).Name != "e" {
+ t.Errorf("unexpected node!")
+ }
+
+}
diff --git a/weed/filesys/meta_cache/cache_config.go b/weed/filesys/meta_cache/cache_config.go
new file mode 100644
index 000000000..e6593ebde
--- /dev/null
+++ b/weed/filesys/meta_cache/cache_config.go
@@ -0,0 +1,32 @@
+package meta_cache
+
+import "github.com/chrislusf/seaweedfs/weed/util"
+
+var (
+ _ = util.Configuration(&cacheConfig{})
+)
+
+// implementing util.Configuration
+type cacheConfig struct {
+ dir string
+}
+
+func (c cacheConfig) GetString(key string) string {
+ return c.dir
+}
+
+func (c cacheConfig) GetBool(key string) bool {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetInt(key string) int {
+ panic("implement me")
+}
+
+func (c cacheConfig) GetStringSlice(key string) []string {
+ panic("implement me")
+}
+
+func (c cacheConfig) SetDefault(key string, value interface{}) {
+ panic("implement me")
+}
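
cacheConfig works because the leveldb store only ever asks for its directory via GetString, so every other Configuration method can safely panic. The same adapter pattern in isolation, against a hypothetical two-method subset of util.Configuration:

package main

import "fmt"

// Configuration is a hypothetical subset of the util.Configuration interface;
// only the methods needed for this sketch are included.
type Configuration interface {
	GetString(key string) string
	GetBool(key string) bool
}

// singleDirConfig answers every GetString with one directory, which is all a
// store that only reads its "dir" setting will ever request.
type singleDirConfig struct {
	dir string
}

func (c singleDirConfig) GetString(key string) string { return c.dir }
func (c singleDirConfig) GetBool(key string) bool     { panic("not used by this store") }

func main() {
	var cfg Configuration = singleDirConfig{dir: "/tmp/meta"}
	fmt.Println(cfg.GetString("leveldb.dir")) // /tmp/meta
}
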
diff --git a/weed/filesys/meta_cache/id_mapper.go b/weed/filesys/meta_cache/id_mapper.go
new file mode 100644
index 000000000..4a2179f31
--- /dev/null
+++ b/weed/filesys/meta_cache/id_mapper.go
@@ -0,0 +1,101 @@
+package meta_cache
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+type UidGidMapper struct {
+ uidMapper *IdMapper
+ gidMapper *IdMapper
+}
+
+type IdMapper struct {
+ localToFiler map[uint32]uint32
+ filerToLocal map[uint32]uint32
+}
+
+// UidGidMapper translates local uid/gid to filer uid/gid
+// The local meta cache always persists the same ids as the filer.
+// The local->filer translation happens when updating the filer, before saving to the meta cache.
+// The filer->local translation happens when reading from the meta cache.
+func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
+ uidMapper, err := newIdMapper(uidPairsStr)
+ if err != nil {
+ return nil, err
+ }
+ gidMapper, err := newIdMapper(gidPairStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return &UidGidMapper{
+ uidMapper: uidMapper,
+ gidMapper: gidMapper,
+ }, nil
+}
+
+func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) {
+ return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
+}
+func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) {
+ return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
+}
+
+func (m *IdMapper) LocalToFiler(id uint32) uint32 {
+ value, found := m.localToFiler[id]
+ if found {
+ return value
+ }
+ return id
+}
+func (m *IdMapper) FilerToLocal(id uint32) uint32 {
+ value, found := m.filerToLocal[id]
+ if found {
+ return value
+ }
+ return id
+}
+
+func newIdMapper(pairsStr string) (*IdMapper, error) {
+
+ localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return &IdMapper{
+ localToFiler: localToFiler,
+ filerToLocal: filerToLocal,
+ }, nil
+
+}
+
+func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
+
+ if pairsStr == "" {
+ return
+ }
+
+ localToFiler = make(map[uint32]uint32)
+ filerToLocal = make(map[uint32]uint32)
+ for _, pairStr := range strings.Split(pairsStr, ",") {
+ pair := strings.Split(pairStr, ":")
+ localUidStr, filerUidStr := pair[0], pair[1]
+ localUid, localUidErr := strconv.Atoi(localUidStr)
+ if localUidErr != nil {
+ err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr)
+ return
+ }
+ filerUid, filerUidErr := strconv.Atoi(filerUidStr)
+ if filerUidErr != nil {
+ err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
+ return
+ }
+ localToFiler[uint32(localUid)] = uint32(filerUid)
+ filerToLocal[uint32(filerUid)] = uint32(localUid)
+ }
+
+ return
+}
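
The mapping strings use "local:filer" pairs separated by commas. A usage sketch, assuming this repository's import path and made-up id values:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
)

func main() {
	// map local uid 1000 to filer uid 500, and local gid 1000 to filer gid 600
	mapper, err := meta_cache.NewUidGidMapper("1000:500", "1000:600")
	if err != nil {
		panic(err)
	}

	uid, gid := mapper.LocalToFiler(1000, 1000)
	fmt.Println(uid, gid) // 500 600

	uid, gid = mapper.FilerToLocal(500, 600)
	fmt.Println(uid, gid) // 1000 1000

	// unmapped ids pass through unchanged
	uid, gid = mapper.LocalToFiler(42, 42)
	fmt.Println(uid, gid) // 42 42
}
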
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
new file mode 100644
index 000000000..b9d4724c9
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -0,0 +1,149 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "strings"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
+)
+
+// need to have logic similar to FilerStoreWrapper
+// e.g. fill fileId field for chunks
+
+type MetaCache struct {
+ localStore filer.VirtualFilerStore
+ sync.RWMutex
+ visitedBoundary *bounded_tree.BoundedTree
+ uidGidMapper *UidGidMapper
+ invalidateFunc func(util.FullPath)
+}
+
+func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache {
+ return &MetaCache{
+ localStore: openMetaStore(dbFolder),
+ visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
+ uidGidMapper: uidGidMapper,
+ invalidateFunc: func(fullpath util.FullPath) {
+ if baseDir != "/" && strings.HasPrefix(string(fullpath), string(baseDir)) {
+ fullpath = fullpath[len(baseDir):]
+ }
+ invalidateFunc(fullpath)
+ },
+ }
+}
+
+func openMetaStore(dbFolder string) filer.VirtualFilerStore {
+
+ os.RemoveAll(dbFolder)
+ os.MkdirAll(dbFolder, 0755)
+
+ store := &leveldb.LevelDBStore{}
+ config := &cacheConfig{
+ dir: dbFolder,
+ }
+
+ if err := store.Initialize(config, ""); err != nil {
+ glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
+ }
+
+ return filer.NewFilerStoreWrapper(store)
+
+}
+
+func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.doInsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
+ return mc.localStore.InsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
+ mc.Lock()
+ defer mc.Unlock()
+
+ oldDir, _ := oldPath.DirAndName()
+ if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
+ if oldPath != "" {
+ if newEntry != nil && oldPath == newEntry.FullPath {
+ // skip the unnecessary deletion
+ // leave the update to the following InsertEntry operation
+ } else {
+ glog.V(3).Infof("DeleteEntry %s/%s", oldPath, oldPath.Name())
+ if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
+ return err
+ }
+ }
+ }
+ } else {
+ // println("unknown old directory:", oldDir)
+ }
+
+ if newEntry != nil {
+ newDir, _ := newEntry.DirAndName()
+ if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
+ glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
+ if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.localStore.UpdateEntry(ctx, entry)
+}
+
+func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
+ mc.RLock()
+ defer mc.RUnlock()
+ entry, err = mc.localStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+ mc.mapIdFromFilerToLocal(entry)
+ return
+}
+
+func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
+ mc.Lock()
+ defer mc.Unlock()
+ return mc.localStore.DeleteEntry(ctx, fp)
+}
+
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
+ mc.RLock()
+ defer mc.RUnlock()
+
+ if !mc.visitedBoundary.HasVisited(dirPath) {
+ return fmt.Errorf("unsynchronized dir: %v", dirPath)
+ }
+
+ _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
+ mc.mapIdFromFilerToLocal(entry)
+ return eachEntryFunc(entry)
+ })
+ return err
+}
+
+func (mc *MetaCache) Shutdown() {
+ mc.Lock()
+ defer mc.Unlock()
+ mc.localStore.Shutdown()
+}
+
+func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
+ entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
+}
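
AtomicUpdateEntryFromFiler applies each filer event as delete-then-insert, skipping the delete when the old and new paths coincide (a plain update) and ignoring paths whose parent directory was never synced. The decision rule alone, reduced to a map-backed sketch (entry and applyEvent are hypothetical; the real code also takes the meta-cache lock):

package main

import "fmt"

type entry struct{ path, value string }

// applyEvent mirrors the delete-then-insert rule: renames remove the old key,
// in-place updates skip the redundant delete, creations only insert.
func applyEvent(store map[string]entry, oldPath string, newEntry *entry) {
	if oldPath != "" {
		if newEntry != nil && oldPath == newEntry.path {
			// same path: leave the update to the following insert
		} else {
			delete(store, oldPath)
		}
	}
	if newEntry != nil {
		store[newEntry.path] = *newEntry
	}
}

func main() {
	store := map[string]entry{"/a/b": {"/a/b", "v1"}}

	// in-place update
	applyEvent(store, "/a/b", &entry{"/a/b", "v2"})
	fmt.Println(store["/a/b"].value) // v2

	// rename /a/b -> /a/c
	applyEvent(store, "/a/b", &entry{"/a/c", "v2"})
	_, stillThere := store["/a/b"]
	fmt.Println(stillThere, store["/a/c"].value) // false v2

	// delete /a/c
	applyEvent(store, "/a/c", nil)
	fmt.Println(len(store)) // 0
}
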
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
new file mode 100644
index 000000000..1ca3b16d5
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -0,0 +1,47 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
+
+ return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
+
+ glog.V(4).Infof("ReadDirAllEntries %s ...", path)
+
+ util.Retry("ReadDirAllEntries", func() error {
+ err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer.FromPbEntry(string(dirPath), pbEntry)
+ if IsHiddenSystemEntry(string(dirPath), entry.Name()) {
+ return nil
+ }
+ if err := mc.doInsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ return err
+ }
+ if entry.IsDirectory() {
+ childDirectories = append(childDirectories, entry.Name())
+ }
+ return nil
+ })
+ return err
+ })
+
+ if err != nil {
+ err = fmt.Errorf("list %s: %v", dirPath, err)
+ }
+
+ return
+ })
+}
+
+func IsHiddenSystemEntry(dir, name string) bool {
+ return dir == "/" && name == "topics"
+}
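
EnsureVisited lists each directory on the filer at most once, remembering what it has already seen in a bounded tree. The memoization idea in miniature (lazyVisitor is hypothetical; a plain map stands in for bounded_tree):

package main

import "fmt"

type lazyVisitor struct {
	visited map[string]bool
	visitFn func(dir string) error // e.g. list the dir on the filer and fill a local cache
}

// ensureVisited runs visitFn for dir exactly once, no matter how many times
// the directory is looked up afterwards.
func (v *lazyVisitor) ensureVisited(dir string) error {
	if v.visited[dir] {
		return nil
	}
	if err := v.visitFn(dir); err != nil {
		return fmt.Errorf("list %s: %v", dir, err)
	}
	v.visited[dir] = true
	return nil
}

func main() {
	calls := 0
	v := &lazyVisitor{
		visited: map[string]bool{},
		visitFn: func(dir string) error { calls++; return nil },
	}
	v.ensureVisited("/a")
	v.ensureVisited("/a")
	fmt.Println(calls) // 1
}
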
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
new file mode 100644
index 000000000..f9973f436
--- /dev/null
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -0,0 +1,86 @@
+package meta_cache
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
+
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+
+ for _, sig := range message.Signatures {
+ if sig == selfSignature && selfSignature != 0 {
+ return nil
+ }
+ }
+
+ dir := resp.Directory
+ var oldPath util.FullPath
+ var newEntry *filer.Entry
+ if message.OldEntry != nil {
+ oldPath = util.NewFullPath(dir, message.OldEntry.Name)
+ glog.V(4).Infof("deleting %v", oldPath)
+ }
+
+ if message.NewEntry != nil {
+ if message.NewParentPath != "" {
+ dir = message.NewParentPath
+ }
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ glog.V(4).Infof("creating %v", key)
+ newEntry = filer.FromPbEntry(dir, message.NewEntry)
+ }
+ err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+ if err == nil && message.OldEntry != nil && message.NewEntry != nil {
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ mc.invalidateFunc(key)
+ }
+
+ return err
+
+ }
+
+ for {
+ err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "mount",
+ PathPrefix: dir,
+ SinceNs: lastTsNs,
+ Signature: selfSignature,
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ glog.Fatalf("process %v: %v", resp, err)
+ }
+ lastTsNs = resp.TsNs
+ }
+ })
+ if err != nil {
+ glog.Errorf("subscribing filer meta change: %v", err)
+ }
+ time.Sleep(time.Second)
+ }
+}
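
The subscription loop reconnects forever and resumes from the timestamp of the last applied event, so a dropped stream neither replays nor skips updates. Its shape, reduced to a sketch with a fake stream in place of SubscribeMetadata (subscribeFrom and event are hypothetical):

package main

import (
	"errors"
	"fmt"
	"io"
	"time"
)

type event struct{ tsNs int64 }

// subscribeFrom stands in for opening the metadata stream at sinceNs and
// pumping Recv() until the stream breaks; it returns the terminating error.
func subscribeFrom(sinceNs int64, apply func(event) error) error {
	_ = sinceNs // the real request carries SinceNs to resume precisely
	return errors.New("stream reset")
}

// run is the reconnect loop: log the failure, sleep, and dial again from the
// last applied timestamp. maxRounds only exists to keep the sketch finite.
func run(lastTsNs int64, maxRounds int) {
	for i := 0; i < maxRounds; i++ {
		err := subscribeFrom(lastTsNs, func(e event) error {
			lastTsNs = e.tsNs // advance the resume point after each event
			return nil
		})
		if err != nil && err != io.EOF {
			fmt.Println("subscribing filer meta change:", err)
		}
		time.Sleep(10 * time.Millisecond) // the real loop sleeps one second
	}
}

func main() { run(0, 2) }
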
diff --git a/weed/filesys/unimplemented.go b/weed/filesys/unimplemented.go
new file mode 100644
index 000000000..5c2dcf0e1
--- /dev/null
+++ b/weed/filesys/unimplemented.go
@@ -0,0 +1,22 @@
+package filesys
+
+import (
+ "context"
+
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+)
+
+// https://github.com/bazil/fuse/issues/130
+
+var _ = fs.NodeAccesser(&Dir{})
+
+func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error {
+ return fuse.ENOSYS
+}
+
+var _ = fs.NodeAccesser(&File{})
+
+func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error {
+ return fuse.ENOSYS
+}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index e924783ec..42816d23d 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -3,32 +3,44 @@ package filesys
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"os"
+ "path"
"sync"
"time"
- "github.com/karlseguin/ccache"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
)
type Option struct {
+ MountDirectory string
+ FilerAddress string
FilerGrpcAddress string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
+ DiskType types.DiskType
ChunkSizeLimit int64
+ ConcurrentWriters int
+ CacheDir string
+ CacheSizeMB int64
DataCenter string
- DirListCacheLimit int64
- EntryCacheTtl time.Duration
Umask os.FileMode
MountUid uint32
@@ -36,22 +48,36 @@ type Option struct {
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
+
+ VolumeServerAccess string // how to access volume servers
+ Cipher bool // whether encrypt data on volume server
+ UidGidMapper *meta_cache.UidGidMapper
}
var _ = fs.FS(&WFS{})
var _ = fs.FSStatfser(&WFS{})
type WFS struct {
- option *Option
- listDirectoryEntriesCache *ccache.Cache
+ option *Option
+
+ // contains all open handles, protected by handlesLock
+ handlesLock sync.Mutex
+ handles map[uint64]*FileHandle
- // contains all open handles
- handles []*FileHandle
- pathToHandleIndex map[string]int
- pathToHandleLock sync.Mutex
- bufPool sync.Pool
+ bufPool sync.Pool
stats statsCache
+
+ root fs.Node
+ fsNodeCache *FsCache
+
+ chunkCache *chunk_cache.TieredChunkCache
+ metaCache *meta_cache.MetaCache
+ signature int32
+
+ // throttle writers
+ concurrentWriters *util.LimitedConcurrentExecutor
+ Server *fs.Server
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -60,72 +86,92 @@ type statsCache struct {
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
- option: option,
- listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
- pathToHandleIndex: make(map[string]int),
+ option: option,
+ handles: make(map[uint64]*FileHandle),
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
},
},
+ signature: util.RandomInt32(),
+ }
+ cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
+ cacheDir := path.Join(option.CacheDir, cacheUniqueId)
+ if option.CacheSizeMB > 0 {
+ os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
+ wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
- return wfs
-}
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
-func (wfs *WFS) Root() (fs.Node, error) {
- return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
-}
+ fsNode := NodeWithId(filePath.AsInode())
+ if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
+ glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
+ }
-func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
+ dir, name := filePath.DirAndName()
+ parent := NodeWithId(util.FullPath(dir).AsInode())
+ if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
+ glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
+ }
+ })
+ startTime := time.Now()
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
+ grace.OnInterrupt(func() {
+ wfs.metaCache.Shutdown()
+ })
+
+ wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
+ wfs.fsNodeCache = newFsCache(wfs.root)
+
+ if wfs.option.ConcurrentWriters > 0 {
+ wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+ }
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
+ return wfs
+}
+func (wfs *WFS) Root() (fs.Node, error) {
+ return wfs.root, nil
}
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
- wfs.pathToHandleLock.Lock()
- defer wfs.pathToHandleLock.Unlock()
fullpath := file.fullpath()
+ glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
+
+ inodeId := file.Id()
- index, found := wfs.pathToHandleIndex[fullpath]
- if found && wfs.handles[index] != nil {
- glog.V(2).Infoln(fullpath, "found fileHandle id", index)
- return wfs.handles[index]
+ wfs.handlesLock.Lock()
+ existingHandle, found := wfs.handles[inodeId]
+ wfs.handlesLock.Unlock()
+ if found && existingHandle != nil {
+ existingHandle.f.isOpen++
+ glog.V(4).Infof("Acquired Handle %s open %d", fullpath, existingHandle.f.isOpen)
+ return existingHandle
}
+ entry, _ := file.maybeLoadEntry(context.Background())
+ file.entry = entry
fileHandle = newFileHandle(file, uid, gid)
- for i, h := range wfs.handles {
- if h == nil {
- wfs.handles[i] = fileHandle
- fileHandle.handle = uint64(i)
- wfs.pathToHandleIndex[fullpath] = i
- glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle)
- return
- }
- }
+ file.isOpen++
- wfs.handles = append(wfs.handles, fileHandle)
- fileHandle.handle = uint64(len(wfs.handles) - 1)
- glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle)
- wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
+ wfs.handlesLock.Lock()
+ wfs.handles[inodeId] = fileHandle
+ wfs.handlesLock.Unlock()
+ fileHandle.handle = inodeId
+ glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
return
}
-func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) {
- wfs.pathToHandleLock.Lock()
- defer wfs.pathToHandleLock.Unlock()
+func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
+ wfs.handlesLock.Lock()
+ defer wfs.handlesLock.Unlock()
- glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
- delete(wfs.pathToHandleIndex, fullpath)
- if int(handleId) < len(wfs.handles) {
- wfs.handles[int(handleId)] = nil
- }
+ glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
+
+ delete(wfs.handles, uint64(handleId))
return
}
@@ -137,16 +183,17 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
if wfs.stats.lastChecked < time.Now().Unix()-20 {
- err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
Collection: wfs.option.Collection,
Replication: wfs.option.Replication,
Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
+ DiskType: string(wfs.option.DiskType),
}
glog.V(4).Infof("reading filer stats: %+v", request)
- resp, err := client.Statistics(ctx, request)
+ resp, err := client.Statistics(context.Background(), request)
if err != nil {
glog.V(0).Infof("reading filer stats %v: %v", request, err)
return err
@@ -191,3 +238,34 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
return nil
}
+
+func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
+ if entry.Attributes == nil {
+ return
+ }
+ entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
+}
+func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
+ if entry.Attributes == nil {
+ return
+ }
+ entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
+}
+
+func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
+ if wfs.option.VolumeServerAccess == "filerProxy" {
+ return func(fileId string) (targetUrls []string, err error) {
+ return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
+ }
+ }
+ return filer.LookupFn(wfs)
+
+}
+
+type NodeWithId uint64
+func (n NodeWithId) Id() uint64 {
+ return uint64(n)
+}
+func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
+ return nil
+}
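
The chunk cache directory is keyed by an 8-character id derived from the mount parameters, so mounts against different filers or root paths never share caches. A sketch of that derivation, assuming util.Md5String is a hex-encoded MD5 (the stdlib equivalent is used here; the sample inputs are made up):

package main

import (
	"crypto/md5"
	"fmt"
	"path"
)

// cacheUniqueId mirrors the computation in NewSeaweedFileSystem: hash the
// identifying mount parameters and keep the first 8 hex characters.
func cacheUniqueId(mountDir, filerGrpcAddress, mountRootPath, version string) string {
	sum := md5.Sum([]byte(mountDir + filerGrpcAddress + mountRootPath + version))
	return fmt.Sprintf("%x", sum)[0:8]
}

func main() {
	id := cacheUniqueId("/mnt/weed", "localhost:18888", "/", "2.4")
	fmt.Println(path.Join("/var/cache/seaweedfs", id)) // e.g. /var/cache/seaweedfs/3f2c9a41
}
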
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
deleted file mode 100644
index 6e586b7df..000000000
--- a/weed/filesys/wfs_deletion.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package filesys
-
-import (
- "context"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "google.golang.org/grpc"
-)
-
-func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
- if len(chunks) == 0 {
- return
- }
-
- var fileIds []string
- for _, chunk := range chunks {
- fileIds = append(fileIds, chunk.GetFileIdString())
- }
-
- wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds)
- return nil
- })
-}
-
-func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
-
- var vids []string
- for _, fileId := range fileIds {
- vids = append(vids, filer2.VolumeId(fileId))
- }
-
- lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
-
- m := make(map[string]operation.LookupResult)
-
- glog.V(4).Infof("remove file lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
- VolumeIds: vids,
- })
- if err != nil {
- return m, err
- }
-
- for _, vid := range vids {
- lr := operation.LookupResult{
- VolumeId: vid,
- Locations: nil,
- }
- locations := resp.LocationsMap[vid]
- for _, loc := range locations.Locations {
- lr.Locations = append(lr.Locations, operation.Location{
- Url: loc.Url,
- PublicUrl: loc.PublicUrl,
- })
- }
- m[vid] = lr
- }
-
- return m, err
- }
-
- _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
-
- return err
-}
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
new file mode 100644
index 000000000..671d20ba2
--- /dev/null
+++ b/weed/filesys/wfs_filer_client.go
@@ -0,0 +1,34 @@
+package filesys
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+var _ = filer_pb.FilerClient(&WFS{})
+
+func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ err := util.Retry("filer grpc "+wfs.option.FilerGrpcAddress, func() error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
+ })
+
+ if err == nil {
+ return nil
+ }
+ return err
+
+}
+
+func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
+ if wfs.option.VolumeServerAccess == "publicUrl" {
+ return location.PublicUrl
+ }
+ return location.Url
+}
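
WithFilerClient now retries transient gRPC failures before surfacing them to FUSE. The wrapper pattern on its own, with a hypothetical fixed-attempt retry standing in for util.Retry:

package main

import (
	"errors"
	"fmt"
	"time"
)

// retry is a hypothetical fixed-attempt stand-in for util.Retry.
func retry(name string, attempts int, fn func() error) (err error) {
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		fmt.Printf("%s attempt %d failed: %v\n", name, i+1, err)
		time.Sleep(10 * time.Millisecond) // the real helper waits longer between tries
	}
	return err
}

func main() {
	calls := 0
	err := retry("filer grpc localhost:18888", 3, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient: connection refused")
		}
		return nil // succeeded on the third try
	})
	fmt.Println(err, calls) // <nil> 3
}
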
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
new file mode 100644
index 000000000..dbec3bebc
--- /dev/null
+++ b/weed/filesys/wfs_write.go
@@ -0,0 +1,75 @@
+package filesys
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
+
+ return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: wfs.option.Replication,
+ Collection: wfs.option.Collection,
+ TtlSec: wfs.option.TtlSec,
+ DiskType: string(wfs.option.DiskType),
+ DataCenter: wfs.option.DataCenter,
+ Path: string(fullPath),
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
+ loc := &filer_pb.Location{
+ Url: resp.Url,
+ PublicUrl: resp.PublicUrl,
+ }
+ host = wfs.AdjustedUrl(loc)
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ }); err != nil {
+ return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if wfs.option.VolumeServerAccess == "filerProxy" {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
+ }
+ uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+ glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ wfs.chunkCache.SetChunk(fileId, data)
+
+ chunk = uploadResult.ToPbFileChunk(fileId, offset)
+ return chunk, collection, replication, nil
+ }
+}
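
Where a chunk is uploaded depends on option.VolumeServerAccess: straight to the volume server, or through the filer proxy endpoint. The URL selection in isolation (uploadUrl is illustrative; the addresses are made up):

package main

import "fmt"

// uploadUrl mirrors the branch in saveDataAsChunk plus AdjustedUrl: choose
// the proxy endpoint when access mode is "filerProxy", otherwise hit the
// volume server directly.
func uploadUrl(access, filerAddress, volumeHost, fileId string) string {
	if access == "filerProxy" {
		return fmt.Sprintf("http://%s/?proxyChunkId=%s", filerAddress, fileId)
	}
	return fmt.Sprintf("http://%s/%s", volumeHost, fileId)
}

func main() {
	fmt.Println(uploadUrl("", "filer:8888", "volume1:8080", "3,01637037d6"))
	// http://volume1:8080/3,01637037d6
	fmt.Println(uploadUrl("filerProxy", "filer:8888", "volume1:8080", "3,01637037d6"))
	// http://filer:8888/?proxyChunkId=3,01637037d6
}
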
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
index 3c0ba164a..92e43b675 100644
--- a/weed/filesys/xattr.go
+++ b/weed/filesys/xattr.go
@@ -2,11 +2,12 @@ package filesys
import (
"context"
- "path/filepath"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
+
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
@@ -107,36 +108,16 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis
}
-func (wfs *WFS) maybeLoadEntry(ctx context.Context, dir, name string) (entry *filer_pb.Entry, err error) {
+func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
- fullpath := filepath.Join(dir, name)
- item := wfs.listDirectoryEntriesCache.Get(fullpath)
- if item != nil && !item.Expired() {
- entry = item.Value().(*filer_pb.Entry)
- return
- }
- glog.V(3).Infof("read entry cache miss %s", fullpath)
-
- err = wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.LookupDirectoryEntryRequest{
- Name: name,
- Directory: dir,
- }
+ fullpath := util.NewFullPath(dir, name)
+ // glog.V(3).Infof("read entry cache miss %s", fullpath)
- resp, err := client.LookupDirectoryEntry(ctx, request)
- if err != nil {
- glog.V(3).Infof("file attr read file %v: %v", request, err)
- return fuse.ENOENT
- }
-
- entry = resp.Entry
- if entry != nil {
- wfs.listDirectoryEntriesCache.Set(fullpath, entry, wfs.option.EntryCacheTtl)
- }
-
- return nil
- })
-
- return
+ // read from async meta cache
+ meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
+ cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
+ }
+ return cachedEntry.ToProtoEntry(), cacheErr
}
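
With this change, every attribute lookup is answered from the local meta cache, and filer_pb.ErrNotFound is translated to fuse.ENOENT for the kernel. The lookup shape in isolation (all names below are hypothetical stand-ins):

package main

import (
	"errors"
	"fmt"
)

// stand-ins for filer_pb.ErrNotFound and fuse.ENOENT
var (
	errNotFound = errors.New("not found")
	errENOENT   = errors.New("ENOENT")
)

// find answers from the local cache only, like metaCache.FindEntry.
func find(cache map[string]string, key string) (string, error) {
	if v, ok := cache[key]; ok {
		return v, nil
	}
	return "", errNotFound
}

// lookup mirrors the shape of maybeLoadEntry: sync the parent directory once,
// read from the cache, and translate the cache miss into the FUSE error.
func lookup(cache map[string]string, ensureVisited func(dir string), dir, name string) (string, error) {
	ensureVisited(dir)
	v, err := find(cache, dir+"/"+name)
	if err == errNotFound {
		return "", errENOENT
	}
	return v, err
}

func main() {
	cache := map[string]string{"/a/b": "entry-b"}
	noop := func(string) {}

	v, err := lookup(cache, noop, "/a", "b")
	fmt.Println(v, err) // entry-b <nil>

	_, err = lookup(cache, noop, "/a", "missing")
	fmt.Println(err) // ENOENT
}
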