aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/buffered_queue/buffered_queue.go13
-rw-r--r--weed/util/log_buffer/log_buffer.go15
-rw-r--r--weed/util/log_buffer/log_buffer_test.go7
3 files changed, 33 insertions, 2 deletions
diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go
index edaa0a7ce..042561cdd 100644
--- a/weed/util/buffered_queue/buffered_queue.go
+++ b/weed/util/buffered_queue/buffered_queue.go
@@ -117,6 +117,19 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
return job, true
}
+func (q *BufferedQueue[T]) PeekHead() (T, bool) {
+ q.mutex.Lock()
+ defer q.mutex.Unlock()
+
+ if q.count <= 0 {
+ var a T
+ return a, false
+ }
+
+ job := q.head.items[q.head.headIndex]
+ return job, true
+}
+
// Size returns the number of items in the queue
func (q *BufferedQueue[T]) Size() int {
q.mutex.Lock()
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index fa956317e..efe42176e 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -2,6 +2,7 @@ package log_buffer
import (
"bytes"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync"
"sync/atomic"
"time"
@@ -43,6 +44,7 @@ type LogBuffer struct {
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
isStopping *atomic.Bool
+ isAllFlushed bool
flushChan chan *dataToFlush
LastTsNs int64
sync.RWMutex
@@ -67,7 +69,11 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
return lb
}
-func (logBuffer *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) {
+func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
+ logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
+}
+
+func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
var toFlush *dataToFlush
logBuffer.Lock()
@@ -134,6 +140,7 @@ func (logBuffer *LogBuffer) IsStopping() bool {
return logBuffer.isStopping.Load()
}
+// ShutdownLogBuffer flushes the buffer and stops the log buffer
func (logBuffer *LogBuffer) ShutdownLogBuffer() {
isAlreadyStopped := logBuffer.isStopping.Swap(true)
if isAlreadyStopped {
@@ -144,6 +151,11 @@ func (logBuffer *LogBuffer) ShutdownLogBuffer() {
close(logBuffer.flushChan)
}
+// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer().
+func (logBuffer *LogBuffer) IsAllFlushed() bool {
+ return logBuffer.isAllFlushed
+}
+
func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
@@ -154,6 +166,7 @@ func (logBuffer *LogBuffer) loopFlush() {
logBuffer.lastFlushDataTime = d.stopTime
}
}
+ logBuffer.isAllFlushed = true
}
func (logBuffer *LogBuffer) loopInterval() {
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index 84279f625..067a02ef4 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -3,6 +3,7 @@ package log_buffer
import (
"crypto/rand"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"sync"
"testing"
@@ -50,7 +51,11 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
- lb.AddToBuffer(nil, buf, 0)
+ lb.AddToBuffer(&mq_pb.DataMessage{
+ Key: nil,
+ Value: buf,
+ TsNs: 0,
+ })
}
wg.Wait()