diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-28 13:10:34 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-28 13:10:34 -0800 |
| commit | b6c5e57c30da55640c27783385589110fc8e08ee (patch) | |
| tree | a659f0b5a37995e7fcad8468d50a058281da4d0e /weed/util/buffered_queue/buffered_queue.go | |
| parent | 0bf5424a2ece6614fd77e88ce1b169b2986a0622 (diff) | |
| download | seaweedfs-b6c5e57c30da55640c27783385589110fc8e08ee.tar.xz seaweedfs-b6c5e57c30da55640c27783385589110fc8e08ee.zip | |
read will block if no items
Diffstat (limited to 'weed/util/buffered_queue/buffered_queue.go')
| -rw-r--r-- | weed/util/buffered_queue/buffered_queue.go | 25 |
1 files changed, 8 insertions, 17 deletions
diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go index cc33e35c0..6f5f79eb5 100644 --- a/weed/util/buffered_queue/buffered_queue.go +++ b/weed/util/buffered_queue/buffered_queue.go @@ -23,13 +23,12 @@ type BufferedQueue[T any] struct { count int // Total number of items in the queue mutex sync.Mutex nodeCounter int - waitOnRead bool waitCond *sync.Cond isClosed bool } // NewBufferedQueue creates a new buffered queue with the specified chunk size -func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] { +func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] { // Create an empty chunk to initialize head and tail chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0} bq := &BufferedQueue[T]{ @@ -39,7 +38,6 @@ func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] { last: chunk, count: 0, mutex: sync.Mutex{}, - waitOnRead: waitOnRead, } bq.waitCond = sync.NewCond(&bq.mutex) return bq @@ -77,7 +75,7 @@ func (q *BufferedQueue[T]) Enqueue(job T) error { q.tail.items[q.tail.tailIndex] = job q.tail.tailIndex++ q.count++ - if q.waitOnRead { + if q.count == 1 { q.waitCond.Signal() } @@ -89,19 +87,12 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) { q.mutex.Lock() defer q.mutex.Unlock() - if q.waitOnRead { - for q.count <= 0 && !q.isClosed { - q.waitCond.Wait() - } - if q.isClosed { - var a T - return a, false - } - } else { - if q.count == 0 { - var a T - return a, false - } + for q.count <= 0 && !q.isClosed { + q.waitCond.Wait() + } + if q.count <= 0 && q.isClosed { + var a T + return a, false } job := q.head.items[q.head.headIndex] |
