path: root/weed/filesys/filehandle.go
author    shibinbin <shibinbin@megvii.com>  2020-10-28 11:36:42 +0800
committer shibinbin <shibinbin@megvii.com>  2020-10-28 11:36:42 +0800
commit    7cc07655d493d11c967cfa978ddc5181d4b6b861 (patch)
tree      5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 /weed/filesys/filehandle.go
parent    29a4c3944eeb07434060df52dfb1d3cf4c59dc91 (diff)
parent    53c3aad87528d57343afc5fdb3fb5107544af0fc (diff)
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/filesys/filehandle.go')
-rw-r--r--  weed/filesys/filehandle.go  215
1 file changed, 133 insertions(+), 82 deletions(-)
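
One pattern worth noting before the diff itself: the change embeds a sync.RWMutex in FileHandle, so Read takes the shared lock (RLock/RUnlock) while Write, Flush, and Release take the exclusive lock. Below is a minimal, self-contained sketch of that pattern; it is not the SeaweedFS code, and the names handle, read, and write are invented for illustration.

package main

import (
	"fmt"
	"sync"
)

type handle struct {
	sync.RWMutex // embedded, as FileHandle does below
	data []byte
}

func (h *handle) read(buf []byte, off int64) int {
	h.RLock() // many readers may hold the shared lock at once
	defer h.RUnlock()
	if off >= int64(len(h.data)) {
		return 0
	}
	return copy(buf, h.data[off:])
}

func (h *handle) write(p []byte, off int64) {
	h.Lock() // writers are exclusive
	defer h.Unlock()
	if end := off + int64(len(p)); end > int64(len(h.data)) {
		grown := make([]byte, end)
		copy(grown, h.data)
		h.data = grown
	}
	copy(h.data[off:], p)
}

func main() {
	h := &handle{}
	h.write([]byte("hello"), 0)
	buf := make([]byte, 5)
	n := h.read(buf, 0)
	fmt.Printf("%s (%d bytes)\n", buf[:n], n)
}
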
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index c6637259d..54bde3494 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -6,21 +6,24 @@ import (
"io"
"math"
"net/http"
+ "os"
+ "sync"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
type FileHandle struct {
// cache file has been written to
- dirtyPages *ContinuousDirtyPages
- contentType string
- dirtyMetadata bool
- handle uint64
+ dirtyPages *ContinuousDirtyPages
+ contentType string
+ handle uint64
+ sync.RWMutex
f *File
RequestId fuse.RequestID // unique ID for request
@@ -38,8 +41,9 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
Gid: gid,
}
if fh.f.entry != nil {
- fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks)
+ fh.f.entry.Attributes.FileSize = filer.FileSize(fh.f.entry)
}
+
return fh
}
@@ -53,61 +57,80 @@ var _ = fs.HandleReleaser(&FileHandle{})
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
- glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
+ glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
+ fh.RLock()
+ defer fh.RUnlock()
+
+ if req.Size <= 0 {
+ return nil
+ }
- buff := make([]byte, req.Size)
+ buff := resp.Data[:cap(resp.Data)]
+ if req.Size > cap(resp.Data) {
+ // should not happen
+ buff = make([]byte, req.Size)
+ }
totalRead, err := fh.readFromChunks(buff, req.Offset)
if err == nil {
- dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset)
- if totalRead+req.Offset < dirtyOffset+int64(dirtySize) {
- totalRead = dirtyOffset + int64(dirtySize) - req.Offset
- }
+ maxStop := fh.readFromDirtyPages(buff, req.Offset)
+ totalRead = max(maxStop-req.Offset, totalRead)
}
- resp.Data = buff[:totalRead]
+ if err == io.EOF {
+ err = nil
+ }
if err != nil {
- glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
+ glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err)
return fuse.EIO
}
+ if totalRead > int64(len(buff)) {
+ glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead)
+ totalRead = min(int64(len(buff)), totalRead)
+ }
+ // resp.Data = buff[:totalRead]
+ resp.Data = buff
+
return err
}
-func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) {
- return fh.dirtyPages.ReadDirtyData(buff, startOffset)
+func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) {
+ maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset)
+ return
}
func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
- // this value should come from the filer instead of the old f
- if len(fh.f.entry.Chunks) == 0 {
+ fileSize := int64(filer.FileSize(fh.f.entry))
+
+ if fileSize == 0 {
glog.V(1).Infof("empty fh %v", fh.f.fullpath())
- return 0, nil
+ return 0, io.EOF
}
+ var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer.NonOverlappingVisibleIntervals(filer.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ if chunkResolveErr != nil {
+ return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
+ }
fh.f.reader = nil
}
if fh.f.reader == nil {
- chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32)
- fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache)
+ chunkViews := filer.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt64)
+ fh.f.reader = filer.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache, fileSize)
}
totalRead, err := fh.f.reader.ReadAt(buff, offset)
- if err == io.EOF {
- err = nil
- }
-
- if err != nil {
+ if err != nil && err != io.EOF {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}
- // glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
+ glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
@@ -115,119 +138,147 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
// Write to the file handle
func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
+ fh.Lock()
+ defer fh.Unlock()
+
// write the request to volume servers
- data := make([]byte, len(req.Data))
- copy(data, req.Data)
+ data := req.Data
+ if len(data) <= 512 {
+ // fuse message cacheable size
+ data = make([]byte, len(req.Data))
+ copy(data, req.Data)
+ }
fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
- // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
+ glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
- chunks, err := fh.dirtyPages.AddPage(req.Offset, data)
- if err != nil {
- glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err)
- return fuse.EIO
- }
+ fh.dirtyPages.AddPage(req.Offset, data)
resp.Size = len(data)
if req.Offset == 0 {
// detect mime type
fh.contentType = http.DetectContentType(data)
- fh.dirtyMetadata = true
+ fh.f.dirtyMetadata = true
}
- if len(chunks) > 0 {
-
- fh.f.addChunks(chunks)
-
- fh.dirtyMetadata = true
- }
+ fh.f.dirtyMetadata = true
return nil
}
func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
- glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle)
+ glog.V(4).Infof("Release %v fh %d", fh.f.fullpath(), fh.handle)
+
+ fh.Lock()
+ defer fh.Unlock()
fh.f.isOpen--
- if fh.f.isOpen <= 0 {
- fh.dirtyPages.releaseResource()
+ if fh.f.isOpen < 0 {
+ glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
+ fh.f.isOpen = 0
+ return nil
+ }
+
+ if fh.f.isOpen == 0 {
+ if err := fh.doFlush(ctx, req.Header); err != nil {
+ glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
+ }
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
}
- fh.f.entryViewCache = nil
- fh.f.reader = nil
+
+ // stop the goroutine
+ if !fh.dirtyPages.chunkSaveErrChanClosed {
+ fh.dirtyPages.chunkSaveErrChanClosed = true
+ close(fh.dirtyPages.chunkSaveErrChan)
+ }
return nil
}
func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
- // fflush works at fh level
+
+ fh.Lock()
+ defer fh.Unlock()
+
+ return fh.doFlush(ctx, req.Header)
+}
+
+func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
+ // flush works at fh level
// send the data to the OS
- glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req)
+ glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle)
- chunks, err := fh.dirtyPages.FlushToStorage()
- if err != nil {
- glog.Errorf("flush %s: %v", fh.f.fullpath(), err)
- return fuse.EIO
- }
+ fh.dirtyPages.saveExistingPagesToStorage()
+
+ fh.dirtyPages.writeWaitGroup.Wait()
- if len(chunks) > 0 {
- fh.f.addChunks(chunks)
- fh.dirtyMetadata = true
+ if fh.dirtyPages.lastErr != nil {
+ return fh.dirtyPages.lastErr
}
- if !fh.dirtyMetadata {
+ if !fh.f.dirtyMetadata {
return nil
}
- err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err := fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
if fh.f.entry.Attributes != nil {
fh.f.entry.Attributes.Mime = fh.contentType
- fh.f.entry.Attributes.Uid = req.Uid
- fh.f.entry.Attributes.Gid = req.Gid
+ if fh.f.entry.Attributes.Uid == 0 {
+ fh.f.entry.Attributes.Uid = header.Uid
+ }
+ if fh.f.entry.Attributes.Gid == 0 {
+ fh.f.entry.Attributes.Gid = header.Gid
+ }
+ if fh.f.entry.Attributes.Crtime == 0 {
+ fh.f.entry.Attributes.Crtime = time.Now().Unix()
+ }
fh.f.entry.Attributes.Mtime = time.Now().Unix()
- fh.f.entry.Attributes.Crtime = time.Now().Unix()
- fh.f.entry.Attributes.FileMode = uint32(0666 &^ fh.f.wfs.option.Umask)
+ fh.f.entry.Attributes.FileMode = uint32(os.FileMode(fh.f.entry.Attributes.FileMode) &^ fh.f.wfs.option.Umask)
fh.f.entry.Attributes.Collection = fh.dirtyPages.collection
fh.f.entry.Attributes.Replication = fh.dirtyPages.replication
}
request := &filer_pb.CreateEntryRequest{
- Directory: fh.f.dir.FullPath(),
- Entry: fh.f.entry,
+ Directory: fh.f.dir.FullPath(),
+ Entry: fh.f.entry,
+ Signatures: []int32{fh.f.wfs.signature},
}
- glog.V(3).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
+ glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks))
for i, chunk := range fh.f.entry.Chunks {
- glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
+ glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
- fh.f.entry.Chunks = chunks
- // fh.f.entryViewCache = nil
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(fh.f.entry.Chunks)
+
+ chunks, _ := filer.CompactFileChunks(filer.LookupFn(fh.f.wfs), nonManifestChunks)
+ chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
+ if manifestErr != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
+ }
+ fh.f.entry.Chunks = append(chunks, manifestChunks...)
+ fh.f.entryViewCache = nil
+
+ fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
+ defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
}
- if fh.f.wfs.option.AsyncMetaDataCaching {
- fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry))
- }
-
- fh.f.wfs.deleteFileChunks(garbages)
- for i, chunk := range garbages {
- glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
- }
+ fh.f.wfs.metaCache.InsertEntry(context.Background(), filer.FromPbEntry(request.Directory, request.Entry))
return nil
})
if err == nil {
- fh.dirtyMetadata = false
+ fh.f.dirtyMetadata = false
}
if err != nil {
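
The doFlush and Release hunks above rely on an asynchronous save pipeline: background goroutines tracked by dirtyPages.writeWaitGroup report failures through chunkSaveErrChan, doFlush waits on the group and then checks lastErr, and Release closes the channel exactly once. The following is a self-contained sketch of that pattern, not the real implementation: the upload failure is simulated, and flush drains the error channel directly, whereas the closed-channel handshake in the diff suggests the real code uses a collector goroutine.

package main

import (
	"errors"
	"fmt"
	"sync"
)

type dirtyPages struct {
	writeWaitGroup     sync.WaitGroup
	chunkSaveErrChan   chan error
	chunkSaveErrClosed bool
	lastErr            error
}

func newDirtyPages() *dirtyPages {
	// the buffer must cover the in-flight saves in this sketch,
	// or a failing saver would block with nobody receiving
	return &dirtyPages{chunkSaveErrChan: make(chan error, 16)}
}

// addPage saves data in a background goroutine, loosely mirroring AddPage
// feeding saveExistingPagesToStorage; an empty slice simulates a failed upload.
func (dp *dirtyPages) addPage(data []byte) {
	dp.writeWaitGroup.Add(1)
	go func() {
		defer dp.writeWaitGroup.Done()
		if len(data) == 0 {
			dp.chunkSaveErrChan <- errors.New("save chunk: empty data")
		}
	}()
}

// flush mirrors doFlush: wait for every in-flight save, then surface the
// last error. Each send happens before its Done, so once Wait returns the
// non-blocking drain below observes every reported error.
func (dp *dirtyPages) flush() error {
	dp.writeWaitGroup.Wait()
	for {
		select {
		case err := <-dp.chunkSaveErrChan:
			dp.lastErr = err
		default:
			return dp.lastErr
		}
	}
}

// release mirrors Release: close the channel exactly once.
func (dp *dirtyPages) release() {
	if !dp.chunkSaveErrClosed {
		dp.chunkSaveErrClosed = true
		close(dp.chunkSaveErrChan)
	}
}

func main() {
	dp := newDirtyPages()
	dp.addPage([]byte("chunk 1"))
	dp.addPage(nil) // simulated failure
	fmt.Println("flush error:", dp.flush())
	dp.release()
}
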