diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_assign.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_assign.go | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index ee69db30d..48ec0d5bd 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -36,7 +36,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m // if is leader, notify the followers to drain existing topic partition subscriptions if request.IsLeader { for _, brokerPartition := range request.BrokerPartitionAssignments { - for _, follower := range brokerPartition.FollowerBrokers { + if follower := brokerPartition.FollowerBroker; follower != "" { err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.AssignTopicPartitions(context.Background(), request) return err @@ -79,11 +79,11 @@ func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr) } } - brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker) + brokerStats, found := b.PubBalancer.Brokers.Get(bpa.LeaderBroker) if !found { brokerStats = pub_balancer.NewBrokerStats() - if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) { - brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker) + if !b.PubBalancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) { + brokerStats, _ = b.PubBalancer.Brokers.Get(bpa.LeaderBroker) } } brokerStats.RegisterAssignment(t, bpa.Partition, isAdd) |
