diff options
Diffstat (limited to 'weed/util/log_buffer')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 49 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 41 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 74 |
3 files changed, 148 insertions, 16 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index c7cb90549..d875dd54b 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -1,6 +1,7 @@ package log_buffer import ( + "bytes" "sync" "time" @@ -17,7 +18,7 @@ const PreviousBufferCount = 3 type dataToFlush struct { startTime time.Time stopTime time.Time - data []byte + data *bytes.Buffer } type LogBuffer struct { @@ -108,7 +109,8 @@ func (m *LogBuffer) Shutdown() { func (m *LogBuffer) loopFlush() { for d := range m.flushChan { if d != nil { - m.flushFn(d.startTime, d.stopTime, d.data) + m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) + d.releaseMemory() } } } @@ -140,21 +142,26 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { return nil } -func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) { +func (d *dataToFlush) releaseMemory() { + d.data.Reset() + bufferPool.Put(d.data) +} + +func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) { m.RLock() defer m.RUnlock() - // fmt.Printf("read from buffer: %v\n", lastReadTime) + // fmt.Printf("read from buffer: %v last stop time: %v\n", lastReadTime.UnixNano(), m.stopTime.UnixNano()) if lastReadTime.Equal(m.stopTime) { - return lastReadTime, nil + return nil } if lastReadTime.After(m.stopTime) { // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime) - return lastReadTime, nil + return nil } if lastReadTime.Before(m.startTime) { - return m.stopTime, copiedBytes(m.buf[:m.pos]) + return copiedBytes(m.buf[:m.pos]) } lastTs := lastReadTime.UnixNano() @@ -177,7 +184,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer for l <= h { mid := (l + h) / 2 pos := m.idx[mid] - _, t := readTs(m.buf, m.idx[mid]) + _, t := readTs(m.buf, pos) if t <= lastTs { l = mid + 1 } else if lastTs < t { @@ -186,22 +193,32 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer _, prevT = readTs(m.buf, m.idx[mid-1]) } if prevT <= lastTs { - // println("found mid = ", mid) - return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos]) + // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) + return copiedBytes(m.buf[pos:m.pos]) } - h = mid - 1 + h = mid } // fmt.Printf("l=%d, h=%d\n", l, h) } // FIXME: this could be that the buffer has been flushed already - // println("not found") - return lastReadTime, nil + return nil } -func copiedBytes(buf []byte) (copied []byte) { - copied = make([]byte, len(buf)) - copy(copied, buf) +func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) { + b.Reset() + bufferPool.Put(b) +} + +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + +func copiedBytes(buf []byte) (copied *bytes.Buffer) { + copied = bufferPool.Get().(*bytes.Buffer) + copied.Write(buf) return } diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go new file mode 100644 index 000000000..dfd611bed --- /dev/null +++ b/weed/util/log_buffer/log_buffer_test.go @@ -0,0 +1,41 @@ +package log_buffer + +import ( + "math/rand" + "testing" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func TestNewLogBuffer(t *testing.T) { + lb := NewLogBuffer(time.Second, func(startTime, stopTime time.Time, buf []byte) { + + }, func() { + + }) + + startTime := time.Now() + + messageSize := 1024 + messageCount := 994 + var buf = make([]byte, messageSize) + for i := 0; i < messageCount; i++ { + rand.Read(buf) + lb.AddToBuffer(nil, buf) + } + + receivedmessageCount := 0 + lb.LoopProcessLogData(startTime, func() bool { + // stop if no more messages + return false + }, func(logEntry *filer_pb.LogEntry) error { + receivedmessageCount++ + return nil + }) + + if receivedmessageCount != messageCount { + t.Errorf("sent %d received %d", messageCount, receivedmessageCount) + } + +} diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go new file mode 100644 index 000000000..4333df1b4 --- /dev/null +++ b/weed/util/log_buffer/log_read.go @@ -0,0 +1,74 @@ +package log_buffer + +import ( + "bytes" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (logBuffer *LogBuffer) LoopProcessLogData( + startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (processed int64, err error) { + // loop through all messages + var bytesBuf *bytes.Buffer + lastReadTime := startTreadTime + defer func() { + if bytesBuf != nil { + logBuffer.ReleaseMeory(bytesBuf) + } + }() + + for { + + if bytesBuf != nil { + logBuffer.ReleaseMeory(bytesBuf) + } + bytesBuf = logBuffer.ReadFromBuffer(lastReadTime) + if bytesBuf == nil { + if waitForDataFn() { + continue + } else { + return + } + } + + buf := bytesBuf.Bytes() + + batchSize := 0 + var startReadTime time.Time + + for pos := 0; pos+4 < len(buf); { + + size := util.BytesToUint32(buf[pos : pos+4]) + entryData := buf[pos+4 : pos+4+int(size)] + + // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf)) + + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + pos += 4 + int(size) + continue + } + lastReadTime = time.Unix(0, logEntry.TsNs) + if startReadTime.IsZero() { + startReadTime = lastReadTime + } + + if err = eachLogDataFn(logEntry); err != nil { + return + } + + pos += 4 + int(size) + batchSize++ + processed++ + } + + // fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize) + } + +}
\ No newline at end of file |
