diff options
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 49 |
1 files changed, 33 insertions, 16 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index c7cb90549..d875dd54b 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -1,6 +1,7 @@ package log_buffer import ( + "bytes" "sync" "time" @@ -17,7 +18,7 @@ const PreviousBufferCount = 3 type dataToFlush struct { startTime time.Time stopTime time.Time - data []byte + data *bytes.Buffer } type LogBuffer struct { @@ -108,7 +109,8 @@ func (m *LogBuffer) Shutdown() { func (m *LogBuffer) loopFlush() { for d := range m.flushChan { if d != nil { - m.flushFn(d.startTime, d.stopTime, d.data) + m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) + d.releaseMemory() } } } @@ -140,21 +142,26 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { return nil } -func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) { +func (d *dataToFlush) releaseMemory() { + d.data.Reset() + bufferPool.Put(d.data) +} + +func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) { m.RLock() defer m.RUnlock() - // fmt.Printf("read from buffer: %v\n", lastReadTime) + // fmt.Printf("read from buffer: %v last stop time: %v\n", lastReadTime.UnixNano(), m.stopTime.UnixNano()) if lastReadTime.Equal(m.stopTime) { - return lastReadTime, nil + return nil } if lastReadTime.After(m.stopTime) { // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime) - return lastReadTime, nil + return nil } if lastReadTime.Before(m.startTime) { - return m.stopTime, copiedBytes(m.buf[:m.pos]) + return copiedBytes(m.buf[:m.pos]) } lastTs := lastReadTime.UnixNano() @@ -177,7 +184,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer for l <= h { mid := (l + h) / 2 pos := m.idx[mid] - _, t := readTs(m.buf, m.idx[mid]) + _, t := readTs(m.buf, pos) if t <= lastTs { l = mid + 1 } else if lastTs < t { @@ -186,22 +193,32 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer _, prevT = readTs(m.buf, m.idx[mid-1]) } if prevT <= lastTs { - // println("found mid = ", mid) - return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos]) + // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) + return copiedBytes(m.buf[pos:m.pos]) } - h = mid - 1 + h = mid } // fmt.Printf("l=%d, h=%d\n", l, h) } // FIXME: this could be that the buffer has been flushed already - // println("not found") - return lastReadTime, nil + return nil } -func copiedBytes(buf []byte) (copied []byte) { - copied = make([]byte, len(buf)) - copy(copied, buf) +func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) { + b.Reset() + bufferPool.Put(b) +} + +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + +func copiedBytes(buf []byte) (copied *bytes.Buffer) { + copied = bufferPool.Get().(*bytes.Buffer) + copied.Write(buf) return } |
