diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-10-28 11:36:42 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-10-28 11:36:42 +0800 |
| commit | 7cc07655d493d11c967cfa978ddc5181d4b6b861 (patch) | |
| tree | 5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 /weed/filesys/wfs.go | |
| parent | 29a4c3944eeb07434060df52dfb1d3cf4c59dc91 (diff) | |
| parent | 53c3aad87528d57343afc5fdb3fb5107544af0fc (diff) | |
| download | seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.tar.xz seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/filesys/wfs.go')
| -rw-r--r-- | weed/filesys/wfs.go | 130 |
1 files changed, 50 insertions, 80 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 2b0ef64c2..759e21b15 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -6,22 +6,21 @@ import ( "math" "os" "path" - "strings" "sync" "time" - "github.com/chrislusf/seaweedfs/weed/util/grace" - "github.com/karlseguin/ccache" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" ) type Option struct { @@ -35,7 +34,6 @@ type Option struct { CacheDir string CacheSizeMB int64 DataCenter string - DirListCacheLimit int64 EntryCacheTtl time.Duration Umask os.FileMode @@ -47,16 +45,14 @@ type Option struct { OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers Cipher bool // whether encrypt data on volume server - AsyncMetaDataCaching bool // whether asynchronously cache meta data - + UidGidMapper *meta_cache.UidGidMapper } var _ = fs.FS(&WFS{}) var _ = fs.FSStatfser(&WFS{}) type WFS struct { - option *Option - listDirectoryEntriesCache *ccache.Cache + option *Option // contains all open handles, protected by handlesLock handlesLock sync.Mutex @@ -69,8 +65,9 @@ type WFS struct { root fs.Node fsNodeCache *FsCache - chunkCache *chunk_cache.ChunkCache + chunkCache *chunk_cache.TieredChunkCache metaCache *meta_cache.MetaCache + signature int32 } type statsCache struct { filer_pb.StatisticsResponse @@ -79,36 +76,38 @@ type statsCache struct { func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ - option: option, - listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), - handles: make(map[uint64]*FileHandle), + option: option, + handles: make(map[uint64]*FileHandle), bufPool: sync.Pool{ New: func() interface{} { return make([]byte, option.ChunkSizeLimit) }, }, + signature: util.RandomInt32(), } + cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] + cacheDir := path.Join(option.CacheDir, cacheUniqueId) if option.CacheSizeMB > 0 { - os.MkdirAll(option.CacheDir, 0755) - wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB) - grace.OnInterrupt(func() { - wfs.chunkCache.Shutdown() - }) - } - if wfs.option.AsyncMetaDataCaching { - wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.CacheDir, "meta")) - startTime := time.Now() - if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { - glog.V(0).Infof("failed to init meta cache: %v", err) - } else { - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) - grace.OnInterrupt(func() { - wfs.metaCache.Shutdown() - }) - } + os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask) + wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024) } - wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} + wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) { + fsNode := wfs.fsNodeCache.GetFsNode(filePath) + if fsNode != nil { + if file, ok := fsNode.(*File); ok { + file.entry = nil + } + } + }) + startTime := time.Now() + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + grace.OnInterrupt(func() { + wfs.metaCache.Shutdown() + }) + + entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath)) + wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry} wfs.fsNodeCache = newFsCache(wfs.root) return wfs @@ -118,40 +117,29 @@ func (wfs *WFS) Root() (fs.Node, error) { return wfs.root, nil } -var _ = filer_pb.FilerClient(&WFS{}) - -func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - - err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) - - if err == nil { - return nil - } - return err - -} - func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { fullpath := file.fullpath() - glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid) + glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid) wfs.handlesLock.Lock() defer wfs.handlesLock.Unlock() inodeId := file.fullpath().AsInode() - existingHandle, found := wfs.handles[inodeId] - if found && existingHandle != nil { - return existingHandle + if file.isOpen > 0 { + existingHandle, found := wfs.handles[inodeId] + if found && existingHandle != nil { + file.isOpen++ + return existingHandle + } } fileHandle = newFileHandle(file, uid, gid) + file.maybeLoadEntry(context.Background()) + file.isOpen++ + wfs.handles[inodeId] = fileHandle fileHandle.handle = inodeId - glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return } @@ -229,33 +217,15 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. return nil } -func (wfs *WFS) cacheGet(path util.FullPath) *filer_pb.Entry { - item := wfs.listDirectoryEntriesCache.Get(string(path)) - if item != nil && !item.Expired() { - return item.Value().(*filer_pb.Entry) +func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) { + if entry.Attributes == nil { + return } - return nil + entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid) } -func (wfs *WFS) cacheSet(path util.FullPath, entry *filer_pb.Entry, ttl time.Duration) { - if entry == nil { - wfs.listDirectoryEntriesCache.Delete(string(path)) - } else { - wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl) +func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) { + if entry.Attributes == nil { + return } -} -func (wfs *WFS) cacheDelete(path util.FullPath) { - wfs.listDirectoryEntriesCache.Delete(string(path)) -} - -func (wfs *WFS) AdjustedUrl(hostAndPort string) string { - if !wfs.option.OutsideContainerClusterMode { - return hostAndPort - } - commaIndex := strings.Index(hostAndPort, ":") - if commaIndex < 0 { - return hostAndPort - } - filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":") - return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:]) - + entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid) } |
