diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 43280e9be..45a573633 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -34,7 +34,7 @@ import ( // Subscribers needs to listen for new partitions and connect to the brokers. // Each subscription may not get data. It can act as a backup. -func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { +func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error { // 1. write to the volume server // 2. find the topic metadata owning filer // 3. write to the filer @@ -44,7 +44,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer if err != nil { return err } - response := &mq_pb.PublishResponse{} + response := &mq_pb.PublishMessageResponse{} // TODO check whether current broker should be the leader for the topic partition ackInterval := 1 initMessage := req.GetInit() @@ -70,7 +70,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer ackCounter := 0 var ackSequence int64 var isStopping int32 - respChan := make(chan *mq_pb.PublishResponse, 128) + respChan := make(chan *mq_pb.PublishMessageResponse, 128) defer func() { atomic.StoreInt32(&isStopping, 1) close(respChan) @@ -90,7 +90,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer } case <-ticker.C: if atomic.LoadInt32(&isStopping) == 0 { - response := &mq_pb.PublishResponse{ + response := &mq_pb.PublishMessageResponse{ AckSequence: ackSequence, } respChan <- response @@ -98,7 +98,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer return } case <-localTopicPartition.StopPublishersCh: - respChan <- &mq_pb.PublishResponse{ + respChan <- &mq_pb.PublishMessageResponse{ AckSequence: ackSequence, ShouldClose: true, } @@ -124,7 +124,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer if ackCounter >= ackInterval { ackCounter = 0 // send back the ack - response := &mq_pb.PublishResponse{ + response := &mq_pb.PublishMessageResponse{ AckSequence: ackSequence, } respChan <- response |
