aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/util/buffered_queue/buffered_queue.go24
1 files changed, 22 insertions, 2 deletions
diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go
index fedfcee51..cc33e35c0 100644
--- a/weed/util/buffered_queue/buffered_queue.go
+++ b/weed/util/buffered_queue/buffered_queue.go
@@ -1,6 +1,7 @@
package buffered_queue
import (
+ "fmt"
"sync"
)
@@ -24,6 +25,7 @@ type BufferedQueue[T any] struct {
nodeCounter int
waitOnRead bool
waitCond *sync.Cond
+ isClosed bool
}
// NewBufferedQueue creates a new buffered queue with the specified chunk size
@@ -44,7 +46,12 @@ func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] {
}
// Enqueue adds a job to the queue
-func (q *BufferedQueue[T]) Enqueue(job T) {
+func (q *BufferedQueue[T]) Enqueue(job T) error {
+
+ if q.isClosed {
+ return fmt.Errorf("queue is closed")
+ }
+
q.mutex.Lock()
defer q.mutex.Unlock()
@@ -73,6 +80,8 @@ func (q *BufferedQueue[T]) Enqueue(job T) {
if q.waitOnRead {
q.waitCond.Signal()
}
+
+ return nil
}
// Dequeue removes and returns a job from the queue
@@ -81,9 +90,13 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
defer q.mutex.Unlock()
if q.waitOnRead {
- for q.count <= 0 {
+ for q.count <= 0 && !q.isClosed {
q.waitCond.Wait()
}
+ if q.isClosed {
+ var a T
+ return a, false
+ }
} else {
if q.count == 0 {
var a T
@@ -124,3 +137,10 @@ func (q *BufferedQueue[T]) Size() int {
func (q *BufferedQueue[T]) IsEmpty() bool {
return q.Size() == 0
}
+
+func (q *BufferedQueue[T]) CloseInput() {
+ q.mutex.Lock()
+ defer q.mutex.Unlock()
+ q.isClosed = true
+ q.waitCond.Broadcast()
+}