diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/util/log_buffer | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'weed/util/log_buffer')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 62 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 2 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 20 |
3 files changed, 57 insertions, 27 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index c2158e7eb..422575193 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -56,11 +56,15 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { + var toFlush *dataToFlush m.Lock() defer func() { m.Unlock() + if toFlush != nil { + m.flushChan <- toFlush + } if m.notifyFn != nil { m.notifyFn() } @@ -68,20 +72,20 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { // need to put the timestamp inside the lock var ts time.Time - if eventTsNs == 0 { + if processingTsNs == 0 { ts = time.Now() - eventTsNs = ts.UnixNano() + processingTsNs = ts.UnixNano() } else { - ts = time.Unix(0, eventTsNs) + ts = time.Unix(0, processingTsNs) } - if m.lastTsNs >= eventTsNs { + if m.lastTsNs >= processingTsNs { // this is unlikely to happen, but just in case - eventTsNs = m.lastTsNs + 1 - ts = time.Unix(0, eventTsNs) + processingTsNs = m.lastTsNs + 1 + ts = time.Unix(0, processingTsNs) } - m.lastTsNs = eventTsNs + m.lastTsNs = processingTsNs logEntry := &filer_pb.LogEntry{ - TsNs: eventTsNs, + TsNs: processingTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } @@ -96,7 +100,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { // glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos) - m.flushChan <- m.copyToFlush() + toFlush = m.copyToFlush() m.startTime = ts if len(m.buf) < size+4 { m.buf = make([]byte, 2*size+4) @@ -148,8 +152,10 @@ func (m *LogBuffer) loopInterval() { return } toFlush := m.copyToFlush() - m.flushChan <- toFlush m.Unlock() + if toFlush != nil { + m.flushChan <- toFlush + } } } @@ -188,16 +194,34 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu m.RLock() defer m.RUnlock() - if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) { - return nil, ResumeFromDiskError + // Read from disk and memory + // 1. read from disk, last time is = td + // 2. in memory, the earliest time = tm + // if tm <= td, case 2.1 + // read from memory + // if tm is empty, case 2.2 + // read from memory + // if td < tm, case 2.3 + // read from disk again + var tsMemory time.Time + if !m.startTime.IsZero() { + tsMemory = m.startTime } - - /* - fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers)) - for i, prevBuf := range m.prevBuffers.buffers { - fmt.Printf(" prev %d : %s\n", i, prevBuf.String()) + for _, prevBuf := range m.prevBuffers.buffers { + if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { + tsMemory = prevBuf.startTime } - */ + } + if tsMemory.IsZero() { // case 2.2 + return nil, nil + } else if lastReadTime.Before(tsMemory) { // case 2.3 + if !m.lastFlushTime.IsZero() { + glog.V(0).Infof("resume with last flush time: %v", m.lastFlushTime) + return nil, ResumeFromDiskError + } + } + + // the following is case 2.1 if lastReadTime.Equal(m.stopTime) { return nil, nil diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 7dcfe5f52..915b93bf4 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -27,7 +27,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { } receivedmessageCount := 0 - lb.LoopProcessLogData("test", startTime, func() bool { + lb.LoopProcessLogData("test", startTime, 0, func() bool { // stop if no more messages return false }, func(logEntry *filer_pb.LogEntry) error { diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 02f5af274..99532b47b 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -17,10 +17,11 @@ var ( ResumeFromDiskError = fmt.Errorf("resumeFromDisk") ) -func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) { +func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64, + waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) { // loop through all messages var bytesBuf *bytes.Buffer - lastReadTime = startTreadTime + lastReadTime = startReadTime defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) @@ -34,10 +35,15 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime } bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime) if err == ResumeFromDiskError { - return lastReadTime, ResumeFromDiskError + time.Sleep(1127 * time.Millisecond) + return lastReadTime, isDone, ResumeFromDiskError } // glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime) if bytesBuf == nil { + if stopTsNs != 0 { + isDone = true + return + } if waitForDataFn() { continue } else { @@ -49,7 +55,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf)) batchSize := 0 - var startReadTime time.Time for pos := 0; pos+4 < len(buf); { @@ -67,10 +72,11 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime pos += 4 + int(size) continue } - lastReadTime = time.Unix(0, logEntry.TsNs) - if startReadTime.IsZero() { - startReadTime = lastReadTime + if stopTsNs != 0 && logEntry.TsNs > stopTsNs { + isDone = true + return } + lastReadTime = time.Unix(0, logEntry.TsNs) if err = eachLogDataFn(logEntry); err != nil { return |
