aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod2
-rw-r--r--go.sum6
-rw-r--r--weed/mq/broker/brokder_grpc_admin.go107
-rw-r--r--weed/mq/broker/brokder_grpc_pub.go122
-rw-r--r--weed/mq/broker/broker_segment_serde.go14
-rw-r--r--weed/mq/broker/broker_server.go21
-rw-r--r--weed/mq/segment/message_serde_test.go5
-rw-r--r--weed/mq/topic/local_manager.go54
-rw-r--r--weed/mq/topic/local_partition.go40
-rw-r--r--weed/mq/topic/local_topic.go29
-rw-r--r--weed/mq/topic/partition.go32
-rw-r--r--weed/mq/topic/topic.go (renamed from weed/mq/topic.go)21
-rw-r--r--weed/mq/topic_allocation/allocation.go81
-rw-r--r--weed/pb/mq.proto74
-rw-r--r--weed/pb/mq_pb/mq.pb.go1383
-rw-r--r--weed/pb/mq_pb/mq_grpc.pb.go148
16 files changed, 1919 insertions, 220 deletions
diff --git a/go.mod b/go.mod
index 225e3f6c7..52a61a9c7 100644
--- a/go.mod
+++ b/go.mod
@@ -249,6 +249,8 @@ require (
github.com/ncw/swift/v2 v2.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/oracle/oci-go-sdk/v65 v65.34.0 // indirect
+ github.com/orcaman/concurrent-map v1.0.0 // indirect
+ github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pengsrc/go-shared v0.2.1-0.20190131101655-1999055a4a14 // indirect
diff --git a/go.sum b/go.sum
index 5427d286c..84925174e 100644
--- a/go.sum
+++ b/go.sum
@@ -648,6 +648,12 @@ github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/oracle/oci-go-sdk/v65 v65.34.0 h1:uG1KucBxAbn8cYRgQHxtQKogtl85nOX8LhimZCPfMqw=
github.com/oracle/oci-go-sdk/v65 v65.34.0/go.mod h1:MXMLMzHnnd9wlpgadPkdlkZ9YrwQmCOmbX5kjVEJodw=
+github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY=
+github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
+github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
+github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
+github.com/ovh/go-ovh v1.4.1/go.mod h1:6bL6pPyUT7tBfI0pqOegJgRjgjuO+mOo+MyXd1EEC0M=
+github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go
index 7e669ef9b..5aac780fb 100644
--- a/weed/mq/broker/brokder_grpc_admin.go
+++ b/weed/mq/broker/brokder_grpc_admin.go
@@ -4,7 +4,7 @@ import (
"context"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -12,6 +12,10 @@ import (
"sync"
)
+const (
+ MaxPartitionCount = 1024
+)
+
func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
ret := &mq_pb.FindBrokerLeaderResponse{}
err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
@@ -31,21 +35,9 @@ func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *m
return ret, err
}
-func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
- ret := &mq_pb.CheckSegmentStatusResponse{}
- // TODO add in memory active segment
- return ret, nil
-}
-
-func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
- ret := &mq_pb.CheckBrokerLoadResponse{}
- // TODO read broker's load
- return ret, nil
-}
-
func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
ret := &mq_pb.AssignSegmentBrokersResponse{}
- segment := mq.FromPbSegment(request.Segment)
+ segment := topic.FromPbSegment(request.Segment)
// check existing segment locations on filer
existingBrokers, err := broker.checkSegmentOnFiler(segment)
@@ -84,7 +76,92 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
return ret, nil
}
-func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
+func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
+ ret := &mq_pb.CheckSegmentStatusResponse{}
+ // TODO add in memory active segment
+ return ret, nil
+}
+
+func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
+ ret := &mq_pb.CheckBrokerLoadResponse{}
+ // TODO read broker's load
+ return ret, nil
+}
+
+// FindTopicBrokers returns the brokers that are serving the topic
+//
+// 1. lock the topic
+//
+// 2. find the topic partitions on the filer
+// 2.1 if the topic is not found, return error
+// 2.2 if the request is_for_publish, create the topic
+// 2.2.1 if the request is_for_subscribe, return error not found
+// 2.2.2 if the request is_for_publish, create the topic
+// 2.2 if the topic is found, return the brokers
+//
+// 3. unlock the topic
+func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) {
+ ret := &mq_pb.FindTopicBrokersResponse{}
+ // lock the topic
+
+ // find the topic partitions on the filer
+ // if the topic is not found
+ // if the request is_for_publish
+ // create the topic
+ // if the request is_for_subscribe
+ // return error not found
+ return ret, nil
+}
+
+// CheckTopicPartitionsStatus check the topic partitions on the broker
+func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
+ ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
+ return ret, nil
+}
+
+// createOrUpdateTopicPartitions creates the topic partitions on the broker
+// 1. check
+func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) {
+ // create or update each partition
+ if prevAssignment == nil {
+ broker.createOrUpdateTopicPartition(topic, nil)
+ } else {
+ for _, partitionAssignment := range prevAssignment.BrokerPartitions {
+ broker.createOrUpdateTopicPartition(topic, partitionAssignment)
+ }
+ }
+ return nil
+}
+
+func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) {
+ shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
+ if !shouldCreate {
+
+ }
+ return
+}
+func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) {
+ if oldAssignment == nil {
+ return true
+ }
+ for _, b := range oldAssignment.FollowerBrokers {
+ pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
+ Namespace: string(topic.Namespace),
+ Topic: topic.Name,
+ BrokerPartitionsAssignment: oldAssignment,
+ ShouldCancelIfNotMatch: true,
+ })
+ if err != nil {
+ shouldCreate = true
+ }
+ return nil
+ })
+ }
+ return
+}
+
+func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) {
var wg sync.WaitGroup
for _, candidate := range brokers {
diff --git a/weed/mq/broker/brokder_grpc_pub.go b/weed/mq/broker/brokder_grpc_pub.go
index a26be5171..58ab6e5d2 100644
--- a/weed/mq/broker/brokder_grpc_pub.go
+++ b/weed/mq/broker/brokder_grpc_pub.go
@@ -1,16 +1,136 @@
package broker
import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
+// For a new or re-configured topic, or one of the broker went offline,
+// the pub clients ask one broker what are the brokers for all the topic partitions.
+// The broker will lock the topic on write.
+// 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
+// 2. if the topic is found, return the brokers for the topic partitions
+// For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions.
+// The broker will lock the topic on read.
+// 1. if the topic is not found, return error
+// 2. if the topic is found, return the brokers for the topic partitions
+//
+// If the topic needs to be re-balanced, the admin client will lock the topic,
+// 1. collect throughput information for all the brokers
+// 2. adjust the topic partitions to the brokers
+// 3. notify the brokers to add/remove partitions to host
+// 3.1 When locking the topic, the partitions and brokers should be remembered in the lock.
+// 4. the brokers will stop process incoming messages if not the right partition
+// 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition3
+// 4.2 the sub clients will need to change the brokers to read from
+//
+// The following is from each individual component's perspective:
+// For a pub client
+// For current topic/partition, ask one broker for the brokers for the topic partitions
+// 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved.
+// For a sub client
+// For current topic/partition, ask one broker for the brokers for the topic partitions
+// 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved.
+// For a broker
+// Upon a pub client lookup:
+// 1. lock the topic
+// 2. if already has topic partition assignment, check all brokers are healthy
+// 3. if not, create topic partition assignment
+// 2. return the brokers for the topic partitions
+// 3. unlock the topic
+// Upon a sub client lookup:
+// 1. lock the topic
+// 2. if already has topic partition assignment, check all brokers are healthy
+// 3. if not, return error
+// 2. return the brokers for the topic partitions
+// 3. unlock the topic
+// For an admin tool
+// 0. collect stats from all the brokers, and find the topic worth moving
+// 1. lock the topic
+// 2. collect throughput information for all the brokers
+// 3. adjust the topic partitions to the brokers
+// 4. notify the brokers to add/remove partitions to host
+// 5. the brokers will stop process incoming messages if not the right partition
+// 6. unlock the topic
+
/*
-The messages is buffered in memory, and saved to filer under
+The messages are buffered in memory, and saved to filer under
/topics/<topic>/<date>/<hour>/<segment>/*.msg
/topics/<topic>/<date>/<hour>/segment
/topics/<topic>/info/segment_<id>.meta
+
+
+
*/
func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
+ // 1. write to the volume server
+ // 2. find the topic metadata owning filer
+ // 3. write to the filer
+
+ var localTopicPartition *topic.LocalPartition
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ // Process the received message
+ sequence := req.GetSequence()
+ response := &mq_pb.PublishResponse{
+ AckSequence: sequence,
+ }
+ if dataMessage := req.GetData(); dataMessage != nil {
+ if localTopicPartition == nil {
+ response.Error = "topic partition not initialized"
+ glog.Errorf("topic partition not found")
+ } else {
+ localTopicPartition.Publish(dataMessage)
+ }
+ } else if initMessage := req.GetInit(); initMessage != nil {
+ localTopicPartition = broker.localTopicManager.GetTopicPartition(
+ topic.NewTopic(topic.Namespace(initMessage.Segment.Namespace), initMessage.Segment.Topic),
+ topic.FromPbPartition(initMessage.Segment.Partition),
+ )
+ if localTopicPartition == nil {
+ response.Error = fmt.Sprintf("topic partition %v not found", initMessage.Segment)
+ glog.Errorf("topic partition %v not found", initMessage.Segment)
+ }
+ }
+ if err := stream.Send(response); err != nil {
+ glog.Errorf("Error sending setup response: %v", err)
+ }
+ }
+
return nil
}
+
+// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
+func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
+ ret := &mq_pb.AssignTopicPartitionsResponse{}
+ self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
+
+ for _, partition := range request.TopicPartitionsAssignment.BrokerPartitions {
+ localPartiton := topic.FromPbBrokerPartitionsAssignment(self, partition)
+ broker.localTopicManager.AddTopicPartition(
+ topic.FromPbTopic(request.Topic),
+ localPartiton)
+ if request.IsLeader {
+ for _, follower := range localPartiton.FollowerBrokers {
+ err := pb.WithBrokerClient(false, follower, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.AssignTopicPartitions(context.Background(), request)
+ return err
+ })
+ if err != nil {
+ return ret, err
+ }
+ }
+ }
+ }
+
+ return ret, nil
+}
diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go
index e36867da0..bb9aecc0b 100644
--- a/weed/mq/broker/broker_segment_serde.go
+++ b/weed/mq/broker/broker_segment_serde.go
@@ -4,7 +4,7 @@ import (
"bytes"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/mq"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -12,8 +12,8 @@ import (
"time"
)
-func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) {
- info, found, err := broker.readSegmentOnFiler(segment)
+func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *topic.Segment) (brokers []pb.ServerAddress, err error) {
+ info, found, err := broker.readSegmentInfoOnFiler(segment)
if err != nil {
return
}
@@ -27,12 +27,12 @@ func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brok
return
}
-func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) {
+func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *topic.Segment, brokers []pb.ServerAddress) (err error) {
var nodes []string
for _, b := range brokers {
nodes = append(nodes, string(b))
}
- broker.saveSegmentToFiler(segment, &mq_pb.SegmentInfo{
+ broker.saveSegmentInfoToFiler(segment, &mq_pb.SegmentInfo{
Segment: segment.ToPbSegment(),
StartTsNs: time.Now().UnixNano(),
Brokers: nodes,
@@ -43,7 +43,7 @@ func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment,
return
}
-func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
+func (broker *MessageQueueBroker) readSegmentInfoOnFiler(segment *topic.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
dir, name := segment.DirAndName()
found, err = filer_pb.Exists(broker, dir, name, false)
@@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info
return
}
-func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *mq_pb.SegmentInfo) (err error) {
+func (broker *MessageQueueBroker) saveSegmentInfoToFiler(segment *topic.Segment, info *mq_pb.SegmentInfo) (err error) {
dir, name := segment.DirAndName()
var buf bytes.Buffer
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 7ec7fb431..4f5b3c28d 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,6 +1,7 @@
package broker
import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -27,20 +28,22 @@ type MessageQueueBrokerOption struct {
type MessageQueueBroker struct {
mq_pb.UnimplementedSeaweedMessagingServer
- option *MessageQueueBrokerOption
- grpcDialOption grpc.DialOption
- MasterClient *wdclient.MasterClient
- filers map[pb.ServerAddress]struct{}
- currentFiler pb.ServerAddress
+ option *MessageQueueBrokerOption
+ grpcDialOption grpc.DialOption
+ MasterClient *wdclient.MasterClient
+ filers map[pb.ServerAddress]struct{}
+ currentFiler pb.ServerAddress
+ localTopicManager *topic.LocalTopicManager
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
mqBroker = &MessageQueueBroker{
- option: option,
- grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
- filers: make(map[pb.ServerAddress]struct{}),
+ option: option,
+ grpcDialOption: grpcDialOption,
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
+ filers: make(map[pb.ServerAddress]struct{}),
+ localTopicManager: topic.NewLocalTopicManager(),
}
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go
index c65bffb84..a54ce5708 100644
--- a/weed/mq/segment/message_serde_test.go
+++ b/weed/mq/segment/message_serde_test.go
@@ -17,6 +17,7 @@ func TestMessageSerde(t *testing.T) {
bb := NewMessageBatchBuilder(b, 1, 2, 3, 4)
bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here"))
+ bb.AddMessage(5, 7, prop, []byte("the primary 2"), []byte("body is 2"))
bb.BuildMessageBatch()
@@ -33,9 +34,9 @@ func TestMessageSerde(t *testing.T) {
assert.Equal(t, int64(5), mb.SegmentSeqBase())
assert.Equal(t, int32(0), mb.SegmentSeqMaxDelta())
assert.Equal(t, int64(6), mb.TsMsBase())
- assert.Equal(t, int32(0), mb.TsMsMaxDelta())
+ assert.Equal(t, int32(1), mb.TsMsMaxDelta())
- assert.Equal(t, 1, mb.MessagesLength())
+ assert.Equal(t, 2, mb.MessagesLength())
m := &message_fbs.Message{}
mb.Messages(m, 0)
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
new file mode 100644
index 000000000..168e3d561
--- /dev/null
+++ b/weed/mq/topic/local_manager.go
@@ -0,0 +1,54 @@
+package topic
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+)
+
+// LocalTopicManager manages topics on local broker
+type LocalTopicManager struct {
+ topics cmap.ConcurrentMap[string, *LocalTopic]
+}
+
+// NewLocalTopicManager creates a new LocalTopicManager
+func NewLocalTopicManager() *LocalTopicManager {
+ return &LocalTopicManager{
+ topics: cmap.New[*LocalTopic](),
+ }
+}
+
+// AddTopic adds a topic to the local topic manager
+func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
+ localTopic, ok := manager.topics.Get(topic.String())
+ if !ok {
+ localTopic = &LocalTopic{
+ Topic: topic,
+ Partitions: make([]*LocalPartition, 0),
+ }
+ }
+ if localTopic.findPartition(localPartition.Partition) != nil {
+ return
+ }
+ localTopic.Partitions = append(localTopic.Partitions, localPartition)
+}
+
+// GetTopic gets a topic from the local topic manager
+func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
+ localTopic, ok := manager.topics.Get(topic.String())
+ if !ok {
+ return nil
+ }
+ return localTopic.findPartition(partition)
+}
+
+// RemoveTopic removes a topic from the local topic manager
+func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
+ manager.topics.Remove(topic.String())
+}
+
+func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
+ localTopic, ok := manager.topics.Get(topic.String())
+ if !ok {
+ return false
+ }
+ return localTopic.removePartition(partition)
+}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
new file mode 100644
index 000000000..e26b7afd1
--- /dev/null
+++ b/weed/mq/topic/local_partition.go
@@ -0,0 +1,40 @@
+package topic
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "time"
+)
+
+type LocalPartition struct {
+ Partition
+ isLeader bool
+ FollowerBrokers []pb.ServerAddress
+ logBuffer *log_buffer.LogBuffer
+}
+
+func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) {
+ p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
+}
+
+func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition {
+ isLeaer := assignment.LeaderBroker == string(self)
+ localPartition := &LocalPartition{
+ Partition: Partition{
+ RangeStart: assignment.PartitionStart,
+ RangeStop: assignment.PartitionStop,
+ RingSize: PartitionCount,
+ },
+ isLeader: isLeaer,
+ }
+ if !isLeaer {
+ return localPartition
+ }
+ followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
+ for i, follower := range assignment.FollowerBrokers {
+ followers[i] = pb.ServerAddress(follower)
+ }
+ localPartition.FollowerBrokers = followers
+ return localPartition
+}
diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go
new file mode 100644
index 000000000..ef3c0e65e
--- /dev/null
+++ b/weed/mq/topic/local_topic.go
@@ -0,0 +1,29 @@
+package topic
+
+type LocalTopic struct {
+ Topic
+ Partitions []*LocalPartition
+}
+
+func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition {
+ for _, localPartition := range localTopic.Partitions {
+ if localPartition.Partition.Equals(partition) {
+ return localPartition
+ }
+ }
+ return nil
+}
+func (localTopic *LocalTopic) removePartition(partition Partition) bool {
+ foundPartitionIndex := -1
+ for i, localPartition := range localTopic.Partitions {
+ if localPartition.Partition.Equals(partition) {
+ foundPartitionIndex = i
+ break
+ }
+ }
+ if foundPartitionIndex == -1 {
+ return false
+ }
+ localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
+ return true
+}
diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go
new file mode 100644
index 000000000..285bdcb36
--- /dev/null
+++ b/weed/mq/topic/partition.go
@@ -0,0 +1,32 @@
+package topic
+
+import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+
+const PartitionCount = 4096
+
+type Partition struct {
+ RangeStart int32
+ RangeStop int32 // exclusive
+ RingSize int32
+}
+
+func (partition Partition) Equals(other Partition) bool {
+ if partition.RangeStart != other.RangeStart {
+ return false
+ }
+ if partition.RangeStop != other.RangeStop {
+ return false
+ }
+ if partition.RingSize != other.RingSize {
+ return false
+ }
+ return true
+}
+
+func FromPbPartition(partition *mq_pb.Partition) Partition {
+ return Partition{
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ RingSize: partition.RingSize,
+ }
+}
diff --git a/weed/mq/topic.go b/weed/mq/topic/topic.go
index 96544bac9..430999179 100644
--- a/weed/mq/topic.go
+++ b/weed/mq/topic/topic.go
@@ -1,4 +1,4 @@
-package mq
+package topic
import (
"fmt"
@@ -14,10 +14,21 @@ type Topic struct {
Name string
}
-type Partition struct {
- RangeStart int32
- RangeStop int32 // exclusive
- RingSize int32
+func NewTopic(namespace Namespace, name string) Topic {
+ return Topic{
+ Namespace: namespace,
+ Name: name,
+ }
+}
+func FromPbTopic(topic *mq_pb.Topic) Topic {
+ return Topic{
+ Namespace: Namespace(topic.Namespace),
+ Name: topic.Name,
+ }
+}
+
+func (tp Topic) String() string {
+ return fmt.Sprintf("%s.%s", tp.Namespace, tp.Name)
}
type Segment struct {
diff --git a/weed/mq/topic_allocation/allocation.go b/weed/mq/topic_allocation/allocation.go
new file mode 100644
index 000000000..a07ce4884
--- /dev/null
+++ b/weed/mq/topic_allocation/allocation.go
@@ -0,0 +1,81 @@
+package topic_allocation
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "modernc.org/mathutil"
+)
+
+const (
+ DefaultBrokerCount = 4
+)
+
+// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
+func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) {
+ // create a previous assignment if not exists
+ if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 {
+ prevAssignment = &mq_pb.TopicPartitionsAssignment{
+ PartitionCount: topic.PartitionCount,
+ }
+ partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount
+ for i := 0; i < DefaultBrokerCount; i++ {
+ prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
+ PartitionStart: int32(i * partitionCountForEachBroker),
+ PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount),
+ })
+ }
+ }
+
+ // create a new assignment
+ assignment = &mq_pb.TopicPartitionsAssignment{
+ PartitionCount: prevAssignment.PartitionCount,
+ }
+
+ // allocate partitions for each partition range
+ for _, brokerPartition := range prevAssignment.BrokerPartitions {
+ // allocate partitions for each partition range
+ leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers)
+ if err != nil {
+ return nil, err
+ }
+
+ followerBrokers := make([]string, len(followers))
+ for i, follower := range followers {
+ followerBrokers[i] = string(follower)
+ }
+
+ assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
+ PartitionStart: brokerPartition.PartitionStart,
+ PartitionStop: brokerPartition.PartitionStop,
+ LeaderBroker: string(leader),
+ FollowerBrokers: followerBrokers,
+ })
+ }
+
+ return
+}
+
+func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) {
+ // allocate leader
+ leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers)
+ if err != nil {
+ return
+ }
+
+ // allocate followers
+ followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) {
+ return
+}
+
+func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) {
+ return
+}
diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto
index bb53f635e..47440a46e 100644
--- a/weed/pb/mq.proto
+++ b/weed/pb/mq.proto
@@ -20,6 +20,17 @@ service SeaweedMessaging {
rpc CheckBrokerLoad (CheckBrokerLoadRequest) returns (CheckBrokerLoadResponse) {
}
+ // control plane for topic partitions
+ rpc FindTopicBrokers (FindTopicBrokersRequest) returns (FindTopicBrokersResponse) {
+ }
+ // a pub client will call this to get the topic partitions assignment
+ rpc RequestTopicPartitions (RequestTopicPartitionsRequest) returns (RequestTopicPartitionsResponse) {
+ }
+ rpc AssignTopicPartitions (AssignTopicPartitionsRequest) returns (AssignTopicPartitionsResponse) {
+ }
+ rpc CheckTopicPartitionsStatus (CheckTopicPartitionsStatusRequest) returns (CheckTopicPartitionsStatusResponse) {
+ }
+
// data plane
rpc Publish (stream PublishRequest) returns (stream PublishResponse) {
}
@@ -45,6 +56,10 @@ message FindBrokerLeaderResponse {
string broker = 1;
}
+message Topic {
+ string namespace = 1;
+ string name = 2;
+}
message Partition {
int32 ring_size = 1;
int32 range_start = 2;
@@ -83,15 +98,68 @@ message CheckBrokerLoadResponse {
}
+message FindTopicBrokersRequest {
+ Topic topic = 1;
+ bool is_for_publish = 2;
+}
+message FindTopicBrokersResponse {
+ Topic topic = 1;
+ TopicPartitionsAssignment topic_partitions_assignment = 2;
+}
+message BrokerPartitionsAssignment {
+ int32 partition_start = 1;
+ int32 partition_stop = 2;
+ string leader_broker = 3;
+ repeated string follower_brokers = 4;
+}
+message TopicPartitionsAssignment {
+ int32 partition_count = 1; // over-sharded partitions, usually 1024
+ repeated BrokerPartitionsAssignment broker_partitions = 2;
+}
+
+message RequestTopicPartitionsRequest {
+ Topic topic = 1;
+ int32 partition_count = 2;
+}
+message RequestTopicPartitionsResponse {
+ TopicPartitionsAssignment topic_partitions_assignment = 1;
+}
+
+message AssignTopicPartitionsRequest {
+ Topic topic = 1;
+ TopicPartitionsAssignment topic_partitions_assignment = 2;
+ bool is_leader = 3;
+}
+message AssignTopicPartitionsResponse {
+}
+
+message CheckTopicPartitionsStatusRequest {
+ string namespace = 1;
+ string topic = 2;
+ BrokerPartitionsAssignment broker_partitions_assignment = 3;
+ bool should_cancel_if_not_match = 4;
+}
+message CheckTopicPartitionsStatusResponse {
+ TopicPartitionsAssignment topic_partitions_assignment = 1;
+}
+
//////////////////////////////////////////////////
message PublishRequest {
message InitMessage {
Segment segment = 1;
}
- InitMessage init = 1;
- bytes message = 2;
+ message DataMessage {
+ bytes key = 1;
+ bytes value = 2;
+ }
+ oneof message {
+ InitMessage init = 1;
+ DataMessage data = 2;
+ }
+ int64 sequence = 3;
}
message PublishResponse {
int64 ack_sequence = 1;
- bool is_closed = 2;
+ string error = 2;
+ bool is_closed = 3;
}
diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go
index f1da81b9e..818acc111 100644
--- a/weed/pb/mq_pb/mq.pb.go
+++ b/weed/pb/mq_pb/mq.pb.go
@@ -202,6 +202,61 @@ func (x *FindBrokerLeaderResponse) GetBroker() string {
return ""
}
+type Topic struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"`
+ Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
+}
+
+func (x *Topic) Reset() {
+ *x = Topic{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Topic) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Topic) ProtoMessage() {}
+
+func (x *Topic) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Topic.ProtoReflect.Descriptor instead.
+func (*Topic) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *Topic) GetNamespace() string {
+ if x != nil {
+ return x.Namespace
+ }
+ return ""
+}
+
+func (x *Topic) GetName() string {
+ if x != nil {
+ return x.Name
+ }
+ return ""
+}
+
type Partition struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -215,7 +270,7 @@ type Partition struct {
func (x *Partition) Reset() {
*x = Partition{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[3]
+ mi := &file_mq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -228,7 +283,7 @@ func (x *Partition) String() string {
func (*Partition) ProtoMessage() {}
func (x *Partition) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[3]
+ mi := &file_mq_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -241,7 +296,7 @@ func (x *Partition) ProtoReflect() protoreflect.Message {
// Deprecated: Use Partition.ProtoReflect.Descriptor instead.
func (*Partition) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{3}
+ return file_mq_proto_rawDescGZIP(), []int{4}
}
func (x *Partition) GetRingSize() int32 {
@@ -279,7 +334,7 @@ type Segment struct {
func (x *Segment) Reset() {
*x = Segment{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[4]
+ mi := &file_mq_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -292,7 +347,7 @@ func (x *Segment) String() string {
func (*Segment) ProtoMessage() {}
func (x *Segment) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[4]
+ mi := &file_mq_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -305,7 +360,7 @@ func (x *Segment) ProtoReflect() protoreflect.Message {
// Deprecated: Use Segment.ProtoReflect.Descriptor instead.
func (*Segment) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{4}
+ return file_mq_proto_rawDescGZIP(), []int{5}
}
func (x *Segment) GetNamespace() string {
@@ -347,7 +402,7 @@ type AssignSegmentBrokersRequest struct {
func (x *AssignSegmentBrokersRequest) Reset() {
*x = AssignSegmentBrokersRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[5]
+ mi := &file_mq_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -360,7 +415,7 @@ func (x *AssignSegmentBrokersRequest) String() string {
func (*AssignSegmentBrokersRequest) ProtoMessage() {}
func (x *AssignSegmentBrokersRequest) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[5]
+ mi := &file_mq_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -373,7 +428,7 @@ func (x *AssignSegmentBrokersRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use AssignSegmentBrokersRequest.ProtoReflect.Descriptor instead.
func (*AssignSegmentBrokersRequest) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{5}
+ return file_mq_proto_rawDescGZIP(), []int{6}
}
func (x *AssignSegmentBrokersRequest) GetSegment() *Segment {
@@ -394,7 +449,7 @@ type AssignSegmentBrokersResponse struct {
func (x *AssignSegmentBrokersResponse) Reset() {
*x = AssignSegmentBrokersResponse{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[6]
+ mi := &file_mq_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -407,7 +462,7 @@ func (x *AssignSegmentBrokersResponse) String() string {
func (*AssignSegmentBrokersResponse) ProtoMessage() {}
func (x *AssignSegmentBrokersResponse) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[6]
+ mi := &file_mq_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -420,7 +475,7 @@ func (x *AssignSegmentBrokersResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use AssignSegmentBrokersResponse.ProtoReflect.Descriptor instead.
func (*AssignSegmentBrokersResponse) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{6}
+ return file_mq_proto_rawDescGZIP(), []int{7}
}
func (x *AssignSegmentBrokersResponse) GetBrokers() []string {
@@ -441,7 +496,7 @@ type CheckSegmentStatusRequest struct {
func (x *CheckSegmentStatusRequest) Reset() {
*x = CheckSegmentStatusRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[7]
+ mi := &file_mq_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -454,7 +509,7 @@ func (x *CheckSegmentStatusRequest) String() string {
func (*CheckSegmentStatusRequest) ProtoMessage() {}
func (x *CheckSegmentStatusRequest) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[7]
+ mi := &file_mq_proto_msgTypes[8]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -467,7 +522,7 @@ func (x *CheckSegmentStatusRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use CheckSegmentStatusRequest.ProtoReflect.Descriptor instead.
func (*CheckSegmentStatusRequest) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{7}
+ return file_mq_proto_rawDescGZIP(), []int{8}
}
func (x *CheckSegmentStatusRequest) GetSegment() *Segment {
@@ -488,7 +543,7 @@ type CheckSegmentStatusResponse struct {
func (x *CheckSegmentStatusResponse) Reset() {
*x = CheckSegmentStatusResponse{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[8]
+ mi := &file_mq_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -501,7 +556,7 @@ func (x *CheckSegmentStatusResponse) String() string {
func (*CheckSegmentStatusResponse) ProtoMessage() {}
func (x *CheckSegmentStatusResponse) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[8]
+ mi := &file_mq_proto_msgTypes[9]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -514,7 +569,7 @@ func (x *CheckSegmentStatusResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use CheckSegmentStatusResponse.ProtoReflect.Descriptor instead.
func (*CheckSegmentStatusResponse) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{8}
+ return file_mq_proto_rawDescGZIP(), []int{9}
}
func (x *CheckSegmentStatusResponse) GetIsActive() bool {
@@ -533,7 +588,7 @@ type CheckBrokerLoadRequest struct {
func (x *CheckBrokerLoadRequest) Reset() {
*x = CheckBrokerLoadRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[9]
+ mi := &file_mq_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -546,7 +601,7 @@ func (x *CheckBrokerLoadRequest) String() string {
func (*CheckBrokerLoadRequest) ProtoMessage() {}
func (x *CheckBrokerLoadRequest) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[9]
+ mi := &file_mq_proto_msgTypes[10]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -559,7 +614,7 @@ func (x *CheckBrokerLoadRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use CheckBrokerLoadRequest.ProtoReflect.Descriptor instead.
func (*CheckBrokerLoadRequest) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{9}
+ return file_mq_proto_rawDescGZIP(), []int{10}
}
type CheckBrokerLoadResponse struct {
@@ -574,7 +629,7 @@ type CheckBrokerLoadResponse struct {
func (x *CheckBrokerLoadResponse) Reset() {
*x = CheckBrokerLoadResponse{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[10]
+ mi := &file_mq_proto_msgTypes[11]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -587,7 +642,7 @@ func (x *CheckBrokerLoadResponse) String() string {
func (*CheckBrokerLoadResponse) ProtoMessage() {}
func (x *CheckBrokerLoadResponse) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[10]
+ mi := &file_mq_proto_msgTypes[11]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -600,7 +655,7 @@ func (x *CheckBrokerLoadResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use CheckBrokerLoadResponse.ProtoReflect.Descriptor instead.
func (*CheckBrokerLoadResponse) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{10}
+ return file_mq_proto_rawDescGZIP(), []int{11}
}
func (x *CheckBrokerLoadResponse) GetMessageCount() int64 {
@@ -617,20 +672,581 @@ func (x *CheckBrokerLoadResponse) GetBytesCount() int64 {
return 0
}
+type FindTopicBrokersRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
+ IsForPublish bool `protobuf:"varint,2,opt,name=is_for_publish,json=isForPublish,proto3" json:"is_for_publish,omitempty"`
+}
+
+func (x *FindTopicBrokersRequest) Reset() {
+ *x = FindTopicBrokersRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[12]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FindTopicBrokersRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FindTopicBrokersRequest) ProtoMessage() {}
+
+func (x *FindTopicBrokersRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[12]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FindTopicBrokersRequest.ProtoReflect.Descriptor instead.
+func (*FindTopicBrokersRequest) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{12}
+}
+
+func (x *FindTopicBrokersRequest) GetTopic() *Topic {
+ if x != nil {
+ return x.Topic
+ }
+ return nil
+}
+
+func (x *FindTopicBrokersRequest) GetIsForPublish() bool {
+ if x != nil {
+ return x.IsForPublish
+ }
+ return false
+}
+
+type FindTopicBrokersResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
+ TopicPartitionsAssignment *TopicPartitionsAssignment `protobuf:"bytes,2,opt,name=topic_partitions_assignment,json=topicPartitionsAssignment,proto3" json:"topic_partitions_assignment,omitempty"`
+}
+
+func (x *FindTopicBrokersResponse) Reset() {
+ *x = FindTopicBrokersResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[13]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *FindTopicBrokersResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FindTopicBrokersResponse) ProtoMessage() {}
+
+func (x *FindTopicBrokersResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[13]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FindTopicBrokersResponse.ProtoReflect.Descriptor instead.
+func (*FindTopicBrokersResponse) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{13}
+}
+
+func (x *FindTopicBrokersResponse) GetTopic() *Topic {
+ if x != nil {
+ return x.Topic
+ }
+ return nil
+}
+
+func (x *FindTopicBrokersResponse) GetTopicPartitionsAssignment() *TopicPartitionsAssignment {
+ if x != nil {
+ return x.TopicPartitionsAssignment
+ }
+ return nil
+}
+
+type BrokerPartitionsAssignment struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ PartitionStart int32 `protobuf:"varint,1,opt,name=partition_start,json=partitionStart,proto3" json:"partition_start,omitempty"`
+ PartitionStop int32 `protobuf:"varint,2,opt,name=partition_stop,json=partitionStop,proto3" json:"partition_stop,omitempty"`
+ LeaderBroker string `protobuf:"bytes,3,opt,name=leader_broker,json=leaderBroker,proto3" json:"leader_broker,omitempty"`
+ FollowerBrokers []string `protobuf:"bytes,4,rep,name=follower_brokers,json=followerBrokers,proto3" json:"follower_brokers,omitempty"`
+}
+
+func (x *BrokerPartitionsAssignment) Reset() {
+ *x = BrokerPartitionsAssignment{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[14]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *BrokerPartitionsAssignment) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*BrokerPartitionsAssignment) ProtoMessage() {}
+
+func (x *BrokerPartitionsAssignment) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[14]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use BrokerPartitionsAssignment.ProtoReflect.Descriptor instead.
+func (*BrokerPartitionsAssignment) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{14}
+}
+
+func (x *BrokerPartitionsAssignment) GetPartitionStart() int32 {
+ if x != nil {
+ return x.PartitionStart
+ }
+ return 0
+}
+
+func (x *BrokerPartitionsAssignment) GetPartitionStop() int32 {
+ if x != nil {
+ return x.PartitionStop
+ }
+ return 0
+}
+
+func (x *BrokerPartitionsAssignment) GetLeaderBroker() string {
+ if x != nil {
+ return x.LeaderBroker
+ }
+ return ""
+}
+
+func (x *BrokerPartitionsAssignment) GetFollowerBrokers() []string {
+ if x != nil {
+ return x.FollowerBrokers
+ }
+ return nil
+}
+
+type TopicPartitionsAssignment struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ PartitionCount int32 `protobuf:"varint,1,opt,name=partition_count,json=partitionCount,proto3" json:"partition_count,omitempty"` // over-sharded partitions, usually 1024
+ BrokerPartitions []*BrokerPartitionsAssignment `protobuf:"bytes,2,rep,name=broker_partitions,json=brokerPartitions,proto3" json:"broker_partitions,omitempty"`
+}
+
+func (x *TopicPartitionsAssignment) Reset() {
+ *x = TopicPartitionsAssignment{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[15]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *TopicPartitionsAssignment) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*TopicPartitionsAssignment) ProtoMessage() {}
+
+func (x *TopicPartitionsAssignment) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[15]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use TopicPartitionsAssignment.ProtoReflect.Descriptor instead.
+func (*TopicPartitionsAssignment) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{15}
+}
+
+func (x *TopicPartitionsAssignment) GetPartitionCount() int32 {
+ if x != nil {
+ return x.PartitionCount
+ }
+ return 0
+}
+
+func (x *TopicPartitionsAssignment) GetBrokerPartitions() []*BrokerPartitionsAssignment {
+ if x != nil {
+ return x.BrokerPartitions
+ }
+ return nil
+}
+
+type RequestTopicPartitionsRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
+ PartitionCount int32 `protobuf:"varint,2,opt,name=partition_count,json=partitionCount,proto3" json:"partition_count,omitempty"`
+}
+
+func (x *RequestTopicPartitionsRequest) Reset() {
+ *x = RequestTopicPartitionsRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[16]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *RequestTopicPartitionsRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*RequestTopicPartitionsRequest) ProtoMessage() {}
+
+func (x *RequestTopicPartitionsRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[16]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use RequestTopicPartitionsRequest.ProtoReflect.Descriptor instead.
+func (*RequestTopicPartitionsRequest) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{16}
+}
+
+func (x *RequestTopicPartitionsRequest) GetTopic() *Topic {
+ if x != nil {
+ return x.Topic
+ }
+ return nil
+}
+
+func (x *RequestTopicPartitionsRequest) GetPartitionCount() int32 {
+ if x != nil {
+ return x.PartitionCount
+ }
+ return 0
+}
+
+type RequestTopicPartitionsResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ TopicPartitionsAssignment *TopicPartitionsAssignment `protobuf:"bytes,1,opt,name=topic_partitions_assignment,json=topicPartitionsAssignment,proto3" json:"topic_partitions_assignment,omitempty"`
+}
+
+func (x *RequestTopicPartitionsResponse) Reset() {
+ *x = RequestTopicPartitionsResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[17]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *RequestTopicPartitionsResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*RequestTopicPartitionsResponse) ProtoMessage() {}
+
+func (x *RequestTopicPartitionsResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[17]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use RequestTopicPartitionsResponse.ProtoReflect.Descriptor instead.
+func (*RequestTopicPartitionsResponse) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{17}
+}
+
+func (x *RequestTopicPartitionsResponse) GetTopicPartitionsAssignment() *TopicPartitionsAssignment {
+ if x != nil {
+ return x.TopicPartitionsAssignment
+ }
+ return nil
+}
+
+type AssignTopicPartitionsRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
+ TopicPartitionsAssignment *TopicPartitionsAssignment `protobuf:"bytes,2,opt,name=topic_partitions_assignment,json=topicPartitionsAssignment,proto3" json:"topic_partitions_assignment,omitempty"`
+ IsLeader bool `protobuf:"varint,3,opt,name=is_leader,json=isLeader,proto3" json:"is_leader,omitempty"`
+}
+
+func (x *AssignTopicPartitionsRequest) Reset() {
+ *x = AssignTopicPartitionsRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[18]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *AssignTopicPartitionsRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*AssignTopicPartitionsRequest) ProtoMessage() {}
+
+func (x *AssignTopicPartitionsRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[18]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use AssignTopicPartitionsRequest.ProtoReflect.Descriptor instead.
+func (*AssignTopicPartitionsRequest) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{18}
+}
+
+func (x *AssignTopicPartitionsRequest) GetTopic() *Topic {
+ if x != nil {
+ return x.Topic
+ }
+ return nil
+}
+
+func (x *AssignTopicPartitionsRequest) GetTopicPartitionsAssignment() *TopicPartitionsAssignment {
+ if x != nil {
+ return x.TopicPartitionsAssignment
+ }
+ return nil
+}
+
+func (x *AssignTopicPartitionsRequest) GetIsLeader() bool {
+ if x != nil {
+ return x.IsLeader
+ }
+ return false
+}
+
+type AssignTopicPartitionsResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+}
+
+func (x *AssignTopicPartitionsResponse) Reset() {
+ *x = AssignTopicPartitionsResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[19]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *AssignTopicPartitionsResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*AssignTopicPartitionsResponse) ProtoMessage() {}
+
+func (x *AssignTopicPartitionsResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[19]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use AssignTopicPartitionsResponse.ProtoReflect.Descriptor instead.
+func (*AssignTopicPartitionsResponse) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{19}
+}
+
+type CheckTopicPartitionsStatusRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"`
+ Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
+ BrokerPartitionsAssignment *BrokerPartitionsAssignment `protobuf:"bytes,3,opt,name=broker_partitions_assignment,json=brokerPartitionsAssignment,proto3" json:"broker_partitions_assignment,omitempty"`
+ ShouldCancelIfNotMatch bool `protobuf:"varint,4,opt,name=should_cancel_if_not_match,json=shouldCancelIfNotMatch,proto3" json:"should_cancel_if_not_match,omitempty"`
+}
+
+func (x *CheckTopicPartitionsStatusRequest) Reset() {
+ *x = CheckTopicPartitionsStatusRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[20]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *CheckTopicPartitionsStatusRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*CheckTopicPartitionsStatusRequest) ProtoMessage() {}
+
+func (x *CheckTopicPartitionsStatusRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[20]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use CheckTopicPartitionsStatusRequest.ProtoReflect.Descriptor instead.
+func (*CheckTopicPartitionsStatusRequest) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{20}
+}
+
+func (x *CheckTopicPartitionsStatusRequest) GetNamespace() string {
+ if x != nil {
+ return x.Namespace
+ }
+ return ""
+}
+
+func (x *CheckTopicPartitionsStatusRequest) GetTopic() string {
+ if x != nil {
+ return x.Topic
+ }
+ return ""
+}
+
+func (x *CheckTopicPartitionsStatusRequest) GetBrokerPartitionsAssignment() *BrokerPartitionsAssignment {
+ if x != nil {
+ return x.BrokerPartitionsAssignment
+ }
+ return nil
+}
+
+func (x *CheckTopicPartitionsStatusRequest) GetShouldCancelIfNotMatch() bool {
+ if x != nil {
+ return x.ShouldCancelIfNotMatch
+ }
+ return false
+}
+
+type CheckTopicPartitionsStatusResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ TopicPartitionsAssignment *TopicPartitionsAssignment `protobuf:"bytes,1,opt,name=topic_partitions_assignment,json=topicPartitionsAssignment,proto3" json:"topic_partitions_assignment,omitempty"`
+}
+
+func (x *CheckTopicPartitionsStatusResponse) Reset() {
+ *x = CheckTopicPartitionsStatusResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[21]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *CheckTopicPartitionsStatusResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*CheckTopicPartitionsStatusResponse) ProtoMessage() {}
+
+func (x *CheckTopicPartitionsStatusResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[21]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use CheckTopicPartitionsStatusResponse.ProtoReflect.Descriptor instead.
+func (*CheckTopicPartitionsStatusResponse) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{21}
+}
+
+func (x *CheckTopicPartitionsStatusResponse) GetTopicPartitionsAssignment() *TopicPartitionsAssignment {
+ if x != nil {
+ return x.TopicPartitionsAssignment
+ }
+ return nil
+}
+
// ////////////////////////////////////////////////
type PublishRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Init *PublishRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3" json:"init,omitempty"`
- Message []byte `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
+ // Types that are assignable to Message:
+ //
+ // *PublishRequest_Init
+ // *PublishRequest_Data
+ Message isPublishRequest_Message `protobuf_oneof:"message"`
+ Sequence int64 `protobuf:"varint,3,opt,name=sequence,proto3" json:"sequence,omitempty"`
}
func (x *PublishRequest) Reset() {
*x = PublishRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[11]
+ mi := &file_mq_proto_msgTypes[22]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -643,7 +1259,7 @@ func (x *PublishRequest) String() string {
func (*PublishRequest) ProtoMessage() {}
func (x *PublishRequest) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[11]
+ mi := &file_mq_proto_msgTypes[22]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -656,36 +1272,67 @@ func (x *PublishRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead.
func (*PublishRequest) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{11}
+ return file_mq_proto_rawDescGZIP(), []int{22}
+}
+
+func (m *PublishRequest) GetMessage() isPublishRequest_Message {
+ if m != nil {
+ return m.Message
+ }
+ return nil
}
func (x *PublishRequest) GetInit() *PublishRequest_InitMessage {
- if x != nil {
+ if x, ok := x.GetMessage().(*PublishRequest_Init); ok {
return x.Init
}
return nil
}
-func (x *PublishRequest) GetMessage() []byte {
- if x != nil {
- return x.Message
+func (x *PublishRequest) GetData() *PublishRequest_DataMessage {
+ if x, ok := x.GetMessage().(*PublishRequest_Data); ok {
+ return x.Data
}
return nil
}
+func (x *PublishRequest) GetSequence() int64 {
+ if x != nil {
+ return x.Sequence
+ }
+ return 0
+}
+
+type isPublishRequest_Message interface {
+ isPublishRequest_Message()
+}
+
+type PublishRequest_Init struct {
+ Init *PublishRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3,oneof"`
+}
+
+type PublishRequest_Data struct {
+ Data *PublishRequest_DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"`
+}
+
+func (*PublishRequest_Init) isPublishRequest_Message() {}
+
+func (*PublishRequest_Data) isPublishRequest_Message() {}
+
type PublishResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- AckSequence int64 `protobuf:"varint,1,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"`
- IsClosed bool `protobuf:"varint,2,opt,name=is_closed,json=isClosed,proto3" json:"is_closed,omitempty"`
+ AckSequence int64 `protobuf:"varint,1,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"`
+ Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
+ IsClosed bool `protobuf:"varint,3,opt,name=is_closed,json=isClosed,proto3" json:"is_closed,omitempty"`
}
func (x *PublishResponse) Reset() {
*x = PublishResponse{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[12]
+ mi := &file_mq_proto_msgTypes[23]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -698,7 +1345,7 @@ func (x *PublishResponse) String() string {
func (*PublishResponse) ProtoMessage() {}
func (x *PublishResponse) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[12]
+ mi := &file_mq_proto_msgTypes[23]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -711,7 +1358,7 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead.
func (*PublishResponse) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{12}
+ return file_mq_proto_rawDescGZIP(), []int{23}
}
func (x *PublishResponse) GetAckSequence() int64 {
@@ -721,6 +1368,13 @@ func (x *PublishResponse) GetAckSequence() int64 {
return 0
}
+func (x *PublishResponse) GetError() string {
+ if x != nil {
+ return x.Error
+ }
+ return ""
+}
+
func (x *PublishResponse) GetIsClosed() bool {
if x != nil {
return x.IsClosed
@@ -739,7 +1393,7 @@ type PublishRequest_InitMessage struct {
func (x *PublishRequest_InitMessage) Reset() {
*x = PublishRequest_InitMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[13]
+ mi := &file_mq_proto_msgTypes[24]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -752,7 +1406,7 @@ func (x *PublishRequest_InitMessage) String() string {
func (*PublishRequest_InitMessage) ProtoMessage() {}
func (x *PublishRequest_InitMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[13]
+ mi := &file_mq_proto_msgTypes[24]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -765,7 +1419,7 @@ func (x *PublishRequest_InitMessage) ProtoReflect() protoreflect.Message {
// Deprecated: Use PublishRequest_InitMessage.ProtoReflect.Descriptor instead.
func (*PublishRequest_InitMessage) Descriptor() ([]byte, []int) {
- return file_mq_proto_rawDescGZIP(), []int{11, 0}
+ return file_mq_proto_rawDescGZIP(), []int{22, 0}
}
func (x *PublishRequest_InitMessage) GetSegment() *Segment {
@@ -775,6 +1429,61 @@ func (x *PublishRequest_InitMessage) GetSegment() *Segment {
return nil
}
+type PublishRequest_DataMessage struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+ Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *PublishRequest_DataMessage) Reset() {
+ *x = PublishRequest_DataMessage{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[25]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *PublishRequest_DataMessage) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*PublishRequest_DataMessage) ProtoMessage() {}
+
+func (x *PublishRequest_DataMessage) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[25]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use PublishRequest_DataMessage.ProtoReflect.Descriptor instead.
+func (*PublishRequest_DataMessage) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{22, 1}
+}
+
+func (x *PublishRequest_DataMessage) GetKey() []byte {
+ if x != nil {
+ return x.Key
+ }
+ return nil
+}
+
+func (x *PublishRequest_DataMessage) GetValue() []byte {
+ if x != nil {
+ return x.Value
+ }
+ return nil
+}
+
var File_mq_proto protoreflect.FileDescriptor
var file_mq_proto_rawDesc = []byte{
@@ -801,100 +1510,239 @@ var file_mq_proto_rawDesc = []byte{
0x32, 0x0a, 0x18, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61,
0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x62,
0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f,
- 0x6b, 0x65, 0x72, 0x22, 0x68, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
- 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x69, 0x6e, 0x67, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20,
- 0x01, 0x28, 0x05, 0x52, 0x08, 0x72, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x1f, 0x0a,
- 0x0b, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01,
- 0x28, 0x05, 0x52, 0x0a, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x1d,
- 0x0a, 0x0a, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x5f, 0x73, 0x74, 0x6f, 0x70, 0x18, 0x03, 0x20, 0x01,
- 0x28, 0x05, 0x52, 0x09, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, 0x6f, 0x70, 0x22, 0x84, 0x01,
- 0x0a, 0x07, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d,
- 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61,
- 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63,
- 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x0e, 0x0a,
- 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x35, 0x0a,
- 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
- 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69,
- 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x4e, 0x0a, 0x1b, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65,
- 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75,
- 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01,
- 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
- 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67,
- 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x38, 0x0a, 0x1c, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65,
- 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70,
- 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x18,
- 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x22, 0x4c,
- 0x0a, 0x19, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74,
- 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x73,
+ 0x6b, 0x65, 0x72, 0x22, 0x39, 0x0a, 0x05, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x1c, 0x0a, 0x09,
+ 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
+ 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x68,
+ 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1b, 0x0a, 0x09, 0x72,
+ 0x69, 0x6e, 0x67, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08,
+ 0x72, 0x69, 0x6e, 0x67, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x61, 0x6e, 0x67,
+ 0x65, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x72,
+ 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x61, 0x6e,
+ 0x67, 0x65, 0x5f, 0x73, 0x74, 0x6f, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x72,
+ 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, 0x6f, 0x70, 0x22, 0x84, 0x01, 0x0a, 0x07, 0x53, 0x65, 0x67,
+ 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63,
+ 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61,
+ 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03,
+ 0x20, 0x01, 0x28, 0x05, 0x52, 0x02, 0x69, 0x64, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74,
+ 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69,
+ 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22,
+ 0x4e, 0x0a, 0x1b, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74,
+ 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f,
+ 0x0a, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
+ 0x15, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53,
+ 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x22,
+ 0x38, 0x0a, 0x1c, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74,
+ 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
+ 0x18, 0x0a, 0x07, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09,
+ 0x52, 0x07, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x22, 0x4c, 0x0a, 0x19, 0x43, 0x68, 0x65,
+ 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e,
+ 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07,
+ 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x39, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b,
+ 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73,
+ 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, 0x5f, 0x61, 0x63, 0x74, 0x69,
+ 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x41, 0x63, 0x74, 0x69,
+ 0x76, 0x65, 0x22, 0x18, 0x0a, 0x16, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65,
+ 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x5f, 0x0a, 0x17,
+ 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52,
+ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61,
+ 0x67, 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c,
+ 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1f, 0x0a, 0x0b,
+ 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x03, 0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x6a, 0x0a,
+ 0x17, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72,
+ 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69,
+ 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f,
+ 0x70, 0x69, 0x63, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, 0x66, 0x6f, 0x72, 0x5f, 0x70, 0x75,
+ 0x62, 0x6c, 0x69, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x46,
+ 0x6f, 0x72, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x22, 0xae, 0x01, 0x0a, 0x18, 0x46, 0x69,
+ 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65,
+ 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
+ 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69,
+ 0x63, 0x12, 0x67, 0x0a, 0x1b, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69,
+ 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74,
+ 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
+ 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69,
+ 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52,
+ 0x19, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73,
+ 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xbc, 0x01, 0x0a, 0x1a, 0x42,
+ 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41,
+ 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72,
+ 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x05, 0x52, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61,
+ 0x72, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f,
+ 0x73, 0x74, 0x6f, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74,
+ 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x6f, 0x70, 0x12, 0x23, 0x0a, 0x0d, 0x6c, 0x65, 0x61,
+ 0x64, 0x65, 0x72, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
+ 0x52, 0x0c, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x12, 0x29,
+ 0x0a, 0x10, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x72, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65,
+ 0x72, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77,
+ 0x65, 0x72, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x22, 0x9b, 0x01, 0x0a, 0x19, 0x54, 0x6f,
+ 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73,
+ 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69,
+ 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05,
+ 0x52, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x75, 0x6e, 0x74,
+ 0x12, 0x55, 0x0a, 0x11, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69,
+ 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x6d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65,
+ 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67,
+ 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x10, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72,
+ 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x73, 0x0a, 0x1d, 0x52, 0x65, 0x71, 0x75, 0x65,
+ 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
+ 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69,
+ 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f,
+ 0x70, 0x69, 0x63, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
+ 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0e, 0x70, 0x61,
+ 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x89, 0x01, 0x0a,
+ 0x1e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72,
+ 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
+ 0x67, 0x0a, 0x1b, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
+ 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
+ 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
+ 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x19, 0x74,
+ 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73,
+ 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xcf, 0x01, 0x0a, 0x1c, 0x41, 0x73, 0x73,
+ 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f,
+ 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70,
+ 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
+ 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74,
+ 0x6f, 0x70, 0x69, 0x63, 0x12, 0x67, 0x0a, 0x1b, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x5f, 0x70, 0x61,
+ 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d,
+ 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61,
+ 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65,
+ 0x6e, 0x74, 0x52, 0x19, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
+ 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1b, 0x0a,
+ 0x09, 0x69, 0x73, 0x5f, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08,
+ 0x52, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, 0x1f, 0x0a, 0x1d, 0x41, 0x73,
+ 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
+ 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xff, 0x01, 0x0a, 0x21,
+ 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74,
+ 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
+ 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12,
+ 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
+ 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x6a, 0x0a, 0x1c, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f,
+ 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67,
+ 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x6d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65,
+ 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67,
+ 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x1a, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72,
+ 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e,
+ 0x74, 0x12, 0x3a, 0x0a, 0x1a, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x5f, 0x63, 0x61, 0x6e, 0x63,
+ 0x65, 0x6c, 0x5f, 0x69, 0x66, 0x5f, 0x6e, 0x6f, 0x74, 0x5f, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x18,
+ 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x16, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x43, 0x61, 0x6e,
+ 0x63, 0x65, 0x6c, 0x49, 0x66, 0x4e, 0x6f, 0x74, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x22, 0x8d, 0x01,
+ 0x0a, 0x22, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74,
+ 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70,
+ 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x67, 0x0a, 0x1b, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x5f, 0x70, 0x61,
+ 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d,
+ 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61,
+ 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65,
+ 0x6e, 0x74, 0x52, 0x19, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
+ 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xae, 0x02,
+ 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+ 0x12, 0x3e, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28,
+ 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75,
+ 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69,
+ 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74,
+ 0x12, 0x3e, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28,
+ 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75,
+ 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74,
+ 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61,
+ 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01,
+ 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x3e, 0x0a, 0x0b,
+ 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x73,
0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67, 0x6d,
- 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x39, 0x0a, 0x1a,
- 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74,
- 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73,
- 0x5f, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69,
- 0x73, 0x41, 0x63, 0x74, 0x69, 0x76, 0x65, 0x22, 0x18, 0x0a, 0x16, 0x43, 0x68, 0x65, 0x63, 0x6b,
- 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x22, 0x5f, 0x0a, 0x17, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72,
- 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x23, 0x0a, 0x0d,
- 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20,
- 0x01, 0x28, 0x03, 0x52, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x75, 0x6e,
- 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74,
- 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x75,
- 0x6e, 0x74, 0x22, 0xa8, 0x01, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65,
- 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3c, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20,
- 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
- 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x04, 0x69,
- 0x6e, 0x69, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02,
- 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x3e, 0x0a,
- 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x2f, 0x0a, 0x07,
- 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e,
- 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67,
- 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x51, 0x0a,
- 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
- 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b, 0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65,
- 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75, 0x65,
- 0x6e, 0x63, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, 0x5f, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x64,
- 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64,
- 0x32, 0x83, 0x04, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73,
- 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f,
- 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73,
- 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f,
- 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
- 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
+ 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x35, 0x0a, 0x0b,
+ 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b,
+ 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a,
+ 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61,
+ 0x6c, 0x75, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x67,
+ 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+ 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b, 0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63,
+ 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75,
+ 0x65, 0x6e, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20,
+ 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73,
+ 0x5f, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69,
+ 0x73, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x32, 0xd7, 0x07, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77,
+ 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10,
+ 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72,
+ 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72,
- 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73,
- 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65,
- 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
- 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42,
- 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e,
- 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73,
- 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72,
- 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43,
- 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75,
- 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
+ 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65,
+ 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
+ 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65,
+ 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53,
+ 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71,
+ 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
+ 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e,
+ 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
+ 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65,
+ 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
+ 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67,
+ 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
+ 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61,
- 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73,
- 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53,
- 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70,
- 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42,
- 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73,
- 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72,
- 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
- 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43,
- 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65,
- 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c,
- 0x69, 0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
- 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
- 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
- 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65,
- 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x75,
- 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
- 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73,
- 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62,
- 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a,
+ 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64,
+ 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
+ 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
+ 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65,
+ 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
+ 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b,
+ 0x65, 0x72, 0x73, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
+ 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b,
+ 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73,
+ 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f,
+ 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+ 0x73, 0x65, 0x22, 0x00, 0x12, 0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54,
+ 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2b,
+ 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65,
+ 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74,
+ 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65,
+ 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e,
+ 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41,
+ 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74,
+ 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
+ 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50,
+ 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+ 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
+ 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69,
+ 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
+ 0x81, 0x01, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61,
+ 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2f,
+ 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68,
+ 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f,
+ 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
+ 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43,
+ 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
+ 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+ 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x1c,
+ 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75,
+ 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d,
+ 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c,
+ 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30,
+ 0x01, 0x42, 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d,
+ 0x71, 0x42, 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72,
+ 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
+ 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65,
+ 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70,
+ 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -909,45 +1757,76 @@ func file_mq_proto_rawDescGZIP() []byte {
return file_mq_proto_rawDescData
}
-var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 14)
+var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 26)
var file_mq_proto_goTypes = []interface{}{
- (*SegmentInfo)(nil), // 0: messaging_pb.SegmentInfo
- (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest
- (*FindBrokerLeaderResponse)(nil), // 2: messaging_pb.FindBrokerLeaderResponse
- (*Partition)(nil), // 3: messaging_pb.Partition
- (*Segment)(nil), // 4: messaging_pb.Segment
- (*AssignSegmentBrokersRequest)(nil), // 5: messaging_pb.AssignSegmentBrokersRequest
- (*AssignSegmentBrokersResponse)(nil), // 6: messaging_pb.AssignSegmentBrokersResponse
- (*CheckSegmentStatusRequest)(nil), // 7: messaging_pb.CheckSegmentStatusRequest
- (*CheckSegmentStatusResponse)(nil), // 8: messaging_pb.CheckSegmentStatusResponse
- (*CheckBrokerLoadRequest)(nil), // 9: messaging_pb.CheckBrokerLoadRequest
- (*CheckBrokerLoadResponse)(nil), // 10: messaging_pb.CheckBrokerLoadResponse
- (*PublishRequest)(nil), // 11: messaging_pb.PublishRequest
- (*PublishResponse)(nil), // 12: messaging_pb.PublishResponse
- (*PublishRequest_InitMessage)(nil), // 13: messaging_pb.PublishRequest.InitMessage
+ (*SegmentInfo)(nil), // 0: messaging_pb.SegmentInfo
+ (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest
+ (*FindBrokerLeaderResponse)(nil), // 2: messaging_pb.FindBrokerLeaderResponse
+ (*Topic)(nil), // 3: messaging_pb.Topic
+ (*Partition)(nil), // 4: messaging_pb.Partition
+ (*Segment)(nil), // 5: messaging_pb.Segment
+ (*AssignSegmentBrokersRequest)(nil), // 6: messaging_pb.AssignSegmentBrokersRequest
+ (*AssignSegmentBrokersResponse)(nil), // 7: messaging_pb.AssignSegmentBrokersResponse
+ (*CheckSegmentStatusRequest)(nil), // 8: messaging_pb.CheckSegmentStatusRequest
+ (*CheckSegmentStatusResponse)(nil), // 9: messaging_pb.CheckSegmentStatusResponse
+ (*CheckBrokerLoadRequest)(nil), // 10: messaging_pb.CheckBrokerLoadRequest
+ (*CheckBrokerLoadResponse)(nil), // 11: messaging_pb.CheckBrokerLoadResponse
+ (*FindTopicBrokersRequest)(nil), // 12: messaging_pb.FindTopicBrokersRequest
+ (*FindTopicBrokersResponse)(nil), // 13: messaging_pb.FindTopicBrokersResponse
+ (*BrokerPartitionsAssignment)(nil), // 14: messaging_pb.BrokerPartitionsAssignment
+ (*TopicPartitionsAssignment)(nil), // 15: messaging_pb.TopicPartitionsAssignment
+ (*RequestTopicPartitionsRequest)(nil), // 16: messaging_pb.RequestTopicPartitionsRequest
+ (*RequestTopicPartitionsResponse)(nil), // 17: messaging_pb.RequestTopicPartitionsResponse
+ (*AssignTopicPartitionsRequest)(nil), // 18: messaging_pb.AssignTopicPartitionsRequest
+ (*AssignTopicPartitionsResponse)(nil), // 19: messaging_pb.AssignTopicPartitionsResponse
+ (*CheckTopicPartitionsStatusRequest)(nil), // 20: messaging_pb.CheckTopicPartitionsStatusRequest
+ (*CheckTopicPartitionsStatusResponse)(nil), // 21: messaging_pb.CheckTopicPartitionsStatusResponse
+ (*PublishRequest)(nil), // 22: messaging_pb.PublishRequest
+ (*PublishResponse)(nil), // 23: messaging_pb.PublishResponse
+ (*PublishRequest_InitMessage)(nil), // 24: messaging_pb.PublishRequest.InitMessage
+ (*PublishRequest_DataMessage)(nil), // 25: messaging_pb.PublishRequest.DataMessage
}
var file_mq_proto_depIdxs = []int32{
- 4, // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment
- 3, // 1: messaging_pb.Segment.partition:type_name -> messaging_pb.Partition
- 4, // 2: messaging_pb.AssignSegmentBrokersRequest.segment:type_name -> messaging_pb.Segment
- 4, // 3: messaging_pb.CheckSegmentStatusRequest.segment:type_name -> messaging_pb.Segment
- 13, // 4: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage
- 4, // 5: messaging_pb.PublishRequest.InitMessage.segment:type_name -> messaging_pb.Segment
- 1, // 6: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
- 5, // 7: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest
- 7, // 8: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest
- 9, // 9: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest
- 11, // 10: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest
- 2, // 11: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
- 6, // 12: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse
- 8, // 13: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse
- 10, // 14: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse
- 12, // 15: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse
- 11, // [11:16] is the sub-list for method output_type
- 6, // [6:11] is the sub-list for method input_type
- 6, // [6:6] is the sub-list for extension type_name
- 6, // [6:6] is the sub-list for extension extendee
- 0, // [0:6] is the sub-list for field type_name
+ 5, // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment
+ 4, // 1: messaging_pb.Segment.partition:type_name -> messaging_pb.Partition
+ 5, // 2: messaging_pb.AssignSegmentBrokersRequest.segment:type_name -> messaging_pb.Segment
+ 5, // 3: messaging_pb.CheckSegmentStatusRequest.segment:type_name -> messaging_pb.Segment
+ 3, // 4: messaging_pb.FindTopicBrokersRequest.topic:type_name -> messaging_pb.Topic
+ 3, // 5: messaging_pb.FindTopicBrokersResponse.topic:type_name -> messaging_pb.Topic
+ 15, // 6: messaging_pb.FindTopicBrokersResponse.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment
+ 14, // 7: messaging_pb.TopicPartitionsAssignment.broker_partitions:type_name -> messaging_pb.BrokerPartitionsAssignment
+ 3, // 8: messaging_pb.RequestTopicPartitionsRequest.topic:type_name -> messaging_pb.Topic
+ 15, // 9: messaging_pb.RequestTopicPartitionsResponse.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment
+ 3, // 10: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> messaging_pb.Topic
+ 15, // 11: messaging_pb.AssignTopicPartitionsRequest.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment
+ 14, // 12: messaging_pb.CheckTopicPartitionsStatusRequest.broker_partitions_assignment:type_name -> messaging_pb.BrokerPartitionsAssignment
+ 15, // 13: messaging_pb.CheckTopicPartitionsStatusResponse.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment
+ 24, // 14: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage
+ 25, // 15: messaging_pb.PublishRequest.data:type_name -> messaging_pb.PublishRequest.DataMessage
+ 5, // 16: messaging_pb.PublishRequest.InitMessage.segment:type_name -> messaging_pb.Segment
+ 1, // 17: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
+ 6, // 18: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest
+ 8, // 19: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest
+ 10, // 20: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest
+ 12, // 21: messaging_pb.SeaweedMessaging.FindTopicBrokers:input_type -> messaging_pb.FindTopicBrokersRequest
+ 16, // 22: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest
+ 18, // 23: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest
+ 20, // 24: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest
+ 22, // 25: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest
+ 2, // 26: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
+ 7, // 27: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse
+ 9, // 28: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse
+ 11, // 29: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse
+ 13, // 30: messaging_pb.SeaweedMessaging.FindTopicBrokers:output_type -> messaging_pb.FindTopicBrokersResponse
+ 17, // 31: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse
+ 19, // 32: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse
+ 21, // 33: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse
+ 23, // 34: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse
+ 26, // [26:35] is the sub-list for method output_type
+ 17, // [17:26] is the sub-list for method input_type
+ 17, // [17:17] is the sub-list for extension type_name
+ 17, // [17:17] is the sub-list for extension extendee
+ 0, // [0:17] is the sub-list for field type_name
}
func init() { file_mq_proto_init() }
@@ -993,7 +1872,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Partition); i {
+ switch v := v.(*Topic); i {
case 0:
return &v.state
case 1:
@@ -1005,7 +1884,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Segment); i {
+ switch v := v.(*Partition); i {
case 0:
return &v.state
case 1:
@@ -1017,7 +1896,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*AssignSegmentBrokersRequest); i {
+ switch v := v.(*Segment); i {
case 0:
return &v.state
case 1:
@@ -1029,7 +1908,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*AssignSegmentBrokersResponse); i {
+ switch v := v.(*AssignSegmentBrokersRequest); i {
case 0:
return &v.state
case 1:
@@ -1041,7 +1920,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*CheckSegmentStatusRequest); i {
+ switch v := v.(*AssignSegmentBrokersResponse); i {
case 0:
return &v.state
case 1:
@@ -1053,7 +1932,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*CheckSegmentStatusResponse); i {
+ switch v := v.(*CheckSegmentStatusRequest); i {
case 0:
return &v.state
case 1:
@@ -1065,7 +1944,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*CheckBrokerLoadRequest); i {
+ switch v := v.(*CheckSegmentStatusResponse); i {
case 0:
return &v.state
case 1:
@@ -1077,7 +1956,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*CheckBrokerLoadResponse); i {
+ switch v := v.(*CheckBrokerLoadRequest); i {
case 0:
return &v.state
case 1:
@@ -1089,7 +1968,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*PublishRequest); i {
+ switch v := v.(*CheckBrokerLoadResponse); i {
case 0:
return &v.state
case 1:
@@ -1101,7 +1980,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*PublishResponse); i {
+ switch v := v.(*FindTopicBrokersRequest); i {
case 0:
return &v.state
case 1:
@@ -1113,6 +1992,138 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*FindTopicBrokersResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*BrokerPartitionsAssignment); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*TopicPartitionsAssignment); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*RequestTopicPartitionsRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*RequestTopicPartitionsResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[18].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*AssignTopicPartitionsRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[19].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*AssignTopicPartitionsResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*CheckTopicPartitionsStatusRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[21].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*CheckTopicPartitionsStatusResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[22].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*PublishRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[23].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*PublishResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[24].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PublishRequest_InitMessage); i {
case 0:
return &v.state
@@ -1124,6 +2135,22 @@ func file_mq_proto_init() {
return nil
}
}
+ file_mq_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*PublishRequest_DataMessage); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ file_mq_proto_msgTypes[22].OneofWrappers = []interface{}{
+ (*PublishRequest_Init)(nil),
+ (*PublishRequest_Data)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -1131,7 +2158,7 @@ func file_mq_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_mq_proto_rawDesc,
NumEnums: 0,
- NumMessages: 14,
+ NumMessages: 26,
NumExtensions: 0,
NumServices: 1,
},
diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go
index bf52b5a1b..c60ecbe5b 100644
--- a/weed/pb/mq_pb/mq_grpc.pb.go
+++ b/weed/pb/mq_pb/mq_grpc.pb.go
@@ -27,6 +27,12 @@ type SeaweedMessagingClient interface {
AssignSegmentBrokers(ctx context.Context, in *AssignSegmentBrokersRequest, opts ...grpc.CallOption) (*AssignSegmentBrokersResponse, error)
CheckSegmentStatus(ctx context.Context, in *CheckSegmentStatusRequest, opts ...grpc.CallOption) (*CheckSegmentStatusResponse, error)
CheckBrokerLoad(ctx context.Context, in *CheckBrokerLoadRequest, opts ...grpc.CallOption) (*CheckBrokerLoadResponse, error)
+ // control plane for topic partitions
+ FindTopicBrokers(ctx context.Context, in *FindTopicBrokersRequest, opts ...grpc.CallOption) (*FindTopicBrokersResponse, error)
+ // a pub client will call this to get the topic partitions assignment
+ RequestTopicPartitions(ctx context.Context, in *RequestTopicPartitionsRequest, opts ...grpc.CallOption) (*RequestTopicPartitionsResponse, error)
+ AssignTopicPartitions(ctx context.Context, in *AssignTopicPartitionsRequest, opts ...grpc.CallOption) (*AssignTopicPartitionsResponse, error)
+ CheckTopicPartitionsStatus(ctx context.Context, in *CheckTopicPartitionsStatusRequest, opts ...grpc.CallOption) (*CheckTopicPartitionsStatusResponse, error)
// data plane
Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error)
}
@@ -75,6 +81,42 @@ func (c *seaweedMessagingClient) CheckBrokerLoad(ctx context.Context, in *CheckB
return out, nil
}
+func (c *seaweedMessagingClient) FindTopicBrokers(ctx context.Context, in *FindTopicBrokersRequest, opts ...grpc.CallOption) (*FindTopicBrokersResponse, error) {
+ out := new(FindTopicBrokersResponse)
+ err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/FindTopicBrokers", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+func (c *seaweedMessagingClient) RequestTopicPartitions(ctx context.Context, in *RequestTopicPartitionsRequest, opts ...grpc.CallOption) (*RequestTopicPartitionsResponse, error) {
+ out := new(RequestTopicPartitionsResponse)
+ err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/RequestTopicPartitions", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+func (c *seaweedMessagingClient) AssignTopicPartitions(ctx context.Context, in *AssignTopicPartitionsRequest, opts ...grpc.CallOption) (*AssignTopicPartitionsResponse, error) {
+ out := new(AssignTopicPartitionsResponse)
+ err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/AssignTopicPartitions", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+func (c *seaweedMessagingClient) CheckTopicPartitionsStatus(ctx context.Context, in *CheckTopicPartitionsStatusRequest, opts ...grpc.CallOption) (*CheckTopicPartitionsStatusResponse, error) {
+ out := new(CheckTopicPartitionsStatusResponse)
+ err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/CheckTopicPartitionsStatus", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
func (c *seaweedMessagingClient) Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error) {
stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[0], "/messaging_pb.SeaweedMessaging/Publish", opts...)
if err != nil {
@@ -115,6 +157,12 @@ type SeaweedMessagingServer interface {
AssignSegmentBrokers(context.Context, *AssignSegmentBrokersRequest) (*AssignSegmentBrokersResponse, error)
CheckSegmentStatus(context.Context, *CheckSegmentStatusRequest) (*CheckSegmentStatusResponse, error)
CheckBrokerLoad(context.Context, *CheckBrokerLoadRequest) (*CheckBrokerLoadResponse, error)
+ // control plane for topic partitions
+ FindTopicBrokers(context.Context, *FindTopicBrokersRequest) (*FindTopicBrokersResponse, error)
+ // a pub client will call this to get the topic partitions assignment
+ RequestTopicPartitions(context.Context, *RequestTopicPartitionsRequest) (*RequestTopicPartitionsResponse, error)
+ AssignTopicPartitions(context.Context, *AssignTopicPartitionsRequest) (*AssignTopicPartitionsResponse, error)
+ CheckTopicPartitionsStatus(context.Context, *CheckTopicPartitionsStatusRequest) (*CheckTopicPartitionsStatusResponse, error)
// data plane
Publish(SeaweedMessaging_PublishServer) error
mustEmbedUnimplementedSeaweedMessagingServer()
@@ -136,6 +184,18 @@ func (UnimplementedSeaweedMessagingServer) CheckSegmentStatus(context.Context, *
func (UnimplementedSeaweedMessagingServer) CheckBrokerLoad(context.Context, *CheckBrokerLoadRequest) (*CheckBrokerLoadResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method CheckBrokerLoad not implemented")
}
+func (UnimplementedSeaweedMessagingServer) FindTopicBrokers(context.Context, *FindTopicBrokersRequest) (*FindTopicBrokersResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method FindTopicBrokers not implemented")
+}
+func (UnimplementedSeaweedMessagingServer) RequestTopicPartitions(context.Context, *RequestTopicPartitionsRequest) (*RequestTopicPartitionsResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method RequestTopicPartitions not implemented")
+}
+func (UnimplementedSeaweedMessagingServer) AssignTopicPartitions(context.Context, *AssignTopicPartitionsRequest) (*AssignTopicPartitionsResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method AssignTopicPartitions not implemented")
+}
+func (UnimplementedSeaweedMessagingServer) CheckTopicPartitionsStatus(context.Context, *CheckTopicPartitionsStatusRequest) (*CheckTopicPartitionsStatusResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method CheckTopicPartitionsStatus not implemented")
+}
func (UnimplementedSeaweedMessagingServer) Publish(SeaweedMessaging_PublishServer) error {
return status.Errorf(codes.Unimplemented, "method Publish not implemented")
}
@@ -224,6 +284,78 @@ func _SeaweedMessaging_CheckBrokerLoad_Handler(srv interface{}, ctx context.Cont
return interceptor(ctx, in, info, handler)
}
+func _SeaweedMessaging_FindTopicBrokers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(FindTopicBrokersRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(SeaweedMessagingServer).FindTopicBrokers(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/messaging_pb.SeaweedMessaging/FindTopicBrokers",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(SeaweedMessagingServer).FindTopicBrokers(ctx, req.(*FindTopicBrokersRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+func _SeaweedMessaging_RequestTopicPartitions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(RequestTopicPartitionsRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(SeaweedMessagingServer).RequestTopicPartitions(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/messaging_pb.SeaweedMessaging/RequestTopicPartitions",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(SeaweedMessagingServer).RequestTopicPartitions(ctx, req.(*RequestTopicPartitionsRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+func _SeaweedMessaging_AssignTopicPartitions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(AssignTopicPartitionsRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(SeaweedMessagingServer).AssignTopicPartitions(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/messaging_pb.SeaweedMessaging/AssignTopicPartitions",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(SeaweedMessagingServer).AssignTopicPartitions(ctx, req.(*AssignTopicPartitionsRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+func _SeaweedMessaging_CheckTopicPartitionsStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(CheckTopicPartitionsStatusRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(SeaweedMessagingServer).CheckTopicPartitionsStatus(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/messaging_pb.SeaweedMessaging/CheckTopicPartitionsStatus",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(SeaweedMessagingServer).CheckTopicPartitionsStatus(ctx, req.(*CheckTopicPartitionsStatusRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
func _SeaweedMessaging_Publish_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(SeaweedMessagingServer).Publish(&seaweedMessagingPublishServer{stream})
}
@@ -273,6 +405,22 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
MethodName: "CheckBrokerLoad",
Handler: _SeaweedMessaging_CheckBrokerLoad_Handler,
},
+ {
+ MethodName: "FindTopicBrokers",
+ Handler: _SeaweedMessaging_FindTopicBrokers_Handler,
+ },
+ {
+ MethodName: "RequestTopicPartitions",
+ Handler: _SeaweedMessaging_RequestTopicPartitions_Handler,
+ },
+ {
+ MethodName: "AssignTopicPartitions",
+ Handler: _SeaweedMessaging_AssignTopicPartitions_Handler,
+ },
+ {
+ MethodName: "CheckTopicPartitionsStatus",
+ Handler: _SeaweedMessaging_CheckTopicPartitionsStatus_Handler,
+ },
},
Streams: []grpc.StreamDesc{
{