aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/reader_at.go3
-rw-r--r--weed/filer/reader_at_test.go35
-rw-r--r--weed/filer/reader_pattern.go7
-rw-r--r--weed/filesys/page_writer/page_chunk_interval_list.go96
-rw-r--r--weed/filesys/page_writer/page_chunk_interval_list_test.go49
5 files changed, 173 insertions, 17 deletions
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 5f58b870c..35e0012c8 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -186,6 +186,9 @@ func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *Chunk
if len(chunkSlice) > 0 {
return chunkSlice, nil
}
+ if c.lookupFileId == nil {
+ return nil, nil
+ }
chunkData, err := c.readFromWholeChunkData(chunkView, nextChunkViews)
if err != nil {
return nil, err
diff --git a/weed/filer/reader_at_test.go b/weed/filer/reader_at_test.go
index f8e4727ce..411d7eb3b 100644
--- a/weed/filer/reader_at_test.go
+++ b/weed/filer/reader_at_test.go
@@ -22,7 +22,7 @@ func (m *mockChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
}
func (m *mockChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte {
- return nil
+ return m.GetChunk(fileId, length)
}
func (m *mockChunkCache) SetChunk(fileId string, data []byte) {
@@ -64,11 +64,12 @@ func TestReaderAt(t *testing.T) {
}
readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
- readerLock: sync.Mutex{},
- fileSize: 10,
- chunkCache: &mockChunkCache{},
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ lookupFileId: nil,
+ readerLock: sync.Mutex{},
+ fileSize: 10,
+ chunkCache: &mockChunkCache{},
+ readerPattern: NewReaderPattern(),
}
testReadAt(t, readerAt, 0, 10, 10, io.EOF)
@@ -114,11 +115,12 @@ func TestReaderAt0(t *testing.T) {
}
readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
- readerLock: sync.Mutex{},
- fileSize: 10,
- chunkCache: &mockChunkCache{},
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ lookupFileId: nil,
+ readerLock: sync.Mutex{},
+ fileSize: 10,
+ chunkCache: &mockChunkCache{},
+ readerPattern: NewReaderPattern(),
}
testReadAt(t, readerAt, 0, 10, 10, io.EOF)
@@ -142,11 +144,12 @@ func TestReaderAt1(t *testing.T) {
}
readerAt := &ChunkReadAt{
- chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
- lookupFileId: nil,
- readerLock: sync.Mutex{},
- fileSize: 20,
- chunkCache: &mockChunkCache{},
+ chunkViews: ViewFromVisibleIntervals(visibles, 0, math.MaxInt64),
+ lookupFileId: nil,
+ readerLock: sync.Mutex{},
+ fileSize: 20,
+ chunkCache: &mockChunkCache{},
+ readerPattern: NewReaderPattern(),
}
testReadAt(t, readerAt, 0, 20, 20, io.EOF)
diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go
index 2bf18d141..5e6ea7348 100644
--- a/weed/filer/reader_pattern.go
+++ b/weed/filer/reader_pattern.go
@@ -11,7 +11,7 @@ type ReaderPattern struct {
func NewReaderPattern() *ReaderPattern {
return &ReaderPattern{
isStreaming: true,
- lastReadOffset: 0,
+ lastReadOffset: -1,
}
}
@@ -19,6 +19,11 @@ func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
if rp.lastReadOffset > offset {
rp.isStreaming = false
}
+ if rp.lastReadOffset == -1 {
+ if offset != 0 {
+ rp.isStreaming = false
+ }
+ }
rp.lastReadOffset = offset
}
diff --git a/weed/filesys/page_writer/page_chunk_interval_list.go b/weed/filesys/page_writer/page_chunk_interval_list.go
new file mode 100644
index 000000000..e626b2a7f
--- /dev/null
+++ b/weed/filesys/page_writer/page_chunk_interval_list.go
@@ -0,0 +1,96 @@
+package page_writer
+
+import "math"
+
+// PageChunkWrittenInterval mark one written interval within one page chunk
+type PageChunkWrittenInterval struct {
+ startOffset int64
+ stopOffset int64
+ prev *PageChunkWrittenInterval
+ next *PageChunkWrittenInterval
+}
+
+// PageChunkWrittenIntervalList mark written intervals within one page chunk
+type PageChunkWrittenIntervalList struct {
+ head *PageChunkWrittenInterval
+ tail *PageChunkWrittenInterval
+}
+
+func newPageChunkWrittenIntervalList() *PageChunkWrittenIntervalList {
+ list := &PageChunkWrittenIntervalList{
+ head: &PageChunkWrittenInterval{
+ startOffset: -1,
+ stopOffset: -1,
+ },
+ tail: &PageChunkWrittenInterval{
+ startOffset: math.MaxInt64,
+ stopOffset: math.MaxInt64,
+ },
+ }
+ list.head.next = list.tail
+ list.tail.prev = list.head
+ return list
+}
+
+func (list *PageChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset int64) {
+ interval := &PageChunkWrittenInterval{
+ startOffset: startOffset,
+ stopOffset: stopOffset,
+ }
+ list.addInterval(interval)
+}
+func (list *PageChunkWrittenIntervalList) addInterval(interval *PageChunkWrittenInterval) {
+
+ p := list.head
+ for ; p.next != nil && p.next.startOffset <= interval.startOffset; p = p.next {
+ }
+ q := list.tail
+ for ; q.prev != nil && q.prev.stopOffset >= interval.stopOffset; q = q.prev {
+ }
+
+ if interval.startOffset <= p.stopOffset && q.startOffset <= interval.stopOffset {
+ // merge p and q together
+ p.stopOffset = q.stopOffset
+ unlinkNodesBetween(p, q.next)
+ return
+ }
+ if interval.startOffset <= p.stopOffset {
+ // merge new interval into p
+ p.stopOffset = interval.stopOffset
+ unlinkNodesBetween(p, q)
+ return
+ }
+ if q.startOffset <= interval.stopOffset {
+ // merge new interval into q
+ q.startOffset = interval.startOffset
+ unlinkNodesBetween(p, q)
+ return
+ }
+
+ // add the new interval between p and q
+ unlinkNodesBetween(p, q)
+ p.next = interval
+ interval.prev = p
+ q.prev = interval
+ interval.next = q
+
+}
+
+// unlinkNodesBetween remove all nodes after start and before stop, exclusive
+func unlinkNodesBetween(start *PageChunkWrittenInterval, stop *PageChunkWrittenInterval) {
+ if start.next == stop {
+ return
+ }
+ start.next.prev = nil
+ start.next = stop
+ stop.prev.next = nil
+ stop.prev = start
+}
+
+func (list *PageChunkWrittenIntervalList) size() int {
+ var count int
+ for t := list.head; t != nil; t = t.next {
+ count++
+ }
+ return count - 2
+}
diff --git a/weed/filesys/page_writer/page_chunk_interval_list_test.go b/weed/filesys/page_writer/page_chunk_interval_list_test.go
new file mode 100644
index 000000000..f54b229d8
--- /dev/null
+++ b/weed/filesys/page_writer/page_chunk_interval_list_test.go
@@ -0,0 +1,49 @@
+package page_writer
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_PageChunkWrittenIntervalList(t *testing.T) {
+ list := newPageChunkWrittenIntervalList()
+
+ assert.Equal(t, 0, list.size(), "empty list")
+
+ list.MarkWritten(0, 5)
+ assert.Equal(t, 1, list.size(), "one interval")
+
+ list.MarkWritten(0, 5)
+ assert.Equal(t, 1, list.size(), "duplicated interval2")
+
+ list.MarkWritten(95, 100)
+ assert.Equal(t, 2, list.size(), "two intervals")
+
+ list.MarkWritten(50, 60)
+ assert.Equal(t, 3, list.size(), "three intervals")
+
+ list.MarkWritten(50, 55)
+ assert.Equal(t, 3, list.size(), "three intervals merge")
+
+ list.MarkWritten(40, 50)
+ assert.Equal(t, 3, list.size(), "three intervals grow forward")
+
+ list.MarkWritten(50, 65)
+ assert.Equal(t, 3, list.size(), "three intervals grow backward")
+
+ list.MarkWritten(70, 80)
+ assert.Equal(t, 4, list.size(), "four intervals")
+
+ list.MarkWritten(60, 70)
+ assert.Equal(t, 3, list.size(), "three intervals merged")
+
+ list.MarkWritten(59, 71)
+ assert.Equal(t, 3, list.size(), "covered three intervals")
+
+ list.MarkWritten(5, 59)
+ assert.Equal(t, 2, list.size(), "covered two intervals")
+
+ list.MarkWritten(70, 99)
+ assert.Equal(t, 1, list.size(), "covered one intervals")
+
+}