diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_assign.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_assign.go | 9 |
1 files changed, 4 insertions, 5 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 264565b7b..ee69db30d 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -14,7 +14,6 @@ import ( // AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) { ret := &mq_pb.AssignTopicPartitionsResponse{} - self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port)) // drain existing topic partition subscriptions for _, assignment := range request.BrokerPartitionAssignments { @@ -23,12 +22,12 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m b.accessLock.Lock() if request.IsDraining { // TODO drain existing topic partition subscriptions - b.localTopicManager.RemoveTopicPartition(t, partition) + b.localTopicManager.RemoveLocalPartition(t, partition) } else { var localPartition *topic.LocalPartition - if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { - localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) - b.localTopicManager.AddTopicPartition(t, localPartition) + if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { + localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + b.localTopicManager.AddLocalPartition(t, localPartition) } } b.accessLock.Unlock() |
