aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/wfs.go
diff options
context:
space:
mode:
authorshibinbin <shibinbin@megvii.com>2020-10-28 11:36:42 +0800
committershibinbin <shibinbin@megvii.com>2020-10-28 11:36:42 +0800
commit7cc07655d493d11c967cfa978ddc5181d4b6b861 (patch)
tree5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 /weed/filesys/wfs.go
parent29a4c3944eeb07434060df52dfb1d3cf4c59dc91 (diff)
parent53c3aad87528d57343afc5fdb3fb5107544af0fc (diff)
downloadseaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.tar.xz
seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/filesys/wfs.go')
-rw-r--r--weed/filesys/wfs.go130
1 files changed, 50 insertions, 80 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 2b0ef64c2..759e21b15 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -6,22 +6,21 @@ import (
"math"
"os"
"path"
- "strings"
"sync"
"time"
- "github.com/chrislusf/seaweedfs/weed/util/grace"
- "github.com/karlseguin/ccache"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
)
type Option struct {
@@ -35,7 +34,6 @@ type Option struct {
CacheDir string
CacheSizeMB int64
DataCenter string
- DirListCacheLimit int64
EntryCacheTtl time.Duration
Umask os.FileMode
@@ -47,16 +45,14 @@ type Option struct {
OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
Cipher bool // whether encrypt data on volume server
- AsyncMetaDataCaching bool // whether asynchronously cache meta data
-
+ UidGidMapper *meta_cache.UidGidMapper
}
var _ = fs.FS(&WFS{})
var _ = fs.FSStatfser(&WFS{})
type WFS struct {
- option *Option
- listDirectoryEntriesCache *ccache.Cache
+ option *Option
// contains all open handles, protected by handlesLock
handlesLock sync.Mutex
@@ -69,8 +65,9 @@ type WFS struct {
root fs.Node
fsNodeCache *FsCache
- chunkCache *chunk_cache.ChunkCache
+ chunkCache *chunk_cache.TieredChunkCache
metaCache *meta_cache.MetaCache
+ signature int32
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -79,36 +76,38 @@ type statsCache struct {
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
- option: option,
- listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
- handles: make(map[uint64]*FileHandle),
+ option: option,
+ handles: make(map[uint64]*FileHandle),
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
},
},
+ signature: util.RandomInt32(),
}
+ cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
+ cacheDir := path.Join(option.CacheDir, cacheUniqueId)
if option.CacheSizeMB > 0 {
- os.MkdirAll(option.CacheDir, 0755)
- wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
- grace.OnInterrupt(func() {
- wfs.chunkCache.Shutdown()
- })
- }
- if wfs.option.AsyncMetaDataCaching {
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta"))
- startTime := time.Now()
- if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil {
- glog.V(0).Infof("failed to init meta cache: %v", err)
- } else {
- go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
- grace.OnInterrupt(func() {
- wfs.metaCache.Shutdown()
- })
- }
+ os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
+ wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
- wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
+ fsNode := wfs.fsNodeCache.GetFsNode(filePath)
+ if fsNode != nil {
+ if file, ok := fsNode.(*File); ok {
+ file.entry = nil
+ }
+ }
+ })
+ startTime := time.Now()
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
+ grace.OnInterrupt(func() {
+ wfs.metaCache.Shutdown()
+ })
+
+ entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
+ wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
wfs.fsNodeCache = newFsCache(wfs.root)
return wfs
@@ -118,40 +117,29 @@ func (wfs *WFS) Root() (fs.Node, error) {
return wfs.root, nil
}
-var _ = filer_pb.FilerClient(&WFS{})
-
-func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
-
- err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
-
- if err == nil {
- return nil
- }
- return err
-
-}
-
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
fullpath := file.fullpath()
- glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
+ glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
inodeId := file.fullpath().AsInode()
- existingHandle, found := wfs.handles[inodeId]
- if found && existingHandle != nil {
- return existingHandle
+ if file.isOpen > 0 {
+ existingHandle, found := wfs.handles[inodeId]
+ if found && existingHandle != nil {
+ file.isOpen++
+ return existingHandle
+ }
}
fileHandle = newFileHandle(file, uid, gid)
+ file.maybeLoadEntry(context.Background())
+ file.isOpen++
+
wfs.handles[inodeId] = fileHandle
fileHandle.handle = inodeId
- glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
return
}
@@ -229,33 +217,15 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
return nil
}
-func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry {
- item := wfs.listDirectoryEntriesCache.Get(string(path))
- if item != nil && !item.Expired() {
- return item.Value().(*filer_pb.Entry)
+func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
+ if entry.Attributes == nil {
+ return
}
- return nil
+ entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
}
-func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) {
- if entry == nil {
- wfs.listDirectoryEntriesCache.Delete(string(path))
- } else {
- wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl)
+func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
+ if entry.Attributes == nil {
+ return
}
-}
-func (wfs *WFS) cacheDelete(path util.FullPath) {
- wfs.listDirectoryEntriesCache.Delete(string(path))
-}
-
-func (wfs *WFS) AdjustedUrl(hostAndPort string) string {
- if !wfs.option.OutsideContainerClusterMode {
- return hostAndPort
- }
- commaIndex := strings.Index(hostAndPort, ":")
- if commaIndex < 0 {
- return hostAndPort
- }
- filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":")
- return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:])
-
+ entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
}