aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-11 23:08:02 -0800
committerchrislu <chris.lu@gmail.com>2024-01-11 23:08:02 -0800
commit7afaad31a45bd8733e34ab286b30fb428180a23f (patch)
treed0105e89ae5d30735c34c19b9191a495b831aece
parent61dbdd0ff68dc762abe272ac9c75a6c506b3e5cf (diff)
downloadseaweedfs-7afaad31a45bd8733e34ab286b30fb428180a23f.tar.xz
seaweedfs-7afaad31a45bd8733e34ab286b30fb428180a23f.zip
wait on local topic partition creation
-rw-r--r--weed/mq/broker/broker_grpc_sub.go29
1 files changed, 24 insertions, 5 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 782399d37..ed6b5a900 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -13,10 +13,17 @@ import (
func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
+ ctx := stream.Context()
+ clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
+
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
- localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition)
- if localTopicPartition == nil {
+
+ glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
+
+ var localTopicPartition *topic.LocalPartition
+ localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
+ for localTopicPartition == nil {
stream.Send(&mq_pb.SubscribeMessageResponse{
Message: &mq_pb.SubscribeMessageResponse_Ctrl{
Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
@@ -24,10 +31,23 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
},
},
})
- return nil
+ time.Sleep(337 * time.Millisecond)
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return nil
+ }
+ glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ return nil
+ default:
+ // Continue processing the request
+ }
+ localTopicPartition = b.localTopicManager.GetTopicPartition(t, partition)
}
- clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
isConnected := true
@@ -38,7 +58,6 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition)
}()
- ctx := stream.Context()
var startPosition log_buffer.MessagePosition
var inMemoryOnly bool
if req.GetInit()!=nil && req.GetInit().GetPartitionOffset() != nil {