aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/queue_unbounded.go45
-rw-r--r--weed/util/queue_unbounded_test.go25
2 files changed, 70 insertions, 0 deletions
diff --git a/weed/util/queue_unbounded.go b/weed/util/queue_unbounded.go
new file mode 100644
index 000000000..664cd965e
--- /dev/null
+++ b/weed/util/queue_unbounded.go
@@ -0,0 +1,45 @@
+package util
+
+import "sync"
+
+type UnboundedQueue struct {
+ outbound []string
+ outboundLock sync.RWMutex
+ inbound []string
+ inboundLock sync.RWMutex
+}
+
+func NewUnboundedQueue() *UnboundedQueue {
+ q := &UnboundedQueue{}
+ return q
+}
+
+func (q *UnboundedQueue) EnQueue(items ...string) {
+ q.inboundLock.Lock()
+ defer q.inboundLock.Unlock()
+
+ q.outbound = append(q.outbound, items...)
+
+}
+
+func (q *UnboundedQueue) Consume(fn func([]string)) {
+ q.outboundLock.Lock()
+ defer q.outboundLock.Unlock()
+
+ if len(q.outbound) == 0 {
+ q.inboundLock.Lock()
+ inbountLen := len(q.inbound)
+ if inbountLen > 0 {
+ t := q.outbound
+ q.outbound = q.inbound
+ q.inbound = t
+ }
+ q.inboundLock.Unlock()
+ }
+
+ if len(q.outbound) > 0 {
+ fn(q.outbound)
+ q.outbound = q.outbound[:0]
+ }
+
+}
diff --git a/weed/util/queue_unbounded_test.go b/weed/util/queue_unbounded_test.go
new file mode 100644
index 000000000..2d02032cb
--- /dev/null
+++ b/weed/util/queue_unbounded_test.go
@@ -0,0 +1,25 @@
+package util
+
+import "testing"
+
+func TestEnqueueAndConsume(t *testing.T) {
+
+ q := NewUnboundedQueue()
+
+ q.EnQueue("1", "2", "3")
+
+ f := func(items []string) {
+ for _, t := range items {
+ println(t)
+ }
+ println("-----------------------")
+ }
+ q.Consume(f)
+
+ q.Consume(f)
+
+ q.EnQueue("4", "5")
+ q.EnQueue("6", "7")
+ q.Consume(f)
+
+}