aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer.go
diff options
context:
space:
mode:
authoryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
committeryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
commit46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch)
tree734125b48b6d96f8796a2b89b924312cd169ef0e /weed/util/log_buffer/log_buffer.go
parenta5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff)
parentdc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff)
downloadseaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz
seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip
Update tikv client version and add one PC support
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
-rw-r--r--weed/util/log_buffer/log_buffer.go62
1 files changed, 43 insertions, 19 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index c2158e7eb..422575193 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -56,11 +56,15 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi
return lb
}
-func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) {
+ var toFlush *dataToFlush
m.Lock()
defer func() {
m.Unlock()
+ if toFlush != nil {
+ m.flushChan <- toFlush
+ }
if m.notifyFn != nil {
m.notifyFn()
}
@@ -68,20 +72,20 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
// need to put the timestamp inside the lock
var ts time.Time
- if eventTsNs == 0 {
+ if processingTsNs == 0 {
ts = time.Now()
- eventTsNs = ts.UnixNano()
+ processingTsNs = ts.UnixNano()
} else {
- ts = time.Unix(0, eventTsNs)
+ ts = time.Unix(0, processingTsNs)
}
- if m.lastTsNs >= eventTsNs {
+ if m.lastTsNs >= processingTsNs {
// this is unlikely to happen, but just in case
- eventTsNs = m.lastTsNs + 1
- ts = time.Unix(0, eventTsNs)
+ processingTsNs = m.lastTsNs + 1
+ ts = time.Unix(0, processingTsNs)
}
- m.lastTsNs = eventTsNs
+ m.lastTsNs = processingTsNs
logEntry := &filer_pb.LogEntry{
- TsNs: eventTsNs,
+ TsNs: processingTsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}
@@ -96,7 +100,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
// glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos)
- m.flushChan <- m.copyToFlush()
+ toFlush = m.copyToFlush()
m.startTime = ts
if len(m.buf) < size+4 {
m.buf = make([]byte, 2*size+4)
@@ -148,8 +152,10 @@ func (m *LogBuffer) loopInterval() {
return
}
toFlush := m.copyToFlush()
- m.flushChan <- toFlush
m.Unlock()
+ if toFlush != nil {
+ m.flushChan <- toFlush
+ }
}
}
@@ -188,16 +194,34 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
m.RLock()
defer m.RUnlock()
- if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) {
- return nil, ResumeFromDiskError
+ // Read from disk and memory
+ // 1. read from disk, last time is = td
+ // 2. in memory, the earliest time = tm
+ // if tm <= td, case 2.1
+ // read from memory
+ // if tm is empty, case 2.2
+ // read from memory
+ // if td < tm, case 2.3
+ // read from disk again
+ var tsMemory time.Time
+ if !m.startTime.IsZero() {
+ tsMemory = m.startTime
}
-
- /*
- fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers))
- for i, prevBuf := range m.prevBuffers.buffers {
- fmt.Printf(" prev %d : %s\n", i, prevBuf.String())
+ for _, prevBuf := range m.prevBuffers.buffers {
+ if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
+ tsMemory = prevBuf.startTime
}
- */
+ }
+ if tsMemory.IsZero() { // case 2.2
+ return nil, nil
+ } else if lastReadTime.Before(tsMemory) { // case 2.3
+ if !m.lastFlushTime.IsZero() {
+ glog.V(0).Infof("resume with last flush time: %v", m.lastFlushTime)
+ return nil, ResumeFromDiskError
+ }
+ }
+
+ // the following is case 2.1
if lastReadTime.Equal(m.stopTime) {
return nil, nil