Diffstat (limited to 'weed/filesys')
-rw-r--r--   weed/filesys/dir.go                        398
-rw-r--r--   weed/filesys/dir_link.go                     6
-rw-r--r--   weed/filesys/dir_rename.go                  38
-rw-r--r--   weed/filesys/dirty_page.go                 185
-rw-r--r--   weed/filesys/dirty_page_interval.go        220
-rw-r--r--   weed/filesys/dirty_page_interval_test.go    72
-rw-r--r--   weed/filesys/file.go                       195
-rw-r--r--   weed/filesys/filehandle.go                 112
-rw-r--r--   weed/filesys/wfs.go                        122
-rw-r--r--   weed/filesys/wfs_deletion.go                20
-rw-r--r--   weed/filesys/xattr.go                      141
11 files changed, 1111 insertions, 398 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 79cf45385..483229b3f 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -3,7 +3,7 @@ package filesys
import (
"context"
"os"
- "path"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/filer2"
@@ -14,9 +14,9 @@ import (
)
type Dir struct {
- Path string
- wfs *WFS
- attributes *filer_pb.FuseAttributes
+ Path string
+ wfs *WFS
+ entry *filer_pb.Entry
}
var _ = fs.Node(&Dir{})
@@ -27,6 +27,11 @@ var _ = fs.HandleReadDirAller(&Dir{})
var _ = fs.NodeRemover(&Dir{})
var _ = fs.NodeRenamer(&Dir{})
var _ = fs.NodeSetattrer(&Dir{})
+var _ = fs.NodeGetxattrer(&Dir{})
+var _ = fs.NodeSetxattrer(&Dir{})
+var _ = fs.NodeRemovexattrer(&Dir{})
+var _ = fs.NodeListxattrer(&Dir{})
+var _ = fs.NodeForgetter(&Dir{})
func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
@@ -35,42 +40,41 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
if dir.Path == dir.wfs.option.FilerMountRootPath {
dir.setRootDirAttributes(attr)
+ glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.Path, attr)
return nil
}
- item := dir.wfs.listDirectoryEntriesCache.Get(dir.Path)
- if item != nil && !item.Expired() {
- entry := item.Value().(*filer_pb.Entry)
+ if err := dir.maybeLoadEntry(); err != nil {
+ glog.V(3).Infof("dir Attr %s,err: %+v", dir.Path, err)
+ return err
+ }
- attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
- attr.Ctime = time.Unix(entry.Attributes.Crtime, 0)
- attr.Mode = os.FileMode(entry.Attributes.FileMode)
- attr.Gid = entry.Attributes.Gid
- attr.Uid = entry.Attributes.Uid
+ attr.Inode = filer2.FullPath(dir.Path).AsInode()
+ attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
+ attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
+ attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0)
+ attr.Gid = dir.entry.Attributes.Gid
+ attr.Uid = dir.entry.Attributes.Uid
- return nil
- }
+ glog.V(3).Infof("dir Attr %s, attr: %+v", dir.Path, attr)
- entry, err := filer2.GetEntry(ctx, dir.wfs, dir.Path)
- if err != nil {
- glog.V(2).Infof("read dir %s attr: %v, error: %v", dir.Path, dir.attributes, err)
- return err
- }
- dir.attributes = entry.Attributes
+ return nil
+}
- glog.V(2).Infof("dir %s: %v perm: %v", dir.Path, dir.attributes, os.FileMode(dir.attributes.FileMode))
+func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
- attr.Mode = os.FileMode(dir.attributes.FileMode) | os.ModeDir
+ glog.V(4).Infof("dir Getxattr %s", dir.Path)
- attr.Mtime = time.Unix(dir.attributes.Mtime, 0)
- attr.Ctime = time.Unix(dir.attributes.Crtime, 0)
- attr.Gid = dir.attributes.Gid
- attr.Uid = dir.attributes.Uid
+ if err := dir.maybeLoadEntry(); err != nil {
+ return err
+ }
- return nil
+ return getxattr(dir.entry, req, resp)
}
func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
+ attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
+ attr.Valid = time.Hour
attr.Uid = dir.wfs.option.MountUid
attr.Gid = dir.wfs.option.MountGid
attr.Mode = dir.wfs.option.MountMode
@@ -78,16 +82,25 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
attr.Ctime = dir.wfs.option.MountCtime
attr.Mtime = dir.wfs.option.MountMtime
attr.Atime = dir.wfs.option.MountMtime
+ attr.BlockSize = 1024 * 1024
}
-func (dir *Dir) newFile(name string, entry *filer_pb.Entry) *File {
- return &File{
- Name: name,
- dir: dir,
- wfs: dir.wfs,
- entry: entry,
- entryViewCache: nil,
- }
+func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
+ return dir.wfs.getNode(filer2.NewFullPath(dir.Path, name), func() fs.Node {
+ return &File{
+ Name: name,
+ dir: dir,
+ wfs: dir.wfs,
+ entry: entry,
+ entryViewCache: nil,
+ }
+ })
+}
+
+func (dir *Dir) newDirectory(fullpath filer2.FullPath, entry *filer_pb.Entry) fs.Node {
+ return dir.wfs.getNode(fullpath, func() fs.Node {
+ return &Dir{Path: string(fullpath), wfs: dir.wfs, entry: entry}
+ })
}
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
@@ -109,94 +122,104 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
TtlSec: dir.wfs.option.TtlSec,
},
},
+ OExcl: req.Flags&fuse.OpenExclusive != 0,
}
- glog.V(1).Infof("create: %v", request)
+ glog.V(1).Infof("create %s/%s: %v", dir.Path, req.Name, req.Flags)
- if request.Entry.IsDirectory {
- if err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- if _, err := client.CreateEntry(ctx, request); err != nil {
- glog.V(0).Infof("create %s/%s: %v", dir.Path, req.Name, err)
- return fuse.EIO
+ if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ if strings.Contains(err.Error(), "EEXIST") {
+ return fuse.EEXIST
}
- return nil
- }); err != nil {
- return nil, nil, err
+ return fuse.EIO
}
+ return nil
+ }); err != nil {
+ return nil, nil, err
}
-
- file := dir.newFile(req.Name, request.Entry)
- if !request.Entry.IsDirectory {
- file.isOpen = true
+ var node fs.Node
+ if request.Entry.IsDirectory {
+ node = dir.newDirectory(filer2.NewFullPath(dir.Path, req.Name), request.Entry)
+ return node, nil, nil
}
+
+ node = dir.newFile(req.Name, request.Entry)
+ file := node.(*File)
+ file.isOpen++
fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
- fh.dirtyMetadata = true
return file, fh, nil
}
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
- err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ newEntry := &filer_pb.Entry{
+ Name: req.Name,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
+ Uid: req.Uid,
+ Gid: req.Gid,
+ },
+ }
+
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: dir.Path,
- Entry: &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: true,
- Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
- Uid: req.Uid,
- Gid: req.Gid,
- },
- },
+ Entry: newEntry,
}
glog.V(1).Infof("mkdir: %v", request)
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("mkdir %s/%s: %v", dir.Path, req.Name, err)
- return fuse.EIO
+ return err
}
return nil
})
if err == nil {
- node := &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs}
+ node := dir.newDirectory(filer2.NewFullPath(dir.Path, req.Name), newEntry)
return node, nil
}
- return nil, err
+ return nil, fuse.EIO
}
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
- var entry *filer_pb.Entry
- fullFilePath := path.Join(dir.Path, req.Name)
+ glog.V(4).Infof("dir Lookup %s: %s", dir.Path, req.Name)
- item := dir.wfs.listDirectoryEntriesCache.Get(fullFilePath)
- if item != nil && !item.Expired() {
- entry = item.Value().(*filer_pb.Entry)
- }
+ fullFilePath := filer2.NewFullPath(dir.Path, req.Name)
+ entry := dir.wfs.cacheGet(fullFilePath)
if entry == nil {
- entry, err = filer2.GetEntry(ctx, dir.wfs, fullFilePath)
+ // glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
+ entry, err = filer2.GetEntry(dir.wfs, fullFilePath)
if err != nil {
- return nil, err
+ glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
+ return nil, fuse.ENOENT
}
+ dir.wfs.cacheSet(fullFilePath, entry, 5*time.Minute)
+ } else {
+ glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
}
if entry != nil {
if entry.IsDirectory {
- node = &Dir{Path: path.Join(dir.Path, req.Name), wfs: dir.wfs, attributes: entry.Attributes}
+ node = dir.newDirectory(fullFilePath, entry)
} else {
node = dir.newFile(req.Name, entry)
}
- resp.EntryValid = time.Duration(0)
+ // resp.EntryValid = time.Second
+ resp.Attr.Inode = fullFilePath.AsInode()
+ resp.Attr.Valid = time.Second
resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
- resp.Attr.Ctime = time.Unix(entry.Attributes.Crtime, 0)
+ resp.Attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode)
resp.Attr.Gid = entry.Attributes.Gid
resp.Attr.Uid = entry.Attributes.Uid
@@ -204,57 +227,32 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
return node, nil
}
+ glog.V(4).Infof("not found dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
}
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
- err = dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- paginationLimit := 1024
- remaining := dir.wfs.option.DirListingLimit
-
- lastEntryName := ""
-
- for remaining >= 0 {
+ glog.V(3).Infof("dir ReadDirAll %s", dir.Path)
- request := &filer_pb.ListEntriesRequest{
- Directory: dir.Path,
- StartFromFileName: lastEntryName,
- Limit: uint32(paginationLimit),
- }
-
- glog.V(4).Infof("read directory: %v", request)
- resp, err := client.ListEntries(ctx, request)
- if err != nil {
- glog.V(0).Infof("list %s: %v", dir.Path, err)
- return fuse.EIO
- }
-
- cacheTtl := estimatedCacheTtl(len(resp.Entries))
-
- for _, entry := range resp.Entries {
- if entry.IsDirectory {
- dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
- ret = append(ret, dirent)
- } else {
- dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_File}
- ret = append(ret, dirent)
- }
- dir.wfs.listDirectoryEntriesCache.Set(path.Join(dir.Path, entry.Name), entry, cacheTtl)
- lastEntryName = entry.Name
- }
-
- remaining -= len(resp.Entries)
-
- if len(resp.Entries) < paginationLimit {
- break
- }
+ cacheTtl := 5 * time.Minute
+ readErr := filer2.ReadDirAllEntries(dir.wfs, filer2.FullPath(dir.Path), "", func(entry *filer_pb.Entry, isLast bool) {
+ fullpath := filer2.NewFullPath(dir.Path, entry.Name)
+ inode := fullpath.AsInode()
+ if entry.IsDirectory {
+ dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir}
+ ret = append(ret, dirent)
+ } else {
+ dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File}
+ ret = append(ret, dirent)
}
-
- return nil
+ dir.wfs.cacheSet(fullpath, entry, cacheTtl)
})
+ if readErr != nil {
+ glog.V(0).Infof("list %s: %v", dir.Path, err)
+ return ret, fuse.EIO
+ }
return ret, err
}
@@ -262,23 +260,29 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
if !req.Dir {
- return dir.removeOneFile(ctx, req)
+ return dir.removeOneFile(req)
}
- return dir.removeFolder(ctx, req)
+ return dir.removeFolder(req)
}
-func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) error {
+func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
- entry, err := filer2.GetEntry(ctx, dir.wfs, path.Join(dir.Path, req.Name))
+ filePath := filer2.NewFullPath(dir.Path, req.Name)
+ entry, err := filer2.GetEntry(dir.wfs, filePath)
if err != nil {
return err
}
+ if entry == nil {
+ return nil
+ }
+
+ dir.wfs.deleteFileChunks(entry.Chunks)
- dir.wfs.deleteFileChunks(ctx, entry.Chunks)
+ dir.wfs.cacheDelete(filePath)
- return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: dir.Path,
@@ -287,22 +291,22 @@ func (dir *Dir) removeOneFile(ctx context.Context, req *fuse.RemoveRequest) erro
}
glog.V(3).Infof("remove file: %v", request)
- _, err := client.DeleteEntry(ctx, request)
+ _, err := client.DeleteEntry(context.Background(), request)
if err != nil {
- glog.V(3).Infof("remove file %s/%s: %v", dir.Path, req.Name, err)
+ glog.V(3).Infof("not found remove file %s/%s: %v", dir.Path, req.Name, err)
return fuse.ENOENT
}
- dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name))
-
return nil
})
}
-func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error {
+func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
- return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ dir.wfs.cacheDelete(filer2.NewFullPath(dir.Path, req.Name))
+
+ return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: dir.Path,
@@ -311,14 +315,12 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error
}
glog.V(3).Infof("remove directory entry: %v", request)
- _, err := client.DeleteEntry(ctx, request)
+ _, err := client.DeleteEntry(context.Background(), request)
if err != nil {
- glog.V(3).Infof("remove %s/%s: %v", dir.Path, req.Name, err)
+ glog.V(3).Infof("not found remove %s/%s: %v", dir.Path, req.Name, err)
return fuse.ENOENT
}
- dir.wfs.listDirectoryEntriesCache.Delete(path.Join(dir.Path, req.Name))
-
return nil
})
@@ -326,66 +328,122 @@ func (dir *Dir) removeFolder(ctx context.Context, req *fuse.RemoveRequest) error
func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- if dir.attributes == nil {
- return nil
+ glog.V(3).Infof("%v dir setattr %+v", dir.Path, req)
+
+ if err := dir.maybeLoadEntry(); err != nil {
+ return err
}
- glog.V(3).Infof("%v dir setattr %+v, fh=%d", dir.Path, req, req.Handle)
if req.Valid.Mode() {
- dir.attributes.FileMode = uint32(req.Mode)
+ dir.entry.Attributes.FileMode = uint32(req.Mode)
}
if req.Valid.Uid() {
- dir.attributes.Uid = req.Uid
+ dir.entry.Attributes.Uid = req.Uid
}
if req.Valid.Gid() {
- dir.attributes.Gid = req.Gid
+ dir.entry.Attributes.Gid = req.Gid
}
if req.Valid.Mtime() {
- dir.attributes.Mtime = req.Mtime.Unix()
+ dir.entry.Attributes.Mtime = req.Mtime.Unix()
+ }
+
+ dir.wfs.cacheDelete(filer2.FullPath(dir.Path))
+
+ return dir.saveEntry()
+
+}
+
+func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
+
+ glog.V(4).Infof("dir Setxattr %s: %s", dir.Path, req.Name)
+
+ if err := dir.maybeLoadEntry(); err != nil {
+ return err
+ }
+
+ if err := setxattr(dir.entry, req); err != nil {
+ return err
+ }
+
+ dir.wfs.cacheDelete(filer2.FullPath(dir.Path))
+
+ return dir.saveEntry()
+
+}
+
+func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
+
+ glog.V(4).Infof("dir Removexattr %s: %s", dir.Path, req.Name)
+
+ if err := dir.maybeLoadEntry(); err != nil {
+ return err
+ }
+
+ if err := removexattr(dir.entry, req); err != nil {
+ return err
+ }
+
+ dir.wfs.cacheDelete(filer2.FullPath(dir.Path))
+
+ return dir.saveEntry()
+
+}
+
+func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
+
+ glog.V(4).Infof("dir Listxattr %s", dir.Path)
+
+ if err := dir.maybeLoadEntry(); err != nil {
+ return err
+ }
+
+ if err := listxattr(dir.entry, req, resp); err != nil {
+ return err
}
+ return nil
+
+}
+
+func (dir *Dir) Forget() {
+ glog.V(3).Infof("Forget dir %s", dir.Path)
+
+ dir.wfs.forgetNode(filer2.FullPath(dir.Path))
+}
+
+func (dir *Dir) maybeLoadEntry() error {
+ if dir.entry == nil {
+ parentDirPath, name := filer2.FullPath(dir.Path).DirAndName()
+ entry, err := dir.wfs.maybeLoadEntry(parentDirPath, name)
+ if err != nil {
+ return err
+ }
+ dir.entry = entry
+ }
+ return nil
+}
+
+func (dir *Dir) saveEntry() error {
+
parentDir, name := filer2.FullPath(dir.Path).DirAndName()
- return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+
+ return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.UpdateEntryRequest{
Directory: parentDir,
- Entry: &filer_pb.Entry{
- Name: name,
- Attributes: dir.attributes,
- },
+ Entry: dir.entry,
}
- glog.V(1).Infof("set attr directory entry: %v", request)
- _, err := client.UpdateEntry(ctx, request)
+ glog.V(1).Infof("save dir entry: %v", request)
+ _, err := client.UpdateEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("UpdateEntry %s: %v", dir.Path, err)
+ glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err)
return fuse.EIO
}
- dir.wfs.listDirectoryEntriesCache.Delete(dir.Path)
-
return nil
})
-
-}
-
-func estimatedCacheTtl(numEntries int) time.Duration {
- if numEntries < 100 {
- // 30 ms per entry
- return 3 * time.Second
- }
- if numEntries < 1000 {
- // 10 ms per entry
- return 10 * time.Second
- }
- if numEntries < 10000 {
- // 10 ms per entry
- return 100 * time.Second
- }
-
- // 2 ms per entry
- return time.Duration(numEntries*2) * time.Millisecond
}
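
The dir.go hunks above route all node construction through wfs.getNode, an inode-keyed registry added later in this diff (see the wfs.go hunks), so repeated lookups of the same path hand back the same *Dir or *File instance. A minimal standalone sketch of that get-or-create pattern, using illustrative Node and registry types rather than the real fs.Node/WFS API:

package main

import (
	"fmt"
	"sync"
)

// Node stands in for fs.Node; registry mirrors the nodes map plus
// nodesLock that this commit adds to WFS. All names here are illustrative.
type Node interface{ Path() string }

type registry struct {
	mu    sync.Mutex
	nodes map[uint64]Node
}

// getNode returns the cached node for an inode, or runs the factory and
// caches a non-nil result -- the same shape as WFS.getNode in this diff.
func (r *registry) getNode(inode uint64, fn func() Node) Node {
	r.mu.Lock()
	defer r.mu.Unlock()
	if node, found := r.nodes[inode]; found {
		return node
	}
	node := fn()
	if node != nil {
		r.nodes[inode] = node
	}
	return node
}

// forgetNode evicts an entry, as Dir.Forget and File.Forget do.
func (r *registry) forgetNode(inode uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.nodes, inode)
}

type dirNode struct{ path string }

func (d *dirNode) Path() string { return d.path }

func main() {
	r := &registry{nodes: make(map[uint64]Node)}
	a := r.getNode(1, func() Node { return &dirNode{path: "/a"} })
	b := r.getNode(1, func() Node { return &dirNode{path: "/never-built"} })
	fmt.Println(a == b) // true: the second factory is never invoked
}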
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 94e443649..61ed04c26 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -35,8 +35,8 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
},
}
- err := dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("symlink %s/%s: %v", dir.Path, req.NewName, err)
return fuse.EIO
}
@@ -51,7 +51,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) {
- if err := file.maybeLoadAttributes(ctx); err != nil {
+ if err := file.maybeLoadEntry(ctx); err != nil {
return "", err
}
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index e72a15758..9b0c0fe6e 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -2,7 +2,9 @@ package filesys
import (
"context"
- "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
@@ -11,8 +13,9 @@ import (
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
newDir := newDirectory.(*Dir)
+ glog.V(4).Infof("dir Rename %s/%s => %s/%s", dir.Path, req.OldName, newDir.Path, req.NewName)
- return dir.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AtomicRenameEntryRequest{
OldDirectory: dir.Path,
@@ -21,13 +24,40 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
NewName: req.NewName,
}
- _, err := client.AtomicRenameEntry(ctx, request)
+ _, err := client.AtomicRenameEntry(context.Background(), request)
if err != nil {
- return fmt.Errorf("renaming %s/%s => %s/%s: %v", dir.Path, req.OldName, newDir.Path, req.NewName, err)
+ glog.V(0).Infof("dir Rename %s/%s => %s/%s : %v", dir.Path, req.OldName, newDir.Path, req.NewName, err)
+ return fuse.EIO
}
return nil
})
+ if err == nil {
+ newPath := filer2.NewFullPath(newDir.Path, req.NewName)
+ oldPath := filer2.NewFullPath(dir.Path, req.OldName)
+ dir.wfs.cacheDelete(newPath)
+ dir.wfs.cacheDelete(oldPath)
+
+ oldFileNode := dir.wfs.getNode(oldPath, func() fs.Node {
+ return nil
+ })
+ newDirNode := dir.wfs.getNode(filer2.FullPath(dir.Path), func() fs.Node {
+ return nil
+ })
+ dir.wfs.forgetNode(newPath)
+ dir.wfs.forgetNode(oldPath)
+ if oldFileNode != nil && newDirNode != nil {
+ oldFile := oldFileNode.(*File)
+ oldFile.Name = req.NewName
+ oldFile.dir = newDirNode.(*Dir)
+ dir.wfs.getNode(newPath, func() fs.Node {
+ return oldFile
+ })
+
+ }
+ }
+
+ return err
}
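
One detail worth calling out in the rename hunk: passing a factory that returns nil turns getNode into a pure lookup, because getNode only caches non-nil results. That is how Rename peeks at oldFileNode and newDirNode without registering placeholder nodes. In terms of the hypothetical registry sketched above:

// peek reads the registry without ever creating an entry: on a miss the
// nil factory result is returned but never cached. This mirrors the
// getNode(..., func() fs.Node { return nil }) calls in Dir.Rename above.
func (r *registry) peek(inode uint64) Node {
	return r.getNode(inode, func() Node { return nil })
}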
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index baee412b2..7e33c97a7 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -4,8 +4,8 @@ import (
"bytes"
"context"
"fmt"
+ "io"
"sync"
- "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -15,113 +15,68 @@ import (
)
type ContinuousDirtyPages struct {
- hasData bool
- Offset int64
- Size int64
- Data []byte
- f *File
- lock sync.Mutex
+ intervals *ContinuousIntervals
+ f *File
+ lock sync.Mutex
+ collection string
+ replication string
}
func newDirtyPages(file *File) *ContinuousDirtyPages {
return &ContinuousDirtyPages{
- Data: nil,
- f: file,
+ intervals: &ContinuousIntervals{},
+ f: file,
}
}
func (pages *ContinuousDirtyPages) releaseResource() {
- if pages.Data != nil {
- pages.f.wfs.bufPool.Put(pages.Data)
- pages.Data = nil
- atomic.AddInt32(&counter, -1)
- glog.V(3).Infof("%s/%s releasing resource %d", pages.f.dir.Path, pages.f.Name, counter)
- }
}
var counter = int32(0)
-func (pages *ContinuousDirtyPages) AddPage(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
pages.lock.Lock()
defer pages.lock.Unlock()
- var chunk *filer_pb.FileChunk
+ glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
- return pages.flushAndSave(ctx, offset, data)
- }
-
- if pages.Data == nil {
- pages.Data = pages.f.wfs.bufPool.Get().([]byte)
- atomic.AddInt32(&counter, 1)
- glog.V(3).Infof("%s/%s acquire resource %d", pages.f.dir.Path, pages.f.Name, counter)
+ return pages.flushAndSave(offset, data)
}
- if offset < pages.Offset || offset >= pages.Offset+int64(len(pages.Data)) ||
- pages.Offset+int64(len(pages.Data)) < offset+int64(len(data)) {
- // if the data is out of range,
- // or buffer is full if adding new data,
- // flush current buffer and add new data
+ pages.intervals.AddInterval(data, offset)
- // println("offset", offset, "size", len(data), "existing offset", pages.Offset, "size", pages.Size)
+ var chunk *filer_pb.FileChunk
+ var hasSavedData bool
- if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s add save [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
- chunks = append(chunks, chunk)
- }
- } else {
- glog.V(0).Infof("%s/%s add save [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
- return
+ if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit {
+ chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage()
+ if hasSavedData {
+ chunks = append(chunks, chunk)
}
- pages.Offset = offset
- copy(pages.Data, data)
- pages.Size = int64(len(data))
- return
}
- if offset != pages.Offset+pages.Size {
- // when this happens, debug shows the data overlapping with existing data is empty
- // the data is not just append
- if offset == pages.Offset && int(pages.Size) < len(data) {
- // glog.V(2).Infof("pages[%d,%d) pages.Data len=%v, data len=%d, pages.Size=%d", pages.Offset, pages.Offset+pages.Size, len(pages.Data), len(data), pages.Size)
- copy(pages.Data[pages.Size:], data[pages.Size:])
- } else {
- if pages.Size != 0 {
- glog.V(1).Infof("%s/%s add page: pages [%d, %d) write [%d, %d)", pages.f.dir.Path, pages.f.Name, pages.Offset, pages.Offset+pages.Size, offset, offset+int64(len(data)))
- }
- return pages.flushAndSave(ctx, offset, data)
- }
- } else {
- copy(pages.Data[offset-pages.Offset:], data)
- }
-
- pages.Size = max(pages.Size, offset+int64(len(data))-pages.Offset)
-
return
}
-func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
var chunk *filer_pb.FileChunk
+ var newChunks []*filer_pb.FileChunk
// flush existing
- if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush existing [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
- chunks = append(chunks, chunk)
+ if newChunks, err = pages.saveExistingPagesToStorage(); err == nil {
+ if newChunks != nil {
+ chunks = append(chunks, newChunks...)
}
} else {
- glog.V(0).Infof("%s/%s failed to flush1 [%d,%d): %v", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
return
}
- pages.Size = 0
- pages.Offset = 0
// flush the new page
- if chunk, err = pages.saveToStorage(ctx, data, offset); err == nil {
+ if chunk, err = pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))); err == nil {
if chunk != nil {
glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
chunks = append(chunks, chunk)
@@ -134,40 +89,62 @@ func (pages *ContinuousDirtyPages) flushAndSave(ctx context.Context, offset int6
return
}
-func (pages *ContinuousDirtyPages) FlushToStorage(ctx context.Context) (chunk *filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) {
pages.lock.Lock()
defer pages.lock.Unlock()
- if pages.Size == 0 {
- return nil, nil
- }
+ return pages.saveExistingPagesToStorage()
+}
- if chunk, err = pages.saveExistingPagesToStorage(ctx); err == nil {
- pages.Size = 0
- pages.Offset = 0
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush [%d,%d)", pages.f.dir.Path, pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size))
+func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) {
+
+ var hasSavedData bool
+ var chunk *filer_pb.FileChunk
+
+ for {
+
+ chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage()
+ if !hasSavedData {
+ return chunks, err
+ }
+
+ if err == nil {
+ chunks = append(chunks, chunk)
+ } else {
+ return
}
}
- return
+
}
-func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Context) (*filer_pb.FileChunk, error) {
+func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *filer_pb.FileChunk, hasSavedData bool, err error) {
- if pages.Size == 0 {
- return nil, nil
+ maxList := pages.intervals.RemoveLargestIntervalLinkedList()
+ if maxList == nil {
+ return nil, false, nil
}
- return pages.saveToStorage(ctx, pages.Data[:pages.Size], pages.Offset)
+ chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
+ if err == nil {
+ hasSavedData = true
+ glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
+ } else {
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
+ return
+ }
+
+ return
}
-func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
+func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
var fileId, host string
var auth security.EncodedJwt
- if err := pages.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ dir, _ := pages.f.fullpath().DirAndName()
+
+ if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -175,15 +152,21 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
Collection: pages.f.wfs.option.Collection,
TtlSec: pages.f.wfs.option.TtlSec,
DataCenter: pages.f.wfs.option.DataCenter,
+ ParentPath: dir,
}
- resp, err := client.AssignVolume(ctx, request)
+ resp, err := client.AssignVolume(context.Background(), request)
if err != nil {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ host = pages.f.wfs.AdjustedUrl(host)
+ pages.collection, pages.replication = resp.Collection, resp.Replication
return nil
}); err != nil {
@@ -191,8 +174,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- bufReader := bytes.NewReader(buf)
- uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, auth)
+ uploadResult, err := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
return nil, fmt.Errorf("upload data: %v", err)
@@ -203,11 +185,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
}
return &filer_pb.FileChunk{
- FileId: fileId,
- Offset: offset,
- Size: uint64(len(buf)),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
+ FileId: fileId,
+ Offset: offset,
+ Size: uint64(size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
}, nil
}
@@ -218,3 +202,18 @@ func max(x, y int64) int64 {
}
return y
}
+func min(x, y int64) int64 {
+ if x < y {
+ return x
+ }
+ return y
+}
+
+func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) {
+
+ pages.lock.Lock()
+ defer pages.lock.Unlock()
+
+ return pages.intervals.ReadData(data, startOffset)
+
+}
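
The rewritten dirty_page.go no longer keeps a single contiguous buffer; writes accumulate as intervals, and whenever the buffered total exceeds ChunkSizeLimit the largest contiguous run is evicted to a volume server. saveExistingPagesToStorage simply loops that eviction until nothing remains. A simplified sketch of the drain loop, with a plain slice and a store callback standing in for ContinuousIntervals and the real saveToStorage:

package main

import "fmt"

// interval is a contiguous dirty byte range, standing in for
// IntervalLinkedList (illustrative only, not the real type).
type interval struct {
	offset int64
	data   []byte
}

// drainAll repeatedly removes the largest buffered interval and hands it
// to store, mirroring how saveExistingPagesToStorage loops over
// saveExistingLargestPageToStorage until nothing is buffered.
func drainAll(buffered []interval, store func(interval) error) ([]interval, error) {
	for len(buffered) > 0 {
		// pick the largest run, as RemoveLargestIntervalLinkedList does
		maxIdx := 0
		for i, iv := range buffered {
			if len(iv.data) > len(buffered[maxIdx].data) {
				maxIdx = i
			}
		}
		iv := buffered[maxIdx]
		buffered = append(buffered[:maxIdx], buffered[maxIdx+1:]...)
		if err := store(iv); err != nil {
			return buffered, err // stop at the first failed upload
		}
	}
	return buffered, nil
}

func main() {
	buf := []interval{{offset: 0, data: []byte("abc")}, {offset: 10, data: []byte("defgh")}}
	buf, _ = drainAll(buf, func(iv interval) error {
		fmt.Printf("flush [%d,%d)\n", iv.offset, iv.offset+int64(len(iv.data)))
		return nil
	})
	fmt.Println("remaining:", len(buf)) // remaining: 0
}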
diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go
new file mode 100644
index 000000000..ec94c6df1
--- /dev/null
+++ b/weed/filesys/dirty_page_interval.go
@@ -0,0 +1,220 @@
+package filesys
+
+import (
+ "bytes"
+ "io"
+ "math"
+)
+
+type IntervalNode struct {
+ Data []byte
+ Offset int64
+ Size int64
+ Next *IntervalNode
+}
+
+type IntervalLinkedList struct {
+ Head *IntervalNode
+ Tail *IntervalNode
+}
+
+type ContinuousIntervals struct {
+ lists []*IntervalLinkedList
+}
+
+func (list *IntervalLinkedList) Offset() int64 {
+ return list.Head.Offset
+}
+func (list *IntervalLinkedList) Size() int64 {
+ return list.Tail.Offset + list.Tail.Size - list.Head.Offset
+}
+func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) {
+ // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
+ list.Tail.Next = node
+ list.Tail = node
+}
+func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) {
+ // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
+ node.Next = list.Head
+ list.Head = node
+}
+
+func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) {
+ t := list.Head
+ for {
+
+ nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
+ if nodeStart < nodeStop {
+ // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop)
+ copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset])
+ }
+
+ if t.Next == nil {
+ break
+ }
+ t = t.Next
+ }
+}
+
+func (c *ContinuousIntervals) TotalSize() (total int64) {
+ for _, list := range c.lists {
+ total += list.Size()
+ }
+ return
+}
+
+func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
+ var nodes []*IntervalNode
+ for t := list.Head; t != nil; t = t.Next {
+ nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
+ if nodeStart >= nodeStop {
+ // skip non overlapping IntervalNode
+ continue
+ }
+ nodes = append(nodes, &IntervalNode{
+ Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset],
+ Offset: nodeStart,
+ Size: nodeStop - nodeStart,
+ Next: nil,
+ })
+ }
+ for i := 1; i < len(nodes); i++ {
+ nodes[i-1].Next = nodes[i]
+ }
+ return &IntervalLinkedList{
+ Head: nodes[0],
+ Tail: nodes[len(nodes)-1],
+ }
+}
+
+func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
+
+ interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))}
+
+ var newLists []*IntervalLinkedList
+ for _, list := range c.lists {
+ // if list is to the left of new interval, add to the new list
+ if list.Tail.Offset+list.Tail.Size <= interval.Offset {
+ newLists = append(newLists, list)
+ }
+ // if list is to the right of new interval, add to the new list
+ if interval.Offset+interval.Size <= list.Head.Offset {
+ newLists = append(newLists, list)
+ }
+ // if new interval overwrites the right part of the list
+ if list.Head.Offset < interval.Offset && interval.Offset < list.Tail.Offset+list.Tail.Size {
+ // create a new list of the left part of existing list
+ newLists = append(newLists, subList(list, list.Offset(), interval.Offset))
+ }
+ // if new interval overwrites the left part of the list
+ if list.Head.Offset < interval.Offset+interval.Size && interval.Offset+interval.Size < list.Tail.Offset+list.Tail.Size {
+ // create a new list of the right part of existing list
+ newLists = append(newLists, subList(list, interval.Offset+interval.Size, list.Tail.Offset+list.Tail.Size))
+ }
+ // skip anything that is fully overwritten by the new interval
+ }
+
+ c.lists = newLists
+ // add the new interval to the lists, connecting neighbor lists
+ var prevList, nextList *IntervalLinkedList
+
+ for _, list := range c.lists {
+ if list.Head.Offset == interval.Offset+interval.Size {
+ nextList = list
+ break
+ }
+ }
+
+ for _, list := range c.lists {
+ if list.Head.Offset+list.Size() == offset {
+ list.addNodeToTail(interval)
+ prevList = list
+ break
+ }
+ }
+
+ if prevList != nil && nextList != nil {
+ // glog.V(4).Infof("connecting [%d,%d) + [%d,%d) => [%d,%d)", prevList.Head.Offset, prevList.Tail.Offset+prevList.Tail.Size, nextList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size, prevList.Head.Offset, nextList.Tail.Offset+nextList.Tail.Size)
+ prevList.Tail.Next = nextList.Head
+ prevList.Tail = nextList.Tail
+ c.removeList(nextList)
+ } else if nextList != nil {
+ // the interval was not attached during the scan above; add it to the head now
+ nextList.addNodeToHead(interval)
+ }
+ if prevList == nil && nextList == nil {
+ c.lists = append(c.lists, &IntervalLinkedList{
+ Head: interval,
+ Tail: interval,
+ })
+ }
+
+ return
+}
+
+func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList {
+ var maxSize int64
+ maxIndex := -1
+ for k, list := range c.lists {
+ if maxSize <= list.Size() {
+ maxSize = list.Size()
+ maxIndex = k
+ }
+ }
+ if maxSize <= 0 {
+ return nil
+ }
+
+ t := c.lists[maxIndex]
+ c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...)
+ return t
+
+}
+
+func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) {
+ index := -1
+ for k, list := range c.lists {
+ if list.Offset() == target.Offset() {
+ index = k
+ }
+ }
+ if index < 0 {
+ return
+ }
+
+ c.lists = append(c.lists[0:index], c.lists[index+1:]...)
+
+}
+
+func (c *ContinuousIntervals) ReadData(data []byte, startOffset int64) (offset int64, size int) {
+ var minOffset int64 = math.MaxInt64
+ var maxStop int64
+ for _, list := range c.lists {
+ start := max(startOffset, list.Offset())
+ stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
+ if start <= stop {
+ list.ReadData(data[start-startOffset:], start, stop)
+ minOffset = min(minOffset, start)
+ maxStop = max(maxStop, stop)
+ }
+ }
+
+ if minOffset == math.MaxInt64 {
+ return 0, 0
+ }
+
+ offset = minOffset
+ size = int(maxStop - offset)
+ return
+}
+
+func (l *IntervalLinkedList) ToReader() io.Reader {
+ var readers []io.Reader
+ t := l.Head
+ readers = append(readers, bytes.NewReader(t.Data))
+ for t.Next != nil {
+ t = t.Next
+ readers = append(readers, bytes.NewReader(t.Data))
+ }
+ return io.MultiReader(readers...)
+}
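
Since dirty_page_interval.go lands whole in this diff, its API can be exercised directly. A short usage sketch (it would have to live in the same filesys package, and it imports "fmt") showing how an overlapping write splits an existing run and how ReadData reassembles the merged view; the expected bytes match TestContinuousIntervals_AddIntervalInnerOverwrite in the test file below:

func exampleInnerOverwrite() {
	c := &ContinuousIntervals{}

	// fill [0,5) with 25s
	c.AddInterval([]byte{25, 25, 25, 25, 25}, 0)
	// overwrite the middle [2,4) with 23s; AddInterval keeps the
	// untouched [0,2) and [4,5) slices and splices the new data between
	c.AddInterval([]byte{23, 23}, 2)

	buf := make([]byte, 5)
	offset, size := c.ReadData(buf, 0)
	fmt.Println(offset, size, buf) // 0 5 [25 25 23 23 25]
}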
diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go
new file mode 100644
index 000000000..184be2f3b
--- /dev/null
+++ b/weed/filesys/dirty_page_interval_test.go
@@ -0,0 +1,72 @@
+package filesys
+
+import (
+ "bytes"
+ "testing"
+)
+
+func TestContinuousIntervals_AddIntervalAppend(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
+ // 25, 25, 25
+ c.AddInterval(getBytes(25, 3), 0)
+ // _, _, 23, 23, 23, 23
+ c.AddInterval(getBytes(23, 4), 2)
+
+ expectedData(t, c, 0, 25, 25, 23, 23, 23, 23)
+
+}
+
+func TestContinuousIntervals_AddIntervalInnerOverwrite(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
+ // 25, 25, 25, 25, 25
+ c.AddInterval(getBytes(25, 5), 0)
+ // _, _, 23, 23
+ c.AddInterval(getBytes(23, 2), 2)
+
+ expectedData(t, c, 0, 25, 25, 23, 23, 25)
+
+}
+
+func TestContinuousIntervals_AddIntervalFullOverwrite(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
+ // 25,
+ c.AddInterval(getBytes(25, 1), 0)
+ // _, _, _, _, 23, 23
+ c.AddInterval(getBytes(23, 2), 4)
+ // _, _, _, 24, 24, 24, 24
+ c.AddInterval(getBytes(24, 4), 3)
+
+ // _, 22, 22
+ c.AddInterval(getBytes(22, 2), 1)
+
+ expectedData(t, c, 0, 25, 22, 22, 24, 24, 24, 24)
+
+}
+
+func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) {
+ start, stop := int64(offset), int64(offset+len(data))
+ for _, list := range c.lists {
+ nodeStart, nodeStop := max(start, list.Head.Offset), min(stop, list.Head.Offset+list.Size())
+ if nodeStart < nodeStop {
+ buf := make([]byte, nodeStop-nodeStart)
+ list.ReadData(buf, nodeStart, nodeStop)
+ if bytes.Compare(buf, data[nodeStart-start:nodeStop-start]) != 0 {
+ t.Errorf("expected %v actual %v", data[nodeStart-start:nodeStop-start], buf)
+ }
+ }
+ }
+}
+
+func getBytes(content byte, length int) []byte {
+ data := make([]byte, length)
+ for i := 0; i < length; i++ {
+ data[i] = content
+ }
+ return data
+}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 1b359ebbe..69d440a73 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -3,7 +3,6 @@ package filesys
import (
"context"
"os"
- "path/filepath"
"sort"
"time"
@@ -20,6 +19,11 @@ var _ = fs.Node(&File{})
var _ = fs.NodeOpener(&File{})
var _ = fs.NodeFsyncer(&File{})
var _ = fs.NodeSetattrer(&File{})
+var _ = fs.NodeGetxattrer(&File{})
+var _ = fs.NodeSetxattrer(&File{})
+var _ = fs.NodeRemovexattrer(&File{})
+var _ = fs.NodeListxattrer(&File{})
+var _ = fs.NodeForgetter(&File{})
type File struct {
Name string
@@ -27,21 +31,32 @@ type File struct {
wfs *WFS
entry *filer_pb.Entry
entryViewCache []filer2.VisibleInterval
- isOpen bool
+ isOpen int
}
-func (file *File) fullpath() string {
- return filepath.Join(file.dir.Path, file.Name)
+func (file *File) fullpath() filer2.FullPath {
+ return filer2.NewFullPath(file.dir.Path, file.Name)
}
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
- if err := file.maybeLoadAttributes(ctx); err != nil {
- return err
+ glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
+
+ if file.isOpen <= 0 {
+ if err := file.maybeLoadEntry(ctx); err != nil {
+ return err
+ }
}
+ attr.Inode = file.fullpath().AsInode()
+ attr.Valid = time.Second
attr.Mode = os.FileMode(file.entry.Attributes.FileMode)
attr.Size = filer2.TotalSize(file.entry.Chunks)
+ if file.isOpen > 0 {
+ attr.Size = file.entry.Attributes.FileSize
+ glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
+ }
+ attr.Crtime = time.Unix(file.entry.Attributes.Crtime, 0)
attr.Mtime = time.Unix(file.entry.Attributes.Mtime, 0)
attr.Gid = file.entry.Attributes.Gid
attr.Uid = file.entry.Attributes.Uid
@@ -52,11 +67,22 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
}
+func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
+
+ // glog.V(4).Infof("file Getxattr %s", file.fullpath())
+
+ if err := file.maybeLoadEntry(ctx); err != nil {
+ return err
+ }
+
+ return getxattr(file.entry, req, resp)
+}
+
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
- glog.V(3).Infof("%v file open %+v", file.fullpath(), req)
+ glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
- file.isOpen = true
+ file.isOpen++
handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
@@ -70,17 +96,28 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- if err := file.maybeLoadAttributes(ctx); err != nil {
+ glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes)
+
+ if err := file.maybeLoadEntry(ctx); err != nil {
return err
}
- glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes)
if req.Valid.Size() {
glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size)
- if req.Size == 0 {
+ if req.Size < filer2.TotalSize(file.entry.Chunks) {
// fmt.Printf("truncate %v \n", fullPath)
- file.entry.Chunks = nil
+ var chunks []*filer_pb.FileChunk
+ for _, chunk := range file.entry.Chunks {
+ int64Size := int64(chunk.Size)
+ if chunk.Offset+int64Size > int64(req.Size) {
+ int64Size = int64(req.Size) - chunk.Offset
+ }
+ if int64Size > 0 {
+ chunks = append(chunks, chunk)
+ }
+ }
+ file.entry.Chunks = chunks
file.entryViewCache = nil
}
file.entry.Attributes.FileSize = req.Size
@@ -105,79 +142,94 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.entry.Attributes.Mtime = req.Mtime.Unix()
}
- if file.isOpen {
+ if file.isOpen > 0 {
return nil
}
- return file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ file.wfs.cacheDelete(file.fullpath())
- request := &filer_pb.UpdateEntryRequest{
- Directory: file.dir.Path,
- Entry: file.entry,
- }
+ return file.saveEntry()
- glog.V(1).Infof("set attr file entry: %v", request)
- _, err := client.UpdateEntry(ctx, request)
- if err != nil {
- glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err)
- return fuse.EIO
- }
+}
- return nil
- })
+func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
-}
+ glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name)
-func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
- // fsync works at OS level
- // write the file chunks to the filerGrpcAddress
- glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
+ if err := file.maybeLoadEntry(ctx); err != nil {
+ return err
+ }
+
+ if err := setxattr(file.entry, req); err != nil {
+ return err
+ }
+
+ file.wfs.cacheDelete(file.fullpath())
+
+ return file.saveEntry()
- return nil
}
-func (file *File) maybeLoadAttributes(ctx context.Context) error {
- if file.entry == nil || !file.isOpen {
- item := file.wfs.listDirectoryEntriesCache.Get(file.fullpath())
- if item != nil && !item.Expired() {
- entry := item.Value().(*filer_pb.Entry)
- file.setEntry(entry)
- // glog.V(1).Infof("file attr read cached %v attributes", file.Name)
- } else {
- err := file.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) error {
- request := &filer_pb.LookupDirectoryEntryRequest{
- Name: file.Name,
- Directory: file.dir.Path,
- }
+ glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name)
- resp, err := client.LookupDirectoryEntry(ctx, request)
- if err != nil {
- glog.V(3).Infof("file attr read file %v: %v", request, err)
- return fuse.ENOENT
- }
+ if err := file.maybeLoadEntry(ctx); err != nil {
+ return err
+ }
- file.setEntry(resp.Entry)
+ if err := removexattr(file.entry, req); err != nil {
+ return err
+ }
+
+ file.wfs.cacheDelete(file.fullpath())
- glog.V(3).Infof("file attr %v %+v: %d", file.fullpath(), file.entry.Attributes, filer2.TotalSize(file.entry.Chunks))
+ return file.saveEntry()
- // file.wfs.listDirectoryEntriesCache.Set(file.fullpath(), file.entry, file.wfs.option.EntryCacheTtl)
+}
- return nil
- })
+func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
- if err != nil {
- return err
- }
- }
+ glog.V(4).Infof("file Listxattr %s", file.fullpath())
+
+ if err := file.maybeLoadEntry(ctx); err != nil {
+ return err
+ }
+
+ if err := listxattr(file.entry, req, resp); err != nil {
+ return err
}
+
return nil
+
}
-func (file *File) addChunk(chunk *filer_pb.FileChunk) {
- if chunk != nil {
- file.addChunks([]*filer_pb.FileChunk{chunk})
+func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
+ // fsync works at OS level
+ // write the file chunks to the filerGrpcAddress
+ glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req)
+
+ return nil
+}
+
+func (file *File) Forget() {
+ glog.V(3).Infof("Forget file %s/%s", file.dir.Path, file.Name)
+
+ file.wfs.forgetNode(filer2.NewFullPath(file.dir.Path, file.Name))
+
+}
+
+func (file *File) maybeLoadEntry(ctx context.Context) error {
+ if file.entry == nil || file.isOpen <= 0 {
+ entry, err := file.wfs.maybeLoadEntry(file.dir.Path, file.Name)
+ if err != nil {
+ return err
+ }
+ if entry != nil {
+ file.setEntry(entry)
+ }
}
+ return nil
}
func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
@@ -203,3 +255,22 @@ func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
}
+
+func (file *File) saveEntry() error {
+ return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.UpdateEntryRequest{
+ Directory: file.dir.Path,
+ Entry: file.entry,
+ }
+
+ glog.V(1).Infof("save file entry: %v", request)
+ _, err := client.UpdateEntry(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.Path, file.Name, err)
+ return fuse.EIO
+ }
+
+ return nil
+ })
+}
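
The Setattr change in file.go now truncates by filtering chunks instead of discarding them all: any chunk whose bytes fall entirely at or beyond the new size is dropped, while a chunk straddling the boundary is kept whole and the shorter FileSize attribute masks its tail. A minimal sketch of that filter, with a hypothetical chunk struct in place of filer_pb.FileChunk:

// chunk stands in for filer_pb.FileChunk (offset and size only).
type chunk struct {
	Offset int64
	Size   uint64
}

// truncateChunks keeps every chunk that still contributes bytes below
// newSize, mirroring the Setattr loop above. Chunks straddling the
// boundary stay whole; the entry's FileSize hides their tail.
func truncateChunks(chunks []chunk, newSize uint64) []chunk {
	var kept []chunk
	for _, c := range chunks {
		int64Size := int64(c.Size)
		if c.Offset+int64Size > int64(newSize) {
			int64Size = int64(newSize) - c.Offset
		}
		if int64Size > 0 {
			kept = append(kept, c)
		}
	}
	return kept
}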
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 1f4754dd1..100c9eba0 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -7,10 +7,11 @@ import (
"path"
"time"
+ "github.com/gabriel-vasile/mimetype"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/gabriel-vasile/mimetype"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
@@ -50,29 +51,51 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
+ buff := make([]byte, req.Size)
+
+ totalRead, err := fh.readFromChunks(buff, req.Offset)
+ if err == nil {
+ dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset)
+ if totalRead+req.Offset < dirtyOffset+int64(dirtySize) {
+ totalRead = dirtyOffset + int64(dirtySize) - req.Offset
+ }
+ }
+
+ resp.Data = buff[:totalRead]
+
+ if err != nil {
+ glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
+ return fuse.EIO
+ }
+
+ return err
+}
+
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) {
+ return fh.dirtyPages.ReadDirtyData(buff, startOffset)
+}
+
+func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
+
// this value should come from the filer instead of the old f
if len(fh.f.entry.Chunks) == 0 {
- glog.V(1).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name)
- return nil
+ glog.V(1).Infof("empty fh %v", fh.f.fullpath())
+ return 0, nil
}
- buff := make([]byte, req.Size)
-
if fh.f.entryViewCache == nil {
fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
}
- chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size)
-
- totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, req.Offset)
+ chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, offset, len(buff))
- resp.Data = buff[:totalRead]
+ totalRead, err := filer2.ReadIntoBuffer(fh.f.wfs, fh.f.fullpath(), buff, chunkViews, offset)
if err != nil {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}
- return err
+ return totalRead, err
}
// Write to the file handle
@@ -80,30 +103,32 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
// write the request to volume servers
- glog.V(4).Infof("%+v/%v write fh %d: [%d,%d)", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)))
+ fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(req.Data)), int64(fh.f.entry.Attributes.FileSize)))
+ // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
- chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data)
+ chunks, err := fh.dirtyPages.AddPage(req.Offset, req.Data)
if err != nil {
- glog.Errorf("%+v/%v write fh %d: [%d,%d): %v", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err)
- return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err)
+ glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err)
+ return fuse.EIO
}
resp.Size = len(req.Data)
if req.Offset == 0 {
// detect mime type
- var possibleExt string
- fh.contentType, possibleExt = mimetype.Detect(req.Data)
- if ext := path.Ext(fh.f.Name); ext != possibleExt {
+ detectedMIME := mimetype.Detect(req.Data)
+ fh.contentType = detectedMIME.String()
+ if ext := path.Ext(fh.f.Name); ext != detectedMIME.Extension() {
fh.contentType = mime.TypeByExtension(ext)
}
fh.dirtyMetadata = true
}
- fh.f.addChunks(chunks)
-
if len(chunks) > 0 {
+
+ fh.f.addChunks(chunks)
+
fh.dirtyMetadata = true
}
@@ -114,11 +139,12 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle)
- fh.dirtyPages.releaseResource()
-
- fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+ fh.f.isOpen--
- fh.f.isOpen = false
+ if fh.f.isOpen <= 0 {
+ fh.dirtyPages.releaseResource()
+ fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+ }
return nil
}
@@ -128,19 +154,22 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
// send the data to the OS
glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req)
- chunk, err := fh.dirtyPages.FlushToStorage(ctx)
+ chunks, err := fh.dirtyPages.FlushToStorage()
if err != nil {
- glog.Errorf("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err)
- return fmt.Errorf("flush %s/%s: %v", fh.f.dir.Path, fh.f.Name, err)
+ glog.Errorf("flush %s: %v", fh.f.fullpath(), err)
+ return fuse.EIO
}
- fh.f.addChunk(chunk)
+ fh.f.addChunks(chunks)
+ if len(chunks) > 0 {
+ fh.dirtyMetadata = true
+ }
if !fh.dirtyMetadata {
return nil
}
- return fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if fh.f.entry.Attributes != nil {
fh.f.entry.Attributes.Mime = fh.contentType
@@ -149,6 +178,8 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
fh.f.entry.Attributes.Mtime = time.Now().Unix()
fh.f.entry.Attributes.Crtime = time.Now().Unix()
fh.f.entry.Attributes.FileMode = uint32(0777 &^ fh.f.wfs.option.Umask)
+ fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
+ fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
@@ -156,25 +187,36 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
Entry: fh.f.entry,
}
- glog.V(3).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks))
+ glog.V(3).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
for i, chunk := range fh.f.entry.Chunks {
- glog.V(3).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
fh.f.entry.Chunks = chunks
// fh.f.entryViewCache = nil
- if _, err := client.CreateEntry(ctx, request); err != nil {
- glog.Errorf("update fh: %v", err)
- return fmt.Errorf("update fh: %v", err)
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
+ return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
}
- fh.f.wfs.deleteFileChunks(ctx, garbages)
+ fh.f.wfs.deleteFileChunks(garbages)
for i, chunk := range garbages {
- glog.V(3).Infof("garbage %s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
return nil
})
+
+ if err == nil {
+ fh.dirtyMetadata = false
+ }
+
+ if err != nil {
+ glog.Errorf("%v fh %d flush: %v", fh.f.fullpath(), fh.handle, err)
+ return fuse.EIO
+ }
+
+ return nil
}
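
The reworked FileHandle.Read composes two sources: readFromChunks fills the buffer from persisted chunks, then readFromDirtyPages overlays any still-unflushed writes, and the returned byte count is extended when dirty data ends past what the chunks supplied. A sketch of just that merge arithmetic, with both readers stubbed as callbacks (names are stand-ins, not the real methods):

// readMerged mirrors FileHandle.Read's composition above. fromChunks and
// fromDirty stand in for readFromChunks and readFromDirtyPages.
func readMerged(
	buff []byte, reqOffset int64,
	fromChunks func([]byte, int64) (int64, error),
	fromDirty func([]byte, int64) (dirtyOffset int64, dirtySize int),
) (int64, error) {
	totalRead, err := fromChunks(buff, reqOffset)
	if err != nil {
		return 0, err
	}
	// overlay unflushed writes; extend the count if they reach further
	dirtyOffset, dirtySize := fromDirty(buff, reqOffset)
	if totalRead+reqOffset < dirtyOffset+int64(dirtySize) {
		totalRead = dirtyOffset + int64(dirtySize) - reqOffset
	}
	return totalRead, nil
}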
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 1bd9b5cc9..77438b58e 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -5,16 +5,19 @@ import (
"fmt"
"math"
"os"
+ "strings"
"sync"
"time"
+ "github.com/karlseguin/ccache"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/karlseguin/ccache"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
- "google.golang.org/grpc"
)
type Option struct {
@@ -26,7 +29,7 @@ type Option struct {
TtlSec int32
ChunkSizeLimit int64
DataCenter string
- DirListingLimit int
+ DirListCacheLimit int64
EntryCacheTtl time.Duration
Umask os.FileMode
@@ -35,6 +38,10 @@ type Option struct {
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
+
+ OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
+ Cipher bool // whether encrypt data on volume server
+
}
var _ = fs.FS(&WFS{})
@@ -44,13 +51,19 @@ type WFS struct {
option *Option
listDirectoryEntriesCache *ccache.Cache
- // contains all open handles
+ // contains all open handles, protected by handlesLock
+ handlesLock sync.Mutex
handles []*FileHandle
- pathToHandleIndex map[string]int
- pathToHandleLock sync.Mutex
- bufPool sync.Pool
+ pathToHandleIndex map[filer2.FullPath]int
+
+ bufPool sync.Pool
stats statsCache
+
+ // nodes, protected by nodesLock
+ nodesLock sync.Mutex
+ nodes map[uint64]fs.Node
+ root fs.Node
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -60,36 +73,46 @@ type statsCache struct {
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
option: option,
- listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024 * 8).ItemsToPrune(100)),
- pathToHandleIndex: make(map[string]int),
+ listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
+ pathToHandleIndex: make(map[filer2.FullPath]int),
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
},
},
+ nodes: make(map[uint64]fs.Node),
}
+ wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}
+
return wfs
}
func (wfs *WFS) Root() (fs.Node, error) {
- return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
+ return wfs.root, nil
}
-func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
+	return err
}
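A sketch of a typical caller, assuming the surrounding filesys package: the closure receives a ready SeaweedFilerClient, and its error is returned from WithFilerClient unchanged. Directory and name are illustrative:

var entry *filer_pb.Entry
err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
	resp, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
		Directory: "/",
		Name:      "example.txt",
	})
	if err != nil {
		return err
	}
	entry = resp.Entry
	return nil
})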
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
- wfs.pathToHandleLock.Lock()
- defer wfs.pathToHandleLock.Unlock()
fullpath := file.fullpath()
+ glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
+
+ wfs.handlesLock.Lock()
+ defer wfs.handlesLock.Unlock()
index, found := wfs.pathToHandleIndex[fullpath]
if found && wfs.handles[index] != nil {
@@ -103,24 +126,24 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
wfs.handles[i] = fileHandle
fileHandle.handle = uint64(i)
wfs.pathToHandleIndex[fullpath] = i
- glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle)
+ glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle)
return
}
}
wfs.handles = append(wfs.handles, fileHandle)
fileHandle.handle = uint64(len(wfs.handles) - 1)
- glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle)
wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
+ glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
return
}
-func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) {
- wfs.pathToHandleLock.Lock()
- defer wfs.pathToHandleLock.Unlock()
+func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) {
+ wfs.handlesLock.Lock()
+ defer wfs.handlesLock.Unlock()
- glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
+ glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
delete(wfs.pathToHandleIndex, fullpath)
if int(handleId) < len(wfs.handles) {
wfs.handles[int(handleId)] = nil
@@ -136,7 +159,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
if wfs.stats.lastChecked < time.Now().Unix()-20 {
- err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
Collection: wfs.option.Collection,
@@ -145,7 +168,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
}
glog.V(4).Infof("reading filer stats: %+v", request)
- resp, err := client.Statistics(ctx, request)
+ resp, err := client.Statistics(context.Background(), request)
if err != nil {
glog.V(0).Infof("reading filer stats %v: %v", request, err)
return err
@@ -190,3 +213,56 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
return nil
}
+
+func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry {
+ item := wfs.listDirectoryEntriesCache.Get(string(path))
+ if item != nil && !item.Expired() {
+ return item.Value().(*filer_pb.Entry)
+ }
+ return nil
+}
+func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
+ if entry == nil {
+ wfs.listDirectoryEntriesCache.Delete(string(path))
+ } else {
+ wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
+ }
+}
+func (wfs *WFS) cacheDelete(path filer2.FullPath) {
+ wfs.listDirectoryEntriesCache.Delete(string(path))
+}
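A small sketch of the intended call pattern, assuming the surrounding filesys package; note that cacheSet treats a nil entry as a delete, so callers can write back lookup results unconditionally:

wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl) // entry == nil evicts instead
if cached := wfs.cacheGet(fullpath); cached != nil {
	// served from cache until EntryCacheTtl expires
}
wfs.cacheDelete(fullpath) // explicit invalidation, e.g. after rename or remove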
+
+func (wfs *WFS) getNode(fullpath filer2.FullPath, fn func() fs.Node) fs.Node {
+ wfs.nodesLock.Lock()
+ defer wfs.nodesLock.Unlock()
+
+ node, found := wfs.nodes[fullpath.AsInode()]
+ if found {
+ return node
+ }
+ node = fn()
+ if node != nil {
+ wfs.nodes[fullpath.AsInode()] = node
+ }
+ return node
+}
+
+func (wfs *WFS) forgetNode(fullpath filer2.FullPath) {
+ wfs.nodesLock.Lock()
+ defer wfs.nodesLock.Unlock()
+
+ delete(wfs.nodes, fullpath.AsInode())
+}
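getNode memoizes fs.Node instances keyed by the path's inode, so the kernel sees a stable node identity across repeated lookups; forgetNode is the matching eviction. A sketch of a lookup using it, with an illustrative File literal:

fullpath := filer2.NewFullPath(dir.Path, "child.txt") // illustrative name
node := dir.wfs.getNode(fullpath, func() fs.Node {
	// constructed only on the first lookup; later lookups reuse this node
	return &File{Name: "child.txt", dir: dir, wfs: dir.wfs}
})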
+
+func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
+	if !wfs.option.OutsideContainerClusterMode {
+		return hostAndPort
+	}
+	colonIndex := strings.Index(hostAndPort, ":")
+	if colonIndex < 0 {
+		return hostAndPort
+	}
+	filerColonIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
+	if filerColonIndex < 0 {
+		return hostAndPort
+	}
+	// keep the volume server's port, but substitute the filer's host
+	return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerColonIndex], hostAndPort[colonIndex+1:])
+}
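When mounting from outside the SeaweedFS container cluster, volume servers advertise container-internal addresses the client cannot reach; AdjustedUrl keeps the advertised port but substitutes the filer's host. A worked example with illustrative addresses:

// with wfs.option.FilerGrpcAddress = "filer.example.com:18888"
// and OutsideContainerClusterMode = true:
wfs.AdjustedUrl("10.32.0.7:8080") // -> "filer.example.com:8080"
wfs.AdjustedUrl("10.32.0.7")      // -> "10.32.0.7" (no port, passed through)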
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index 6e586b7df..bf21b1808 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -3,14 +3,15 @@ package filesys
import (
"context"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "google.golang.org/grpc"
)
-func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChunk) {
+func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
if len(chunks) == 0 {
return
}
@@ -20,13 +21,13 @@ func (wfs *WFS) deleteFileChunks(ctx context.Context, chunks []*filer_pb.FileChu
fileIds = append(fileIds, chunk.GetFileIdString())
}
- wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- deleteFileIds(ctx, wfs.option.GrpcDialOption, client, fileIds)
+ wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds)
return nil
})
}
-func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
+func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
var vids []string
for _, fileId := range fileIds {
@@ -38,7 +39,7 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f
m := make(map[string]operation.LookupResult)
glog.V(4).Infof("remove file lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: vids,
})
if err != nil {
@@ -50,10 +51,13 @@ func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client f
VolumeId: vid,
Locations: nil,
}
- locations := resp.LocationsMap[vid]
+ locations, found := resp.LocationsMap[vid]
+ if !found {
+ continue
+ }
for _, loc := range locations.Locations {
lr.Locations = append(lr.Locations, operation.Location{
- Url: loc.Url,
+ Url: wfs.AdjustedUrl(loc.Url),
PublicUrl: loc.PublicUrl,
})
}
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
new file mode 100644
index 000000000..af154a7ee
--- /dev/null
+++ b/weed/filesys/xattr.go
@@ -0,0 +1,141 @@
+package filesys
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/fuse"
+)
+
+func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
+
+ if entry == nil {
+ return fuse.ErrNoXattr
+ }
+ if entry.Extended == nil {
+ return fuse.ErrNoXattr
+ }
+ data, found := entry.Extended[req.Name]
+ if !found {
+ return fuse.ErrNoXattr
+ }
+ if req.Position < uint32(len(data)) {
+ size := req.Size
+ if req.Position+size >= uint32(len(data)) {
+ size = uint32(len(data)) - req.Position
+ }
+ if size == 0 {
+ resp.Xattr = data[req.Position:]
+ } else {
+ resp.Xattr = data[req.Position : req.Position+size]
+ }
+ }
+
+ return nil
+
+}
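The Position/Size pair implements the positional xattr reads used on macOS: a Size of zero means "the rest of the value", and otherwise the window is clamped at the end of the data. A test-style sketch, assuming the surrounding filesys package:

func TestGetxattrPositional(t *testing.T) {
	entry := &filer_pb.Entry{Extended: map[string][]byte{"user.tag": []byte("0123456789")}}
	resp := &fuse.GetxattrResponse{}
	if err := getxattr(entry, &fuse.GetxattrRequest{Name: "user.tag", Position: 4, Size: 3}, resp); err != nil {
		t.Fatal(err)
	}
	if string(resp.Xattr) != "456" { // Size 0 would have returned "456789"
		t.Fatalf("got %q", resp.Xattr)
	}
}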
+
+func setxattr(entry *filer_pb.Entry, req *fuse.SetxattrRequest) error {
+
+ if entry == nil {
+ return fuse.EIO
+ }
+
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+	data := entry.Extended[req.Name]
+
+ newData := make([]byte, int(req.Position)+len(req.Xattr))
+
+ copy(newData, data)
+
+ copy(newData[int(req.Position):], req.Xattr)
+
+ entry.Extended[req.Name] = newData
+
+ return nil
+
+}
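setxattr rebuilds the value buffer at exactly Position+len(Xattr) bytes, so a positional write truncates anything past its end. A worked sketch with illustrative values:

entry := &filer_pb.Entry{Extended: map[string][]byte{"user.tag": []byte("0123456789")}}
_ = setxattr(entry, &fuse.SetxattrRequest{Name: "user.tag", Position: 4, Xattr: []byte("AB")})
// entry.Extended["user.tag"] is now "0123AB": the old bytes "6789"
// beyond the end of the write are dropped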
+
+func removexattr(entry *filer_pb.Entry, req *fuse.RemovexattrRequest) error {
+
+ if entry == nil {
+ return fuse.ErrNoXattr
+ }
+
+ if entry.Extended == nil {
+ return fuse.ErrNoXattr
+ }
+
+ _, found := entry.Extended[req.Name]
+
+ if !found {
+ return fuse.ErrNoXattr
+ }
+
+ delete(entry.Extended, req.Name)
+
+ return nil
+
+}
+
+func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.ListxattrResponse) error {
+
+ if entry == nil {
+ return fuse.EIO
+ }
+
+ for k := range entry.Extended {
+ resp.Append(k)
+ }
+
+	if req.Position >= uint32(len(resp.Xattr)) {
+		// guard against unsigned underflow when the requested offset
+		// is past the end of the attribute list
+		resp.Xattr = nil
+		return nil
+	}
+
+	size := req.Size
+	if req.Position+size >= uint32(len(resp.Xattr)) {
+		size = uint32(len(resp.Xattr)) - req.Position
+	}
+
+ if size == 0 {
+ resp.Xattr = resp.Xattr[req.Position:]
+ } else {
+ resp.Xattr = resp.Xattr[req.Position : req.Position+size]
+ }
+
+ return nil
+
+}
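resp.Append writes each attribute name NUL-terminated into resp.Xattr, the layout listxattr(2) expects; the Position/Size clamping above then windows into that buffer. A small sketch:

resp := &fuse.ListxattrResponse{}
resp.Append("user.a", "user.b")
// resp.Xattr is now []byte("user.a\x00user.b\x00")

Since entry.Extended is a Go map, the order in which names are emitted is unspecified from one call to the next.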
+
+func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
+
+ fullpath := filer2.NewFullPath(dir, name)
+ entry = wfs.cacheGet(fullpath)
+ if entry != nil {
+ return
+ }
+ // glog.V(3).Infof("read entry cache miss %s", fullpath)
+
+ err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.LookupDirectoryEntryRequest{
+ Name: name,
+ Directory: dir,
+ }
+
+ resp, err := filer_pb.LookupEntry(client, request)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ glog.V(3).Infof("file attr read not found file %v: %v", request, err)
+ return fuse.ENOENT
+ }
+ glog.V(3).Infof("attr read %v: %v", request, err)
+ return fuse.EIO
+ }
+
+ entry = resp.Entry
+ wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl)
+
+ return nil
+ })
+
+ return
+}
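A sketch of a caller, assuming the surrounding filesys package: the xattr handlers resolve the entry through the cache first and fall back to a single filer lookup on a miss, with lookup failures already translated into FUSE errnos. The path is illustrative:

entry, err := wfs.maybeLoadEntry("/some/dir", "file.txt")
if err != nil {
	return err // already a fuse errno (fuse.ENOENT or fuse.EIO)
}
if entry.Extended == nil {
	// entry exists but carries no extended attributes
}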