diff options
Diffstat (limited to 'weed/mq/topic/local_partition_publishers.go')
| -rw-r--r-- | weed/mq/topic/local_partition_publishers.go | 52 |
1 files changed, 52 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition_publishers.go b/weed/mq/topic/local_partition_publishers.go new file mode 100644 index 000000000..367ccce5f --- /dev/null +++ b/weed/mq/topic/local_partition_publishers.go @@ -0,0 +1,52 @@ +package topic + +import "sync" + +type LocalPartitionPublishers struct { + publishers map[string]*LocalPublisher + publishersLock sync.RWMutex +} +type LocalPublisher struct { +} + +func NewLocalPublisher() *LocalPublisher { + return &LocalPublisher{} +} +func (p *LocalPublisher) SignalShutdown() { +} + +func NewLocalPartitionPublishers() *LocalPartitionPublishers { + return &LocalPartitionPublishers{ + publishers: make(map[string]*LocalPublisher), + } +} + +func (p *LocalPartitionPublishers) AddPublisher(clientName string, publisher *LocalPublisher) { + p.publishersLock.Lock() + defer p.publishersLock.Unlock() + + p.publishers[clientName] = publisher +} + +func (p *LocalPartitionPublishers) RemovePublisher(clientName string) { + p.publishersLock.Lock() + defer p.publishersLock.Unlock() + + delete(p.publishers, clientName) +} + +func (p *LocalPartitionPublishers) SignalShutdown() { + p.publishersLock.RLock() + defer p.publishersLock.RUnlock() + + for _, publisher := range p.publishers { + publisher.SignalShutdown() + } +} + +func (p *LocalPartitionPublishers) IsEmpty() bool { + p.publishersLock.RLock() + defer p.publishersLock.RUnlock() + + return len(p.publishers) == 0 +} |
