diff options
Diffstat (limited to 'weed/filesys/filehandle.go')
| -rw-r--r-- | weed/filesys/filehandle.go | 99 |
1 files changed, 79 insertions, 20 deletions
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 55d574342..c71f1ee36 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -11,6 +11,9 @@ import ( "bytes" "github.com/chrislusf/seaweedfs/weed/operation" "time" + "strings" + "sync" + "github.com/chrislusf/seaweedfs/weed/util" ) type FileHandle struct { @@ -33,46 +36,94 @@ type FileHandle struct { } var _ = fs.Handle(&FileHandle{}) -var _ = fs.HandleReadAller(&FileHandle{}) -// var _ = fs.HandleReader(&FileHandle{}) +// var _ = fs.HandleReadAller(&FileHandle{}) +var _ = fs.HandleReader(&FileHandle{}) var _ = fs.HandleFlusher(&FileHandle{}) var _ = fs.HandleWriter(&FileHandle{}) var _ = fs.HandleReleaser(&FileHandle{}) -func (fh *FileHandle) ReadAll(ctx context.Context) (content []byte, err error) { +func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - glog.V(3).Infof("%v/%v read all fh ", fh.dirPath, fh.name) + glog.V(3).Infof("%v/%v read fh: [%d,%d)", fh.dirPath, fh.name, req.Offset, req.Offset+int64(req.Size)) if len(fh.Chunks) == 0 { glog.V(0).Infof("empty fh %v/%v", fh.dirPath, fh.name) - return + return fmt.Errorf("empty file %v/%v", fh.dirPath, fh.name) } - err = fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + buff := make([]byte, req.Size) - // FIXME: need to either use Read() or implement differently - chunks, _ := filer2.CompactFileChunks(fh.Chunks) - glog.V(1).Infof("read fh %v/%v %d/%d chunks", fh.dirPath, fh.name, len(chunks), len(fh.Chunks)) - for i, chunk := range chunks { - glog.V(1).Infof("read fh %v/%v %d/%d chunk %s [%d,%d)", fh.dirPath, fh.name, i, len(chunks), chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) - } - request := &filer_pb.GetFileContentRequest{ - FileId: chunks[0].FileId, - } + chunkViews := filer2.ReadFromChunks(fh.Chunks, req.Offset, req.Size) + + var vids []string + for _, chunkView := range chunkViews { + vids = append(vids, volumeId(chunkView.FileId)) + } + + vid2Locations := make(map[string]*filer_pb.Locations) + + err := fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - glog.V(1).Infof("read fh content %d chunk %s [%d,%d): %v", len(chunks), - chunks[0].FileId, chunks[0].Offset, chunks[0].Offset+int64(chunks[0].Size), request) - resp, err := client.GetFileContent(ctx, request) + glog.V(4).Infof("read fh lookup volume id locations: %v", vids) + resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ + VolumeIds: vids, + }) if err != nil { return err } - content = resp.Content + vid2Locations = resp.LocationsMap return nil }) - return content, err + if err != nil { + glog.V(3).Infof("%v/%v read fh lookup volume ids: %v", fh.dirPath, fh.name, err) + return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err) + } + + var totalRead int64 + var wg sync.WaitGroup + for _, chunkView := range chunkViews { + wg.Add(1) + go func(chunkView *filer2.ChunkView) { + defer wg.Done() + + glog.V(3).Infof("read fh reading chunk: %+v", chunkView) + + locations := vid2Locations[volumeId(chunkView.FileId)] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", chunkView.FileId) + err = fmt.Errorf("failed to locate %s", chunkView.FileId) + return + } + + var n int64 + n, err = util.ReadUrl( + fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), + chunkView.Offset, + int(chunkView.Size), + buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)]) + + if err != nil { + + glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.dirPath, fh.name, locations.Locations[0].Url, chunkView.FileId, n, err) + + err = fmt.Errorf("failed to read http://%s/%s: %v", + locations.Locations[0].Url, chunkView.FileId, err) + return + } + + glog.V(3).Infof("read fh read %d bytes: %+v", n, chunkView) + totalRead += n + + }(chunkView) + } + wg.Wait() + + resp.Data = buff[:totalRead] + + return err } // Write to the file handle @@ -179,3 +230,11 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { return err } + +func volumeId(fileId string) string { + lastCommaIndex := strings.LastIndex(fileId, ",") + if lastCommaIndex > 0 { + return fileId[:lastCommaIndex] + } + return fileId +} |
