diff options
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 15 | ||||
| -rw-r--r-- | weed/mq/broker/broker_topic_conf_read_write.go | 15 |
2 files changed, 15 insertions, 15 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index e07c7b67f..6c03ba409 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -97,21 +97,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest }) } -func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) { - // get or generate a local partition - conf, readConfErr := b.readTopicConfFromFiler(t) - if readConfErr != nil { - glog.Errorf("topic %v not found: %v", t, readConfErr) - return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) - } - localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf) - if getOrGenError != nil { - glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) - return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) - } - return localTopicPartition, nil -} - func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) { if offset.StartTsNs != 0 { startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index abe25a240..987c60243 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -56,6 +56,21 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. return conf, nil } +func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) { + // get or generate a local partition + conf, readConfErr := b.readTopicConfFromFiler(t) + if readConfErr != nil { + glog.Errorf("topic %v not found: %v", t, readConfErr) + return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr) + } + localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf) + if getOrGenError != nil { + glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) + return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError) + } + return localTopicPartition, nil +} + func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) { b.accessLock.Lock() defer b.accessLock.Unlock() |
