aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2020-10-22 15:50:49 +0800
committerGitHub <noreply@github.com>2020-10-22 15:50:49 +0800
commitcf7a1c722fa82fa78c546f68e4814fff7dc6d1e2 (patch)
tree1470a0d158a31b516e02202c004dfb413a29e186 /weed/filesys
parentab1105c52472946efab9713bf15df45e14ff4514 (diff)
parent5179e559f74cf7aed562f785e40bef46da3191bf (diff)
downloadseaweedfs-cf7a1c722fa82fa78c546f68e4814fff7dc6d1e2.tar.xz
seaweedfs-cf7a1c722fa82fa78c546f68e4814fff7dc6d1e2.zip
Merge pull request #32 from chrislusf/master
sync
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dirty_page.go4
-rw-r--r--weed/filesys/dirty_page_interval.go80
2 files changed, 65 insertions, 19 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 7a3e859f5..6fda134aa 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -54,7 +54,7 @@ func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
pages.intervals.AddInterval(data, offset)
- if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit {
+ if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit {
pages.saveExistingLargestPageToStorage()
}
@@ -93,6 +93,8 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
+ maxList.Destroy()
+
return true
}
diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/dirty_page_interval.go
index f143fe3e4..f644bea0b 100644
--- a/weed/filesys/dirty_page_interval.go
+++ b/weed/filesys/dirty_page_interval.go
@@ -5,6 +5,7 @@ import (
"io"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/valyala/bytebufferpool"
)
type IntervalNode struct {
@@ -12,6 +13,15 @@ type IntervalNode struct {
Offset int64
Size int64
Next *IntervalNode
+ Buffer *bytebufferpool.ByteBuffer
+}
+
+func (l *IntervalNode) Bytes() []byte {
+ data := l.Data
+ if data == nil {
+ data = l.Buffer.Bytes()
+ }
+ return data
}
type IntervalLinkedList struct {
@@ -23,16 +33,39 @@ type ContinuousIntervals struct {
lists []*IntervalLinkedList
}
+func NewIntervalLinkedList(head, tail *IntervalNode) *IntervalLinkedList {
+ list := &IntervalLinkedList{
+ Head: head,
+ Tail: tail,
+ }
+ return list
+}
+
+func (list *IntervalLinkedList) Destroy() {
+ for t := list.Head; t != nil; t = t.Next {
+ if t.Buffer != nil {
+ bytebufferpool.Put(t.Buffer)
+ }
+ }
+}
+
func (list *IntervalLinkedList) Offset() int64 {
return list.Head.Offset
}
func (list *IntervalLinkedList) Size() int64 {
return list.Tail.Offset + list.Tail.Size - list.Head.Offset
}
+
func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) {
// glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
- list.Tail.Next = node
- list.Tail = node
+ if list.Tail.Buffer == nil {
+ list.Tail.Buffer = bytebufferpool.Get()
+ list.Tail.Buffer.Write(list.Tail.Data)
+ list.Tail.Data = nil
+ }
+ list.Tail.Buffer.Write(node.Data)
+ list.Tail.Size += int64(len(node.Data))
+ return
}
func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) {
// glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
@@ -47,7 +80,7 @@ func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) {
nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
if nodeStart < nodeStop {
// glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop)
- copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset])
+ copy(buf[nodeStart-start:], t.Bytes()[nodeStart-t.Offset:nodeStop-t.Offset])
}
if t.Next == nil {
@@ -72,8 +105,15 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
// skip non overlapping IntervalNode
continue
}
+ data := t.Bytes()[nodeStart-t.Offset : nodeStop-t.Offset]
+ if t.Data == nil {
+ // need to clone if the bytes is from byte buffer
+ t := make([]byte, len(data))
+ copy(t, data)
+ data = t
+ }
nodes = append(nodes, &IntervalNode{
- Data: t.Data[nodeStart-t.Offset : nodeStop-t.Offset],
+ Data: data,
Offset: nodeStart,
Size: nodeStop - nodeStart,
Next: nil,
@@ -82,16 +122,22 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
for i := 1; i < len(nodes); i++ {
nodes[i-1].Next = nodes[i]
}
- return &IntervalLinkedList{
- Head: nodes[0],
- Tail: nodes[len(nodes)-1],
- }
+ return NewIntervalLinkedList(nodes[0], nodes[len(nodes)-1])
}
func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
interval := &IntervalNode{Data: data, Offset: offset, Size: int64(len(data))}
+ // append to the tail and return
+ if len(c.lists) == 1 {
+ lastSpan := c.lists[0]
+ if lastSpan.Tail.Offset + lastSpan.Tail.Size == offset {
+ lastSpan.addNodeToTail(interval)
+ return
+ }
+ }
+
var newLists []*IntervalLinkedList
for _, list := range c.lists {
// if list is to the left of new interval, add to the new list
@@ -144,10 +190,7 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
nextList.addNodeToHead(interval)
}
if prevList == nil && nextList == nil {
- c.lists = append(c.lists, &IntervalLinkedList{
- Head: interval,
- Tail: interval,
- })
+ c.lists = append(c.lists, NewIntervalLinkedList(interval, interval))
}
return
@@ -155,11 +198,12 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList {
var maxSize int64
- maxIndex := -1
+ maxIndex, maxOffset := -1, int64(-1)
for k, list := range c.lists {
- if maxSize <= list.Size() {
- maxSize = list.Size()
- maxIndex = k
+ listSize := list.Size()
+ if maxSize < listSize || (maxSize == listSize && list.Offset() < maxOffset ) {
+ maxSize = listSize
+ maxIndex, maxOffset = k, list.Offset()
}
}
if maxSize <= 0 {
@@ -202,10 +246,10 @@ func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxSto
func (l *IntervalLinkedList) ToReader() io.Reader {
var readers []io.Reader
t := l.Head
- readers = append(readers, util.NewBytesReader(t.Data))
+ readers = append(readers, util.NewBytesReader(t.Bytes()))
for t.Next != nil {
t = t.Next
- readers = append(readers, bytes.NewReader(t.Data))
+ readers = append(readers, bytes.NewReader(t.Bytes()))
}
if len(readers) == 1 {
return readers[0]