Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/dir.go          4
-rw-r--r--  weed/filesys/dirty_page.go  18
-rw-r--r--  weed/filesys/filehandle.go  30
-rw-r--r--  weed/filesys/fscache.go     32
-rw-r--r--  weed/filesys/wfs.go         72
5 files changed, 88 insertions, 68 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index c892b4f91..46e8aebb4 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -58,7 +58,7 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Gid = dir.entry.Attributes.Gid
attr.Uid = dir.entry.Attributes.Uid
- glog.V(3).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
+ glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
return nil
}
@@ -200,7 +200,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
- glog.V(4).Infof("dir Lookup %s: %s", dir.FullPath(), req.Name)
+ glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
entry := dir.wfs.cacheGet(fullFilePath)
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index e2e628407..ce74d64d5 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -125,16 +125,18 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
return nil, false, nil
}
- chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
- if err == nil {
- hasSavedData = true
- glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
- } else {
- glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
- return
+ for {
+ chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
+ if err == nil {
+ hasSavedData = true
+ glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
+ return
+ } else {
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
+ time.Sleep(5 * time.Second)
+ }
}
- return
}
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
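
The dirty_page.go hunk above replaces the single-attempt save (log the error and return) with a loop that retries the upload until it succeeds, sleeping between attempts so dirty data is not dropped on a transient volume-server failure. Below is a minimal, self-contained sketch of that retry pattern; uploadChunk and the simulated file id are placeholders for illustration, not the actual SeaweedFS saveToStorage API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// uploadChunk stands in for the real saveToStorage call; it fails on the
// first two attempts to simulate a transient volume-server error.
func uploadChunk(attempt int) (string, error) {
	if attempt < 3 {
		return "", errors.New("volume server unavailable")
	}
	return "3,01637037d6", nil // simulated file id
}

// saveWithRetry keeps retrying until the upload succeeds, sleeping between
// attempts instead of giving up and losing the dirty pages.
func saveWithRetry() string {
	for attempt := 1; ; attempt++ {
		fileId, err := uploadChunk(attempt)
		if err == nil {
			return fileId // success: stop retrying
		}
		fmt.Printf("attempt %d failed: %v, retrying\n", attempt, err)
		time.Sleep(100 * time.Millisecond) // the patch itself waits 5 * time.Second
	}
}

func main() {
	fmt.Println("saved chunk", saveWithRetry())
}
```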
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 83a93c062..d2983d53f 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -4,12 +4,9 @@ import (
"context"
"fmt"
"math"
- "mime"
- "path"
+ "net/http"
"time"
- "github.com/gabriel-vasile/mimetype"
-
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -33,12 +30,16 @@ type FileHandle struct {
}
func newFileHandle(file *File, uid, gid uint32) *FileHandle {
- return &FileHandle{
+ fh := &FileHandle{
f: file,
dirtyPages: newDirtyPages(file),
Uid: uid,
Gid: gid,
}
+ if fh.f.entry != nil {
+ fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks)
+ }
+ return fh
}
var _ = fs.Handle(&FileHandle{})
@@ -110,26 +111,23 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
// write the request to volume servers
+ data := make([]byte, len(req.Data))
+ copy(data, req.Data)
- fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(req.Data)), int64(fh.f.entry.Attributes.FileSize)))
+ fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
// glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
- chunks, err := fh.dirtyPages.AddPage(req.Offset, req.Data)
+ chunks, err := fh.dirtyPages.AddPage(req.Offset, data)
if err != nil {
- glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err)
+ glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err)
return fuse.EIO
}
- resp.Size = len(req.Data)
+ resp.Size = len(data)
if req.Offset == 0 {
// detect mime type
- detectedMIME := mimetype.Detect(req.Data)
- fh.contentType = detectedMIME.String()
- if ext := path.Ext(fh.f.Name); ext != detectedMIME.Extension() {
- fh.contentType = mime.TypeByExtension(ext)
- }
-
+ fh.contentType = http.DetectContentType(data)
fh.dirtyMetadata = true
}
@@ -187,7 +185,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
fh.f.entry.Attributes.Gid = req.Gid
fh.f.entry.Attributes.Mtime = time.Now().Unix()
fh.f.entry.Attributes.Crtime = time.Now().Unix()
- fh.f.entry.Attributes.FileMode = uint32(0777 &^ fh.f.wfs.option.Umask)
+ fh.f.entry.Attributes.FileMode = uint32(0666 &^ fh.f.wfs.option.Umask)
fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
}
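
The filehandle.go hunks copy req.Data before handing it to the dirty-page buffer and swap the external mimetype package for the standard library's http.DetectContentType, which sniffs at most the first 512 bytes and always returns a valid MIME type. A small sketch of that detection call is below; the sample byte slices are illustrative only.

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	samples := map[string][]byte{
		"png":  {0x89, 'P', 'N', 'G', '\r', '\n', 0x1a, '\n'},
		"html": []byte("<html><body>hello</body></html>"),
		"text": []byte("plain text content"),
	}
	for name, data := range samples {
		// DetectContentType never returns an error; it falls back to
		// "application/octet-stream" when nothing matches.
		fmt.Printf("%s -> %s\n", name, http.DetectContentType(data))
	}
}
```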
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
index ca8c7de5b..b146f0615 100644
--- a/weed/filesys/fscache.go
+++ b/weed/filesys/fscache.go
@@ -9,6 +9,7 @@ import (
type FsCache struct {
root *FsNode
+ sync.RWMutex
}
type FsNode struct {
parent *FsNode
@@ -27,6 +28,14 @@ func newFsCache(root fs.Node) *FsCache {
}
func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
+
+ c.RLock()
+ defer c.RUnlock()
+
+ return c.doGetFsNode(path)
+}
+
+func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node {
t := c.root
for _, p := range path.Split() {
t = t.findChild(p)
@@ -38,6 +47,14 @@ func (c *FsCache) GetFsNode(path util.FullPath) fs.Node {
}
func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
+
+ c.Lock()
+ defer c.Unlock()
+
+ c.doSetFsNode(path, node)
+}
+
+func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) {
t := c.root
for _, p := range path.Split() {
t = t.ensureChild(p)
@@ -46,16 +63,24 @@ func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) {
}
func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node {
- t := c.GetFsNode(path)
+
+ c.Lock()
+ defer c.Unlock()
+
+ t := c.doGetFsNode(path)
if t != nil {
return t
}
t = genNodeFn()
- c.SetFsNode(path, t)
+ c.doSetFsNode(path, t)
return t
}
func (c *FsCache) DeleteFsNode(path util.FullPath) {
+
+ c.Lock()
+ defer c.Unlock()
+
t := c.root
for _, p := range path.Split() {
t = t.findChild(p)
@@ -72,6 +97,9 @@ func (c *FsCache) DeleteFsNode(path util.FullPath) {
// oldPath and newPath are full path including the new name
func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
+ c.Lock()
+ defer c.Unlock()
+
// find old node
src := c.root
for _, p := range oldPath.Split() {
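
The fscache.go hunks embed a sync.RWMutex in FsCache and split each operation into an exported method that takes the lock and an unexported do* helper that does the work, so a compound operation like EnsureFsNode can hold the write lock once without re-entering it. A minimal sketch of that pattern, with simplified stand-in types rather than the actual SeaweedFS structures:

```go
package main

import (
	"fmt"
	"sync"
)

type NodeCache struct {
	sync.RWMutex
	nodes map[string]string
}

func NewNodeCache() *NodeCache {
	return &NodeCache{nodes: make(map[string]string)}
}

// Exported methods lock; unexported do* helpers assume the lock is held.
func (c *NodeCache) Get(path string) string {
	c.RLock()
	defer c.RUnlock()
	return c.doGet(path)
}

func (c *NodeCache) doGet(path string) string { return c.nodes[path] }

func (c *NodeCache) Set(path, node string) {
	c.Lock()
	defer c.Unlock()
	c.doSet(path, node)
}

func (c *NodeCache) doSet(path, node string) { c.nodes[path] = node }

// Ensure looks up and, if missing, creates the node under a single write
// lock, avoiding the Get-then-Set race an unlocked version would have.
func (c *NodeCache) Ensure(path string, gen func() string) string {
	c.Lock()
	defer c.Unlock()
	if n := c.doGet(path); n != "" {
		return n
	}
	n := gen()
	c.doSet(path, n)
	return n
}

func main() {
	cache := NewNodeCache()
	fmt.Println(cache.Ensure("/a/b", func() string { return "node-b" }))
	fmt.Println(cache.Get("/a/b"))
}
```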
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 590c39790..b3772d683 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -15,25 +15,26 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/pb_cache"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
type Option struct {
- FilerGrpcAddress string
- GrpcDialOption grpc.DialOption
- FilerMountRootPath string
- Collection string
- Replication string
- TtlSec int32
- ChunkSizeLimit int64
- ChunkCacheCountLimit int64
- DataCenter string
- DirListCacheLimit int64
- EntryCacheTtl time.Duration
- Umask os.FileMode
+ FilerGrpcAddress string
+ GrpcDialOption grpc.DialOption
+ FilerMountRootPath string
+ Collection string
+ Replication string
+ TtlSec int32
+ ChunkSizeLimit int64
+ CacheDir string
+ CacheSizeMB int64
+ DataCenter string
+ DirListCacheLimit int64
+ EntryCacheTtl time.Duration
+ Umask os.FileMode
MountUid uint32
MountGid uint32
@@ -54,9 +55,8 @@ type WFS struct {
listDirectoryEntriesCache *ccache.Cache
// contains all open handles, protected by handlesLock
- handlesLock sync.Mutex
- handles []*FileHandle
- pathToHandleIndex map[util.FullPath]int
+ handlesLock sync.Mutex
+ handles map[uint64]*FileHandle
bufPool sync.Pool
@@ -65,7 +65,7 @@ type WFS struct {
root fs.Node
fsNodeCache *FsCache
- chunkCache *pb_cache.ChunkCache
+ chunkCache *chunk_cache.ChunkCache
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -76,13 +76,18 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
option: option,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
- pathToHandleIndex: make(map[util.FullPath]int),
+ handles: make(map[uint64]*FileHandle),
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
},
},
- chunkCache: pb_cache.NewChunkCache(option.ChunkCacheCountLimit),
+ }
+ if option.CacheSizeMB > 0 {
+ wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
+ util.OnInterrupt(func() {
+ wfs.chunkCache.Shutdown()
+ })
}
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs}
@@ -117,26 +122,15 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
- index, found := wfs.pathToHandleIndex[fullpath]
- if found && wfs.handles[index] != nil {
- glog.V(2).Infoln(fullpath, "found fileHandle id", index)
- return wfs.handles[index]
+ inodeId := file.fullpath().AsInode()
+ existingHandle, found := wfs.handles[inodeId]
+ if found && existingHandle != nil {
+ return existingHandle
}
fileHandle = newFileHandle(file, uid, gid)
- for i, h := range wfs.handles {
- if h == nil {
- wfs.handles[i] = fileHandle
- fileHandle.handle = uint64(i)
- wfs.pathToHandleIndex[fullpath] = i
- glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle)
- return
- }
- }
-
- wfs.handles = append(wfs.handles, fileHandle)
- fileHandle.handle = uint64(len(wfs.handles) - 1)
- wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
+ wfs.handles[inodeId] = fileHandle
+ fileHandle.handle = inodeId
glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
return
@@ -147,10 +141,8 @@ func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
defer wfs.handlesLock.Unlock()
glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
- delete(wfs.pathToHandleIndex, fullpath)
- if int(handleId) < len(wfs.handles) {
- wfs.handles[int(handleId)] = nil
- }
+
+ delete(wfs.handles, fullpath.AsInode())
return
}
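
The wfs.go hunks replace the handle slice plus path-to-index map with a single map keyed by an inode id derived from the full path, so acquiring and releasing a handle become plain map operations under one mutex. The sketch below mirrors that bookkeeping; the FNV hash stands in for AsInode only for illustration, since the real implementation may derive the id differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

type FileHandle struct {
	path   string
	handle uint64
}

type handleRegistry struct {
	sync.Mutex
	handles map[uint64]*FileHandle
}

// asInode derives a stable numeric id from the full path (illustrative hash).
func asInode(path string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(path))
	return h.Sum64()
}

func (r *handleRegistry) acquire(path string) *FileHandle {
	r.Lock()
	defer r.Unlock()
	id := asInode(path)
	if fh, found := r.handles[id]; found && fh != nil {
		return fh // reuse the existing handle for this path
	}
	fh := &FileHandle{path: path, handle: id}
	r.handles[id] = fh
	return fh
}

func (r *handleRegistry) release(path string) {
	r.Lock()
	defer r.Unlock()
	delete(r.handles, asInode(path))
}

func main() {
	reg := &handleRegistry{handles: make(map[uint64]*FileHandle)}
	fh := reg.acquire("/mnt/weed/foo.txt")
	fmt.Printf("acquired handle %d\n", fh.handle)
	reg.release("/mnt/weed/foo.txt")
	fmt.Println("remaining handles:", len(reg.handles))
}
```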