aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/filehandle.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-05-24 01:22:37 -0700
committerChris Lu <chris.lu@gmail.com>2018-05-24 01:22:37 -0700
commitd773e11c7a65903b3ee1adea801a20f91cb0c7aa (patch)
tree9e40f834e929d826c9ce5dacd9fa57ca0de57bc6 /weed/filesys/filehandle.go
parent00d0274fd7c829f5d26c051f5832e0f602929b08 (diff)
downloadseaweedfs-d773e11c7a65903b3ee1adea801a20f91cb0c7aa.tar.xz
seaweedfs-d773e11c7a65903b3ee1adea801a20f91cb0c7aa.zip
file handler directly read from volume servers
this mostly works fine now! next: need to cache files to local disk
Diffstat (limited to 'weed/filesys/filehandle.go')
-rw-r--r--weed/filesys/filehandle.go99
1 files changed, 79 insertions, 20 deletions
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 55d574342..c71f1ee36 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -11,6 +11,9 @@ import (
"bytes"
"github.com/chrislusf/seaweedfs/weed/operation"
"time"
+ "strings"
+ "sync"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type FileHandle struct {
@@ -33,46 +36,94 @@ type FileHandle struct {
}
var _ = fs.Handle(&FileHandle{})
-var _ = fs.HandleReadAller(&FileHandle{})
-// var _ = fs.HandleReader(&FileHandle{})
+// var _ = fs.HandleReadAller(&FileHandle{})
+var _ = fs.HandleReader(&FileHandle{})
var _ = fs.HandleFlusher(&FileHandle{})
var _ = fs.HandleWriter(&FileHandle{})
var _ = fs.HandleReleaser(&FileHandle{})
-func (fh *FileHandle) ReadAll(ctx context.Context) (content []byte, err error) {
+func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
- glog.V(3).Infof("%v/%v read all fh ", fh.dirPath, fh.name)
+ glog.V(3).Infof("%v/%v read fh: [%d,%d)", fh.dirPath, fh.name, req.Offset, req.Offset+int64(req.Size))
if len(fh.Chunks) == 0 {
glog.V(0).Infof("empty fh %v/%v", fh.dirPath, fh.name)
- return
+ return fmt.Errorf("empty file %v/%v", fh.dirPath, fh.name)
}
- err = fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ buff := make([]byte, req.Size)
- // FIXME: need to either use Read() or implement differently
- chunks, _ := filer2.CompactFileChunks(fh.Chunks)
- glog.V(1).Infof("read fh %v/%v %d/%d chunks", fh.dirPath, fh.name, len(chunks), len(fh.Chunks))
- for i, chunk := range chunks {
- glog.V(1).Infof("read fh %v/%v %d/%d chunk %s [%d,%d)", fh.dirPath, fh.name, i, len(chunks), chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
- }
- request := &filer_pb.GetFileContentRequest{
- FileId: chunks[0].FileId,
- }
+ chunkViews := filer2.ReadFromChunks(fh.Chunks, req.Offset, req.Size)
+
+ var vids []string
+ for _, chunkView := range chunkViews {
+ vids = append(vids, volumeId(chunkView.FileId))
+ }
+
+ vid2Locations := make(map[string]*filer_pb.Locations)
+
+ err := fh.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- glog.V(1).Infof("read fh content %d chunk %s [%d,%d): %v", len(chunks),
- chunks[0].FileId, chunks[0].Offset, chunks[0].Offset+int64(chunks[0].Size), request)
- resp, err := client.GetFileContent(ctx, request)
+ glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
+ resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
+ VolumeIds: vids,
+ })
if err != nil {
return err
}
- content = resp.Content
+ vid2Locations = resp.LocationsMap
return nil
})
- return content, err
+ if err != nil {
+ glog.V(3).Infof("%v/%v read fh lookup volume ids: %v", fh.dirPath, fh.name, err)
+ return fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
+ }
+
+ var totalRead int64
+ var wg sync.WaitGroup
+ for _, chunkView := range chunkViews {
+ wg.Add(1)
+ go func(chunkView *filer2.ChunkView) {
+ defer wg.Done()
+
+ glog.V(3).Infof("read fh reading chunk: %+v", chunkView)
+
+ locations := vid2Locations[volumeId(chunkView.FileId)]
+ if locations == nil || len(locations.Locations) == 0 {
+ glog.V(0).Infof("failed to locate %s", chunkView.FileId)
+ err = fmt.Errorf("failed to locate %s", chunkView.FileId)
+ return
+ }
+
+ var n int64
+ n, err = util.ReadUrl(
+ fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
+ chunkView.Offset,
+ int(chunkView.Size),
+ buff[chunkView.LogicOffset-req.Offset:chunkView.LogicOffset-req.Offset+int64(chunkView.Size)])
+
+ if err != nil {
+
+ glog.V(0).Infof("%v/%v read http://%s/%v %v bytes: %v", fh.dirPath, fh.name, locations.Locations[0].Url, chunkView.FileId, n, err)
+
+ err = fmt.Errorf("failed to read http://%s/%s: %v",
+ locations.Locations[0].Url, chunkView.FileId, err)
+ return
+ }
+
+ glog.V(3).Infof("read fh read %d bytes: %+v", n, chunkView)
+ totalRead += n
+
+ }(chunkView)
+ }
+ wg.Wait()
+
+ resp.Data = buff[:totalRead]
+
+ return err
}
// Write to the file handle
@@ -179,3 +230,11 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return err
}
+
+func volumeId(fileId string) string {
+ lastCommaIndex := strings.LastIndex(fileId, ",")
+ if lastCommaIndex > 0 {
+ return fileId[:lastCommaIndex]
+ }
+ return fileId
+}