diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-27 16:12:49 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-27 16:12:49 -0800 |
| commit | 5dc1362bdcdfdda355fa265313eea0f623733bfd (patch) | |
| tree | 91b3363cff26385f0f49cd40842e5b8ec58b362a | |
| parent | fa835c93195d9effee2d513ce628bce87a5f3201 (diff) | |
| download | seaweedfs-5dc1362bdcdfdda355fa265313eea0f623733bfd.tar.xz seaweedfs-5dc1362bdcdfdda355fa265313eea0f623733bfd.zip | |
close the input
| -rw-r--r-- | weed/util/buffered_queue/buffered_queue.go | 24 |
1 files changed, 22 insertions, 2 deletions
diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go index fedfcee51..cc33e35c0 100644 --- a/weed/util/buffered_queue/buffered_queue.go +++ b/weed/util/buffered_queue/buffered_queue.go @@ -1,6 +1,7 @@ package buffered_queue import ( + "fmt" "sync" ) @@ -24,6 +25,7 @@ type BufferedQueue[T any] struct { nodeCounter int waitOnRead bool waitCond *sync.Cond + isClosed bool } // NewBufferedQueue creates a new buffered queue with the specified chunk size @@ -44,7 +46,12 @@ func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] { } // Enqueue adds a job to the queue -func (q *BufferedQueue[T]) Enqueue(job T) { +func (q *BufferedQueue[T]) Enqueue(job T) error { + + if q.isClosed { + return fmt.Errorf("queue is closed") + } + q.mutex.Lock() defer q.mutex.Unlock() @@ -73,6 +80,8 @@ func (q *BufferedQueue[T]) Enqueue(job T) { if q.waitOnRead { q.waitCond.Signal() } + + return nil } // Dequeue removes and returns a job from the queue @@ -81,9 +90,13 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) { defer q.mutex.Unlock() if q.waitOnRead { - for q.count <= 0 { + for q.count <= 0 && !q.isClosed { q.waitCond.Wait() } + if q.isClosed { + var a T + return a, false + } } else { if q.count == 0 { var a T @@ -124,3 +137,10 @@ func (q *BufferedQueue[T]) Size() int { func (q *BufferedQueue[T]) IsEmpty() bool { return q.Size() == 0 } + +func (q *BufferedQueue[T]) CloseInput() { + q.mutex.Lock() + defer q.mutex.Unlock() + q.isClosed = true + q.waitCond.Broadcast() +} |
