diff options
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 15 |
1 files changed, 14 insertions, 1 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index fa956317e..efe42176e 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -2,6 +2,7 @@ package log_buffer import ( "bytes" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "sync" "sync/atomic" "time" @@ -43,6 +44,7 @@ type LogBuffer struct { ReadFromDiskFn LogReadFromDiskFuncType notifyFn func() isStopping *atomic.Bool + isAllFlushed bool flushChan chan *dataToFlush LastTsNs int64 sync.RWMutex @@ -67,7 +69,11 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc return lb } -func (logBuffer *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { +func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { + logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) +} + +func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { var toFlush *dataToFlush logBuffer.Lock() @@ -134,6 +140,7 @@ func (logBuffer *LogBuffer) IsStopping() bool { return logBuffer.isStopping.Load() } +// ShutdownLogBuffer flushes the buffer and stops the log buffer func (logBuffer *LogBuffer) ShutdownLogBuffer() { isAlreadyStopped := logBuffer.isStopping.Swap(true) if isAlreadyStopped { @@ -144,6 +151,11 @@ func (logBuffer *LogBuffer) ShutdownLogBuffer() { close(logBuffer.flushChan) } +// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer(). +func (logBuffer *LogBuffer) IsAllFlushed() bool { + return logBuffer.isAllFlushed +} + func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { if d != nil { @@ -154,6 +166,7 @@ func (logBuffer *LogBuffer) loopFlush() { logBuffer.lastFlushDataTime = d.stopTime } } + logBuffer.isAllFlushed = true } func (logBuffer *LogBuffer) loopInterval() { |
