aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_manager.go
blob: 6e7db5d08459b17e8a6b57b2238f6a3591204a2b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package topic

import (
	cmap "github.com/orcaman/concurrent-map/v2"
)

// LocalTopicManager manages topics on local broker
type LocalTopicManager struct {
	topics cmap.ConcurrentMap[string, *LocalTopic]
}

// NewLocalTopicManager creates a new LocalTopicManager
func NewLocalTopicManager() *LocalTopicManager {
	return &LocalTopicManager{
		topics: cmap.New[*LocalTopic](),
	}
}

// AddTopic adds a topic to the local topic manager
func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
	localTopic, ok := manager.topics.Get(topic.String())
	if !ok {
		localTopic = &LocalTopic{
			Topic:      topic,
			Partitions: make([]*LocalPartition, 0),
		}
	}
	manager.topics.SetIfAbsent(topic.String(), localTopic)
	if localTopic.findPartition(localPartition.Partition) != nil {
		return
	}
	localTopic.Partitions = append(localTopic.Partitions, localPartition)
}

// GetTopic gets a topic from the local topic manager
func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
	localTopic, ok := manager.topics.Get(topic.String())
	if !ok {
		return nil
	}
	return localTopic.findPartition(partition)
}

// RemoveTopic removes a topic from the local topic manager
func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
	manager.topics.Remove(topic.String())
}

func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
	localTopic, ok := manager.topics.Get(topic.String())
	if !ok {
		return false
	}
	return localTopic.removePartition(partition)
}