aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2024-01-27 13:51:19 -0800
committerChris Lu <chris.lu@gmail.com>2024-01-27 13:51:19 -0800
commitb6de35cdb276057df37c588c7b7ac37563e41fb1 (patch)
tree2c287b22324578f924c806a29e9e251194d1ada6
parent91af1f3069aef2102231dd073289fc17266e3a1f (diff)
downloadseaweedfs-b6de35cdb276057df37c588c7b7ac37563e41fb1.tar.xz
seaweedfs-b6de35cdb276057df37c588c7b7ac37563e41fb1.zip
moved. there are some deadlock. WIP
-rw-r--r--weed/util/buffered_queue/buffered_queue.go (renamed from weed/util/buffered_queue.go)54
-rw-r--r--weed/util/buffered_queue/buffered_queue_test.go (renamed from weed/util/buffered_queue_test.go)28
2 files changed, 59 insertions, 23 deletions
diff --git a/weed/util/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go
index b2fbcd3c7..fedfcee51 100644
--- a/weed/util/buffered_queue.go
+++ b/weed/util/buffered_queue/buffered_queue.go
@@ -1,4 +1,4 @@
-package util
+package buffered_queue
import (
"sync"
@@ -9,33 +9,38 @@ type ItemChunkNode[T any] struct {
items []T
headIndex int
tailIndex int
- next *ItemChunkNode[T]
- nodeId int
+ next *ItemChunkNode[T]
+ nodeId int
}
// BufferedQueue implements a buffered queue using a linked list of job chunks
type BufferedQueue[T any] struct {
- chunkSize int // Maximum number of items per chunk
- head *ItemChunkNode[T]
- tail *ItemChunkNode[T]
- last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
- count int // Total number of items in the queue
- mutex sync.Mutex
+ chunkSize int // Maximum number of items per chunk
+ head *ItemChunkNode[T]
+ tail *ItemChunkNode[T]
+ last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
+ count int // Total number of items in the queue
+ mutex sync.Mutex
nodeCounter int
+ waitOnRead bool
+ waitCond *sync.Cond
}
// NewBufferedQueue creates a new buffered queue with the specified chunk size
-func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
+func NewBufferedQueue[T any](chunkSize int, waitOnRead bool) *BufferedQueue[T] {
// Create an empty chunk to initialize head and tail
chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
- return &BufferedQueue[T]{
- chunkSize: chunkSize,
- head: chunk,
- tail: chunk,
- last: chunk,
- count: 0,
- mutex: sync.Mutex{},
+ bq := &BufferedQueue[T]{
+ chunkSize: chunkSize,
+ head: chunk,
+ tail: chunk,
+ last: chunk,
+ count: 0,
+ mutex: sync.Mutex{},
+ waitOnRead: waitOnRead,
}
+ bq.waitCond = sync.NewCond(&bq.mutex)
+ return bq
}
// Enqueue adds a job to the queue
@@ -65,6 +70,9 @@ func (q *BufferedQueue[T]) Enqueue(job T) {
q.tail.items[q.tail.tailIndex] = job
q.tail.tailIndex++
q.count++
+ if q.waitOnRead {
+ q.waitCond.Signal()
+ }
}
// Dequeue removes and returns a job from the queue
@@ -72,9 +80,15 @@ func (q *BufferedQueue[T]) Dequeue() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
- if q.count == 0 {
- var a T
- return a, false
+ if q.waitOnRead {
+ for q.count <= 0 {
+ q.waitCond.Wait()
+ }
+ } else {
+ if q.count == 0 {
+ var a T
+ return a, false
+ }
}
job := q.head.items[q.head.headIndex]
diff --git a/weed/util/buffered_queue_test.go b/weed/util/buffered_queue/buffered_queue_test.go
index a4b08c036..c4236cd40 100644
--- a/weed/util/buffered_queue_test.go
+++ b/weed/util/buffered_queue/buffered_queue_test.go
@@ -1,4 +1,4 @@
-package util
+package buffered_queue
import "testing"
@@ -9,7 +9,7 @@ func TestJobQueue(t *testing.T) {
Data T
}
- queue := NewBufferedQueue[Job[string]](2) // Chunk size of 5
+ queue := NewBufferedQueue[Job[string]](2, false) // Chunk size of 5
queue.Enqueue(Job[string]{ID: 1, Action: "task1", Data: "hello"})
queue.Enqueue(Job[string]{ID: 2, Action: "task2", Data: "world"})
@@ -62,7 +62,7 @@ func TestJobQueue(t *testing.T) {
for i := 0; i < 5; i++ {
println("enqueue", i+8)
- queue.Enqueue(Job[string]{ID: i+8, Action: "task", Data: "data"})
+ queue.Enqueue(Job[string]{ID: i + 8, Action: "task", Data: "data"})
}
for i := 0; i < 5; i++ {
job, ok = queue.Dequeue()
@@ -76,3 +76,25 @@ func TestJobQueue(t *testing.T) {
}
}
+
+func BenchmarkBufferedQueue(b *testing.B) {
+ type Job[T any] struct {
+ ID int
+ Action string
+ Data T
+ }
+
+ queue := NewBufferedQueue[Job[string]](1024, true)
+
+ b.Run("Enqueue", func(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ queue.Enqueue(Job[string]{ID: i, Action: "task", Data: "data"})
+ }
+ })
+
+ b.Run("Dequeue", func(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ _, _ = queue.Dequeue()
+ }
+ })
+}