diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-05-15 21:38:42 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-05-15 21:38:42 -0700 |
| commit | e02a8c67daade10a305834e1c19c0c73905ce1c6 (patch) | |
| tree | e0ae1b75eb8631ac33349d0a4a354f2a796c76cf /weed/messaging/broker/subscription.go | |
| parent | f8aed8a7f535aa142d0ae3c4f1b8ab52f45ad84a (diff) | |
| download | seaweedfs-e02a8c67daade10a305834e1c19c0c73905ce1c6.tar.xz seaweedfs-e02a8c67daade10a305834e1c19c0c73905ce1c6.zip | |
revert to one subscriber one thread
Diffstat (limited to 'weed/messaging/broker/subscription.go')
| -rw-r--r-- | weed/messaging/broker/subscription.go | 82 |
1 files changed, 0 insertions, 82 deletions
diff --git a/weed/messaging/broker/subscription.go b/weed/messaging/broker/subscription.go deleted file mode 100644 index f74b0c546..000000000 --- a/weed/messaging/broker/subscription.go +++ /dev/null @@ -1,82 +0,0 @@ -package broker - -import ( - "sync" -) - -type TopicPartitionSubscription struct { - sync.Mutex - name string - lastReadTsNs int64 - cond *sync.Cond -} - -func NewTopicPartitionSubscription(name string) *TopicPartitionSubscription { - t := &TopicPartitionSubscription{ - name: name, - } - t.cond = sync.NewCond(t) - return t -} - -func (s *TopicPartitionSubscription) Wait() { - s.Mutex.Lock() - s.cond.Wait() - s.Mutex.Unlock() -} - -func (s *TopicPartitionSubscription) NotifyOne() { - // notify one waiting goroutine - s.cond.Signal() -} - -type TopicPartitionSubscriptions struct { - sync.Mutex - cond *sync.Cond - subscriptions map[string]*TopicPartitionSubscription - subscriptionsLock sync.RWMutex -} - -func NewTopicPartitionSubscriptions() *TopicPartitionSubscriptions { - m := &TopicPartitionSubscriptions{ - subscriptions: make(map[string]*TopicPartitionSubscription), - } - m.cond = sync.NewCond(m) - return m -} - -func (m *TopicPartitionSubscriptions) AddSubscription(subscription string) *TopicPartitionSubscription { - m.subscriptionsLock.Lock() - defer m.subscriptionsLock.Unlock() - - if s, found := m.subscriptions[subscription]; found { - return s - } - - s := NewTopicPartitionSubscription(subscription) - m.subscriptions[subscription] = s - - return s - -} - -func (m *TopicPartitionSubscriptions) NotifyAll() { - - m.subscriptionsLock.RLock() - defer m.subscriptionsLock.RUnlock() - - for name, tps := range m.subscriptions { - println("notifying", name) - tps.NotifyOne() - } - -} - -func (m *TopicPartitionSubscriptions) Wait() { - m.Mutex.Lock() - m.cond.Wait() - for _, tps := range m.subscriptions { - tps.NotifyOne() - } - m.Mutex.Unlock() -} |
