aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-22 00:49:57 -0800
committerchrislu <chris.lu@gmail.com>2024-01-22 00:49:57 -0800
commitc77d35313e8376798bb6f0dd63bf748a42d84370 (patch)
tree64a3276f4daf83984fbd1961e6cc4f45067d83e7
parentb0a2e9aea37937d40c9fa097803237bf89a7f908 (diff)
downloadseaweedfs-c77d35313e8376798bb6f0dd63bf748a42d84370.tar.xz
seaweedfs-c77d35313e8376798bb6f0dd63bf748a42d84370.zip
pub/sub broker only check local assigned partitions
-rw-r--r--weed/mq/broker/broker_grpc_pub.go16
-rw-r--r--weed/mq/broker/broker_grpc_sub.go34
2 files changed, 16 insertions, 34 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index f37629b81..ad878a88c 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -52,10 +52,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
var p topic.Partition
if initMessage != nil {
t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- localTopicPartition, err = b.loadLocalTopicPartition(t, p)
- if err != nil {
- response.Error = fmt.Sprintf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
- glog.Errorf("topic %v partition %v not setup: %v", initMessage.Topic, initMessage.Partition, err)
+ localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
+ if localTopicPartition == nil {
+ response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
return stream.Send(response)
}
ackInterval = int(initMessage.AckInterval)
@@ -141,14 +141,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return nil
}
-func (b *MessageQueueBroker) loadLocalTopicPartition(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
- localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
- if localTopicPartition == nil {
- localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, p)
- }
- return localTopicPartition, err
-}
-
func (b *MessageQueueBroker) loadLocalTopicPartitionFromFiler(t topic.Topic, p topic.Partition) (localTopicPartition *topic.LocalPartition, err error) {
self := b.option.BrokerAddress()
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 2f4af3be9..72101ba86 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -24,31 +24,21 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
var localTopicPartition *topic.LocalPartition
localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
for localTopicPartition == nil {
- localTopicPartition, err = b.loadLocalTopicPartitionFromFiler(t, partition)
- // if not created, return error
- if err != nil {
- stream.Send(&mq_pb.SubscribeMessageResponse{
- Message: &mq_pb.SubscribeMessageResponse_Ctrl{
- Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
- Error: fmt.Sprintf("topic %v partition %v not setup: %v", t, partition, err),
- },
- },
- })
- time.Sleep(337 * time.Millisecond)
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return nil
- }
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ time.Sleep(337 * time.Millisecond)
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
return nil
- default:
- // Continue processing the request
}
+ glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ return nil
+ default:
+ // Continue processing the request
}
+ localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
}
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())