aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_sub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub.go')
-rw-r--r--weed/mq/broker/broker_grpc_sub.go19
1 files changed, 9 insertions, 10 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index e6027d26b..1141ff47f 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -130,7 +130,7 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer
return
}
-func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
+func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
ctx := stream.Context()
clientName := req.GetInit().ConsumerId
@@ -188,8 +188,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
// to indicate the follower is connected
stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- },
+ Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{},
},
})
@@ -200,7 +199,7 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
var prevFlushTsNs int64
- _,_, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
+ _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
if !isConnected {
return false
}
@@ -285,12 +284,12 @@ func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMe
// send the log entry
if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
- Data: &mq_pb.DataMessage{
- Key: logEntry.Key,
- Value: logEntry.Data,
- TsNs: logEntry.TsNs,
- },
- }}); err != nil {
+ Data: &mq_pb.DataMessage{
+ Key: logEntry.Key,
+ Value: logEntry.Data,
+ TsNs: logEntry.TsNs,
+ },
+ }}); err != nil {
glog.Errorf("Error sending setup response: %v", err)
return false, err
}