aboutsummaryrefslogtreecommitdiff
path: root/weed/queue/log_buffer.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-05 00:51:16 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-05 00:51:16 -0700
commitbf270d9e8c01052409464193b693d50fa09a70a9 (patch)
tree75d7faa1a56d984fd78954df8dca8b65d2f60a00 /weed/queue/log_buffer.go
parent2a2d92d06e440c661bc0b06ff9c5c7034e9fc465 (diff)
downloadseaweedfs-bf270d9e8c01052409464193b693d50fa09a70a9.tar.xz
seaweedfs-bf270d9e8c01052409464193b693d50fa09a70a9.zip
filer: able to tail meta data changes
Diffstat (limited to 'weed/queue/log_buffer.go')
-rw-r--r--weed/queue/log_buffer.go82
1 files changed, 78 insertions, 4 deletions
diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go
index d6ccdf2a6..6ed2a719c 100644
--- a/weed/queue/log_buffer.go
+++ b/weed/queue/log_buffer.go
@@ -6,28 +6,32 @@ import (
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
type LogBuffer struct {
buf []byte
+ idx []int
pos int
startTime time.Time
stopTime time.Time
sizeBuf []byte
flushInterval time.Duration
flushFn func(startTime, stopTime time.Time, buf []byte)
+ notifyFn func()
isStopping bool
- sync.Mutex
+ sync.RWMutex
}
-func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte)) *LogBuffer {
+func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
lb := &LogBuffer{
buf: make([]byte, 4*0124*1024),
sizeBuf: make([]byte, 4),
flushInterval: flushInterval,
flushFn: flushFn,
+ notifyFn: notifyFn,
}
go lb.loopFlush()
return lb
@@ -46,7 +50,12 @@ func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
size := len(logEntryData)
m.Lock()
- defer m.Unlock()
+ defer func() {
+ m.Unlock()
+ if m.notifyFn != nil {
+ m.notifyFn()
+ }
+ }()
if m.pos == 0 {
m.startTime = ts
@@ -55,12 +64,15 @@ func (m *LogBuffer) AddToBuffer(ts time.Time, key, data []byte) {
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
m.flush()
m.startTime = ts
+ if len(m.buf) < size+4 {
+ m.buf = make([]byte, 2*size+4)
+ }
}
m.stopTime = ts
+ m.idx = append(m.idx, m.pos)
util.Uint32toBytes(m.sizeBuf, uint32(size))
copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
-
copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
m.pos += size + 4
}
@@ -88,5 +100,67 @@ func (m *LogBuffer) flush() {
if m.flushFn != nil && m.pos > 0 {
m.flushFn(m.startTime, m.stopTime, m.buf[:m.pos])
m.pos = 0
+ m.idx = m.idx[:0]
+ }
+}
+
+func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) {
+ m.RLock()
+ defer m.RUnlock()
+
+ // fmt.Printf("read from buffer: %v\n", lastReadTime)
+
+ if lastReadTime.Equal(m.stopTime) {
+ return lastReadTime, nil
+ }
+ if lastReadTime.After(m.stopTime) {
+ // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
+ return lastReadTime, nil
}
+ if lastReadTime.Before(m.startTime) {
+ return m.stopTime, copiedBytes(m.buf[:m.pos])
+ }
+
+ lastTs := lastReadTime.UnixNano()
+ l, h := 0, len(m.idx)-1
+
+ // fmt.Printf("l=%d, h=%d\n", l, h)
+ for {
+ mid := (l + h) / 2
+ pos := m.idx[mid]
+ t := readTs(m.buf, m.idx[mid])
+ if t <= lastTs {
+ l = mid + 1
+ } else if lastTs < t {
+ var prevT int64
+ if mid > 0 {
+ prevT = readTs(m.buf, m.idx[mid-1])
+ }
+ if prevT <= lastTs {
+ return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos])
+ }
+ h = mid - 1
+ }
+ // fmt.Printf("l=%d, h=%d\n", l, h)
+ }
+
+}
+func copiedBytes(buf []byte) (copied []byte) {
+ copied = make([]byte, len(buf))
+ copy(copied, buf)
+ return
+}
+
+func readTs(buf []byte, pos int) int64 {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ entryData := buf[pos+4 : pos+4+int(size)]
+ logEntry := &filer_pb.LogEntry{}
+
+ err := proto.Unmarshal(entryData, logEntry)
+ if err != nil {
+ glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+ }
+ return logEntry.TsNs
+
}