diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-30 09:15:50 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-30 09:15:50 -0700 |
| commit | ff14dfa0eee2f3b3689452ffc78368cb288d53b0 (patch) | |
| tree | 48ea7214f9776a83150f2ef0750d19908ede31bc | |
| parent | b485b9695c8fb1a5e87c2afe7ddb96650d1c248e (diff) | |
| download | seaweedfs-ff14dfa0eee2f3b3689452ffc78368cb288d53b0.tar.xz seaweedfs-ff14dfa0eee2f3b3689452ffc78368cb288d53b0.zip | |
debug
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 6 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub_follow.go | 2 |
2 files changed, 6 insertions, 2 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 68a80398d..d1131892b 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -66,7 +66,8 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } else { defer func() { println("closing SubscribeFollowMe connection", follower) - followerGrpcConnection.Close() + subscribeFollowMeStream.CloseSend() + // followerGrpcConnection.Close() }() followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection) if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil { @@ -94,6 +95,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs ack, err := stream.Recv() if err != nil { if err == io.EOF { + // the client has called CloseSend(). This is to ack the close. stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{ Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{ IsEndOfStream: true, @@ -127,6 +129,7 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs } } if lastOffset > 0 { + glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset) if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil { glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err) } @@ -204,6 +207,7 @@ func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMess return } if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil { + glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset) startPosition = log_buffer.NewMessagePosition(storedOffset, -2) return } diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go index 351b904c2..cfea6f7c7 100644 --- a/weed/mq/broker/broker_grpc_sub_follow.go +++ b/weed/mq/broker/broker_grpc_sub_follow.go @@ -42,7 +42,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub // Process the received message if ackMessage := req.GetAck(); ackMessage != nil { lastOffset = ackMessage.TsNs - // println("offset", lastOffset) + println("sub follower got offset", lastOffset) } else if closeMessage := req.GetClose(); closeMessage != nil { glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage) return nil |
