diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_configure.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_configure.go | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 008c08bbe..8d3727b1f 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -82,18 +82,20 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m // drain existing topic partition subscriptions for _, assignment := range request.BrokerPartitionAssignments { - localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition)) + t := topic.FromPbTopic(request.Topic) + partition := topic.FromPbPartition(assignment.Partition) + b.accessLock.Lock() if request.IsDraining { // TODO drain existing topic partition subscriptions - - b.localTopicManager.RemoveTopicPartition( - topic.FromPbTopic(request.Topic), - localPartition.Partition) + b.localTopicManager.RemoveTopicPartition(t, partition) } else { - b.localTopicManager.AddTopicPartition( - topic.FromPbTopic(request.Topic), - localPartition) + var localPartition *topic.LocalPartition + if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { + localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition)) + b.localTopicManager.AddTopicPartition(t, localPartition) + } } + b.accessLock.Unlock() } // if is leader, notify the followers to drain existing topic partition subscriptions |
