aboutsummaryrefslogtreecommitdiff
path: root/weed/queue/log_buffer.go
blob: d6ccdf2a6ea862daa4eb050372d16ae05893a125 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package queue

import (
	"sync"
	"time"

	"github.com/golang/protobuf/proto"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
)

type LogBuffer struct {
	buf           []byte
	pos           int
	startTime     time.Time
	stopTime      time.Time
	sizeBuf       []byte
	flushInterval time.Duration
	flushFn       func(startTime, stopTime time.Time, buf []byte)
	isStopping    bool
	sync.Mutex
}

func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer {
	lb := &LogBuffer{
		buf:           make([]byte, 4*0124*1024),
		sizeBuf:       make([]byte, 4),
		flushInterval: flushInterval,
		flushFn:       flushFn,
	}
	go lb.loopFlush()
	return lb
}

func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {

	logEntry := &filer_pb.LogEntry{
		TsNs:             ts.UnixNano(),
		PartitionKeyHash: util.HashToInt32(key),
		Data:             data,
	}

	logEntryData, _ := proto.Marshal(logEntry)

	size := len(logEntryData)

	m.Lock()
	defer m.Unlock()

	if m.pos == 0 {
		m.startTime = ts
	}

	if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
		m.flush()
		m.startTime = ts
	}
	m.stopTime = ts

	util.Uint32toBytes(m.sizeBuf, uint32(size))
	copy(m.buf[m.pos:m.pos+4], m.sizeBuf)

	copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
	m.pos += size + 4
}

func (m *LogBuffer) Shutdown() {
	if m.isStopping {
		return
	}
	m.isStopping = true
	m.Lock()
	m.flush()
	m.Unlock()
}

func (m *LogBuffer) loopFlush() {
	for !m.isStopping {
		m.Lock()
		m.flush()
		m.Unlock()
		time.Sleep(m.flushInterval)
	}
}

func (m *LogBuffer) flush() {
	if m.flushFn != nil && m.pos > 0 {
		m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
		m.pos = 0
	}
}