aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_assign.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_assign.go')
-rw-r--r--weed/mq/broker/broker_grpc_assign.go9
1 files changed, 4 insertions, 5 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 264565b7b..ee69db30d 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -14,7 +14,6 @@ import (
// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
ret := &mq_pb.AssignTopicPartitionsResponse{}
- self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
// drain existing topic partition subscriptions
for _, assignment := range request.BrokerPartitionAssignments {
@@ -23,12 +22,12 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
b.accessLock.Lock()
if request.IsDraining {
// TODO drain existing topic partition subscriptions
- b.localTopicManager.RemoveTopicPartition(t, partition)
+ b.localTopicManager.RemoveLocalPartition(t, partition)
} else {
var localPartition *topic.LocalPartition
- if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
- localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
- b.localTopicManager.AddTopicPartition(t, localPartition)
+ if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
+ localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ b.localTopicManager.AddLocalPartition(t, localPartition)
}
}
b.accessLock.Unlock()