diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_admin.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_admin.go | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_admin.go b/weed/mq/broker/broker_grpc_admin.go index 5c9dc726e..7337ba23e 100644 --- a/weed/mq/broker/broker_grpc_admin.go +++ b/weed/mq/broker/broker_grpc_admin.go @@ -90,36 +90,36 @@ func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq // createOrUpdateTopicPartitions creates the topic partitions on the broker // 1. check -func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) { +func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignments []*mq_pb.BrokerPartitionAssignment) (err error) { // create or update each partition - if prevAssignment == nil { + if prevAssignments == nil { broker.createOrUpdateTopicPartition(topic, nil) } else { - for _, partitionAssignment := range prevAssignment.BrokerPartitions { - broker.createOrUpdateTopicPartition(topic, partitionAssignment) + for _, brokerPartitionAssignment := range prevAssignments { + broker.createOrUpdateTopicPartition(topic, brokerPartitionAssignment) } } return nil } -func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) { +func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (newAssignment *mq_pb.BrokerPartitionAssignment) { shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment) if !shouldCreate { } return } -func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) { +func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (shouldCreate bool) { if oldAssignment == nil { return true } for _, b := range oldAssignment.FollowerBrokers { pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{ - Namespace: string(topic.Namespace), - Topic: topic.Name, - BrokerPartitionsAssignment: oldAssignment, - ShouldCancelIfNotMatch: true, + Namespace: string(topic.Namespace), + Topic: topic.Name, + BrokerPartitionAssignment: oldAssignment, + ShouldCancelIfNotMatch: true, }) if err != nil { shouldCreate = true |
