diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-11 23:08:02 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-11 23:08:02 -0800 |
| commit | 7afaad31a45bd8733e34ab286b30fb428180a23f (patch) | |
| tree | d0105e89ae5d30735c34c19b9191a495b831aece | |
| parent | 61dbdd0ff68dc762abe272ac9c75a6c506b3e5cf (diff) | |
| download | seaweedfs-7afaad31a45bd8733e34ab286b30fb428180a23f.tar.xz seaweedfs-7afaad31a45bd8733e34ab286b30fb428180a23f.zip | |
wait on local topic partition creation
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 29 |
1 files changed, 24 insertions, 5 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 782399d37..ed6b5a900 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -13,10 +13,17 @@ import ( func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error { + ctx := stream.Context() + clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) + t := topic.FromPbTopic(req.GetInit().Topic) partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition()) - localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition) - if localTopicPartition == nil { + + glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition) + + var localTopicPartition *topic.LocalPartition + localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) + for localTopicPartition == nil { stream.Send(&mq_pb.SubscribeMessageResponse{ Message: &mq_pb.SubscribeMessageResponse_Ctrl{ Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{ @@ -24,10 +31,23 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest }, }, }) - return nil + time.Sleep(337 * time.Millisecond) + // Check if the client has disconnected by monitoring the context + select { + case <-ctx.Done(): + err := ctx.Err() + if err == context.Canceled { + // Client disconnected + return nil + } + glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err) + return nil + default: + // Continue processing the request + } + localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition) } - clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId) localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber()) glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition) isConnected := true @@ -38,7 +58,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition) }() - ctx := stream.Context() var startPosition log_buffer.MessagePosition var inMemoryOnly bool if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil { |
