Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/dir.go                              173
-rw-r--r--  weed/filesys/dir_link.go                          93
-rw-r--r--  weed/filesys/dir_rename.go                        51
-rw-r--r--  weed/filesys/dirty_page.go                       210
-rw-r--r--  weed/filesys/dirty_page_interval.go               31
-rw-r--r--  weed/filesys/dirty_page_interval_test.go          24
-rw-r--r--  weed/filesys/file.go                             120
-rw-r--r--  weed/filesys/filehandle.go                       215
-rw-r--r--  weed/filesys/fscache.go                           13
-rw-r--r--  weed/filesys/fscache_test.go                      19
-rw-r--r--  weed/filesys/meta_cache/id_mapper.go             101
-rw-r--r--  weed/filesys/meta_cache/meta_cache.go             99
-rw-r--r--  weed/filesys/meta_cache/meta_cache_init.go        41
-rw-r--r--  weed/filesys/meta_cache/meta_cache_subscribe.go   39
-rw-r--r--  weed/filesys/unimplemented.go                     22
-rw-r--r--  weed/filesys/wfs.go                              130
-rw-r--r--  weed/filesys/wfs_deletion.go                      19
-rw-r--r--  weed/filesys/wfs_filer_client.go                  31
-rw-r--r--  weed/filesys/wfs_write.go                         71
-rw-r--r--  weed/filesys/xattr.go                             45
20 files changed, 1004 insertions, 543 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index e4260d56f..ae2ae3418 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -3,16 +3,19 @@ package filesys
import (
"bytes"
"context"
+ "math"
"os"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
)
type Dir struct {
@@ -25,6 +28,7 @@ type Dir struct {
var _ = fs.Node(&Dir{})
var _ = fs.NodeCreater(&Dir{})
var _ = fs.NodeMkdirer(&Dir{})
+var _ = fs.NodeFsyncer(&Dir{})
var _ = fs.NodeRequestLookuper(&Dir{})
var _ = fs.HandleReadDirAller(&Dir{})
var _ = fs.NodeRemover(&Dir{})
@@ -88,8 +92,16 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
attr.BlockSize = 1024 * 1024
}
+func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
+ // fsync works at OS level
+ // write the file chunks to the filerGrpcAddress
+ glog.V(3).Infof("dir %s fsync %+v", dir.FullPath(), req)
+
+ return nil
+}
+
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
- return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
+ f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
return &File{
Name: name,
dir: dir,
@@ -98,14 +110,17 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
entryViewCache: nil,
}
})
+ f.(*File).dir = dir // in case dir node was created later
+ return f
}
func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
- return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
+ d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
})
-
+ d.(*Dir).parent = dir // in case dir node was created later
+ return d
}
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
@@ -127,21 +142,25 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
TtlSec: dir.wfs.option.TtlSec,
},
},
- OExcl: req.Flags&fuse.OpenExclusive != 0,
+ OExcl: req.Flags&fuse.OpenExclusive != 0,
+ Signatures: []int32{dir.wfs.signature},
}
glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags)
if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
if err := filer_pb.CreateEntry(client, request); err != nil {
if strings.Contains(err.Error(), "EEXIST") {
return fuse.EEXIST
}
+ glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), req.Name, err)
return fuse.EIO
}
- if dir.wfs.option.AsyncMetaDataCaching {
- dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
- }
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
}); err != nil {
@@ -155,7 +174,6 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
node = dir.newFile(req.Name, request.Entry)
file := node.(*File)
- file.isOpen++
fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
return file, fh, nil
@@ -179,9 +197,13 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ dir.wfs.mapPbIdFromLocalToFiler(newEntry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(newEntry)
+
request := &filer_pb.CreateEntryRequest{
- Directory: dir.FullPath(),
- Entry: newEntry,
+ Directory: dir.FullPath(),
+ Entry: newEntry,
+ Signatures: []int32{dir.wfs.signature},
}
glog.V(1).Infof("mkdir: %v", request)
@@ -190,9 +212,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
return err
}
- if dir.wfs.option.AsyncMetaDataCaching {
- dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
- }
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
@@ -213,15 +233,17 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
- entry := dir.wfs.cacheGet(fullFilePath)
-
- if dir.wfs.option.AsyncMetaDataCaching {
- cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
- if cacheErr == filer_pb.ErrNotFound {
- return nil, fuse.ENOENT
- }
- entry = cachedEntry.ToProtoEntry()
+ dirPath := util.FullPath(dir.FullPath())
+ visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
+ if visitErr != nil {
+ glog.Errorf("dir Lookup %s: %v", dirPath, visitErr)
+ return nil, fuse.EIO
}
+ cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
+ }
+ entry := cachedEntry.ToProtoEntry()
if entry == nil {
// glog.V(3).Infof("dir Lookup cache miss %s", fullFilePath)
@@ -230,7 +252,6 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
glog.V(1).Infof("dir GetEntry %s: %v", fullFilePath, err)
return nil, fuse.ENOENT
}
- dir.wfs.cacheSet(fullFilePath, entry, 5*time.Minute)
} else {
glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
}
@@ -250,6 +271,9 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
resp.Attr.Mode = os.FileMode(entry.Attributes.FileMode)
resp.Attr.Gid = entry.Attributes.Gid
resp.Attr.Uid = entry.Attributes.Uid
+ if entry.HardLinkCounter > 0 {
+ resp.Attr.Nlink = uint32(entry.HardLinkCounter)
+ }
return node, nil
}
@@ -260,9 +284,8 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
- glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
+ glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath())
- cacheTtl := 5 * time.Minute
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
inode := fullpath.AsInode()
@@ -273,29 +296,23 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File}
ret = append(ret, dirent)
}
- dir.wfs.cacheSet(fullpath, entry, cacheTtl)
return nil
}
- if dir.wfs.option.AsyncMetaDataCaching {
- listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
- if listErr != nil {
- glog.Errorf("list meta cache: %v", listErr)
- return nil, fuse.EIO
- }
- for _, cachedEntry := range listedEntries {
- processEachEntryFn(cachedEntry.ToProtoEntry(), false)
- }
- return
+ dirPath := util.FullPath(dir.FullPath())
+ if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil {
+ glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
+ return nil, fuse.EIO
}
-
- readErr := filer_pb.ReadDirAllEntries(dir.wfs, util.FullPath(dir.FullPath()), "", processEachEntryFn)
- if readErr != nil {
- glog.V(0).Infof("list %s: %v", dir.FullPath(), err)
- return ret, fuse.EIO
+ listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(math.MaxInt32))
+ if listErr != nil {
+ glog.Errorf("list meta cache: %v", listErr)
+ return nil, fuse.EIO
}
-
- return ret, err
+ for _, cachedEntry := range listedEntries {
+ processEachEntryFn(cachedEntry.ToProtoEntry(), false)
+ }
+ return
}
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
@@ -319,50 +336,52 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
return nil
}
- dir.wfs.deleteFileChunks(entry.Chunks)
-
- dir.wfs.cacheDelete(filePath)
- dir.wfs.fsNodeCache.DeleteFsNode(filePath)
-
- if dir.wfs.option.AsyncMetaDataCaching {
- dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
- }
-
+ // first, ensure the filer store can correctly delete
glog.V(3).Infof("remove file: %v", req)
- err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false)
+ isDeleteData := entry.HardLinkCounter <= 1
+ err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, isDeleteData, false, false, false, []int32{dir.wfs.signature})
if err != nil {
glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err)
return fuse.ENOENT
}
+ // then, delete meta cache and fsNode cache
+ dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+ dir.wfs.fsNodeCache.DeleteFsNode(filePath)
+
+ // delete the chunks last
+ if isDeleteData {
+ dir.wfs.deleteFileChunks(entry.Chunks)
+ }
+
return nil
}
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
- t := util.NewFullPath(dir.FullPath(), req.Name)
- dir.wfs.cacheDelete(t)
- dir.wfs.fsNodeCache.DeleteFsNode(t)
-
- if dir.wfs.option.AsyncMetaDataCaching {
- dir.wfs.metaCache.DeleteEntry(context.Background(), t)
- }
-
glog.V(3).Infof("remove directory entry: %v", req)
- err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false)
+ ignoreRecursiveErr := true // ignore recursion error since the OS should manage it
+ err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, ignoreRecursiveErr, false, []int32{dir.wfs.signature})
if err != nil {
- glog.V(3).Infof("not found remove %s/%s: %v", dir.FullPath(), req.Name, err)
+ glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
+ if strings.Contains(err.Error(), "non-empty") {
+ return fuse.EEXIST
+ }
return fuse.ENOENT
}
+ t := util.NewFullPath(dir.FullPath(), req.Name)
+ dir.wfs.metaCache.DeleteEntry(context.Background(), t)
+ dir.wfs.fsNodeCache.DeleteFsNode(t)
+
return nil
}
func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- glog.V(3).Infof("%v dir setattr %+v", dir.FullPath(), req)
+ glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req)
if err := dir.maybeLoadEntry(); err != nil {
return err
@@ -384,8 +403,6 @@ func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fus
dir.entry.Attributes.Mtime = req.Mtime.Unix()
}
- dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
-
return dir.saveEntry()
}
@@ -402,8 +419,6 @@ func (dir *Dir) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error {
return err
}
- dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
-
return dir.saveEntry()
}
@@ -420,8 +435,6 @@ func (dir *Dir) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) e
return err
}
- dir.wfs.cacheDelete(util.FullPath(dir.FullPath()))
-
return dir.saveEntry()
}
@@ -443,7 +456,7 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
}
func (dir *Dir) Forget() {
- glog.V(3).Infof("Forget dir %s", dir.FullPath())
+ glog.V(4).Infof("Forget dir %s", dir.FullPath())
dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
}
@@ -466,21 +479,23 @@ func (dir *Dir) saveEntry() error {
return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ dir.wfs.mapPbIdFromLocalToFiler(dir.entry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(dir.entry)
+
request := &filer_pb.UpdateEntryRequest{
- Directory: parentDir,
- Entry: dir.entry,
+ Directory: parentDir,
+ Entry: dir.entry,
+ Signatures: []int32{dir.wfs.signature},
}
glog.V(1).Infof("save dir entry: %v", request)
_, err := client.UpdateEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err)
+ glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
return fuse.EIO
}
- if dir.wfs.option.AsyncMetaDataCaching {
- dir.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
- }
+ dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
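
Note: the Lookup and ReadDirAll changes above now serve directory reads from a local meta cache that is filled lazily, one directory at a time, via meta_cache.EnsureVisited. A minimal, self-contained sketch of that visit-once pattern (hypothetical types and names, not the actual SeaweedFS API):

package main

import (
	"fmt"
	"sync"
)

type entry struct{ name string }

// lazyDirCache pulls a directory's entries from a backing listing function the
// first time the directory is touched, then answers lookups locally.
type lazyDirCache struct {
	mu      sync.Mutex
	visited map[string]bool             // directories already pulled from the filer
	entries map[string]map[string]entry // dir -> name -> entry
	list    func(dir string) []entry    // stands in for the filer listing RPC
}

func (c *lazyDirCache) ensureVisited(dir string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.visited[dir] {
		return
	}
	m := make(map[string]entry)
	for _, e := range c.list(dir) {
		m[e.name] = e
	}
	c.entries[dir] = m
	c.visited[dir] = true
}

func (c *lazyDirCache) find(dir, name string) (entry, bool) {
	c.ensureVisited(dir)
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.entries[dir][name]
	return e, ok
}

func main() {
	c := &lazyDirCache{
		visited: map[string]bool{},
		entries: map[string]map[string]entry{},
		list:    func(dir string) []entry { return []entry{{"a.txt"}, {"b.txt"}} },
	}
	fmt.Println(c.find("/data", "a.txt"))   // found, and served from cache afterwards
	fmt.Println(c.find("/data", "missing")) // not found, no extra filer round trip
}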
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index d1858e99b..f6bc41b56 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -2,23 +2,101 @@ package filesys
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/util"
"os"
"syscall"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
+var _ = fs.NodeLinker(&Dir{})
var _ = fs.NodeSymlinker(&Dir{})
var _ = fs.NodeReadlinker(&File{})
+const (
+ HARD_LINK_MARKER = '\x01'
+)
+
+func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (fs.Node, error) {
+
+ oldFile, ok := old.(*File)
+ if !ok {
+ glog.Errorf("old node is not a file: %+v", old)
+ }
+
+ glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName)
+
+ if err := oldFile.maybeLoadEntry(ctx); err != nil {
+ return nil, err
+ }
+
+ // update old file to hardlink mode
+ if len(oldFile.entry.HardLinkId) == 0 {
+ oldFile.entry.HardLinkId = append(util.RandomBytes(16), HARD_LINK_MARKER)
+ oldFile.entry.HardLinkCounter = 1
+ }
+ oldFile.entry.HardLinkCounter++
+ updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
+ Directory: oldFile.dir.FullPath(),
+ Entry: oldFile.entry,
+ Signatures: []int32{dir.wfs.signature},
+ }
+
+ // CreateLink 1.2 : update new file to hardlink mode
+ request := &filer_pb.CreateEntryRequest{
+ Directory: dir.FullPath(),
+ Entry: &filer_pb.Entry{
+ Name: req.NewName,
+ IsDirectory: false,
+ Attributes: oldFile.entry.Attributes,
+ Chunks: oldFile.entry.Chunks,
+ Extended: oldFile.entry.Extended,
+ HardLinkId: oldFile.entry.HardLinkId,
+ HardLinkCounter: oldFile.entry.HardLinkCounter,
+ },
+ Signatures: []int32{dir.wfs.signature},
+ }
+
+ // apply changes to the filer, and also apply to local metaCache
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
+ if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil {
+ glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
+ return fuse.EIO
+ }
+ dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry))
+
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
+ return fuse.EIO
+ }
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+
+ return nil
+ })
+
+ // create new file node
+ newNode := dir.newFile(req.NewName, request.Entry)
+ newFile := newNode.(*File)
+ if err := newFile.maybeLoadEntry(ctx); err != nil {
+ return nil, err
+ }
+
+ return newFile, err
+
+}
+
func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) {
- glog.V(3).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
+ glog.V(4).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
request := &filer_pb.CreateEntryRequest{
Directory: dir.FullPath(),
@@ -34,17 +112,20 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
SymlinkTarget: req.Target,
},
},
+ Signatures: []int32{dir.wfs.signature},
}
err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err)
return fuse.EIO
}
- if dir.wfs.option.AsyncMetaDataCaching {
- dir.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
- }
+ dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
@@ -65,7 +146,7 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri
return "", fuse.Errno(syscall.EINVAL)
}
- glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget)
+ glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget)
return file.entry.Attributes.SymlinkTarget, nil
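
Note: Dir.Link above upgrades the source file to hard-link mode by assigning it a random 16-byte HardLinkId terminated with HARD_LINK_MARKER and bumping a shared HardLinkCounter that both directory entries then carry. A simplified sketch of that id/counter handling (hypothetical types, not the filer_pb messages):

package main

import (
	"crypto/rand"
	"fmt"
)

const hardLinkMarker = '\x01' // trailing byte that marks new-style hard link ids

type entry struct {
	HardLinkId      []byte
	HardLinkCounter int32
}

// linkTo mirrors the flow in Dir.Link: lazily assign a shared id to the source
// entry, bump the counter, and copy both onto the new directory entry.
func linkTo(src *entry) *entry {
	if len(src.HardLinkId) == 0 {
		id := make([]byte, 16)
		rand.Read(id)
		src.HardLinkId = append(id, hardLinkMarker)
		src.HardLinkCounter = 1
	}
	src.HardLinkCounter++
	return &entry{HardLinkId: src.HardLinkId, HardLinkCounter: src.HardLinkCounter}
}

func main() {
	old := &entry{}
	linked := linkTo(old)
	fmt.Printf("shared id %x, nlink=%d\n", linked.HardLinkId, linked.HardLinkCounter)
}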
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index ea40f5c31..3f73d0eb6 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -3,11 +3,12 @@ package filesys
import (
"context"
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
)
func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
@@ -19,7 +20,17 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
- err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ // find local old entry
+ oldEntry, err := dir.wfs.metaCache.FindEntry(context.Background(), oldPath)
+ if err != nil {
+ glog.Errorf("dir Rename can not find source %s : %v", oldPath, err)
+ return fuse.ENOENT
+ }
+
+ // update remote filer
+ err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
request := &filer_pb.AtomicRenameEntryRequest{
OldDirectory: dir.FullPath(),
@@ -28,24 +39,44 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
NewName: req.NewName,
}
- _, err := client.AtomicRenameEntry(context.Background(), request)
+ _, err := client.AtomicRenameEntry(ctx, request)
if err != nil {
- glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
+ glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
return fuse.EIO
}
return nil
})
+ if err != nil {
+ glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
+ return fuse.EIO
+ }
- if err == nil {
- dir.wfs.cacheDelete(newPath)
- dir.wfs.cacheDelete(oldPath)
+ // TODO: replicate renaming logic on filer
+ if err := dir.wfs.metaCache.DeleteEntry(context.Background(), oldPath); err != nil {
+ glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
+ return fuse.EIO
+ }
+ oldEntry.FullPath = newPath
+ if err := dir.wfs.metaCache.InsertEntry(context.Background(), oldEntry); err != nil {
+ glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
+ return fuse.EIO
+ }
- // fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
- dir.wfs.fsNodeCache.Move(oldPath, newPath)
+ // fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
+ dir.wfs.fsNodeCache.Move(oldPath, newPath)
+ // change file handle
+ dir.wfs.handlesLock.Lock()
+ defer dir.wfs.handlesLock.Unlock()
+ inodeId := oldPath.AsInode()
+ existingHandle, found := dir.wfs.handles[inodeId]
+ if !found || existingHandle == nil {
+ return err
}
+ delete(dir.wfs.handles, inodeId)
+ dir.wfs.handles[newPath.AsInode()] = existingHandle
return err
}
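
Note: after the filer-side AtomicRenameEntry succeeds, Dir.Rename above re-inserts the cached entry under the new path and re-keys any open file handle from the old path's inode to the new one. A small sketch of that inode re-keying (hypothetical helpers standing in for util.FullPath.AsInode and the WFS handle map):

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// asInode stands in for util.FullPath.AsInode(): a stable hash of the path.
func asInode(path string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(path))
	return h.Sum64()
}

type handle struct{ path string }

type handleTable struct {
	sync.Mutex
	handles map[uint64]*handle
}

// rename moves an open handle from the old path's inode key to the new one.
func (t *handleTable) rename(oldPath, newPath string) {
	t.Lock()
	defer t.Unlock()
	h, found := t.handles[asInode(oldPath)]
	if !found || h == nil {
		return
	}
	delete(t.handles, asInode(oldPath))
	h.path = newPath
	t.handles[asInode(newPath)] = h
}

func main() {
	t := &handleTable{handles: map[uint64]*handle{asInode("/a/old"): {path: "/a/old"}}}
	t.rename("/a/old", "/a/new")
	fmt.Println(t.handles[asInode("/a/new")].path) // /a/new
}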
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 45224b3e7..dd0c48796 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -2,193 +2,126 @@ package filesys
import (
"bytes"
- "context"
- "fmt"
"io"
+ "runtime"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ concurrentWriterLimit = runtime.NumCPU()
+ concurrentWriters = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit)
)
type ContinuousDirtyPages struct {
- intervals *ContinuousIntervals
- f *File
- lock sync.Mutex
- collection string
- replication string
+ intervals *ContinuousIntervals
+ f *File
+ writeWaitGroup sync.WaitGroup
+ chunkSaveErrChan chan error
+ chunkSaveErrChanClosed bool
+ lastErr error
+ lock sync.Mutex
+ collection string
+ replication string
}
func newDirtyPages(file *File) *ContinuousDirtyPages {
- return &ContinuousDirtyPages{
- intervals: &ContinuousIntervals{},
- f: file,
+ dirtyPages := &ContinuousDirtyPages{
+ intervals: &ContinuousIntervals{},
+ f: file,
+ chunkSaveErrChan: make(chan error, concurrentWriterLimit),
}
+ go func() {
+ for t := range dirtyPages.chunkSaveErrChan {
+ if t != nil {
+ dirtyPages.lastErr = t
+ }
+ }
+ }()
+ return dirtyPages
}
-func (pages *ContinuousDirtyPages) releaseResource() {
-}
-
-var counter = int32(0)
-
-func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
+func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
- pages.lock.Lock()
- defer pages.lock.Unlock()
-
- glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
+ glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
- return pages.flushAndSave(offset, data)
+ pages.flushAndSave(offset, data)
}
pages.intervals.AddInterval(data, offset)
- var chunk *filer_pb.FileChunk
- var hasSavedData bool
-
- if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit {
- chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage()
- if hasSavedData {
- chunks = append(chunks, chunk)
- }
+ if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit {
+ pages.saveExistingLargestPageToStorage()
}
return
}
-func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
-
- var chunk *filer_pb.FileChunk
- var newChunks []*filer_pb.FileChunk
+func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) {
// flush existing
- if newChunks, err = pages.saveExistingPagesToStorage(); err == nil {
- if newChunks != nil {
- chunks = append(chunks, newChunks...)
- }
- } else {
- return
- }
+ pages.saveExistingPagesToStorage()
// flush the new page
- if chunk, err = pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data))); err == nil {
- if chunk != nil {
- glog.V(4).Infof("%s/%s flush big request [%d,%d) to %s", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.FileId)
- chunks = append(chunks, chunk)
- }
- } else {
- glog.V(0).Infof("%s/%s failed to flush2 [%d,%d): %v", pages.f.dir.FullPath(), pages.f.Name, chunk.Offset, chunk.Offset+int64(chunk.Size), err)
- return
- }
+ pages.saveToStorage(bytes.NewReader(data), offset, int64(len(data)))
return
}
-func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) {
-
- pages.lock.Lock()
- defer pages.lock.Unlock()
-
- return pages.saveExistingPagesToStorage()
-}
-
-func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) {
-
- var hasSavedData bool
- var chunk *filer_pb.FileChunk
-
- for {
-
- chunk, hasSavedData, err = pages.saveExistingLargestPageToStorage()
- if !hasSavedData {
- return chunks, err
- }
-
- if err == nil {
- chunks = append(chunks, chunk)
- } else {
- return
- }
+func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() {
+ for pages.saveExistingLargestPageToStorage() {
}
-
}
-func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *filer_pb.FileChunk, hasSavedData bool, err error) {
+func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedData bool) {
maxList := pages.intervals.RemoveLargestIntervalLinkedList()
if maxList == nil {
- return nil, false, nil
+ return false
}
- for {
- chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
- if err == nil {
- hasSavedData = true
- glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
- return
- } else {
- glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
- time.Sleep(5 * time.Second)
- }
- }
+ fileSize := int64(pages.f.entry.Attributes.FileSize)
-}
+ chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
+ if chunkSize == 0 {
+ return false
+ }
-func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
+ pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
- var fileId, host string
- var auth security.EncodedJwt
+ return true
+}
- dir, _ := pages.f.fullpath().DirAndName()
+func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
- if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if pages.chunkSaveErrChanClosed {
+ pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit)
+ pages.chunkSaveErrChanClosed = false
+ }
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: pages.f.wfs.option.Replication,
- Collection: pages.f.wfs.option.Collection,
- TtlSec: pages.f.wfs.option.TtlSec,
- DataCenter: pages.f.wfs.option.DataCenter,
- ParentPath: dir,
- }
+ mtime := time.Now().UnixNano()
+ pages.writeWaitGroup.Add(1)
+ go func() {
+ defer pages.writeWaitGroup.Done()
- resp, err := client.AssignVolume(context.Background(), request)
+ reader = io.LimitReader(reader, size)
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
+ pages.chunkSaveErrChan <- err
+ return
}
-
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- host = pages.f.wfs.AdjustedUrl(host)
- pages.collection, pages.replication = resp.Collection, resp.Replication
-
- return nil
- }); err != nil {
- return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
- }
-
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
- if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload data: %v", err)
- }
- if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
- }
- pages.f.wfs.chunkCache.SetChunk(fileId, data)
-
- return uploadResult.ToPbFileChunk(fileId, offset), nil
-
+ chunk.Mtime = mtime
+ pages.collection, pages.replication = collection, replication
+ pages.f.addChunks([]*filer_pb.FileChunk{chunk})
+ glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
+ }()
}
func max(x, y int64) int64 {
@@ -204,11 +137,6 @@ func min(x, y int64) int64 {
return y
}
-func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) {
-
- pages.lock.Lock()
- defer pages.lock.Unlock()
-
- return pages.intervals.ReadData(data, startOffset)
-
+func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+ return pages.intervals.ReadDataAt(data, startOffset)
}
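
Note: ContinuousDirtyPages now uploads each interval asynchronously: every save runs in its own goroutine tracked by writeWaitGroup, and failures are funneled through the buffered chunkSaveErrChan so a later flush can report the last error. A minimal sketch of that pattern (the real code drains the channel in a background goroutine; this sketch drains it after waiting, for simplicity, and the upload function is a stand-in):

package main

import (
	"errors"
	"fmt"
	"sync"
)

type asyncSaver struct {
	wg      sync.WaitGroup
	errChan chan error
}

// save launches one upload per dirty interval, like saveToStorage above.
func (s *asyncSaver) save(upload func() error) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		if err := upload(); err != nil {
			s.errChan <- err
		}
	}()
}

// flush waits for all in-flight uploads and reports the last failure, if any.
func (s *asyncSaver) flush() (lastErr error) {
	s.wg.Wait()
	close(s.errChan)
	for err := range s.errChan {
		lastErr = err
	}
	return
}

func main() {
	s := &asyncSaver{errChan: make(chan error, 8)}
	s.save(func() error { return nil })
	s.save(func() error { return errors.New("volume server unreachable") })
	fmt.Println(s.flush())
}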
diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go
index ec94c6df1..1404bf78c 100644
--- a/weed/filesys/dirty_page_interval.go
+++ b/weed/filesys/dirty_page_interval.go
@@ -3,7 +3,8 @@ package filesys
import (
"bytes"
"io"
- "math"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type IntervalNode struct {
@@ -91,6 +92,15 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))}
+ // append to the tail and return
+ if len(c.lists) == 1 {
+ lastSpan := c.lists[0]
+ if lastSpan.Tail.Offset+lastSpan.Tail.Size == offset {
+ lastSpan.addNodeToTail(interval)
+ return
+ }
+ }
+
var newLists []*IntervalLinkedList
for _, list := range c.lists {
// if list is to the left of new interval, add to the new list
@@ -186,35 +196,28 @@ func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) {
}
-func (c *ContinuousIntervals) ReadData(data []byte, startOffset int64) (offset int64, size int) {
- var minOffset int64 = math.MaxInt64
- var maxStop int64
+func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
for _, list := range c.lists {
start := max(startOffset, list.Offset())
stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
- if start <= stop {
+ if start < stop {
list.ReadData(data[start-startOffset:], start, stop)
- minOffset = min(minOffset, start)
maxStop = max(maxStop, stop)
}
}
-
- if minOffset == math.MaxInt64 {
- return 0, 0
- }
-
- offset = minOffset
- size = int(maxStop - offset)
return
}
func (l *IntervalLinkedList) ToReader() io.Reader {
var readers []io.Reader
t := l.Head
- readers = append(readers, bytes.NewReader(t.Data))
+ readers = append(readers, util.NewBytesReader(t.Data))
for t.Next != nil {
t = t.Next
readers = append(readers, bytes.NewReader(t.Data))
}
+ if len(readers) == 1 {
+ return readers[0]
+ }
return io.MultiReader(readers...)
}
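
Note: AddInterval above gains a fast path for sequential writes: when there is exactly one interval list and the new buffer starts exactly where its tail ends, the data is appended to that list instead of going through the general overlap-merging logic. A tiny sketch of the idea (simplified types; the general merging case is elided):

package main

import "fmt"

type run struct {
	offset int64
	data   []byte
}

// addInterval appends to the single existing run when the write is purely
// sequential; anything else falls back to the (elided) general path.
func addInterval(runs []run, offset int64, data []byte) []run {
	if len(runs) == 1 && runs[0].offset+int64(len(runs[0].data)) == offset {
		runs[0].data = append(runs[0].data, data...)
		return runs
	}
	return append(runs, run{offset: offset, data: data})
}

func main() {
	runs := addInterval(nil, 0, []byte("hello "))
	runs = addInterval(runs, 6, []byte("world"))
	fmt.Println(len(runs), string(runs[0].data)) // 1 "hello world"
}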
diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go
index ab3b37b7c..d02ad27fd 100644
--- a/weed/filesys/dirty_page_interval_test.go
+++ b/weed/filesys/dirty_page_interval_test.go
@@ -2,6 +2,7 @@ package filesys
import (
"bytes"
+ "math/rand"
"testing"
)
@@ -66,6 +67,29 @@ func TestContinuousIntervals_RealCase1(t *testing.T) {
}
+func TestRandomWrites(t *testing.T) {
+
+ c := &ContinuousIntervals{}
+
+ data := make([]byte, 1024)
+
+ for i := 0; i < 1024; i++ {
+
+ start, stop := rand.Intn(len(data)), rand.Intn(len(data))
+ if start > stop {
+ start, stop = stop, start
+ }
+
+ rand.Read(data[start : stop+1])
+
+ c.AddInterval(data[start:stop+1], int64(start))
+
+ expectedData(t, c, 0, data...)
+
+ }
+
+}
+
func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) {
start, stop := int64(offset), int64(offset+len(data))
for _, list := range c.lists {
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index bafbd7cc8..7aa1016d7 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -7,12 +7,13 @@ import (
"sort"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
)
const blockSize = 512
@@ -32,9 +33,10 @@ type File struct {
dir *Dir
wfs *WFS
entry *filer_pb.Entry
- entryViewCache []filer2.VisibleInterval
+ entryViewCache []filer.VisibleInterval
isOpen int
reader io.ReaderAt
+ dirtyMetadata bool
}
func (file *File) fullpath() util.FullPath {
@@ -54,7 +56,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Inode = file.fullpath().AsInode()
attr.Valid = time.Second
attr.Mode = os.FileMode(file.entry.Attributes.FileMode)
- attr.Size = filer2.TotalSize(file.entry.Chunks)
+ attr.Size = filer.FileSize(file.entry)
if file.isOpen > 0 {
attr.Size = file.entry.Attributes.FileSize
glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
@@ -65,6 +67,9 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Uid = file.entry.Attributes.Uid
attr.Blocks = attr.Size/blockSize + 1
attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit)
+ if file.entry.HardLinkCounter > 0 {
+ attr.Nlink = uint32(file.entry.HardLinkCounter)
+ }
return nil
@@ -85,13 +90,11 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
- file.isOpen++
-
handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
resp.Handle = fuse.HandleID(handle.handle)
- glog.V(3).Infof("%v file open handle id = %d", file.fullpath(), handle.handle)
+ glog.V(4).Infof("%v file open handle id = %d", file.fullpath(), handle.handle)
return handle, nil
@@ -99,58 +102,89 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes)
+ glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req)
if err := file.maybeLoadEntry(ctx); err != nil {
return err
}
+ if file.isOpen > 0 {
+ file.wfs.handlesLock.Lock()
+ fileHandle := file.wfs.handles[file.fullpath().AsInode()]
+ file.wfs.handlesLock.Unlock()
+
+ if fileHandle != nil {
+ fileHandle.Lock()
+ defer fileHandle.Unlock()
+ }
+ }
if req.Valid.Size() {
- glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size)
- if req.Size < filer2.TotalSize(file.entry.Chunks) {
+ glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks))
+ if req.Size < filer.FileSize(file.entry) {
// fmt.Printf("truncate %v \n", fullPath)
var chunks []*filer_pb.FileChunk
+ var truncatedChunks []*filer_pb.FileChunk
for _, chunk := range file.entry.Chunks {
int64Size := int64(chunk.Size)
if chunk.Offset+int64Size > int64(req.Size) {
+ // this chunk is truncated
int64Size = int64(req.Size) - chunk.Offset
- }
- if int64Size > 0 {
- chunks = append(chunks, chunk)
+ if int64Size > 0 {
+ chunks = append(chunks, chunk)
+ glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size)
+ chunk.Size = uint64(int64Size)
+ } else {
+ glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString())
+ truncatedChunks = append(truncatedChunks, chunk)
+ }
}
}
file.entry.Chunks = chunks
file.entryViewCache = nil
file.reader = nil
+ file.wfs.deleteFileChunks(truncatedChunks)
}
file.entry.Attributes.FileSize = req.Size
+ file.dirtyMetadata = true
}
+
if req.Valid.Mode() {
file.entry.Attributes.FileMode = uint32(req.Mode)
+ file.dirtyMetadata = true
}
if req.Valid.Uid() {
file.entry.Attributes.Uid = req.Uid
+ file.dirtyMetadata = true
}
if req.Valid.Gid() {
file.entry.Attributes.Gid = req.Gid
+ file.dirtyMetadata = true
}
if req.Valid.Crtime() {
file.entry.Attributes.Crtime = req.Crtime.Unix()
+ file.dirtyMetadata = true
}
if req.Valid.Mtime() {
file.entry.Attributes.Mtime = req.Mtime.Unix()
+ file.dirtyMetadata = true
+ }
+
+ if req.Valid.Handle() {
+ // fmt.Printf("file handle => %d\n", req.Handle)
}
if file.isOpen > 0 {
return nil
}
- file.wfs.cacheDelete(file.fullpath())
+ if !file.dirtyMetadata {
+ return nil
+ }
return file.saveEntry()
@@ -168,8 +202,6 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error
return err
}
- file.wfs.cacheDelete(file.fullpath())
-
return file.saveEntry()
}
@@ -186,8 +218,6 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest)
return err
}
- file.wfs.cacheDelete(file.fullpath())
-
return file.saveEntry()
}
@@ -211,27 +241,28 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// fsync works at OS level
// write the file chunks to the filerGrpcAddress
- glog.V(3).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req)
+ glog.V(4).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req)
return nil
}
func (file *File) Forget() {
t := util.NewFullPath(file.dir.FullPath(), file.Name)
- glog.V(3).Infof("Forget file %s", t)
+ glog.V(4).Infof("Forget file %s", t)
file.wfs.fsNodeCache.DeleteFsNode(t)
}
func (file *File) maybeLoadEntry(ctx context.Context) error {
- if file.entry == nil || file.isOpen <= 0 {
- entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
- if err != nil {
- glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
- return err
- }
- if entry != nil {
- file.setEntry(entry)
- }
+ if (file.entry != nil && len(file.entry.HardLinkId) != 0) || file.isOpen > 0 {
+ return nil
+ }
+ entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
+ if err != nil {
+ glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
+ return err
+ }
+ if entry != nil {
+ file.setEntry(entry)
}
return nil
}
@@ -239,48 +270,49 @@ func (file *File) maybeLoadEntry(ctx context.Context) error {
func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
sort.Slice(chunks, func(i, j int) bool {
+ if chunks[i].Mtime == chunks[j].Mtime {
+ return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+ }
return chunks[i].Mtime < chunks[j].Mtime
})
- var newVisibles []filer2.VisibleInterval
for _, chunk := range chunks {
- newVisibles = filer2.MergeIntoVisibles(file.entryViewCache, newVisibles, chunk)
- t := file.entryViewCache[:0]
- file.entryViewCache = newVisibles
- newVisibles = t
+ file.entryViewCache = filer.MergeIntoVisibles(file.entryViewCache, chunk)
}
file.reader = nil
- glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
+ glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
file.entry.Chunks = append(file.entry.Chunks, chunks...)
}
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
- file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), file.entry.Chunks)
file.reader = nil
}
func (file *File) saveEntry() error {
return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ file.wfs.mapPbIdFromLocalToFiler(file.entry)
+ defer file.wfs.mapPbIdFromFilerToLocal(file.entry)
+
request := &filer_pb.UpdateEntryRequest{
- Directory: file.dir.FullPath(),
- Entry: file.entry,
+ Directory: file.dir.FullPath(),
+ Entry: file.entry,
+ Signatures: []int32{file.wfs.signature},
}
- glog.V(1).Infof("save file entry: %v", request)
+ glog.V(4).Infof("save file entry: %v", request)
_, err := client.UpdateEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
+ glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
return fuse.EIO
}
- if file.wfs.option.AsyncMetaDataCaching {
- file.wfs.metaCache.UpdateEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
- }
+ file.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
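
Note: File.Setattr above now distinguishes chunks that merely straddle the new size (shortened in place) from chunks that lie entirely past it (collected and handed to deleteFileChunks). A simplified sketch of that split, using hypothetical types rather than filer_pb.FileChunk, with chunks fully below the boundary kept unchanged:

package main

import "fmt"

type chunk struct {
	offset int64
	size   uint64
	fileId string
}

// truncateChunks keeps chunks below newSize, shortens the straddling chunk,
// and returns fully-truncated chunks separately so their data can be deleted.
func truncateChunks(chunks []chunk, newSize int64) (kept, dropped []chunk) {
	for _, c := range chunks {
		if c.offset+int64(c.size) <= newSize {
			kept = append(kept, c)
			continue
		}
		remaining := newSize - c.offset
		if remaining > 0 {
			c.size = uint64(remaining) // shorten the chunk that crosses the boundary
			kept = append(kept, c)
		} else {
			dropped = append(dropped, c) // whole chunk is past the new size
		}
	}
	return
}

func main() {
	kept, dropped := truncateChunks([]chunk{
		{0, 4096, "3,01"}, {4096, 4096, "3,02"}, {8192, 4096, "3,03"},
	}, 6000)
	fmt.Println(len(kept), len(dropped)) // 2 1, and kept[1] is shortened to 1904 bytes
}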
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index c6637259d..54bde3494 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -6,21 +6,24 @@ import (
"io"
"math"
"net/http"
+ "os"
+ "sync"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
type FileHandle struct {
// cache file has been written to
- dirtyPages *ContinuousDirtyPages
- contentType string
- dirtyMetadata bool
- handle uint64
+ dirtyPages *ContinuousDirtyPages
+ contentType string
+ handle uint64
+ sync.RWMutex
f *File
RequestId fuse.RequestID // unique ID for request
@@ -38,8 +41,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
Gid: gid,
}
if fh.f.entry != nil {
- fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks)
+ fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry)
}
+
return fh
}
@@ -53,61 +57,80 @@ var _ = fs.HandleReleaser(&FileHandle{})
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
- glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
+ glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
+ fh.RLock()
+ defer fh.RUnlock()
+
+ if req.Size <= 0 {
+ return nil
+ }
- buff := make([]byte, req.Size)
+ buff := resp.Data[:cap(resp.Data)]
+ if req.Size > cap(resp.Data) {
+ // should not happen
+ buff = make([]byte, req.Size)
+ }
totalRead, err := fh.readFromChunks(buff, req.Offset)
if err == nil {
- dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset)
- if totalRead+req.Offset < dirtyOffset+int64(dirtySize) {
- totalRead = dirtyOffset + int64(dirtySize) - req.Offset
- }
+ maxStop := fh.readFromDirtyPages(buff, req.Offset)
+ totalRead = max(maxStop-req.Offset, totalRead)
}
- resp.Data = buff[:totalRead]
+ if err == io.EOF {
+ err = nil
+ }
if err != nil {
- glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
+ glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err)
return fuse.EIO
}
+ if totalRead > int64(len(buff)) {
+ glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead)
+ totalRead = min(int64(len(buff)), totalRead)
+ }
+ // resp.Data = buff[:totalRead]
+ resp.Data = buff
+
return err
}
-func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) {
- return fh.dirtyPages.ReadDirtyData(buff, startOffset)
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
+ maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
+ return
}
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
- // this value should come from the filer instead of the old f
- if len(fh.f.entry.Chunks) == 0 {
+ fileSize := int64(filer.FileSize(fh.f.entry))
+
+ if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fh.f.fullpath())
- return 0, nil
+ return 0, io.EOF
}
+ var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ if chunkResolveErr != nil {
+ return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
+ }
fh.f.reader = nil
}
if fh.f.reader == nil {
- chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32)
- fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache)
+ chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
+ fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
}
totalRead, err := fh.f.reader.ReadAt(buff, offset)
- if err == io.EOF {
- err = nil
- }
-
- if err != nil {
+ if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}
- // glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
+ glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
@@ -115,119 +138,147 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
// Write to the file handle
func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+ fh.Lock()
+ defer fh.Unlock()
+
// write the request to volume servers
- data := make([]byte, len(req.Data))
- copy(data, req.Data)
+ data := req.Data
+ if len(data) <= 512 {
+ // fuse message cacheable size
+ data = make([]byte, len(req.Data))
+ copy(data, req.Data)
+ }
fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
- // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
+ glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
- chunks, err := fh.dirtyPages.AddPage(req.Offset, data)
- if err != nil {
- glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err)
- return fuse.EIO
- }
+ fh.dirtyPages.AddPage(req.Offset, data)
resp.Size = len(data)
if req.Offset == 0 {
// detect mime type
fh.contentType = http.DetectContentType(data)
- fh.dirtyMetadata = true
+ fh.f.dirtyMetadata = true
}
- if len(chunks) > 0 {
-
- fh.f.addChunks(chunks)
-
- fh.dirtyMetadata = true
- }
+ fh.f.dirtyMetadata = true
return nil
}
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
- glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle)
+ glog.V(4).Infof("Release %v fh %d", fh.f.fullpath(), fh.handle)
+
+ fh.Lock()
+ defer fh.Unlock()
fh.f.isOpen--
- if fh.f.isOpen <= 0 {
- fh.dirtyPages.releaseResource()
+ if fh.f.isOpen < 0 {
+ glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
+ fh.f.isOpen = 0
+ return nil
+ }
+
+ if fh.f.isOpen == 0 {
+ if err := fh.doFlush(ctx, req.Header); err != nil {
+ glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
+ }
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
}
- fh.f.entryViewCache = nil
- fh.f.reader = nil
+
+ // stop the goroutine
+ if !fh.dirtyPages.chunkSaveErrChanClosed {
+ fh.dirtyPages.chunkSaveErrChanClosed = true
+ close(fh.dirtyPages.chunkSaveErrChan)
+ }
return nil
}
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
- // fflush works at fh level
+
+ fh.Lock()
+ defer fh.Unlock()
+
+ return fh.doFlush(ctx, req.Header)
+}
+
+func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
+ // flush works at fh level
// send the data to the OS
- glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req)
+ glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle)
- chunks, err := fh.dirtyPages.FlushToStorage()
- if err != nil {
- glog.Errorf("flush %s: %v", fh.f.fullpath(), err)
- return fuse.EIO
- }
+ fh.dirtyPages.saveExistingPagesToStorage()
+
+ fh.dirtyPages.writeWaitGroup.Wait()
- if len(chunks) > 0 {
- fh.f.addChunks(chunks)
- fh.dirtyMetadata = true
+ if fh.dirtyPages.lastErr != nil {
+ return fh.dirtyPages.lastErr
}
- if !fh.dirtyMetadata {
+ if !fh.f.dirtyMetadata {
return nil
}
- err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if fh.f.entry.Attributes != nil {
fh.f.entry.Attributes.Mime = fh.contentType
- fh.f.entry.Attributes.Uid = req.Uid
- fh.f.entry.Attributes.Gid = req.Gid
+ if fh.f.entry.Attributes.Uid == 0 {
+ fh.f.entry.Attributes.Uid = header.Uid
+ }
+ if fh.f.entry.Attributes.Gid == 0 {
+ fh.f.entry.Attributes.Gid = header.Gid
+ }
+ if fh.f.entry.Attributes.Crtime == 0 {
+ fh.f.entry.Attributes.Crtime = time.Now().Unix()
+ }
fh.f.entry.Attributes.Mtime = time.Now().Unix()
- fh.f.entry.Attributes.Crtime = time.Now().Unix()
- fh.f.entry.Attributes.FileMode = uint32(0666 &^ fh.f.wfs.option.Umask)
+ fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
- Directory: fh.f.dir.FullPath(),
- Entry: fh.f.entry,
+ Directory: fh.f.dir.FullPath(),
+ Entry: fh.f.entry,
+ Signatures: []int32{fh.f.wfs.signature},
}
- glog.V(3).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
+ glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
for i, chunk := range fh.f.entry.Chunks {
- glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
- fh.f.entry.Chunks = chunks
- // fh.f.entryViewCache = nil
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
+
+ chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
+ chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
+ if manifestErr != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
+ }
+ fh.f.entry.Chunks = append(chunks, manifestChunks...)
+ fh.f.entryViewCache = nil
+
+ fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
}
- if fh.f.wfs.option.AsyncMetaDataCaching {
- fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
- }
-
- fh.f.wfs.deleteFileChunks(garbages)
- for i, chunk := range garbages {
- glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
- }
+ fh.f.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
if err == nil {
- fh.dirtyMetadata = false
+ fh.f.dirtyMetadata = false
}
if err != nil {
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
index b146f0615..fdec8253c 100644
--- a/weed/filesys/fscache.go
+++ b/weed/filesys/fscache.go
@@ -3,8 +3,9 @@ package filesys
import (
"sync"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type FsCache struct {
@@ -118,7 +119,6 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
target = target.ensureChild(p)
}
parent := target.parent
- src.name = target.name
if dir, ok := src.node.(*Dir); ok {
dir.name = target.name // target is not Dir, but a shortcut
}
@@ -132,6 +132,7 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
target.deleteSelf()
+ src.name = target.name
src.connectToParent(parent)
return src
@@ -144,10 +145,14 @@ func (n *FsNode) connectToParent(parent *FsNode) {
oldNode.deleteSelf()
}
if dir, ok := n.node.(*Dir); ok {
- dir.parent = parent.node.(*Dir)
+ if parent.node != nil {
+ dir.parent = parent.node.(*Dir)
+ }
}
if f, ok := n.node.(*File); ok {
- f.dir = parent.node.(*Dir)
+ if parent.node != nil {
+ f.dir = parent.node.(*Dir)
+ }
}
n.childrenLock.Lock()
parent.children[n.name] = n
diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go
index 67f9aacc8..1152eb32e 100644
--- a/weed/filesys/fscache_test.go
+++ b/weed/filesys/fscache_test.go
@@ -94,3 +94,22 @@ func TestFsCacheMove(t *testing.T) {
}
}
+
+func TestFsCacheMove2(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+
+ cache.Move(util.FullPath("/a/b/d"), util.FullPath("/a/b/e"))
+
+ d := cache.GetFsNode(util.FullPath("/a/b/e"))
+ if d == nil {
+ t.Errorf("unexpected nil node!")
+ }
+ if d.(*File).Name != "e" {
+ t.Errorf("unexpected node!")
+ }
+
+}
diff --git a/weed/filesys/meta_cache/id_mapper.go b/weed/filesys/meta_cache/id_mapper.go
new file mode 100644
index 000000000..4a2179f31
--- /dev/null
+++ b/weed/filesys/meta_cache/id_mapper.go
@@ -0,0 +1,101 @@
+package meta_cache
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+type UidGidMapper struct {
+ uidMapper *IdMapper
+ gidMapper *IdMapper
+}
+
+type IdMapper struct {
+ localToFiler map[uint32]uint32
+ filerToLocal map[uint32]uint32
+}
+
+// UidGidMapper translates local uid/gid to filer uid/gid
+// The local storage always persists the same as the filer.
+// The local->filer translation happens when updating the filer first and later saving to meta_cache.
+// And filer->local happens when reading from the meta_cache.
+func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
+ uidMapper, err := newIdMapper(uidPairsStr)
+ if err != nil {
+ return nil, err
+ }
+ gidMapper, err := newIdMapper(gidPairStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return &UidGidMapper{
+ uidMapper: uidMapper,
+ gidMapper: gidMapper,
+ }, nil
+}
+
+func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32, uint32) {
+ return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
+}
+func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32, uint32) {
+ return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
+}
+
+func (m *IdMapper) LocalToFiler(id uint32) uint32 {
+ value, found := m.localToFiler[id]
+ if found {
+ return value
+ }
+ return id
+}
+func (m *IdMapper) FilerToLocal(id uint32) uint32 {
+ value, found := m.filerToLocal[id]
+ if found {
+ return value
+ }
+ return id
+}
+
+func newIdMapper(pairsStr string) (*IdMapper, error) {
+
+ localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return &IdMapper{
+ localToFiler: localToFiler,
+ filerToLocal: filerToLocal,
+ }, nil
+
+}
+
+func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
+
+ if pairsStr == "" {
+ return
+ }
+
+ localToFiler = make(map[uint32]uint32)
+ filerToLocal = make(map[uint32]uint32)
+ for _, pairStr := range strings.Split(pairsStr, ",") {
+ pair := strings.Split(pairStr, ":")
+ localUidStr, filerUidStr := pair[0], pair[1]
+ localUid, localUidErr := strconv.Atoi(localUidStr)
+ if localUidErr != nil {
+ err = fmt.Errorf("failed to parse local %s: %v", localUidStr, localUidErr)
+ return
+ }
+ filerUid, filerUidErr := strconv.Atoi(filerUidStr)
+ if filerUidErr != nil {
+ err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
+ return
+ }
+ localToFiler[uint32(localUid)] = uint32(filerUid)
+ filerToLocal[uint32(filerUid)] = uint32(localUid)
+ }
+
+ return
+}
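
Note: the new id_mapper.go parses comma-separated local:filer id pairs and falls back to the identity mapping for anything not listed. A short usage sketch of the API defined above (the pair strings are made-up examples):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
)

func main() {
	// uid pairs "1000:500,1001:501", gid pairs "1000:500"
	mapper, err := meta_cache.NewUidGidMapper("1000:500,1001:501", "1000:500")
	if err != nil {
		panic(err)
	}
	uid, gid := mapper.LocalToFiler(1000, 1000) // 500 500
	fmt.Println(uid, gid)
	uid, gid = mapper.FilerToLocal(501, 999) // 1001 999 (gid 999 has no mapping)
	fmt.Println(uid, gid)
}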
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 4c9090d42..4b282253d 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -2,27 +2,38 @@ package meta_cache
import (
"context"
+ "fmt"
"os"
"sync"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
)
+// needs logic similar to FilerStoreWrapper,
+// e.g. filling in the fileId field for chunks
+
type MetaCache struct {
- actualStore filer2.FilerStore
+ localStore filer.VirtualFilerStore
sync.RWMutex
+ visitedBoundary *bounded_tree.BoundedTree
+ uidGidMapper *UidGidMapper
+ invalidateFunc func(util.FullPath)
}
-func NewMetaCache(dbFolder string) *MetaCache {
+func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache {
return &MetaCache{
- actualStore: openMetaStore(dbFolder),
+ localStore: openMetaStore(dbFolder),
+ visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
+ uidGidMapper: uidGidMapper,
+ invalidateFunc: invalidateFunc,
}
}
-func openMetaStore(dbFolder string) filer2.FilerStore {
+func openMetaStore(dbFolder string) filer.VirtualFilerStore {
os.RemoveAll(dbFolder)
os.MkdirAll(dbFolder, 0755)
@@ -36,58 +47,100 @@ func openMetaStore(dbFolder string) filer2.FilerStore {
glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
}
- return store
+ return filer.NewFilerStoreWrapper(store)
}
-func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error {
+func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
- return mc.actualStore.InsertEntry(ctx, entry)
+ return mc.doInsertEntry(ctx, entry)
+}
+
+func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
+ return mc.localStore.InsertEntry(ctx, entry)
}
-func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error {
+func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
- if oldPath != "" {
- if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
- return err
+
+ oldDir, _ := oldPath.DirAndName()
+ if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
+ if oldPath != "" {
+ if newEntry != nil && oldPath == newEntry.FullPath {
+ // skip the unnecessary deletion
+ // leave the update to the following InsertEntry operation
+ } else {
+ glog.V(3).Infof("DeleteEntry %s/%s", oldPath, oldPath.Name())
+ if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
+ return err
+ }
+ }
}
+ } else {
+ // println("unknown old directory:", oldDir)
}
+
if newEntry != nil {
- if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil {
- return err
+ newDir, _ := newEntry.DirAndName()
+ if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
+ glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
+ if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
+ return err
+ }
}
}
return nil
}
-func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error {
+func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
mc.Lock()
defer mc.Unlock()
- return mc.actualStore.UpdateEntry(ctx, entry)
+ return mc.localStore.UpdateEntry(ctx, entry)
}
-func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) {
+func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
mc.RLock()
defer mc.RUnlock()
- return mc.actualStore.FindEntry(ctx, fp)
+ entry, err = mc.localStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+ mc.mapIdFromFilerToLocal(entry)
+ return
}
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
mc.Lock()
defer mc.Unlock()
- return mc.actualStore.DeleteEntry(ctx, fp)
+ return mc.localStore.DeleteEntry(ctx, fp)
}
-func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) {
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) {
mc.RLock()
defer mc.RUnlock()
- return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+
+ if !mc.visitedBoundary.HasVisited(dirPath) {
+ return nil, fmt.Errorf("unsynchronized dir: %v", dirPath)
+ }
+
+ entries, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ mc.mapIdFromFilerToLocal(entry)
+ }
+ return entries, err
}
func (mc *MetaCache) Shutdown() {
mc.Lock()
defer mc.Unlock()
- mc.actualStore.Shutdown()
+ mc.localStore.Shutdown()
+}
+
+func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
+ entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
}
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index 58bf6862e..f42d61230 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -2,20 +2,45 @@ package meta_cache
import (
"context"
+ "fmt"
+ "strings"
+ "time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error {
- glog.V(0).Infof("synchronizing meta data ...")
- filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) {
- entry := filer2.FromPbEntry(string(parentPath), pbEntry)
- if err := mc.InsertEntry(context.Background(), entry); err != nil {
- glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
+
+ return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
+
+ glog.V(4).Infof("ReadDirAllEntries %s ...", path)
+
+ for waitTime := time.Second; waitTime < filer.ReadWaitTime; waitTime += waitTime / 2 {
+ err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+ entry := filer.FromPbEntry(string(dirPath), pbEntry)
+ if err := mc.doInsertEntry(context.Background(), entry); err != nil {
+ glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+ return err
+ }
+ if entry.IsDirectory() {
+ childDirectories = append(childDirectories, entry.Name())
+ }
+ return nil
+ })
+ if err == nil {
+ break
+ }
+ if strings.Contains(err.Error(), "transport: ") {
+ glog.V(0).Infof("ReadDirAllEntries %s: %v. Retry in %v", path, err, waitTime)
+ time.Sleep(waitTime)
+ continue
+ }
+ err = fmt.Errorf("list %s: %v", dirPath, err)
+ break
}
+ return
})
- return nil
}
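A hedged sketch of the intended read path (the helper name is hypothetical): a directory must first be pulled from the filer with EnsureVisited, after which the local store will list it and return entries whose uid/gid are already mapped filer->local.

package meta_cache

import (
	"context"
	"math"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// listCachedDir is a hypothetical helper, not part of this change.
func listCachedDir(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) ([]*filer.Entry, error) {
	// fetch the directory from the filer once, and remember that it was visited
	if err := EnsureVisited(mc, client, dirPath); err != nil {
		return nil, err
	}
	// without the call above, this would fail with "unsynchronized dir"
	return mc.ListDirectoryEntries(context.Background(), dirPath, "", false, math.MaxInt32)
}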
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
index 2e411a48a..f9973f436 100644
--- a/weed/filesys/meta_cache/meta_cache_subscribe.go
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -6,41 +6,58 @@ import (
"io"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
+func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
message := resp.EventNotification
+
+ for _, sig := range message.Signatures {
+ if sig == selfSignature && selfSignature != 0 {
+ return nil
+ }
+ }
+
+ dir := resp.Directory
var oldPath util.FullPath
- var newEntry *filer2.Entry
+ var newEntry *filer.Entry
if message.OldEntry != nil {
- oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ oldPath = util.NewFullPath(dir, message.OldEntry.Name)
glog.V(4).Infof("deleting %v", oldPath)
}
if message.NewEntry != nil {
- dir := resp.Directory
if message.NewParentPath != "" {
dir = message.NewParentPath
}
key := util.NewFullPath(dir, message.NewEntry.Name)
glog.V(4).Infof("creating %v", key)
- newEntry = filer2.FromPbEntry(dir, message.NewEntry)
+ newEntry = filer.FromPbEntry(dir, message.NewEntry)
}
- return mc.AtomicUpdateEntry(context.Background(), oldPath, newEntry)
+ err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+ if err == nil && message.OldEntry != nil && message.NewEntry != nil {
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ mc.invalidateFunc(key)
+ }
+
+ return err
+
}
for {
err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
ClientName: "mount",
PathPrefix: dir,
SinceNs: lastTsNs,
+ Signature: selfSignature,
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)
@@ -56,14 +73,14 @@ func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string,
}
if err := processEventFn(resp); err != nil {
- return fmt.Errorf("process %v: %v", resp, err)
+ glog.Fatalf("process %v: %v", resp, err)
}
lastTsNs = resp.TsNs
}
})
if err != nil {
- glog.V(0).Infof("subscribing filer meta change: %v", err)
- time.Sleep(time.Second)
+ glog.Errorf("subscribing filer meta change: %v", err)
}
+ time.Sleep(time.Second)
}
}
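For context, the subscriber is meant to run in the background, tagged with the mount's own random signature so that events echoing this mount's writes are skipped; a minimal sketch (helper name hypothetical), mirroring the wiring in wfs.go further down:

package meta_cache

import (
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// startSubscription is a hypothetical helper, not part of this change.
func startSubscription(mc *MetaCache, client filer_pb.FilerClient, dir string) {
	// the same signature is attached to this mount's own filer writes,
	// so processEventFn can recognize and skip them
	selfSignature := util.RandomInt32()
	go SubscribeMetaEvents(mc, selfSignature, client, dir, time.Now().UnixNano())
}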
diff --git a/weed/filesys/unimplemented.go b/weed/filesys/unimplemented.go
new file mode 100644
index 000000000..5c2dcf0e1
--- /dev/null
+++ b/weed/filesys/unimplemented.go
@@ -0,0 +1,22 @@
+package filesys
+
+import (
+ "context"
+
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+)
+
+// https://github.com/bazil/fuse/issues/130
+
+var _ = fs.NodeAccesser(&Dir{})
+
+func (dir *Dir) Access(ctx context.Context, req *fuse.AccessRequest) error {
+ return fuse.ENOSYS
+}
+
+var _ = fs.NodeAccesser(&File{})
+
+func (file *File) Access(ctx context.Context, req *fuse.AccessRequest) error {
+ return fuse.ENOSYS
+}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 2b0ef64c2..759e21b15 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -6,22 +6,21 @@ import (
"math"
"os"
"path"
- "strings"
"sync"
"time"
- "github.com/chrislusf/seaweedfs/weed/util/grace"
- "github.com/karlseguin/ccache"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
)
type Option struct {
@@ -35,7 +34,6 @@ type Option struct {
CacheDir string
CacheSizeMB int64
DataCenter string
- DirListCacheLimit int64
EntryCacheTtl time.Duration
Umask os.FileMode
@@ -47,16 +45,14 @@ type Option struct {
OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
Cipher bool // whether encrypt data on volume server
- AsyncMetaDataCaching bool // whether asynchronously cache meta data
-
+ UidGidMapper *meta_cache.UidGidMapper
}
var _ = fs.FS(&WFS{})
var _ = fs.FSStatfser(&WFS{})
type WFS struct {
- option *Option
- listDirectoryEntriesCache *ccache.Cache
+ option *Option
// contains all open handles, protected by handlesLock
handlesLock sync.Mutex
@@ -69,8 +65,9 @@ type WFS struct {
root fs.Node
fsNodeCache *FsCache
- chunkCache *chunk_cache.ChunkCache
+ chunkCache *chunk_cache.TieredChunkCache
metaCache *meta_cache.MetaCache
+ signature int32
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -79,36 +76,38 @@ type statsCache struct {
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
- option: option,
- listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
- handles: make(map[uint64]*FileHandle),
+ option: option,
+ handles: make(map[uint64]*FileHandle),
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
},
},
+ signature: util.RandomInt32(),
}
+ cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
+ cacheDir := path.Join(option.CacheDir, cacheUniqueId)
if option.CacheSizeMB > 0 {
- os.MkdirAll(option.CacheDir, 0755)
- wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
- grace.OnInterrupt(func() {
- wfs.chunkCache.Shutdown()
- })
- }
- if wfs.option.AsyncMetaDataCaching {
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
- startTime := time.Now()
- if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
- glog.V(0).Infof("failed to init meta cache: %v", err)
- } else {
- go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
- grace.OnInterrupt(func() {
- wfs.metaCache.Shutdown()
- })
- }
+ os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
+ wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
- wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
+ fsNode := wfs.fsNodeCache.GetFsNode(filePath)
+ if fsNode != nil {
+ if file, ok := fsNode.(*File); ok {
+ file.entry = nil
+ }
+ }
+ })
+ startTime := time.Now()
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
+ grace.OnInterrupt(func() {
+ wfs.metaCache.Shutdown()
+ })
+
+ entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
+ wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
wfs.fsNodeCache = newFsCache(wfs.root)
return wfs
@@ -118,40 +117,29 @@ func (wfs *WFS) Root() (fs.Node, error) {
return wfs.root, nil
}
-var _ = filer_pb.FilerClient(&WFS{})
-
-func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
-
- err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
-
- if err == nil {
- return nil
- }
- return err
-
-}
-
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
fullpath := file.fullpath()
- glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
+ glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
inodeId := file.fullpath().AsInode()
- existingHandle, found := wfs.handles[inodeId]
- if found && existingHandle != nil {
- return existingHandle
+ if file.isOpen > 0 {
+ existingHandle, found := wfs.handles[inodeId]
+ if found && existingHandle != nil {
+ file.isOpen++
+ return existingHandle
+ }
}
fileHandle = newFileHandle(file, uid, gid)
+ file.maybeLoadEntry(context.Background())
+ file.isOpen++
+
wfs.handles[inodeId] = fileHandle
fileHandle.handle = inodeId
- glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
return
}
@@ -229,33 +217,15 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
return nil
}
-func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
- item := wfs.listDirectoryEntriesCache.Get(string(path))
- if item != nil && !item.Expired() {
- return item.Value().(*filer_pb.Entry)
+func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
+ if entry.Attributes == nil {
+ return
}
- return nil
+ entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
}
-func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
- if entry == nil {
- wfs.listDirectoryEntriesCache.Delete(string(path))
- } else {
- wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
+func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
+ if entry.Attributes == nil {
+ return
}
-}
-func (wfs *WFS) cacheDelete(path util.FullPath) {
- wfs.listDirectoryEntriesCache.Delete(string(path))
-}
-
-func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
- if !wfs.option.OutsideContainerClusterMode {
- return hostAndPort
- }
- commaIndex := strings.Index(hostAndPort, ":")
- if commaIndex < 0 {
- return hostAndPort
- }
- filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
- return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
-
+ entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
}
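A minimal, hypothetical wiring sketch for the updated constructor; the address and sizes are placeholders, and real mounts (the mount command) populate many more Option fields:

package main

import (
	"github.com/chrislusf/seaweedfs/weed/filesys"
	"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
)

func main() {
	mapper, _ := meta_cache.NewUidGidMapper("", "") // empty pair strings mean identity mapping
	wfs := filesys.NewSeaweedFileSystem(&filesys.Option{
		FilerGrpcAddress:   "localhost:18888",
		FilerMountRootPath: "/",
		CacheDir:           "/tmp/weed-mount-cache",
		CacheSizeMB:        64,
		UidGidMapper:       mapper,
	})
	_ = wfs // normally handed to fs.Serve on the fuse connection
}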
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index bf21b1808..a245b6795 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -5,7 +5,7 @@ import (
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -18,6 +18,17 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
var fileIds []string
for _, chunk := range chunks {
+ if !chunk.IsChunkManifest {
+ fileIds = append(fileIds, chunk.GetFileIdString())
+ continue
+ }
+ dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk)
+ if manifestResolveErr != nil {
+ glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
+ }
+ for _, dChunk := range dataChunks {
+ fileIds = append(fileIds, dChunk.GetFileIdString())
+ }
fileIds = append(fileIds, chunk.GetFileIdString())
}
@@ -31,14 +42,14 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se
var vids []string
for _, fileId := range fileIds {
- vids = append(vids, filer2.VolumeId(fileId))
+ vids = append(vids, filer.VolumeId(fileId))
}
lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
m := make(map[string]operation.LookupResult)
- glog.V(4).Infof("remove file lookup volume id locations: %v", vids)
+ glog.V(4).Infof("deleteFileIds lookup volume id locations: %v", vids)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: vids,
})
@@ -57,7 +68,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se
}
for _, loc := range locations.Locations {
lr.Locations = append(lr.Locations, operation.Location{
- Url: wfs.AdjustedUrl(loc.Url),
+ Url: wfs.AdjustedUrl(loc),
PublicUrl: loc.PublicUrl,
})
}
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
new file mode 100644
index 000000000..096ee555f
--- /dev/null
+++ b/weed/filesys/wfs_filer_client.go
@@ -0,0 +1,31 @@
+package filesys
+
+import (
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+var _ = filer_pb.FilerClient(&WFS{})
+
+func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
+
+ if err == nil {
+ return nil
+ }
+ return err
+
+}
+
+func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
+ if wfs.option.OutsideContainerClusterMode {
+ return location.PublicUrl
+ }
+ return location.Url
+}
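As a usage sketch of WithFilerClient (the helper is hypothetical; filer_pb.LookupEntry is the same call used by the inline lookup removed from xattr.go below):

package filesys

import (
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// lookupViaFiler is a hypothetical helper, not part of this change:
// it runs one gRPC call against the filer through the shared cached connection.
func (wfs *WFS) lookupViaFiler(dir, name string) (entry *filer_pb.Entry, err error) {
	err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
		resp, lookupErr := filer_pb.LookupEntry(client, &filer_pb.LookupDirectoryEntryRequest{
			Directory: dir,
			Name:      name,
		})
		if lookupErr != nil {
			return lookupErr
		}
		entry = resp.Entry
		return nil
	})
	return
}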
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
new file mode 100644
index 000000000..83e40e7f5
--- /dev/null
+++ b/weed/filesys/wfs_write.go
@@ -0,0 +1,71 @@
+package filesys
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
+
+ return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: wfs.option.Replication,
+ Collection: wfs.option.Collection,
+ TtlSec: wfs.option.TtlSec,
+ DataCenter: wfs.option.DataCenter,
+ Path: string(fullPath),
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
+ loc := &filer_pb.Location{
+ Url: resp.Url,
+ PublicUrl: resp.PublicUrl,
+ }
+ host = wfs.AdjustedUrl(loc)
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ }); err != nil {
+ return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+ glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ wfs.chunkCache.SetChunk(fileId, data)
+
+ chunk = uploadResult.ToPbFileChunk(fileId, offset)
+ return chunk, collection, replication, nil
+ }
+}
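A hedged usage sketch of the factory above (helper name hypothetical): the returned closure assigns a volume, uploads one reader as a chunk, caches the bytes, and converts the upload result into a filer_pb.FileChunk.

package filesys

import (
	"bytes"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

// uploadOneChunk is a hypothetical helper, not part of this change.
func (wfs *WFS) uploadOneChunk(fullPath util.FullPath, data []byte, offset int64) (*filer_pb.FileChunk, error) {
	saveFunc := wfs.saveDataAsChunk(fullPath)
	// collection and replication returned by saveFunc are ignored in this sketch
	chunk, _, _, err := saveFunc(bytes.NewReader(data), fullPath.Name(), offset)
	return chunk, err
}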
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
index 7e7b8c60b..92e43b675 100644
--- a/weed/filesys/xattr.go
+++ b/weed/filesys/xattr.go
@@ -3,10 +3,11 @@ package filesys
import (
"context"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/seaweedfs/fuse"
+
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/seaweedfs/fuse"
)
func getxattr(entry *filer_pb.Entry, req *fuse.GetxattrRequest, resp *fuse.GetxattrResponse) error {
@@ -110,43 +111,13 @@ func listxattr(entry *filer_pb.Entry, req *fuse.ListxattrRequest, resp *fuse.Lis
func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err error) {
fullpath := util.NewFullPath(dir, name)
- entry = wfs.cacheGet(fullpath)
- if entry != nil {
- return
- }
// glog.V(3).Infof("read entry cache miss %s", fullpath)
// read from async meta cache
- if wfs.option.AsyncMetaDataCaching {
- cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
- if cacheErr == filer_pb.ErrNotFound {
- return nil, fuse.ENOENT
- }
- return cachedEntry.ToProtoEntry(), nil
+ meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
+ cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
+ if cacheErr == filer_pb.ErrNotFound {
+ return nil, fuse.ENOENT
}
-
- err = wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.LookupDirectoryEntryRequest{
- Name: name,
- Directory: dir,
- }
-
- resp, err := filer_pb.LookupEntry(client, request)
- if err != nil {
- if err == filer_pb.ErrNotFound {
- glog.V(3).Infof("file attr read not found file %v: %v", request, err)
- return fuse.ENOENT
- }
- glog.V(3).Infof("attr read %v: %v", request, err)
- return fuse.EIO
- }
-
- entry = resp.Entry
- wfs.cacheSet(fullpath, entry, wfs.option.EntryCacheTtl)
-
- return nil
- })
-
- return
+ return cachedEntry.ToProtoEntry(), cacheErr
}