diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-20 12:16:40 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-20 12:16:40 -0800 |
| commit | 34839237ab18256c3dba0ae9638ca24c38f79478 (patch) | |
| tree | 87ef5bfd807f2c5846f06ec79d55e50b6f546bbe | |
| parent | 2828ccbb30081ae260428a2aa42e50c724e55e6c (diff) | |
| download | seaweedfs-34839237ab18256c3dba0ae9638ca24c38f79478.tar.xz seaweedfs-34839237ab18256c3dba0ae9638ca24c38f79478.zip | |
refactor
| -rw-r--r-- | weed/mq/broker/broker_server.go | 4 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/balancer.go | 10 | ||||
| -rw-r--r-- | weed/mq/sub_coordinator/coordinator.go | 6 |
3 files changed, 16 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 31a6e10ef..f41ec87ca 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -67,8 +67,8 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial } mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) pub_broker_balancer.OnPartitionChange = mqBroker.Coordinator.OnPartitionChange - pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnAddBroker - pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnRemoveBroker + pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker + pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker go mqBroker.MasterClient.KeepConnectedToMaster() diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go index d44c68720..cd9583016 100644 --- a/weed/mq/pub_balancer/balancer.go +++ b/weed/mq/pub_balancer/balancer.go @@ -53,6 +53,7 @@ func (balancer *Balancer) AddBroker(broker string) (brokerStats *BrokerStats) { brokerStats, _ = balancer.Brokers.Get(broker) } } + balancer.onPubAddBroker(broker, brokerStats) balancer.OnAddBroker(broker, brokerStats) return brokerStats } @@ -68,6 +69,7 @@ func (balancer *Balancer) RemoveBroker(broker string, stats *BrokerStats) { } partitionSlotToBrokerList.RemoveBroker(broker) } + balancer.onPubRemoveBroker(broker, stats) balancer.OnRemoveBroker(broker, stats) } @@ -88,3 +90,11 @@ func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *Broke partitionSlotToBrokerList.AddBroker(partition, broker) } } + +// OnPubAddBroker is called when a broker is added for a publisher coordinator +func (balancer *Balancer) onPubAddBroker(broker string, brokerStats *BrokerStats) { +} + +// OnPubRemoveBroker is called when a broker is removed for a publisher coordinator +func (balancer *Balancer) onPubRemoveBroker(broker string, brokerStats *BrokerStats) { +} diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go index 1ad539cfb..5a4474076 100644 --- a/weed/mq/sub_coordinator/coordinator.go +++ b/weed/mq/sub_coordinator/coordinator.go @@ -100,10 +100,12 @@ func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb } } -func (c *Coordinator) OnAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) { +// OnSubAddBroker is called when a broker is added to the balancer +func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) { } -func (c *Coordinator) OnRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) { +// OnSubRemoveBroker is called when a broker is removed from the balancer +func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) { } |
