aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_admin.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_admin.go')
-rw-r--r--weed/mq/broker/broker_grpc_admin.go20
1 files changed, 10 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_admin.go b/weed/mq/broker/broker_grpc_admin.go
index 5c9dc726e..7337ba23e 100644
--- a/weed/mq/broker/broker_grpc_admin.go
+++ b/weed/mq/broker/broker_grpc_admin.go
@@ -90,36 +90,36 @@ func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq
// createOrUpdateTopicPartitions creates the topic partitions on the broker
// 1. check
-func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) {
+func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignments []*mq_pb.BrokerPartitionAssignment) (err error) {
// create or update each partition
- if prevAssignment == nil {
+ if prevAssignments == nil {
broker.createOrUpdateTopicPartition(topic, nil)
} else {
- for _, partitionAssignment := range prevAssignment.BrokerPartitions {
- broker.createOrUpdateTopicPartition(topic, partitionAssignment)
+ for _, brokerPartitionAssignment := range prevAssignments {
+ broker.createOrUpdateTopicPartition(topic, brokerPartitionAssignment)
}
}
return nil
}
-func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) {
+func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (newAssignment *mq_pb.BrokerPartitionAssignment) {
shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
if !shouldCreate {
}
return
}
-func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) {
+func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (shouldCreate bool) {
if oldAssignment == nil {
return true
}
for _, b := range oldAssignment.FollowerBrokers {
pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
- Namespace: string(topic.Namespace),
- Topic: topic.Name,
- BrokerPartitionsAssignment: oldAssignment,
- ShouldCancelIfNotMatch: true,
+ Namespace: string(topic.Namespace),
+ Topic: topic.Name,
+ BrokerPartitionAssignment: oldAssignment,
+ ShouldCancelIfNotMatch: true,
})
if err != nil {
shouldCreate = true