diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-05-12 21:26:02 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-05-12 21:26:02 -0700 |
| commit | a7959c1c488f5662dc2f867e0dbeeebf3899bbe3 (patch) | |
| tree | 0716f575b29cc41f9e433f355052c092895beeb6 /weed/messaging/broker/subscription.go | |
| parent | 2f243f5b0b7c037eb13ae14f58bd1f8608fbc4d0 (diff) | |
| download | seaweedfs-a7959c1c488f5662dc2f867e0dbeeebf3899bbe3.tar.xz seaweedfs-a7959c1c488f5662dc2f867e0dbeeebf3899bbe3.zip | |
multiple subscriber with same subscriberId shares the topic manager
rename topicControl to topicCursor
Diffstat (limited to 'weed/messaging/broker/subscription.go')
| -rw-r--r-- | weed/messaging/broker/subscription.go | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/weed/messaging/broker/subscription.go b/weed/messaging/broker/subscription.go new file mode 100644 index 000000000..f74b0c546 --- /dev/null +++ b/weed/messaging/broker/subscription.go @@ -0,0 +1,82 @@ +package broker + +import ( + "sync" +) + +type TopicPartitionSubscription struct { + sync.Mutex + name string + lastReadTsNs int64 + cond *sync.Cond +} + +func NewTopicPartitionSubscription(name string) *TopicPartitionSubscription { + t := &TopicPartitionSubscription{ + name: name, + } + t.cond = sync.NewCond(t) + return t +} + +func (s *TopicPartitionSubscription) Wait() { + s.Mutex.Lock() + s.cond.Wait() + s.Mutex.Unlock() +} + +func (s *TopicPartitionSubscription) NotifyOne() { + // notify one waiting goroutine + s.cond.Signal() +} + +type TopicPartitionSubscriptions struct { + sync.Mutex + cond *sync.Cond + subscriptions map[string]*TopicPartitionSubscription + subscriptionsLock sync.RWMutex +} + +func NewTopicPartitionSubscriptions() *TopicPartitionSubscriptions { + m := &TopicPartitionSubscriptions{ + subscriptions: make(map[string]*TopicPartitionSubscription), + } + m.cond = sync.NewCond(m) + return m +} + +func (m *TopicPartitionSubscriptions) AddSubscription(subscription string) *TopicPartitionSubscription { + m.subscriptionsLock.Lock() + defer m.subscriptionsLock.Unlock() + + if s, found := m.subscriptions[subscription]; found { + return s + } + + s := NewTopicPartitionSubscription(subscription) + m.subscriptions[subscription] = s + + return s + +} + +func (m *TopicPartitionSubscriptions) NotifyAll() { + + m.subscriptionsLock.RLock() + defer m.subscriptionsLock.RUnlock() + + for name, tps := range m.subscriptions { + println("notifying", name) + tps.NotifyOne() + } + +} + +func (m *TopicPartitionSubscriptions) Wait() { + m.Mutex.Lock() + m.cond.Wait() + for _, tps := range m.subscriptions { + tps.NotifyOne() + } + m.Mutex.Unlock() +} |
