aboutsummaryrefslogtreecommitdiff
path: root/weed/util/buffered_queue/buffered_queue.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-28 13:10:34 -0800
committerchrislu <chris.lu@gmail.com>2024-01-28 13:10:34 -0800
commitb6c5e57c30da55640c27783385589110fc8e08ee (patch)
treea659f0b5a37995e7fcad8468d50a058281da4d0e /weed/util/buffered_queue/buffered_queue.go
parent0bf5424a2ece6614fd77e88ce1b169b2986a0622 (diff)
downloadseaweedfs-b6c5e57c30da55640c27783385589110fc8e08ee.tar.xz
seaweedfs-b6c5e57c30da55640c27783385589110fc8e08ee.zip
read will block if no items
Diffstat (limited to 'weed/util/buffered_queue/buffered_queue.go')
-rw-r--r--weed/util/buffered_queue/buffered_queue.go25
1 files changed, 8 insertions, 17 deletions
diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go
index cc33e35c0..6f5f79eb5 100644
--- a/weed/util/buffered_queue/buffered_queue.go
+++ b/weed/util/buffered_queue/buffered_queue.go
@@ -23,13 +23,12 @@ type BufferedQueue[T any] struct {
count int // Total number of items in the queue
mutex sync.Mutex
nodeCounter int
- waitOnRead bool
waitCond *sync.Cond
isClosed bool
}
// NewBufferedQueue creates a new buffered queue with the specified chunk size
-func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] {
+func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
// Create an empty chunk to initialize head and tail
chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
bq := &BufferedQueue[T]{
@@ -39,7 +38,6 @@ func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] {
last: chunk,
count: 0,
mutex: sync.Mutex{},
- waitOnRead: waitOnRead,
}
bq.waitCond = sync.NewCond(&bq.mutex)
return bq
@@ -77,7 +75,7 @@ func (q *BufferedQueue[T]) Enqueue(job T) error {
q.tail.items[q.tail.tailIndex] = job
q.tail.tailIndex++
q.count++
- if q.waitOnRead {
+ if q.count == 1 {
q.waitCond.Signal()
}
@@ -89,19 +87,12 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
- if q.waitOnRead {
- for q.count <= 0 && !q.isClosed {
- q.waitCond.Wait()
- }
- if q.isClosed {
- var a T
- return a, false
- }
- } else {
- if q.count == 0 {
- var a T
- return a, false
- }
+ for q.count <= 0 && !q.isClosed {
+ q.waitCond.Wait()
+ }
+ if q.count <= 0 && q.isClosed {
+ var a T
+ return a, false
}
job := q.head.items[q.head.headIndex]