aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/subscription.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-05-15 21:38:42 -0700
committerChris Lu <chris.lu@gmail.com>2020-05-15 21:38:42 -0700
commite02a8c67daade10a305834e1c19c0c73905ce1c6 (patch)
treee0ae1b75eb8631ac33349d0a4a354f2a796c76cf /weed/messaging/broker/subscription.go
parentf8aed8a7f535aa142d0ae3c4f1b8ab52f45ad84a (diff)
downloadseaweedfs-e02a8c67daade10a305834e1c19c0c73905ce1c6.tar.xz
seaweedfs-e02a8c67daade10a305834e1c19c0c73905ce1c6.zip
revert to one subscriber one thread
Diffstat (limited to 'weed/messaging/broker/subscription.go')
-rw-r--r--weed/messaging/broker/subscription.go82
1 files changed, 0 insertions, 82 deletions
diff --git a/weed/messaging/broker/subscription.go b/weed/messaging/broker/subscription.go
deleted file mode 100644
index f74b0c546..000000000
--- a/weed/messaging/broker/subscription.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package broker
-
-import (
- "sync"
-)
-
-type TopicPartitionSubscription struct {
- sync.Mutex
- name string
- lastReadTsNs int64
- cond *sync.Cond
-}
-
-func NewTopicPartitionSubscription(name string) *TopicPartitionSubscription {
- t := &TopicPartitionSubscription{
- name: name,
- }
- t.cond = sync.NewCond(t)
- return t
-}
-
-func (s *TopicPartitionSubscription) Wait() {
- s.Mutex.Lock()
- s.cond.Wait()
- s.Mutex.Unlock()
-}
-
-func (s *TopicPartitionSubscription) NotifyOne() {
- // notify one waiting goroutine
- s.cond.Signal()
-}
-
-type TopicPartitionSubscriptions struct {
- sync.Mutex
- cond *sync.Cond
- subscriptions map[string]*TopicPartitionSubscription
- subscriptionsLock sync.RWMutex
-}
-
-func NewTopicPartitionSubscriptions() *TopicPartitionSubscriptions {
- m := &TopicPartitionSubscriptions{
- subscriptions: make(map[string]*TopicPartitionSubscription),
- }
- m.cond = sync.NewCond(m)
- return m
-}
-
-func (m *TopicPartitionSubscriptions) AddSubscription(subscription string) *TopicPartitionSubscription {
- m.subscriptionsLock.Lock()
- defer m.subscriptionsLock.Unlock()
-
- if s, found := m.subscriptions[subscription]; found {
- return s
- }
-
- s := NewTopicPartitionSubscription(subscription)
- m.subscriptions[subscription] = s
-
- return s
-
-}
-
-func (m *TopicPartitionSubscriptions) NotifyAll() {
-
- m.subscriptionsLock.RLock()
- defer m.subscriptionsLock.RUnlock()
-
- for name, tps := range m.subscriptions {
- println("notifying", name)
- tps.NotifyOne()
- }
-
-}
-
-func (m *TopicPartitionSubscriptions) Wait() {
- m.Mutex.Lock()
- m.cond.Wait()
- for _, tps := range m.subscriptions {
- tps.NotifyOne()
- }
- m.Mutex.Unlock()
-}