| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-08-20 19:18:23 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-08-20 19:18:23 +0800 |
| commit | b0d6330cf44dbb0664f6ede0dbc82865879dcfe0 (patch) | |
| tree | dcf5b0dfb71089126da5ec3a3fb8eb763a37c739 /weed/filesys | |
| parent | 6a93e26fc32ce35901c96371628fd0916b639026 (diff) | |
| parent | f48567c5c62bf8c8cebf568eeb919f25a4fc4289 (diff) | |
| download | seaweedfs-b0d6330cf44dbb0664f6ede0dbc82865879dcfe0.tar.xz, seaweedfs-b0d6330cf44dbb0664f6ede0dbc82865879dcfe0.zip | |
Merge pull request #12 from chrislusf/master
sync
Diffstat (limited to 'weed/filesys')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | weed/filesys/dir.go | 47 |
| -rw-r--r-- | weed/filesys/dir_link.go | 4 |
| -rw-r--r-- | weed/filesys/dir_rename.go | 12 |
| -rw-r--r-- | weed/filesys/dirty_page.go | 25 |
| -rw-r--r-- | weed/filesys/dirty_page_interval.go | 15 |
| -rw-r--r-- | weed/filesys/file.go | 55 |
| -rw-r--r-- | weed/filesys/filehandle.go | 84 |
| -rw-r--r-- | weed/filesys/fscache.go | 13 |
| -rw-r--r-- | weed/filesys/fscache_test.go | 21 |
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache.go | 9 |
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_init.go | 2 |
| -rw-r--r-- | weed/filesys/wfs.go | 16 |
| -rw-r--r-- | weed/filesys/wfs_deletion.go | 2 |
13 files changed, 189 insertions, 116 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 08332d967..46f5f22ed 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -63,7 +63,7 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
     attr.Gid = dir.entry.Attributes.Gid
     attr.Uid = dir.entry.Attributes.Uid

-    glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)
+    glog.V(5).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr)

     return nil
 }
@@ -101,7 +101,7 @@ func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
 }

 func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
-    return dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
+    f := dir.wfs.fsNodeCache.EnsureFsNode(util.NewFullPath(dir.FullPath(), name), func() fs.Node {
         return &File{
             Name: name,
             dir:  dir,
@@ -110,14 +110,17 @@ func (dir *Dir) newFile(name string, entry *filer_pb.Entry) fs.Node {
             entryViewCache: nil,
         }
     })
+    f.(*File).dir = dir // in case dir node was created later
+    return f
 }

 func (dir *Dir) newDirectory(fullpath util.FullPath, entry *filer_pb.Entry) fs.Node {
-    return dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
+    d := dir.wfs.fsNodeCache.EnsureFsNode(fullpath, func() fs.Node {
         return &Dir{name: entry.Name, wfs: dir.wfs, entry: entry, parent: dir}
     })
-
+    d.(*Dir).parent = dir // in case dir node was created later
+    return d
 }

 func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
@@ -218,7 +221,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
 func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {

-    glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
+    glog.V(5).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())

     fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
     dirPath := util.FullPath(dir.FullPath())
@@ -237,7 +240,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
             return nil, fuse.ENOENT
         }
     } else {
-        glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath)
+        glog.V(5).Infof("dir Lookup cache hit %s", fullFilePath)
     }

     if entry != nil {
@@ -265,7 +268,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.

 func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {

-    glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
+    glog.V(5).Infof("dir ReadDirAll %s", dir.FullPath())

     processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
         fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
@@ -314,12 +317,8 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
         return nil
     }

-    dir.wfs.deleteFileChunks(entry.Chunks)
-
-    dir.wfs.fsNodeCache.DeleteFsNode(filePath)
-
-    dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+    // first, ensure the filer store can correctly delete
     glog.V(3).Infof("remove file: %v", req)
     err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false, false)
     if err != nil {
@@ -327,34 +326,40 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
         return fuse.ENOENT
     }

+    // then, delete meta cache and fsNode cache
+    dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+    dir.wfs.fsNodeCache.DeleteFsNode(filePath)
+
+    // delete the chunks last
+    dir.wfs.deleteFileChunks(entry.Chunks)
+
     return nil
 }

 func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error {
-    t := util.NewFullPath(dir.FullPath(), req.Name)
-    dir.wfs.fsNodeCache.DeleteFsNode(t)
-
-    dir.wfs.metaCache.DeleteEntry(context.Background(), t)
-
     glog.V(3).Infof("remove directory entry: %v", req)
     err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false)
     if err != nil {
-        glog.V(3).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
+        glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err)
         if strings.Contains(err.Error(), "non-empty") {
             return fuse.EEXIST
         }
         return fuse.ENOENT
     }

+    t := util.NewFullPath(dir.FullPath(), req.Name)
+    dir.wfs.metaCache.DeleteEntry(context.Background(), t)
+    dir.wfs.fsNodeCache.DeleteFsNode(t)
+
     return nil
 }

 func (dir *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {

-    glog.V(3).Infof("%v dir setattr %+v", dir.FullPath(), req)
+    glog.V(4).Infof("%v dir setattr %+v", dir.FullPath(), req)

     if err := dir.maybeLoadEntry(); err != nil {
         return err
@@ -429,7 +434,7 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp
 }

 func (dir *Dir) Forget() {
-    glog.V(3).Infof("Forget dir %s", dir.FullPath())
+    glog.V(5).Infof("Forget dir %s", dir.FullPath())

     dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath()))
 }
@@ -460,7 +465,7 @@ func (dir *Dir) saveEntry() error {
         glog.V(1).Infof("save dir entry: %v", request)
         _, err := client.UpdateEntry(context.Background(), request)
         if err != nil {
-            glog.V(0).Infof("UpdateEntry dir %s/%s: %v", parentDir, name, err)
+            glog.Errorf("UpdateEntry dir %s/%s: %v", parentDir, name, err)
             return fuse.EIO
         }
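The notable change in dir.go is the reordering in removeOneFile: the authoritative filer delete now happens first, the local meta/fsNode caches are dropped second, and the data chunks are deleted last, so a failed filer delete no longer leaves caches cleared and chunks gone for an entry that still exists. Below is a minimal runnable sketch of that ordering; the stub store and plain maps are hypothetical stand-ins for the real filer client and WFS caches, only the call order mirrors the actual change.

```go
package main

import (
	"errors"
	"fmt"
)

// stubStore is a hypothetical stand-in for the remote filer store.
type stubStore struct{ entries map[string]bool }

func (s *stubStore) Remove(path string) error {
	if !s.entries[path] {
		return errors.New("not found")
	}
	delete(s.entries, path)
	return nil
}

func removeOneFile(store *stubStore, metaCache, fsNodeCache map[string]bool, chunks map[string][]string, path string) error {
	// 1. first, ensure the authoritative filer store can delete;
	//    if this fails, local caches and chunks stay untouched.
	if err := store.Remove(path); err != nil {
		return err
	}
	// 2. then drop the local meta cache and fs node cache entries.
	delete(metaCache, path)
	delete(fsNodeCache, path)
	// 3. delete the data chunks last, once nothing references them.
	delete(chunks, path)
	return nil
}

func main() {
	store := &stubStore{entries: map[string]bool{"/a/b/f.txt": true}}
	meta := map[string]bool{"/a/b/f.txt": true}
	nodes := map[string]bool{"/a/b/f.txt": true}
	chunks := map[string][]string{"/a/b/f.txt": {"3,0101"}}
	fmt.Println(removeOneFile(store, meta, nodes, chunks, "/a/b/f.txt")) // <nil>
}
```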
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index 4990e743c..bd564f413 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -18,7 +18,7 @@ var _ = fs.NodeReadlinker(&File{})

 func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, error) {

-    glog.V(3).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)
+    glog.V(4).Infof("Symlink: %v/%v to %v", dir.FullPath(), req.NewName, req.Target)

     request := &filer_pb.CreateEntryRequest{
         Directory: dir.FullPath(),
@@ -63,7 +63,7 @@ func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (stri
         return "", fuse.Errno(syscall.EINVAL)
     }

-    glog.V(3).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget)
+    glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget)

     return file.entry.Attributes.SymlinkTarget, nil

diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index da4f1b232..0e417e0ab 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -63,7 +63,17 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector

     // fmt.Printf("rename path: %v => %v\n", oldPath, newPath)
     dir.wfs.fsNodeCache.Move(oldPath, newPath)
-    delete(dir.wfs.handles, oldPath.AsInode())
+
+    // change file handle
+    dir.wfs.handlesLock.Lock()
+    defer dir.wfs.handlesLock.Unlock()
+    inodeId := oldPath.AsInode()
+    existingHandle, found := dir.wfs.handles[inodeId]
+    if !found || existingHandle == nil {
+        return err
+    }
+    delete(dir.wfs.handles, inodeId)
+    dir.wfs.handles[newPath.AsInode()] = existingHandle

     return err
 }
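The rename path previously just dropped any open file handle keyed by the old inode; it now re-keys the handle to the new inode under the handles lock, so a file that is open during a rename keeps a working handle. A sketch of that re-keying, with a simplified map keyed by inode id standing in for the WFS handle table:

```go
package main

import (
	"fmt"
	"sync"
)

// handle and fsHandles are simplified stand-ins; in the real code the
// map lives on WFS and inode ids come from FullPath.AsInode().
type handle struct{ id uint64 }

type fsHandles struct {
	mu      sync.Mutex
	handles map[uint64]*handle
}

// moveHandle mirrors the Rename change: if a handle is open under the
// old inode, re-key it to the new inode instead of dropping it.
func (f *fsHandles) moveHandle(oldInode, newInode uint64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	h, found := f.handles[oldInode]
	if !found || h == nil {
		return // nothing open under the old path
	}
	delete(f.handles, oldInode)
	f.handles[newInode] = h
}

func main() {
	f := &fsHandles{handles: map[uint64]*handle{42: {id: 7}}}
	f.moveHandle(42, 99)
	fmt.Println(f.handles[99].id) // 7: the open handle survives the rename
}
```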
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 46d20e466..88a1a4f55 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -25,9 +25,6 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
     }
 }

-func (pages *ContinuousDirtyPages) releaseResource() {
-}
-
 var counter = int32(0)

 func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) {
@@ -35,7 +32,7 @@ func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []
     pages.lock.Lock()
     defer pages.lock.Unlock()

-    glog.V(3).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
+    glog.V(5).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)

     if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
         // this is more than what buffer can hold.
@@ -121,14 +118,16 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
         return nil, false, nil
     }

+    fileSize := int64(pages.f.entry.Attributes.FileSize)
     for {
-        chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
+        chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
+        chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
         if err == nil {
             hasSavedData = true
-            glog.V(3).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
+            glog.V(4).Infof("saveToStorage %s %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.GetFileIdString(), maxList.Offset(), maxList.Offset()+chunkSize, fileSize)
             return
         } else {
-            glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
+            glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+chunkSize, err)
             time.Sleep(5 * time.Second)
         }
     }
@@ -139,6 +138,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,

     dir, _ := pages.f.fullpath().DirAndName()

+    reader = io.LimitReader(reader, size)
     chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
     if err != nil {
         return nil, err
@@ -149,6 +149,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,

 }

+func maxUint64(x, y uint64) uint64 {
+    if x > y {
+        return x
+    }
+    return y
+}
+
 func max(x, y int64) int64 {
     if x > y {
         return x
@@ -162,11 +169,11 @@ func min(x, y int64) int64 {
     return y
 }

-func (pages *ContinuousDirtyPages) ReadDirtyData(data []byte, startOffset int64) (offset int64, size int) {
+func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {

     pages.lock.Lock()
     defer pages.lock.Unlock()

-    return pages.intervals.ReadData(data, startOffset)
+    return pages.intervals.ReadDataAt(data, startOffset)
 }

diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go
index ec94c6df1..afa2755ed 100644
--- a/weed/filesys/dirty_page_interval.go
+++ b/weed/filesys/dirty_page_interval.go
@@ -3,7 +3,6 @@ package filesys
 import (
     "bytes"
     "io"
-    "math"
 )

 type IntervalNode struct {
@@ -186,25 +185,15 @@ func (c *ContinuousIntervals) removeList(target *IntervalLinkedList) {

 }

-func (c *ContinuousIntervals) ReadData(data []byte, startOffset int64) (offset int64, size int) {
-    var minOffset int64 = math.MaxInt64
-    var maxStop int64
+func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
     for _, list := range c.lists {
         start := max(startOffset, list.Offset())
         stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
-        if start <= stop {
+        if start < stop {
             list.ReadData(data[start-startOffset:], start, stop)
-            minOffset = min(minOffset, start)
             maxStop = max(maxStop, stop)
         }
     }
-
-    if minOffset == math.MaxInt64 {
-        return 0, 0
-    }
-
-    offset = minOffset
-    size = int(maxStop - offset)
     return
 }
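ReadData becomes ReadDataAt here: instead of reporting an (offset, size) pair, it copies every dirty interval overlapping the request window into the caller's buffer and returns only the maximum stop position; the `start < stop` test also tightens the old `start <= stop` check, which admitted empty overlaps. A standalone sketch of the overlap arithmetic, with a plain slice of intervals standing in for the real interval linked lists:

```go
package main

import "fmt"

// interval is a simplified dirty range; the real code keeps these in
// linked lists inside ContinuousIntervals.
type interval struct {
	offset int64
	data   []byte
}

// readDataAt copies every interval overlapping [startOffset,
// startOffset+len(buf)) into buf and returns the maximum stop position,
// mirroring the shape of ContinuousIntervals.ReadDataAt.
func readDataAt(lists []interval, buf []byte, startOffset int64) (maxStop int64) {
	for _, l := range lists {
		start := max64(startOffset, l.offset)
		stop := min64(startOffset+int64(len(buf)), l.offset+int64(len(l.data)))
		if start < stop { // strict: an empty overlap contributes nothing
			copy(buf[start-startOffset:stop-startOffset], l.data[start-l.offset:stop-l.offset])
			maxStop = max64(maxStop, stop)
		}
	}
	return
}

func max64(x, y int64) int64 {
	if x > y {
		return x
	}
	return y
}

func min64(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}

func main() {
	dirty := []interval{{offset: 4, data: []byte("WXYZ")}}
	buf := make([]byte, 8)                                   // request window [2,10)
	fmt.Println(readDataAt(dirty, buf, 2), string(buf[2:6])) // 8 WXYZ
}
```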
- "github.com/seaweedfs/fuse/fs" ) const blockSize = 512 @@ -35,6 +36,7 @@ type File struct { entryViewCache []filer2.VisibleInterval isOpen int reader io.ReaderAt + dirtyMetadata bool } func (file *File) fullpath() util.FullPath { @@ -43,7 +45,7 @@ func (file *File) fullpath() util.FullPath { func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { - glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr) + glog.V(5).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr) if file.isOpen <= 0 { if err := file.maybeLoadEntry(ctx); err != nil { @@ -54,7 +56,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { attr.Inode = file.fullpath().AsInode() attr.Valid = time.Second attr.Mode = os.FileMode(file.entry.Attributes.FileMode) - attr.Size = filer2.TotalSize(file.entry.Chunks) + attr.Size = filer2.FileSize(file.entry) if file.isOpen > 0 { attr.Size = file.entry.Attributes.FileSize glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size) @@ -91,7 +93,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op resp.Handle = fuse.HandleID(handle.handle) - glog.V(3).Infof("%v file open handle id = %d", file.fullpath(), handle.handle) + glog.V(4).Infof("%v file open handle id = %d", file.fullpath(), handle.handle) return handle, nil @@ -99,7 +101,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - glog.V(3).Infof("%v file setattr %+v, old:%+v", file.fullpath(), req, file.entry.Attributes) + glog.V(5).Infof("%v file setattr %+v", file.fullpath(), req) if err := file.maybeLoadEntry(ctx); err != nil { return err @@ -107,49 +109,72 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f if req.Valid.Size() { - glog.V(3).Infof("%v file setattr set size=%v", file.fullpath(), req.Size) + glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks)) if req.Size < filer2.TotalSize(file.entry.Chunks) { // fmt.Printf("truncate %v \n", fullPath) var chunks []*filer_pb.FileChunk + var truncatedChunks []*filer_pb.FileChunk for _, chunk := range file.entry.Chunks { int64Size := int64(chunk.Size) if chunk.Offset+int64Size > int64(req.Size) { + // this chunk is truncated int64Size = int64(req.Size) - chunk.Offset - } - if int64Size > 0 { - chunks = append(chunks, chunk) + if int64Size > 0 { + chunks = append(chunks, chunk) + glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk.GetFileIdString(), chunk.Size, int64Size) + chunk.Size = uint64(int64Size) + } else { + glog.V(4).Infof("truncated whole chunk %+v\n", chunk.GetFileIdString()) + truncatedChunks = append(truncatedChunks, chunk) + } } } + file.wfs.deleteFileChunks(truncatedChunks) file.entry.Chunks = chunks file.entryViewCache = nil file.reader = nil } file.entry.Attributes.FileSize = req.Size + file.dirtyMetadata = true } + if req.Valid.Mode() { file.entry.Attributes.FileMode = uint32(req.Mode) + file.dirtyMetadata = true } if req.Valid.Uid() { file.entry.Attributes.Uid = req.Uid + file.dirtyMetadata = true } if req.Valid.Gid() { file.entry.Attributes.Gid = req.Gid + file.dirtyMetadata = true } if req.Valid.Crtime() { file.entry.Attributes.Crtime = req.Crtime.Unix() + file.dirtyMetadata = true } if req.Valid.Mtime() { file.entry.Attributes.Mtime = req.Mtime.Unix() + 
file.dirtyMetadata = true + } + + if req.Valid.Handle() { + // fmt.Printf("file handle => %d\n", req.Handle) } if file.isOpen > 0 { return nil } + if !file.dirtyMetadata { + return nil + } + return file.saveEntry() } @@ -205,14 +230,14 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { // fsync works at OS level // write the file chunks to the filerGrpcAddress - glog.V(3).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req) + glog.V(4).Infof("%s/%s fsync file %+v", file.dir.FullPath(), file.Name, req) return nil } func (file *File) Forget() { t := util.NewFullPath(file.dir.FullPath(), file.Name) - glog.V(3).Infof("Forget file %s", t) + glog.V(5).Infof("Forget file %s", t) file.wfs.fsNodeCache.DeleteFsNode(t) } @@ -246,7 +271,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { file.reader = nil - glog.V(3).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) + glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) file.entry.Chunks = append(file.entry.Chunks, chunks...) } @@ -265,10 +290,10 @@ func (file *File) saveEntry() error { Entry: file.entry, } - glog.V(1).Infof("save file entry: %v", request) + glog.V(4).Infof("save file entry: %v", request) _, err := client.UpdateEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) + glog.Errorf("UpdateEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) return fuse.EIO } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index b9d224fb2..94590f842 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -19,10 +19,9 @@ import ( type FileHandle struct { // cache file has been written to - dirtyPages *ContinuousDirtyPages - contentType string - dirtyMetadata bool - handle uint64 + dirtyPages *ContinuousDirtyPages + contentType string + handle uint64 f *File RequestId fuse.RequestID // unique ID for request @@ -40,7 +39,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle { Gid: gid, } if fh.f.entry != nil { - fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks) + fh.f.entry.Attributes.FileSize = filer2.FileSize(fh.f.entry) } return fh } @@ -55,38 +54,45 @@ var _ = fs.HandleReleaser(&FileHandle{}) func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size)) + glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) - buff := make([]byte, req.Size) + buff := resp.Data[:cap(resp.Data)] + if req.Size > cap(resp.Data) { + // should not happen + buff = make([]byte, req.Size) + } totalRead, err := fh.readFromChunks(buff, req.Offset) if err == nil { - dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset) - if totalRead+req.Offset < dirtyOffset+int64(dirtySize) { - totalRead = dirtyOffset + int64(dirtySize) - req.Offset - } + maxStop := fh.readFromDirtyPages(buff, req.Offset) + totalRead = max(maxStop - req.Offset, totalRead) } - resp.Data = buff[:totalRead] - if err != nil { glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) return fuse.EIO } + if totalRead > int64(len(buff)) { + glog.Warningf("%s 
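The rewritten truncate branch in Setattr now distinguishes chunks that merely shrink at the new size boundary from chunks that fall entirely beyond it, collecting the latter so their storage can be reclaimed via deleteFileChunks. A sketch of the intended partitioning, using a simplified chunk struct in place of filer_pb.FileChunk (the sketch also keeps chunks untouched by the truncate, which is the evident intent of the loop):

```go
package main

import "fmt"

// chunk is a simplified stand-in for filer_pb.FileChunk.
type chunk struct {
	fileId string
	offset int64
	size   uint64
}

// truncateChunks keeps chunks below newSize, shrinks the chunk that
// straddles the boundary, and returns fully-truncated chunks so their
// storage can be reclaimed, mirroring the Setattr size branch.
func truncateChunks(chunks []*chunk, newSize int64) (kept, truncated []*chunk) {
	for _, c := range chunks {
		remaining := int64(c.size)
		if c.offset+remaining > newSize {
			remaining = newSize - c.offset // bytes of this chunk still inside the file
			if remaining > 0 {
				c.size = uint64(remaining) // straddling chunk: shrink in place
				kept = append(kept, c)
			} else {
				truncated = append(truncated, c) // wholly beyond newSize: reclaim
			}
		} else {
			kept = append(kept, c) // untouched by the truncate
		}
	}
	return
}

func main() {
	chunks := []*chunk{
		{"1,ab", 0, 10},
		{"2,cd", 10, 10},
		{"3,ef", 20, 10},
	}
	kept, truncated := truncateChunks(chunks, 15)
	fmt.Println(len(kept), kept[1].size, len(truncated)) // 2 5 1
}
```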
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index b9d224fb2..94590f842 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -19,10 +19,9 @@ import (
 type FileHandle struct {
     // cache file has been written to
-    dirtyPages    *ContinuousDirtyPages
-    contentType   string
-    dirtyMetadata bool
-    handle        uint64
+    dirtyPages  *ContinuousDirtyPages
+    contentType string
+    handle      uint64

     f         *File
     RequestId fuse.RequestID // unique ID for request
@@ -40,7 +39,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
         Gid: gid,
     }
     if fh.f.entry != nil {
-        fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks)
+        fh.f.entry.Attributes.FileSize = filer2.FileSize(fh.f.entry)
     }
     return fh
 }
@@ -55,38 +54,45 @@ var _ = fs.HandleReleaser(&FileHandle{})

 func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {

-    glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
+    glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))

-    buff := make([]byte, req.Size)
+    buff := resp.Data[:cap(resp.Data)]
+    if req.Size > cap(resp.Data) {
+        // should not happen
+        buff = make([]byte, req.Size)
+    }

     totalRead, err := fh.readFromChunks(buff, req.Offset)
     if err == nil {
-        dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset)
-        if totalRead+req.Offset < dirtyOffset+int64(dirtySize) {
-            totalRead = dirtyOffset + int64(dirtySize) - req.Offset
-        }
+        maxStop := fh.readFromDirtyPages(buff, req.Offset)
+        totalRead = max(maxStop-req.Offset, totalRead)
     }

-    resp.Data = buff[:totalRead]
-
     if err != nil {
         glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
         return fuse.EIO
     }

+    if totalRead > int64(len(buff)) {
+        glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead)
+        totalRead = min(int64(len(buff)), totalRead)
+    }
+    resp.Data = buff[:totalRead]
+
     return err
 }

-func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) {
-    return fh.dirtyPages.ReadDirtyData(buff, startOffset)
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
+    return fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
 }

 func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {

-    // this value should come from the filer instead of the old f
-    if len(fh.f.entry.Chunks) == 0 {
+    fileSize := int64(filer2.FileSize(fh.f.entry))
+
+    if fileSize == 0 {
         glog.V(1).Infof("empty fh %v", fh.f.fullpath())
-        return 0, nil
+        return 0, io.EOF
     }

     var chunkResolveErr error
@@ -99,8 +105,8 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
     }

     if fh.f.reader == nil {
-        chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32)
-        fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache)
+        chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
+        fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
     }

     totalRead, err := fh.f.reader.ReadAt(buff, offset)
@@ -113,7 +119,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
         glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
     }

-    // glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
+    glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)

     return int64(totalRead), err
 }
@@ -126,7 +132,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
     copy(data, req.Data)

     fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
-    // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
+    glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))

     chunks, err := fh.dirtyPages.AddPage(req.Offset, data)
     if err != nil {
@@ -139,14 +145,14 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
     if req.Offset == 0 {
         // detect mime type
         fh.contentType = http.DetectContentType(data)
-        fh.dirtyMetadata = true
+        fh.f.dirtyMetadata = true
     }

     if len(chunks) > 0 {
         fh.f.addChunks(chunks)
-        fh.dirtyMetadata = true
+        fh.f.dirtyMetadata = true
     }

     return nil
@@ -154,24 +160,28 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f

 func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {

-    glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle)
+    glog.V(4).Infof("Release %v fh %d", fh.f.fullpath(), fh.handle)

     fh.f.isOpen--

     if fh.f.isOpen <= 0 {
-        fh.dirtyPages.releaseResource()
+        fh.doFlush(ctx, req.Header)
         fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
+        fh.f.entryViewCache = nil
+        fh.f.reader = nil
     }
-    fh.f.entryViewCache = nil
-    fh.f.reader = nil

     return nil
 }

 func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
+    return fh.doFlush(ctx, req.Header)
+}
+
+func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
     // fflush works at fh level
     // send the data to the OS
-    glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req)
+    glog.V(4).Infof("doFlush %s fh %d %v", fh.f.fullpath(), fh.handle, header)

     chunks, err := fh.dirtyPages.FlushToStorage()
     if err != nil {
@@ -181,10 +191,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {

     if len(chunks) > 0 {
         fh.f.addChunks(chunks)
-        fh.dirtyMetadata = true
+        fh.f.dirtyMetadata = true
     }

-    if !fh.dirtyMetadata {
+    if !fh.f.dirtyMetadata {
         return nil
     }
@@ -193,10 +203,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
     if fh.f.entry.Attributes != nil {
         fh.f.entry.Attributes.Mime = fh.contentType
         if fh.f.entry.Attributes.Uid == 0 {
-            fh.f.entry.Attributes.Uid = req.Uid
+            fh.f.entry.Attributes.Uid = header.Uid
         }
         if fh.f.entry.Attributes.Gid == 0 {
-            fh.f.entry.Attributes.Gid = req.Gid
+            fh.f.entry.Attributes.Gid = header.Gid
         }
         if fh.f.entry.Attributes.Crtime == 0 {
             fh.f.entry.Attributes.Crtime = time.Now().Unix()
@@ -212,9 +222,9 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
             Entry: fh.f.entry,
         }

-        glog.V(3).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
+        glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
         for i, chunk := range fh.f.entry.Chunks {
-            glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+            glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
         }

         chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
@@ -239,14 +249,14 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
         fh.f.wfs.deleteFileChunks(garbages)
         for i, chunk := range garbages {
-            glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+            glog.V(4).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
         }

         return nil
     })

     if err == nil {
-        fh.dirtyMetadata = false
+        fh.f.dirtyMetadata = false
     }

     if err != nil {
diff --git a/weed/filesys/fscache.go b/weed/filesys/fscache.go
index b146f0615..fdec8253c 100644
--- a/weed/filesys/fscache.go
+++ b/weed/filesys/fscache.go
@@ -3,8 +3,9 @@ package filesys
 import (
     "sync"

-    "github.com/chrislusf/seaweedfs/weed/util"
     "github.com/seaweedfs/fuse/fs"
+
+    "github.com/chrislusf/seaweedfs/weed/util"
 )

 type FsCache struct {
@@ -118,7 +119,6 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {
         target = target.ensureChild(p)
     }
     parent := target.parent
-    src.name = target.name
     if dir, ok := src.node.(*Dir); ok {
         dir.name = target.name // target is not Dir, but a shortcut
     }
@@ -132,6 +132,7 @@ func (c *FsCache) Move(oldPath util.FullPath, newPath util.FullPath) *FsNode {

     target.deleteSelf()

+    src.name = target.name
     src.connectToParent(parent)

     return src
@@ -144,10 +145,14 @@ func (n *FsNode) connectToParent(parent *FsNode) {
         oldNode.deleteSelf()
     }
     if dir, ok := n.node.(*Dir); ok {
-        dir.parent = parent.node.(*Dir)
+        if parent.node != nil {
+            dir.parent = parent.node.(*Dir)
+        }
     }
     if f, ok := n.node.(*File); ok {
-        f.dir = parent.node.(*Dir)
+        if parent.node != nil {
+            f.dir = parent.node.(*Dir)
+        }
     }
     n.childrenLock.Lock()
     parent.children[n.name] = n
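FileHandle.Read now fills the buffer the FUSE library already allocated in resp.Data instead of allocating per request, falls back to a fresh allocation only when the request exceeds that capacity, and merges the dirty-page maxStop into the total before clamping. A sketch of that pattern; readFromChunks and readFromDirtyPages here are hypothetical stubs standing in for the real chunk and dirty-page reads:

```go
package main

import "fmt"

// Hypothetical stub: reads persisted chunk data into buf.
func readFromChunks(buf []byte, off int64) (int64, error) {
	n := copy(buf, []byte("chunk data")[off:])
	return int64(n), nil
}

// Hypothetical stub: overlays the dirty interval [off, off+5) and
// returns its stop position, like ReadDirtyDataAt.
func readFromDirtyPages(buf []byte, off int64) (maxStop int64) {
	copy(buf[0:5], "DIRTY")
	return off + 5
}

// read mirrors the new FileHandle.Read: reuse the response buffer when
// it is big enough, let dirty pages overwrite newer data on top of the
// chunk data, then extend totalRead up to their maxStop and clamp.
func read(respData []byte, reqSize int, off int64) []byte {
	buf := respData[:cap(respData)]
	if reqSize > cap(respData) {
		// should not happen; fall back to a private buffer
		buf = make([]byte, reqSize)
	}
	totalRead, _ := readFromChunks(buf, off)
	maxStop := readFromDirtyPages(buf, off)
	if maxStop-off > totalRead {
		totalRead = maxStop - off
	}
	if totalRead > int64(len(buf)) { // clamp, mirroring the Warningf path
		totalRead = int64(len(buf))
	}
	return buf[:totalRead]
}

func main() {
	respData := make([]byte, 0, 16)           // what the FUSE library hands over
	fmt.Printf("%q\n", read(respData, 16, 0)) // "DIRTY data"
}
```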
diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go
index 67f9aacc8..8bfae1472 100644
--- a/weed/filesys/fscache_test.go
+++ b/weed/filesys/fscache_test.go
@@ -94,3 +94,24 @@ func TestFsCacheMove(t *testing.T) {

     }
 }
+
+func TestFsCacheMove2(t *testing.T) {
+
+    cache := newFsCache(nil)
+
+    cache.SetFsNode(util.FullPath("/a/b/d"), &File{Name: "dd"})
+    cache.SetFsNode(util.FullPath("/a/b/e"), &File{Name: "ee"})
+
+    cache.Move(util.FullPath("/a/b/d"), util.FullPath("/a/b/e"))
+
+    d := cache.GetFsNode(util.FullPath("/a/b/e"))
+    if d == nil {
+        t.Errorf("unexpected nil node!")
+    }
+    if d.(*File).Name != "e" {
+        t.Errorf("unexpected node!")
+    }
+
+}
+
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index edf329143..15ec0903d 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -61,8 +61,13 @@ func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPat
     oldDir, _ := oldPath.DirAndName()
     if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
         if oldPath != "" {
-            if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
-                return err
+            if newEntry != nil && oldPath == newEntry.FullPath {
+                // skip the unnecessary deletion
+                // leave the update to the following InsertEntry operation
+            } else {
+                if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
+                    return err
+                }
             }
         }
     } else {
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index e119ebff5..cd98f4a7c 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -14,7 +14,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full

     mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
-        glog.V(2).Infof("ReadDirAllEntries %s ...", path)
+        glog.V(5).Infof("ReadDirAllEntries %s ...", path)

         err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
             entry := filer2.FromPbEntry(string(dirPath), pbEntry)
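AtomicUpdateEntry now skips the delete when the old path equals the new entry's path: an in-place update is handled entirely by the following InsertEntry, which overwrites the row, so deleting first only costs an extra store round trip and opens a window where the entry is missing. A sketch of the guard, with a plain map standing in for the actual store:

```go
package main

import "fmt"

type entry struct {
	fullPath string
	size     uint64
}

// atomicUpdateEntry mirrors the MetaCache change: only delete the old
// row when the update actually moves the entry; an in-place update is
// left to the insert, which overwrites the existing row.
func atomicUpdateEntry(store map[string]entry, oldPath string, newEntry *entry) {
	if oldPath != "" {
		if newEntry != nil && oldPath == newEntry.fullPath {
			// skip the unnecessary deletion; the insert below overwrites
		} else {
			delete(store, oldPath)
		}
	}
	if newEntry != nil {
		store[newEntry.fullPath] = *newEntry // InsertEntry
	}
}

func main() {
	store := map[string]entry{"/a/f": {"/a/f", 1}}
	atomicUpdateEntry(store, "/a/f", &entry{"/a/f", 2}) // in-place update
	atomicUpdateEntry(store, "/a/f", &entry{"/b/f", 2}) // rename
	fmt.Println(len(store), store["/b/f"].size)         // 1 2
}
```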
glog.V(4).Infof("%s new fh %d", fullpath, fileHandle.handle) return } @@ -146,7 +142,7 @@ func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) { // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error { - glog.V(4).Infof("reading fs stats: %+v", req) + glog.V(5).Infof("reading fs stats: %+v", req) if wfs.stats.lastChecked < time.Now().Unix()-20 { @@ -158,13 +154,13 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec), } - glog.V(4).Infof("reading filer stats: %+v", request) + glog.V(5).Infof("reading filer stats: %+v", request) resp, err := client.Statistics(context.Background(), request) if err != nil { glog.V(0).Infof("reading filer stats %v: %v", request, err) return err } - glog.V(4).Infof("read filer stats: %+v", resp) + glog.V(5).Infof("read filer stats: %+v", resp) wfs.stats.TotalSize = resp.TotalSize wfs.stats.UsedSize = resp.UsedSize diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index bf21b1808..203ebdad1 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -38,7 +38,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se m := make(map[string]operation.LookupResult) - glog.V(4).Infof("remove file lookup volume id locations: %v", vids) + glog.V(5).Infof("deleteFileIds lookup volume id locations: %v", vids) resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: vids, }) |
