aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-05 15:14:25 -0800
committerchrislu <chris.lu@gmail.com>2024-01-05 15:14:25 -0800
commitee41dbb7fcff189433e40f4944e5a2df889a9c7f (patch)
tree18b92190c60073381b2efaac99d1f25653445997 /weed/mq
parent1fed37bed005fdaee8b86e972d0521a713e2ab97 (diff)
downloadseaweedfs-ee41dbb7fcff189433e40f4944e5a2df889a9c7f.tar.xz
seaweedfs-ee41dbb7fcff189433e40f4944e5a2df889a9c7f.zip
rename functions
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker/broker_grpc_pub.go12
-rw-r--r--weed/mq/broker/broker_grpc_sub.go10
-rw-r--r--weed/mq/client/pub_client/connect.go8
-rw-r--r--weed/mq/client/pub_client/publish.go4
4 files changed, 17 insertions, 17 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 43280e9be..45a573633 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -34,7 +34,7 @@ import (
// Subscribers needs to listen for new partitions and connect to the brokers.
// Each subscription may not get data. It can act as a backup.
-func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
+func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
// 1. write to the volume server
// 2. find the topic metadata owning filer
// 3. write to the filer
@@ -44,7 +44,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer
if err != nil {
return err
}
- response := &mq_pb.PublishResponse{}
+ response := &mq_pb.PublishMessageResponse{}
// TODO check whether current broker should be the leader for the topic partition
ackInterval := 1
initMessage := req.GetInit()
@@ -70,7 +70,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer
ackCounter := 0
var ackSequence int64
var isStopping int32
- respChan := make(chan *mq_pb.PublishResponse, 128)
+ respChan := make(chan *mq_pb.PublishMessageResponse, 128)
defer func() {
atomic.StoreInt32(&isStopping, 1)
close(respChan)
@@ -90,7 +90,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer
}
case <-ticker.C:
if atomic.LoadInt32(&isStopping) == 0 {
- response := &mq_pb.PublishResponse{
+ response := &mq_pb.PublishMessageResponse{
AckSequence: ackSequence,
}
respChan <- response
@@ -98,7 +98,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer
return
}
case <-localTopicPartition.StopPublishersCh:
- respChan <- &mq_pb.PublishResponse{
+ respChan <- &mq_pb.PublishMessageResponse{
AckSequence: ackSequence,
ShouldClose: true,
}
@@ -124,7 +124,7 @@ func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer
if ackCounter >= ackInterval {
ackCounter = 0
// send back the ack
- response := &mq_pb.PublishResponse{
+ response := &mq_pb.PublishMessageResponse{
AckSequence: ackSequence,
}
respChan <- response
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index ecf771b9f..1900bb08e 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -10,15 +10,15 @@ import (
"time"
)
-func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
+func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().Partition)
localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition)
if localTopicPartition == nil {
- stream.Send(&mq_pb.SubscribeResponse{
- Message: &mq_pb.SubscribeResponse_Ctrl{
- Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{
+ stream.Send(&mq_pb.SubscribeMessageResponse{
+ Message: &mq_pb.SubscribeMessageResponse_Ctrl{
+ Ctrl: &mq_pb.SubscribeMessageResponse_CtrlMessage{
Error: "not initialized",
},
},
@@ -73,7 +73,7 @@ func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb
sleepIntervalCount = 0
value := logEntry.GetData()
- if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
+ if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
Data: &mq_pb.DataMessage{
Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
Value: value,
diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go
index fc7ff4d77..364d41560 100644
--- a/weed/mq/client/pub_client/connect.go
+++ b/weed/mq/client/pub_client/connect.go
@@ -21,7 +21,7 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str
return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
}
brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- stream, err := brokerClient.Publish(context.Background())
+ stream, err := brokerClient.SubscribeMessage(context.Background())
if err != nil {
return publishClient, fmt.Errorf("create publish client: %v", err)
}
@@ -29,9 +29,9 @@ func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress str
SeaweedMessaging_PublishClient: stream,
Broker: brokerAddress,
}
- if err = publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Init{
- Init: &mq_pb.PublishRequest_InitMessage{
+ if err = publishClient.Send(&mq_pb.PublishMessageRequest{
+ Message: &mq_pb.PublishMessageRequest_Init{
+ Init: &mq_pb.PublishMessageRequest_InitMessage{
Topic: &mq_pb.Topic{
Namespace: p.namespace,
Name: p.topic,
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 1e250ede3..2f4367b9d 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -27,8 +27,8 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
//google.golang.org/grpc.(*clientStream).SendMsg(stream.go:894)
//github.com/seaweedfs/seaweedfs/weed/pb/mq_pb.(*seaweedMessagingPublishClient).Send(mq_grpc.pb.go:141)
//github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client.(*TopicPublisher).Publish(publish.go:19)
- if err := publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Data{
+ if err := publishClient.Send(&mq_pb.PublishMessageRequest{
+ Message: &mq_pb.PublishMessageRequest_Data{
Data: &mq_pb.DataMessage{
Key: key,
Value: value,