1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
package buffered_queue
import (
"fmt"
"sync"
)
// ItemChunkNode is one chunk of item storage in the queue's singly linked
// list of chunks.
type ItemChunkNode[T any] struct {
	// items is the chunk's backing storage.
	items []T

	// headIndex is the next slot to read from and tailIndex the next slot to
	// write to.
	headIndex, tailIndex int

	// next points at the following chunk, or is nil for the final chunk.
	next *ItemChunkNode[T]

	// nodeId is the sequence number the chunk was given when it was allocated.
	nodeId int
}
// BufferedQueue is a FIFO queue that stores its items in a linked list of
// equally sized chunks.
type BufferedQueue[T any] struct {
	// chunkSize is the number of item slots in every chunk.
	chunkSize int

	// head is the chunk items are read from, tail the chunk items are written
	// to, and last the final chunk of the list. Chunks that follow tail are
	// spare and are reused before a new chunk is allocated.
	head, tail, last *ItemChunkNode[T]

	// count is the number of items currently held by the queue.
	count int

	// mutex is locked by the queue's methods while they read or modify the
	// chunk list and count.
	mutex sync.Mutex

	// nodeCounter is the id given to the most recently allocated chunk.
	nodeCounter int

	// waitCond is what Dequeue blocks on while the queue is empty and open.
	waitCond *sync.Cond

	// isClosed is set by CloseInput.
	isClosed bool
}
// NewBufferedQueue returns an empty, open queue whose storage is allocated in
// chunks of chunkSize items.
//
// A chunkSize below 1 is raised to 1: a zero-capacity chunk can never hold an
// item and a negative size cannot be allocated, so either value would make
// the queue panic instead of working.
func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
	if chunkSize < 1 {
		chunkSize = 1
	}

	// Start with a single empty chunk (id 0) that is simultaneously the head,
	// the tail and the last chunk of the list.
	chunk := &ItemChunkNode[T]{items: make([]T, chunkSize)}
	bq := &BufferedQueue[T]{
		chunkSize: chunkSize,
		head:      chunk,
		tail:      chunk,
		last:      chunk,
	}

	// The condition variable must be bound to the queue's own mutex, which
	// only has a stable address once the queue has been allocated.
	bq.waitCond = sync.NewCond(&bq.mutex)
	return bq
}
// Enqueue appends job to the back of the queue and wakes one goroutine that
// is blocked in Dequeue, if any.
//
// It returns a non-nil error when the queue has been closed with CloseInput;
// in that case job is not stored. Otherwise it returns nil.
func (q *BufferedQueue[T]) Enqueue(job T) error {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	// isClosed is written under the mutex by CloseInput, so it has to be read
	// under the mutex too; checking it before locking is a data race.
	if q.isClosed {
		return fmt.Errorf("queue is closed")
	}

	// When the tail chunk is full, move the tail forward.
	if q.tail.tailIndex == q.chunkSize {
		if q.tail == q.last {
			// No spare chunk follows the tail: allocate and link a new one.
			q.nodeCounter++
			newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
			q.tail.next = newChunk
			q.tail = newChunk
			q.last = newChunk
		} else {
			// A previously drained chunk follows the tail: reuse it, clearing
			// the indices left over from its previous use.
			q.tail = q.tail.next
			q.tail.headIndex = 0
			q.tail.tailIndex = 0
		}
	}

	// Store the job in the tail chunk.
	q.tail.items[q.tail.tailIndex] = job
	q.tail.tailIndex++
	q.count++

	// Signal on every enqueue. Signalling only on the empty -> non-empty
	// transition loses wake-ups when several consumers are waiting: a second
	// item added before the first woken consumer runs would wake nobody.
	q.waitCond.Signal()
	return nil
}
// Dequeue removes and returns the item at the front of the queue.
//
// While the queue is empty and still open, Dequeue blocks until an item is
// enqueued or CloseInput is called. It returns (item, true) on success and
// (zero value, false) once the queue is both closed and empty.
func (q *BufferedQueue[T]) Dequeue() (T, bool) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	// Wait must be called in a loop: the condition has to be re-checked each
	// time the goroutine is woken.
	for q.count <= 0 && !q.isClosed {
		q.waitCond.Wait()
	}

	var zero T
	if q.count <= 0 {
		// The loop only exits with no items when the queue is closed.
		return zero, false
	}

	head := q.head
	job := head.items[head.headIndex]
	// Clear the slot so the queue does not keep the dequeued item reachable.
	head.items[head.headIndex] = zero
	head.headIndex++
	q.count--

	if head.headIndex == q.chunkSize {
		if head == q.tail {
			// The exhausted chunk is also the one being written to, so the
			// queue is empty. Rewind the chunk in place; moving it behind
			// itself would leave head on a chunk with an out-of-range
			// headIndex and make a later Dequeue panic.
			head.headIndex = 0
			head.tailIndex = 0
		} else {
			// Unlink the exhausted chunk from the front and append it after
			// last, where Enqueue can pick it up again as a spare chunk.
			q.head = head.next
			head.next = nil
			q.last.next = head
			q.last = head
		}
	}
	return job, true
}
// Size reports how many items the queue currently holds. The count is read
// while holding the queue's mutex.
func (q *BufferedQueue[T]) Size() int {
	q.mutex.Lock()
	n := q.count
	q.mutex.Unlock()
	return n
}
// IsEmpty reports whether the queue currently holds no items. The count is
// read while holding the queue's mutex.
func (q *BufferedQueue[T]) IsEmpty() bool {
	q.mutex.Lock()
	defer q.mutex.Unlock()
	return q.count == 0
}
// CloseInput marks the queue as closed and wakes every goroutine waiting on
// the queue's condition variable so it can re-check the queue state.
func (q *BufferedQueue[T]) CloseInput() {
	q.mutex.Lock()
	q.isClosed = true
	q.waitCond.Broadcast()
	q.mutex.Unlock()
}
|