aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-19 23:37:04 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-19 23:37:50 -0700
commitce3cb25cfbf30a06348386210f72cc51c3fbd13a (patch)
tree217836bc07221d97a180f7f2fba41dd51ddb3eae /weed/util/log_buffer
parentf37323222751c104723273293a0b15b209021f32 (diff)
downloadseaweedfs-ce3cb25cfbf30a06348386210f72cc51c3fbd13a.tar.xz
seaweedfs-ce3cb25cfbf30a06348386210f72cc51c3fbd13a.zip
working for in memory single log buffer
Diffstat (limited to 'weed/util/log_buffer')
-rw-r--r--weed/util/log_buffer/log_buffer.go49
-rw-r--r--weed/util/log_buffer/log_buffer_test.go41
-rw-r--r--weed/util/log_buffer/log_read.go74
3 files changed, 148 insertions, 16 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index c7cb90549..d875dd54b 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -1,6 +1,7 @@
package log_buffer
import (
+ "bytes"
"sync"
"time"
@@ -17,7 +18,7 @@ const PreviousBufferCount = 3
type dataToFlush struct {
startTime time.Time
stopTime time.Time
- data []byte
+ data *bytes.Buffer
}
type LogBuffer struct {
@@ -108,7 +109,8 @@ func (m *LogBuffer) Shutdown() {
func (m *LogBuffer) loopFlush() {
for d := range m.flushChan {
if d != nil {
- m.flushFn(d.startTime, d.stopTime, d.data)
+ m.flushFn(d.startTime, d.stopTime, d.data.Bytes())
+ d.releaseMemory()
}
}
}
@@ -140,21 +142,26 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
return nil
}
-func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) {
+func (d *dataToFlush) releaseMemory() {
+ d.data.Reset()
+ bufferPool.Put(d.data)
+}
+
+func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) {
m.RLock()
defer m.RUnlock()
- // fmt.Printf("read from buffer: %v\n", lastReadTime)
+ // fmt.Printf("read from buffer: %v last stop time: %v\n", lastReadTime.UnixNano(), m.stopTime.UnixNano())
if lastReadTime.Equal(m.stopTime) {
- return lastReadTime, nil
+ return nil
}
if lastReadTime.After(m.stopTime) {
// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
- return lastReadTime, nil
+ return nil
}
if lastReadTime.Before(m.startTime) {
- return m.stopTime, copiedBytes(m.buf[:m.pos])
+ return copiedBytes(m.buf[:m.pos])
}
lastTs := lastReadTime.UnixNano()
@@ -177,7 +184,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
for l <= h {
mid := (l + h) / 2
pos := m.idx[mid]
- _, t := readTs(m.buf, m.idx[mid])
+ _, t := readTs(m.buf, pos)
if t <= lastTs {
l = mid + 1
} else if lastTs < t {
@@ -186,22 +193,32 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
_, prevT = readTs(m.buf, m.idx[mid-1])
}
if prevT <= lastTs {
- // println("found mid = ", mid)
- return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos])
+ // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
+ return copiedBytes(m.buf[pos:m.pos])
}
- h = mid - 1
+ h = mid
}
// fmt.Printf("l=%d, h=%d\n", l, h)
}
// FIXME: this could be that the buffer has been flushed already
- // println("not found")
- return lastReadTime, nil
+ return nil
}
-func copiedBytes(buf []byte) (copied []byte) {
- copied = make([]byte, len(buf))
- copy(copied, buf)
+func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) {
+ b.Reset()
+ bufferPool.Put(b)
+}
+
+var bufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
+func copiedBytes(buf []byte) (copied *bytes.Buffer) {
+ copied = bufferPool.Get().(*bytes.Buffer)
+ copied.Write(buf)
return
}
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
new file mode 100644
index 000000000..dfd611bed
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -0,0 +1,41 @@
+package log_buffer
+
+import (
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestNewLogBuffer(t *testing.T) {
+ lb := NewLogBuffer(time.Second, func(startTime, stopTime time.Time, buf []byte) {
+
+ }, func() {
+
+ })
+
+ startTime := time.Now()
+
+ messageSize := 1024
+ messageCount := 994
+ var buf = make([]byte, messageSize)
+ for i := 0; i < messageCount; i++ {
+ rand.Read(buf)
+ lb.AddToBuffer(nil, buf)
+ }
+
+ receivedmessageCount := 0
+ lb.LoopProcessLogData(startTime, func() bool {
+ // stop if no more messages
+ return false
+ }, func(logEntry *filer_pb.LogEntry) error {
+ receivedmessageCount++
+ return nil
+ })
+
+ if receivedmessageCount != messageCount {
+ t.Errorf("sent %d received %d", messageCount, receivedmessageCount)
+ }
+
+}
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
new file mode 100644
index 000000000..4333df1b4
--- /dev/null
+++ b/weed/util/log_buffer/log_read.go
@@ -0,0 +1,74 @@
+package log_buffer
+
+import (
+ "bytes"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (logBuffer *LogBuffer) LoopProcessLogData(
+ startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (processed int64, err error) {
+ // loop through all messages
+ var bytesBuf *bytes.Buffer
+ lastReadTime := startTreadTime
+ defer func() {
+ if bytesBuf != nil {
+ logBuffer.ReleaseMeory(bytesBuf)
+ }
+ }()
+
+ for {
+
+ if bytesBuf != nil {
+ logBuffer.ReleaseMeory(bytesBuf)
+ }
+ bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
+ if bytesBuf == nil {
+ if waitForDataFn() {
+ continue
+ } else {
+ return
+ }
+ }
+
+ buf := bytesBuf.Bytes()
+
+ batchSize := 0
+ var startReadTime time.Time
+
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf))
+
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ pos += 4 + int(size)
+ continue
+ }
+ lastReadTime = time.Unix(0, logEntry.TsNs)
+ if startReadTime.IsZero() {
+ startReadTime = lastReadTime
+ }
+
+ if err = eachLogDataFn(logEntry); err != nil {
+ return
+ }
+
+ pos += 4 + int(size)
+ batchSize++
+ processed++
+ }
+
+ // fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize)
+ }
+
+} \ No newline at end of file