Diffstat (limited to 'weed/filesys/wfs.go')
-rw-r--r--  weed/filesys/wfs.go  186
1 file changed, 132 insertions, 54 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index e924783ec..42816d23d 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -3,32 +3,44 @@ package filesys
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"math"
"os"
+ "path"
"sync"
"time"
- "github.com/karlseguin/ccache"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
)
type Option struct {
+ MountDirectory string
+ FilerAddress string
FilerGrpcAddress string
GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
TtlSec int32
+ DiskType types.DiskType
ChunkSizeLimit int64
+ ConcurrentWriters int
+ CacheDir string
+ CacheSizeMB int64
DataCenter string
- DirListCacheLimit int64
- EntryCacheTtl time.Duration
Umask os.FileMode
MountUid uint32
@@ -36,22 +48,36 @@ type Option struct {
MountMode os.FileMode
MountCtime time.Time
MountMtime time.Time
+
+ VolumeServerAccess string // how to access volume servers
+ Cipher bool // whether to encrypt data on the volume server
+ UidGidMapper *meta_cache.UidGidMapper
}
var _ = fs.FS(&WFS{})
var _ = fs.FSStatfser(&WFS{})
type WFS struct {
- option *Option
- listDirectoryEntriesCache *ccache.Cache
+ option *Option
+
+ // contains all open handles, protected by handlesLock
+ handlesLock sync.Mutex
+ handles map[uint64]*FileHandle
- // contains all open handles
- handles []*FileHandle
- pathToHandleIndex map[string]int
- pathToHandleLock sync.Mutex
- bufPool sync.Pool
+ bufPool sync.Pool
stats statsCache
+
+ root fs.Node
+ fsNodeCache *FsCache
+
+ chunkCache *chunk_cache.TieredChunkCache
+ metaCache *meta_cache.MetaCache
+ signature int32
+
+ // throttle writers
+ concurrentWriters *util.LimitedConcurrentExecutor
+ Server *fs.Server
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -60,72 +86,92 @@ type statsCache struct {
func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
- option: option,
- listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
- pathToHandleIndex: make(map[string]int),
+ option: option,
+ handles: make(map[uint64]*FileHandle),
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
},
},
+ signature: util.RandomInt32(),
+ }
+ cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
+ cacheDir := path.Join(option.CacheDir, cacheUniqueId)
+ if option.CacheSizeMB > 0 {
+ os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
+ wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
- return wfs
-}
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
-func (wfs *WFS) Root() (fs.Node, error) {
- return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil
-}
+ fsNode := NodeWithId(filePath.AsInode())
+ if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
+ glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
+ }
-func (wfs *WFS) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
+ dir, name := filePath.DirAndName()
+ parent := NodeWithId(util.FullPath(dir).AsInode())
+ if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
+ glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
+ }
+ })
+ startTime := time.Now()
+ go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
+ grace.OnInterrupt(func() {
+ wfs.metaCache.Shutdown()
+ })
+
+ wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
+ wfs.fsNodeCache = newFsCache(wfs.root)
+
+ if wfs.option.ConcurrentWriters > 0 {
+ wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+ }
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
+ return wfs
+}
+func (wfs *WFS) Root() (fs.Node, error) {
+ return wfs.root, nil
}
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
- wfs.pathToHandleLock.Lock()
- defer wfs.pathToHandleLock.Unlock()
fullpath := file.fullpath()
+ glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
+
+ inodeId := file.Id()
- index, found := wfs.pathToHandleIndex[fullpath]
- if found && wfs.handles[index] != nil {
- glog.V(2).Infoln(fullpath, "found fileHandle id", index)
- return wfs.handles[index]
+ wfs.handlesLock.Lock()
+ existingHandle, found := wfs.handles[inodeId]
+ wfs.handlesLock.Unlock()
+ if found && existingHandle != nil {
+ existingHandle.f.isOpen++
+ glog.V(4).Infof("Acquired Handle %s open %d", fullpath, existingHandle.f.isOpen)
+ return existingHandle
}
+ entry, _ := file.maybeLoadEntry(context.Background())
+ file.entry = entry
fileHandle = newFileHandle(file, uid, gid)
- for i, h := range wfs.handles {
- if h == nil {
- wfs.handles[i] = fileHandle
- fileHandle.handle = uint64(i)
- wfs.pathToHandleIndex[fullpath] = i
- glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle)
- return
- }
- }
+ file.isOpen++
- wfs.handles = append(wfs.handles, fileHandle)
- fileHandle.handle = uint64(len(wfs.handles) - 1)
- glog.V(2).Infoln(fullpath, "new fileHandle id", fileHandle.handle)
- wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
+ wfs.handlesLock.Lock()
+ wfs.handles[inodeId] = fileHandle
+ wfs.handlesLock.Unlock()
+ fileHandle.handle = inodeId
+ glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
return
}
-func (wfs *WFS) ReleaseHandle(fullpath string, handleId fuse.HandleID) {
- wfs.pathToHandleLock.Lock()
- defer wfs.pathToHandleLock.Unlock()
+func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
+ wfs.handlesLock.Lock()
+ defer wfs.handlesLock.Unlock()
- glog.V(4).Infof("%s releasing handle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
- delete(wfs.pathToHandleIndex, fullpath)
- if int(handleId) < len(wfs.handles) {
- wfs.handles[int(handleId)] = nil
- }
+ glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
+
+ delete(wfs.handles, uint64(handleId))
return
}
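
The hunk above replaces the old handle bookkeeping (a slice of handles plus a path-to-index map) with a single map keyed by the file's inode id, guarded by handlesLock. A minimal, self-contained sketch of that pattern follows; names are simplified, the open count is kept on the handle rather than on the File, and the lock is held across the whole operation instead of being taken twice as in the real code:

package main

import (
	"fmt"
	"sync"
)

// fileHandle stands in for *FileHandle; only what the sketch needs.
type fileHandle struct {
	handle uint64 // doubles as the inode id, as in the diff
	isOpen int
}

// handleRegistry mirrors the new WFS fields: one map keyed by inode id,
// protected by a single mutex.
type handleRegistry struct {
	mu      sync.Mutex
	handles map[uint64]*fileHandle
}

func newHandleRegistry() *handleRegistry {
	return &handleRegistry{handles: make(map[uint64]*fileHandle)}
}

// acquire returns the existing handle for an inode or creates one,
// roughly what AcquireHandle does after the change.
func (r *handleRegistry) acquire(inode uint64) *fileHandle {
	r.mu.Lock()
	defer r.mu.Unlock()
	if fh, found := r.handles[inode]; found && fh != nil {
		fh.isOpen++
		return fh
	}
	fh := &fileHandle{handle: inode, isOpen: 1}
	r.handles[inode] = fh
	return fh
}

// release drops the handle, matching ReleaseHandle's plain delete.
func (r *handleRegistry) release(inode uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.handles, inode)
}

func main() {
	reg := newHandleRegistry()
	a := reg.acquire(42)
	b := reg.acquire(42)          // a second open of the same inode reuses the handle
	fmt.Println(a == b, a.isOpen) // true 2
	reg.release(42)
}

Keying by inode id means a second open of the same file reuses the existing handle instead of growing a slice, and releasing becomes a single map delete with no index to keep consistent.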
@@ -137,16 +183,17 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
if wfs.stats.lastChecked < time.Now().Unix()-20 {
- err := wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.StatisticsRequest{
Collection: wfs.option.Collection,
Replication: wfs.option.Replication,
Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
+ DiskType: string(wfs.option.DiskType),
}
glog.V(4).Infof("reading filer stats: %+v", request)
- resp, err := client.Statistics(ctx, request)
+ resp, err := client.Statistics(context.Background(), request)
if err != nil {
glog.V(0).Infof("reading filer stats %v: %v", request, err)
return err
@@ -191,3 +238,34 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
return nil
}
+
+func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
+ if entry.Attributes == nil {
+ return
+ }
+ entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
+}
+func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
+ if entry.Attributes == nil {
+ return
+ }
+ entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
+}
+
+func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
+ if wfs.option.VolumeServerAccess == "filerProxy" {
+ return func(fileId string) (targetUrls []string, err error) {
+ return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
+ }
+ }
+ return filer.LookupFn(wfs)
+
+}
+
+type NodeWithId uint64
+func (n NodeWithId) Id() uint64 {
+ return uint64(n)
+}
+func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
+ return nil
+}
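
The new LookupFn decides how chunk file ids are turned into read URLs: with VolumeServerAccess set to "filerProxy" every chunk is fetched through the filer's /?proxyChunkId= endpoint, otherwise it falls back to filer.LookupFn(wfs). Below is a small sketch of the proxy variant, written without the wdclient package; the address and file id are made up for illustration:

package main

import "fmt"

// lookupFileId mirrors the signature used in the diff
// (wdclient.LookupFileIdFunctionType): fileId in, candidate URLs out.
type lookupFileId func(fileId string) (targetUrls []string, err error)

// proxyLookup builds the "filerProxy" variant: the mount never contacts
// volume servers directly and reads every chunk via the filer.
func proxyLookup(filerAddress string) lookupFileId {
	return func(fileId string) ([]string, error) {
		return []string{"http://" + filerAddress + "/?proxyChunkId=" + fileId}, nil
	}
}

func main() {
	lookup := proxyLookup("localhost:8888") // illustrative filer address
	urls, _ := lookup("3,01637037d6")       // illustrative chunk file id
	fmt.Println(urls)                       // [http://localhost:8888/?proxyChunkId=3,01637037d6]
}

Routing reads through the filer costs an extra hop but, presumably, lets a mount work even when volume servers are not directly reachable from the client network.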