aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-20 12:27:45 -0700
committerchrislu <chris.lu@gmail.com>2024-05-20 12:27:45 -0700
commit65dd5ac6fbe93d968fe1a4f526e6df577ac915d8 (patch)
treead1e8ef995eb36690239a717fa56d834d6941d80
parent3b7f24b1fc55fe5e138701b056c3e528573fa7d4 (diff)
downloadseaweedfs-65dd5ac6fbe93d968fe1a4f526e6df577ac915d8.tar.xz
seaweedfs-65dd5ac6fbe93d968fe1a4f526e6df577ac915d8.zip
fix
-rw-r--r--weed/mq/broker/broker_grpc_sub.go3
1 files changed, 3 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index db2964121..7cbde21f8 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -101,6 +101,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
}
imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
currentLastOffset := imt.GetOldest()
+ fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
Message: &mq_pb.SubscribeFollowMeRequest_Ack{
@@ -161,6 +162,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// reset the sleep interval count
sleepIntervalCount = 0
+ imt.InflightMessage(logEntry.Key, logEntry.TsNs)
+
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
Data: &mq_pb.DataMessage{
Key: logEntry.Key,