aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docker/compose/local-brokers-compose.yml6
-rw-r--r--weed/mq/broker/broker_grpc_pub.go188
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go109
-rw-r--r--weed/mq/broker/broker_grpc_sub.go210
-rw-r--r--weed/mq/broker/broker_grpc_sub_coordinator.go9
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go25
-rw-r--r--weed/mq/client/pub_client/publish.go2
-rw-r--r--weed/mq/client/pub_client/scheduler.go29
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go21
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go8
-rw-r--r--weed/mq/topic/local_partition.go24
-rw-r--r--weed/pb/mq.proto58
-rw-r--r--weed/pb/mq_pb/mq.pb.go1163
-rw-r--r--weed/pb/mq_pb/mq_grpc.pb.go102
14 files changed, 748 insertions, 1206 deletions
diff --git a/docker/compose/local-brokers-compose.yml b/docker/compose/local-brokers-compose.yml
index 5f92d9bad..78ae180ac 100644
--- a/docker/compose/local-brokers-compose.yml
+++ b/docker/compose/local-brokers-compose.yml
@@ -6,7 +6,7 @@ services:
ports:
- 9333:9333
- 19333:19333
- command: "-v=1 master -volumeSizeLimitMB 100 -resumeState=false -ip=master0 -port=9333 -peers=master0:9333,master1:9334,master2:9335 -mdir=/tmp"
+ command: "-v=0 master -volumeSizeLimitMB 100 -resumeState=false -ip=master0 -port=9333 -peers=master0:9333,master1:9334,master2:9335 -mdir=/tmp"
environment:
WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
WEED_MASTER_VOLUME_GROWTH_COPY_2: 2
@@ -16,7 +16,7 @@ services:
ports:
- 9334:9334
- 19334:19334
- command: "-v=1 master -volumeSizeLimitMB 100 -resumeState=false -ip=master1 -port=9334 -peers=master0:9333,master1:9334,master2:9335 -mdir=/tmp"
+ command: "-v=0 master -volumeSizeLimitMB 100 -resumeState=false -ip=master1 -port=9334 -peers=master0:9333,master1:9334,master2:9335 -mdir=/tmp"
environment:
WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
WEED_MASTER_VOLUME_GROWTH_COPY_2: 2
@@ -26,7 +26,7 @@ services:
ports:
- 9335:9335
- 19335:19335
- command: "-v=1 master -volumeSizeLimitMB 100 -resumeState=false -ip=master2 -port=9335 -peers=master0:9333,master1:9334,master2:9335 -mdir=/tmp"
+ command: "-v=0 master -volumeSizeLimitMB 100 -resumeState=false -ip=master2 -port=9335 -peers=master0:9333,master1:9334,master2:9335 -mdir=/tmp"
environment:
WEED_MASTER_VOLUME_GROWTH_COPY_1: 1
WEED_MASTER_VOLUME_GROWTH_COPY_2: 2
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 3b68db1af..57a860cf1 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -11,7 +11,6 @@ import (
"io"
"math/rand"
"net"
- "sync/atomic"
)
// PUB
@@ -40,7 +39,6 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// 2. find the topic metadata owning filer
// 3. write to the filer
- var localTopicPartition *topic.LocalPartition
req, err := stream.Recv()
if err != nil {
return err
@@ -49,62 +47,121 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// TODO check whether current broker should be the leader for the topic partition
ackInterval := 1
initMessage := req.GetInit()
- var t topic.Topic
- var p topic.Partition
- if initMessage != nil {
- t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- localTopicPartition, _, err = b.GetOrGenLocalPartition(t, p)
+ if initMessage == nil {
+ response.Error = fmt.Sprintf("missing init message")
+ glog.Errorf("missing init message")
+ return stream.Send(response)
+ }
+
+ // get or generate a local partition
+ t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+ localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, p)
+ if getOrGenErr != nil {
+ response.Error = fmt.Sprintf("topic %v not found: %v", t, getOrGenErr)
+ glog.Errorf("topic %v not found: %v", t, getOrGenErr)
+ return stream.Send(response)
+ }
+
+ ackInterval = int(initMessage.AckInterval)
+
+ // connect to follower brokers
+ if localTopicPartition.FollowerStream == nil && len(initMessage.FollowerBrokers) > 0 {
+ follower := initMessage.FollowerBrokers[0]
+ ctx := stream.Context()
+ localTopicPartition.GrpcConnection, err = pb.GrpcDial(ctx, follower, true, b.grpcDialOption)
if err != nil {
- response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
- glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ response.Error = fmt.Sprintf("fail to dial %s: %v", follower, err)
+ glog.Errorf("fail to dial %s: %v", follower, err)
return stream.Send(response)
}
- ackInterval = int(initMessage.AckInterval)
- for _, follower := range initMessage.FollowerBrokers {
- followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{
+ followerClient := mq_pb.NewSeaweedMessagingClient(localTopicPartition.GrpcConnection)
+ localTopicPartition.FollowerStream, err = followerClient.PublishFollowMe(ctx)
+ if err != nil {
+ response.Error = fmt.Sprintf("fail to create publish client: %v", err)
+ glog.Errorf("fail to create publish client: %v", err)
+ return stream.Send(response)
+ }
+ if err = localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Init{
+ Init: &mq_pb.PublishFollowMeRequest_InitMessage{
Topic: initMessage.Topic,
Partition: initMessage.Partition,
- BrokerSelf: string(b.option.BrokerAddress()),
- })
- return err
- })
- if followErr != nil {
- response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr)
- glog.Errorf("follower %v failed: %v", follower, followErr)
- return stream.Send(response)
- }
+ },
+ },
+ }); err != nil {
+ return err
}
- stream.Send(response)
- } else {
- response.Error = fmt.Sprintf("missing init message")
- glog.Errorf("missing init message")
- return stream.Send(response)
+
+ // start receiving ack from follower
+ go func() {
+ defer func() {
+ println("stop receiving ack from follower")
+ }()
+
+ for {
+ ack, err := localTopicPartition.FollowerStream.Recv()
+ if err != nil {
+ glog.Errorf("Error receiving follower ack: %v", err)
+ return
+ }
+ println("recv ack", ack.AckTsNs)
+ if err := stream.Send(&mq_pb.PublishMessageResponse{
+ AckSequence: ack.AckTsNs,
+ }); err != nil {
+ glog.Errorf("Error sending publisher ack %v: %v", ack, err)
+ return
+ }
+ }
+ }()
}
+ // process each published messages
clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
ackCounter := 0
var ackSequence int64
- var isStopping int32
- respChan := make(chan *mq_pb.PublishMessageResponse, 128)
defer func() {
- atomic.StoreInt32(&isStopping, 1)
- respChan <- &mq_pb.PublishMessageResponse{
- AckSequence: ackSequence,
- }
- close(respChan)
+ // remove the publisher
localTopicPartition.Publishers.RemovePublisher(clientName)
glog.V(0).Infof("topic %v partition %v published %d messges Publisher:%d Subscriber:%d", initMessage.Topic, initMessage.Partition, ackSequence, localTopicPartition.Publishers.Size(), localTopicPartition.Subscribers.Size())
if localTopicPartition.MaybeShutdownLocalPartition() {
+ if localTopicPartition.FollowerStream != nil {
+ // send close to the follower
+ if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Close{
+ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
+ },
+ }); followErr != nil {
+ glog.Errorf("Error closing follower stream: %v", followErr)
+ }
+ println("closing grpcConnection to follower")
+ localTopicPartition.GrpcConnection.Close()
+ }
b.localTopicManager.RemoveTopicPartition(t, p)
+ glog.V(0).Infof("Removed local topic %v partition %v", initMessage.Topic, initMessage.Partition)
}
}()
- go func() {
- for resp := range respChan {
- if err := stream.Send(resp); err != nil {
- glog.Errorf("Error sending response %v: %v", resp, err)
+
+ // send a hello message
+ stream.Send(&mq_pb.PublishMessageResponse{})
+
+ var receivedSequence, acknowledgedSequence int64
+
+ defer func() {
+ if localTopicPartition.FollowerStream != nil {
+ //if err := followerStream.CloseSend(); err != nil {
+ // glog.Errorf("Error closing follower stream: %v", err)
+ //}
+ } else {
+ if acknowledgedSequence < receivedSequence {
+ acknowledgedSequence = receivedSequence
+ response := &mq_pb.PublishMessageResponse{
+ AckSequence: acknowledgedSequence,
+ }
+ if err := stream.Send(response); err != nil {
+ glog.Errorf("Error sending response %v: %v", response, err)
+ }
}
}
}()
@@ -122,22 +179,57 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
}
// Process the received message
- if dataMessage := req.GetData(); dataMessage != nil {
- localTopicPartition.Publish(dataMessage)
+ dataMessage := req.GetData()
+ if dataMessage == nil {
+ continue
}
- ackCounter++
- ackSequence++
- if ackCounter >= ackInterval {
- ackCounter = 0
- // send back the ack
- response := &mq_pb.PublishMessageResponse{
- AckSequence: ackSequence,
+ // send to the local partition
+ localTopicPartition.Publish(dataMessage)
+ receivedSequence = dataMessage.TsNs
+
+ // maybe send to the follower
+ if localTopicPartition.FollowerStream != nil {
+ println("recv", string(dataMessage.Key), dataMessage.TsNs)
+ if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Data{
+ Data: dataMessage,
+ },
+ }); followErr != nil {
+ return followErr
+ }
+ } else {
+ ackCounter++
+ if ackCounter >= ackInterval {
+ ackCounter = 0
+ // send back the ack directly
+ acknowledgedSequence = receivedSequence
+ response := &mq_pb.PublishMessageResponse{
+ AckSequence: acknowledgedSequence,
+ }
+ if err := stream.Send(response); err != nil {
+ glog.Errorf("Error sending response %v: %v", response, err)
+ }
}
- respChan <- response
}
}
+ if localTopicPartition.FollowerStream != nil {
+ // send close to the follower
+ if followErr := localTopicPartition.FollowerStream.Send(&mq_pb.PublishFollowMeRequest{
+ Message: &mq_pb.PublishFollowMeRequest_Close{
+ Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
+ },
+ }); followErr != nil {
+ return followErr
+ }
+ println("closing follower stream")
+
+ //if err := followerStream.CloseSend(); err != nil {
+ // glog.Errorf("Error closing follower stream: %v", err)
+ //}
+ }
+
glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
return nil
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 8ef85110a..3e7977eba 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -1,96 +1,49 @@
package broker
import (
- "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
- "math/rand"
- "sync"
- "time"
)
-func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.PublishFollowMeRequest) (*mq_pb.PublishFollowMeResponse, error) {
- glog.V(0).Infof("PublishFollowMe %v", request)
- var wg sync.WaitGroup
- wg.Add(1)
- var ret error
- go b.withBrokerClient(true, pb.ServerAddress(request.BrokerSelf), func(client mq_pb.SeaweedMessagingClient) error {
- followerId := rand.Int31()
- subscribeClient, err := client.FollowInMemoryMessages(context.Background(), &mq_pb.FollowInMemoryMessagesRequest{
- Message: &mq_pb.FollowInMemoryMessagesRequest_Init{
- Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{
- ConsumerGroup: string(b.option.BrokerAddress()),
- ConsumerId: fmt.Sprintf("followMe-%d", followerId),
- FollowerId: followerId,
- Topic: request.Topic,
- PartitionOffset: &mq_pb.PartitionOffset{
- Partition: request.Partition,
- StartTsNs: 0,
- StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
- },
- },
- },
- })
-
- if err != nil {
- glog.Errorf("FollowInMemoryMessages error: %v", err)
- ret = err
- return err
- }
-
- // receive first hello message
- resp, err := subscribeClient.Recv()
- if err != nil {
- return fmt.Errorf("FollowInMemoryMessages recv first message error: %v", err)
- }
- if resp == nil {
- glog.V(0).Infof("doFollowInMemoryMessage recv first message nil response")
- return io.ErrUnexpectedEOF
- }
- wg.Done()
-
- b.doFollowInMemoryMessage(context.Background(), subscribeClient)
-
- return nil
- })
- wg.Wait()
- return &mq_pb.PublishFollowMeResponse{}, ret
-}
+func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) error {
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+ initMessage := req.GetInit()
+ if initMessage == nil {
+ return fmt.Errorf("missing init message")
+ }
-func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
+ // t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+ // follow each published messages
for {
- resp, err := client.Recv()
+ // receive a message
+ req, err := stream.Recv()
if err != nil {
- if err != io.EOF {
- glog.V(0).Infof("doFollowInMemoryMessage error: %v", err)
+ if err == io.EOF {
+ break
}
- return
- }
- if resp == nil {
- glog.V(0).Infof("doFollowInMemoryMessage nil response")
- return
+ glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
+ return err
}
- if resp.Message != nil {
- // process ctrl message or data message
- switch m := resp.Message.(type) {
- case *mq_pb.FollowInMemoryMessagesResponse_Data:
- // process data message
- print("d")
- case *mq_pb.FollowInMemoryMessagesResponse_Ctrl:
- // process ctrl message
- if m.Ctrl.FlushedSequence > 0 {
- flushTime := time.Unix(0, m.Ctrl.FlushedSequence)
- glog.V(0).Infof("doFollowInMemoryMessage flushTime: %v", flushTime)
- }
- if m.Ctrl.FollowerChangedToId != 0 {
- // follower changed
- glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId)
- return
- }
+
+ // Process the received message
+ if dataMessage := req.GetData(); dataMessage != nil {
+ // send back the ack
+ if err := stream.Send(&mq_pb.PublishFollowMeResponse{
+ AckTsNs: dataMessage.TsNs,
+ }); err != nil {
+ // TODO save un-acked messages to disk
+ glog.Errorf("Error sending response %v: %v", dataMessage, err)
}
+ println("ack", string(dataMessage.Key), dataMessage.TsNs)
+ } else if closeMessage := req.GetClose(); closeMessage != nil {
+ glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
+ break
}
}
+ return nil
}
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 1141ff47f..6c03ba409 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -8,7 +8,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- "sync/atomic"
"time"
)
@@ -17,40 +16,20 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
ctx := stream.Context()
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
+ initMessage := req.GetInit()
+ if initMessage == nil {
+ glog.Errorf("missing init message")
+ return fmt.Errorf("missing init message")
+ }
+
t := topic.FromPbTopic(req.GetInit().Topic)
partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
- waitIntervalCount := 0
-
- var localTopicPartition *topic.LocalPartition
- for localTopicPartition == nil {
- localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
- if err != nil {
- glog.V(1).Infof("topic %v partition %v not setup", t, partition)
- }
- if localTopicPartition != nil {
- break
- }
- waitIntervalCount++
- if waitIntervalCount > 10 {
- waitIntervalCount = 10
- }
- time.Sleep(time.Duration(waitIntervalCount) * 337 * time.Millisecond)
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return nil
- }
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
- return nil
- default:
- // Continue processing the request
- }
+ localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
+ if getOrGenErr != nil {
+ return getOrGenErr
}
localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
@@ -129,174 +108,3 @@ func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer
}
return
}
-
-func (b *MessageQueueBroker) FollowInMemoryMessages(req *mq_pb.FollowInMemoryMessagesRequest, stream mq_pb.SeaweedMessaging_FollowInMemoryMessagesServer) (err error) {
- ctx := stream.Context()
- clientName := req.GetInit().ConsumerId
-
- t := topic.FromPbTopic(req.GetInit().Topic)
- partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
-
- glog.V(0).Infof("FollowInMemoryMessages %s on %v %v connected", clientName, t, partition)
-
- waitIntervalCount := 0
-
- var localTopicPartition *topic.LocalPartition
- for localTopicPartition == nil {
- localTopicPartition, _, err = b.GetOrGenLocalPartition(t, partition)
- if err != nil {
- glog.V(1).Infof("topic %v partition %v not setup", t, partition)
- }
- if localTopicPartition != nil {
- break
- }
- waitIntervalCount++
- if waitIntervalCount > 32 {
- waitIntervalCount = 32
- }
- time.Sleep(time.Duration(waitIntervalCount) * 137 * time.Millisecond)
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return nil
- }
- glog.V(0).Infof("FollowInMemoryMessages %s disconnected: %v", clientName, err)
- return nil
- default:
- // Continue processing the request
- }
- }
-
- // set the current follower id
- followerId := req.GetInit().FollowerId
- atomic.StoreInt32(&localTopicPartition.FollowerId, followerId)
-
- glog.V(0).Infof("FollowInMemoryMessages %s connected on %v %v", clientName, t, partition)
- isConnected := true
- sleepIntervalCount := 0
-
- var counter int64
- defer func() {
- isConnected = false
- glog.V(0).Infof("FollowInMemoryMessages %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
- }()
-
- // send first hello message
- // to indicate the follower is connected
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{},
- },
- })
-
- var startPosition log_buffer.MessagePosition
- if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
- startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
- }
-
- var prevFlushTsNs int64
-
- _, _, err = localTopicPartition.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, func() bool {
- if !isConnected {
- return false
- }
- sleepIntervalCount++
- if sleepIntervalCount > 32 {
- sleepIntervalCount = 32
- }
- time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
-
- if localTopicPartition.LogBuffer.IsStopping() {
- newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
- glog.V(0).Infof("FollowInMemoryMessages1 %s on %v %v follower id changed to %d", clientName, t, partition, newFollowerId)
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FollowerChangedToId: newFollowerId,
- },
- },
- })
- return false
- }
-
- // Check if the client has disconnected by monitoring the context
- select {
- case <-ctx.Done():
- err := ctx.Err()
- if err == context.Canceled {
- // Client disconnected
- return false
- }
- glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
- return false
- default:
- // Continue processing the request
- }
-
- // send the last flushed sequence
- flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
- if flushTsNs != prevFlushTsNs {
- prevFlushTsNs = flushTsNs
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FlushedSequence: flushTsNs,
- },
- },
- })
- }
-
- return true
- }, func(logEntry *filer_pb.LogEntry) (bool, error) {
- // reset the sleep interval count
- sleepIntervalCount = 0
-
- // check the follower id
- newFollowerId := atomic.LoadInt32(&localTopicPartition.FollowerId)
- if newFollowerId != followerId {
- glog.V(0).Infof("FollowInMemoryMessages2 %s on %v %v follower id %d changed to %d", clientName, t, partition, followerId, newFollowerId)
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FollowerChangedToId: newFollowerId,
- },
- },
- })
- return true, nil
- }
-
- // send the last flushed sequence
- flushTsNs := atomic.LoadInt64(&localTopicPartition.LogBuffer.LastFlushTsNs)
- if flushTsNs != prevFlushTsNs {
- prevFlushTsNs = flushTsNs
- stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Ctrl{
- Ctrl: &mq_pb.FollowInMemoryMessagesResponse_CtrlMessage{
- FlushedSequence: flushTsNs,
- },
- },
- })
- }
-
- // send the log entry
- if err := stream.Send(&mq_pb.FollowInMemoryMessagesResponse{
- Message: &mq_pb.FollowInMemoryMessagesResponse_Data{
- Data: &mq_pb.DataMessage{
- Key: logEntry.Key,
- Value: logEntry.Data,
- TsNs: logEntry.TsNs,
- },
- }}); err != nil {
- glog.Errorf("Error sending setup response: %v", err)
- return false, err
- }
-
- counter++
- return false, nil
- })
-
- return err
-}
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
index 89c221af5..3fd97f1c2 100644
--- a/weed/mq/broker/broker_grpc_sub_coordinator.go
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -39,18 +39,11 @@ func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMess
go func() {
// try to load the partition assignment from filer
if conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(initMessage.Topic)); err == nil {
- assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(conf.BrokerPartitionAssignments))
- for i, assignment := range conf.BrokerPartitionAssignments {
- assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
- Partition: assignment.Partition,
- Broker: assignment.LeaderBroker,
- }
- }
// send partition assignment to subscriber
cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
- AssignedPartitions: assignedPartitions,
+ PartitionAssignments: conf.BrokerPartitionAssignments,
},
},
}
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index 35d95c0e4..987c60243 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -56,12 +56,27 @@ func (b *MessageQueueBroker) readTopicConfFromFiler(t topic.Topic) (conf *mq_pb.
return conf, nil
}
-func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
+func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error) {
+ // get or generate a local partition
+ conf, readConfErr := b.readTopicConfFromFiler(t)
+ if readConfErr != nil {
+ glog.Errorf("topic %v not found: %v", t, readConfErr)
+ return nil, fmt.Errorf("topic %v not found: %v", t, readConfErr)
+ }
+ localTopicPartition, _, getOrGenError = b.doGetOrGenLocalPartition(t, partition, conf)
+ if getOrGenError != nil {
+ glog.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+ return nil, fmt.Errorf("topic %v partition %v not setup: %v", t, partition, getOrGenError)
+ }
+ return localTopicPartition, nil
+}
+
+func (b *MessageQueueBroker) doGetOrGenLocalPartition(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
b.accessLock.Lock()
defer b.accessLock.Unlock()
if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
- localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition)
+ localPartition, isGenerated, err = b.genLocalPartitionFromFiler(t, partition, conf)
if err != nil {
return nil, false, err
}
@@ -69,12 +84,8 @@ func (b *MessageQueueBroker) GetOrGenLocalPartition(t topic.Topic, partition top
return localPartition, isGenerated, nil
}
-func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
+func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition topic.Partition, conf *mq_pb.ConfigureTopicResponse) (localPartition *topic.LocalPartition, isGenerated bool, err error) {
self := b.option.BrokerAddress()
- conf, err := b.readTopicConfFromFiler(t)
- if err != nil {
- return nil, isGenerated, err
- }
for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
localPartition = topic.FromPbBrokerPartitionAssignment(b.option.BrokerAddress(), partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 1c5891049..df5b0ca06 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -5,6 +5,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "time"
)
func (p *TopicPublisher) Publish(key, value []byte) error {
@@ -20,5 +21,6 @@ func (p *TopicPublisher) Publish(key, value []byte) error {
return inputBuffer.Enqueue(&mq_pb.DataMessage{
Key: key,
Value: value,
+ TsNs: time.Now().UnixNano(),
})
}
diff --git a/weed/mq/client/pub_client/scheduler.go b/weed/mq/client/pub_client/scheduler.go
index e6caf896c..e91127522 100644
--- a/weed/mq/client/pub_client/scheduler.go
+++ b/weed/mq/client/pub_client/scheduler.go
@@ -12,6 +12,7 @@ import (
"log"
"sort"
"sync"
+ "sync/atomic"
"time"
)
@@ -150,6 +151,7 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
}); err != nil {
return fmt.Errorf("send init message: %v", err)
}
+ // process the hello message
resp, err := stream.Recv()
if err != nil {
return fmt.Errorf("recv init response: %v", err)
@@ -158,26 +160,35 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("init response error: %v", resp.Error)
}
+ var publishedTsNs int64
+ hasMoreData := int32(1)
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
+ defer wg.Done()
for {
ackResp, err := publishClient.Recv()
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Unknown && e.Message() == "EOF" {
+ log.Printf("publish to %s EOF", publishClient.Broker)
return
}
publishClient.Err = err
- fmt.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
+ log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
return
}
if ackResp.Error != "" {
publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
- fmt.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
+ log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
return
}
if ackResp.AckSequence > 0 {
log.Printf("ack %d", ackResp.AckSequence)
}
+ if atomic.LoadInt64(&publishedTsNs) == ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
+ return
+ }
}
}()
@@ -191,14 +202,18 @@ func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) erro
return fmt.Errorf("send publish data: %v", err)
}
publishCounter++
+ atomic.StoreInt64(&publishedTsNs, data.TsNs)
}
-
- if err := publishClient.CloseSend(); err != nil {
- return fmt.Errorf("close send: %v", err)
+ atomic.StoreInt32(&hasMoreData, 0)
+ if publishCounter > 0 {
+ wg.Wait()
+ } else {
+ // CloseSend would cancel the context on the server side
+ if err := publishClient.CloseSend(); err != nil {
+ return fmt.Errorf("close send: %v", err)
+ }
}
- time.Sleep(3 * time.Second)
-
log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
return nil
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index 4cc3c8ff2..b0b533e42 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -91,26 +91,26 @@ func (sub *TopicSubscriber) onEachAssignment(assignment *mq_pb.SubscriberToSubCo
var wg sync.WaitGroup
semaphore := make(chan struct{}, sub.ProcessorConfig.ConcurrentPartitionLimit)
- for _, assigned := range assignment.AssignedPartitions {
+ for _, assigned := range assignment.PartitionAssignments {
wg.Add(1)
semaphore <- struct{}{}
- go func(partition *mq_pb.Partition, broker string) {
+ go func(assigned *mq_pb.BrokerPartitionAssignment) {
defer wg.Done()
defer func() { <-semaphore }()
- glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
- err := sub.onEachPartition(partition, broker)
+ glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
+ err := sub.onEachPartition(assigned)
if err != nil {
- glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker, err)
+ glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
}
- }(assigned.Partition, assigned.Broker)
+ }(assigned)
}
wg.Wait()
}
-func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker string) error {
+func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
// connect to the partition broker
- return pb.WithBrokerGrpcClient(true, broker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
subscribeClient, err := client.SubscribeMessage(context.Background(), &mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
@@ -118,11 +118,12 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
Topic: sub.ContentConfig.Topic.ToPbTopic(),
PartitionOffset: &mq_pb.PartitionOffset{
- Partition: partition,
+ Partition: assigned.Partition,
StartTsNs: sub.alreadyProcessedTsNs,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
Filter: sub.ContentConfig.Filter,
+ FollowerBrokers: assigned.FollowerBrokers,
},
},
})
@@ -131,7 +132,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return fmt.Errorf("create subscribe client: %v", err)
}
- glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, partition, broker)
+ glog.V(0).Infof("subscriber %s/%s connected to partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
if sub.OnCompletionFunc != nil {
defer sub.OnCompletionFunc()
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index f897fe2b3..d24a38d8a 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -103,22 +103,22 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
partitionSlots = make([]*PartitionSlotToConsumerInstance, 0)
}
consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots)
- assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(partitionSlots))
+ assignedPartitions := make([]*mq_pb.BrokerPartitionAssignment, len(partitionSlots))
for i, partitionSlot := range partitionSlots {
- assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
+ assignedPartitions[i] = &mq_pb.BrokerPartitionAssignment{
Partition: &mq_pb.Partition{
RangeStop: partitionSlot.RangeStop,
RangeStart: partitionSlot.RangeStart,
RingSize: partitionSlotToBrokerList.RingSize,
UnixTimeNs: partitionSlot.UnixTimeNs,
},
- Broker: partitionSlot.Broker,
+ LeaderBroker: partitionSlot.Broker,
}
}
response := &mq_pb.SubscriberToSubCoordinatorResponse{
Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
- AssignedPartitions: assignedPartitions,
+ PartitionAssignments: assignedPartitions,
},
},
}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 798949736..a25786280 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -6,11 +6,19 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "google.golang.org/grpc"
+ "sync"
"sync/atomic"
"time"
)
type LocalPartition struct {
+ ListenersWaits int64
+
+ // notifying clients
+ ListenersLock sync.Mutex
+ ListenersCond *sync.Cond
+
Partition
isLeader bool
FollowerBrokers []pb.ServerAddress
@@ -19,20 +27,29 @@ type LocalPartition struct {
Publishers *LocalPartitionPublishers
Subscribers *LocalPartitionSubscribers
FollowerId int32
+
+ FollowerStream mq_pb.SeaweedMessaging_PublishFollowMeClient
+ GrpcConnection *grpc.ClientConn
}
var TIME_FORMAT = "2006-01-02-15-04-05"
func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
- return &LocalPartition{
+ lp := &LocalPartition{
Partition: partition,
isLeader: isLeader,
FollowerBrokers: followerBrokers,
- LogBuffer: log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
- 2*time.Minute, logFlushFn, readFromDiskFn, func() {}),
Publishers: NewLocalPartitionPublishers(),
Subscribers: NewLocalPartitionSubscribers(),
}
+ lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
+ lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
+ 2*time.Minute, logFlushFn, readFromDiskFn, func() {
+ if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
+ lp.ListenersCond.Broadcast()
+ }
+ })
+ return lp
}
func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {
@@ -123,4 +140,5 @@ func (p *LocalPartition) Shutdown() {
p.closeSubscribers()
p.LogBuffer.ShutdownLogBuffer()
atomic.StoreInt32(&p.FollowerId, 0)
+ glog.V(0).Infof("local partition %v shutting down", p.Partition)
}
diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto
index 369f82fb3..45253059f 100644
--- a/weed/pb/mq.proto
+++ b/weed/pb/mq.proto
@@ -46,9 +46,7 @@ service SeaweedMessaging {
rpc SubscribeMessage (SubscribeMessageRequest) returns (stream SubscribeMessageResponse) {
}
// The lead broker asks a follower broker to follow itself
- rpc PublishFollowMe (PublishFollowMeRequest) returns (PublishFollowMeResponse) {
- }
- rpc FollowInMemoryMessages (FollowInMemoryMessagesRequest) returns (stream FollowInMemoryMessagesResponse) {
+ rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) {
}
}
@@ -172,14 +170,9 @@ message SubscriberToSubCoordinatorRequest {
}
}
message SubscriberToSubCoordinatorResponse {
- message AssignedPartition {
- Partition partition = 1;
- int64 ts_ns = 2;
- string broker = 3;
- }
message Assignment {
int64 generation = 1;
- repeated AssignedPartition assigned_partitions = 2;
+ repeated BrokerPartitionAssignment partition_assignments = 2;
}
oneof message {
Assignment assignment = 1;
@@ -211,12 +204,24 @@ message PublishMessageResponse {
bool should_close = 3;
}
message PublishFollowMeRequest {
- Topic topic = 1;
- Partition partition = 2;
- string broker_self = 3;
+ message InitMessage {
+ Topic topic = 1;
+ Partition partition = 2;
+ }
+ message FlushMessage {
+ int64 ts_ns = 1;
+ }
+ message CloseMessage {
+ }
+ oneof message {
+ InitMessage init = 1;
+ DataMessage data = 2;
+ FlushMessage flush = 3;
+ CloseMessage close = 4;
+ }
}
message PublishFollowMeResponse {
- string error = 1;
+ int64 ack_ts_ns = 1;
}
message SubscribeMessageRequest {
message InitMessage {
@@ -226,6 +231,7 @@ message SubscribeMessageRequest {
Topic topic = 4;
PartitionOffset partition_offset = 5;
string filter = 6;
+ repeated string follower_brokers = 7;
}
message AckMessage {
int64 sequence = 1;
@@ -246,32 +252,6 @@ message SubscribeMessageResponse {
DataMessage data = 2;
}
}
-message FollowInMemoryMessagesRequest {
- message InitMessage {
- string consumer_group = 1;
- string consumer_id = 2;
- int32 follower_id = 3;
- Topic topic = 4;
- PartitionOffset partition_offset = 5;
- }
- message AckMessage {
- int64 sequence = 1;
- }
- oneof message {
- InitMessage init = 1;
- AckMessage ack = 2;
- }
-}
-message FollowInMemoryMessagesResponse {
- message CtrlMessage {
- int64 flushed_sequence = 1;
- int32 follower_changed_to_id = 2;
- }
- oneof message {
- CtrlMessage ctrl = 1;
- DataMessage data = 2;
- }
-}
message ClosePublishersRequest {
Topic topic = 1;
int64 unix_time_ns = 2;
diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go
index ed9f5c002..b5182e6af 100644
--- a/weed/pb/mq_pb/mq.pb.go
+++ b/weed/pb/mq_pb/mq.pb.go
@@ -1569,9 +1569,13 @@ type PublishFollowMeRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
- Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"`
- BrokerSelf string `protobuf:"bytes,3,opt,name=broker_self,json=brokerSelf,proto3" json:"broker_self,omitempty"`
+ // Types that are assignable to Message:
+ //
+ // *PublishFollowMeRequest_Init
+ // *PublishFollowMeRequest_Data
+ // *PublishFollowMeRequest_Flush
+ // *PublishFollowMeRequest_Close
+ Message isPublishFollowMeRequest_Message `protobuf_oneof:"message"`
}
func (x *PublishFollowMeRequest) Reset() {
@@ -1606,33 +1610,75 @@ func (*PublishFollowMeRequest) Descriptor() ([]byte, []int) {
return file_mq_proto_rawDescGZIP(), []int{26}
}
-func (x *PublishFollowMeRequest) GetTopic() *Topic {
- if x != nil {
- return x.Topic
+func (m *PublishFollowMeRequest) GetMessage() isPublishFollowMeRequest_Message {
+ if m != nil {
+ return m.Message
}
return nil
}
-func (x *PublishFollowMeRequest) GetPartition() *Partition {
- if x != nil {
- return x.Partition
+func (x *PublishFollowMeRequest) GetInit() *PublishFollowMeRequest_InitMessage {
+ if x, ok := x.GetMessage().(*PublishFollowMeRequest_Init); ok {
+ return x.Init
}
return nil
}
-func (x *PublishFollowMeRequest) GetBrokerSelf() string {
- if x != nil {
- return x.BrokerSelf
+func (x *PublishFollowMeRequest) GetData() *DataMessage {
+ if x, ok := x.GetMessage().(*PublishFollowMeRequest_Data); ok {
+ return x.Data
}
- return ""
+ return nil
+}
+
+func (x *PublishFollowMeRequest) GetFlush() *PublishFollowMeRequest_FlushMessage {
+ if x, ok := x.GetMessage().(*PublishFollowMeRequest_Flush); ok {
+ return x.Flush
+ }
+ return nil
+}
+
+func (x *PublishFollowMeRequest) GetClose() *PublishFollowMeRequest_CloseMessage {
+ if x, ok := x.GetMessage().(*PublishFollowMeRequest_Close); ok {
+ return x.Close
+ }
+ return nil
+}
+
+type isPublishFollowMeRequest_Message interface {
+ isPublishFollowMeRequest_Message()
+}
+
+type PublishFollowMeRequest_Init struct {
+ Init *PublishFollowMeRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3,oneof"`
+}
+
+type PublishFollowMeRequest_Data struct {
+ Data *DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"`
+}
+
+type PublishFollowMeRequest_Flush struct {
+ Flush *PublishFollowMeRequest_FlushMessage `protobuf:"bytes,3,opt,name=flush,proto3,oneof"`
+}
+
+type PublishFollowMeRequest_Close struct {
+ Close *PublishFollowMeRequest_CloseMessage `protobuf:"bytes,4,opt,name=close,proto3,oneof"`
}
+func (*PublishFollowMeRequest_Init) isPublishFollowMeRequest_Message() {}
+
+func (*PublishFollowMeRequest_Data) isPublishFollowMeRequest_Message() {}
+
+func (*PublishFollowMeRequest_Flush) isPublishFollowMeRequest_Message() {}
+
+func (*PublishFollowMeRequest_Close) isPublishFollowMeRequest_Message() {}
+
type PublishFollowMeResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
+ AckTsNs int64 `protobuf:"varint,1,opt,name=ack_ts_ns,json=ackTsNs,proto3" json:"ack_ts_ns,omitempty"`
}
func (x *PublishFollowMeResponse) Reset() {
@@ -1667,11 +1713,11 @@ func (*PublishFollowMeResponse) Descriptor() ([]byte, []int) {
return file_mq_proto_rawDescGZIP(), []int{27}
}
-func (x *PublishFollowMeResponse) GetError() string {
+func (x *PublishFollowMeResponse) GetAckTsNs() int64 {
if x != nil {
- return x.Error
+ return x.AckTsNs
}
- return ""
+ return 0
}
type SubscribeMessageRequest struct {
@@ -1836,168 +1882,6 @@ func (*SubscribeMessageResponse_Ctrl) isSubscribeMessageResponse_Message() {}
func (*SubscribeMessageResponse_Data) isSubscribeMessageResponse_Message() {}
-type FollowInMemoryMessagesRequest struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- // Types that are assignable to Message:
- //
- // *FollowInMemoryMessagesRequest_Init
- // *FollowInMemoryMessagesRequest_Ack
- Message isFollowInMemoryMessagesRequest_Message `protobuf_oneof:"message"`
-}
-
-func (x *FollowInMemoryMessagesRequest) Reset() {
- *x = FollowInMemoryMessagesRequest{}
- if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[30]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *FollowInMemoryMessagesRequest) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*FollowInMemoryMessagesRequest) ProtoMessage() {}
-
-func (x *FollowInMemoryMessagesRequest) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[30]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use FollowInMemoryMessagesRequest.ProtoReflect.Descriptor instead.
-func (*FollowInMemoryMessagesRequest) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{30}
-}
-
-func (m *FollowInMemoryMessagesRequest) GetMessage() isFollowInMemoryMessagesRequest_Message {
- if m != nil {
- return m.Message
- }
- return nil
-}
-
-func (x *FollowInMemoryMessagesRequest) GetInit() *FollowInMemoryMessagesRequest_InitMessage {
- if x, ok := x.GetMessage().(*FollowInMemoryMessagesRequest_Init); ok {
- return x.Init
- }
- return nil
-}
-
-func (x *FollowInMemoryMessagesRequest) GetAck() *FollowInMemoryMessagesRequest_AckMessage {
- if x, ok := x.GetMessage().(*FollowInMemoryMessagesRequest_Ack); ok {
- return x.Ack
- }
- return nil
-}
-
-type isFollowInMemoryMessagesRequest_Message interface {
- isFollowInMemoryMessagesRequest_Message()
-}
-
-type FollowInMemoryMessagesRequest_Init struct {
- Init *FollowInMemoryMessagesRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3,oneof"`
-}
-
-type FollowInMemoryMessagesRequest_Ack struct {
- Ack *FollowInMemoryMessagesRequest_AckMessage `protobuf:"bytes,2,opt,name=ack,proto3,oneof"`
-}
-
-func (*FollowInMemoryMessagesRequest_Init) isFollowInMemoryMessagesRequest_Message() {}
-
-func (*FollowInMemoryMessagesRequest_Ack) isFollowInMemoryMessagesRequest_Message() {}
-
-type FollowInMemoryMessagesResponse struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- // Types that are assignable to Message:
- //
- // *FollowInMemoryMessagesResponse_Ctrl
- // *FollowInMemoryMessagesResponse_Data
- Message isFollowInMemoryMessagesResponse_Message `protobuf_oneof:"message"`
-}
-
-func (x *FollowInMemoryMessagesResponse) Reset() {
- *x = FollowInMemoryMessagesResponse{}
- if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[31]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *FollowInMemoryMessagesResponse) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*FollowInMemoryMessagesResponse) ProtoMessage() {}
-
-func (x *FollowInMemoryMessagesResponse) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[31]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use FollowInMemoryMessagesResponse.ProtoReflect.Descriptor instead.
-func (*FollowInMemoryMessagesResponse) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{31}
-}
-
-func (m *FollowInMemoryMessagesResponse) GetMessage() isFollowInMemoryMessagesResponse_Message {
- if m != nil {
- return m.Message
- }
- return nil
-}
-
-func (x *FollowInMemoryMessagesResponse) GetCtrl() *FollowInMemoryMessagesResponse_CtrlMessage {
- if x, ok := x.GetMessage().(*FollowInMemoryMessagesResponse_Ctrl); ok {
- return x.Ctrl
- }
- return nil
-}
-
-func (x *FollowInMemoryMessagesResponse) GetData() *DataMessage {
- if x, ok := x.GetMessage().(*FollowInMemoryMessagesResponse_Data); ok {
- return x.Data
- }
- return nil
-}
-
-type isFollowInMemoryMessagesResponse_Message interface {
- isFollowInMemoryMessagesResponse_Message()
-}
-
-type FollowInMemoryMessagesResponse_Ctrl struct {
- Ctrl *FollowInMemoryMessagesResponse_CtrlMessage `protobuf:"bytes,1,opt,name=ctrl,proto3,oneof"`
-}
-
-type FollowInMemoryMessagesResponse_Data struct {
- Data *DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"`
-}
-
-func (*FollowInMemoryMessagesResponse_Ctrl) isFollowInMemoryMessagesResponse_Message() {}
-
-func (*FollowInMemoryMessagesResponse_Data) isFollowInMemoryMessagesResponse_Message() {}
-
type ClosePublishersRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -2010,7 +1894,7 @@ type ClosePublishersRequest struct {
func (x *ClosePublishersRequest) Reset() {
*x = ClosePublishersRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[32]
+ mi := &file_mq_proto_msgTypes[30]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -2023,7 +1907,7 @@ func (x *ClosePublishersRequest) String() string {
func (*ClosePublishersRequest) ProtoMessage() {}
func (x *ClosePublishersRequest) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[32]
+ mi := &file_mq_proto_msgTypes[30]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2036,7 +1920,7 @@ func (x *ClosePublishersRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use ClosePublishersRequest.ProtoReflect.Descriptor instead.
func (*ClosePublishersRequest) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{32}
+ return file_mq_proto_rawDescGZIP(), []int{30}
}
func (x *ClosePublishersRequest) GetTopic() *Topic {
@@ -2062,7 +1946,7 @@ type ClosePublishersResponse struct {
func (x *ClosePublishersResponse) Reset() {
*x = ClosePublishersResponse{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[33]
+ mi := &file_mq_proto_msgTypes[31]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -2075,7 +1959,7 @@ func (x *ClosePublishersResponse) String() string {
func (*ClosePublishersResponse) ProtoMessage() {}
func (x *ClosePublishersResponse) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[33]
+ mi := &file_mq_proto_msgTypes[31]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2088,7 +1972,7 @@ func (x *ClosePublishersResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use ClosePublishersResponse.ProtoReflect.Descriptor instead.
func (*ClosePublishersResponse) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{33}
+ return file_mq_proto_rawDescGZIP(), []int{31}
}
type CloseSubscribersRequest struct {
@@ -2103,7 +1987,7 @@ type CloseSubscribersRequest struct {
func (x *CloseSubscribersRequest) Reset() {
*x = CloseSubscribersRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[34]
+ mi := &file_mq_proto_msgTypes[32]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -2116,7 +2000,7 @@ func (x *CloseSubscribersRequest) String() string {
func (*CloseSubscribersRequest) ProtoMessage() {}
func (x *CloseSubscribersRequest) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[34]
+ mi := &file_mq_proto_msgTypes[32]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2129,7 +2013,7 @@ func (x *CloseSubscribersRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use CloseSubscribersRequest.ProtoReflect.Descriptor instead.
func (*CloseSubscribersRequest) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{34}
+ return file_mq_proto_rawDescGZIP(), []int{32}
}
func (x *CloseSubscribersRequest) GetTopic() *Topic {
@@ -2155,7 +2039,7 @@ type CloseSubscribersResponse struct {
func (x *CloseSubscribersResponse) Reset() {
*x = CloseSubscribersResponse{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[35]
+ mi := &file_mq_proto_msgTypes[33]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -2168,7 +2052,7 @@ func (x *CloseSubscribersResponse) String() string {
func (*CloseSubscribersResponse) ProtoMessage() {}
func (x *CloseSubscribersResponse) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[35]
+ mi := &file_mq_proto_msgTypes[33]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2181,7 +2065,7 @@ func (x *CloseSubscribersResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use CloseSubscribersResponse.ProtoReflect.Descriptor instead.
func (*CloseSubscribersResponse) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{35}
+ return file_mq_proto_rawDescGZIP(), []int{33}
}
type PublisherToPubBalancerRequest_InitMessage struct {
@@ -2195,7 +2079,7 @@ type PublisherToPubBalancerRequest_InitMessage struct {
func (x *PublisherToPubBalancerRequest_InitMessage) Reset() {
*x = PublisherToPubBalancerRequest_InitMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[37]
+ mi := &file_mq_proto_msgTypes[35]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -2208,7 +2092,7 @@ func (x *PublisherToPubBalancerRequest_InitMessage) String() string {
func (*PublisherToPubBalancerRequest_InitMessage) ProtoMessage() {}
func (x *PublisherToPubBalancerRequest_InitMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[37]
+ mi := &file_mq_proto_msgTypes[35]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2244,7 +2128,7 @@ type SubscriberToSubCoordinatorRequest_InitMessage struct {
func (x *SubscriberToSubCoordinatorRequest_InitMessage) Reset() {
*x = SubscriberToSubCoordinatorRequest_InitMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[38]
+ mi := &file_mq_proto_msgTypes[36]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -2257,7 +2141,7 @@ func (x *SubscriberToSubCoordinatorRequest_InitMessage) String() string {
func (*SubscriberToSubCoordinatorRequest_InitMessage) ProtoMessage() {}
func (x *SubscriberToSubCoordinatorRequest_InitMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[38]
+ mi := &file_mq_proto_msgTypes[36]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2306,7 +2190,7 @@ type SubscriberToSubCoordinatorRequest_AckMessage struct {
func (x *SubscriberToSubCoordinatorRequest_AckMessage) Reset() {
*x = SubscriberToSubCoordinatorRequest_AckMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[39]
+ mi := &file_mq_proto_msgTypes[37]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -2319,7 +2203,7 @@ func (x *SubscriberToSubCoordinatorRequest_AckMessage) String() string {
func (*SubscriberToSubCoordinatorRequest_AckMessage) ProtoMessage() {}
func (x *SubscriberToSubCoordinatorRequest_AckMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[39]
+ mi := &file_mq_proto_msgTypes[37]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2349,82 +2233,19 @@ func (x *SubscriberToSubCoordinatorRequest_AckMessage) GetTsNs() int64 {
return 0
}
-type SubscriberToSubCoordinatorResponse_AssignedPartition struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- Partition *Partition `protobuf:"bytes,1,opt,name=partition,proto3" json:"partition,omitempty"`
- TsNs int64 `protobuf:"varint,2,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"`
- Broker string `protobuf:"bytes,3,opt,name=broker,proto3" json:"broker,omitempty"`
-}
-
-func (x *SubscriberToSubCoordinatorResponse_AssignedPartition) Reset() {
- *x = SubscriberToSubCoordinatorResponse_AssignedPartition{}
- if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[40]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *SubscriberToSubCoordinatorResponse_AssignedPartition) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*SubscriberToSubCoordinatorResponse_AssignedPartition) ProtoMessage() {}
-
-func (x *SubscriberToSubCoordinatorResponse_AssignedPartition) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[40]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use SubscriberToSubCoordinatorResponse_AssignedPartition.ProtoReflect.Descriptor instead.
-func (*SubscriberToSubCoordinatorResponse_AssignedPartition) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{22, 0}
-}
-
-func (x *SubscriberToSubCoordinatorResponse_AssignedPartition) GetPartition() *Partition {
- if x != nil {
- return x.Partition
- }
- return nil
-}
-
-func (x *SubscriberToSubCoordinatorResponse_AssignedPartition) GetTsNs() int64 {
- if x != nil {
- return x.TsNs
- }
- return 0
-}
-
-func (x *SubscriberToSubCoordinatorResponse_AssignedPartition) GetBroker() string {
- if x != nil {
- return x.Broker
- }
- return ""
-}
-
type SubscriberToSubCoordinatorResponse_Assignment struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Generation int64 `protobuf:"varint,1,opt,name=generation,proto3" json:"generation,omitempty"`
- AssignedPartitions []*SubscriberToSubCoordinatorResponse_AssignedPartition `protobuf:"bytes,2,rep,name=assigned_partitions,json=assignedPartitions,proto3" json:"assigned_partitions,omitempty"`
+ Generation int64 `protobuf:"varint,1,opt,name=generation,proto3" json:"generation,omitempty"`
+ PartitionAssignments []*BrokerPartitionAssignment `protobuf:"bytes,2,rep,name=partition_assignments,json=partitionAssignments,proto3" json:"partition_assignments,omitempty"`
}
func (x *SubscriberToSubCoordinatorResponse_Assignment) Reset() {
*x = SubscriberToSubCoordinatorResponse_Assignment{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[41]
+ mi := &file_mq_proto_msgTypes[38]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -2437,7 +2258,7 @@ func (x *SubscriberToSubCoordinatorResponse_Assignment) String() string {
func (*SubscriberToSubCoordinatorResponse_Assignment) ProtoMessage() {}
func (x *SubscriberToSubCoordinatorResponse_Assignment) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[41]
+ mi := &file_mq_proto_msgTypes[38]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2450,7 +2271,7 @@ func (x *SubscriberToSubCoordinatorResponse_Assignment) ProtoReflect() protorefl
// Deprecated: Use SubscriberToSubCoordinatorResponse_Assignment.ProtoReflect.Descriptor instead.
func (*SubscriberToSubCoordinatorResponse_Assignment) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{22, 1}
+ return file_mq_proto_rawDescGZIP(), []int{22, 0}
}
func (x *SubscriberToSubCoordinatorResponse_Assignment) GetGeneration() int64 {
@@ -2460,9 +2281,9 @@ func (x *SubscriberToSubCoordinatorResponse_Assignment) GetGeneration() int64 {
return 0
}
-func (x *SubscriberToSubCoordinatorResponse_Assignment) GetAssignedPartitions() []*SubscriberToSubCoordinatorResponse_AssignedPartition {
+func (x *SubscriberToSubCoordinatorResponse_Assignment) GetPartitionAssignments() []*BrokerPartitionAssignment {
if x != nil {
- return x.AssignedPartitions
+ return x.PartitionAssignments
}
return nil
}
@@ -2481,7 +2302,7 @@ type PublishMessageRequest_InitMessage struct {
func (x *PublishMessageRequest_InitMessage) Reset() {
*x = PublishMessageRequest_InitMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[42]
+ mi := &file_mq_proto_msgTypes[39]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -2494,7 +2315,7 @@ func (x *PublishMessageRequest_InitMessage) String() string {
func (*PublishMessageRequest_InitMessage) ProtoMessage() {}
func (x *PublishMessageRequest_InitMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[42]
+ mi := &file_mq_proto_msgTypes[39]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2538,36 +2359,32 @@ func (x *PublishMessageRequest_InitMessage) GetFollowerBrokers() []string {
return nil
}
-type SubscribeMessageRequest_InitMessage struct {
+type PublishFollowMeRequest_InitMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"`
- ConsumerId string `protobuf:"bytes,2,opt,name=consumer_id,json=consumerId,proto3" json:"consumer_id,omitempty"`
- ClientId string `protobuf:"bytes,3,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"`
- Topic *Topic `protobuf:"bytes,4,opt,name=topic,proto3" json:"topic,omitempty"`
- PartitionOffset *PartitionOffset `protobuf:"bytes,5,opt,name=partition_offset,json=partitionOffset,proto3" json:"partition_offset,omitempty"`
- Filter string `protobuf:"bytes,6,opt,name=filter,proto3" json:"filter,omitempty"`
+ Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
+ Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"`
}
-func (x *SubscribeMessageRequest_InitMessage) Reset() {
- *x = SubscribeMessageRequest_InitMessage{}
+func (x *PublishFollowMeRequest_InitMessage) Reset() {
+ *x = PublishFollowMeRequest_InitMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[43]
+ mi := &file_mq_proto_msgTypes[40]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
-func (x *SubscribeMessageRequest_InitMessage) String() string {
+func (x *PublishFollowMeRequest_InitMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*SubscribeMessageRequest_InitMessage) ProtoMessage() {}
+func (*PublishFollowMeRequest_InitMessage) ProtoMessage() {}
-func (x *SubscribeMessageRequest_InitMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[43]
+func (x *PublishFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[40]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2578,78 +2395,50 @@ func (x *SubscribeMessageRequest_InitMessage) ProtoReflect() protoreflect.Messag
return mi.MessageOf(x)
}
-// Deprecated: Use SubscribeMessageRequest_InitMessage.ProtoReflect.Descriptor instead.
-func (*SubscribeMessageRequest_InitMessage) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{28, 0}
-}
-
-func (x *SubscribeMessageRequest_InitMessage) GetConsumerGroup() string {
- if x != nil {
- return x.ConsumerGroup
- }
- return ""
-}
-
-func (x *SubscribeMessageRequest_InitMessage) GetConsumerId() string {
- if x != nil {
- return x.ConsumerId
- }
- return ""
-}
-
-func (x *SubscribeMessageRequest_InitMessage) GetClientId() string {
- if x != nil {
- return x.ClientId
- }
- return ""
+// Deprecated: Use PublishFollowMeRequest_InitMessage.ProtoReflect.Descriptor instead.
+func (*PublishFollowMeRequest_InitMessage) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{26, 0}
}
-func (x *SubscribeMessageRequest_InitMessage) GetTopic() *Topic {
+func (x *PublishFollowMeRequest_InitMessage) GetTopic() *Topic {
if x != nil {
return x.Topic
}
return nil
}
-func (x *SubscribeMessageRequest_InitMessage) GetPartitionOffset() *PartitionOffset {
+func (x *PublishFollowMeRequest_InitMessage) GetPartition() *Partition {
if x != nil {
- return x.PartitionOffset
+ return x.Partition
}
return nil
}
-func (x *SubscribeMessageRequest_InitMessage) GetFilter() string {
- if x != nil {
- return x.Filter
- }
- return ""
-}
-
-type SubscribeMessageRequest_AckMessage struct {
+type PublishFollowMeRequest_FlushMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Sequence int64 `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`
+ TsNs int64 `protobuf:"varint,1,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"`
}
-func (x *SubscribeMessageRequest_AckMessage) Reset() {
- *x = SubscribeMessageRequest_AckMessage{}
+func (x *PublishFollowMeRequest_FlushMessage) Reset() {
+ *x = PublishFollowMeRequest_FlushMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[44]
+ mi := &file_mq_proto_msgTypes[41]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
-func (x *SubscribeMessageRequest_AckMessage) String() string {
+func (x *PublishFollowMeRequest_FlushMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*SubscribeMessageRequest_AckMessage) ProtoMessage() {}
+func (*PublishFollowMeRequest_FlushMessage) ProtoMessage() {}
-func (x *SubscribeMessageRequest_AckMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[44]
+func (x *PublishFollowMeRequest_FlushMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[41]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2660,45 +2449,41 @@ func (x *SubscribeMessageRequest_AckMessage) ProtoReflect() protoreflect.Message
return mi.MessageOf(x)
}
-// Deprecated: Use SubscribeMessageRequest_AckMessage.ProtoReflect.Descriptor instead.
-func (*SubscribeMessageRequest_AckMessage) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{28, 1}
+// Deprecated: Use PublishFollowMeRequest_FlushMessage.ProtoReflect.Descriptor instead.
+func (*PublishFollowMeRequest_FlushMessage) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{26, 1}
}
-func (x *SubscribeMessageRequest_AckMessage) GetSequence() int64 {
+func (x *PublishFollowMeRequest_FlushMessage) GetTsNs() int64 {
if x != nil {
- return x.Sequence
+ return x.TsNs
}
return 0
}
-type SubscribeMessageResponse_CtrlMessage struct {
+type PublishFollowMeRequest_CloseMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
-
- Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
- IsEndOfStream bool `protobuf:"varint,2,opt,name=is_end_of_stream,json=isEndOfStream,proto3" json:"is_end_of_stream,omitempty"`
- IsEndOfTopic bool `protobuf:"varint,3,opt,name=is_end_of_topic,json=isEndOfTopic,proto3" json:"is_end_of_topic,omitempty"`
}
-func (x *SubscribeMessageResponse_CtrlMessage) Reset() {
- *x = SubscribeMessageResponse_CtrlMessage{}
+func (x *PublishFollowMeRequest_CloseMessage) Reset() {
+ *x = PublishFollowMeRequest_CloseMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[45]
+ mi := &file_mq_proto_msgTypes[42]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
-func (x *SubscribeMessageResponse_CtrlMessage) String() string {
+func (x *PublishFollowMeRequest_CloseMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*SubscribeMessageResponse_CtrlMessage) ProtoMessage() {}
+func (*PublishFollowMeRequest_CloseMessage) ProtoMessage() {}
-func (x *SubscribeMessageResponse_CtrlMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[45]
+func (x *PublishFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[42]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2709,61 +2494,42 @@ func (x *SubscribeMessageResponse_CtrlMessage) ProtoReflect() protoreflect.Messa
return mi.MessageOf(x)
}
-// Deprecated: Use SubscribeMessageResponse_CtrlMessage.ProtoReflect.Descriptor instead.
-func (*SubscribeMessageResponse_CtrlMessage) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{29, 0}
-}
-
-func (x *SubscribeMessageResponse_CtrlMessage) GetError() string {
- if x != nil {
- return x.Error
- }
- return ""
-}
-
-func (x *SubscribeMessageResponse_CtrlMessage) GetIsEndOfStream() bool {
- if x != nil {
- return x.IsEndOfStream
- }
- return false
-}
-
-func (x *SubscribeMessageResponse_CtrlMessage) GetIsEndOfTopic() bool {
- if x != nil {
- return x.IsEndOfTopic
- }
- return false
+// Deprecated: Use PublishFollowMeRequest_CloseMessage.ProtoReflect.Descriptor instead.
+func (*PublishFollowMeRequest_CloseMessage) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{26, 2}
}
-type FollowInMemoryMessagesRequest_InitMessage struct {
+type SubscribeMessageRequest_InitMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"`
ConsumerId string `protobuf:"bytes,2,opt,name=consumer_id,json=consumerId,proto3" json:"consumer_id,omitempty"`
- FollowerId int32 `protobuf:"varint,3,opt,name=follower_id,json=followerId,proto3" json:"follower_id,omitempty"`
+ ClientId string `protobuf:"bytes,3,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"`
Topic *Topic `protobuf:"bytes,4,opt,name=topic,proto3" json:"topic,omitempty"`
PartitionOffset *PartitionOffset `protobuf:"bytes,5,opt,name=partition_offset,json=partitionOffset,proto3" json:"partition_offset,omitempty"`
+ Filter string `protobuf:"bytes,6,opt,name=filter,proto3" json:"filter,omitempty"`
+ FollowerBrokers []string `protobuf:"bytes,7,rep,name=follower_brokers,json=followerBrokers,proto3" json:"follower_brokers,omitempty"`
}
-func (x *FollowInMemoryMessagesRequest_InitMessage) Reset() {
- *x = FollowInMemoryMessagesRequest_InitMessage{}
+func (x *SubscribeMessageRequest_InitMessage) Reset() {
+ *x = SubscribeMessageRequest_InitMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[46]
+ mi := &file_mq_proto_msgTypes[43]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
-func (x *FollowInMemoryMessagesRequest_InitMessage) String() string {
+func (x *SubscribeMessageRequest_InitMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*FollowInMemoryMessagesRequest_InitMessage) ProtoMessage() {}
+func (*SubscribeMessageRequest_InitMessage) ProtoMessage() {}
-func (x *FollowInMemoryMessagesRequest_InitMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[46]
+func (x *SubscribeMessageRequest_InitMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[43]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2774,47 +2540,61 @@ func (x *FollowInMemoryMessagesRequest_InitMessage) ProtoReflect() protoreflect.
return mi.MessageOf(x)
}
-// Deprecated: Use FollowInMemoryMessagesRequest_InitMessage.ProtoReflect.Descriptor instead.
-func (*FollowInMemoryMessagesRequest_InitMessage) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{30, 0}
+// Deprecated: Use SubscribeMessageRequest_InitMessage.ProtoReflect.Descriptor instead.
+func (*SubscribeMessageRequest_InitMessage) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{28, 0}
}
-func (x *FollowInMemoryMessagesRequest_InitMessage) GetConsumerGroup() string {
+func (x *SubscribeMessageRequest_InitMessage) GetConsumerGroup() string {
if x != nil {
return x.ConsumerGroup
}
return ""
}
-func (x *FollowInMemoryMessagesRequest_InitMessage) GetConsumerId() string {
+func (x *SubscribeMessageRequest_InitMessage) GetConsumerId() string {
if x != nil {
return x.ConsumerId
}
return ""
}
-func (x *FollowInMemoryMessagesRequest_InitMessage) GetFollowerId() int32 {
+func (x *SubscribeMessageRequest_InitMessage) GetClientId() string {
if x != nil {
- return x.FollowerId
+ return x.ClientId
}
- return 0
+ return ""
}
-func (x *FollowInMemoryMessagesRequest_InitMessage) GetTopic() *Topic {
+func (x *SubscribeMessageRequest_InitMessage) GetTopic() *Topic {
if x != nil {
return x.Topic
}
return nil
}
-func (x *FollowInMemoryMessagesRequest_InitMessage) GetPartitionOffset() *PartitionOffset {
+func (x *SubscribeMessageRequest_InitMessage) GetPartitionOffset() *PartitionOffset {
if x != nil {
return x.PartitionOffset
}
return nil
}
-type FollowInMemoryMessagesRequest_AckMessage struct {
+func (x *SubscribeMessageRequest_InitMessage) GetFilter() string {
+ if x != nil {
+ return x.Filter
+ }
+ return ""
+}
+
+func (x *SubscribeMessageRequest_InitMessage) GetFollowerBrokers() []string {
+ if x != nil {
+ return x.FollowerBrokers
+ }
+ return nil
+}
+
+type SubscribeMessageRequest_AckMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
@@ -2822,23 +2602,23 @@ type FollowInMemoryMessagesRequest_AckMessage struct {
Sequence int64 `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`
}
-func (x *FollowInMemoryMessagesRequest_AckMessage) Reset() {
- *x = FollowInMemoryMessagesRequest_AckMessage{}
+func (x *SubscribeMessageRequest_AckMessage) Reset() {
+ *x = SubscribeMessageRequest_AckMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[47]
+ mi := &file_mq_proto_msgTypes[44]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
-func (x *FollowInMemoryMessagesRequest_AckMessage) String() string {
+func (x *SubscribeMessageRequest_AckMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*FollowInMemoryMessagesRequest_AckMessage) ProtoMessage() {}
+func (*SubscribeMessageRequest_AckMessage) ProtoMessage() {}
-func (x *FollowInMemoryMessagesRequest_AckMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[47]
+func (x *SubscribeMessageRequest_AckMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[44]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2849,44 +2629,45 @@ func (x *FollowInMemoryMessagesRequest_AckMessage) ProtoReflect() protoreflect.M
return mi.MessageOf(x)
}
-// Deprecated: Use FollowInMemoryMessagesRequest_AckMessage.ProtoReflect.Descriptor instead.
-func (*FollowInMemoryMessagesRequest_AckMessage) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{30, 1}
+// Deprecated: Use SubscribeMessageRequest_AckMessage.ProtoReflect.Descriptor instead.
+func (*SubscribeMessageRequest_AckMessage) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{28, 1}
}
-func (x *FollowInMemoryMessagesRequest_AckMessage) GetSequence() int64 {
+func (x *SubscribeMessageRequest_AckMessage) GetSequence() int64 {
if x != nil {
return x.Sequence
}
return 0
}
-type FollowInMemoryMessagesResponse_CtrlMessage struct {
+type SubscribeMessageResponse_CtrlMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- FlushedSequence int64 `protobuf:"varint,1,opt,name=flushed_sequence,json=flushedSequence,proto3" json:"flushed_sequence,omitempty"`
- FollowerChangedToId int32 `protobuf:"varint,2,opt,name=follower_changed_to_id,json=followerChangedToId,proto3" json:"follower_changed_to_id,omitempty"`
+ Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
+ IsEndOfStream bool `protobuf:"varint,2,opt,name=is_end_of_stream,json=isEndOfStream,proto3" json:"is_end_of_stream,omitempty"`
+ IsEndOfTopic bool `protobuf:"varint,3,opt,name=is_end_of_topic,json=isEndOfTopic,proto3" json:"is_end_of_topic,omitempty"`
}
-func (x *FollowInMemoryMessagesResponse_CtrlMessage) Reset() {
- *x = FollowInMemoryMessagesResponse_CtrlMessage{}
+func (x *SubscribeMessageResponse_CtrlMessage) Reset() {
+ *x = SubscribeMessageResponse_CtrlMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[48]
+ mi := &file_mq_proto_msgTypes[45]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
-func (x *FollowInMemoryMessagesResponse_CtrlMessage) String() string {
+func (x *SubscribeMessageResponse_CtrlMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*FollowInMemoryMessagesResponse_CtrlMessage) ProtoMessage() {}
+func (*SubscribeMessageResponse_CtrlMessage) ProtoMessage() {}
-func (x *FollowInMemoryMessagesResponse_CtrlMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[48]
+func (x *SubscribeMessageResponse_CtrlMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[45]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -2897,23 +2678,30 @@ func (x *FollowInMemoryMessagesResponse_CtrlMessage) ProtoReflect() protoreflect
return mi.MessageOf(x)
}
-// Deprecated: Use FollowInMemoryMessagesResponse_CtrlMessage.ProtoReflect.Descriptor instead.
-func (*FollowInMemoryMessagesResponse_CtrlMessage) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{31, 0}
+// Deprecated: Use SubscribeMessageResponse_CtrlMessage.ProtoReflect.Descriptor instead.
+func (*SubscribeMessageResponse_CtrlMessage) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{29, 0}
}
-func (x *FollowInMemoryMessagesResponse_CtrlMessage) GetFlushedSequence() int64 {
+func (x *SubscribeMessageResponse_CtrlMessage) GetError() string {
if x != nil {
- return x.FlushedSequence
+ return x.Error
}
- return 0
+ return ""
}
-func (x *FollowInMemoryMessagesResponse_CtrlMessage) GetFollowerChangedToId() int32 {
+func (x *SubscribeMessageResponse_CtrlMessage) GetIsEndOfStream() bool {
if x != nil {
- return x.FollowerChangedToId
+ return x.IsEndOfStream
}
- return 0
+ return false
+}
+
+func (x *SubscribeMessageResponse_CtrlMessage) GetIsEndOfTopic() bool {
+ if x != nil {
+ return x.IsEndOfTopic
+ }
+ return false
}
var File_mq_proto protoreflect.FileDescriptor
@@ -3097,7 +2885,7 @@ var file_mq_proto_rawDesc = []byte{
0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x13, 0x0a,
0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x74, 0x73,
- 0x4e, 0x73, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0xab, 0x03,
+ 0x4e, 0x73, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x9b, 0x02,
0x0a, 0x22, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75,
0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5d, 0x0a, 0x0a, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65,
@@ -3106,24 +2894,15 @@ var file_mq_proto_rawDesc = []byte{
0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74,
0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67,
0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x48, 0x00, 0x52, 0x0a, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d,
- 0x65, 0x6e, 0x74, 0x1a, 0x77, 0x0a, 0x11, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x50,
- 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74,
- 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65,
- 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69,
- 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12,
- 0x13, 0x0a, 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04,
- 0x74, 0x73, 0x4e, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x03,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x1a, 0xa1, 0x01, 0x0a,
- 0x0a, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x67,
- 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52,
- 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x73, 0x0a, 0x13, 0x61,
- 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f,
- 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x42, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
- 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
- 0x65, 0x72, 0x54, 0x6f, 0x53, 0x75, 0x62, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74,
- 0x6f, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67,
- 0x6e, 0x65, 0x64, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x61, 0x73,
- 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73,
+ 0x65, 0x6e, 0x74, 0x1a, 0x8a, 0x01, 0x0a, 0x0a, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65,
+ 0x6e, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x69,
+ 0x6f, 0x6e, 0x12, 0x5c, 0x0a, 0x15, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f,
+ 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28,
+ 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
+ 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
+ 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x14, 0x70, 0x61, 0x72, 0x74,
+ 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x73,
0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x4a, 0x0a, 0x0b, 0x44,
0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65,
0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05,
@@ -3160,115 +2939,90 @@ var file_mq_proto_rawDesc = []byte{
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x5f, 0x63, 0x6c,
0x6f, 0x73, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x73, 0x68, 0x6f, 0x75, 0x6c,
- 0x64, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x22, 0x9b, 0x01, 0x0a, 0x16, 0x50, 0x75, 0x62, 0x6c, 0x69,
+ 0x64, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x22, 0xd8, 0x03, 0x0a, 0x16, 0x50, 0x75, 0x62, 0x6c, 0x69,
0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
- 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x09,
- 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
- 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50,
- 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74,
- 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, 0x73, 0x65,
- 0x6c, 0x66, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72,
- 0x53, 0x65, 0x6c, 0x66, 0x22, 0x2f, 0x0a, 0x17, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46,
- 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
- 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
- 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0xdf, 0x03, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
- 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x12, 0x47, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
- 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53,
- 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52,
- 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61,
- 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x44, 0x0a, 0x03, 0x61, 0x63,
- 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
- 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
- 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x41,
- 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x03, 0x61, 0x63, 0x6b,
- 0x1a, 0xff, 0x01, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
- 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f,
- 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d,
- 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x73, 0x75,
- 0x6d, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f,
- 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6c, 0x69, 0x65,
- 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6c, 0x69,
- 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x04,
- 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
- 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63,
- 0x12, 0x48, 0x0a, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x66,
- 0x66, 0x73, 0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6d, 0x65, 0x73,
- 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74,
- 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69,
- 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x69,
- 0x6c, 0x74, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74,
- 0x65, 0x72, 0x1a, 0x28, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
- 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01,
- 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x42, 0x09, 0x0a, 0x07,
- 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x95, 0x02, 0x0a, 0x18, 0x53, 0x75, 0x62, 0x73,
- 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70,
- 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x48, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01,
- 0x28, 0x0b, 0x32, 0x32, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
+ 0x74, 0x12, 0x46, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
+ 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50,
+ 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65,
+ 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74,
+ 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61,
+ 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x49, 0x0a, 0x05, 0x66, 0x6c,
+ 0x75, 0x73, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68,
+ 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e,
+ 0x46, 0x6c, 0x75, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x05,
+ 0x66, 0x6c, 0x75, 0x73, 0x68, 0x12, 0x49, 0x0a, 0x05, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x18, 0x04,
+ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
+ 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f,
+ 0x77, 0x4d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x6c, 0x6f, 0x73, 0x65,
+ 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x05, 0x63, 0x6c, 0x6f, 0x73, 0x65,
+ 0x1a, 0x6f, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12,
+ 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13,
+ 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f,
+ 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61,
+ 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e,
+ 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72,
+ 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f,
+ 0x6e, 0x1a, 0x23, 0x0a, 0x0c, 0x46, 0x6c, 0x75, 0x73, 0x68, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x65, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03,
+ 0x52, 0x04, 0x74, 0x73, 0x4e, 0x73, 0x1a, 0x0e, 0x0a, 0x0c, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x4d,
+ 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x65, 0x22, 0x35, 0x0a, 0x17, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c,
+ 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x09,
+ 0x61, 0x63, 0x6b, 0x5f, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52,
+ 0x07, 0x61, 0x63, 0x6b, 0x54, 0x73, 0x4e, 0x73, 0x22, 0x8a, 0x04, 0x0a, 0x17, 0x53, 0x75, 0x62,
+ 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x12, 0x47, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61,
- 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d,
- 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f,
- 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d,
- 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61,
- 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a,
- 0x73, 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14,
- 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65,
- 0x72, 0x72, 0x6f, 0x72, 0x12, 0x27, 0x0a, 0x10, 0x69, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f,
- 0x66, 0x5f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d,
- 0x69, 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x25, 0x0a,
- 0x0f, 0x69, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63,
- 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x54,
- 0x6f, 0x70, 0x69, 0x63, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22,
- 0xdd, 0x03, 0x0a, 0x1d, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x4d, 0x65, 0x6d, 0x6f,
- 0x72, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x12, 0x4d, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
- 0x37, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46,
- 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x4d, 0x65, 0x73,
- 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69,
- 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74,
- 0x12, 0x4a, 0x0a, 0x03, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x36, 0x2e,
- 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x6f, 0x6c,
- 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61,
- 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x41, 0x63, 0x6b, 0x4d, 0x65,
- 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x03, 0x61, 0x63, 0x6b, 0x1a, 0xeb, 0x01, 0x0a,
- 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e,
- 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72,
- 0x6f, 0x75, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f,
- 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d,
- 0x65, 0x72, 0x49, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x72,
- 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x66, 0x6f, 0x6c, 0x6c, 0x6f,
- 0x77, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x04,
- 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
- 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63,
- 0x12, 0x48, 0x0a, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x66,
- 0x66, 0x73, 0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x6d, 0x65, 0x73,
- 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74,
- 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69,
- 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x1a, 0x28, 0x0a, 0x0a, 0x41, 0x63,
- 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75,
- 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75,
- 0x65, 0x6e, 0x63, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22,
- 0x9b, 0x02, 0x0a, 0x1e, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x4d, 0x65, 0x6d, 0x6f,
- 0x72, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
- 0x73, 0x65, 0x12, 0x4e, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x38, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
- 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x4d, 0x65,
- 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43,
- 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x63, 0x74,
- 0x72, 0x6c, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
- 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64,
- 0x61, 0x74, 0x61, 0x1a, 0x6d, 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61,
- 0x67, 0x65, 0x12, 0x29, 0x0a, 0x10, 0x66, 0x6c, 0x75, 0x73, 0x68, 0x65, 0x64, 0x5f, 0x73, 0x65,
- 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0f, 0x66, 0x6c,
- 0x75, 0x73, 0x68, 0x65, 0x64, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x33, 0x0a,
- 0x16, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x72, 0x5f, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
- 0x64, 0x5f, 0x74, 0x6f, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x13, 0x66,
- 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x72, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x64, 0x54, 0x6f,
- 0x49, 0x64, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x65, 0x0a,
+ 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x44, 0x0a,
+ 0x03, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x6d, 0x65, 0x73,
+ 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
+ 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
+ 0x74, 0x2e, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x03,
+ 0x61, 0x63, 0x6b, 0x1a, 0xaa, 0x02, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f,
+ 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e,
+ 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f,
+ 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x0a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x63,
+ 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08,
+ 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69,
+ 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f,
+ 0x70, 0x69, 0x63, 0x12, 0x48, 0x0a, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
+ 0x5f, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e,
+ 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72,
+ 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x0f, 0x70, 0x61,
+ 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a,
+ 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66,
+ 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x29, 0x0a, 0x10, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x65,
+ 0x72, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x09, 0x52,
+ 0x0f, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x72, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73,
+ 0x1a, 0x28, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a,
+ 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03,
+ 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x95, 0x02, 0x0a, 0x18, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
+ 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+ 0x73, 0x65, 0x12, 0x48, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x32, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
+ 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
+ 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73,
+ 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f, 0x0a, 0x04,
+ 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73,
+ 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x73, 0x0a,
+ 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05,
+ 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72,
+ 0x6f, 0x72, 0x12, 0x27, 0x0a, 0x10, 0x69, 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x5f,
+ 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0d, 0x69, 0x73,
+ 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x25, 0x0a, 0x0f, 0x69,
+ 0x73, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x6f, 0x66, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x03,
+ 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x45, 0x6e, 0x64, 0x4f, 0x66, 0x54, 0x6f, 0x70,
+ 0x69, 0x63, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x65, 0x0a,
0x16, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x65, 0x72, 0x73,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
@@ -3290,7 +3044,7 @@ var file_mq_proto_rawDesc = []byte{
0x0c, 0x0a, 0x08, 0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, 0x53, 0x54, 0x10, 0x00, 0x12, 0x16, 0x0a,
0x12, 0x45, 0x41, 0x52, 0x4c, 0x49, 0x45, 0x53, 0x54, 0x5f, 0x49, 0x4e, 0x5f, 0x4d, 0x45, 0x4d,
0x4f, 0x52, 0x59, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x41, 0x54, 0x45, 0x53, 0x54, 0x10,
- 0x02, 0x32, 0xd3, 0x0b, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73,
+ 0x02, 0x32, 0xde, 0x0a, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72,
0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72,
@@ -3369,26 +3123,19 @@ var file_mq_proto_rawDesc = []byte{
0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
- 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x60, 0x0a, 0x0f,
+ 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x64, 0x0a, 0x0f,
0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x12,
0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50,
0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x46, 0x6f, 0x6c, 0x6c,
- 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x77,
- 0x0a, 0x16, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79,
- 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
- 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x49, 0x6e,
- 0x4d, 0x65, 0x6d, 0x6f, 0x72, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65,
- 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
- 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x49, 0x6e, 0x4d, 0x65, 0x6d,
- 0x6f, 0x72, 0x79, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f,
- 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x4f, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65,
- 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x11, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
- 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68,
- 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73,
- 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f,
- 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x6f, 0x77, 0x4d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01,
+ 0x30, 0x01, 0x42, 0x4f, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e,
+ 0x6d, 0x71, 0x42, 0x11, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x51, 0x75, 0x65, 0x75, 0x65,
+ 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
+ 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77,
+ 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71,
+ 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -3404,68 +3151,65 @@ func file_mq_proto_rawDescGZIP() []byte {
}
var file_mq_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
-var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 49)
+var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 46)
var file_mq_proto_goTypes = []interface{}{
- (PartitionOffsetStartType)(0), // 0: messaging_pb.PartitionOffsetStartType
- (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest
- (*FindBrokerLeaderResponse)(nil), // 2: messaging_pb.FindBrokerLeaderResponse
- (*Topic)(nil), // 3: messaging_pb.Topic
- (*Partition)(nil), // 4: messaging_pb.Partition
- (*Offset)(nil), // 5: messaging_pb.Offset
- (*PartitionOffset)(nil), // 6: messaging_pb.PartitionOffset
- (*BrokerStats)(nil), // 7: messaging_pb.BrokerStats
- (*TopicPartitionStats)(nil), // 8: messaging_pb.TopicPartitionStats
- (*PublisherToPubBalancerRequest)(nil), // 9: messaging_pb.PublisherToPubBalancerRequest
- (*PublisherToPubBalancerResponse)(nil), // 10: messaging_pb.PublisherToPubBalancerResponse
- (*BalanceTopicsRequest)(nil), // 11: messaging_pb.BalanceTopicsRequest
- (*BalanceTopicsResponse)(nil), // 12: messaging_pb.BalanceTopicsResponse
- (*ConfigureTopicRequest)(nil), // 13: messaging_pb.ConfigureTopicRequest
- (*ConfigureTopicResponse)(nil), // 14: messaging_pb.ConfigureTopicResponse
- (*ListTopicsRequest)(nil), // 15: messaging_pb.ListTopicsRequest
- (*ListTopicsResponse)(nil), // 16: messaging_pb.ListTopicsResponse
- (*LookupTopicBrokersRequest)(nil), // 17: messaging_pb.LookupTopicBrokersRequest
- (*LookupTopicBrokersResponse)(nil), // 18: messaging_pb.LookupTopicBrokersResponse
- (*BrokerPartitionAssignment)(nil), // 19: messaging_pb.BrokerPartitionAssignment
- (*AssignTopicPartitionsRequest)(nil), // 20: messaging_pb.AssignTopicPartitionsRequest
- (*AssignTopicPartitionsResponse)(nil), // 21: messaging_pb.AssignTopicPartitionsResponse
- (*SubscriberToSubCoordinatorRequest)(nil), // 22: messaging_pb.SubscriberToSubCoordinatorRequest
- (*SubscriberToSubCoordinatorResponse)(nil), // 23: messaging_pb.SubscriberToSubCoordinatorResponse
- (*DataMessage)(nil), // 24: messaging_pb.DataMessage
- (*PublishMessageRequest)(nil), // 25: messaging_pb.PublishMessageRequest
- (*PublishMessageResponse)(nil), // 26: messaging_pb.PublishMessageResponse
- (*PublishFollowMeRequest)(nil), // 27: messaging_pb.PublishFollowMeRequest
- (*PublishFollowMeResponse)(nil), // 28: messaging_pb.PublishFollowMeResponse
- (*SubscribeMessageRequest)(nil), // 29: messaging_pb.SubscribeMessageRequest
- (*SubscribeMessageResponse)(nil), // 30: messaging_pb.SubscribeMessageResponse
- (*FollowInMemoryMessagesRequest)(nil), // 31: messaging_pb.FollowInMemoryMessagesRequest
- (*FollowInMemoryMessagesResponse)(nil), // 32: messaging_pb.FollowInMemoryMessagesResponse
- (*ClosePublishersRequest)(nil), // 33: messaging_pb.ClosePublishersRequest
- (*ClosePublishersResponse)(nil), // 34: messaging_pb.ClosePublishersResponse
- (*CloseSubscribersRequest)(nil), // 35: messaging_pb.CloseSubscribersRequest
- (*CloseSubscribersResponse)(nil), // 36: messaging_pb.CloseSubscribersResponse
- nil, // 37: messaging_pb.BrokerStats.StatsEntry
- (*PublisherToPubBalancerRequest_InitMessage)(nil), // 38: messaging_pb.PublisherToPubBalancerRequest.InitMessage
- (*SubscriberToSubCoordinatorRequest_InitMessage)(nil), // 39: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage
- (*SubscriberToSubCoordinatorRequest_AckMessage)(nil), // 40: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage
- (*SubscriberToSubCoordinatorResponse_AssignedPartition)(nil), // 41: messaging_pb.SubscriberToSubCoordinatorResponse.AssignedPartition
- (*SubscriberToSubCoordinatorResponse_Assignment)(nil), // 42: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment
- (*PublishMessageRequest_InitMessage)(nil), // 43: messaging_pb.PublishMessageRequest.InitMessage
- (*SubscribeMessageRequest_InitMessage)(nil), // 44: messaging_pb.SubscribeMessageRequest.InitMessage
- (*SubscribeMessageRequest_AckMessage)(nil), // 45: messaging_pb.SubscribeMessageRequest.AckMessage
- (*SubscribeMessageResponse_CtrlMessage)(nil), // 46: messaging_pb.SubscribeMessageResponse.CtrlMessage
- (*FollowInMemoryMessagesRequest_InitMessage)(nil), // 47: messaging_pb.FollowInMemoryMessagesRequest.InitMessage
- (*FollowInMemoryMessagesRequest_AckMessage)(nil), // 48: messaging_pb.FollowInMemoryMessagesRequest.AckMessage
- (*FollowInMemoryMessagesResponse_CtrlMessage)(nil), // 49: messaging_pb.FollowInMemoryMessagesResponse.CtrlMessage
+ (PartitionOffsetStartType)(0), // 0: messaging_pb.PartitionOffsetStartType
+ (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest
+ (*FindBrokerLeaderResponse)(nil), // 2: messaging_pb.FindBrokerLeaderResponse
+ (*Topic)(nil), // 3: messaging_pb.Topic
+ (*Partition)(nil), // 4: messaging_pb.Partition
+ (*Offset)(nil), // 5: messaging_pb.Offset
+ (*PartitionOffset)(nil), // 6: messaging_pb.PartitionOffset
+ (*BrokerStats)(nil), // 7: messaging_pb.BrokerStats
+ (*TopicPartitionStats)(nil), // 8: messaging_pb.TopicPartitionStats
+ (*PublisherToPubBalancerRequest)(nil), // 9: messaging_pb.PublisherToPubBalancerRequest
+ (*PublisherToPubBalancerResponse)(nil), // 10: messaging_pb.PublisherToPubBalancerResponse
+ (*BalanceTopicsRequest)(nil), // 11: messaging_pb.BalanceTopicsRequest
+ (*BalanceTopicsResponse)(nil), // 12: messaging_pb.BalanceTopicsResponse
+ (*ConfigureTopicRequest)(nil), // 13: messaging_pb.ConfigureTopicRequest
+ (*ConfigureTopicResponse)(nil), // 14: messaging_pb.ConfigureTopicResponse
+ (*ListTopicsRequest)(nil), // 15: messaging_pb.ListTopicsRequest
+ (*ListTopicsResponse)(nil), // 16: messaging_pb.ListTopicsResponse
+ (*LookupTopicBrokersRequest)(nil), // 17: messaging_pb.LookupTopicBrokersRequest
+ (*LookupTopicBrokersResponse)(nil), // 18: messaging_pb.LookupTopicBrokersResponse
+ (*BrokerPartitionAssignment)(nil), // 19: messaging_pb.BrokerPartitionAssignment
+ (*AssignTopicPartitionsRequest)(nil), // 20: messaging_pb.AssignTopicPartitionsRequest
+ (*AssignTopicPartitionsResponse)(nil), // 21: messaging_pb.AssignTopicPartitionsResponse
+ (*SubscriberToSubCoordinatorRequest)(nil), // 22: messaging_pb.SubscriberToSubCoordinatorRequest
+ (*SubscriberToSubCoordinatorResponse)(nil), // 23: messaging_pb.SubscriberToSubCoordinatorResponse
+ (*DataMessage)(nil), // 24: messaging_pb.DataMessage
+ (*PublishMessageRequest)(nil), // 25: messaging_pb.PublishMessageRequest
+ (*PublishMessageResponse)(nil), // 26: messaging_pb.PublishMessageResponse
+ (*PublishFollowMeRequest)(nil), // 27: messaging_pb.PublishFollowMeRequest
+ (*PublishFollowMeResponse)(nil), // 28: messaging_pb.PublishFollowMeResponse
+ (*SubscribeMessageRequest)(nil), // 29: messaging_pb.SubscribeMessageRequest
+ (*SubscribeMessageResponse)(nil), // 30: messaging_pb.SubscribeMessageResponse
+ (*ClosePublishersRequest)(nil), // 31: messaging_pb.ClosePublishersRequest
+ (*ClosePublishersResponse)(nil), // 32: messaging_pb.ClosePublishersResponse
+ (*CloseSubscribersRequest)(nil), // 33: messaging_pb.CloseSubscribersRequest
+ (*CloseSubscribersResponse)(nil), // 34: messaging_pb.CloseSubscribersResponse
+ nil, // 35: messaging_pb.BrokerStats.StatsEntry
+ (*PublisherToPubBalancerRequest_InitMessage)(nil), // 36: messaging_pb.PublisherToPubBalancerRequest.InitMessage
+ (*SubscriberToSubCoordinatorRequest_InitMessage)(nil), // 37: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage
+ (*SubscriberToSubCoordinatorRequest_AckMessage)(nil), // 38: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage
+ (*SubscriberToSubCoordinatorResponse_Assignment)(nil), // 39: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment
+ (*PublishMessageRequest_InitMessage)(nil), // 40: messaging_pb.PublishMessageRequest.InitMessage
+ (*PublishFollowMeRequest_InitMessage)(nil), // 41: messaging_pb.PublishFollowMeRequest.InitMessage
+ (*PublishFollowMeRequest_FlushMessage)(nil), // 42: messaging_pb.PublishFollowMeRequest.FlushMessage
+ (*PublishFollowMeRequest_CloseMessage)(nil), // 43: messaging_pb.PublishFollowMeRequest.CloseMessage
+ (*SubscribeMessageRequest_InitMessage)(nil), // 44: messaging_pb.SubscribeMessageRequest.InitMessage
+ (*SubscribeMessageRequest_AckMessage)(nil), // 45: messaging_pb.SubscribeMessageRequest.AckMessage
+ (*SubscribeMessageResponse_CtrlMessage)(nil), // 46: messaging_pb.SubscribeMessageResponse.CtrlMessage
}
var file_mq_proto_depIdxs = []int32{
3, // 0: messaging_pb.Offset.topic:type_name -> messaging_pb.Topic
6, // 1: messaging_pb.Offset.partition_offsets:type_name -> messaging_pb.PartitionOffset
4, // 2: messaging_pb.PartitionOffset.partition:type_name -> messaging_pb.Partition
0, // 3: messaging_pb.PartitionOffset.start_type:type_name -> messaging_pb.PartitionOffsetStartType
- 37, // 4: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry
+ 35, // 4: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry
3, // 5: messaging_pb.TopicPartitionStats.topic:type_name -> messaging_pb.Topic
4, // 6: messaging_pb.TopicPartitionStats.partition:type_name -> messaging_pb.Partition
- 38, // 7: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage
+ 36, // 7: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage
7, // 8: messaging_pb.PublisherToPubBalancerRequest.stats:type_name -> messaging_pb.BrokerStats
3, // 9: messaging_pb.ConfigureTopicRequest.topic:type_name -> messaging_pb.Topic
19, // 10: messaging_pb.ConfigureTopicResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment
@@ -3476,67 +3220,62 @@ var file_mq_proto_depIdxs = []int32{
4, // 15: messaging_pb.BrokerPartitionAssignment.partition:type_name -> messaging_pb.Partition
3, // 16: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> messaging_pb.Topic
19, // 17: messaging_pb.AssignTopicPartitionsRequest.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment
- 39, // 18: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage
- 40, // 19: messaging_pb.SubscriberToSubCoordinatorRequest.ack:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage
- 42, // 20: messaging_pb.SubscriberToSubCoordinatorResponse.assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.Assignment
- 43, // 21: messaging_pb.PublishMessageRequest.init:type_name -> messaging_pb.PublishMessageRequest.InitMessage
+ 37, // 18: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage
+ 38, // 19: messaging_pb.SubscriberToSubCoordinatorRequest.ack:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage
+ 39, // 20: messaging_pb.SubscriberToSubCoordinatorResponse.assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.Assignment
+ 40, // 21: messaging_pb.PublishMessageRequest.init:type_name -> messaging_pb.PublishMessageRequest.InitMessage
24, // 22: messaging_pb.PublishMessageRequest.data:type_name -> messaging_pb.DataMessage
- 3, // 23: messaging_pb.PublishFollowMeRequest.topic:type_name -> messaging_pb.Topic
- 4, // 24: messaging_pb.PublishFollowMeRequest.partition:type_name -> messaging_pb.Partition
- 44, // 25: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage
- 45, // 26: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage
- 46, // 27: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.CtrlMessage
- 24, // 28: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage
- 47, // 29: messaging_pb.FollowInMemoryMessagesRequest.init:type_name -> messaging_pb.FollowInMemoryMessagesRequest.InitMessage
- 48, // 30: messaging_pb.FollowInMemoryMessagesRequest.ack:type_name -> messaging_pb.FollowInMemoryMessagesRequest.AckMessage
- 49, // 31: messaging_pb.FollowInMemoryMessagesResponse.ctrl:type_name -> messaging_pb.FollowInMemoryMessagesResponse.CtrlMessage
- 24, // 32: messaging_pb.FollowInMemoryMessagesResponse.data:type_name -> messaging_pb.DataMessage
- 3, // 33: messaging_pb.ClosePublishersRequest.topic:type_name -> messaging_pb.Topic
- 3, // 34: messaging_pb.CloseSubscribersRequest.topic:type_name -> messaging_pb.Topic
- 8, // 35: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats
- 3, // 36: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> messaging_pb.Topic
- 4, // 37: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage.partition:type_name -> messaging_pb.Partition
- 4, // 38: messaging_pb.SubscriberToSubCoordinatorResponse.AssignedPartition.partition:type_name -> messaging_pb.Partition
- 41, // 39: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.assigned_partitions:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.AssignedPartition
- 3, // 40: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic
- 4, // 41: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> messaging_pb.Partition
- 3, // 42: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic
- 6, // 43: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> messaging_pb.PartitionOffset
- 3, // 44: messaging_pb.FollowInMemoryMessagesRequest.InitMessage.topic:type_name -> messaging_pb.Topic
- 6, // 45: messaging_pb.FollowInMemoryMessagesRequest.InitMessage.partition_offset:type_name -> messaging_pb.PartitionOffset
- 1, // 46: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
- 9, // 47: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest
- 11, // 48: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest
- 15, // 49: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest
- 13, // 50: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest
- 17, // 51: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest
- 20, // 52: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest
- 33, // 53: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest
- 35, // 54: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest
- 22, // 55: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest
- 25, // 56: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest
- 29, // 57: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest
- 27, // 58: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest
- 31, // 59: messaging_pb.SeaweedMessaging.FollowInMemoryMessages:input_type -> messaging_pb.FollowInMemoryMessagesRequest
- 2, // 60: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
- 10, // 61: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse
- 12, // 62: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse
- 16, // 63: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse
- 14, // 64: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse
- 18, // 65: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse
- 21, // 66: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse
- 34, // 67: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse
- 36, // 68: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse
- 23, // 69: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse
- 26, // 70: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse
- 30, // 71: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse
- 28, // 72: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse
- 32, // 73: messaging_pb.SeaweedMessaging.FollowInMemoryMessages:output_type -> messaging_pb.FollowInMemoryMessagesResponse
- 60, // [60:74] is the sub-list for method output_type
- 46, // [46:60] is the sub-list for method input_type
- 46, // [46:46] is the sub-list for extension type_name
- 46, // [46:46] is the sub-list for extension extendee
- 0, // [0:46] is the sub-list for field type_name
+ 41, // 23: messaging_pb.PublishFollowMeRequest.init:type_name -> messaging_pb.PublishFollowMeRequest.InitMessage
+ 24, // 24: messaging_pb.PublishFollowMeRequest.data:type_name -> messaging_pb.DataMessage
+ 42, // 25: messaging_pb.PublishFollowMeRequest.flush:type_name -> messaging_pb.PublishFollowMeRequest.FlushMessage
+ 43, // 26: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage
+ 44, // 27: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage
+ 45, // 28: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage
+ 46, // 29: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.CtrlMessage
+ 24, // 30: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage
+ 3, // 31: messaging_pb.ClosePublishersRequest.topic:type_name -> messaging_pb.Topic
+ 3, // 32: messaging_pb.CloseSubscribersRequest.topic:type_name -> messaging_pb.Topic
+ 8, // 33: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats
+ 3, // 34: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> messaging_pb.Topic
+ 4, // 35: messaging_pb.SubscriberToSubCoordinatorRequest.AckMessage.partition:type_name -> messaging_pb.Partition
+ 19, // 36: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment
+ 3, // 37: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic
+ 4, // 38: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> messaging_pb.Partition
+ 3, // 39: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> messaging_pb.Topic
+ 4, // 40: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> messaging_pb.Partition
+ 3, // 41: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> messaging_pb.Topic
+ 6, // 42: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> messaging_pb.PartitionOffset
+ 1, // 43: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
+ 9, // 44: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest
+ 11, // 45: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest
+ 15, // 46: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest
+ 13, // 47: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest
+ 17, // 48: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest
+ 20, // 49: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest
+ 31, // 50: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest
+ 33, // 51: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest
+ 22, // 52: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest
+ 25, // 53: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest
+ 29, // 54: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest
+ 27, // 55: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest
+ 2, // 56: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
+ 10, // 57: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse
+ 12, // 58: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse
+ 16, // 59: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse
+ 14, // 60: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse
+ 18, // 61: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse
+ 21, // 62: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse
+ 32, // 63: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse
+ 34, // 64: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse
+ 23, // 65: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse
+ 26, // 66: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse
+ 30, // 67: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse
+ 28, // 68: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse
+ 56, // [56:69] is the sub-list for method output_type
+ 43, // [43:56] is the sub-list for method input_type
+ 43, // [43:43] is the sub-list for extension type_name
+ 43, // [43:43] is the sub-list for extension extendee
+ 0, // [0:43] is the sub-list for field type_name
}
func init() { file_mq_proto_init() }
@@ -3906,7 +3645,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[30].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*FollowInMemoryMessagesRequest); i {
+ switch v := v.(*ClosePublishersRequest); i {
case 0:
return &v.state
case 1:
@@ -3918,7 +3657,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[31].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*FollowInMemoryMessagesResponse); i {
+ switch v := v.(*ClosePublishersResponse); i {
case 0:
return &v.state
case 1:
@@ -3930,7 +3669,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[32].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*ClosePublishersRequest); i {
+ switch v := v.(*CloseSubscribersRequest); i {
case 0:
return &v.state
case 1:
@@ -3942,7 +3681,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[33].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*ClosePublishersResponse); i {
+ switch v := v.(*CloseSubscribersResponse); i {
case 0:
return &v.state
case 1:
@@ -3953,8 +3692,8 @@ func file_mq_proto_init() {
return nil
}
}
- file_mq_proto_msgTypes[34].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*CloseSubscribersRequest); i {
+ file_mq_proto_msgTypes[35].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*PublisherToPubBalancerRequest_InitMessage); i {
case 0:
return &v.state
case 1:
@@ -3965,8 +3704,8 @@ func file_mq_proto_init() {
return nil
}
}
- file_mq_proto_msgTypes[35].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*CloseSubscribersResponse); i {
+ file_mq_proto_msgTypes[36].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SubscriberToSubCoordinatorRequest_InitMessage); i {
case 0:
return &v.state
case 1:
@@ -3978,7 +3717,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[37].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*PublisherToPubBalancerRequest_InitMessage); i {
+ switch v := v.(*SubscriberToSubCoordinatorRequest_AckMessage); i {
case 0:
return &v.state
case 1:
@@ -3990,7 +3729,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[38].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*SubscriberToSubCoordinatorRequest_InitMessage); i {
+ switch v := v.(*SubscriberToSubCoordinatorResponse_Assignment); i {
case 0:
return &v.state
case 1:
@@ -4002,7 +3741,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[39].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*SubscriberToSubCoordinatorRequest_AckMessage); i {
+ switch v := v.(*PublishMessageRequest_InitMessage); i {
case 0:
return &v.state
case 1:
@@ -4014,7 +3753,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[40].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*SubscriberToSubCoordinatorResponse_AssignedPartition); i {
+ switch v := v.(*PublishFollowMeRequest_InitMessage); i {
case 0:
return &v.state
case 1:
@@ -4026,7 +3765,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[41].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*SubscriberToSubCoordinatorResponse_Assignment); i {
+ switch v := v.(*PublishFollowMeRequest_FlushMessage); i {
case 0:
return &v.state
case 1:
@@ -4038,7 +3777,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[42].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*PublishMessageRequest_InitMessage); i {
+ switch v := v.(*PublishFollowMeRequest_CloseMessage); i {
case 0:
return &v.state
case 1:
@@ -4085,42 +3824,6 @@ func file_mq_proto_init() {
return nil
}
}
- file_mq_proto_msgTypes[46].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*FollowInMemoryMessagesRequest_InitMessage); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- file_mq_proto_msgTypes[47].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*FollowInMemoryMessagesRequest_AckMessage); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- file_mq_proto_msgTypes[48].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*FollowInMemoryMessagesResponse_CtrlMessage); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
}
file_mq_proto_msgTypes[8].OneofWrappers = []interface{}{
(*PublisherToPubBalancerRequest_Init)(nil),
@@ -4137,6 +3840,12 @@ func file_mq_proto_init() {
(*PublishMessageRequest_Init)(nil),
(*PublishMessageRequest_Data)(nil),
}
+ file_mq_proto_msgTypes[26].OneofWrappers = []interface{}{
+ (*PublishFollowMeRequest_Init)(nil),
+ (*PublishFollowMeRequest_Data)(nil),
+ (*PublishFollowMeRequest_Flush)(nil),
+ (*PublishFollowMeRequest_Close)(nil),
+ }
file_mq_proto_msgTypes[28].OneofWrappers = []interface{}{
(*SubscribeMessageRequest_Init)(nil),
(*SubscribeMessageRequest_Ack)(nil),
@@ -4145,21 +3854,13 @@ func file_mq_proto_init() {
(*SubscribeMessageResponse_Ctrl)(nil),
(*SubscribeMessageResponse_Data)(nil),
}
- file_mq_proto_msgTypes[30].OneofWrappers = []interface{}{
- (*FollowInMemoryMessagesRequest_Init)(nil),
- (*FollowInMemoryMessagesRequest_Ack)(nil),
- }
- file_mq_proto_msgTypes[31].OneofWrappers = []interface{}{
- (*FollowInMemoryMessagesResponse_Ctrl)(nil),
- (*FollowInMemoryMessagesResponse_Data)(nil),
- }
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_mq_proto_rawDesc,
NumEnums: 1,
- NumMessages: 49,
+ NumMessages: 46,
NumExtensions: 0,
NumServices: 1,
},
diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go
index 64518d605..0028f341e 100644
--- a/weed/pb/mq_pb/mq_grpc.pb.go
+++ b/weed/pb/mq_pb/mq_grpc.pb.go
@@ -32,7 +32,6 @@ const (
SeaweedMessaging_PublishMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishMessage"
SeaweedMessaging_SubscribeMessage_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeMessage"
SeaweedMessaging_PublishFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishFollowMe"
- SeaweedMessaging_FollowInMemoryMessages_FullMethodName = "/messaging_pb.SeaweedMessaging/FollowInMemoryMessages"
)
// SeaweedMessagingClient is the client API for SeaweedMessaging service.
@@ -58,8 +57,7 @@ type SeaweedMessagingClient interface {
PublishMessage(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishMessageClient, error)
SubscribeMessage(ctx context.Context, in *SubscribeMessageRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeMessageClient, error)
// The lead broker asks a follower broker to follow itself
- PublishFollowMe(ctx context.Context, in *PublishFollowMeRequest, opts ...grpc.CallOption) (*PublishFollowMeResponse, error)
- FollowInMemoryMessages(ctx context.Context, in *FollowInMemoryMessagesRequest, opts ...grpc.CallOption) (SeaweedMessaging_FollowInMemoryMessagesClient, error)
+ PublishFollowMe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishFollowMeClient, error)
}
type seaweedMessagingClient struct {
@@ -267,41 +265,31 @@ func (x *seaweedMessagingSubscribeMessageClient) Recv() (*SubscribeMessageRespon
return m, nil
}
-func (c *seaweedMessagingClient) PublishFollowMe(ctx context.Context, in *PublishFollowMeRequest, opts ...grpc.CallOption) (*PublishFollowMeResponse, error) {
- out := new(PublishFollowMeResponse)
- err := c.cc.Invoke(ctx, SeaweedMessaging_PublishFollowMe_FullMethodName, in, out, opts...)
+func (c *seaweedMessagingClient) PublishFollowMe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishFollowMeClient, error) {
+ stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[4], SeaweedMessaging_PublishFollowMe_FullMethodName, opts...)
if err != nil {
return nil, err
}
- return out, nil
-}
-
-func (c *seaweedMessagingClient) FollowInMemoryMessages(ctx context.Context, in *FollowInMemoryMessagesRequest, opts ...grpc.CallOption) (SeaweedMessaging_FollowInMemoryMessagesClient, error) {
- stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[4], SeaweedMessaging_FollowInMemoryMessages_FullMethodName, opts...)
- if err != nil {
- return nil, err
- }
- x := &seaweedMessagingFollowInMemoryMessagesClient{stream}
- if err := x.ClientStream.SendMsg(in); err != nil {
- return nil, err
- }
- if err := x.ClientStream.CloseSend(); err != nil {
- return nil, err
- }
+ x := &seaweedMessagingPublishFollowMeClient{stream}
return x, nil
}
-type SeaweedMessaging_FollowInMemoryMessagesClient interface {
- Recv() (*FollowInMemoryMessagesResponse, error)
+type SeaweedMessaging_PublishFollowMeClient interface {
+ Send(*PublishFollowMeRequest) error
+ Recv() (*PublishFollowMeResponse, error)
grpc.ClientStream
}
-type seaweedMessagingFollowInMemoryMessagesClient struct {
+type seaweedMessagingPublishFollowMeClient struct {
grpc.ClientStream
}
-func (x *seaweedMessagingFollowInMemoryMessagesClient) Recv() (*FollowInMemoryMessagesResponse, error) {
- m := new(FollowInMemoryMessagesResponse)
+func (x *seaweedMessagingPublishFollowMeClient) Send(m *PublishFollowMeRequest) error {
+ return x.ClientStream.SendMsg(m)
+}
+
+func (x *seaweedMessagingPublishFollowMeClient) Recv() (*PublishFollowMeResponse, error) {
+ m := new(PublishFollowMeResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
@@ -331,8 +319,7 @@ type SeaweedMessagingServer interface {
PublishMessage(SeaweedMessaging_PublishMessageServer) error
SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error
// The lead broker asks a follower broker to follow itself
- PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error)
- FollowInMemoryMessages(*FollowInMemoryMessagesRequest, SeaweedMessaging_FollowInMemoryMessagesServer) error
+ PublishFollowMe(SeaweedMessaging_PublishFollowMeServer) error
mustEmbedUnimplementedSeaweedMessagingServer()
}
@@ -376,11 +363,8 @@ func (UnimplementedSeaweedMessagingServer) PublishMessage(SeaweedMessaging_Publi
func (UnimplementedSeaweedMessagingServer) SubscribeMessage(*SubscribeMessageRequest, SeaweedMessaging_SubscribeMessageServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeMessage not implemented")
}
-func (UnimplementedSeaweedMessagingServer) PublishFollowMe(context.Context, *PublishFollowMeRequest) (*PublishFollowMeResponse, error) {
- return nil, status.Errorf(codes.Unimplemented, "method PublishFollowMe not implemented")
-}
-func (UnimplementedSeaweedMessagingServer) FollowInMemoryMessages(*FollowInMemoryMessagesRequest, SeaweedMessaging_FollowInMemoryMessagesServer) error {
- return status.Errorf(codes.Unimplemented, "method FollowInMemoryMessages not implemented")
+func (UnimplementedSeaweedMessagingServer) PublishFollowMe(SeaweedMessaging_PublishFollowMeServer) error {
+ return status.Errorf(codes.Unimplemented, "method PublishFollowMe not implemented")
}
func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {}
@@ -638,45 +622,32 @@ func (x *seaweedMessagingSubscribeMessageServer) Send(m *SubscribeMessageRespons
return x.ServerStream.SendMsg(m)
}
-func _SeaweedMessaging_PublishFollowMe_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
- in := new(PublishFollowMeRequest)
- if err := dec(in); err != nil {
- return nil, err
- }
- if interceptor == nil {
- return srv.(SeaweedMessagingServer).PublishFollowMe(ctx, in)
- }
- info := &grpc.UnaryServerInfo{
- Server: srv,
- FullMethod: SeaweedMessaging_PublishFollowMe_FullMethodName,
- }
- handler := func(ctx context.Context, req interface{}) (interface{}, error) {
- return srv.(SeaweedMessagingServer).PublishFollowMe(ctx, req.(*PublishFollowMeRequest))
- }
- return interceptor(ctx, in, info, handler)
-}
-
-func _SeaweedMessaging_FollowInMemoryMessages_Handler(srv interface{}, stream grpc.ServerStream) error {
- m := new(FollowInMemoryMessagesRequest)
- if err := stream.RecvMsg(m); err != nil {
- return err
- }
- return srv.(SeaweedMessagingServer).FollowInMemoryMessages(m, &seaweedMessagingFollowInMemoryMessagesServer{stream})
+func _SeaweedMessaging_PublishFollowMe_Handler(srv interface{}, stream grpc.ServerStream) error {
+ return srv.(SeaweedMessagingServer).PublishFollowMe(&seaweedMessagingPublishFollowMeServer{stream})
}
-type SeaweedMessaging_FollowInMemoryMessagesServer interface {
- Send(*FollowInMemoryMessagesResponse) error
+type SeaweedMessaging_PublishFollowMeServer interface {
+ Send(*PublishFollowMeResponse) error
+ Recv() (*PublishFollowMeRequest, error)
grpc.ServerStream
}
-type seaweedMessagingFollowInMemoryMessagesServer struct {
+type seaweedMessagingPublishFollowMeServer struct {
grpc.ServerStream
}
-func (x *seaweedMessagingFollowInMemoryMessagesServer) Send(m *FollowInMemoryMessagesResponse) error {
+func (x *seaweedMessagingPublishFollowMeServer) Send(m *PublishFollowMeResponse) error {
return x.ServerStream.SendMsg(m)
}
+func (x *seaweedMessagingPublishFollowMeServer) Recv() (*PublishFollowMeRequest, error) {
+ m := new(PublishFollowMeRequest)
+ if err := x.ServerStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
// SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -716,10 +687,6 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
MethodName: "CloseSubscribers",
Handler: _SeaweedMessaging_CloseSubscribers_Handler,
},
- {
- MethodName: "PublishFollowMe",
- Handler: _SeaweedMessaging_PublishFollowMe_Handler,
- },
},
Streams: []grpc.StreamDesc{
{
@@ -746,9 +713,10 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
ServerStreams: true,
},
{
- StreamName: "FollowInMemoryMessages",
- Handler: _SeaweedMessaging_FollowInMemoryMessages_Handler,
+ StreamName: "PublishFollowMe",
+ Handler: _SeaweedMessaging_PublishFollowMe_Handler,
ServerStreams: true,
+ ClientStreams: true,
},
},
Metadata: "mq.proto",