diff options
| author | yourchanges <yourchanges@gmail.com> | 2020-07-10 09:44:32 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-07-10 09:44:32 +0800 |
| commit | e67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch) | |
| tree | 4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /weed/filesys/wfs.go | |
| parent | 2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff) | |
| parent | 1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff) | |
| download | seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.tar.xz seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.zip | |
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/filesys/wfs.go')
| -rw-r--r-- | weed/filesys/wfs.go | 136 |
1 files changed, 77 insertions, 59 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 969514a06..8dffa6555 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -5,32 +5,48 @@ import ( "fmt" "math" "os" + "path" "sync" "time" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/util/grace" + + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" + + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/karlseguin/ccache" - "github.com/seaweedfs/fuse" - "github.com/seaweedfs/fuse/fs" - "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" ) type Option struct { FilerGrpcAddress string + GrpcDialOption grpc.DialOption FilerMountRootPath string Collection string Replication string TtlSec int32 ChunkSizeLimit int64 + CacheDir string + CacheSizeMB int64 DataCenter string - DirListingLimit int + DirListCacheLimit int64 EntryCacheTtl time.Duration + Umask os.FileMode + + MountUid uint32 + MountGid uint32 + MountMode os.FileMode + MountCtime time.Time + MountMtime time.Time + + OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers + Cipher bool // whether encrypt data on volume server - MountUid uint32 - MountGid uint32 - MountMode os.FileMode } var _ = fs.FS(&WFS{}) @@ -38,17 +54,20 @@ var _ = fs.FSStatfser(&WFS{}) type WFS struct { option *Option - listDirectoryEntriesCache *ccache.Cache - // contains all open handles - handles []*FileHandle - pathToHandleIndex map[string]int - pathToHandleLock sync.Mutex - bufPool sync.Pool + // contains all open handles, protected by handlesLock + handlesLock sync.Mutex + handles map[uint64]*FileHandle - fileIdsDeletionChan chan []string + bufPool sync.Pool stats statsCache + + root fs.Node + fsNodeCache *FsCache + + chunkCache *chunk_cache.ChunkCache + metaCache *meta_cache.MetaCache } type statsCache struct { filer_pb.StatisticsResponse @@ -58,74 +77,73 @@ type statsCache struct { func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, - listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(1024 * 8).ItemsToPrune(100)), - pathToHandleIndex: make(map[string]int), + handles: make(map[uint64]*FileHandle), bufPool: sync.Pool{ New: func() interface{} { return make([]byte, option.ChunkSizeLimit) }, }, - fileIdsDeletionChan: make(chan []string, 32), + } + cacheUniqueId := util.Md5([]byte(option.FilerGrpcAddress))[0:4] + cacheDir := path.Join(option.CacheDir, cacheUniqueId) + if option.CacheSizeMB > 0 { + os.MkdirAll(cacheDir, 0755) + wfs.chunkCache = chunk_cache.NewChunkCache(256, cacheDir, option.CacheSizeMB) + grace.OnInterrupt(func() { + wfs.chunkCache.Shutdown() + }) + } + + wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta")) + startTime := time.Now() + if err := meta_cache.InitMetaCache(wfs.metaCache, wfs, wfs.option.FilerMountRootPath); err != nil { + glog.V(0).Infof("failed to init meta cache: %v", err) + } else { + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + grace.OnInterrupt(func() { + wfs.metaCache.Shutdown() + }) } - go wfs.loopProcessingDeletion() + wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} + wfs.fsNodeCache = newFsCache(wfs.root) return wfs } func (wfs *WFS) Root() (fs.Node, error) { - return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil -} - -func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, wfs.option.FilerGrpcAddress) - + return wfs.root, nil } func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { - wfs.pathToHandleLock.Lock() - defer wfs.pathToHandleLock.Unlock() fullpath := file.fullpath() + glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid) - index, found := wfs.pathToHandleIndex[fullpath] - if found && wfs.handles[index] != nil { - glog.V(2).Infoln(fullpath, "found fileHandle id", index) - return wfs.handles[index] - } + wfs.handlesLock.Lock() + defer wfs.handlesLock.Unlock() - fileHandle = newFileHandle(file, uid, gid) - for i, h := range wfs.handles { - if h == nil { - wfs.handles[i] = fileHandle - fileHandle.handle = uint64(i) - wfs.pathToHandleIndex[fullpath] = i - glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle) - return - } + inodeId := file.fullpath().AsInode() + existingHandle, found := wfs.handles[inodeId] + if found && existingHandle != nil { + return existingHandle } - wfs.handles = append(wfs.handles, fileHandle) - fileHandle.handle = uint64(len(wfs.handles) - 1) - glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle) - wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) + fileHandle = newFileHandle(file, uid, gid) + wfs.handles[inodeId] = fileHandle + fileHandle.handle = inodeId + glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return } -func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) { - wfs.pathToHandleLock.Lock() - defer wfs.pathToHandleLock.Unlock() +func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) { + wfs.handlesLock.Lock() + defer wfs.handlesLock.Unlock() - glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) - delete(wfs.pathToHandleIndex, fullpath) - if int(handleId) < len(wfs.handles) { - wfs.handles[int(handleId)] = nil - } + glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) + + delete(wfs.handles, fullpath.AsInode()) return } @@ -137,7 +155,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. if wfs.stats.lastChecked < time.Now().Unix()-20 { - err := wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ Collection: wfs.option.Collection, @@ -146,7 +164,7 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. } glog.V(4).Infof("reading filer stats: %+v", request) - resp, err := client.Statistics(ctx, request) + resp, err := client.Statistics(context.Background(), request) if err != nil { glog.V(0).Infof("reading filer stats %v: %v", request, err) return err |
