aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition_subscribers.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition_subscribers.go')
-rw-r--r--weed/mq/topic/local_partition_subscribers.go49
1 files changed, 49 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition_subscribers.go b/weed/mq/topic/local_partition_subscribers.go
new file mode 100644
index 000000000..e177ec7e8
--- /dev/null
+++ b/weed/mq/topic/local_partition_subscribers.go
@@ -0,0 +1,49 @@
+package topic
+
+import "sync"
+
+type LocalPartitionSubscribers struct {
+ Subscribers map[string]*LocalSubscriber
+ SubscribersLock sync.RWMutex
+}
+type LocalSubscriber struct {
+ stopCh chan struct{}
+}
+
+func NewLocalSubscriber() *LocalSubscriber {
+ return &LocalSubscriber{
+ stopCh: make(chan struct{}, 1),
+ }
+}
+func (p *LocalSubscriber) SignalShutdown() {
+ close(p.stopCh)
+}
+
+func NewLocalPartitionSubscribers() *LocalPartitionSubscribers {
+ return &LocalPartitionSubscribers{
+ Subscribers: make(map[string]*LocalSubscriber),
+ }
+}
+
+func (p *LocalPartitionSubscribers) AddSubscriber(clientName string, Subscriber *LocalSubscriber) {
+ p.SubscribersLock.Lock()
+ defer p.SubscribersLock.Unlock()
+
+ p.Subscribers[clientName] = Subscriber
+}
+
+func (p *LocalPartitionSubscribers) RemoveSubscriber(clientName string) {
+ p.SubscribersLock.Lock()
+ defer p.SubscribersLock.Unlock()
+
+ delete(p.Subscribers, clientName)
+}
+
+func (p *LocalPartitionSubscribers) SignalShutdown() {
+ p.SubscribersLock.RLock()
+ defer p.SubscribersLock.RUnlock()
+
+ for _, Subscriber := range p.Subscribers {
+ Subscriber.SignalShutdown()
+ }
+}