aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-20 12:16:40 -0800
committerchrislu <chris.lu@gmail.com>2024-01-20 12:16:40 -0800
commit34839237ab18256c3dba0ae9638ca24c38f79478 (patch)
tree87ef5bfd807f2c5846f06ec79d55e50b6f546bbe
parent2828ccbb30081ae260428a2aa42e50c724e55e6c (diff)
downloadseaweedfs-34839237ab18256c3dba0ae9638ca24c38f79478.tar.xz
seaweedfs-34839237ab18256c3dba0ae9638ca24c38f79478.zip
refactor
-rw-r--r--weed/mq/broker/broker_server.go4
-rw-r--r--weed/mq/pub_balancer/balancer.go10
-rw-r--r--weed/mq/sub_coordinator/coordinator.go6
3 files changed, 16 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 31a6e10ef..f41ec87ca 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -67,8 +67,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
}
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange
- pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnAddBroker
- pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnRemoveBroker
+ pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker
+ pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker
go mqBroker.MasterClient.KeepConnectedToMaster()
diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go
index d44c68720..cd9583016 100644
--- a/weed/mq/pub_balancer/balancer.go
+++ b/weed/mq/pub_balancer/balancer.go
@@ -53,6 +53,7 @@ func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) {
brokerStats, _ = balancer.Brokers.Get(broker)
}
}
+ balancer.onPubAddBroker(broker, brokerStats)
balancer.OnAddBroker(broker, brokerStats)
return brokerStats
}
@@ -68,6 +69,7 @@ func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) {
}
partitionSlotToBrokerList.RemoveBroker(broker)
}
+ balancer.onPubRemoveBroker(broker, stats)
balancer.OnRemoveBroker(broker, stats)
}
@@ -88,3 +90,11 @@ func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *Broke
partitionSlotToBrokerList.AddBroker(partition, broker)
}
}
+
+// OnPubAddBroker is called when a broker is added for a publisher coordinator
+func (balancer *Balancer) onPubAddBroker(broker string, brokerStats *BrokerStats) {
+}
+
+// OnPubRemoveBroker is called when a broker is removed for a publisher coordinator
+func (balancer *Balancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) {
+}
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
index 1ad539cfb..5a4474076 100644
--- a/weed/mq/sub_coordinator/coordinator.go
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -100,10 +100,12 @@ func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb
}
}
-func (c *Coordinator) OnAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
+// OnSubAddBroker is called when a broker is added to the balancer
+func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
}
-func (c *Coordinator) OnRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
+// OnSubRemoveBroker is called when a broker is removed from the balancer
+func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
}