aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-24 13:07:51 -0700
committerchrislu <chris.lu@gmail.com>2024-03-24 13:07:51 -0700
commit4f5c4c33882dac37ff87fadce34b9a2592ea4fa2 (patch)
treec98803db16813d7ca5ca20253bd3cd5746e20a15
parentd558a589468459f6266168e0771031a13060c6d3 (diff)
downloadseaweedfs-4f5c4c33882dac37ff87fadce34b9a2592ea4fa2.tar.xz
seaweedfs-4f5c4c33882dac37ff87fadce34b9a2592ea4fa2.zip
refactor
-rw-r--r--weed/mq/broker/broker_grpc_sub.go15
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go15
2 files changed, 15 insertions, 15 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index e07c7b67f..6c03ba409 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -97,21 +97,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
})
}
-func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) {
- // get or generate a local partition
- conf, readConfErr := b.readTopicConfFromFiler(t)
- if readConfErr != nil {
- glog.Errorf("topic %v not found: %v", t, readConfErr)
- return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
- }
- localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
- if getOrGenError != nil {
- glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
- return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
- }
- return localTopicPartition, nil
-}
-
func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) {
if offset.StartTsNs != 0 {
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index abe25a240..987c60243 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -56,6 +56,21 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
return conf, nil
}
+func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) {
+ // get or generate a local partition
+ conf, readConfErr := b.readTopicConfFromFiler(t)
+ if readConfErr != nil {
+ glog.Errorf("topic %v not found: %v", t, readConfErr)
+ return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
+ }
+ localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
+ if getOrGenError != nil {
+ glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+ return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+ }
+ return localTopicPartition, nil
+}
+
func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
b.accessLock.Lock()
defer b.accessLock.Unlock()