diff options
Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/buffered_queue/buffered_queue.go | 13 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 15 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 7 |
3 files changed, 33 insertions, 2 deletions
diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go index edaa0a7ce..042561cdd 100644 --- a/weed/util/buffered_queue/buffered_queue.go +++ b/weed/util/buffered_queue/buffered_queue.go @@ -117,6 +117,19 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) { return job, true } +func (q *BufferedQueue[T]) PeekHead() (T, bool) { + q.mutex.Lock() + defer q.mutex.Unlock() + + if q.count <= 0 { + var a T + return a, false + } + + job := q.head.items[q.head.headIndex] + return job, true +} + // Size returns the number of items in the queue func (q *BufferedQueue[T]) Size() int { q.mutex.Lock() diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index fa956317e..efe42176e 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -2,6 +2,7 @@ package log_buffer import ( "bytes" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "sync" "sync/atomic" "time" @@ -43,6 +44,7 @@ type LogBuffer struct { ReadFromDiskFn LogReadFromDiskFuncType notifyFn func() isStopping *atomic.Bool + isAllFlushed bool flushChan chan *dataToFlush LastTsNs int64 sync.RWMutex @@ -67,7 +69,11 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc return lb } -func (logBuffer *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { +func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { + logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) +} + +func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { var toFlush *dataToFlush logBuffer.Lock() @@ -134,6 +140,7 @@ func (logBuffer *LogBuffer) IsStopping() bool { return logBuffer.isStopping.Load() } +// ShutdownLogBuffer flushes the buffer and stops the log buffer func (logBuffer *LogBuffer) ShutdownLogBuffer() { isAlreadyStopped := logBuffer.isStopping.Swap(true) if isAlreadyStopped { @@ -144,6 +151,11 @@ func (logBuffer *LogBuffer) ShutdownLogBuffer() { close(logBuffer.flushChan) } +// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer(). +func (logBuffer *LogBuffer) IsAllFlushed() bool { + return logBuffer.isAllFlushed +} + func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { if d != nil { @@ -154,6 +166,7 @@ func (logBuffer *LogBuffer) loopFlush() { logBuffer.lastFlushDataTime = d.stopTime } } + logBuffer.isAllFlushed = true } func (logBuffer *LogBuffer) loopInterval() { diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 84279f625..067a02ef4 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -3,6 +3,7 @@ package log_buffer import ( "crypto/rand" "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "io" "sync" "testing" @@ -50,7 +51,11 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) - lb.AddToBuffer(nil, buf, 0) + lb.AddToBuffer(&mq_pb.DataMessage{ + Key: nil, + Value: buf, + TsNs: 0, + }) } wg.Wait() |
