diff options
Diffstat (limited to 'weed/filesys/wfs.go')
| -rw-r--r-- | weed/filesys/wfs.go | 237 |
1 files changed, 198 insertions, 39 deletions
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 9b1b98ebf..77438b58e 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -1,109 +1,268 @@ package filesys import ( - "bazil.org/fuse" - "bazil.org/fuse/fs" + "context" "fmt" + "math" + "os" + "strings" + "sync" + "time" + + "github.com/karlseguin/ccache" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/karlseguin/ccache" - "sync" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" ) type Option struct { FilerGrpcAddress string + GrpcDialOption grpc.DialOption FilerMountRootPath string Collection string Replication string TtlSec int32 ChunkSizeLimit int64 DataCenter string - DirListingLimit int + DirListCacheLimit int64 + EntryCacheTtl time.Duration + Umask os.FileMode + + MountUid uint32 + MountGid uint32 + MountMode os.FileMode + MountCtime time.Time + MountMtime time.Time + + OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers + Cipher bool // whether encrypt data on volume server + } +var _ = fs.FS(&WFS{}) +var _ = fs.FSStatfser(&WFS{}) + type WFS struct { option *Option listDirectoryEntriesCache *ccache.Cache - // contains all open handles + // contains all open handles, protected by handlesLock + handlesLock sync.Mutex handles []*FileHandle - pathToHandleIndex map[string]int - pathToHandleLock sync.Mutex + pathToHandleIndex map[filer2.FullPath]int + + bufPool sync.Pool + + stats statsCache + + // nodes, protected by nodesLock + nodesLock sync.Mutex + nodes map[uint64]fs.Node + root fs.Node +} +type statsCache struct { + filer_pb.StatisticsResponse + lastChecked int64 // unix time in seconds } func NewSeaweedFileSystem(option *Option) *WFS { - return &WFS{ - option: option, - listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)), - pathToHandleIndex: make(map[string]int), + wfs := &WFS{ + option: option, + listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), + pathToHandleIndex: make(map[filer2.FullPath]int), + bufPool: sync.Pool{ + New: func() interface{} { + return make([]byte, option.ChunkSizeLimit) + }, + }, + nodes: make(map[uint64]fs.Node), } + + wfs.root = &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs} + + return wfs } func (wfs *WFS) Root() (fs.Node, error) { - return &Dir{Path: wfs.option.FilerMountRootPath, wfs: wfs}, nil + return wfs.root, nil } -func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { +func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := util.GrpcDial(wfs.option.FilerGrpcAddress) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", wfs.option.FilerGrpcAddress, err) - } - defer grpcConnection.Close() + err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + if err == nil { + return nil + } + return err - return fn(client) } func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { - wfs.pathToHandleLock.Lock() - defer wfs.pathToHandleLock.Unlock() fullpath := file.fullpath() + glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid) + + wfs.handlesLock.Lock() + defer wfs.handlesLock.Unlock() index, found := wfs.pathToHandleIndex[fullpath] if found && wfs.handles[index] != nil { - glog.V(4).Infoln(fullpath, "found fileHandle id", index) + glog.V(2).Infoln(fullpath, "found fileHandle id", index) return wfs.handles[index] } fileHandle = newFileHandle(file, uid, gid) - - if found && wfs.handles[index] != nil { - glog.V(4).Infoln(fullpath, "reuse previous fileHandle id", index) - wfs.handles[index] = fileHandle - fileHandle.handle = uint64(index) - return - } - for i, h := range wfs.handles { if h == nil { wfs.handles[i] = fileHandle fileHandle.handle = uint64(i) wfs.pathToHandleIndex[fullpath] = i - glog.V(4).Infoln(fullpath, "reuse fileHandle id", fileHandle.handle) + glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle) return } } wfs.handles = append(wfs.handles, fileHandle) fileHandle.handle = uint64(len(wfs.handles) - 1) - glog.V(4).Infoln(fullpath, "new fileHandle id", fileHandle.handle) wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) + glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return } -func (wfs *WFS) ReleaseHandle(handleId fuse.HandleID) { - wfs.pathToHandleLock.Lock() - defer wfs.pathToHandleLock.Unlock() +func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID) { + wfs.handlesLock.Lock() + defer wfs.handlesLock.Unlock() - glog.V(4).Infoln("releasing handle id", handleId, "current handles lengh", len(wfs.handles)) + glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) + delete(wfs.pathToHandleIndex, fullpath) if int(handleId) < len(wfs.handles) { wfs.handles[int(handleId)] = nil } return } + +// Statfs is called to obtain file system metadata. Implements fuse.FSStatfser +func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error { + + glog.V(4).Infof("reading fs stats: %+v", req) + + if wfs.stats.lastChecked < time.Now().Unix()-20 { + + err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.StatisticsRequest{ + Collection: wfs.option.Collection, + Replication: wfs.option.Replication, + Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec), + } + + glog.V(4).Infof("reading filer stats: %+v", request) + resp, err := client.Statistics(context.Background(), request) + if err != nil { + glog.V(0).Infof("reading filer stats %v: %v", request, err) + return err + } + glog.V(4).Infof("read filer stats: %+v", resp) + + wfs.stats.TotalSize = resp.TotalSize + wfs.stats.UsedSize = resp.UsedSize + wfs.stats.FileCount = resp.FileCount + wfs.stats.lastChecked = time.Now().Unix() + + return nil + }) + if err != nil { + glog.V(0).Infof("filer Statistics: %v", err) + return err + } + } + + totalDiskSize := wfs.stats.TotalSize + usedDiskSize := wfs.stats.UsedSize + actualFileCount := wfs.stats.FileCount + + // Compute the total number of available blocks + resp.Blocks = totalDiskSize / blockSize + + // Compute the number of used blocks + numBlocks := uint64(usedDiskSize / blockSize) + + // Report the number of free and available blocks for the block size + resp.Bfree = resp.Blocks - numBlocks + resp.Bavail = resp.Blocks - numBlocks + resp.Bsize = uint32(blockSize) + + // Report the total number of possible files in the file system (and those free) + resp.Files = math.MaxInt64 + resp.Ffree = math.MaxInt64 - actualFileCount + + // Report the maximum length of a name and the minimum fragment size + resp.Namelen = 1024 + resp.Frsize = uint32(blockSize) + + return nil +} + +func (wfs *WFS) cacheGet(path filer2.FullPath) *filer_pb.Entry { + item := wfs.listDirectoryEntriesCache.Get(string(path)) + if item != nil && !item.Expired() { + return item.Value().(*filer_pb.Entry) + } + return nil +} +func (wfs *WFS) cacheSet(path filer2.FullPath, entry *filer_pb.Entry, ttl time.Duration) { + if entry == nil { + wfs.listDirectoryEntriesCache.Delete(string(path)) + } else { + wfs.listDirectoryEntriesCache.Set(string(path), entry, ttl) + } +} +func (wfs *WFS) cacheDelete(path filer2.FullPath) { + wfs.listDirectoryEntriesCache.Delete(string(path)) +} + +func (wfs *WFS) getNode(fullpath filer2.FullPath, fn func() fs.Node) fs.Node { + wfs.nodesLock.Lock() + defer wfs.nodesLock.Unlock() + + node, found := wfs.nodes[fullpath.AsInode()] + if found { + return node + } + node = fn() + if node != nil { + wfs.nodes[fullpath.AsInode()] = node + } + return node +} + +func (wfs *WFS) forgetNode(fullpath filer2.FullPath) { + wfs.nodesLock.Lock() + defer wfs.nodesLock.Unlock() + + delete(wfs.nodes, fullpath.AsInode()) +} + +func (wfs *WFS) AdjustedUrl(hostAndPort string) string { + if !wfs.option.OutsideContainerClusterMode { + return hostAndPort + } + commaIndex := strings.Index(hostAndPort, ":") + if commaIndex < 0 { + return hostAndPort + } + filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":") + return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:]) + +} |
