diff options
Diffstat (limited to 'weed/filesys/filehandle.go')
| -rw-r--r-- | weed/filesys/filehandle.go | 190 |
1 files changed, 94 insertions, 96 deletions
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index cc2c070eb..100c9eba0 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -1,17 +1,19 @@ package filesys import ( - "bazil.org/fuse" - "bazil.org/fuse/fs" "context" "fmt" + "mime" + "path" + "time" + + "github.com/gabriel-vasile/mimetype" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "net/http" - "strings" - "sync" + "github.com/seaweedfs/fuse" + "github.com/seaweedfs/fuse/fs" ) type FileHandle struct { @@ -49,85 +51,51 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size)) - // this value should come from the filer instead of the old f - if len(fh.f.entry.Chunks) == 0 { - glog.V(1).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name) - return nil - } - buff := make([]byte, req.Size) - chunkViews := filer2.ViewFromChunks(fh.f.entry.Chunks, req.Offset, req.Size) - - var vids []string - for _, chunkView := range chunkViews { - vids = append(vids, volumeId(chunkView.FileId)) - } - - vid2Locations := make(map[string]*filer_pb.Locations) - - err := fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - glog.V(4).Infof("read fh lookup volume id locations: %v", vids) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: vids, - }) - if err != nil { - return err + totalRead, err := fh.readFromChunks(buff, req.Offset) + if err == nil { + dirtyOffset, dirtySize := fh.readFromDirtyPages(buff, req.Offset) + if totalRead+req.Offset < dirtyOffset+int64(dirtySize) { + totalRead = dirtyOffset + int64(dirtySize) - req.Offset } + } - vid2Locations = resp.LocationsMap - - return nil - }) + resp.Data = buff[:totalRead] if err != nil { - glog.V(4).Infof("%v/%v read fh lookup volume ids: %v", fh.f.dir.Path, fh.f.Name, err) - return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) + glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) + return fuse.EIO } - var totalRead int64 - var wg sync.WaitGroup - for _, chunkView := range chunkViews { - wg.Add(1) - go func(chunkView *filer2.ChunkView) { - defer wg.Done() - - glog.V(4).Infof("read fh reading chunk: %+v", chunkView) + return err +} - locations := vid2Locations[volumeId(chunkView.FileId)] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", chunkView.FileId) - err = fmt.Errorf("failed to locate %s", chunkView.FileId) - return - } +func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (offset int64, size int) { + return fh.dirtyPages.ReadDirtyData(buff, startOffset) +} - var n int64 - n, err = util.ReadUrl( - fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), - chunkView.Offset, - int(chunkView.Size), - buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)]) +func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { - if err != nil { + // this value should come from the filer instead of the old f + if len(fh.f.entry.Chunks) == 0 { + glog.V(1).Infof("empty fh %v", fh.f.fullpath()) + return 0, nil + } - glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.f.dir.Path, fh.f.Name, locations.Locations[0].Url, chunkView.FileId, n, err) + if fh.f.entryViewCache == nil { + fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) + } - err = fmt.Errorf("failed to read http://%s/%s: %v", - locations.Locations[0].Url, chunkView.FileId, err) - return - } + chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, offset, len(buff)) - glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView) - totalRead += n + totalRead, err := filer2.ReadIntoBuffer(fh.f.wfs, fh.f.fullpath(), buff, chunkViews, offset) - }(chunkView) + if err != nil { + glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) } - wg.Wait() - resp.Data = buff[:totalRead] - - return err + return totalRead, err } // Write to the file handle @@ -135,24 +103,32 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f // write the request to volume servers - glog.V(4).Infof("%+v/%v write fh %d: [%d,%d)", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data))) + fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(req.Data)), int64(fh.f.entry.Attributes.FileSize))) + // glog.V(0).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data))) - chunks, err := fh.dirtyPages.AddPage(ctx, req.Offset, req.Data) + chunks, err := fh.dirtyPages.AddPage(req.Offset, req.Data) if err != nil { - glog.Errorf("%+v/%v write fh %d: [%d,%d): %v", fh.f.dir.Path, fh.f.Name, fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err) - return fmt.Errorf("write %s/%s at [%d,%d): %v", fh.f.dir.Path, fh.f.Name, req.Offset, req.Offset+int64(len(req.Data)), err) + glog.Errorf("%v write fh %d: [%d,%d): %v", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(len(req.Data)), err) + return fuse.EIO } resp.Size = len(req.Data) if req.Offset == 0 { - fh.contentType = http.DetectContentType(req.Data) + // detect mime type + detectedMIME := mimetype.Detect(req.Data) + fh.contentType = detectedMIME.String() + if ext := path.Ext(fh.f.Name); ext != detectedMIME.Extension() { + fh.contentType = mime.TypeByExtension(ext) + } + fh.dirtyMetadata = true } - for _, chunk := range chunks { - fh.f.entry.Chunks = append(fh.f.entry.Chunks, chunk) - glog.V(1).Infof("uploaded %s/%s to %s [%d,%d)", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + if len(chunks) > 0 { + + fh.f.addChunks(chunks) + fh.dirtyMetadata = true } @@ -163,39 +139,47 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle) - fh.f.wfs.ReleaseHandle(fuse.HandleID(fh.handle)) + fh.f.isOpen-- - fh.f.isOpen = false + if fh.f.isOpen <= 0 { + fh.dirtyPages.releaseResource() + fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) + } return nil } -// Flush - experimenting with uploading at flush, this slows operations down till it has been -// completely flushed func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { // fflush works at fh level // send the data to the OS glog.V(4).Infof("%s fh %d flush %v", fh.f.fullpath(), fh.handle, req) - chunk, err := fh.dirtyPages.FlushToStorage(ctx) + chunks, err := fh.dirtyPages.FlushToStorage() if err != nil { - glog.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err) - return fmt.Errorf("flush %s/%s to %s [%d,%d): %v", fh.f.dir.Path, fh.f.Name, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size), err) + glog.Errorf("flush %s: %v", fh.f.fullpath(), err) + return fuse.EIO } - if chunk != nil { - fh.f.entry.Chunks = append(fh.f.entry.Chunks, chunk) + + fh.f.addChunks(chunks) + if len(chunks) > 0 { + fh.dirtyMetadata = true } if !fh.dirtyMetadata { return nil } - return fh.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + err = fh.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { if fh.f.entry.Attributes != nil { fh.f.entry.Attributes.Mime = fh.contentType fh.f.entry.Attributes.Uid = req.Uid fh.f.entry.Attributes.Gid = req.Gid + fh.f.entry.Attributes.Mtime = time.Now().Unix() + fh.f.entry.Attributes.Crtime = time.Now().Unix() + fh.f.entry.Attributes.FileMode = uint32(0777 &^ fh.f.wfs.option.Umask) + fh.f.entry.Attributes.Collection = fh.dirtyPages.collection + fh.f.entry.Attributes.Replication = fh.dirtyPages.replication } request := &filer_pb.CreateEntryRequest{ @@ -203,22 +187,36 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { Entry: fh.f.entry, } - glog.V(1).Infof("%s/%s set chunks: %v", fh.f.dir.Path, fh.f.Name, len(fh.f.entry.Chunks)) + glog.V(3).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks)) for i, chunk := range fh.f.entry.Chunks { - glog.V(1).Infof("%s/%s chunks %d: %v [%d,%d)", fh.f.dir.Path, fh.f.Name, i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) + } + + chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks) + fh.f.entry.Chunks = chunks + // fh.f.entryViewCache = nil + + if err := filer_pb.CreateEntry(client, request); err != nil { + glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) + return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) } - if _, err := client.CreateEntry(ctx, request); err != nil { - return fmt.Errorf("update fh: %v", err) + + fh.f.wfs.deleteFileChunks(garbages) + for i, chunk := range garbages { + glog.V(3).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } return nil }) -} -func volumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] + if err == nil { + fh.dirtyMetadata = false } - return fileId + + if err != nil { + glog.Errorf("%v fh %d flush: %v", fh.f.fullpath(), fh.handle, err) + return fuse.EIO + } + + return nil } |
