diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 4 | ||||
| -rw-r--r-- | weed/filesys/dirty_page.go | 18 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 30 | ||||
| -rw-r--r-- | weed/filesys/fscache.go | 32 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 72 |
5 files changed, 88 insertions, 68 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index c892b4f91..46e8aebb4 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -58,7 +58,7 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { attr.Gid = dir.entry.Attributes.Gid attr.Uid = dir.entry.Attributes.Uid - glog.V(3).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) + glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) return nil } @@ -200,7 +200,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { - glog.V(4).Infof("dir Lookup %s: %s", dir.FullPath(), req.Name) + glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) entry := dir.wfs.cacheGet(fullFilePath) diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index e2e628407..ce74d64d5 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -125,16 +125,18 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi return nil, false, nil } - chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size()) - if err == nil { - hasSavedData = true - glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId) - } else { - glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err) - return + for { + chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size()) + if err == nil { + hasSavedData = true + glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId) + return + } else { + glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err) + time.Sleep(5 * time.Second) + } } - return } func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) { diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 83a93c062..d2983d53f 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -4,12 +4,9 @@ import ( "context" "fmt" "math" - "mime" - "path" + "net/http" "time" - "github.com/gabriel-vasile/mimetype" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -33,12 +30,16 @@ type FileHandle struct { } func newFileHandle(file *File, uid, gid uint32) *FileHandle { - return &FileHandle{ + fh := &FileHandle{ f: file, dirtyPages: newDirtyPages(file), Uid: uid, Gid: gid, } + if fh.f.entry != nil { + fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks) + } + return fh } var _ = fs.Handle(&FileHandle{}) @@ -110,26 +111,23 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { // write the request to volume servers + data := make([]byte, len(req.Data)) + copy(data, req.Data) - fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(req.Data)), int64(fh.f.entry.Attributes.FileSize))) + fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize))) // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data))) - chunks, err := fh.dirtyPages.AddPage(req.Offset, req.Data) + chunks, err := fh.dirtyPages.AddPage(req.Offset, data) if err != nil { - glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err) + glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err) return fuse.EIO } - resp.Size = len(req.Data) + resp.Size = len(data) if req.Offset == 0 { // detect mime type - detectedMIME := mimetype.Detect(req.Data) - fh.contentType = detectedMIME.String() - if ext := path.Ext(fh.f.Name); ext != detectedMIME.Extension() { - fh.contentType = mime.TypeByExtension(ext) - } - + fh.contentType = http.DetectContentType(data) fh.dirtyMetadata = true } @@ -187,7 +185,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Attributes.Gid = req.Gid fh.f.entry.Attributes.Mtime = time.Now().Unix() fh.f.entry.Attributes.Crtime = time.Now().Unix() - fh.f.entry.Attributes.FileMode = uint32(0777 &^ fh.f.wfs.option.Umask) + fh.f.entry.Attributes.FileMode = uint32(0666 &^ fh.f.wfs.option.Umask) fh.f.entry.Attributes.Collection = fh.dirtyPages.collection fh.f.entry.Attributes.Replication = fh.dirtyPages.replication } diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go index ca8c7de5b..b146f0615 100644 --- a/weed/filesys/fscache.go +++ b/weed/filesys/fscache.go @@ -9,6 +9,7 @@ import ( type FsCache struct { root *FsNode + sync.RWMutex } type FsNode struct { parent *FsNode @@ -27,6 +28,14 @@ func newFsCache(root fs.Node) *FsCache { } func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { + + c.RLock() + defer c.RUnlock() + + return c.doGetFsNode(path) +} + +func (c *FsCache) doGetFsNode(path util.FullPath) fs.Node { t := c.root for _, p := range path.Split() { t = t.findChild(p) @@ -38,6 +47,14 @@ func (c *FsCache) GetFsNode(path util.FullPath) fs.Node { } func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { + + c.Lock() + defer c.Unlock() + + c.doSetFsNode(path, node) +} + +func (c *FsCache) doSetFsNode(path util.FullPath, node fs.Node) { t := c.root for _, p := range path.Split() { t = t.ensureChild(p) @@ -46,16 +63,24 @@ func (c *FsCache) SetFsNode(path util.FullPath, node fs.Node) { } func (c *FsCache) EnsureFsNode(path util.FullPath, genNodeFn func() fs.Node) fs.Node { - t := c.GetFsNode(path) + + c.Lock() + defer c.Unlock() + + t := c.doGetFsNode(path) if t != nil { return t } t = genNodeFn() - c.SetFsNode(path, t) + c.doSetFsNode(path, t) return t } func (c *FsCache) DeleteFsNode(path util.FullPath) { + + c.Lock() + defer c.Unlock() + t := c.root for _, p := range path.Split() { t = t.findChild(p) @@ -72,6 +97,9 @@ func (c *FsCache) DeleteFsNode(path util.FullPath) { // oldPath and newPath are full path including the new name func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode { + c.Lock() + defer c.Unlock() + // find old node src := c.root for _, p := range oldPath.Split() { diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 590c39790..b3772d683 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -15,25 +15,26 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/pb_cache" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" ) type Option struct { - FilerGrpcAddress string - GrpcDialOption grpc.DialOption - FilerMountRootPath string - Collection string - Replication string - TtlSec int32 - ChunkSizeLimit int64 - ChunkCacheCountLimit int64 - DataCenter string - DirListCacheLimit int64 - EntryCacheTtl time.Duration - Umask os.FileMode + FilerGrpcAddress string + GrpcDialOption grpc.DialOption + FilerMountRootPath string + Collection string + Replication string + TtlSec int32 + ChunkSizeLimit int64 + CacheDir string + CacheSizeMB int64 + DataCenter string + DirListCacheLimit int64 + EntryCacheTtl time.Duration + Umask os.FileMode MountUid uint32 MountGid uint32 @@ -54,9 +55,8 @@ type WFS struct { listDirectoryEntriesCache *ccache.Cache // contains all open handles, protected by handlesLock - handlesLock sync.Mutex - handles []*FileHandle - pathToHandleIndex map[util.FullPath]int + handlesLock sync.Mutex + handles map[uint64]*FileHandle bufPool sync.Pool @@ -65,7 +65,7 @@ type WFS struct { root fs.Node fsNodeCache *FsCache - chunkCache *pb_cache.ChunkCache + chunkCache *chunk_cache.ChunkCache } type statsCache struct { filer_pb.StatisticsResponse @@ -76,13 +76,18 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs := &WFS{ option: option, listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)), - pathToHandleIndex: make(map[util.FullPath]int), + handles: make(map[uint64]*FileHandle), bufPool: sync.Pool{ New: func() interface{} { return make([]byte, option.ChunkSizeLimit) }, }, - chunkCache: pb_cache.NewChunkCache(option.ChunkCacheCountLimit), + } + if option.CacheSizeMB > 0 { + wfs.chunkCache = chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB) + util.OnInterrupt(func() { + wfs.chunkCache.Shutdown() + }) } wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs} @@ -117,26 +122,15 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand wfs.handlesLock.Lock() defer wfs.handlesLock.Unlock() - index, found := wfs.pathToHandleIndex[fullpath] - if found && wfs.handles[index] != nil { - glog.V(2).Infoln(fullpath, "found fileHandle id", index) - return wfs.handles[index] + inodeId := file.fullpath().AsInode() + existingHandle, found := wfs.handles[inodeId] + if found && existingHandle != nil { + return existingHandle } fileHandle = newFileHandle(file, uid, gid) - for i, h := range wfs.handles { - if h == nil { - wfs.handles[i] = fileHandle - fileHandle.handle = uint64(i) - wfs.pathToHandleIndex[fullpath] = i - glog.V(4).Infof("%s reuse fh %d", fullpath, fileHandle.handle) - return - } - } - - wfs.handles = append(wfs.handles, fileHandle) - fileHandle.handle = uint64(len(wfs.handles) - 1) - wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle) + wfs.handles[inodeId] = fileHandle + fileHandle.handle = inodeId glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return @@ -147,10 +141,8 @@ func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) { defer wfs.handlesLock.Unlock() glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles)) - delete(wfs.pathToHandleIndex, fullpath) - if int(handleId) < len(wfs.handles) { - wfs.handles[int(handleId)] = nil - } + + delete(wfs.handles, fullpath.AsInode()) return } |
