aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_assign.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_assign.go')
-rw-r--r--weed/mq/broker/broker_grpc_assign.go10
1 files changed, 8 insertions, 2 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 48ec0d5bd..99fe88acd 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -15,9 +15,15 @@ import (
func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
ret := &mq_pb.AssignTopicPartitionsResponse{}
+ t := topic.FromPbTopic(request.Topic)
+ conf, readConfErr := b.fca.ReadTopicConfFromFiler(t)
+ if readConfErr != nil {
+ glog.Errorf("topic %v not found: %v", t, readConfErr)
+ return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
+ }
+
// drain existing topic partition subscriptions
for _, assignment := range request.BrokerPartitionAssignments {
- t := topic.FromPbTopic(request.Topic)
partition := topic.FromPbPartition(assignment.Partition)
b.accessLock.Lock()
if request.IsDraining {
@@ -26,7 +32,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
} else {
var localPartition *topic.LocalPartition
if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
- localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition), conf.RecordType)
b.localTopicManager.AddLocalPartition(t, localPartition)
}
}