aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2020-08-20 19:18:23 +0800
committerGitHub <noreply@github.com>2020-08-20 19:18:23 +0800
commitb0d6330cf44dbb0664f6ede0dbc82865879dcfe0 (patch)
treedcf5b0dfb71089126da5ec3a3fb8eb763a37c739 /weed/filesys
parent6a93e26fc32ce35901c96371628fd0916b639026 (diff)
parentf48567c5c62bf8c8cebf568eeb919f25a4fc4289 (diff)
downloadseaweedfs-b0d6330cf44dbb0664f6ede0dbc82865879dcfe0.tar.xz
seaweedfs-b0d6330cf44dbb0664f6ede0dbc82865879dcfe0.zip
Merge pull request #12 from chrislusf/master
sync
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dir.go47
-rw-r--r--weed/filesys/dir_link.go4
-rw-r--r--weed/filesys/dir_rename.go12
-rw-r--r--weed/filesys/dirty_page.go25
-rw-r--r--weed/filesys/dirty_page_interval.go15
-rw-r--r--weed/filesys/file.go55
-rw-r--r--weed/filesys/filehandle.go84
-rw-r--r--weed/filesys/fscache.go13
-rw-r--r--weed/filesys/fscache_test.go21
-rw-r--r--weed/filesys/meta_cache/meta_cache.go9
-rw-r--r--weed/filesys/meta_cache/meta_cache_init.go2
-rw-r--r--weed/filesys/wfs.go16
-rw-r--r--weed/filesys/wfs_deletion.go2
13 files changed, 189 insertions, 116 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 08332d967..46f5f22ed 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -63,7 +63,7 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Gid = dir.entry.Attributes.Gid
attr.Uid = dir.entry.Attributes.Uid
- glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
+ glog.V(5).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
return nil
}
@@ -101,7 +101,7 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
}
func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
- return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
+ f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
return &File{
Name: name,
dir: dir,
@@ -110,14 +110,17 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
entryViewCache: nil,
}
})
+ f.(*File).dir = dir // in case dir node was created later
+ return f
}
func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
- return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
+ d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
})
-
+ d.(*Dir).parent = dir // in case dir node was created later
+ return d
}
func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
@@ -218,7 +221,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
- glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
+ glog.V(5).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
dirPath := util.FullPath(dir.FullPath())
@@ -237,7 +240,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
return nil, fuse.ENOENT
}
} else {
- glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
+ glog.V(5).Infof("dir Lookup cache hit %s", fullFilePath)
}
if entry != nil {
@@ -265,7 +268,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
- glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
+ glog.V(5).Infof("dir ReadDirAll %s", dir.FullPath())
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
@@ -314,12 +317,8 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
return nil
}
- dir.wfs.deleteFileChunks(entry.Chunks)
-
- dir.wfs.fsNodeCache.DeleteFsNode(filePath)
-
- dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+ // first, ensure the filer store can correctly delete
glog.V(3).Infof("remove file: %v", req)
err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false, false)
if err != nil {
@@ -327,34 +326,40 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
return fuse.ENOENT
}
+ // then, delete meta cache and fsNode cache
+ dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+ dir.wfs.fsNodeCache.DeleteFsNode(filePath)
+
+ // delete the chunks last
+ dir.wfs.deleteFileChunks(entry.Chunks)
+
return nil
}
func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
- t := util.NewFullPath(dir.FullPath(), req.Name)
- dir.wfs.fsNodeCache.DeleteFsNode(t)
-
- dir.wfs.metaCache.DeleteEntry(context.Background(), t)
-
glog.V(3).Infof("remove directory entry: %v", req)
err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false)
if err != nil {
- glog.V(3).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
+ glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
if strings.Contains(err.Error(), "non-empty"){
return fuse.EEXIST
}
return fuse.ENOENT
}
+ t := util.NewFullPath(dir.FullPath(), req.Name)
+ dir.wfs.metaCache.DeleteEntry(context.Background(), t)
+ dir.wfs.fsNodeCache.DeleteFsNode(t)
+
return nil
}
func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- glog.V(3).Infof("%v dir setattr %+v", dir.FullPath(), req)
+ glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req)
if err := dir.maybeLoadEntry(); err != nil {
return err
@@ -429,7 +434,7 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
}
func (dir *Dir) Forget() {
- glog.V(3).Infof("Forget dir %s", dir.FullPath())
+ glog.V(5).Infof("Forget dir %s", dir.FullPath())
dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
}
@@ -460,7 +465,7 @@ func (dir *Dir) saveEntry() error {
glog.V(1).Infof("save dir entry: %v", request)
_, err := client.UpdateEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err)
+ glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
return fuse.EIO
}
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 4990e743c..bd564f413 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -18,7 +18,7 @@ var _ = fs.NodeReadlinker(&File{})
func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) {
- glog.V(3).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
+ glog.V(4).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
request := &filer_pb.CreateEntryRequest{
Directory: dir.FullPath(),
@@ -63,7 +63,7 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri
return "", fuse.Errno(syscall.EINVAL)
}
- glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget)
+ glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget)
return file.entry.Attributes.SymlinkTarget, nil
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index da4f1b232..0e417e0ab 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -63,7 +63,17 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
// fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
dir.wfs.fsNodeCache.Move(oldPath, newPath)
- delete(dir.wfs.handles, oldPath.AsInode())
+
+ // change file handle
+ dir.wfs.handlesLock.Lock()
+ defer dir.wfs.handlesLock.Unlock()
+ inodeId := oldPath.AsInode()
+ existingHandle, found := dir.wfs.handles[inodeId]
+ if !found || existingHandle == nil {
+ return err
+ }
+ delete(dir.wfs.handles, inodeId)
+ dir.wfs.handles[newPath.AsInode()] = existingHandle
return err
}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 46d20e466..88a1a4f55 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -25,9 +25,6 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
}
}
-func (pages *ContinuousDirtyPages) releaseResource() {
-}
-
var counter = int32(0)
func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
@@ -35,7 +32,7 @@ func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []
pages.lock.Lock()
defer pages.lock.Unlock()
- glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
+ glog.V(5).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
// this is more than what buffer can hold.
@@ -121,14 +118,16 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
return nil, false, nil
}
+ fileSize := int64(pages.f.entry.Attributes.FileSize)
for {
- chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
+ chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
+ chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
if err == nil {
hasSavedData = true
- glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
+ glog.V(4).Infof("saveToStorage %s %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.GetFileIdString(), maxList.Offset(), maxList.Offset()+chunkSize, fileSize)
return
} else {
- glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
+ glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+chunkSize, err)
time.Sleep(5 * time.Second)
}
}
@@ -139,6 +138,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
dir, _ := pages.f.fullpath().DirAndName()
+ reader = io.LimitReader(reader, size)
chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
if err != nil {
return nil, err
@@ -149,6 +149,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
}
+func maxUint64(x, y uint64) uint64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
func max(x, y int64) int64 {
if x > y {
return x
@@ -162,11 +169,11 @@ func min(x, y int64) int64 {
return y
}
-func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) {
+func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
pages.lock.Lock()
defer pages.lock.Unlock()
- return pages.intervals.ReadData(data, startOffset)
+ return pages.intervals.ReadDataAt(data, startOffset)
}
diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go
index ec94c6df1..afa2755ed 100644
--- a/weed/filesys/dirty_page_interval.go
+++ b/weed/filesys/dirty_page_interval.go
@@ -3,7 +3,6 @@ package filesys
import (
"bytes"
"io"
- "math"
)
type IntervalNode struct {
@@ -186,25 +185,15 @@ func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) {
}
-func (c *ContinuousIntervals) ReadData(data []byte, startOffset int64) (offset int64, size int) {
- var minOffset int64 = math.MaxInt64
- var maxStop int64
+func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
for _, list := range c.lists {
start := max(startOffset, list.Offset())
stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
- if start <= stop {
+ if start < stop {
list.ReadData(data[start-startOffset:], start, stop)
- minOffset = min(minOffset, start)
maxStop = max(maxStop, stop)
}
}
-
- if minOffset == math.MaxInt64 {
- return 0, 0
- }
-
- offset = minOffset
- size = int(maxStop - offset)
return
}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index dcda93522..d57f6cc57 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -7,12 +7,13 @@ import (
"sort"
"time"
+ "github.com/seaweedfs/fuse"
+ "github.com/seaweedfs/fuse/fs"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/seaweedfs/fuse"
- "github.com/seaweedfs/fuse/fs"
)
const blockSize = 512
@@ -35,6 +36,7 @@ type File struct {
entryViewCache []filer2.VisibleInterval
isOpen int
reader io.ReaderAt
+ dirtyMetadata bool
}
func (file *File) fullpath() util.FullPath {
@@ -43,7 +45,7 @@ func (file *File) fullpath() util.FullPath {
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
- glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
+ glog.V(5).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
if file.isOpen <= 0 {
if err := file.maybeLoadEntry(ctx); err != nil {
@@ -54,7 +56,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Inode = file.fullpath().AsInode()
attr.Valid = time.Second
attr.Mode = os.FileMode(file.entry.Attributes.FileMode)
- attr.Size = filer2.TotalSize(file.entry.Chunks)
+ attr.Size = filer2.FileSize(file.entry)
if file.isOpen > 0 {
attr.Size = file.entry.Attributes.FileSize
glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
@@ -91,7 +93,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
resp.Handle = fuse.HandleID(handle.handle)
- glog.V(3).Infof("%v file open handle id = %d", file.fullpath(), handle.handle)
+ glog.V(4).Infof("%v file open handle id = %d", file.fullpath(), handle.handle)
return handle, nil
@@ -99,7 +101,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
- glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes)
+ glog.V(5).Infof("%v file setattr %+v", file.fullpath(), req)
if err := file.maybeLoadEntry(ctx); err != nil {
return err
@@ -107,49 +109,72 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
if req.Valid.Size() {
- glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size)
+ glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks))
if req.Size < filer2.TotalSize(file.entry.Chunks) {
// fmt.Printf("truncate %v \n", fullPath)
var chunks []*filer_pb.FileChunk
+ var truncatedChunks []*filer_pb.FileChunk
for _, chunk := range file.entry.Chunks {
int64Size := int64(chunk.Size)
if chunk.Offset+int64Size > int64(req.Size) {
+ // this chunk is truncated
int64Size = int64(req.Size) - chunk.Offset
- }
- if int64Size > 0 {
- chunks = append(chunks, chunk)
+ if int64Size > 0 {
+ chunks = append(chunks, chunk)
+ glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size)
+ chunk.Size = uint64(int64Size)
+ } else {
+ glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString())
+ truncatedChunks = append(truncatedChunks, chunk)
+ }
}
}
+ file.wfs.deleteFileChunks(truncatedChunks)
file.entry.Chunks = chunks
file.entryViewCache = nil
file.reader = nil
}
file.entry.Attributes.FileSize = req.Size
+ file.dirtyMetadata = true
}
+
if req.Valid.Mode() {
file.entry.Attributes.FileMode = uint32(req.Mode)
+ file.dirtyMetadata = true
}
if req.Valid.Uid() {
file.entry.Attributes.Uid = req.Uid
+ file.dirtyMetadata = true
}
if req.Valid.Gid() {
file.entry.Attributes.Gid = req.Gid
+ file.dirtyMetadata = true
}
if req.Valid.Crtime() {
file.entry.Attributes.Crtime = req.Crtime.Unix()
+ file.dirtyMetadata = true
}
if req.Valid.Mtime() {
file.entry.Attributes.Mtime = req.Mtime.Unix()
+ file.dirtyMetadata = true
+ }
+
+ if req.Valid.Handle() {
+ // fmt.Printf("file handle => %d\n", req.Handle)
}
if file.isOpen > 0 {
return nil
}
+ if !file.dirtyMetadata {
+ return nil
+ }
+
return file.saveEntry()
}
@@ -205,14 +230,14 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res
func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
// fsync works at OS level
// write the file chunks to the filerGrpcAddress
- glog.V(3).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req)
+ glog.V(4).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req)
return nil
}
func (file *File) Forget() {
t := util.NewFullPath(file.dir.FullPath(), file.Name)
- glog.V(3).Infof("Forget file %s", t)
+ glog.V(5).Infof("Forget file %s", t)
file.wfs.fsNodeCache.DeleteFsNode(t)
}
@@ -246,7 +271,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
file.reader = nil
- glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
+ glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
file.entry.Chunks = append(file.entry.Chunks, chunks...)
}
@@ -265,10 +290,10 @@ func (file *File) saveEntry() error {
Entry: file.entry,
}
- glog.V(1).Infof("save file entry: %v", request)
+ glog.V(4).Infof("save file entry: %v", request)
_, err := client.UpdateEntry(context.Background(), request)
if err != nil {
- glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
+ glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
return fuse.EIO
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index b9d224fb2..94590f842 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -19,10 +19,9 @@ import (
type FileHandle struct {
// cache file has been written to
- dirtyPages *ContinuousDirtyPages
- contentType string
- dirtyMetadata bool
- handle uint64
+ dirtyPages *ContinuousDirtyPages
+ contentType string
+ handle uint64
f *File
RequestId fuse.RequestID // unique ID for request
@@ -40,7 +39,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
Gid: gid,
}
if fh.f.entry != nil {
- fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks)
+ fh.f.entry.Attributes.FileSize = filer2.FileSize(fh.f.entry)
}
return fh
}
@@ -55,38 +54,45 @@ var _ = fs.HandleReleaser(&FileHandle{})
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
- glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
+ glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
- buff := make([]byte, req.Size)
+ buff := resp.Data[:cap(resp.Data)]
+ if req.Size > cap(resp.Data) {
+ // should not happen
+ buff = make([]byte, req.Size)
+ }
totalRead, err := fh.readFromChunks(buff, req.Offset)
if err == nil {
- dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset)
- if totalRead+req.Offset < dirtyOffset+int64(dirtySize) {
- totalRead = dirtyOffset + int64(dirtySize) - req.Offset
- }
+ maxStop := fh.readFromDirtyPages(buff, req.Offset)
+ totalRead = max(maxStop - req.Offset, totalRead)
}
- resp.Data = buff[:totalRead]
-
if err != nil {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
return fuse.EIO
}
+ if totalRead > int64(len(buff)) {
+ glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead)
+ totalRead = min(int64(len(buff)), totalRead)
+ }
+ resp.Data = buff[:totalRead]
+
return err
}
-func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) {
- return fh.dirtyPages.ReadDirtyData(buff, startOffset)
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
+ return fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
}
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
- // this value should come from the filer instead of the old f
- if len(fh.f.entry.Chunks) == 0 {
+ fileSize := int64(filer2.FileSize(fh.f.entry))
+
+ if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fh.f.fullpath())
- return 0, nil
+ return 0, io.EOF
}
var chunkResolveErr error
@@ -99,8 +105,8 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
}
if fh.f.reader == nil {
- chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32)
- fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache)
+ chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
+ fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
}
totalRead, err := fh.f.reader.ReadAt(buff, offset)
@@ -113,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}
- // glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
+ glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
@@ -126,7 +132,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
copy(data, req.Data)
fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
- // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
+ glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
chunks, err := fh.dirtyPages.AddPage(req.Offset, data)
if err != nil {
@@ -139,14 +145,14 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
if req.Offset == 0 {
// detect mime type
fh.contentType = http.DetectContentType(data)
- fh.dirtyMetadata = true
+ fh.f.dirtyMetadata = true
}
if len(chunks) > 0 {
fh.f.addChunks(chunks)
- fh.dirtyMetadata = true
+ fh.f.dirtyMetadata = true
}
return nil
@@ -154,24 +160,28 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
- glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle)
+ glog.V(4).Infof("Release %v fh %d", fh.f.fullpath(), fh.handle)
fh.f.isOpen--
if fh.f.isOpen <= 0 {
- fh.dirtyPages.releaseResource()
+ fh.doFlush(ctx, req.Header)
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+ fh.f.entryViewCache = nil
+ fh.f.reader = nil
}
- fh.f.entryViewCache = nil
- fh.f.reader = nil
return nil
}
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
+ return fh.doFlush(ctx, req.Header)
+}
+
+func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
// fflush works at fh level
// send the data to the OS
- glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req)
+ glog.V(4).Infof("doFlush %s fh %d %v", fh.f.fullpath(), fh.handle, header)
chunks, err := fh.dirtyPages.FlushToStorage()
if err != nil {
@@ -181,10 +191,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
if len(chunks) > 0 {
fh.f.addChunks(chunks)
- fh.dirtyMetadata = true
+ fh.f.dirtyMetadata = true
}
- if !fh.dirtyMetadata {
+ if !fh.f.dirtyMetadata {
return nil
}
@@ -193,10 +203,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
if fh.f.entry.Attributes != nil {
fh.f.entry.Attributes.Mime = fh.contentType
if fh.f.entry.Attributes.Uid == 0 {
- fh.f.entry.Attributes.Uid = req.Uid
+ fh.f.entry.Attributes.Uid = header.Uid
}
if fh.f.entry.Attributes.Gid == 0 {
- fh.f.entry.Attributes.Gid = req.Gid
+ fh.f.entry.Attributes.Gid = header.Gid
}
if fh.f.entry.Attributes.Crtime == 0 {
fh.f.entry.Attributes.Crtime = time.Now().Unix()
@@ -212,9 +222,9 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
Entry: fh.f.entry,
}
- glog.V(3).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
+ glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
for i, chunk := range fh.f.entry.Chunks {
- glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
@@ -239,14 +249,14 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
fh.f.wfs.deleteFileChunks(garbages)
for i, chunk := range garbages {
- glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ glog.V(4).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
return nil
})
if err == nil {
- fh.dirtyMetadata = false
+ fh.f.dirtyMetadata = false
}
if err != nil {
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
index b146f0615..fdec8253c 100644
--- a/weed/filesys/fscache.go
+++ b/weed/filesys/fscache.go
@@ -3,8 +3,9 @@ package filesys
import (
"sync"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type FsCache struct {
@@ -118,7 +119,6 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
target = target.ensureChild(p)
}
parent := target.parent
- src.name = target.name
if dir, ok := src.node.(*Dir); ok {
dir.name = target.name // target is not Dir, but a shortcut
}
@@ -132,6 +132,7 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
target.deleteSelf()
+ src.name = target.name
src.connectToParent(parent)
return src
@@ -144,10 +145,14 @@ func (n *FsNode) connectToParent(parent *FsNode) {
oldNode.deleteSelf()
}
if dir, ok := n.node.(*Dir); ok {
- dir.parent = parent.node.(*Dir)
+ if parent.node != nil {
+ dir.parent = parent.node.(*Dir)
+ }
}
if f, ok := n.node.(*File); ok {
- f.dir = parent.node.(*Dir)
+ if parent.node != nil {
+ f.dir = parent.node.(*Dir)
+ }
}
n.childrenLock.Lock()
parent.children[n.name] = n
diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go
index 67f9aacc8..8bfae1472 100644
--- a/weed/filesys/fscache_test.go
+++ b/weed/filesys/fscache_test.go
@@ -94,3 +94,24 @@ func TestFsCacheMove(t *testing.T) {
}
}
+
+
+func TestFsCacheMove2(t *testing.T) {
+
+ cache := newFsCache(nil)
+
+ cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+ cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+
+ cache.Move(util.FullPath("/a/b/d"), util.FullPath("/a/b/e"))
+
+ d := cache.GetFsNode(util.FullPath("/a/b/e"))
+ if d == nil {
+ t.Errorf("unexpected nil node!")
+ }
+ if d.(*File).Name != "e" {
+ t.Errorf("unexpected node!")
+ }
+
+}
+
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index edf329143..15ec0903d 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -61,8 +61,13 @@ func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPat
oldDir, _ := oldPath.DirAndName()
if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
if oldPath != "" {
- if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
- return err
+ if newEntry != nil && oldPath == newEntry.FullPath {
+ // skip the unnecessary deletion
+ // leave the update to the following InsertEntry operation
+ } else {
+ if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
+ return err
+ }
}
}
} else {
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index e119ebff5..cd98f4a7c 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -14,7 +14,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
- glog.V(2).Infof("ReadDirAllEntries %s ...", path)
+ glog.V(5).Infof("ReadDirAllEntries %s ...", path)
err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
entry := filer2.FromPbEntry(string(dirPath), pbEntry)
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 22f0b655a..f147d7548 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -65,7 +65,7 @@ type WFS struct {
root fs.Node
fsNodeCache *FsCache
- chunkCache *chunk_cache.ChunkCache
+ chunkCache *chunk_cache.TieredChunkCache
metaCache *meta_cache.MetaCache
}
type statsCache struct {
@@ -87,10 +87,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
if option.CacheSizeMB > 0 {
os.MkdirAll(cacheDir, 0755)
- wfs.chunkCache = chunk_cache.NewChunkCache(256, cacheDir, option.CacheSizeMB)
- grace.OnInterrupt(func() {
- wfs.chunkCache.Shutdown()
- })
+ wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB)
}
wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"))
@@ -113,7 +110,7 @@ func (wfs *WFS) Root() (fs.Node, error) {
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
fullpath := file.fullpath()
- glog.V(4).Infof("%s AcquireHandle uid=%d gid=%d", fullpath, uid, gid)
+ glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()
@@ -127,7 +124,6 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
fileHandle = newFileHandle(file, uid, gid)
wfs.handles[inodeId] = fileHandle
fileHandle.handle = inodeId
- glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle)
return
}
@@ -146,7 +142,7 @@ func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
// Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
- glog.V(4).Infof("reading fs stats: %+v", req)
+ glog.V(5).Infof("reading fs stats: %+v", req)
if wfs.stats.lastChecked < time.Now().Unix()-20 {
@@ -158,13 +154,13 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
}
- glog.V(4).Infof("reading filer stats: %+v", request)
+ glog.V(5).Infof("reading filer stats: %+v", request)
resp, err := client.Statistics(context.Background(), request)
if err != nil {
glog.V(0).Infof("reading filer stats %v: %v", request, err)
return err
}
- glog.V(4).Infof("read filer stats: %+v", resp)
+ glog.V(5).Infof("read filer stats: %+v", resp)
wfs.stats.TotalSize = resp.TotalSize
wfs.stats.UsedSize = resp.UsedSize
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index bf21b1808..203ebdad1 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -38,7 +38,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se
m := make(map[string]operation.LookupResult)
- glog.V(4).Infof("remove file lookup volume id locations: %v", vids)
+ glog.V(5).Infof("deleteFileIds lookup volume id locations: %v", vids)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: vids,
})