aboutsummaryrefslogtreecommitdiff
path: root/weed/queue/log_buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/queue/log_buffer.go')
-rw-r--r--weed/queue/log_buffer.go92
1 files changed, 92 insertions, 0 deletions
diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go
new file mode 100644
index 000000000..d6ccdf2a6
--- /dev/null
+++ b/weed/queue/log_buffer.go
@@ -0,0 +1,92 @@
+package queue
+
+import (
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type LogBuffer struct {
+ buf []byte
+ pos int
+ startTime time.Time
+ stopTime time.Time
+ sizeBuf []byte
+ flushInterval time.Duration
+ flushFn func(startTime, stopTime time.Time, buf []byte)
+ isStopping bool
+ sync.Mutex
+}
+
+func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer {
+ lb := &LogBuffer{
+ buf: make([]byte, 4*0124*1024),
+ sizeBuf: make([]byte, 4),
+ flushInterval: flushInterval,
+ flushFn: flushFn,
+ }
+ go lb.loopFlush()
+ return lb
+}
+
+func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
+
+ logEntry := &filer_pb.LogEntry{
+ TsNs: ts.UnixNano(),
+ PartitionKeyHash: util.HashToInt32(key),
+ Data: data,
+ }
+
+ logEntryData, _ := proto.Marshal(logEntry)
+
+ size := len(logEntryData)
+
+ m.Lock()
+ defer m.Unlock()
+
+ if m.pos == 0 {
+ m.startTime = ts
+ }
+
+ if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
+ m.flush()
+ m.startTime = ts
+ }
+ m.stopTime = ts
+
+ util.Uint32toBytes(m.sizeBuf, uint32(size))
+ copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
+
+ copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
+ m.pos += size + 4
+}
+
+func (m *LogBuffer) Shutdown() {
+ if m.isStopping {
+ return
+ }
+ m.isStopping = true
+ m.Lock()
+ m.flush()
+ m.Unlock()
+}
+
+func (m *LogBuffer) loopFlush() {
+ for !m.isStopping {
+ m.Lock()
+ m.flush()
+ m.Unlock()
+ time.Sleep(m.flushInterval)
+ }
+}
+
+func (m *LogBuffer) flush() {
+ if m.flushFn != nil && m.pos > 0 {
+ m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
+ m.pos = 0
+ }
+}