diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/filesys/filehandle.go | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/filesys/filehandle.go')
| -rw-r--r-- | weed/filesys/filehandle.go | 78 |
1 files changed, 46 insertions, 32 deletions
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index cf253a7ed..372d742ea 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,12 +3,10 @@ package filesys import ( "context" "fmt" - "mime" - "path" + "math" + "net/http" "time" - "github.com/gabriel-vasile/mimetype" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -28,15 +26,20 @@ type FileHandle struct { NodeId fuse.NodeID // file or directory the request is about Uid uint32 // user ID of process making request Gid uint32 // group ID of process making request + } func newFileHandle(file *File, uid, gid uint32) *FileHandle { - return &FileHandle{ + fh := &FileHandle{ f: file, dirtyPages: newDirtyPages(file), Uid: uid, Gid: gid, } + if fh.f.entry != nil { + fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks) + } + return fh } var _ = fs.Handle(&FileHandle{}) @@ -53,9 +56,9 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus buff := make([]byte, req.Size) - totalRead, err := fh.readFromChunks(ctx, buff, req.Offset) + totalRead, err := fh.readFromChunks(buff, req.Offset) if err == nil { - dirtyOffset, dirtySize := fh.readFromDirtyPages(ctx, buff, req.Offset) + dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset) if totalRead+req.Offset < dirtyOffset+int64(dirtySize) { totalRead = dirtyOffset + int64(dirtySize) - req.Offset } @@ -71,11 +74,11 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus return err } -func (fh *FileHandle) readFromDirtyPages(ctx context.Context, buff []byte, startOffset int64) (offset int64, size int) { - return fh.dirtyPages.ReadDirtyData(ctx, buff, startOffset) +func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) { + return fh.dirtyPages.ReadDirtyData(buff, startOffset) } -func (fh *FileHandle) readFromChunks(ctx context.Context, buff []byte, offset int64) (int64, error) { +func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { // this value should come from the filer instead of the old f if len(fh.f.entry.Chunks) == 0 { @@ -85,43 +88,46 @@ func (fh *FileHandle) readFromChunks(ctx context.Context, buff []byte, offset in if fh.f.entryViewCache == nil { fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) + fh.f.reader = nil } - chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, offset, len(buff)) + if fh.f.reader == nil { + chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, 0, math.MaxInt32) + fh.f.reader = filer2.NewChunkReaderAtFromClient(fh.f.wfs, chunkViews, fh.f.wfs.chunkCache) + } - totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, offset) + totalRead, err := fh.f.reader.ReadAt(buff, offset) if err != nil { glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) } - return totalRead, err + // glog.V(0).Infof("file handle read %s [%d,%d] %d : %v", fh.f.fullpath(), offset, offset+int64(totalRead), totalRead, err) + + return int64(totalRead), err } // Write to the file handle func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { // write the request to volume servers + data := make([]byte, len(req.Data)) + copy(data, req.Data) - fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(req.Data)), int64(fh.f.entry.Attributes.FileSize))) + fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize))) // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data))) - chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data) + chunks, err := fh.dirtyPages.AddPage(req.Offset, data) if err != nil { - glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err) + glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(data)), err) return fuse.EIO } - resp.Size = len(req.Data) + resp.Size = len(data) if req.Offset == 0 { // detect mime type - detectedMIME := mimetype.Detect(req.Data) - fh.contentType = detectedMIME.String() - if ext := path.Ext(fh.f.Name); ext != detectedMIME.Extension() { - fh.contentType = mime.TypeByExtension(ext) - } - + fh.contentType = http.DetectContentType(data) fh.dirtyMetadata = true } @@ -145,6 +151,8 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err fh.dirtyPages.releaseResource() fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) } + fh.f.entryViewCache = nil + fh.f.reader = nil return nil } @@ -154,14 +162,14 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { // send the data to the OS glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req) - chunks, err := fh.dirtyPages.FlushToStorage(ctx) + chunks, err := fh.dirtyPages.FlushToStorage() if err != nil { glog.Errorf("flush %s: %v", fh.f.fullpath(), err) return fuse.EIO } - fh.f.addChunks(chunks) if len(chunks) > 0 { + fh.f.addChunks(chunks) fh.dirtyMetadata = true } @@ -169,7 +177,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return nil } - err = fh.f.wfs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType @@ -177,11 +185,13 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Attributes.Gid = req.Gid fh.f.entry.Attributes.Mtime = time.Now().Unix() fh.f.entry.Attributes.Crtime = time.Now().Unix() - fh.f.entry.Attributes.FileMode = uint32(0777 &^ fh.f.wfs.option.Umask) + fh.f.entry.Attributes.FileMode = uint32(0666 &^ fh.f.wfs.option.Umask) + fh.f.entry.Attributes.Collection = fh.dirtyPages.collection + fh.f.entry.Attributes.Replication = fh.dirtyPages.replication } request := &filer_pb.CreateEntryRequest{ - Directory: fh.f.dir.Path, + Directory: fh.f.dir.FullPath(), Entry: fh.f.entry, } @@ -194,12 +204,16 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { fh.f.entry.Chunks = chunks // fh.f.entryViewCache = nil - if err := filer_pb.CreateEntry(ctx, client, request); err != nil { - glog.Errorf("update fh: %v", err) - return fmt.Errorf("update fh: %v", err) + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) + return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) + } + + if fh.f.wfs.option.AsyncMetaDataCaching { + fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) } - fh.f.wfs.deleteFileChunks(ctx, garbages) + fh.f.wfs.deleteFileChunks(garbages) for i, chunk := range garbages { glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } |
