aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/client/pub_client/connect.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/client/pub_client/connect.go')
-rw-r--r--weed/mq/client/pub_client/connect.go11
1 files changed, 6 insertions, 5 deletions
diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go
index fc7ff4d77..7f6d62a67 100644
--- a/weed/mq/client/pub_client/connect.go
+++ b/weed/mq/client/pub_client/connect.go
@@ -21,17 +21,17 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str
return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
}
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- stream, err := brokerClient.Publish(context.Background())
+ stream, err := brokerClient.PublishMessage(context.Background())
if err != nil {
return publishClient, fmt.Errorf("create publish client: %v", err)
}
publishClient = &PublishClient{
- SeaweedMessaging_PublishClient: stream,
+ SeaweedMessaging_PublishMessageClient: stream,
Broker: brokerAddress,
}
- if err = publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Init{
- Init: &mq_pb.PublishRequest_InitMessage{
+ if err = publishClient.Send(&mq_pb.PublishMessageRequest{
+ Message: &mq_pb.PublishMessageRequest_Init{
+ Init: &mq_pb.PublishMessageRequest_InitMessage{
Topic: &mq_pb.Topic{
Namespace: p.namespace,
Name: p.topic,
@@ -40,6 +40,7 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str
RingSize: partition.RingSize,
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
+ UnixTimeNs: partition.UnixTimeNs,
},
AckInterval: 128,
},