diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-05 15:16:01 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-05 15:16:01 -0800 |
| commit | ddd0fde0949b36809ad589c257aeb9fb60f2d3f0 (patch) | |
| tree | 601d12470f208cc37452d502e67647052309489e /weed | |
| parent | ee41dbb7fcff189433e40f4944e5a2df889a9c7f (diff) | |
| download | seaweedfs-ddd0fde0949b36809ad589c257aeb9fb60f2d3f0.tar.xz seaweedfs-ddd0fde0949b36809ad589c257aeb9fb60f2d3f0.zip | |
rename functions
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/mq/client/cmd/weed_pub/publisher.go | 1 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/connect_to_sub_coordinator.go | 8 |
2 files changed, 5 insertions, 4 deletions
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go index ee00be9f8..d1f2e7c90 100644 --- a/weed/mq/client/cmd/weed_pub/publisher.go +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -36,6 +36,7 @@ func main() { flag.Parse() config := &pub_client.PublisherConfiguration{ CreateTopic: true, + CreateTopicPartitionCount: 1, } publisher := pub_client.NewTopicPublisher(*namespace, *topic, config) brokers := strings.Split(*seedBrokers, ",") diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index afafb15ea..51b8b1f8c 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -99,9 +99,9 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error { // connect to the partition broker return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { - subscribeClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ - Message: &mq_pb.SubscribeRequest_Init{ - Init: &mq_pb.SubscribeRequest_InitMessage{ + subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{ + Message: &mq_pb.SubscribeMessageRequest_Init{ + Init: &mq_pb.SubscribeMessageRequest_InitMessage{ ConsumerGroup: sub.SubscriberConfig.ConsumerGroup, ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId, Topic: &mq_pb.Topic{ @@ -114,7 +114,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s RangeStop: partition.RangeStop, }, Filter: sub.ContentConfig.Filter, - Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{ + Offset: &mq_pb.SubscribeMessageRequest_InitMessage_StartTimestampNs{ StartTimestampNs: sub.alreadyProcessedTsNs, }, }, |
