diff options
| -rw-r--r-- | weed/filer/reader_at.go | 3 | ||||
| -rw-r--r-- | weed/filer/reader_at_test.go | 35 | ||||
| -rw-r--r-- | weed/filer/reader_pattern.go | 7 | ||||
| -rw-r--r-- | weed/filesys/page_writer/page_chunk_interval_list.go | 96 | ||||
| -rw-r--r-- | weed/filesys/page_writer/page_chunk_interval_list_test.go | 49 |
5 files changed, 173 insertions, 17 deletions
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 5f58b870c..35e0012c8 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -186,6 +186,9 @@ func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *Chunk if len(chunkSlice) > 0 { return chunkSlice, nil } + if c.lookupFileId == nil { + return nil, nil + } chunkData, err := c.readFromWholeChunkData(chunkView, nextChunkViews) if err != nil { return nil, err diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go index f8e4727ce..411d7eb3b 100644 --- a/weed/filer/reader_at_test.go +++ b/weed/filer/reader_at_test.go @@ -22,7 +22,7 @@ func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) { } func (m *mockChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte { - return nil + return m.GetChunk(fileId, length) } func (m *mockChunkCache) SetChunk(fileId string, data []byte) { @@ -64,11 +64,12 @@ func TestReaderAt(t *testing.T) { } readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, - readerLock: sync.Mutex{}, - fileSize: 10, - chunkCache: &mockChunkCache{}, + chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), + lookupFileId: nil, + readerLock: sync.Mutex{}, + fileSize: 10, + chunkCache: &mockChunkCache{}, + readerPattern: NewReaderPattern(), } testReadAt(t, readerAt, 0, 10, 10, io.EOF) @@ -114,11 +115,12 @@ func TestReaderAt0(t *testing.T) { } readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, - readerLock: sync.Mutex{}, - fileSize: 10, - chunkCache: &mockChunkCache{}, + chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), + lookupFileId: nil, + readerLock: sync.Mutex{}, + fileSize: 10, + chunkCache: &mockChunkCache{}, + readerPattern: NewReaderPattern(), } testReadAt(t, readerAt, 0, 10, 10, io.EOF) @@ -142,11 +144,12 @@ func TestReaderAt1(t *testing.T) { } readerAt := &ChunkReadAt{ - chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), - lookupFileId: nil, - readerLock: sync.Mutex{}, - fileSize: 20, - chunkCache: &mockChunkCache{}, + chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64), + lookupFileId: nil, + readerLock: sync.Mutex{}, + fileSize: 20, + chunkCache: &mockChunkCache{}, + readerPattern: NewReaderPattern(), } testReadAt(t, readerAt, 0, 20, 20, io.EOF) diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go index 2bf18d141..5e6ea7348 100644 --- a/weed/filer/reader_pattern.go +++ b/weed/filer/reader_pattern.go @@ -11,7 +11,7 @@ type ReaderPattern struct { func NewReaderPattern() *ReaderPattern { return &ReaderPattern{ isStreaming: true, - lastReadOffset: 0, + lastReadOffset: -1, } } @@ -19,6 +19,11 @@ func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) { if rp.lastReadOffset > offset { rp.isStreaming = false } + if rp.lastReadOffset == -1 { + if offset != 0 { + rp.isStreaming = false + } + } rp.lastReadOffset = offset } diff --git a/weed/filesys/page_writer/page_chunk_interval_list.go b/weed/filesys/page_writer/page_chunk_interval_list.go new file mode 100644 index 000000000..e626b2a7f --- /dev/null +++ b/weed/filesys/page_writer/page_chunk_interval_list.go @@ -0,0 +1,96 @@ +package page_writer + +import "math" + +// PageChunkWrittenInterval mark one written interval within one page chunk +type PageChunkWrittenInterval struct { + startOffset int64 + stopOffset int64 + prev *PageChunkWrittenInterval + next *PageChunkWrittenInterval +} + +// PageChunkWrittenIntervalList mark written intervals within one page chunk +type PageChunkWrittenIntervalList struct { + head *PageChunkWrittenInterval + tail *PageChunkWrittenInterval +} + +func newPageChunkWrittenIntervalList() *PageChunkWrittenIntervalList { + list := &PageChunkWrittenIntervalList{ + head: &PageChunkWrittenInterval{ + startOffset: -1, + stopOffset: -1, + }, + tail: &PageChunkWrittenInterval{ + startOffset: math.MaxInt64, + stopOffset: math.MaxInt64, + }, + } + list.head.next = list.tail + list.tail.prev = list.head + return list +} + +func (list *PageChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) { + interval := &PageChunkWrittenInterval{ + startOffset: startOffset, + stopOffset: stopOffset, + } + list.addInterval(interval) +} +func (list *PageChunkWrittenIntervalList) addInterval(interval *PageChunkWrittenInterval) { + + p := list.head + for ; p.next != nil && p.next.startOffset <= interval.startOffset; p = p.next { + } + q := list.tail + for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev { + } + + if interval.startOffset <= p.stopOffset && q.startOffset <= interval.stopOffset { + // merge p and q together + p.stopOffset = q.stopOffset + unlinkNodesBetween(p, q.next) + return + } + if interval.startOffset <= p.stopOffset { + // merge new interval into p + p.stopOffset = interval.stopOffset + unlinkNodesBetween(p, q) + return + } + if q.startOffset <= interval.stopOffset { + // merge new interval into q + q.startOffset = interval.startOffset + unlinkNodesBetween(p, q) + return + } + + // add the new interval between p and q + unlinkNodesBetween(p, q) + p.next = interval + interval.prev = p + q.prev = interval + interval.next = q + +} + +// unlinkNodesBetween remove all nodes after start and before stop, exclusive +func unlinkNodesBetween(start *PageChunkWrittenInterval, stop *PageChunkWrittenInterval) { + if start.next == stop { + return + } + start.next.prev = nil + start.next = stop + stop.prev.next = nil + stop.prev = start +} + +func (list *PageChunkWrittenIntervalList) size() int { + var count int + for t := list.head; t != nil; t = t.next { + count++ + } + return count - 2 +} diff --git a/weed/filesys/page_writer/page_chunk_interval_list_test.go b/weed/filesys/page_writer/page_chunk_interval_list_test.go new file mode 100644 index 000000000..f54b229d8 --- /dev/null +++ b/weed/filesys/page_writer/page_chunk_interval_list_test.go @@ -0,0 +1,49 @@ +package page_writer + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func Test_PageChunkWrittenIntervalList(t *testing.T) { + list := newPageChunkWrittenIntervalList() + + assert.Equal(t, 0, list.size(), "empty list") + + list.MarkWritten(0, 5) + assert.Equal(t, 1, list.size(), "one interval") + + list.MarkWritten(0, 5) + assert.Equal(t, 1, list.size(), "duplicated interval2") + + list.MarkWritten(95, 100) + assert.Equal(t, 2, list.size(), "two intervals") + + list.MarkWritten(50, 60) + assert.Equal(t, 3, list.size(), "three intervals") + + list.MarkWritten(50, 55) + assert.Equal(t, 3, list.size(), "three intervals merge") + + list.MarkWritten(40, 50) + assert.Equal(t, 3, list.size(), "three intervals grow forward") + + list.MarkWritten(50, 65) + assert.Equal(t, 3, list.size(), "three intervals grow backward") + + list.MarkWritten(70, 80) + assert.Equal(t, 4, list.size(), "four intervals") + + list.MarkWritten(60, 70) + assert.Equal(t, 3, list.size(), "three intervals merged") + + list.MarkWritten(59, 71) + assert.Equal(t, 3, list.size(), "covered three intervals") + + list.MarkWritten(5, 59) + assert.Equal(t, 2, list.size(), "covered two intervals") + + list.MarkWritten(70, 99) + assert.Equal(t, 1, list.size(), "covered one intervals") + +} |
