diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_assign.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_assign.go | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go index 48ec0d5bd..99fe88acd 100644 --- a/weed/mq/broker/broker_grpc_assign.go +++ b/weed/mq/broker/broker_grpc_assign.go @@ -15,9 +15,15 @@ import ( func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) { ret := &mq_pb.AssignTopicPartitionsResponse{} + t := topic.FromPbTopic(request.Topic) + conf, readConfErr := b.fca.ReadTopicConfFromFiler(t) + if readConfErr != nil { + glog.Errorf("topic %v not found: %v", t, readConfErr) + return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) + } + // drain existing topic partition subscriptions for _, assignment := range request.BrokerPartitionAssignments { - t := topic.FromPbTopic(request.Topic) partition := topic.FromPbPartition(assignment.Partition) b.accessLock.Lock() if request.IsDraining { @@ -26,7 +32,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m } else { var localPartition *topic.LocalPartition if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil { - localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition)) + localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType) b.localTopicManager.AddLocalPartition(t, localPartition) } } |
