aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-22 11:44:21 -0800
committerchrislu <chris.lu@gmail.com>2024-01-22 11:44:21 -0800
commit2beaa2d0b3d31d4686139f0b4c151e348ec6d340 (patch)
treeb904a106b34b529e7d47bb0dcb88b961055f53e0
parentc3f8530f97f91ca53854e80ec95585bf5501113c (diff)
downloadseaweedfs-2beaa2d0b3d31d4686139f0b4c151e348ec6d340.tar.xz
seaweedfs-2beaa2d0b3d31d4686139f0b4c151e348ec6d340.zip
pub/sub brokers check filer for assigned partitions
-rw-r--r--weed/mq/broker/broker_grpc_pub.go36
-rw-r--r--weed/mq/broker/broker_grpc_sub.go11
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go17
3 files changed, 31 insertions, 33 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index ad878a88c..7ea1db27d 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -54,9 +54,11 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
- response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- return stream.Send(response)
+ if localTopicPartition, err = b.genLocalPartitionFromFiler(t, p); err != nil {
+ response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ return stream.Send(response)
+ }
}
ackInterval = int(initMessage.AckInterval)
stream.Send(response)
@@ -141,34 +143,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return nil
}
-func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
- self := b.option.BrokerAddress()
-
- // load local topic partition from configuration on filer if not found
- var conf *mq_pb.ConfigureTopicResponse
- conf, err = b.readTopicConfFromFiler(t)
- if err != nil {
- return nil, err
- }
-
- // create local topic partition
- var hasCreated bool
- for _, assignment := range conf.BrokerPartitionAssignments {
- if assignment.LeaderBroker == string(self) && p.Equals(topic.FromPbPartition(assignment.Partition)) {
- localTopicPartition = topic.FromPbBrokerPartitionAssignment(self, p, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
- b.localTopicManager.AddTopicPartition(t, localTopicPartition)
- hasCreated = true
- break
- }
- }
-
- if !hasCreated {
- return nil, fmt.Errorf("topic %v partition %v not assigned to broker %v", t, p, self)
- }
-
- return localTopicPartition, nil
-}
-
// duplicated from master_grpc_server.go
func findClientAddress(ctx context.Context) string {
// fmt.Printf("FromContext %+v\n", ctx)
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 72101ba86..8a6acadb9 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -22,8 +22,16 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
var localTopicPartition *topic.LocalPartition
- localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
for localTopicPartition == nil {
+ localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
+ if localTopicPartition == nil {
+ if localTopicPartition, err = b.genLocalPartitionFromFiler(t, partition); err != nil {
+ glog.V(1).Infof("topic %v partition %v not setup", t, partition)
+ }
+ }
+ if localTopicPartition != nil {
+ break
+ }
time.Sleep(337 * time.Millisecond)
// Check if the client has disconnected by monitoring the context
select {
@@ -38,7 +46,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
default:
// Continue processing the request
}
- localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
}
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 710e95b38..0eeefbdf0 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -53,6 +53,23 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
return conf, nil
}
+func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, err error) {
+ self := b.option.BrokerAddress()
+ conf, err := b.readTopicConfFromFiler(t)
+ if err != nil {
+ return nil, err
+ }
+ for _, assignment := range conf.BrokerPartitionAssignments {
+ if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
+ localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ b.localTopicManager.AddTopicPartition(t, localPartition)
+ break
+ }
+ }
+
+ return localPartition, nil
+}
+
func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) (err error) {
// also fix assignee broker if invalid
addedAssignments, updatedAssignments := pub_balancer.EnsureAssignmentsToActiveBrokers(b.Balancer.Brokers, conf.BrokerPartitionAssignments)