aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-13 18:25:38 -0700
committerchrislu <chris.lu@gmail.com>2024-05-13 18:25:38 -0700
commit2142842f8248e841e900ee75783726cbd473ecf0 (patch)
tree6514f2cdffbae83d1b2aaa959589a04bb76fe59b
parent8cc5298a08db7c7d21d6bb657bab2277e143500d (diff)
downloadseaweedfs-2142842f8248e841e900ee75783726cbd473ecf0.tar.xz
seaweedfs-2142842f8248e841e900ee75783726cbd473ecf0.zip
refactor
-rw-r--r--weed/mq/broker/broker_grpc_sub_coordinator.go4
-rw-r--r--weed/mq/sub_coordinator/coordinator.go36
2 files changed, 20 insertions, 20 deletions
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index 3fd97f1c2..6925baa9e 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -24,13 +24,13 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
// process init message
initMessage := req.GetInit()
if initMessage != nil {
- cgi = b.Coordinator.AddSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
+ cgi = b.Coordinator.AddSubscriber(initMessage)
glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
} else {
return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
}
defer func() {
- b.Coordinator.RemoveSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic)
+ b.Coordinator.RemoveSubscriber(initMessage)
glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerGroupInstanceId, initMessage.Topic, err)
}()
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index bb50991ab..d128310d1 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -51,42 +51,42 @@ func toTopicName(topic *mq_pb.Topic) string {
return topicName
}
-func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance {
- tcg := c.GetTopicConsumerGroups(topic, true)
- cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
+func (c *Coordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) *ConsumerGroupInstance {
+ tcg := c.GetTopicConsumerGroups(initMessage.Topic, true)
+ cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
if cg == nil {
- cg = NewConsumerGroup(topic, c.balancer)
- if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg) {
- cg, _ = tcg.ConsumerGroups.Get(consumerGroup)
+ cg = NewConsumerGroup(initMessage.Topic, c.balancer)
+ if !tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg) {
+ cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
}
}
- cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
+ cgi, _ := cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
if cgi == nil {
- cgi = NewConsumerGroupInstance(consumerGroupInstance)
- if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi) {
- cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance)
+ cgi = NewConsumerGroupInstance(initMessage.ConsumerGroupInstanceId)
+ if !cg.ConsumerGroupInstances.SetIfAbsent(initMessage.ConsumerGroupInstanceId, cgi) {
+ cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
}
}
- cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic)
+ cg.OnAddConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic)
return cgi
}
-func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) {
- tcg := c.GetTopicConsumerGroups(topic, false)
+func (c *Coordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) {
+ tcg := c.GetTopicConsumerGroups(initMessage.Topic, false)
if tcg == nil {
return
}
- cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
+ cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
if cg == nil {
return
}
- cg.ConsumerGroupInstances.Remove(consumerGroupInstance)
- cg.OnRemoveConsumerGroupInstance(consumerGroupInstance, topic)
+ cg.ConsumerGroupInstances.Remove(initMessage.ConsumerGroupInstanceId)
+ cg.OnRemoveConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.Topic)
if cg.ConsumerGroupInstances.Count() == 0 {
- tcg.ConsumerGroups.Remove(consumerGroup)
+ tcg.ConsumerGroups.Remove(initMessage.ConsumerGroup)
}
if tcg.ConsumerGroups.Count() == 0 {
- c.RemoveTopic(topic)
+ c.RemoveTopic(initMessage.Topic)
}
}