aboutsummaryrefslogtreecommitdiff
path: root/weed/util/queue_unbounded.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-02-20 15:44:17 -0800
committerChris Lu <chris.lu@gmail.com>2020-02-20 15:44:17 -0800
commit621cdbdf587dfd2218229055e74613ccae17d7d7 (patch)
tree9a266fa601f2367eaf3cbcdee997c0173d56c7f8 /weed/util/queue_unbounded.go
parent45156cc2feef983068dbd9c7c513fbb1772527b0 (diff)
downloadseaweedfs-621cdbdf587dfd2218229055e74613ccae17d7d7.tar.xz
seaweedfs-621cdbdf587dfd2218229055e74613ccae17d7d7.zip
filer: avoid possible timeouts for updates and deletions
Diffstat (limited to 'weed/util/queue_unbounded.go')
-rw-r--r--weed/util/queue_unbounded.go45
1 files changed, 45 insertions, 0 deletions
diff --git a/weed/util/queue_unbounded.go b/weed/util/queue_unbounded.go
new file mode 100644
index 000000000..664cd965e
--- /dev/null
+++ b/weed/util/queue_unbounded.go
@@ -0,0 +1,45 @@
+package util
+
+import "sync"
+
+type UnboundedQueue struct {
+ outbound []string
+ outboundLock sync.RWMutex
+ inbound []string
+ inboundLock sync.RWMutex
+}
+
+func NewUnboundedQueue() *UnboundedQueue {
+ q := &UnboundedQueue{}
+ return q
+}
+
+func (q *UnboundedQueue) EnQueue(items ...string) {
+ q.inboundLock.Lock()
+ defer q.inboundLock.Unlock()
+
+ q.outbound = append(q.outbound, items...)
+
+}
+
+func (q *UnboundedQueue) Consume(fn func([]string)) {
+ q.outboundLock.Lock()
+ defer q.outboundLock.Unlock()
+
+ if len(q.outbound) == 0 {
+ q.inboundLock.Lock()
+ inbountLen := len(q.inbound)
+ if inbountLen > 0 {
+ t := q.outbound
+ q.outbound = q.inbound
+ q.inbound = t
+ }
+ q.inboundLock.Unlock()
+ }
+
+ if len(q.outbound) > 0 {
+ fn(q.outbound)
+ q.outbound = q.outbound[:0]
+ }
+
+}