aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/queue/log_buffer.go8
-rw-r--r--weed/queue/sealed_buffer.go40
2 files changed, 47 insertions, 1 deletions
diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go
index f24429479..78c5b75ae 100644
--- a/weed/queue/log_buffer.go
+++ b/weed/queue/log_buffer.go
@@ -11,6 +11,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+const BufferSize = 4 * 1024 * 1024
+const PreviousBufferCount = 3
+
type dataToFlush struct {
startTime time.Time
stopTime time.Time
@@ -18,6 +21,7 @@ type dataToFlush struct {
}
type LogBuffer struct {
+ prevBuffers *SealedBuffers
buf []byte
idx []int
pos int
@@ -34,7 +38,8 @@ type LogBuffer struct {
func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
lb := &LogBuffer{
- buf: make([]byte, 4*1024*1024),
+ prevBuffers: newSealedBuffers(PreviousBufferCount),
+ buf: make([]byte, BufferSize),
sizeBuf: make([]byte, 4),
flushInterval: flushInterval,
flushFn: flushFn,
@@ -127,6 +132,7 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
stopTime: m.stopTime,
data: copiedBytes(m.buf[:m.pos]),
}
+ m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf)
m.pos = 0
m.idx = m.idx[:0]
return d
diff --git a/weed/queue/sealed_buffer.go b/weed/queue/sealed_buffer.go
new file mode 100644
index 000000000..23cec92c9
--- /dev/null
+++ b/weed/queue/sealed_buffer.go
@@ -0,0 +1,40 @@
+package queue
+
+import "time"
+
+type MemBuffer struct {
+ buf []byte
+ startTime time.Time
+ stopTime time.Time
+}
+
+type SealedBuffers struct {
+ buffers []*MemBuffer
+}
+
+func newSealedBuffers(size int) *SealedBuffers {
+ sbs := &SealedBuffers{}
+
+ sbs.buffers = make([]*MemBuffer, size)
+ for i := 0; i < size; i++ {
+ sbs.buffers[i] = &MemBuffer{
+ buf: make([]byte, BufferSize),
+ }
+ }
+
+ return sbs
+}
+
+func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte) (newBuf []byte) {
+ oldMemBuffer := sbs.buffers[0]
+ size := len(sbs.buffers)
+ for i := 0; i < size-1; i++ {
+ sbs.buffers[i].buf = sbs.buffers[i+1].buf
+ sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
+ sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
+ }
+ sbs.buffers[size-1].buf = buf
+ sbs.buffers[size-1].startTime = startTime
+ sbs.buffers[size-1].stopTime = stopTime
+ return oldMemBuffer.buf
+}