aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dir.go129
-rw-r--r--weed/filesys/dirty_page.go40
-rw-r--r--weed/filesys/file.go9
-rw-r--r--weed/filesys/filehandle.go45
-rw-r--r--weed/filesys/meta_cache/meta_cache.go24
-rw-r--r--weed/filesys/meta_cache/meta_cache_init.go7
-rw-r--r--weed/filesys/wfs.go36
-rw-r--r--weed/filesys/wfs_deletion.go84
-rw-r--r--weed/filesys/wfs_filer_client.go2
-rw-r--r--weed/filesys/wfs_write.go3
10 files changed, 172 insertions, 207 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index a8481a435..10a0a2b44 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -6,6 +6,7 @@ import (
"math"
"os"
"strings"
+ "syscall"
"time"
"github.com/seaweedfs/fuse"
@@ -57,7 +58,7 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
return err
}
- attr.Inode = util.FullPath(dir.FullPath()).AsInode()
+ // attr.Inode = util.FullPath(dir.FullPath()).AsInode()
attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0)
@@ -81,8 +82,8 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f
}
func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
- attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
- attr.Valid = time.Hour
+ // attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
+ attr.Valid = time.Second
attr.Uid = dir.wfs.option.MountUid
attr.Gid = dir.wfs.option.MountGid
attr.Mode = dir.wfs.option.MountMode
@@ -90,7 +91,7 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
attr.Ctime = dir.wfs.option.MountCtime
attr.Mtime = dir.wfs.option.MountMtime
attr.Atime = dir.wfs.option.MountMtime
- attr.BlockSize = 1024 * 1024
+ attr.BlockSize = blockSize
}
func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
@@ -127,28 +128,59 @@ func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.N
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
+ request, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, req.Flags&fuse.OpenExclusive != 0)
+
+ if err != nil {
+ return nil, nil, err
+ }
+ var node fs.Node
+ if request.Entry.IsDirectory {
+ node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry)
+ return node, nil, nil
+ }
+
+ node = dir.newFile(req.Name, request.Entry)
+ file := node.(*File)
+ fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
+ return file, fh, nil
+
+}
+
+func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
+
+ request, err := dir.doCreateEntry(req.Name, req.Mode, req.Uid, req.Gid, false)
+
+ if err != nil {
+ return nil, err
+ }
+ var node fs.Node
+ node = dir.newFile(req.Name, request.Entry)
+ return node, nil
+}
+
+func (dir *Dir) doCreateEntry(name string, mode os.FileMode, uid, gid uint32, exlusive bool) (*filer_pb.CreateEntryRequest, error) {
request := &filer_pb.CreateEntryRequest{
Directory: dir.FullPath(),
Entry: &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: req.Mode&os.ModeDir > 0,
+ Name: name,
+ IsDirectory: mode&os.ModeDir > 0,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
- FileMode: uint32(req.Mode &^ dir.wfs.option.Umask),
- Uid: req.Uid,
- Gid: req.Gid,
+ FileMode: uint32(mode &^ dir.wfs.option.Umask),
+ Uid: uid,
+ Gid: gid,
Collection: dir.wfs.option.Collection,
Replication: dir.wfs.option.Replication,
TtlSec: dir.wfs.option.TtlSec,
},
},
- OExcl: req.Flags&fuse.OpenExclusive != 0,
+ OExcl: exlusive,
Signatures: []int32{dir.wfs.signature},
}
- glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags)
+ glog.V(1).Infof("create %s/%s", dir.FullPath(), name)
- if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
@@ -157,41 +189,15 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
if strings.Contains(err.Error(), "EEXIST") {
return fuse.EEXIST
}
- glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), req.Name, err)
+ glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), name, err)
return fuse.EIO
}
dir.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
- }); err != nil {
- return nil, nil, err
- }
- var node fs.Node
- if request.Entry.IsDirectory {
- node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name), request.Entry)
- return node, nil, nil
- }
-
- node = dir.newFile(req.Name, request.Entry)
- file := node.(*File)
- fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
- return file, fh, nil
-
-}
-
-func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
- if req.Mode&os.ModeNamedPipe != 0 {
- glog.V(1).Infof("mknod named pipe %s", req.String())
- return nil, fuse.ENOSYS
- }
- if req.Mode&req.Mode&os.ModeSocket != 0 {
- glog.V(1).Infof("mknod socket %s", req.String())
- return nil, fuse.ENOSYS
- }
- // not going to support mknod for normal files either
- glog.V(1).Infof("mknod %s", req.String())
- return nil, fuse.ENOSYS
+ })
+ return request, err
}
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
@@ -279,7 +285,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
}
// resp.EntryValid = time.Second
- resp.Attr.Inode = fullFilePath.AsInode()
+ // resp.Attr.Inode = fullFilePath.AsInode()
resp.Attr.Valid = time.Second
resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
resp.Attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
@@ -302,13 +308,11 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath())
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
- fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
- inode := fullpath.AsInode()
if entry.IsDirectory {
- dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir}
+ dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
ret = append(ret, dirent)
} else {
- dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_File}
+ dirent := fuse.Dirent{Name: entry.Name, Type: findFileType(uint16(entry.Attributes.FileMode))}
ret = append(ret, dirent)
}
return nil
@@ -319,17 +323,37 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return nil, fuse.EIO
}
- listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(math.MaxInt32))
+ listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ processEachEntryFn(entry.ToProtoEntry(), false)
+ return true
+ })
if listErr != nil {
glog.Errorf("list meta cache: %v", listErr)
return nil, fuse.EIO
}
- for _, cachedEntry := range listedEntries {
- processEachEntryFn(cachedEntry.ToProtoEntry(), false)
- }
return
}
+func findFileType(mode uint16) fuse.DirentType {
+ switch mode & (syscall.S_IFMT & 0xffff) {
+ case syscall.S_IFSOCK:
+ return fuse.DT_Socket
+ case syscall.S_IFLNK:
+ return fuse.DT_Link
+ case syscall.S_IFREG:
+ return fuse.DT_File
+ case syscall.S_IFBLK:
+ return fuse.DT_Block
+ case syscall.S_IFDIR:
+ return fuse.DT_Dir
+ case syscall.S_IFCHR:
+ return fuse.DT_Char
+ case syscall.S_IFIFO:
+ return fuse.DT_FIFO
+ }
+ return fuse.DT_File
+}
+
func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
if !req.Dir {
@@ -378,11 +402,6 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode()
delete(dir.wfs.handles, inodeId)
- // delete the chunks last
- if isDeleteData {
- dir.wfs.deleteFileChunks(entry.Chunks)
- }
-
return nil
}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 11089186f..f05a3a56a 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -3,7 +3,6 @@ package filesys
import (
"bytes"
"io"
- "runtime"
"sync"
"time"
@@ -12,30 +11,20 @@ import (
)
type ContinuousDirtyPages struct {
- intervals *ContinuousIntervals
- f *File
- writeWaitGroup sync.WaitGroup
- chunkAddLock sync.Mutex
- chunkSaveErrChan chan error
- chunkSaveErrChanClosed bool
- lastErr error
- collection string
- replication string
+ intervals *ContinuousIntervals
+ f *File
+ writeWaitGroup sync.WaitGroup
+ chunkAddLock sync.Mutex
+ lastErr error
+ collection string
+ replication string
}
func newDirtyPages(file *File) *ContinuousDirtyPages {
dirtyPages := &ContinuousDirtyPages{
- intervals: &ContinuousIntervals{},
- f: file,
- chunkSaveErrChan: make(chan error, runtime.NumCPU()),
+ intervals: &ContinuousIntervals{},
+ f: file,
}
- go func() {
- for t := range dirtyPages.chunkSaveErrChan {
- if t != nil {
- dirtyPages.lastErr = t
- }
- }
- }()
return dirtyPages
}
@@ -94,15 +83,6 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
- errChanSize := pages.f.wfs.option.ConcurrentWriters
- if errChanSize == 0 {
- errChanSize = runtime.NumCPU()
- }
- if pages.chunkSaveErrChanClosed {
- pages.chunkSaveErrChan = make(chan error, errChanSize)
- pages.chunkSaveErrChanClosed = false
- }
-
mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1)
writer := func() {
@@ -112,7 +92,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
- pages.chunkSaveErrChan <- err
+ pages.lastErr = err
return
}
chunk.Mtime = mtime
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 3bffa156e..a210c5152 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -45,7 +45,7 @@ func (file *File) fullpath() util.FullPath {
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
- glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
+ glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
entry := file.entry
if file.isOpen <= 0 || entry == nil {
@@ -54,7 +54,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
}
}
- attr.Inode = file.fullpath().AsInode()
+ // attr.Inode = file.fullpath().AsInode()
attr.Valid = time.Second
attr.Mode = os.FileMode(entry.Attributes.FileMode)
attr.Size = filer.FileSize(entry)
@@ -144,9 +144,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
}
}
file.entry.Chunks = chunks
- file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks)
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), chunks)
file.reader = nil
- file.wfs.deleteFileChunks(truncatedChunks)
}
file.entry.Attributes.FileSize = req.Size
file.dirtyMetadata = true
@@ -326,7 +325,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
- file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks)
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(file.wfs.LookupFn(), entry.Chunks)
file.reader = nil
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index c273eec8a..85f64d292 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -72,7 +72,7 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
}
totalRead, err := fh.readFromChunks(buff, req.Offset)
- if err == nil {
+ if err == nil || err == io.EOF {
maxStop := fh.readFromDirtyPages(buff, req.Offset)
totalRead = max(maxStop-req.Offset, totalRead)
}
@@ -90,8 +90,9 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus
glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead)
totalRead = min(int64(len(buff)), totalRead)
}
- // resp.Data = buff[:totalRead]
- resp.Data = buff
+ if err == nil {
+ resp.Data = buff[:totalRead]
+ }
return err
}
@@ -118,19 +119,21 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(fh.f.wfs.LookupFn(), fh.f.entry.Chunks)
if chunkResolveErr != nil {
return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
}
fh.f.reader = nil
}
- if fh.f.reader == nil {
+ reader := fh.f.reader
+ if reader == nil {
chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
- fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
+ reader = filer.NewChunkReaderAtFromClient(fh.f.wfs.LookupFn(), chunkViews, fh.f.wfs.chunkCache, fileSize)
}
+ fh.f.reader = reader
- totalRead, err := fh.f.reader.ReadAt(buff, offset)
+ totalRead, err := reader.ReadAt(buff, offset)
if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
@@ -181,25 +184,20 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
fh.Lock()
defer fh.Unlock()
- fh.f.isOpen--
-
- if fh.f.isOpen < 0 {
+ if fh.f.isOpen <= 0 {
glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
fh.f.isOpen = 0
return nil
}
- if fh.f.isOpen == 0 {
+ if fh.f.isOpen == 1 {
if err := fh.doFlush(ctx, req.Header); err != nil {
glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
+ return err
}
- // stop the goroutine
- if !fh.dirtyPages.chunkSaveErrChanClosed {
- fh.dirtyPages.chunkSaveErrChanClosed = true
- close(fh.dirtyPages.chunkSaveErrChan)
- }
+ fh.f.isOpen--
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
if closer, ok := fh.f.reader.(io.Closer); ok {
@@ -213,10 +211,18 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
+ glog.V(4).Infof("Flush %v fh %d", fh.f.fullpath(), fh.handle)
+
fh.Lock()
defer fh.Unlock()
- return fh.doFlush(ctx, req.Header)
+ if err := fh.doFlush(ctx, req.Header); err != nil {
+ glog.Errorf("Flush doFlush %s: %v", fh.f.Name, err)
+ return err
+ }
+
+ glog.V(4).Infof("Flush %v fh %d success", fh.f.fullpath(), fh.handle)
+ return nil
}
func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
@@ -229,7 +235,8 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
fh.dirtyPages.writeWaitGroup.Wait()
if fh.dirtyPages.lastErr != nil {
- return fh.dirtyPages.lastErr
+ glog.Errorf("%v doFlush last err: %v", fh.f.fullpath(), fh.dirtyPages.lastErr)
+ return fuse.EIO
}
if !fh.f.dirtyMetadata {
@@ -268,7 +275,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
- chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
+ chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
if manifestErr != nil {
// not good, but should be ok
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 4b282253d..b9d4724c9 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
+ "strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -29,7 +30,12 @@ func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMa
localStore: openMetaStore(dbFolder),
visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
uidGidMapper: uidGidMapper,
- invalidateFunc: invalidateFunc,
+ invalidateFunc: func(fullpath util.FullPath) {
+ if baseDir != "/" && strings.HasPrefix(string(fullpath), string(baseDir)) {
+ fullpath = fullpath[len(baseDir):]
+ }
+ invalidateFunc(fullpath)
+ },
}
}
@@ -117,22 +123,22 @@ func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err err
return mc.localStore.DeleteEntry(ctx, fp)
}
-func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer.Entry, error) {
+func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
mc.RLock()
defer mc.RUnlock()
if !mc.visitedBoundary.HasVisited(dirPath) {
- return nil, fmt.Errorf("unsynchronized dir: %v", dirPath)
+ return fmt.Errorf("unsynchronized dir: %v", dirPath)
}
- entries, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
- if err != nil {
- return nil, err
- }
- for _, entry := range entries {
+ _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
mc.mapIdFromFilerToLocal(entry)
+ return eachEntryFunc(entry)
+ })
+ if err != nil {
+ return err
}
- return entries, err
+ return err
}
func (mc *MetaCache) Shutdown() {
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index 4089cea28..1ca3b16d5 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -19,6 +19,9 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
util.Retry("ReadDirAllEntries", func() error {
err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
entry := filer.FromPbEntry(string(dirPath), pbEntry)
+ if IsHiddenSystemEntry(string(dirPath), entry.Name()) {
+ return nil
+ }
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
glog.V(0).Infof("read %s: %v", entry.FullPath, err)
return err
@@ -38,3 +41,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
return
})
}
+
+func IsHiddenSystemEntry(dir, name string) bool {
+ return dir == "/" && name == "topics"
+}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index d8f865be7..9443bc0bf 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -4,6 +4,8 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"os"
"path"
@@ -25,6 +27,8 @@ import (
)
type Option struct {
+ MountDirectory string
+ FilerAddress string
FilerGrpcAddress string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
@@ -46,9 +50,9 @@ type Option struct {
MountCtime time.Time
MountMtime time.Time
- OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
- Cipher bool // whether encrypt data on volume server
- UidGidMapper *meta_cache.UidGidMapper
+ VolumeServerAccess string // how to access volume servers
+ Cipher bool // whether encrypt data on volume server
+ UidGidMapper *meta_cache.UidGidMapper
}
var _ = fs.FS(&WFS{})
@@ -74,6 +78,7 @@ type WFS struct {
// throttle writers
concurrentWriters *util.LimitedConcurrentExecutor
+ Server *fs.Server
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -91,7 +96,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
},
signature: util.RandomInt32(),
}
- cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
+ cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
if option.CacheSizeMB > 0 {
os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
@@ -102,9 +107,22 @@ func NewSeaweedFileSystem(option *Option) *WFS {
fsNode := wfs.fsNodeCache.GetFsNode(filePath)
if fsNode != nil {
if file, ok := fsNode.(*File); ok {
+ if err := wfs.Server.InvalidateNodeData(file); err != nil {
+ glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
+ }
file.clearEntry()
}
}
+ dir, name := filePath.DirAndName()
+ parent := wfs.root
+ if dir != "/" {
+ parent = wfs.fsNodeCache.GetFsNode(util.FullPath(dir))
+ }
+ if parent != nil {
+ if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
+ glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
+ }
+ }
})
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
@@ -240,3 +258,13 @@ func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
}
entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
}
+
+func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
+ if wfs.option.VolumeServerAccess == "filerProxy" {
+ return func(fileId string) (targetUrls []string, err error) {
+ return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
+ }
+ }
+ return filer.LookupFn(wfs)
+
+}
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
deleted file mode 100644
index a245b6795..000000000
--- a/weed/filesys/wfs_deletion.go
+++ /dev/null
@@ -1,84 +0,0 @@
-package filesys
-
-import (
- "context"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
-)
-
-func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
- if len(chunks) == 0 {
- return
- }
-
- var fileIds []string
- for _, chunk := range chunks {
- if !chunk.IsChunkManifest {
- fileIds = append(fileIds, chunk.GetFileIdString())
- continue
- }
- dataChunks, manifestResolveErr := filer.ResolveOneChunkManifest(filer.LookupFn(wfs), chunk)
- if manifestResolveErr != nil {
- glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
- }
- for _, dChunk := range dataChunks {
- fileIds = append(fileIds, dChunk.GetFileIdString())
- }
- fileIds = append(fileIds, chunk.GetFileIdString())
- }
-
- wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds)
- return nil
- })
-}
-
-func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
-
- var vids []string
- for _, fileId := range fileIds {
- vids = append(vids, filer.VolumeId(fileId))
- }
-
- lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
-
- m := make(map[string]operation.LookupResult)
-
- glog.V(4).Infof("deleteFileIds lookup volume id locations: %v", vids)
- resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
- VolumeIds: vids,
- })
- if err != nil {
- return m, err
- }
-
- for _, vid := range vids {
- lr := operation.LookupResult{
- VolumeId: vid,
- Locations: nil,
- }
- locations, found := resp.LocationsMap[vid]
- if !found {
- continue
- }
- for _, loc := range locations.Locations {
- lr.Locations = append(lr.Locations, operation.Location{
- Url: wfs.AdjustedUrl(loc),
- PublicUrl: loc.PublicUrl,
- })
- }
- m[vid] = lr
- }
-
- return m, err
- }
-
- _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
-
- return err
-}
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
index ef4213af1..671d20ba2 100644
--- a/weed/filesys/wfs_filer_client.go
+++ b/weed/filesys/wfs_filer_client.go
@@ -27,7 +27,7 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
}
func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
- if wfs.option.OutsideContainerClusterMode {
+ if wfs.option.VolumeServerAccess == "publicUrl" {
return location.PublicUrl
}
return location.Url
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 92ff5c8da..dbec3bebc 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -54,6 +54,9 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFun
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if wfs.option.VolumeServerAccess == "filerProxy" {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", wfs.option.FilerAddress, fileId)
+ }
uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)