aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_assign.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_assign.go')
-rw-r--r--weed/mq/broker/broker_grpc_assign.go8
1 files changed, 4 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index ee69db30d..48ec0d5bd 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -36,7 +36,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
// if is leader, notify the followers to drain existing topic partition subscriptions
if request.IsLeader {
for _, brokerPartition := range request.BrokerPartitionAssignments {
- for _, follower := range brokerPartition.FollowerBrokers {
+ if follower := brokerPartition.FollowerBroker; follower != "" {
err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.AssignTopicPartitions(context.Background(), request)
return err
@@ -79,11 +79,11 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context,
return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
}
}
- brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ brokerStats, found := b.PubBalancer.Brokers.Get(bpa.LeaderBroker)
if !found {
brokerStats = pub_balancer.NewBrokerStats()
- if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
- brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ if !b.PubBalancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
+ brokerStats, _ = b.PubBalancer.Brokers.Get(bpa.LeaderBroker)
}
}
brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)