diff options
Diffstat (limited to 'weed/mq/topic/local_topic.go')
| -rw-r--r-- | weed/mq/topic/local_topic.go | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go index ef3c0e65e..7825d2168 100644 --- a/weed/mq/topic/local_topic.go +++ b/weed/mq/topic/local_topic.go @@ -1,10 +1,19 @@ package topic +import "sync" + type LocalTopic struct { Topic Partitions []*LocalPartition } +func NewLocalTopic(topic Topic) *LocalTopic { + return &LocalTopic{ + Topic: topic, + Partitions: make([]*LocalPartition, 0), + } +} + func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition { for _, localPartition := range localTopic.Partitions { if localPartition.Partition.Equals(partition) { @@ -27,3 +36,52 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool { localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...) return true } + +func (localTopic *LocalTopic) closePartitionPublishers(unixTsNs int64) bool { + var wg sync.WaitGroup + for _, localPartition := range localTopic.Partitions { + if localPartition.UnixTimeNs != unixTsNs { + continue + } + wg.Add(1) + go func(localPartition *LocalPartition) { + defer wg.Done() + localPartition.closePublishers() + }(localPartition) + } + wg.Wait() + return true +} + +func (localTopic *LocalTopic) closePartitionSubscribers(unixTsNs int64) bool { + var wg sync.WaitGroup + for _, localPartition := range localTopic.Partitions { + if localPartition.UnixTimeNs != unixTsNs { + continue + } + wg.Add(1) + go func(localPartition *LocalPartition) { + defer wg.Done() + localPartition.closeSubscribers() + }(localPartition) + } + wg.Wait() + return true +} + +func (localTopic *LocalTopic) WaitUntilNoPublishers() { + for { + var wg sync.WaitGroup + for _, localPartition := range localTopic.Partitions { + wg.Add(1) + go func(localPartition *LocalPartition) { + defer wg.Done() + localPartition.WaitUntilNoPublishers() + }(localPartition) + } + wg.Wait() + if len(localTopic.Partitions) == 0 { + return + } + } +} |
