diff options
Diffstat (limited to 'weed/filer2')
| -rw-r--r-- | weed/filer2/filer_notify.go | 4 | ||||
| -rw-r--r-- | weed/filer2/reader_at.go | 54 | ||||
| -rw-r--r-- | weed/filer2/stream.go | 12 |
3 files changed, 43 insertions, 27 deletions
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index 6b83d8e63..28ade51cc 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -109,7 +109,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( } // println("processing", hourMinuteEntry.FullPath) chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks) - if err := readEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { + if err := ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { break @@ -123,7 +123,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( return nil } -func readEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error { +func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error { for { n, err := r.Read(sizeBuf) if err != nil { diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go index f56ef6388..a9772da5b 100644 --- a/weed/filer2/reader_at.go +++ b/weed/filer2/reader_at.go @@ -27,34 +27,40 @@ type ChunkReadAt struct { // var _ = io.ReaderAt(&ChunkReadAt{}) +type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error) + +func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { + return func(fileId string) (targetUrl string, err error) { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + vid := VolumeId(fileId) + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + + locations := resp.LocationsMap[vid] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", fileId) + return fmt.Errorf("failed to locate %s", fileId) + } + + volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) + + targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) + + return nil + }) + return + } +} + func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt { return &ChunkReadAt{ chunkViews: chunkViews, - lookupFileId: func(fileId string) (targetUrl string, err error) { - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - vid := VolumeId(fileId) - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ - VolumeIds: []string{vid}, - }) - if err != nil { - return err - } - - locations := resp.LocationsMap[vid] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", fileId) - return fmt.Errorf("failed to locate %s", fileId) - } - - volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) - - targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) - - return nil - }) - return - }, + lookupFileId: LookupFn(filerClient), bufferOffset: -1, chunkCache: chunkCache, } diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go index 4e785fade..4c8213b07 100644 --- a/weed/filer2/stream.go +++ b/weed/filer2/stream.go @@ -81,7 +81,7 @@ type ChunkStreamReader struct { bufferOffset int64 bufferPos int chunkIndex int - lookupFileId func(fileId string) (targetUrl string, err error) + lookupFileId LookupFileIdFunctionType } var _ = io.ReadSeeker(&ChunkStreamReader{}) @@ -98,6 +98,16 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [ } } +func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader { + + chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32) + + return &ChunkStreamReader{ + chunkViews: chunkViews, + lookupFileId: LookupFn(filerClient), + } +} + func (c *ChunkStreamReader) Read(p []byte) (n int, err error) { for n < len(p) { if c.isBufferEmpty() { |
