diff options
Diffstat (limited to 'weed/mq/client/pub_client/connect.go')
| -rw-r--r-- | weed/mq/client/pub_client/connect.go | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go index fc7ff4d77..7f6d62a67 100644 --- a/weed/mq/client/pub_client/connect.go +++ b/weed/mq/client/pub_client/connect.go @@ -21,17 +21,17 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err) } brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) - stream, err := brokerClient.Publish(context.Background()) + stream, err := brokerClient.PublishMessage(context.Background()) if err != nil { return publishClient, fmt.Errorf("create publish client: %v", err) } publishClient = &PublishClient{ - SeaweedMessaging_PublishClient: stream, + SeaweedMessaging_PublishMessageClient: stream, Broker: brokerAddress, } - if err = publishClient.Send(&mq_pb.PublishRequest{ - Message: &mq_pb.PublishRequest_Init{ - Init: &mq_pb.PublishRequest_InitMessage{ + if err = publishClient.Send(&mq_pb.PublishMessageRequest{ + Message: &mq_pb.PublishMessageRequest_Init{ + Init: &mq_pb.PublishMessageRequest_InitMessage{ Topic: &mq_pb.Topic{ Namespace: p.namespace, Name: p.topic, @@ -40,6 +40,7 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str RingSize: partition.RingSize, RangeStart: partition.RangeStart, RangeStop: partition.RangeStop, + UnixTimeNs: partition.UnixTimeNs, }, AckInterval: 128, }, |
