diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-10-22 15:50:49 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-22 15:50:49 +0800 |
| commit | cf7a1c722fa82fa78c546f68e4814fff7dc6d1e2 (patch) | |
| tree | 1470a0d158a31b516e02202c004dfb413a29e186 /weed/filesys | |
| parent | ab1105c52472946efab9713bf15df45e14ff4514 (diff) | |
| parent | 5179e559f74cf7aed562f785e40bef46da3191bf (diff) | |
| download | seaweedfs-cf7a1c722fa82fa78c546f68e4814fff7dc6d1e2.tar.xz seaweedfs-cf7a1c722fa82fa78c546f68e4814fff7dc6d1e2.zip | |
Merge pull request #32 from chrislusf/master
sync
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dirty_page.go | 4 | ||||
| -rw-r--r-- | weed/filesys/dirty_page_interval.go | 80 |
2 files changed, 65 insertions, 19 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 7a3e859f5..6fda134aa 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -54,7 +54,7 @@ func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) { pages.intervals.AddInterval(data, offset) - if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit { + if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit { pages.saveExistingLargestPageToStorage() } @@ -93,6 +93,8 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) + maxList.Destroy() + return true } diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go index f143fe3e4..f644bea0b 100644 --- a/weed/filesys/dirty_page_interval.go +++ b/weed/filesys/dirty_page_interval.go @@ -5,6 +5,7 @@ import ( "io" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/valyala/bytebufferpool" ) type IntervalNode struct { @@ -12,6 +13,15 @@ type IntervalNode struct { Offset int64 Size int64 Next *IntervalNode + Buffer *bytebufferpool.ByteBuffer +} + +func (l *IntervalNode) Bytes() []byte { + data := l.Data + if data == nil { + data = l.Buffer.Bytes() + } + return data } type IntervalLinkedList struct { @@ -23,16 +33,39 @@ type ContinuousIntervals struct { lists []*IntervalLinkedList } +func NewIntervalLinkedList(head, tail *IntervalNode) *IntervalLinkedList { + list := &IntervalLinkedList{ + Head: head, + Tail: tail, + } + return list +} + +func (list *IntervalLinkedList) Destroy() { + for t := list.Head; t != nil; t = t.Next { + if t.Buffer != nil { + bytebufferpool.Put(t.Buffer) + } + } +} + func (list *IntervalLinkedList) Offset() int64 { return list.Head.Offset } func (list *IntervalLinkedList) Size() int64 { return list.Tail.Offset + list.Tail.Size - list.Head.Offset } + func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) { // glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size) - list.Tail.Next = node - list.Tail = node + if list.Tail.Buffer == nil { + list.Tail.Buffer = bytebufferpool.Get() + list.Tail.Buffer.Write(list.Tail.Data) + list.Tail.Data = nil + } + list.Tail.Buffer.Write(node.Data) + list.Tail.Size += int64(len(node.Data)) + return } func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) { // glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size) @@ -47,7 +80,7 @@ func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) { nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size) if nodeStart < nodeStop { // glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop) - copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset]) + copy(buf[nodeStart-start:], t.Bytes()[nodeStart-t.Offset:nodeStop-t.Offset]) } if t.Next == nil { @@ -72,8 +105,15 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList { // skip non overlapping IntervalNode continue } + data := t.Bytes()[nodeStart-t.Offset : nodeStop-t.Offset] + if t.Data == nil { + // need to clone if the bytes is from byte buffer + t := make([]byte, len(data)) + copy(t, data) + data = t + } nodes = append(nodes, &IntervalNode{ - Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset], + Data: data, Offset: nodeStart, Size: nodeStop - nodeStart, Next: nil, @@ -82,16 +122,22 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList { for i := 1; i < len(nodes); i++ { nodes[i-1].Next = nodes[i] } - return &IntervalLinkedList{ - Head: nodes[0], - Tail: nodes[len(nodes)-1], - } + return NewIntervalLinkedList(nodes[0], nodes[len(nodes)-1]) } func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))} + // append to the tail and return + if len(c.lists) == 1 { + lastSpan := c.lists[0] + if lastSpan.Tail.Offset + lastSpan.Tail.Size == offset { + lastSpan.addNodeToTail(interval) + return + } + } + var newLists []*IntervalLinkedList for _, list := range c.lists { // if list is to the left of new interval, add to the new list @@ -144,10 +190,7 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { nextList.addNodeToHead(interval) } if prevList == nil && nextList == nil { - c.lists = append(c.lists, &IntervalLinkedList{ - Head: interval, - Tail: interval, - }) + c.lists = append(c.lists, NewIntervalLinkedList(interval, interval)) } return @@ -155,11 +198,12 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) { func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList { var maxSize int64 - maxIndex := -1 + maxIndex, maxOffset := -1, int64(-1) for k, list := range c.lists { - if maxSize <= list.Size() { - maxSize = list.Size() - maxIndex = k + listSize := list.Size() + if maxSize < listSize || (maxSize == listSize && list.Offset() < maxOffset ) { + maxSize = listSize + maxIndex, maxOffset = k, list.Offset() } } if maxSize <= 0 { @@ -202,10 +246,10 @@ func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxSto func (l *IntervalLinkedList) ToReader() io.Reader { var readers []io.Reader t := l.Head - readers = append(readers, util.NewBytesReader(t.Data)) + readers = append(readers, util.NewBytesReader(t.Bytes())) for t.Next != nil { t = t.Next - readers = append(readers, bytes.NewReader(t.Data)) + readers = append(readers, bytes.NewReader(t.Bytes())) } if len(readers) == 1 { return readers[0] |
