diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-03-22 01:00:36 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-03-22 01:00:36 -0700 |
| commit | 65d2ea9fb00757320f348835d9761a357264ea98 (patch) | |
| tree | 18e539c2949b4b270ca4cb02ef544a058c57f149 /weed/filer2/stream.go | |
| parent | 82bfad5b8615d9c2cd21efc059514b8899232a0f (diff) | |
| download | seaweedfs-65d2ea9fb00757320f348835d9761a357264ea98.tar.xz seaweedfs-65d2ea9fb00757320f348835d9761a357264ea98.zip | |
FUSE mount: stream read data with buffer
fix https://github.com/chrislusf/seaweedfs/issues/1244
Diffstat (limited to 'weed/filer2/stream.go')
| -rw-r--r-- | weed/filer2/stream.go | 59 |
1 files changed, 53 insertions, 6 deletions
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 9c7a68b8e..0f7c3c176 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -2,8 +2,11 @@ package filer2 import ( "bytes" + "context" + "fmt" "io" "math" + "strings" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -51,18 +54,51 @@ type ChunkStreamReader struct { bufferOffset int64 bufferPos int chunkIndex int + lookupFileId func(fileId string) (targetUrl string, err error) } var _ = io.ReadSeeker(&ChunkStreamReader{}) -func NewChunkStreamReader(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { +func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) return &ChunkStreamReader{ - masterClient: masterClient, - chunkViews: chunkViews, - bufferOffset: -1, + chunkViews: chunkViews, + lookupFileId: func(fileId string) (targetUrl string, err error) { + return masterClient.LookupFileId(fileId) + }, + } +} + +func NewChunkStreamReaderFromClient(filerClient FilerClient, chunkViews []*ChunkView) *ChunkStreamReader { + + return &ChunkStreamReader{ + chunkViews: chunkViews, + lookupFileId: func(fileId string) (targetUrl string, err error) { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + vid := fileIdToVolumeId(fileId) + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + + locations := resp.LocationsMap[vid] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", fileId) + return fmt.Errorf("failed to locate %s", fileId) + } + + volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) + + targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) + + return nil + }) + return + }, } } @@ -72,6 +108,7 @@ func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { return 0, io.EOF } chunkView := c.chunkViews[c.chunkIndex] + println("fetch1") c.fetchChunkToBuffer(chunkView) c.chunkIndex++ } @@ -105,7 +142,7 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { for i, chunk := range c.chunkViews { if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) { - if c.isBufferEmpty() || c.bufferOffset != offset { + if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset { c.fetchChunkToBuffer(chunk) c.chunkIndex = i + 1 break @@ -119,7 +156,7 @@ func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) { } func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { - urlString, err := c.masterClient.LookupFileId(chunkView.FileId) + urlString, err := c.lookupFileId(chunkView.FileId) if err != nil { glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return err @@ -136,5 +173,15 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { c.bufferPos = 0 c.bufferOffset = chunkView.LogicOffset + // glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size)) + return nil } + +func fileIdToVolumeId(fileId string) (volumeId string) { + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + return fileId + } + return parts[0] +} |
