aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition_publishers.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition_publishers.go')
-rw-r--r--weed/mq/topic/local_partition_publishers.go52
1 files changed, 52 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition_publishers.go b/weed/mq/topic/local_partition_publishers.go
new file mode 100644
index 000000000..367ccce5f
--- /dev/null
+++ b/weed/mq/topic/local_partition_publishers.go
@@ -0,0 +1,52 @@
+package topic
+
+import "sync"
+
+type LocalPartitionPublishers struct {
+ publishers map[string]*LocalPublisher
+ publishersLock sync.RWMutex
+}
+type LocalPublisher struct {
+}
+
+func NewLocalPublisher() *LocalPublisher {
+ return &LocalPublisher{}
+}
+func (p *LocalPublisher) SignalShutdown() {
+}
+
+func NewLocalPartitionPublishers() *LocalPartitionPublishers {
+ return &LocalPartitionPublishers{
+ publishers: make(map[string]*LocalPublisher),
+ }
+}
+
+func (p *LocalPartitionPublishers) AddPublisher(clientName string, publisher *LocalPublisher) {
+ p.publishersLock.Lock()
+ defer p.publishersLock.Unlock()
+
+ p.publishers[clientName] = publisher
+}
+
+func (p *LocalPartitionPublishers) RemovePublisher(clientName string) {
+ p.publishersLock.Lock()
+ defer p.publishersLock.Unlock()
+
+ delete(p.publishers, clientName)
+}
+
+func (p *LocalPartitionPublishers) SignalShutdown() {
+ p.publishersLock.RLock()
+ defer p.publishersLock.RUnlock()
+
+ for _, publisher := range p.publishers {
+ publisher.SignalShutdown()
+ }
+}
+
+func (p *LocalPartitionPublishers) IsEmpty() bool {
+ p.publishersLock.RLock()
+ defer p.publishersLock.RUnlock()
+
+ return len(p.publishers) == 0
+}