aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
-rw-r--r--weed/util/log_buffer/log_buffer.go49
1 files changed, 33 insertions, 16 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index c7cb90549..d875dd54b 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -1,6 +1,7 @@
package log_buffer
import (
+ "bytes"
"sync"
"time"
@@ -17,7 +18,7 @@ const PreviousBufferCount = 3
type dataToFlush struct {
startTime time.Time
stopTime time.Time
- data []byte
+ data *bytes.Buffer
}
type LogBuffer struct {
@@ -108,7 +109,8 @@ func (m *LogBuffer) Shutdown() {
func (m *LogBuffer) loopFlush() {
for d := range m.flushChan {
if d != nil {
- m.flushFn(d.startTime, d.stopTime, d.data)
+ m.flushFn(d.startTime, d.stopTime, d.data.Bytes())
+ d.releaseMemory()
}
}
}
@@ -140,21 +142,26 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
return nil
}
-func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) {
+func (d *dataToFlush) releaseMemory() {
+ d.data.Reset()
+ bufferPool.Put(d.data)
+}
+
+func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) {
m.RLock()
defer m.RUnlock()
- // fmt.Printf("read from buffer: %v\n", lastReadTime)
+ // fmt.Printf("read from buffer: %v last stop time: %v\n", lastReadTime.UnixNano(), m.stopTime.UnixNano())
if lastReadTime.Equal(m.stopTime) {
- return lastReadTime, nil
+ return nil
}
if lastReadTime.After(m.stopTime) {
// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
- return lastReadTime, nil
+ return nil
}
if lastReadTime.Before(m.startTime) {
- return m.stopTime, copiedBytes(m.buf[:m.pos])
+ return copiedBytes(m.buf[:m.pos])
}
lastTs := lastReadTime.UnixNano()
@@ -177,7 +184,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
for l <= h {
mid := (l + h) / 2
pos := m.idx[mid]
- _, t := readTs(m.buf, m.idx[mid])
+ _, t := readTs(m.buf, pos)
if t <= lastTs {
l = mid + 1
} else if lastTs < t {
@@ -186,22 +193,32 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
_, prevT = readTs(m.buf, m.idx[mid-1])
}
if prevT <= lastTs {
- // println("found mid = ", mid)
- return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos])
+ // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
+ return copiedBytes(m.buf[pos:m.pos])
}
- h = mid - 1
+ h = mid
}
// fmt.Printf("l=%d, h=%d\n", l, h)
}
// FIXME: this could be that the buffer has been flushed already
- // println("not found")
- return lastReadTime, nil
+ return nil
}
-func copiedBytes(buf []byte) (copied []byte) {
- copied = make([]byte, len(buf))
- copy(copied, buf)
+func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) {
+ b.Reset()
+ bufferPool.Put(b)
+}
+
+var bufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
+func copiedBytes(buf []byte) (copied *bytes.Buffer) {
+ copied = bufferPool.Get().(*bytes.Buffer)
+ copied.Write(buf)
return
}