aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-15 21:08:42 -0700
committerchrislu <chris.lu@gmail.com>2024-03-15 21:08:42 -0700
commit55714f54aba2f43f4a6b372cf14d610760d8d8ca (patch)
tree5cf798aa094fc6c6a453ac2becf220a720373743
parent56ce040ab6fd7359e3a573e127cc924d58eebddf (diff)
downloadseaweedfs-55714f54aba2f43f4a6b372cf14d610760d8d8ca.tar.xz
seaweedfs-55714f54aba2f43f4a6b372cf14d610760d8d8ca.zip
local partition is generated or not
-rw-r--r--weed/mq/broker/broker_grpc_pub.go2
-rw-r--r--weed/mq/broker/broker_grpc_sub.go4
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go15
3 files changed, 11 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 8c46ea99d..4d37d5393 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -53,7 +53,7 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
var p topic.Partition
if initMessage != nil {
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- localTopicPartition, err = b.GetOrGenLocalPartition(t, p)
+ localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p)
if err != nil {
response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 5fd4522bd..e6027d26b 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -26,7 +26,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
var localTopicPartition *topic.LocalPartition
for localTopicPartition == nil {
- localTopicPartition, err = b.GetOrGenLocalPartition(t, partition)
+ localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
if err != nil {
glog.V(1).Infof("topic %v partition %v not setup", t, partition)
}
@@ -143,7 +143,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
var localTopicPartition *topic.LocalPartition
for localTopicPartition == nil {
- localTopicPartition, err = b.GetOrGenLocalPartition(t, partition)
+ localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
if err != nil {
glog.V(1).Infof("topic %v partition %v not setup", t, partition)
}
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 4bcb62931..35d95c0e4 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -56,34 +56,35 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
return conf, nil
}
-func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
+func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
b.accessLock.Lock()
defer b.accessLock.Unlock()
if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
- localPartition, err = b.genLocalPartitionFromFiler(t, partition)
+ localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition)
if err != nil {
- return nil, err
+ return nil, false, err
}
}
- return localPartition, nil
+ return localPartition, isGenerated, nil
}
-func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
+func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
self := b.option.BrokerAddress()
conf, err := b.readTopicConfFromFiler(t)
if err != nil {
- return nil, err
+ return nil, isGenerated, err
}
for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
b.localTopicManager.AddTopicPartition(t, localPartition)
+ isGenerated = true
break
}
}
- return localPartition, nil
+ return localPartition, isGenerated, nil
}
func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {