diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 19 |
1 files changed, 9 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index e6027d26b..1141ff47f 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -130,7 +130,7 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer return } -func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) { +func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) { ctx := stream.Context() clientName := req.GetInit().ConsumerId @@ -188,8 +188,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe // to indicate the follower is connected stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{ - Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{ - }, + Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{}, }, }) @@ -200,7 +199,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe var prevFlushTsNs int64 - _,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { + _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool { if !isConnected { return false } @@ -285,12 +284,12 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe // send the log entry if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{ Message: &mq_pb.FollowInMemoryMessagesResponse_Data{ - Data: &mq_pb.DataMessage{ - Key: logEntry.Key, - Value: logEntry.Data, - TsNs: logEntry.TsNs, - }, - }}); err != nil { + Data: &mq_pb.DataMessage{ + Key: logEntry.Key, + Value: logEntry.Data, + TsNs: logEntry.TsNs, + }, + }}); err != nil { glog.Errorf("Error sending setup response: %v", err) return false, err } |
