diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-28 15:55:26 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-28 15:55:26 -0800 |
| commit | a507069d19acaa3f5040e4608178ce073e0a1791 (patch) | |
| tree | 0f9415ab3e4dba6f4fc72cbdff83c4a1a54a98a3 | |
| parent | 9e6ea80dfff1146e33f0ffac353772bc8a216789 (diff) | |
| download | seaweedfs-a507069d19acaa3f5040e4608178ce073e0a1791.tar.xz seaweedfs-a507069d19acaa3f5040e4608178ce073e0a1791.zip | |
lock for creating local partition
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 12 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 13 | ||||
| -rw-r--r-- | weed/mq/broker/broker_topic_conf_read_write.go | 13 |
3 files changed, 24 insertions, 14 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 7ea1db27d..e4861e9bc 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -52,13 +52,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis var p topic.Partition if initMessage != nil { t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - localTopicPartition = b.localTopicManager.GetTopicPartition(t, p) - if localTopicPartition == nil { - if localTopicPartition, err = b.genLocalPartitionFromFiler(t, p); err != nil { - response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) - return stream.Send(response) - } + localTopicPartition, err = b.GetOrGenLocalPartition(t, p) + if err != nil { + response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition) + return stream.Send(response) } ackInterval = int(initMessage.AckInterval) stream.Send(response) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 1a34d49e7..ddd6786d0 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -22,13 +22,12 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) waitIntervalCount := 0 + var localTopicPartition *topic.LocalPartition for localTopicPartition == nil { - localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) - if localTopicPartition == nil { - if localTopicPartition, err = b.genLocalPartitionFromFiler(t, partition); err != nil { - glog.V(1).Infof("topic %v partition %v not setup", t, partition) - } + localTopicPartition, err = b.GetOrGenLocalPartition(t, partition) + if err != nil { + glog.V(1).Infof("topic %v partition %v not setup", t, partition) } if localTopicPartition != nil { break @@ -75,9 +74,9 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2) } if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST { - startPosition = log_buffer.NewMessagePosition(1, -2) + startPosition = log_buffer.NewMessagePosition(1, -3) } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST { - startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -2) + startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4) } } diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go index c5d8e3b78..dbd0d97c7 100644 --- a/weed/mq/broker/broker_topic_conf_read_write.go +++ b/weed/mq/broker/broker_topic_conf_read_write.go @@ -56,6 +56,19 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb. return conf, nil } +func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) { + b.accessLock.Lock() + defer b.accessLock.Unlock() + + if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil { + localPartition, err = b.genLocalPartitionFromFiler(t, partition) + if err != nil { + return nil, err + } + } + return localPartition, nil +} + func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) { self := b.option.BrokerAddress() conf, err := b.readTopicConfFromFiler(t) |
