diff options
Diffstat (limited to 'weed/mq/topic/local_partition_subscribers.go')
| -rw-r--r-- | weed/mq/topic/local_partition_subscribers.go | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition_subscribers.go b/weed/mq/topic/local_partition_subscribers.go new file mode 100644 index 000000000..e177ec7e8 --- /dev/null +++ b/weed/mq/topic/local_partition_subscribers.go @@ -0,0 +1,49 @@ +package topic + +import "sync" + +type LocalPartitionSubscribers struct { + Subscribers map[string]*LocalSubscriber + SubscribersLock sync.RWMutex +} +type LocalSubscriber struct { + stopCh chan struct{} +} + +func NewLocalSubscriber() *LocalSubscriber { + return &LocalSubscriber{ + stopCh: make(chan struct{}, 1), + } +} +func (p *LocalSubscriber) SignalShutdown() { + close(p.stopCh) +} + +func NewLocalPartitionSubscribers() *LocalPartitionSubscribers { + return &LocalPartitionSubscribers{ + Subscribers: make(map[string]*LocalSubscriber), + } +} + +func (p *LocalPartitionSubscribers) AddSubscriber(clientName string, Subscriber *LocalSubscriber) { + p.SubscribersLock.Lock() + defer p.SubscribersLock.Unlock() + + p.Subscribers[clientName] = Subscriber +} + +func (p *LocalPartitionSubscribers) RemoveSubscriber(clientName string) { + p.SubscribersLock.Lock() + defer p.SubscribersLock.Unlock() + + delete(p.Subscribers, clientName) +} + +func (p *LocalPartitionSubscribers) SignalShutdown() { + p.SubscribersLock.RLock() + defer p.SubscribersLock.RUnlock() + + for _, Subscriber := range p.Subscribers { + Subscriber.SignalShutdown() + } +} |
