aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-30 21:26:09 -0700
committerchrislu <chris.lu@gmail.com>2023-09-30 21:26:09 -0700
commitc4e65451971f989a997963c1443d726c4d50c663 (patch)
treef30c16ff427fdccc7a114574ee841ad0d8857b7a
parent358cba43ef8e3e37c91df1a044933253d7941be8 (diff)
downloadseaweedfs-c4e65451971f989a997963c1443d726c4d50c663.tar.xz
seaweedfs-c4e65451971f989a997963c1443d726c4d50c663.zip
fix compilation
-rw-r--r--weed/mq/broker/broker_grpc_sub.go6
1 files changed, 3 insertions, 3 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index ad65df2d1..8e9bcc52d 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -11,8 +11,8 @@ import (
func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
- localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Cursor.Topic),
- topic.FromPbPartition(req.Cursor.Partition))
+ localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.GetConsumer().Topic),
+ topic.FromPbPartition(req.GetConsumer().Partition))
if localTopicPartition == nil {
stream.Send(&mq_pb.SubscribeResponse{
Message: &mq_pb.SubscribeResponse_Ctrl{
@@ -24,7 +24,7 @@ func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream
return nil
}
- clientName := fmt.Sprintf("%s/%s-%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId, req.Consumer.ClientId)
+ clientName := fmt.Sprintf("%s/%s-%s", req.GetConsumer().ConsumerGroup, req.GetConsumer().ConsumerId, req.GetConsumer().ClientId)
localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error {
value := logEntry.GetData()