aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-27 16:12:49 -0800
committerchrislu <chris.lu@gmail.com>2024-01-27 16:12:49 -0800
commit5dc1362bdcdfdda355fa265313eea0f623733bfd (patch)
tree91b3363cff26385f0f49cd40842e5b8ec58b362a
parentfa835c93195d9effee2d513ce628bce87a5f3201 (diff)
downloadseaweedfs-5dc1362bdcdfdda355fa265313eea0f623733bfd.tar.xz
seaweedfs-5dc1362bdcdfdda355fa265313eea0f623733bfd.zip
close the input
-rw-r--r--weed/util/buffered_queue/buffered_queue.go24
1 files changed, 22 insertions, 2 deletions
diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go
index fedfcee51..cc33e35c0 100644
--- a/weed/util/buffered_queue/buffered_queue.go
+++ b/weed/util/buffered_queue/buffered_queue.go
@@ -1,6 +1,7 @@
package buffered_queue
import (
+ "fmt"
"sync"
)
@@ -24,6 +25,7 @@ type BufferedQueue[T any] struct {
nodeCounter int
waitOnRead bool
waitCond *sync.Cond
+ isClosed bool
}
// NewBufferedQueue creates a new buffered queue with the specified chunk size
@@ -44,7 +46,12 @@ func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] {
}
// Enqueue adds a job to the queue
-func (q *BufferedQueue[T]) Enqueue(job T) {
+func (q *BufferedQueue[T]) Enqueue(job T) error {
+
+ if q.isClosed {
+ return fmt.Errorf("queue is closed")
+ }
+
q.mutex.Lock()
defer q.mutex.Unlock()
@@ -73,6 +80,8 @@ func (q *BufferedQueue[T]) Enqueue(job T) {
if q.waitOnRead {
q.waitCond.Signal()
}
+
+ return nil
}
// Dequeue removes and returns a job from the queue
@@ -81,9 +90,13 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
defer q.mutex.Unlock()
if q.waitOnRead {
- for q.count <= 0 {
+ for q.count <= 0 && !q.isClosed {
q.waitCond.Wait()
}
+ if q.isClosed {
+ var a T
+ return a, false
+ }
} else {
if q.count == 0 {
var a T
@@ -124,3 +137,10 @@ func (q *BufferedQueue[T]) Size() int {
func (q *BufferedQueue[T]) IsEmpty() bool {
return q.Size() == 0
}
+
+func (q *BufferedQueue[T]) CloseInput() {
+ q.mutex.Lock()
+ defer q.mutex.Unlock()
+ q.isClosed = true
+ q.waitCond.Broadcast()
+}