aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-21 09:57:45 -0700
committerchrislu <chris.lu@gmail.com>2024-05-21 09:57:45 -0700
commit67e6051585875736f24bd3d509549db775c3d44f (patch)
tree919e39eaa270165ad2f21114f38847ca77d6b2ea
parentb5099263a4310da1cfa7383c38b7bb8622a9869b (diff)
downloadseaweedfs-67e6051585875736f24bd3d509549db775c3d44f.tar.xz
seaweedfs-67e6051585875736f24bd3d509549db775c3d44f.zip
rename Coordinator to SubCoordinator
-rw-r--r--weed/mq/broker/broker_server.go2
-rw-r--r--weed/mq/sub_coordinator/coordinator.go22
2 files changed, 12 insertions, 12 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index e381fa84c..2e449083a 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -45,7 +45,7 @@ type MessageQueueBroker struct {
localTopicManager *topic.LocalTopicManager
Balancer *pub_balancer.PubBalancer
lockAsBalancer *cluster.LiveLock
- Coordinator *sub_coordinator.Coordinator
+ Coordinator *sub_coordinator.SubCoordinator
accessLock sync.Mutex
fca *sub_coordinator.FilerClientAccessor
}
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index 6ab7166d4..f78e8c849 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -11,25 +11,25 @@ type TopicConsumerGroups struct {
ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
}
-// Coordinator coordinates the instances in the consumer group for one topic.
+// SubCoordinator coordinates the instances in the consumer group for one topic.
// It is responsible for:
// 1. (Maybe) assigning partitions when a consumer instance is up/down.
-type Coordinator struct {
+type SubCoordinator struct {
// map topic name to consumer groups
TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
balancer *pub_balancer.PubBalancer
FilerClientAccessor *FilerClientAccessor
}
-func NewCoordinator(balancer *pub_balancer.PubBalancer) *Coordinator {
- return &Coordinator{
+func NewCoordinator(balancer *pub_balancer.PubBalancer) *SubCoordinator {
+ return &SubCoordinator{
TopicSubscribers: cmap.New[*TopicConsumerGroups](),
balancer: balancer,
}
}
-func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
+func (c *SubCoordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
topicName := toTopicName(topic)
tcg, _ := c.TopicSubscribers.Get(topicName)
if tcg == nil && createIfMissing {
@@ -42,7 +42,7 @@ func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing
}
return tcg
}
-func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic) {
+func (c *SubCoordinator) RemoveTopic(topic *mq_pb.Topic) {
topicName := toTopicName(topic)
c.TopicSubscribers.Remove(topicName)
}
@@ -52,7 +52,7 @@ func toTopicName(topic *mq_pb.Topic) string {
return topicName
}
-func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance {
+func (c *SubCoordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance {
tcg := c.GetTopicConsumerGroups(initMessage.Topic, true)
cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
if cg == nil {
@@ -73,7 +73,7 @@ func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinato
return cgi
}
-func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) {
+func (c *SubCoordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) {
tcg := c.GetTopicConsumerGroups(initMessage.Topic, false)
if tcg == nil {
return
@@ -92,7 +92,7 @@ func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordin
}
}
-func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) {
+func (c *SubCoordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) {
tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
if tcg == nil {
return
@@ -103,11 +103,11 @@ func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb
}
// OnSubAddBroker is called when a broker is added to the balancer
-func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
+func (c *SubCoordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
}
// OnSubRemoveBroker is called when a broker is removed from the balancer
-func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
+func (c *SubCoordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
}