aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_configure.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_configure.go')
-rw-r--r--weed/mq/broker/broker_grpc_configure.go18
1 files changed, 10 insertions, 8 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 008c08bbe..8d3727b1f 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -82,18 +82,20 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
// drain existing topic partition subscriptions
for _, assignment := range request.BrokerPartitionAssignments {
- localPartition := topic.FromPbBrokerPartitionAssignment(self, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition))
+ t := topic.FromPbTopic(request.Topic)
+ partition := topic.FromPbPartition(assignment.Partition)
+ b.accessLock.Lock()
if request.IsDraining {
// TODO drain existing topic partition subscriptions
-
- b.localTopicManager.RemoveTopicPartition(
- topic.FromPbTopic(request.Topic),
- localPartition.Partition)
+ b.localTopicManager.RemoveTopicPartition(t, partition)
} else {
- b.localTopicManager.AddTopicPartition(
- topic.FromPbTopic(request.Topic),
- localPartition)
+ var localPartition *topic.LocalPartition
+ if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
+ localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(request.Topic, assignment.Partition), b.genLogOnDiskReadFunc(request.Topic, assignment.Partition))
+ b.localTopicManager.AddTopicPartition(t, localPartition)
+ }
}
+ b.accessLock.Unlock()
}
// if is leader, notify the followers to drain existing topic partition subscriptions