aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dir.go62
-rw-r--r--weed/filesys/dir_rename.go138
-rw-r--r--weed/filesys/dirty_pages_temp_file.go2
-rw-r--r--weed/filesys/dirty_pages_temp_interval.go2
-rw-r--r--weed/filesys/file.go57
-rw-r--r--weed/filesys/filehandle.go20
-rw-r--r--weed/filesys/meta_cache/meta_cache.go34
-rw-r--r--weed/filesys/meta_cache/meta_cache_init.go2
-rw-r--r--weed/filesys/meta_cache/meta_cache_subscribe.go60
-rw-r--r--weed/filesys/wfs.go25
-rw-r--r--weed/filesys/xattr.go15
11 files changed, 273 insertions, 144 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 72e41247f..9a791e013 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -54,28 +54,27 @@ func (dir *Dir) Id() uint64 {
func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
- // https://github.com/bazil/fuse/issues/196
- attr.Valid = time.Second
-
- if dir.FullPath() == dir.wfs.option.FilerMountRootPath {
- dir.setRootDirAttributes(attr)
- glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.FullPath(), attr)
- return nil
- }
-
entry, err := dir.maybeLoadEntry()
if err != nil {
- glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err)
+ glog.V(3).Infof("dir Attr %s, err: %+v", dir.FullPath(), err)
return err
}
+ // https://github.com/bazil/fuse/issues/196
+ attr.Valid = time.Second
attr.Inode = dir.Id()
attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir
attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
+ attr.Ctime = time.Unix(entry.Attributes.Crtime, 0)
+ attr.Atime = time.Unix(entry.Attributes.Mtime, 0)
attr.Gid = entry.Attributes.Gid
attr.Uid = entry.Attributes.Uid
+ if dir.FullPath() == dir.wfs.option.FilerMountRootPath {
+ attr.BlockSize = blockSize
+ }
+
glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
return nil
@@ -93,20 +92,6 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f
return getxattr(entry, req, resp)
}
-func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
- // attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
- attr.Valid = time.Second
- attr.Inode = dir.Id()
- attr.Uid = dir.wfs.option.MountUid
- attr.Gid = dir.wfs.option.MountGid
- attr.Mode = dir.wfs.option.MountMode
- attr.Crtime = dir.wfs.option.MountCtime
- attr.Ctime = dir.wfs.option.MountCtime
- attr.Mtime = dir.wfs.option.MountMtime
- attr.Atime = dir.wfs.option.MountMtime
- attr.BlockSize = blockSize
-}
-
func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// fsync works at OS level
// write the file chunks to the filerGrpcAddress
@@ -156,7 +141,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
var node fs.Node
if isDirectory {
node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name))
- return node, nil, nil
+ return node, node, nil
}
node = dir.newFile(req.Name)
@@ -375,6 +360,28 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.Errorf("list meta cache: %v", listErr)
return nil, fuse.EIO
}
+
+ // create proper . and .. directories
+ ret = append(ret, fuse.Dirent{
+ Inode: dirPath.AsInode(),
+ Name: ".",
+ Type: fuse.DT_Dir,
+ })
+
+ // return the correct parent inode for the mount root
+ var inode uint64
+ if string(dirPath) == dir.wfs.option.FilerMountRootPath {
+ inode = dir.wfs.option.MountParentInode
+ } else {
+ inode = util.FullPath(dir.parent.FullPath()).AsInode()
+ }
+
+ ret = append(ret, fuse.Dirent{
+ Inode: inode,
+ Name: "..",
+ Type: fuse.DT_Dir,
+ })
+
return
}
@@ -436,7 +443,10 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
dir.wfs.handlesLock.Lock()
defer dir.wfs.handlesLock.Unlock()
inodeId := filePath.AsInode()
- delete(dir.wfs.handles, inodeId)
+ if fh, ok := dir.wfs.handles[inodeId]; ok {
+ delete(dir.wfs.handles, inodeId)
+ fh.isDeleted = true
+ }
return nil
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index d50c6dab0..dd76577b0 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -2,6 +2,9 @@ package filesys
import (
"context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "math"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
@@ -37,6 +40,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
OldName: req.OldName,
NewDirectory: newDir.FullPath(),
NewName: req.NewName,
+ Signatures: []int32{dir.wfs.signature},
}
_, err := client.AtomicRenameEntry(ctx, request)
@@ -53,46 +57,118 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
return fuse.EIO
}
- // TODO: replicate renaming logic on filer
- if err := dir.wfs.metaCache.DeleteEntry(context.Background(), oldPath); err != nil {
- glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
+ err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName)
+ if err != nil {
+ glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err)
return fuse.EIO
}
- oldEntry.FullPath = newPath
- if err := dir.wfs.metaCache.InsertEntry(context.Background(), oldEntry); err != nil {
+
+ return nil
+}
+
+func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+
+ oldName := entry.Name()
+
+ oldPath := oldParent.Child(oldName)
+ newPath := newParent.Child(newName)
+ if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
+
+ oldFsNode := NodeWithId(oldPath.AsInode())
+ newFsNode := NodeWithId(newPath.AsInode())
+ newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode()))
+ var newDir *Dir
+ if found {
+ newDir = newDirNode.(*Dir)
+ }
+ dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
+ if file, ok := internalNode.(*File); ok {
+ glog.V(4).Infof("internal file node %s", oldParent.Child(oldName))
+ file.Name = newName
+ file.id = uint64(newFsNode)
+ if found {
+ file.dir = newDir
+ }
+ }
+ if dir, ok := internalNode.(*Dir); ok {
+ glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName))
+ dir.name = newName
+ dir.id = uint64(newFsNode)
+ if found {
+ dir.parent = newDir
+ }
+ }
+ })
+
+ // change file handle
+ inodeId := oldPath.AsInode()
+ dir.wfs.handlesLock.Lock()
+ if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle == nil {
+ glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath)
+ delete(dir.wfs.handles, inodeId)
+ dir.wfs.handles[newPath.AsInode()] = existingHandle
+ }
+ dir.wfs.handlesLock.Unlock()
+
+ if entry.IsDirectory() {
+ if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil {
+ return err
+ }
+ }
+ return nil
+ }); err != nil {
+ return fmt.Errorf("fail to move %s => %s: %v", oldPath, newPath, err)
+ }
+
+ return nil
+}
+
+func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error {
+
+ currentDirPath := oldParent.Child(oldName)
+ newDirPath := newParent.Child(newName)
+
+ glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath)
+
+ var moveErr error
+ listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool {
+ moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
+ if moveErr != nil {
+ return false
+ }
+ return true
+ })
+ if listErr != nil {
+ return listErr
+ }
+ if moveErr != nil {
+ return moveErr
+ }
+
+ return nil
+}
+
+func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
+
+ newPath := newParent.Child(newName)
+ oldPath := oldParent.Child(entry.Name())
+
+ entry.FullPath = newPath
+ if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil {
glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
return fuse.EIO
}
- oldFsNode := NodeWithId(oldPath.AsInode())
- newFsNode := NodeWithId(newPath.AsInode())
- dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
- if file, ok := internalNode.(*File); ok {
- glog.V(4).Infof("internal file node %s", file.Name)
- file.Name = req.NewName
- file.id = uint64(newFsNode)
- file.dir = newDir
+ if moveFolderSubEntries != nil {
+ if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
+ return moveChildrenErr
}
- if dir, ok := internalNode.(*Dir); ok {
- glog.V(4).Infof("internal dir node %s", dir.name)
- dir.name = req.NewName
- dir.id = uint64(newFsNode)
- dir.parent = newDir
- }
- })
+ }
- // change file handle
- dir.wfs.handlesLock.Lock()
- defer dir.wfs.handlesLock.Unlock()
- inodeId := oldPath.AsInode()
- existingHandle, found := dir.wfs.handles[inodeId]
- glog.V(4).Infof("has open filehandle %s: %v", oldPath, found)
- if !found || existingHandle == nil {
- return nil
+ if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil {
+ glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
+ return fuse.EIO
}
- glog.V(4).Infof("opened filehandle %s => %s", oldPath, newPath)
- delete(dir.wfs.handles, inodeId)
- dir.wfs.handles[newPath.AsInode()] = existingHandle
return nil
}
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index 3826008b7..9fa7c0c8e 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -97,7 +97,7 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
for _, list := range pages.writtenIntervals.lists {
listStopOffset := list.Offset() + list.Size()
- for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
+ for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
if start >= stop {
continue
diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/dirty_pages_temp_interval.go
index 2030178be..42c4b5a3b 100644
--- a/weed/filesys/dirty_pages_temp_interval.go
+++ b/weed/filesys/dirty_pages_temp_interval.go
@@ -54,7 +54,7 @@ func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
if nodeStart < nodeStop {
// glog.V(4).Infof("copying start=%d stop=%d t=[%d,%d) => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.DataOffset, t.DataOffset+t.Size, len(buf), nodeStart, nodeStop)
- list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset + nodeStart - t.DataOffset)
+ list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset+nodeStart-t.DataOffset)
}
if t.Next == nil {
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 4b711ecee..b990b20d1 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -115,16 +115,6 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
if err != nil {
return err
}
- if file.isOpen > 0 {
- file.wfs.handlesLock.Lock()
- fileHandle := file.wfs.handles[file.Id()]
- file.wfs.handlesLock.Unlock()
-
- if fileHandle != nil {
- fileHandle.Lock()
- defer fileHandle.Unlock()
- }
- }
if req.Valid.Size() {
@@ -154,17 +144,17 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.dirtyMetadata = true
}
- if req.Valid.Mode() {
+ if req.Valid.Mode() && entry.Attributes.FileMode != uint32(req.Mode) {
entry.Attributes.FileMode = uint32(req.Mode)
file.dirtyMetadata = true
}
- if req.Valid.Uid() {
+ if req.Valid.Uid() && entry.Attributes.Uid != req.Uid {
entry.Attributes.Uid = req.Uid
file.dirtyMetadata = true
}
- if req.Valid.Gid() {
+ if req.Valid.Gid() && entry.Attributes.Gid != req.Gid {
entry.Attributes.Gid = req.Gid
file.dirtyMetadata = true
}
@@ -174,7 +164,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.dirtyMetadata = true
}
- if req.Valid.Mtime() {
+ if req.Valid.Mtime() && entry.Attributes.Mtime != req.Mtime.Unix() {
entry.Attributes.Mtime = req.Mtime.Unix()
file.dirtyMetadata = true
}
@@ -207,6 +197,11 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error
if err := setxattr(entry, req); err != nil {
return err
}
+ file.dirtyMetadata = true
+
+ if file.isOpen > 0 {
+ return nil
+ }
return file.saveEntry(entry)
@@ -224,6 +219,11 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest)
if err := removexattr(entry, req); err != nil {
return err
}
+ file.dirtyMetadata = true
+
+ if file.isOpen > 0 {
+ return nil
+ }
return file.saveEntry(entry)
@@ -351,6 +351,8 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
+ file.dirtyMetadata = false
+
return nil
})
}
@@ -358,3 +360,30 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error {
func (file *File) getEntry() *filer_pb.Entry {
return file.entry
}
+
+func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) {
+ err := file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.DownloadToLocalRequest{
+ Directory: file.dir.FullPath(),
+ Name: entry.Name,
+ }
+
+ glog.V(4).Infof("download entry: %v", request)
+ resp, err := client.DownloadToLocal(context.Background(), request)
+ if err != nil {
+ glog.Errorf("DownloadToLocal file %s/%s: %v", file.dir.FullPath(), file.Name, err)
+ return fuse.EIO
+ }
+
+ entry = resp.Entry
+
+ file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry))
+
+ file.dirtyMetadata = false
+
+ return nil
+ })
+
+ return entry, err
+}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 88cfe45f0..5cd7ca948 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -33,11 +33,12 @@ type FileHandle struct {
Uid uint32 // user ID of process making request
Gid uint32 // group ID of process making request
writeOnly bool
+ isDeleted bool
}
func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
fh := &FileHandle{
- f: file,
+ f: file,
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
dirtyPages: newTempFileDirtyPages(file, writeOnly),
Uid: uid,
@@ -113,6 +114,16 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
return 0, io.EOF
}
+ if entry.IsInRemoteOnly() {
+ glog.V(4).Infof("download remote entry %s", fh.f.fullpath())
+ newEntry, err := fh.f.downloadRemoteEntry(entry)
+ if err != nil {
+ glog.V(1).Infof("download remote entry %s: %v", fh.f.fullpath(), err)
+ return 0, err
+ }
+ entry = newEntry
+ }
+
fileSize := int64(filer.FileSize(entry))
fileFullPath := fh.f.fullpath()
@@ -129,7 +140,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
var chunkResolveErr error
if fh.entryViewCache == nil {
- fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks)
+ fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
@@ -222,6 +233,11 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle)
+ if fh.isDeleted {
+ glog.V(4).Infof("Flush %v fh %d skip deleted", fh.f.fullpath(), fh.handle)
+ return nil
+ }
+
fh.Lock()
defer fh.Unlock()
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 3f6391c39..69d1655ee 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -3,14 +3,12 @@ package meta_cache
import (
"context"
"fmt"
- "os"
- "sync"
-
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/leveldb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
+ "os"
)
// need to have logic similar to FilerStoreWrapper
@@ -18,7 +16,7 @@ import (
type MetaCache struct {
localStore filer.VirtualFilerStore
- sync.RWMutex
+ // sync.RWMutex
visitedBoundary *bounded_tree.BoundedTree
uidGidMapper *UidGidMapper
invalidateFunc func(util.FullPath)
@@ -54,8 +52,8 @@ func openMetaStore(dbFolder string) filer.VirtualFilerStore {
}
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
return mc.doInsertEntry(ctx, entry)
}
@@ -64,8 +62,8 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
}
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
oldDir, _ := oldPath.DirAndName()
if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
@@ -97,14 +95,14 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
}
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
return mc.localStore.UpdateEntry(ctx, entry)
}
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
- mc.RLock()
- defer mc.RUnlock()
+ //mc.RLock()
+ //defer mc.RUnlock()
entry, err = mc.localStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
@@ -114,14 +112,14 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi
}
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
return mc.localStore.DeleteEntry(ctx, fp)
}
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
- mc.RLock()
- defer mc.RUnlock()
+ //mc.RLock()
+ //defer mc.RUnlock()
if !mc.visitedBoundary.HasVisited(dirPath) {
return fmt.Errorf("unsynchronized dir: %v", dirPath)
@@ -138,8 +136,8 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
}
func (mc *MetaCache) Shutdown() {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
mc.localStore.Shutdown()
}
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index 9af25ae29..07098bf6b 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -43,5 +43,5 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
}
func IsHiddenSystemEntry(dir, name string) bool {
- return dir == "/" && name == "topics"
+ return dir == "/" && (name == "topics" || name == "etc")
}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
index f9973f436..c650b8024 100644
--- a/weed/filesys/meta_cache/meta_cache_subscribe.go
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -2,12 +2,9 @@ package meta_cache
import (
"context"
- "fmt"
- "io"
- "time"
-
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -40,47 +37,30 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
newEntry = filer.FromPbEntry(dir, message.NewEntry)
}
err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
- if err == nil && message.OldEntry != nil && message.NewEntry != nil {
- key := util.NewFullPath(dir, message.NewEntry.Name)
- mc.invalidateFunc(key)
+ if err == nil {
+ if message.OldEntry != nil && message.NewEntry != nil {
+ if message.OldEntry.Name == message.NewEntry.Name {
+ // no need to invalidate
+ } else {
+ oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ mc.invalidateFunc(oldKey)
+ newKey := util.NewFullPath(dir, message.NewEntry.Name)
+ mc.invalidateFunc(newKey)
+ }
+ } else if message.OldEntry == nil && message.NewEntry != nil {
+ // no need to invaalidate
+ } else if message.OldEntry != nil && message.NewEntry == nil {
+ oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ mc.invalidateFunc(oldKey)
+ }
}
return err
}
- for {
- err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "mount",
- PathPrefix: dir,
- SinceNs: lastTsNs,
- Signature: selfSignature,
- })
- if err != nil {
- return fmt.Errorf("subscribe: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ return util.Retry("followMetaUpdates", func() error {
+ return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true)
+ })
- if err := processEventFn(resp); err != nil {
- glog.Fatalf("process %v: %v", resp, err)
- }
- lastTsNs = resp.TsNs
- }
- })
- if err != nil {
- glog.Errorf("subscribing filer meta change: %v", err)
- }
- time.Sleep(time.Second)
- }
}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 8f864a123..0e0050964 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -3,9 +3,6 @@ package filesys
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"math/rand"
"os"
@@ -14,6 +11,10 @@ import (
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
+
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/util/grace"
@@ -46,11 +47,12 @@ type Option struct {
DataCenter string
Umask os.FileMode
- MountUid uint32
- MountGid uint32
- MountMode os.FileMode
- MountCtime time.Time
- MountMtime time.Time
+ MountUid uint32
+ MountGid uint32
+ MountMode os.FileMode
+ MountCtime time.Time
+ MountMtime time.Time
+ MountParentInode uint64
VolumeServerAccess string // how to access volume servers
Cipher bool // whether encrypt data on volume server
@@ -123,8 +125,6 @@ func NewSeaweedFileSystem(option *Option) *WFS {
glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
}
})
- startTime := time.Now()
- go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
grace.OnInterrupt(func() {
wfs.metaCache.Shutdown()
})
@@ -139,6 +139,11 @@ func NewSeaweedFileSystem(option *Option) *WFS {
return wfs
}
+func (wfs *WFS) StartBackgroundTasks() {
+ startTime := time.Now()
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
+}
+
func (wfs *WFS) Root() (fs.Node, error) {
return wfs.root, nil
}
diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go
index 92e43b675..473805116 100644
--- a/weed/filesys/xattr.go
+++ b/weed/filesys/xattr.go
@@ -113,6 +113,21 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err
fullpath := util.NewFullPath(dir, name)
// glog.V(3).Infof("read entry cache miss %s", fullpath)
+ // return a valid entry for the mount root
+ if string(fullpath) == wfs.option.FilerMountRootPath {
+ return &filer_pb.Entry{
+ Name: wfs.option.FilerMountRootPath,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: wfs.option.MountMtime.Unix(),
+ FileMode: uint32(wfs.option.MountMode),
+ Uid: wfs.option.MountUid,
+ Gid: wfs.option.MountGid,
+ Crtime: wfs.option.MountCtime.Unix(),
+ },
+ }, nil
+ }
+
// read from async meta cache
meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)