aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-05 15:16:01 -0800
committerchrislu <chris.lu@gmail.com>2024-01-05 15:16:01 -0800
commitddd0fde0949b36809ad589c257aeb9fb60f2d3f0 (patch)
tree601d12470f208cc37452d502e67647052309489e
parentee41dbb7fcff189433e40f4944e5a2df889a9c7f (diff)
downloadseaweedfs-ddd0fde0949b36809ad589c257aeb9fb60f2d3f0.tar.xz
seaweedfs-ddd0fde0949b36809ad589c257aeb9fb60f2d3f0.zip
rename functions
-rw-r--r--weed/mq/client/cmd/weed_pub/publisher.go1
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go8
2 files changed, 5 insertions, 4 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index ee00be9f8..d1f2e7c90 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -36,6 +36,7 @@ func main() {
flag.Parse()
config := &pub_client.PublisherConfiguration{
CreateTopic: true,
+ CreateTopicPartitionCount: 1,
}
publisher := pub_client.NewTopicPublisher(*namespace, *topic, config)
brokers := strings.Split(*seedBrokers, ",")
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index afafb15ea..51b8b1f8c 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -99,9 +99,9 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error {
// connect to the partition broker
return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
- subscribeClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
- Message: &mq_pb.SubscribeRequest_Init{
- Init: &mq_pb.SubscribeRequest_InitMessage{
+ subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Init{
+ Init: &mq_pb.SubscribeMessageRequest_InitMessage{
ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: &mq_pb.Topic{
@@ -114,7 +114,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
RangeStop: partition.RangeStop,
},
Filter: sub.ContentConfig.Filter,
- Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{
+ Offset: &mq_pb.SubscribeMessageRequest_InitMessage_StartTimestampNs{
StartTimestampNs: sub.alreadyProcessedTsNs,
},
},