diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-20 12:27:45 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-20 12:27:45 -0700 |
| commit | 65dd5ac6fbe93d968fe1a4f526e6df577ac915d8 (patch) | |
| tree | ad1e8ef995eb36690239a717fa56d834d6941d80 | |
| parent | 3b7f24b1fc55fe5e138701b056c3e528573fa7d4 (diff) | |
| download | seaweedfs-65dd5ac6fbe93d968fe1a4f526e6df577ac915d8.tar.xz seaweedfs-65dd5ac6fbe93d968fe1a4f526e6df577ac915d8.zip | |
fix
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 3 |
1 files changed, 3 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index db2964121..7cbde21f8 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -101,6 +101,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence) currentLastOffset := imt.GetOldest() + fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset) if subscribeFollowMeStream != nil && currentLastOffset > lastOffset { if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{ Message: &mq_pb.SubscribeFollowMeRequest_Ack{ @@ -161,6 +162,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // reset the sleep interval count sleepIntervalCount = 0 + imt.InflightMessage(logEntry.Key, logEntry.TsNs) + if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{ Data: &mq_pb.DataMessage{ Key: logEntry.Key, |
