aboutsummaryrefslogtreecommitdiff
path: root/weed/util/buffered_queue/buffered_queue.go
blob: 9c65899e9b03895e73fee671b974f5e7dba231a8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package buffered_queue

import (
	"fmt"
	"sync"
)

// ItemChunkNode is one fixed-capacity chunk in the singly linked list of
// chunks that backs a BufferedQueue. Items are written at tailIndex and read
// at headIndex; both indices only move forward until the chunk is reset for
// reuse by Enqueue.
type ItemChunkNode[T any] struct {
	items     []T               // slot storage, allocated with the queue's chunkSize
	headIndex int               // index of the next slot to read from
	tailIndex int               // index of the next slot to write to
	next      *ItemChunkNode[T] // following chunk in the list; nil for the final chunk
	nodeId    int               // number given at creation: 0 for the initial chunk, then the queue's nodeCounter
}

// BufferedQueue implements a FIFO queue stored in a linked list of
// fixed-size chunks. Chunks that have been fully read are moved to the end of
// the list and reused by later writes instead of being reallocated.
// Construct one with NewBufferedQueue.
type BufferedQueue[T any] struct {
	chunkSize   int               // Maximum number of items per chunk
	head        *ItemChunkNode[T] // Chunk that Dequeue and PeekHead read from
	tail        *ItemChunkNode[T] // Chunk that Enqueue writes to
	last        *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory: drained chunks are re-linked after it
	count       int               // Total number of items in the queue
	mutex       sync.Mutex        // Held by the queue's methods while they touch the chunks and count
	nodeCounter int               // Incremented for each newly allocated chunk and used as its nodeId
	waitCond    *sync.Cond        // Condition variable on mutex; Dequeue waits on it while the queue is empty
	isClosed    bool              // Set by CloseInput; Enqueue then fails and Dequeue stops blocking
}

// NewBufferedQueue creates an empty, open queue whose storage grows in
// chunks of chunkSize items.
//
// A chunkSize below 1 is raised to 1: a zero-capacity chunk can never hold
// an item (the first Enqueue would index past the end of the slice), and a
// negative size makes the allocation itself panic.
func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
	if chunkSize < 1 {
		chunkSize = 1
	}

	// A single empty chunk serves as head, tail and last until the queue grows.
	// Its nodeId is the zero value, matching nodeCounter's starting point.
	chunk := &ItemChunkNode[T]{items: make([]T, chunkSize)}
	bq := &BufferedQueue[T]{
		chunkSize: chunkSize,
		head:      chunk,
		tail:      chunk,
		last:      chunk,
	}
	// The condition variable shares the queue's mutex so that waiting in
	// Dequeue releases the same lock producers need.
	bq.waitCond = sync.NewCond(&bq.mutex)
	return bq
}

// Enqueue appends job to the back of the queue.
//
// It returns an error if CloseInput has already been called and nil
// otherwise. When the tail chunk is full, Enqueue first advances into a
// previously drained chunk linked after the tail, and allocates a new chunk
// only when none is available.
func (q *BufferedQueue[T]) Enqueue(job T) error {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	// isClosed is written under the mutex by CloseInput, so it has to be read
	// under the mutex as well; checking it before locking is a data race.
	if q.isClosed {
		return fmt.Errorf("queue is closed")
	}

	// If the tail chunk is full, move on to the next chunk.
	if q.tail.tailIndex == q.chunkSize {
		if q.tail == q.last {
			// Nothing to reuse: allocate a fresh chunk and append it.
			q.nodeCounter++
			newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
			q.tail.next = newChunk
			q.tail = newChunk
			q.last = newChunk
		} else {
			// Reuse a drained chunk; its indices still hold their old values
			// and must be reset before writing.
			q.tail = q.tail.next
			q.tail.headIndex = 0
			q.tail.tailIndex = 0
		}
	}

	// Store the job in the tail chunk.
	q.tail.items[q.tail.tailIndex] = job
	q.tail.tailIndex++
	q.count++

	// Wake one waiter per item. Signalling only on the empty -> non-empty
	// transition loses wakeups when several consumers are blocked: a second
	// item added before the first woken consumer runs would leave the others
	// asleep with work available. Signal is a no-op when nobody is waiting.
	q.waitCond.Signal()

	return nil
}

// Dequeue removes and returns the item at the front of the queue.
//
// It blocks while the queue is empty and has not been closed. The boolean
// result is true when an item is returned; it is false, together with the
// zero value of T, once the queue is both closed and empty.
func (q *BufferedQueue[T]) Dequeue() (T, bool) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	var zero T

	// Wait re-acquires the mutex before returning, and the condition is
	// re-checked in a loop because another consumer may have taken the item.
	for q.count <= 0 && !q.isClosed {
		q.waitCond.Wait()
	}
	// The loop only exits with an empty queue when it has been closed.
	if q.count <= 0 {
		return zero, false
	}

	// Step past a fully consumed head chunk before reading.
	q.maybeAdjustHeadIndex()

	slot := &q.head.items[q.head.headIndex]
	job := *slot
	// Clear the slot: chunks are recycled rather than freed, so a stale copy
	// here would keep whatever the item references reachable indefinitely.
	*slot = zero
	q.head.headIndex++
	q.count--

	return job, true
}

// maybeAdjustHeadIndex advances head to the next chunk when every slot of the
// current head chunk has been read, and re-links the drained chunk after
// last so a later Enqueue can reuse it. It does nothing otherwise.
//
// It does no locking itself; Dequeue and PeekHead call it with the mutex held
// and only when the queue is non-empty.
func (q *BufferedQueue[T]) maybeAdjustHeadIndex() {
	if q.head.headIndex != q.chunkSize {
		return
	}

	drained := q.head
	// Link the drained chunk behind the current end of the list first, then
	// step head forward; the order matters because both go through next.
	q.last.next = drained
	q.head = drained.next
	// The drained chunk is now the end of the list.
	q.last = drained
	drained.next = nil
}

// PeekHead returns the item at the front of the queue without removing it.
//
// It never blocks: when the queue is empty it returns the zero value of T
// and false. As a side effect it may step head past a fully consumed chunk.
func (q *BufferedQueue[T]) PeekHead() (T, bool) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	var front T
	if q.count > 0 {
		q.maybeAdjustHeadIndex()
		front = q.head.items[q.head.headIndex]
		return front, true
	}
	return front, false
}

// Size reports how many items are currently held in the queue. The count is
// read while holding the queue's mutex.
func (q *BufferedQueue[T]) Size() int {
	q.mutex.Lock()
	n := q.count
	q.mutex.Unlock()
	return n
}

// IsEmpty reports whether the queue currently holds no items, based on Size.
func (q *BufferedQueue[T]) IsEmpty() bool {
	size := q.Size()
	return size == 0
}

// CloseInput marks the queue as closed and wakes every goroutine blocked in
// Dequeue. Items already in the queue can still be dequeued; once it is
// empty, Dequeue returns false instead of blocking.
func (q *BufferedQueue[T]) CloseInput() {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	// Set the flag before broadcasting so that each woken waiter observes the
	// closed state when it re-checks its loop condition.
	q.isClosed = true
	q.waitCond.Broadcast()
}