diff options
| author | Bl1tz23 <alex3angle@gmail.com> | 2021-08-10 13:45:24 +0300 |
|---|---|---|
| committer | Bl1tz23 <alex3angle@gmail.com> | 2021-08-10 13:45:24 +0300 |
| commit | 1c94b3d01340baad000188550fcf2ccab6ca80e5 (patch) | |
| tree | 12c3da17eb2d1a43fef78021a3d7c79110b0ff5f /weed/filesys | |
| parent | e6e57db530217ff57b3622b4672b03ebb6313e96 (diff) | |
| parent | f9cf9b93d32a2b01bc4d95ce7d24d86ef60be668 (diff) | |
| download | seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.tar.xz seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.zip | |
merge master, resolve conflicts
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 62 | ||||
| -rw-r--r-- | weed/filesys/dir_rename.go | 138 | ||||
| -rw-r--r-- | weed/filesys/dirty_pages_temp_file.go | 2 | ||||
| -rw-r--r-- | weed/filesys/dirty_pages_temp_interval.go | 2 | ||||
| -rw-r--r-- | weed/filesys/file.go | 57 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 20 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache.go | 34 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_init.go | 2 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_subscribe.go | 60 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 25 | ||||
| -rw-r--r-- | weed/filesys/xattr.go | 15 |
11 files changed, 273 insertions, 144 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 72e41247f..9a791e013 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -54,28 +54,27 @@ func (dir *Dir) Id() uint64 { func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { - // https://github.com/bazil/fuse/issues/196 - attr.Valid = time.Second - - if dir.FullPath() == dir.wfs.option.FilerMountRootPath { - dir.setRootDirAttributes(attr) - glog.V(3).Infof("root dir Attr %s, attr: %+v", dir.FullPath(), attr) - return nil - } - entry, err := dir.maybeLoadEntry() if err != nil { - glog.V(3).Infof("dir Attr %s,err: %+v", dir.FullPath(), err) + glog.V(3).Infof("dir Attr %s, err: %+v", dir.FullPath(), err) return err } + // https://github.com/bazil/fuse/issues/196 + attr.Valid = time.Second attr.Inode = dir.Id() attr.Mode = os.FileMode(entry.Attributes.FileMode) | os.ModeDir attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) + attr.Ctime = time.Unix(entry.Attributes.Crtime, 0) + attr.Atime = time.Unix(entry.Attributes.Mtime, 0) attr.Gid = entry.Attributes.Gid attr.Uid = entry.Attributes.Uid + if dir.FullPath() == dir.wfs.option.FilerMountRootPath { + attr.BlockSize = blockSize + } + glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) return nil @@ -93,20 +92,6 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f return getxattr(entry, req, resp) } -func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { - // attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode() - attr.Valid = time.Second - attr.Inode = dir.Id() - attr.Uid = dir.wfs.option.MountUid - attr.Gid = dir.wfs.option.MountGid - attr.Mode = dir.wfs.option.MountMode - attr.Crtime = dir.wfs.option.MountCtime - attr.Ctime = dir.wfs.option.MountCtime - attr.Mtime = dir.wfs.option.MountMtime - attr.Atime = dir.wfs.option.MountMtime - attr.BlockSize = blockSize -} - func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { // fsync works at OS level // write the file chunks to the filerGrpcAddress @@ -156,7 +141,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, var node fs.Node if isDirectory { node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name)) - return node, nil, nil + return node, node, nil } node = dir.newFile(req.Name) @@ -375,6 +360,28 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { glog.Errorf("list meta cache: %v", listErr) return nil, fuse.EIO } + + // create proper . and .. directories + ret = append(ret, fuse.Dirent{ + Inode: dirPath.AsInode(), + Name: ".", + Type: fuse.DT_Dir, + }) + + // return the correct parent inode for the mount root + var inode uint64 + if string(dirPath) == dir.wfs.option.FilerMountRootPath { + inode = dir.wfs.option.MountParentInode + } else { + inode = util.FullPath(dir.parent.FullPath()).AsInode() + } + + ret = append(ret, fuse.Dirent{ + Inode: inode, + Name: "..", + Type: fuse.DT_Dir, + }) + return } @@ -436,7 +443,10 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { dir.wfs.handlesLock.Lock() defer dir.wfs.handlesLock.Unlock() inodeId := filePath.AsInode() - delete(dir.wfs.handles, inodeId) + if fh, ok := dir.wfs.handles[inodeId]; ok { + delete(dir.wfs.handles, inodeId) + fh.isDeleted = true + } return nil diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go index d50c6dab0..dd76577b0 100644 --- a/weed/filesys/dir_rename.go +++ b/weed/filesys/dir_rename.go @@ -2,6 +2,9 @@ package filesys import ( "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "math" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" @@ -37,6 +40,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector OldName: req.OldName, NewDirectory: newDir.FullPath(), NewName: req.NewName, + Signatures: []int32{dir.wfs.signature}, } _, err := client.AtomicRenameEntry(ctx, request) @@ -53,46 +57,118 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector return fuse.EIO } - // TODO: replicate renaming logic on filer - if err := dir.wfs.metaCache.DeleteEntry(context.Background(), oldPath); err != nil { - glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err) + err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName) + if err != nil { + glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err) return fuse.EIO } - oldEntry.FullPath = newPath - if err := dir.wfs.metaCache.InsertEntry(context.Background(), oldEntry); err != nil { + + return nil +} + +func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error { + + oldName := entry.Name() + + oldPath := oldParent.Child(oldName) + newPath := newParent.Child(newName) + if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error { + + oldFsNode := NodeWithId(oldPath.AsInode()) + newFsNode := NodeWithId(newPath.AsInode()) + newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode())) + var newDir *Dir + if found { + newDir = newDirNode.(*Dir) + } + dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) { + if file, ok := internalNode.(*File); ok { + glog.V(4).Infof("internal file node %s", oldParent.Child(oldName)) + file.Name = newName + file.id = uint64(newFsNode) + if found { + file.dir = newDir + } + } + if dir, ok := internalNode.(*Dir); ok { + glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName)) + dir.name = newName + dir.id = uint64(newFsNode) + if found { + dir.parent = newDir + } + } + }) + + // change file handle + inodeId := oldPath.AsInode() + dir.wfs.handlesLock.Lock() + if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle == nil { + glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath) + delete(dir.wfs.handles, inodeId) + dir.wfs.handles[newPath.AsInode()] = existingHandle + } + dir.wfs.handlesLock.Unlock() + + if entry.IsDirectory() { + if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil { + return err + } + } + return nil + }); err != nil { + return fmt.Errorf("fail to move %s => %s: %v", oldPath, newPath, err) + } + + return nil +} + +func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error { + + currentDirPath := oldParent.Child(oldName) + newDirPath := newParent.Child(newName) + + glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath) + + var moveErr error + listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool { + moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name()) + if moveErr != nil { + return false + } + return true + }) + if listErr != nil { + return listErr + } + if moveErr != nil { + return moveErr + } + + return nil +} + +func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error { + + newPath := newParent.Child(newName) + oldPath := oldParent.Child(entry.Name()) + + entry.FullPath = newPath + if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil { glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err) return fuse.EIO } - oldFsNode := NodeWithId(oldPath.AsInode()) - newFsNode := NodeWithId(newPath.AsInode()) - dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) { - if file, ok := internalNode.(*File); ok { - glog.V(4).Infof("internal file node %s", file.Name) - file.Name = req.NewName - file.id = uint64(newFsNode) - file.dir = newDir + if moveFolderSubEntries != nil { + if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { + return moveChildrenErr } - if dir, ok := internalNode.(*Dir); ok { - glog.V(4).Infof("internal dir node %s", dir.name) - dir.name = req.NewName - dir.id = uint64(newFsNode) - dir.parent = newDir - } - }) + } - // change file handle - dir.wfs.handlesLock.Lock() - defer dir.wfs.handlesLock.Unlock() - inodeId := oldPath.AsInode() - existingHandle, found := dir.wfs.handles[inodeId] - glog.V(4).Infof("has open filehandle %s: %v", oldPath, found) - if !found || existingHandle == nil { - return nil + if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil { + glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err) + return fuse.EIO } - glog.V(4).Infof("opened filehandle %s => %s", oldPath, newPath) - delete(dir.wfs.handles, inodeId) - dir.wfs.handles[newPath.AsInode()] = existingHandle return nil } diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go index 3826008b7..9fa7c0c8e 100644 --- a/weed/filesys/dirty_pages_temp_file.go +++ b/weed/filesys/dirty_pages_temp_file.go @@ -97,7 +97,7 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() { for _, list := range pages.writtenIntervals.lists { listStopOffset := list.Offset() + list.Size() - for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize { + for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize { start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize) if start >= stop { continue diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/dirty_pages_temp_interval.go index 2030178be..42c4b5a3b 100644 --- a/weed/filesys/dirty_pages_temp_interval.go +++ b/weed/filesys/dirty_pages_temp_interval.go @@ -54,7 +54,7 @@ func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) { nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size) if nodeStart < nodeStop { // glog.V(4).Infof("copying start=%d stop=%d t=[%d,%d) => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.DataOffset, t.DataOffset+t.Size, len(buf), nodeStart, nodeStop) - list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset + nodeStart - t.DataOffset) + list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset+nodeStart-t.DataOffset) } if t.Next == nil { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 4b711ecee..b990b20d1 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -115,16 +115,6 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f if err != nil { return err } - if file.isOpen > 0 { - file.wfs.handlesLock.Lock() - fileHandle := file.wfs.handles[file.Id()] - file.wfs.handlesLock.Unlock() - - if fileHandle != nil { - fileHandle.Lock() - defer fileHandle.Unlock() - } - } if req.Valid.Size() { @@ -154,17 +144,17 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f file.dirtyMetadata = true } - if req.Valid.Mode() { + if req.Valid.Mode() && entry.Attributes.FileMode != uint32(req.Mode) { entry.Attributes.FileMode = uint32(req.Mode) file.dirtyMetadata = true } - if req.Valid.Uid() { + if req.Valid.Uid() && entry.Attributes.Uid != req.Uid { entry.Attributes.Uid = req.Uid file.dirtyMetadata = true } - if req.Valid.Gid() { + if req.Valid.Gid() && entry.Attributes.Gid != req.Gid { entry.Attributes.Gid = req.Gid file.dirtyMetadata = true } @@ -174,7 +164,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f file.dirtyMetadata = true } - if req.Valid.Mtime() { + if req.Valid.Mtime() && entry.Attributes.Mtime != req.Mtime.Unix() { entry.Attributes.Mtime = req.Mtime.Unix() file.dirtyMetadata = true } @@ -207,6 +197,11 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error if err := setxattr(entry, req); err != nil { return err } + file.dirtyMetadata = true + + if file.isOpen > 0 { + return nil + } return file.saveEntry(entry) @@ -224,6 +219,11 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) if err := removexattr(entry, req); err != nil { return err } + file.dirtyMetadata = true + + if file.isOpen > 0 { + return nil + } return file.saveEntry(entry) @@ -351,6 +351,8 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error { file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry)) + file.dirtyMetadata = false + return nil }) } @@ -358,3 +360,30 @@ func (file *File) saveEntry(entry *filer_pb.Entry) error { func (file *File) getEntry() *filer_pb.Entry { return file.entry } + +func (file *File) downloadRemoteEntry(entry *filer_pb.Entry) (*filer_pb.Entry, error) { + err := file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.DownloadToLocalRequest{ + Directory: file.dir.FullPath(), + Name: entry.Name, + } + + glog.V(4).Infof("download entry: %v", request) + resp, err := client.DownloadToLocal(context.Background(), request) + if err != nil { + glog.Errorf("DownloadToLocal file %s/%s: %v", file.dir.FullPath(), file.Name, err) + return fuse.EIO + } + + entry = resp.Entry + + file.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, resp.Entry)) + + file.dirtyMetadata = false + + return nil + }) + + return entry, err +} diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 88cfe45f0..5cd7ca948 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -33,11 +33,12 @@ type FileHandle struct { Uid uint32 // user ID of process making request Gid uint32 // group ID of process making request writeOnly bool + isDeleted bool } func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle { fh := &FileHandle{ - f: file, + f: file, // dirtyPages: newContinuousDirtyPages(file, writeOnly), dirtyPages: newTempFileDirtyPages(file, writeOnly), Uid: uid, @@ -113,6 +114,16 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { return 0, io.EOF } + if entry.IsInRemoteOnly() { + glog.V(4).Infof("download remote entry %s", fh.f.fullpath()) + newEntry, err := fh.f.downloadRemoteEntry(entry) + if err != nil { + glog.V(1).Infof("download remote entry %s: %v", fh.f.fullpath(), err) + return 0, err + } + entry = newEntry + } + fileSize := int64(filer.FileSize(entry)) fileFullPath := fh.f.fullpath() @@ -129,7 +140,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { var chunkResolveErr error if fh.entryViewCache == nil { - fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks) + fh.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), entry.Chunks, 0, math.MaxInt64) if chunkResolveErr != nil { return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) } @@ -222,6 +233,11 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle) + if fh.isDeleted { + glog.V(4).Infof("Flush %v fh %d skip deleted", fh.f.fullpath(), fh.handle) + return nil + } + fh.Lock() defer fh.Unlock() diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index 3f6391c39..69d1655ee 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -3,14 +3,12 @@ package meta_cache import ( "context" "fmt" - "os" - "sync" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/filer/leveldb" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" + "os" ) // need to have logic similar to FilerStoreWrapper @@ -18,7 +16,7 @@ import ( type MetaCache struct { localStore filer.VirtualFilerStore - sync.RWMutex + // sync.RWMutex visitedBoundary *bounded_tree.BoundedTree uidGidMapper *UidGidMapper invalidateFunc func(util.FullPath) @@ -54,8 +52,8 @@ func openMetaStore(dbFolder string) filer.VirtualFilerStore { } func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error { - mc.Lock() - defer mc.Unlock() + //mc.Lock() + //defer mc.Unlock() return mc.doInsertEntry(ctx, entry) } @@ -64,8 +62,8 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro } func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error { - mc.Lock() - defer mc.Unlock() + //mc.Lock() + //defer mc.Unlock() oldDir, _ := oldPath.DirAndName() if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) { @@ -97,14 +95,14 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti } func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error { - mc.Lock() - defer mc.Unlock() + //mc.Lock() + //defer mc.Unlock() return mc.localStore.UpdateEntry(ctx, entry) } func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) { - mc.RLock() - defer mc.RUnlock() + //mc.RLock() + //defer mc.RUnlock() entry, err = mc.localStore.FindEntry(ctx, fp) if err != nil { return nil, err @@ -114,14 +112,14 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi } func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { - mc.Lock() - defer mc.Unlock() + //mc.Lock() + //defer mc.Unlock() return mc.localStore.DeleteEntry(ctx, fp) } func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error { - mc.RLock() - defer mc.RUnlock() + //mc.RLock() + //defer mc.RUnlock() if !mc.visitedBoundary.HasVisited(dirPath) { return fmt.Errorf("unsynchronized dir: %v", dirPath) @@ -138,8 +136,8 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full } func (mc *MetaCache) Shutdown() { - mc.Lock() - defer mc.Unlock() + //mc.Lock() + //defer mc.Unlock() mc.localStore.Shutdown() } diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go index 9af25ae29..07098bf6b 100644 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ b/weed/filesys/meta_cache/meta_cache_init.go @@ -43,5 +43,5 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full } func IsHiddenSystemEntry(dir, name string) bool { - return dir == "/" && name == "topics" + return dir == "/" && (name == "topics" || name == "etc") } diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go index f9973f436..c650b8024 100644 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ b/weed/filesys/meta_cache/meta_cache_subscribe.go @@ -2,12 +2,9 @@ package meta_cache import ( "context" - "fmt" - "io" - "time" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -40,47 +37,30 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil newEntry = filer.FromPbEntry(dir, message.NewEntry) } err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry) - if err == nil && message.OldEntry != nil && message.NewEntry != nil { - key := util.NewFullPath(dir, message.NewEntry.Name) - mc.invalidateFunc(key) + if err == nil { + if message.OldEntry != nil && message.NewEntry != nil { + if message.OldEntry.Name == message.NewEntry.Name { + // no need to invalidate + } else { + oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) + mc.invalidateFunc(oldKey) + newKey := util.NewFullPath(dir, message.NewEntry.Name) + mc.invalidateFunc(newKey) + } + } else if message.OldEntry == nil && message.NewEntry != nil { + // no need to invaalidate + } else if message.OldEntry != nil && message.NewEntry == nil { + oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name) + mc.invalidateFunc(oldKey) + } } return err } - for { - err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "mount", - PathPrefix: dir, - SinceNs: lastTsNs, - Signature: selfSignature, - }) - if err != nil { - return fmt.Errorf("subscribe: %v", err) - } - - for { - resp, listenErr := stream.Recv() - if listenErr == io.EOF { - return nil - } - if listenErr != nil { - return listenErr - } + return util.Retry("followMetaUpdates", func() error { + return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true) + }) - if err := processEventFn(resp); err != nil { - glog.Fatalf("process %v: %v", resp, err) - } - lastTsNs = resp.TsNs - } - }) - if err != nil { - glog.Errorf("subscribing filer meta change: %v", err) - } - time.Sleep(time.Second) - } } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 8f864a123..0e0050964 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -3,9 +3,6 @@ package filesys import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/wdclient" "math" "math/rand" "os" @@ -14,6 +11,10 @@ import ( "sync" "time" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/util/grace" @@ -46,11 +47,12 @@ type Option struct { DataCenter string Umask os.FileMode - MountUid uint32 - MountGid uint32 - MountMode os.FileMode - MountCtime time.Time - MountMtime time.Time + MountUid uint32 + MountGid uint32 + MountMode os.FileMode + MountCtime time.Time + MountMtime time.Time + MountParentInode uint64 VolumeServerAccess string // how to access volume servers Cipher bool // whether encrypt data on volume server @@ -123,8 +125,6 @@ func NewSeaweedFileSystem(option *Option) *WFS { glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err) } }) - startTime := time.Now() - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) grace.OnInterrupt(func() { wfs.metaCache.Shutdown() }) @@ -139,6 +139,11 @@ func NewSeaweedFileSystem(option *Option) *WFS { return wfs } +func (wfs *WFS) StartBackgroundTasks() { + startTime := time.Now() + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) +} + func (wfs *WFS) Root() (fs.Node, error) { return wfs.root, nil } diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 92e43b675..473805116 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -113,6 +113,21 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err fullpath := util.NewFullPath(dir, name) // glog.V(3).Infof("read entry cache miss %s", fullpath) + // return a valid entry for the mount root + if string(fullpath) == wfs.option.FilerMountRootPath { + return &filer_pb.Entry{ + Name: wfs.option.FilerMountRootPath, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: wfs.option.MountMtime.Unix(), + FileMode: uint32(wfs.option.MountMode), + Uid: wfs.option.MountUid, + Gid: wfs.option.MountGid, + Crtime: wfs.option.MountCtime.Unix(), + }, + }, nil + } + // read from async meta cache meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir)) cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath) |
