aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-30 09:15:50 -0700
committerchrislu <chris.lu@gmail.com>2024-05-30 09:15:50 -0700
commitff14dfa0eee2f3b3689452ffc78368cb288d53b0 (patch)
tree48ea7214f9776a83150f2ef0750d19908ede31bc /weed/mq
parentb485b9695c8fb1a5e87c2afe7ddb96650d1c248e (diff)
downloadseaweedfs-ff14dfa0eee2f3b3689452ffc78368cb288d53b0.tar.xz
seaweedfs-ff14dfa0eee2f3b3689452ffc78368cb288d53b0.zip
debug
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker/broker_grpc_sub.go6
-rw-r--r--weed/mq/broker/broker_grpc_sub_follow.go2
2 files changed, 6 insertions, 2 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 68a80398d..d1131892b 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -66,7 +66,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
} else {
defer func() {
println("closing SubscribeFollowMe connection", follower)
- followerGrpcConnection.Close()
+ subscribeFollowMeStream.CloseSend()
+ // followerGrpcConnection.Close()
}()
followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil {
@@ -94,6 +95,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
ack, err := stream.Recv()
if err != nil {
if err == io.EOF {
+ // the client has called CloseSend(). This is to ack the close.
stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
IsEndOfStream: true,
@@ -127,6 +129,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
}
if lastOffset > 0 {
+ glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
}
@@ -204,6 +207,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess
return
}
if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
+ glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
return
}
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go
index 351b904c2..cfea6f7c7 100644
--- a/weed/mq/broker/broker_grpc_sub_follow.go
+++ b/weed/mq/broker/broker_grpc_sub_follow.go
@@ -42,7 +42,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
// Process the received message
if ackMessage := req.GetAck(); ackMessage != nil {
lastOffset = ackMessage.TsNs
- // println("offset", lastOffset)
+ println("sub follower got offset", lastOffset)
} else if closeMessage := req.GetClose(); closeMessage != nil {
glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
return nil