aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-28 15:55:26 -0800
committerchrislu <chris.lu@gmail.com>2024-01-28 15:55:26 -0800
commita507069d19acaa3f5040e4608178ce073e0a1791 (patch)
tree0f9415ab3e4dba6f4fc72cbdff83c4a1a54a98a3
parent9e6ea80dfff1146e33f0ffac353772bc8a216789 (diff)
downloadseaweedfs-a507069d19acaa3f5040e4608178ce073e0a1791.tar.xz
seaweedfs-a507069d19acaa3f5040e4608178ce073e0a1791.zip
lock for creating local partition
-rw-r--r--weed/mq/broker/broker_grpc_pub.go12
-rw-r--r--weed/mq/broker/broker_grpc_sub.go13
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go13
3 files changed, 24 insertions, 14 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 7ea1db27d..e4861e9bc 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -52,13 +52,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
var p topic.Partition
if initMessage != nil {
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
- if localTopicPartition == nil {
- if localTopicPartition, err = b.genLocalPartitionFromFiler(t, p); err != nil {
- response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- return stream.Send(response)
- }
+ localTopicPartition, err = b.GetOrGenLocalPartition(t, p)
+ if err != nil {
+ response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ return stream.Send(response)
}
ackInterval = int(initMessage.AckInterval)
stream.Send(response)
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 1a34d49e7..ddd6786d0 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -22,13 +22,12 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
waitIntervalCount := 0
+
var localTopicPartition *topic.LocalPartition
for localTopicPartition == nil {
- localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
- if localTopicPartition == nil {
- if localTopicPartition, err = b.genLocalPartitionFromFiler(t, partition); err != nil {
- glog.V(1).Infof("topic %v partition %v not setup", t, partition)
- }
+ localTopicPartition, err = b.GetOrGenLocalPartition(t, partition)
+ if err != nil {
+ glog.V(1).Infof("topic %v partition %v not setup", t, partition)
}
if localTopicPartition != nil {
break
@@ -75,9 +74,9 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
}
if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
- startPosition = log_buffer.NewMessagePosition(1, -2)
+ startPosition = log_buffer.NewMessagePosition(1, -3)
} else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
- startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -2)
+ startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
}
}
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index c5d8e3b78..dbd0d97c7 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -56,6 +56,19 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
return conf, nil
}
+func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
+ b.accessLock.Lock()
+ defer b.accessLock.Unlock()
+
+ if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
+ localPartition, err = b.genLocalPartitionFromFiler(t, partition)
+ if err != nil {
+ return nil, err
+ }
+ }
+ return localPartition, nil
+}
+
func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
self := b.option.BrokerAddress()
conf, err := b.readTopicConfFromFiler(t)