diff options
24 files changed, 1680 insertions, 529 deletions
diff --git a/.gitignore b/.gitignore index 25a58bc67..3e09f6e38 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,4 @@ other/java/hdfs/dependency-reduced-pom.xml # binary file weed/weed +weed/mq/client/cmd/weed_pub/weed_pub @@ -266,6 +266,7 @@ require ( github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/putdotio/go-putio/putio v0.0.0-20200123120452-16d982cac2b8 // indirect github.com/rclone/ftp v0.0.0-20230327202000-dadc1f64e87d // indirect + github.com/rdleal/intervalst v0.0.0-20221028215511-a098aa0d2cb8 // indirect github.com/rfjakob/eme v1.1.2 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/shirou/gopsutil/v3 v3.23.5 // indirect @@ -752,6 +752,8 @@ github.com/rclone/rclone v1.63.1 h1:iITCUNBfAXnguHjRPFq+w/gGIW0L0las78h4H5CH2Ms= github.com/rclone/rclone v1.63.1/go.mod h1:eUQaKsf1wJfHKB0RDoM8RaPAeRB2eI/Qw+Vc9Ho5FGM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rdleal/intervalst v0.0.0-20221028215511-a098aa0d2cb8 h1:5jSBlCYQYquRF8Zch4QrimJcX7/H1qbWQEAzYNxMubc= +github.com/rdleal/intervalst v0.0.0-20221028215511-a098aa0d2cb8/go.mod h1:xO89Z6BC+LQDH+IPQQw/OESt5UADgFD41tYMUINGpxQ= github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE= github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps= github.com/rekby/fixenv v0.3.2/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c= diff --git a/weed/Makefile b/weed/Makefile index 356dc4b3e..c84678f97 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -37,6 +37,10 @@ debug_s3: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 s3 +debug_mq: + go build -gcflags="all=-N -l" + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 mq.broker + debug_filer_copy: go build -gcflags="all=-N -l" dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec ./weed -- -v=4 filer.backup -filer=localhost:8888 -filerProxy -timeAgo=10h diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go new file mode 100644 index 000000000..4d9005e82 --- /dev/null +++ b/weed/mq/balancer/balancer.go @@ -0,0 +1,20 @@ +package balancer + +import cmap "github.com/orcaman/concurrent-map" + +type Balancer struct { + brokers cmap.ConcurrentMap[string, *BrokerStats] +} +type BrokerStats struct { + stats map[TopicPartition]*TopicPartitionStats +} + +type TopicPartition struct { + Topic string + RangeStart int32 + RangeStop int32 +} + +type TopicPartitionStats struct { + Throughput int64 +} diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/broker_grpc_admin.go index 5aac780fb..7337ba23e 100644 --- a/weed/mq/broker/brokder_grpc_admin.go +++ b/weed/mq/broker/broker_grpc_admin.go @@ -88,69 +88,38 @@ func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq return ret, nil } -// FindTopicBrokers returns the brokers that are serving the topic -// -// 1. lock the topic -// -// 2. find the topic partitions on the filer -// 2.1 if the topic is not found, return error -// 2.2 if the request is_for_publish, create the topic -// 2.2.1 if the request is_for_subscribe, return error not found -// 2.2.2 if the request is_for_publish, create the topic -// 2.2 if the topic is found, return the brokers -// -// 3. unlock the topic -func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) { - ret := &mq_pb.FindTopicBrokersResponse{} - // lock the topic - - // find the topic partitions on the filer - // if the topic is not found - // if the request is_for_publish - // create the topic - // if the request is_for_subscribe - // return error not found - return ret, nil -} - -// CheckTopicPartitionsStatus check the topic partitions on the broker -func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) { - ret := &mq_pb.CheckTopicPartitionsStatusResponse{} - return ret, nil -} - // createOrUpdateTopicPartitions creates the topic partitions on the broker // 1. check -func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) { +func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignments []*mq_pb.BrokerPartitionAssignment) (err error) { // create or update each partition - if prevAssignment == nil { + if prevAssignments == nil { broker.createOrUpdateTopicPartition(topic, nil) } else { - for _, partitionAssignment := range prevAssignment.BrokerPartitions { - broker.createOrUpdateTopicPartition(topic, partitionAssignment) + for _, brokerPartitionAssignment := range prevAssignments { + broker.createOrUpdateTopicPartition(topic, brokerPartitionAssignment) } } return nil } -func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) { +func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (newAssignment *mq_pb.BrokerPartitionAssignment) { shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment) if !shouldCreate { } return } -func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) { +func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (shouldCreate bool) { if oldAssignment == nil { return true } for _, b := range oldAssignment.FollowerBrokers { - pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{ - Namespace: string(topic.Namespace), - Topic: topic.Name, - BrokerPartitionsAssignment: oldAssignment, - ShouldCancelIfNotMatch: true, + Namespace: string(topic.Namespace), + Topic: topic.Name, + BrokerPartitionAssignment: oldAssignment, + ShouldCancelIfNotMatch: true, }) if err != nil { shouldCreate = true diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go new file mode 100644 index 000000000..96448be83 --- /dev/null +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -0,0 +1,50 @@ +package broker + +import ( + "context" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +// FindTopicBrokers returns the brokers that are serving the topic +// +// 1. lock the topic +// +// 2. find the topic partitions on the filer +// 2.1 if the topic is not found, return error +// 2.2 if the request is_for_publish, create the topic +// 2.2.1 if the request is_for_subscribe, return error not found +// 2.2.2 if the request is_for_publish, create the topic +// 2.2 if the topic is found, return the brokers +// +// 3. unlock the topic +func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) { + ret := &mq_pb.LookupTopicBrokersResponse{} + // TODO lock the topic + + // find the topic partitions on the filer + // if the topic is not found + // if the request is_for_publish + // create the topic + // if the request is_for_subscribe + // return error not found + // t := topic.FromPbTopic(request.Topic) + ret.Topic = request.Topic + ret.BrokerPartitionAssignments = []*mq_pb.BrokerPartitionAssignment{ + { + LeaderBroker: "localhost:17777", + FollowerBrokers: []string{"localhost:17777"}, + Partition: &mq_pb.Partition{ + RingSize: MaxPartitionCount, + RangeStart: 0, + RangeStop: MaxPartitionCount, + }, + }, + } + return ret, nil +} + +// CheckTopicPartitionsStatus check the topic partitions on the broker +func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) { + ret := &mq_pb.CheckTopicPartitionsStatusResponse{} + return ret, nil +} diff --git a/weed/mq/broker/brokder_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 58ab6e5d2..acbffefba 100644 --- a/weed/mq/broker/brokder_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -7,6 +7,8 @@ import ( "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "sync/atomic" + "time" ) // For a new or re-configured topic, or one of the broker went offline, @@ -73,39 +75,89 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS // 3. write to the filer var localTopicPartition *topic.LocalPartition + req, err := stream.Recv() + if err != nil { + return err + } + response := &mq_pb.PublishResponse{} + // TODO check whether current broker should be the leader for the topic partition + ackInterval := 1 + initMessage := req.GetInit() + if initMessage != nil { + t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) + localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p) + if localTopicPartition == nil { + localTopicPartition = topic.NewLocalPartition(t, p, true, nil) + broker.localTopicManager.AddTopicPartition(t, localTopicPartition) + } + ackInterval = int(initMessage.AckInterval) + stream.Send(response) + } else { + response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) + glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition) + return stream.Send(response) + } + + ackCounter := 0 + var ackSequence int64 + var isStopping int32 + respChan := make(chan *mq_pb.PublishResponse, 128) + defer func() { + atomic.StoreInt32(&isStopping, 1) + close(respChan) + }() + go func() { + ticker := time.NewTicker(1 * time.Second) + for { + select { + case resp := <-respChan: + if resp != nil { + if err := stream.Send(resp); err != nil { + glog.Errorf("Error sending response %v: %v", resp, err) + } + } else { + return + } + case <-ticker.C: + if atomic.LoadInt32(&isStopping) == 0 { + response := &mq_pb.PublishResponse{ + AckSequence: ackSequence, + } + respChan <- response + } else { + return + } + } + } + }() + + // process each published messages for { + // receive a message req, err := stream.Recv() if err != nil { return err } // Process the received message - sequence := req.GetSequence() - response := &mq_pb.PublishResponse{ - AckSequence: sequence, - } if dataMessage := req.GetData(); dataMessage != nil { - if localTopicPartition == nil { - response.Error = "topic partition not initialized" - glog.Errorf("topic partition not found") - } else { - localTopicPartition.Publish(dataMessage) - } - } else if initMessage := req.GetInit(); initMessage != nil { - localTopicPartition = broker.localTopicManager.GetTopicPartition( - topic.NewTopic(topic.Namespace(initMessage.Segment.Namespace), initMessage.Segment.Topic), - topic.FromPbPartition(initMessage.Segment.Partition), - ) - if localTopicPartition == nil { - response.Error = fmt.Sprintf("topic partition %v not found", initMessage.Segment) - glog.Errorf("topic partition %v not found", initMessage.Segment) - } + localTopicPartition.Publish(dataMessage) } - if err := stream.Send(response); err != nil { - glog.Errorf("Error sending setup response: %v", err) + + ackCounter++ + ackSequence++ + if ackCounter >= ackInterval { + ackCounter = 0 + // send back the ack + response := &mq_pb.PublishResponse{ + AckSequence: ackSequence, + } + respChan <- response } } + glog.Infof("publish stream closed") + return nil } @@ -114,14 +166,14 @@ func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, reque ret := &mq_pb.AssignTopicPartitionsResponse{} self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port)) - for _, partition := range request.TopicPartitionsAssignment.BrokerPartitions { - localPartiton := topic.FromPbBrokerPartitionsAssignment(self, partition) + for _, brokerPartition := range request.BrokerPartitionAssignments { + localPartiton := topic.FromPbBrokerPartitionAssignment(self, brokerPartition) broker.localTopicManager.AddTopicPartition( topic.FromPbTopic(request.Topic), localPartiton) if request.IsLeader { for _, follower := range localPartiton.FollowerBrokers { - err := pb.WithBrokerClient(false, follower, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.AssignTopicPartitions(context.Background(), request) return err }) diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go new file mode 100644 index 000000000..ad65df2d1 --- /dev/null +++ b/weed/mq/broker/broker_grpc_sub.go @@ -0,0 +1,44 @@ +package broker + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "time" +) + +func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error { + + localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Cursor.Topic), + topic.FromPbPartition(req.Cursor.Partition)) + if localTopicPartition == nil { + stream.Send(&mq_pb.SubscribeResponse{ + Message: &mq_pb.SubscribeResponse_Ctrl{ + Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{ + Error: "not initialized", + }, + }, + }) + return nil + } + + clientName := fmt.Sprintf("%s/%s-%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId, req.Consumer.ClientId) + + localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error { + value := logEntry.GetData() + if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{ + Data: &mq_pb.DataMessage{ + Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)), + Value: value, + }, + }}); err != nil { + glog.Errorf("Error sending setup response: %v", err) + return err + } + return nil + }) + + return nil +} diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index da1284c80..aceaab82b 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -112,7 +112,7 @@ func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error { - return pb.WithBrokerClient(streamingMode, server, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + return pb.WithBrokerGrpcClient(streamingMode, server.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { return fn(client) }) diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go new file mode 100644 index 000000000..03674db3f --- /dev/null +++ b/weed/mq/client/cmd/weed_pub/publisher.go @@ -0,0 +1,58 @@ +package main + +import ( + "flag" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client" + "log" + "sync" + "time" +) + +var ( + messageCount = flag.Int("n", 1000, "message count") + concurrency = flag.Int("c", 4, "concurrency count") +) + +func doPublish(publisher *pub_client.TopicPublisher, id int) { + startTime := time.Now() + for i := 0; i < *messageCount / *concurrency; i++ { + // Simulate publishing a message + key := []byte(fmt.Sprintf("key-%d-%d", id, i)) + value := []byte(fmt.Sprintf("value-%d-%d", id, i)) + publisher.Publish(key, value) // Call your publisher function here + // println("Published", string(key), string(value)) + } + elapsed := time.Since(startTime) + log.Printf("Publisher %d finished in %s", id, elapsed) +} + +func main() { + flag.Parse() + publisher := pub_client.NewTopicPublisher( + "test", "test") + if err := publisher.Connect("localhost:17777"); err != nil { + fmt.Println(err) + return + } + + startTime := time.Now() + + // Start multiple publishers + var wg sync.WaitGroup + for i := 0; i < *concurrency; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + doPublish(publisher, id) + }(i) + } + + // Wait for all publishers to finish + wg.Wait() + elapsed := time.Since(startTime) + publisher.Shutdown() + + log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds()) + +} diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go new file mode 100644 index 000000000..1ec24f406 --- /dev/null +++ b/weed/mq/client/cmd/weed_sub/subscriber.go @@ -0,0 +1,44 @@ +package main + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func main() { + + subscriberConfig := &sub_client.SubscriberConfiguration{ + ClientId: "testSubscriber", + GroupId: "test", + GroupInstanceId: "test", + GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + contentConfig := &sub_client.ContentConfiguration{ + Namespace: "test", + Topic: "test", + Filter: "", + } + + subscriber := sub_client.NewTopicSubscriber(subscriberConfig, contentConfig) + if err := subscriber.Connect("localhost:17777"); err != nil { + fmt.Println(err) + return + } + + subscriber.SetEachMessageFunc(func(key, value []byte) bool { + println(string(key), "=>", string(value)) + return true + }) + + subscriber.SetCompletionFunc(func() { + println("done subscribing") + }) + + if err := subscriber.Subscribe(); err != nil { + fmt.Println(err) + } + +} diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go new file mode 100644 index 000000000..28cb29015 --- /dev/null +++ b/weed/mq/client/pub_client/lookup.go @@ -0,0 +1,116 @@ +package pub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func (p *TopicPublisher) doLookup(brokerAddress string) error { + err := pb.WithBrokerGrpcClient(true, + brokerAddress, + p.grpcDialOption, + func(client mq_pb.SeaweedMessagingClient) error { + lookupResp, err := client.LookupTopicBrokers(context.Background(), + &mq_pb.LookupTopicBrokersRequest{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + IsForPublish: true, + }) + if err != nil { + return err + } + for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments { + // partition => publishClient + publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker) + if err != nil { + return err + } + for redirectTo != "" { + publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo) + if err != nil { + return err + } + } + p.partition2Broker.Insert( + brokerPartitionAssignment.Partition.RangeStart, + brokerPartitionAssignment.Partition.RangeStop, + publishClient) + } + return nil + }) + + if err != nil { + return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err) + } + return nil +} + +// broker => publish client +// send init message +// save the publishing client +func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) { + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption) + if err != nil { + return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + stream, err := brokerClient.Publish(context.Background()) + if err != nil { + return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err) + } + publishClient = &PublishClient{ + SeaweedMessaging_PublishClient: stream, + Broker: brokerAddress, + } + if err = publishClient.Send(&mq_pb.PublishRequest{ + Message: &mq_pb.PublishRequest_Init{ + Init: &mq_pb.PublishRequest_InitMessage{ + Topic: &mq_pb.Topic{ + Namespace: p.namespace, + Name: p.topic, + }, + Partition: &mq_pb.Partition{ + RingSize: partition.RingSize, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + }, + AckInterval: 128, + }, + }, + }); err != nil { + return publishClient, redirectTo, fmt.Errorf("send init message: %v", err) + } + resp, err := stream.Recv() + if err != nil { + return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err) + } + if resp.Error != "" { + return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error) + } + if resp.RedirectToBroker != "" { + redirectTo = resp.RedirectToBroker + return publishClient, redirectTo, nil + } + + go func() { + for { + _, err := publishClient.Recv() + if err != nil { + e, ok := status.FromError(err) + if ok && e.Code() == codes.Unknown && e.Message() == "EOF" { + return + } + publishClient.Err = err + fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err) + return + } + } + }() + return publishClient, redirectTo, nil +} diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go new file mode 100644 index 000000000..de4714831 --- /dev/null +++ b/weed/mq/client/pub_client/publish.go @@ -0,0 +1,41 @@ +package pub_client + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/mq/broker" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +func (p *TopicPublisher) Publish(key, value []byte) error { + hashKey := util.HashToInt32(key) % broker.MaxPartitionCount + if hashKey < 0 { + hashKey = -hashKey + } + publishClient, found := p.partition2Broker.Floor(hashKey, hashKey) + if !found { + return fmt.Errorf("no broker found for key %d", hashKey) + } + p.Lock() + defer p.Unlock() + // dead lock here + //google.golang.org/grpc/internal/transport.(*writeQuota).get(flowcontrol.go:59) + //google.golang.org/grpc/internal/transport.(*http2Client).Write(http2_client.go:1047) + //google.golang.org/grpc.(*csAttempt).sendMsg(stream.go:1040) + //google.golang.org/grpc.(*clientStream).SendMsg.func2(stream.go:892) + //google.golang.org/grpc.(*clientStream).withRetry(stream.go:752) + //google.golang.org/grpc.(*clientStream).SendMsg(stream.go:894) + //github.com/seaweedfs/seaweedfs/weed/pb/mq_pb.(*seaweedMessagingPublishClient).Send(mq_grpc.pb.go:141) + //github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client.(*TopicPublisher).Publish(publish.go:19) + if err := publishClient.Send(&mq_pb.PublishRequest{ + Message: &mq_pb.PublishRequest_Data{ + Data: &mq_pb.DataMessage{ + Key: key, + Value: value, + }, + }, + }); err != nil { + return fmt.Errorf("send publish request: %v", err) + } + return nil +} diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go new file mode 100644 index 000000000..7073457f3 --- /dev/null +++ b/weed/mq/client/pub_client/publisher.go @@ -0,0 +1,57 @@ +package pub_client + +import ( + "github.com/rdleal/intervalst/interval" + "github.com/seaweedfs/seaweedfs/weed/mq/broker" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "sync" + "time" +) + +type PublisherConfiguration struct { +} + +type PublishClient struct { + mq_pb.SeaweedMessaging_PublishClient + Broker string + Err error +} +type TopicPublisher struct { + namespace string + topic string + partition2Broker *interval.SearchTree[*PublishClient, int32] + grpcDialOption grpc.DialOption + sync.Mutex // protects grpc +} + +func NewTopicPublisher(namespace, topic string) *TopicPublisher { + return &TopicPublisher{ + namespace: namespace, + topic: topic, + partition2Broker: interval.NewSearchTree[*PublishClient](func(a, b int32) int { + return int(a - b) + }), + grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()), + } +} + +func (p *TopicPublisher) Connect(bootstrapBroker string) error { + if err := p.doLookup(bootstrapBroker); err != nil { + return err + } + return nil +} + +func (p *TopicPublisher) Shutdown() error { + + if clients, found := p.partition2Broker.AllIntersections(0, broker.MaxPartitionCount); found { + for _, client := range clients { + client.CloseSend() + } + } + time.Sleep(1100 * time.Millisecond) + + return nil +} diff --git a/weed/mq/client/sub_client/lookup.go b/weed/mq/client/sub_client/lookup.go new file mode 100644 index 000000000..b6d2a8c53 --- /dev/null +++ b/weed/mq/client/sub_client/lookup.go @@ -0,0 +1,34 @@ +package sub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +func (sub *TopicSubscriber) doLookup(brokerAddress string) error { + err := pb.WithBrokerGrpcClient(true, + brokerAddress, + sub.SubscriberConfig.GrpcDialOption, + func(client mq_pb.SeaweedMessagingClient) error { + lookupResp, err := client.LookupTopicBrokers(context.Background(), + &mq_pb.LookupTopicBrokersRequest{ + Topic: &mq_pb.Topic{ + Namespace: sub.ContentConfig.Namespace, + Name: sub.ContentConfig.Topic, + }, + IsForPublish: false, + }) + if err != nil { + return err + } + sub.brokerPartitionAssignments = lookupResp.BrokerPartitionAssignments + return nil + }) + + if err != nil { + return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err) + } + return nil +} diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go new file mode 100644 index 000000000..3667a2bdf --- /dev/null +++ b/weed/mq/client/sub_client/subscribe.go @@ -0,0 +1,72 @@ +package sub_client + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "sync" +) + +// Subscribe subscribes to a topic's specified partitions. +// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker. + +func (sub *TopicSubscriber) Subscribe() error { + var wg sync.WaitGroup + for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments { + brokerAddress := brokerPartitionAssignment.LeaderBroker + grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption) + if err != nil { + return fmt.Errorf("dial broker %s: %v", brokerAddress, err) + } + brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) + subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{ + Consumer: &mq_pb.SubscribeRequest_Consumer{ + ConsumerGroup: sub.SubscriberConfig.GroupId, + ConsumerId: sub.SubscriberConfig.GroupInstanceId, + }, + Cursor: &mq_pb.SubscribeRequest_Cursor{ + Topic: &mq_pb.Topic{ + Namespace: sub.ContentConfig.Namespace, + Name: sub.ContentConfig.Topic, + }, + Partition: &mq_pb.Partition{ + RingSize: brokerPartitionAssignment.Partition.RingSize, + RangeStart: brokerPartitionAssignment.Partition.RangeStart, + RangeStop: brokerPartitionAssignment.Partition.RangeStop, + }, + Filter: sub.ContentConfig.Filter, + }, + }) + if err != nil { + return fmt.Errorf("create subscribe client: %v", err) + } + wg.Add(1) + go func() { + defer wg.Done() + if sub.OnCompletionFunc != nil { + defer sub.OnCompletionFunc() + } + for { + resp, err := subscribeClient.Recv() + if err != nil { + fmt.Printf("subscribe error: %v\n", err) + return + } + if resp.Message == nil { + continue + } + switch m := resp.Message.(type) { + case *mq_pb.SubscribeResponse_Data: + if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) { + return + } + case *mq_pb.SubscribeResponse_Ctrl: + // ignore + } + } + }() + } + wg.Wait() + return nil +} diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go new file mode 100644 index 000000000..404d05222 --- /dev/null +++ b/weed/mq/client/sub_client/subscriber.go @@ -0,0 +1,53 @@ +package sub_client + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "google.golang.org/grpc" +) + +type SubscriberConfiguration struct { + ClientId string + GroupId string + GroupInstanceId string + BootstrapServers []string + GrpcDialOption grpc.DialOption +} + +type ContentConfiguration struct { + Namespace string + Topic string + Filter string +} + +type OnEachMessageFunc func(key, value []byte) (shouldContinue bool) +type OnCompletionFunc func() + +type TopicSubscriber struct { + SubscriberConfig *SubscriberConfiguration + ContentConfig *ContentConfiguration + brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment + OnEachMessageFunc OnEachMessageFunc + OnCompletionFunc OnCompletionFunc +} + +func NewTopicSubscriber(subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber { + return &TopicSubscriber{ + SubscriberConfig: subscriber, + ContentConfig: content, + } +} + +func (sub *TopicSubscriber) Connect(bootstrapBroker string) error { + if err := sub.doLookup(bootstrapBroker); err != nil { + return err + } + return nil +} + +func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc) { + sub.OnEachMessageFunc = onEachMessageFn +} + +func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) { + sub.OnCompletionFunc = onCompeletionFn +} diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go index 168e3d561..6e7db5d08 100644 --- a/weed/mq/topic/local_manager.go +++ b/weed/mq/topic/local_manager.go @@ -25,6 +25,7 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition Partitions: make([]*LocalPartition, 0), } } + manager.topics.SetIfAbsent(topic.String(), localTopic) if localTopic.findPartition(localPartition.Partition) != nil { return } diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index e26b7afd1..eaedb9f20 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -1,7 +1,9 @@ package topic import ( + "fmt" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "time" @@ -14,19 +16,41 @@ type LocalPartition struct { logBuffer *log_buffer.LogBuffer } -func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) { +func NewLocalPartition(topic Topic, partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition { + return &LocalPartition{ + Partition: partition, + isLeader: isLeader, + FollowerBrokers: followerBrokers, + logBuffer: log_buffer.NewLogBuffer( + fmt.Sprintf("%s/%s/%4d-%4d", topic.Namespace, topic.Name, partition.RangeStart, partition.RangeStop), + 2*time.Minute, + func(startTime, stopTime time.Time, buf []byte) { + + }, + func() { + + }, + ), + } +} + +type OnEachMessageFn func(logEntry *filer_pb.LogEntry) error + +func (p LocalPartition) Publish(message *mq_pb.DataMessage) { p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) } -func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition { +func (p LocalPartition) Subscribe(clientName string, startReadTime time.Time, eachMessageFn OnEachMessageFn) { + p.logBuffer.LoopProcessLogData(clientName, startReadTime, 0, func() bool { + return true + }, eachMessageFn) +} + +func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment) *LocalPartition { isLeaer := assignment.LeaderBroker == string(self) localPartition := &LocalPartition{ - Partition: Partition{ - RangeStart: assignment.PartitionStart, - RangeStop: assignment.PartitionStop, - RingSize: PartitionCount, - }, - isLeader: isLeaer, + Partition: FromPbPartition(assignment.Partition), + isLeader: isLeaer, } if !isLeaer { return localPartition diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 5f685912e..2df2be70f 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -225,14 +225,6 @@ func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpc } -func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error { - return WithGrpcClient(streamingMode, 0, func(grpcConnection *grpc.ClientConn) error { - client := mq_pb.NewSeaweedMessagingClient(grpcConnection) - return fn(client) - }, broker.ToGrpcAddress(), false, grpcDialOption) - -} - func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[string]ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) { for _, masterGrpcAddress := range masterGrpcAddresses { diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto index 47440a46e..8a75bd7a2 100644 --- a/weed/pb/mq.proto +++ b/weed/pb/mq.proto @@ -21,7 +21,7 @@ service SeaweedMessaging { } // control plane for topic partitions - rpc FindTopicBrokers (FindTopicBrokersRequest) returns (FindTopicBrokersResponse) { + rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) { } // a pub client will call this to get the topic partitions assignment rpc RequestTopicPartitions (RequestTopicPartitionsRequest) returns (RequestTopicPartitionsResponse) { @@ -34,6 +34,8 @@ service SeaweedMessaging { // data plane rpc Publish (stream PublishRequest) returns (stream PublishResponse) { } + rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse) { + } } ////////////////////////////////////////////////// @@ -98,23 +100,18 @@ message CheckBrokerLoadResponse { } -message FindTopicBrokersRequest { +message LookupTopicBrokersRequest { Topic topic = 1; bool is_for_publish = 2; } -message FindTopicBrokersResponse { +message LookupTopicBrokersResponse { Topic topic = 1; - TopicPartitionsAssignment topic_partitions_assignment = 2; -} -message BrokerPartitionsAssignment { - int32 partition_start = 1; - int32 partition_stop = 2; - string leader_broker = 3; - repeated string follower_brokers = 4; + repeated BrokerPartitionAssignment broker_partition_assignments = 2; } -message TopicPartitionsAssignment { - int32 partition_count = 1; // over-sharded partitions, usually 1024 - repeated BrokerPartitionsAssignment broker_partitions = 2; +message BrokerPartitionAssignment { + Partition partition = 1; + string leader_broker = 2; + repeated string follower_brokers = 3; } message RequestTopicPartitionsRequest { @@ -122,12 +119,12 @@ message RequestTopicPartitionsRequest { int32 partition_count = 2; } message RequestTopicPartitionsResponse { - TopicPartitionsAssignment topic_partitions_assignment = 1; + repeated BrokerPartitionAssignment broker_partition_assignments = 1; } message AssignTopicPartitionsRequest { Topic topic = 1; - TopicPartitionsAssignment topic_partitions_assignment = 2; + repeated BrokerPartitionAssignment broker_partition_assignments = 2; bool is_leader = 3; } message AssignTopicPartitionsResponse { @@ -136,21 +133,23 @@ message AssignTopicPartitionsResponse { message CheckTopicPartitionsStatusRequest { string namespace = 1; string topic = 2; - BrokerPartitionsAssignment broker_partitions_assignment = 3; + BrokerPartitionAssignment broker_partition_assignment = 3; bool should_cancel_if_not_match = 4; } message CheckTopicPartitionsStatusResponse { - TopicPartitionsAssignment topic_partitions_assignment = 1; + repeated BrokerPartitionAssignment broker_partition_assignments = 1; } ////////////////////////////////////////////////// +message DataMessage { + bytes key = 1; + bytes value = 2; +} message PublishRequest { message InitMessage { - Segment segment = 1; - } - message DataMessage { - bytes key = 1; - bytes value = 2; + Topic topic = 1; + Partition partition = 2; + int32 ack_interval = 3; } oneof message { InitMessage init = 1; @@ -161,5 +160,33 @@ message PublishRequest { message PublishResponse { int64 ack_sequence = 1; string error = 2; - bool is_closed = 3; + string redirect_to_broker = 3; +} +message SubscribeRequest { + message Consumer { + string consumer_group = 1; + string consumer_id = 2; + string client_id = 3; + } + message Cursor { + Topic topic = 1; + Partition partition = 2; + oneof offset { + int64 start_offset = 3; + int64 start_timestamp_ns = 4; + } + string filter = 5; + } + Consumer consumer = 1; + Cursor cursor = 2; +} +message SubscribeResponse { + message CtrlMessage { + string error = 1; + string redirect_to_broker = 2; + } + oneof message { + CtrlMessage ctrl = 1; + DataMessage data = 2; + } } diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go index 818acc111..49afdb3d1 100644 --- a/weed/pb/mq_pb/mq.pb.go +++ b/weed/pb/mq_pb/mq.pb.go @@ -672,7 +672,7 @@ func (x *CheckBrokerLoadResponse) GetBytesCount() int64 { return 0 } -type FindTopicBrokersRequest struct { +type LookupTopicBrokersRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -681,8 +681,8 @@ type FindTopicBrokersRequest struct { IsForPublish bool `protobuf:"varint,2,opt,name=is_for_publish,json=isForPublish,proto3" json:"is_for_publish,omitempty"` } -func (x *FindTopicBrokersRequest) Reset() { - *x = FindTopicBrokersRequest{} +func (x *LookupTopicBrokersRequest) Reset() { + *x = LookupTopicBrokersRequest{} if protoimpl.UnsafeEnabled { mi := &file_mq_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -690,13 +690,13 @@ func (x *FindTopicBrokersRequest) Reset() { } } -func (x *FindTopicBrokersRequest) String() string { +func (x *LookupTopicBrokersRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*FindTopicBrokersRequest) ProtoMessage() {} +func (*LookupTopicBrokersRequest) ProtoMessage() {} -func (x *FindTopicBrokersRequest) ProtoReflect() protoreflect.Message { +func (x *LookupTopicBrokersRequest) ProtoReflect() protoreflect.Message { mi := &file_mq_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -708,36 +708,36 @@ func (x *FindTopicBrokersRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use FindTopicBrokersRequest.ProtoReflect.Descriptor instead. -func (*FindTopicBrokersRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use LookupTopicBrokersRequest.ProtoReflect.Descriptor instead. +func (*LookupTopicBrokersRequest) Descriptor() ([]byte, []int) { return file_mq_proto_rawDescGZIP(), []int{12} } -func (x *FindTopicBrokersRequest) GetTopic() *Topic { +func (x *LookupTopicBrokersRequest) GetTopic() *Topic { if x != nil { return x.Topic } return nil } -func (x *FindTopicBrokersRequest) GetIsForPublish() bool { +func (x *LookupTopicBrokersRequest) GetIsForPublish() bool { if x != nil { return x.IsForPublish } return false } -type FindTopicBrokersResponse struct { +type LookupTopicBrokersResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - TopicPartitionsAssignment *TopicPartitionsAssignment `protobuf:"bytes,2,opt,name=topic_partitions_assignment,json=topicPartitionsAssignment,proto3" json:"topic_partitions_assignment,omitempty"` + Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + BrokerPartitionAssignments []*BrokerPartitionAssignment `protobuf:"bytes,2,rep,name=broker_partition_assignments,json=brokerPartitionAssignments,proto3" json:"broker_partition_assignments,omitempty"` } -func (x *FindTopicBrokersResponse) Reset() { - *x = FindTopicBrokersResponse{} +func (x *LookupTopicBrokersResponse) Reset() { + *x = LookupTopicBrokersResponse{} if protoimpl.UnsafeEnabled { mi := &file_mq_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -745,13 +745,13 @@ func (x *FindTopicBrokersResponse) Reset() { } } -func (x *FindTopicBrokersResponse) String() string { +func (x *LookupTopicBrokersResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*FindTopicBrokersResponse) ProtoMessage() {} +func (*LookupTopicBrokersResponse) ProtoMessage() {} -func (x *FindTopicBrokersResponse) ProtoReflect() protoreflect.Message { +func (x *LookupTopicBrokersResponse) ProtoReflect() protoreflect.Message { mi := &file_mq_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -763,38 +763,37 @@ func (x *FindTopicBrokersResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use FindTopicBrokersResponse.ProtoReflect.Descriptor instead. -func (*FindTopicBrokersResponse) Descriptor() ([]byte, []int) { +// Deprecated: Use LookupTopicBrokersResponse.ProtoReflect.Descriptor instead. +func (*LookupTopicBrokersResponse) Descriptor() ([]byte, []int) { return file_mq_proto_rawDescGZIP(), []int{13} } -func (x *FindTopicBrokersResponse) GetTopic() *Topic { +func (x *LookupTopicBrokersResponse) GetTopic() *Topic { if x != nil { return x.Topic } return nil } -func (x *FindTopicBrokersResponse) GetTopicPartitionsAssignment() *TopicPartitionsAssignment { +func (x *LookupTopicBrokersResponse) GetBrokerPartitionAssignments() []*BrokerPartitionAssignment { if x != nil { - return x.TopicPartitionsAssignment + return x.BrokerPartitionAssignments } return nil } -type BrokerPartitionsAssignment struct { +type BrokerPartitionAssignment struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - PartitionStart int32 `protobuf:"varint,1,opt,name=partition_start,json=partitionStart,proto3" json:"partition_start,omitempty"` - PartitionStop int32 `protobuf:"varint,2,opt,name=partition_stop,json=partitionStop,proto3" json:"partition_stop,omitempty"` - LeaderBroker string `protobuf:"bytes,3,opt,name=leader_broker,json=leaderBroker,proto3" json:"leader_broker,omitempty"` - FollowerBrokers []string `protobuf:"bytes,4,rep,name=follower_brokers,json=followerBrokers,proto3" json:"follower_brokers,omitempty"` + Partition *Partition `protobuf:"bytes,1,opt,name=partition,proto3" json:"partition,omitempty"` + LeaderBroker string `protobuf:"bytes,2,opt,name=leader_broker,json=leaderBroker,proto3" json:"leader_broker,omitempty"` + FollowerBrokers []string `protobuf:"bytes,3,rep,name=follower_brokers,json=followerBrokers,proto3" json:"follower_brokers,omitempty"` } -func (x *BrokerPartitionsAssignment) Reset() { - *x = BrokerPartitionsAssignment{} +func (x *BrokerPartitionAssignment) Reset() { + *x = BrokerPartitionAssignment{} if protoimpl.UnsafeEnabled { mi := &file_mq_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -802,13 +801,13 @@ func (x *BrokerPartitionsAssignment) Reset() { } } -func (x *BrokerPartitionsAssignment) String() string { +func (x *BrokerPartitionAssignment) String() string { return protoimpl.X.MessageStringOf(x) } -func (*BrokerPartitionsAssignment) ProtoMessage() {} +func (*BrokerPartitionAssignment) ProtoMessage() {} -func (x *BrokerPartitionsAssignment) ProtoReflect() protoreflect.Message { +func (x *BrokerPartitionAssignment) ProtoReflect() protoreflect.Message { mi := &file_mq_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -820,94 +819,32 @@ func (x *BrokerPartitionsAssignment) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use BrokerPartitionsAssignment.ProtoReflect.Descriptor instead. -func (*BrokerPartitionsAssignment) Descriptor() ([]byte, []int) { +// Deprecated: Use BrokerPartitionAssignment.ProtoReflect.Descriptor instead. +func (*BrokerPartitionAssignment) Descriptor() ([]byte, []int) { return file_mq_proto_rawDescGZIP(), []int{14} } -func (x *BrokerPartitionsAssignment) GetPartitionStart() int32 { +func (x *BrokerPartitionAssignment) GetPartition() *Partition { if x != nil { - return x.PartitionStart - } - return 0 -} - -func (x *BrokerPartitionsAssignment) GetPartitionStop() int32 { - if x != nil { - return x.PartitionStop + return x.Partition } - return 0 + return nil } -func (x *BrokerPartitionsAssignment) GetLeaderBroker() string { +func (x *BrokerPartitionAssignment) GetLeaderBroker() string { if x != nil { return x.LeaderBroker } return "" } -func (x *BrokerPartitionsAssignment) GetFollowerBrokers() []string { +func (x *BrokerPartitionAssignment) GetFollowerBrokers() []string { if x != nil { return x.FollowerBrokers } return nil } -type TopicPartitionsAssignment struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - PartitionCount int32 `protobuf:"varint,1,opt,name=partition_count,json=partitionCount,proto3" json:"partition_count,omitempty"` // over-sharded partitions, usually 1024 - BrokerPartitions []*BrokerPartitionsAssignment `protobuf:"bytes,2,rep,name=broker_partitions,json=brokerPartitions,proto3" json:"broker_partitions,omitempty"` -} - -func (x *TopicPartitionsAssignment) Reset() { - *x = TopicPartitionsAssignment{} - if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[15] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *TopicPartitionsAssignment) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*TopicPartitionsAssignment) ProtoMessage() {} - -func (x *TopicPartitionsAssignment) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[15] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use TopicPartitionsAssignment.ProtoReflect.Descriptor instead. -func (*TopicPartitionsAssignment) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{15} -} - -func (x *TopicPartitionsAssignment) GetPartitionCount() int32 { - if x != nil { - return x.PartitionCount - } - return 0 -} - -func (x *TopicPartitionsAssignment) GetBrokerPartitions() []*BrokerPartitionsAssignment { - if x != nil { - return x.BrokerPartitions - } - return nil -} - type RequestTopicPartitionsRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -920,7 +857,7 @@ type RequestTopicPartitionsRequest struct { func (x *RequestTopicPartitionsRequest) Reset() { *x = RequestTopicPartitionsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[16] + mi := &file_mq_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -933,7 +870,7 @@ func (x *RequestTopicPartitionsRequest) String() string { func (*RequestTopicPartitionsRequest) ProtoMessage() {} func (x *RequestTopicPartitionsRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[16] + mi := &file_mq_proto_msgTypes[15] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -946,7 +883,7 @@ func (x *RequestTopicPartitionsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RequestTopicPartitionsRequest.ProtoReflect.Descriptor instead. func (*RequestTopicPartitionsRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{16} + return file_mq_proto_rawDescGZIP(), []int{15} } func (x *RequestTopicPartitionsRequest) GetTopic() *Topic { @@ -968,13 +905,13 @@ type RequestTopicPartitionsResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TopicPartitionsAssignment *TopicPartitionsAssignment `protobuf:"bytes,1,opt,name=topic_partitions_assignment,json=topicPartitionsAssignment,proto3" json:"topic_partitions_assignment,omitempty"` + BrokerPartitionAssignments []*BrokerPartitionAssignment `protobuf:"bytes,1,rep,name=broker_partition_assignments,json=brokerPartitionAssignments,proto3" json:"broker_partition_assignments,omitempty"` } func (x *RequestTopicPartitionsResponse) Reset() { *x = RequestTopicPartitionsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[17] + mi := &file_mq_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -987,7 +924,7 @@ func (x *RequestTopicPartitionsResponse) String() string { func (*RequestTopicPartitionsResponse) ProtoMessage() {} func (x *RequestTopicPartitionsResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[17] + mi := &file_mq_proto_msgTypes[16] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1000,12 +937,12 @@ func (x *RequestTopicPartitionsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RequestTopicPartitionsResponse.ProtoReflect.Descriptor instead. func (*RequestTopicPartitionsResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{17} + return file_mq_proto_rawDescGZIP(), []int{16} } -func (x *RequestTopicPartitionsResponse) GetTopicPartitionsAssignment() *TopicPartitionsAssignment { +func (x *RequestTopicPartitionsResponse) GetBrokerPartitionAssignments() []*BrokerPartitionAssignment { if x != nil { - return x.TopicPartitionsAssignment + return x.BrokerPartitionAssignments } return nil } @@ -1015,15 +952,15 @@ type AssignTopicPartitionsRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - TopicPartitionsAssignment *TopicPartitionsAssignment `protobuf:"bytes,2,opt,name=topic_partitions_assignment,json=topicPartitionsAssignment,proto3" json:"topic_partitions_assignment,omitempty"` - IsLeader bool `protobuf:"varint,3,opt,name=is_leader,json=isLeader,proto3" json:"is_leader,omitempty"` + Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + BrokerPartitionAssignments []*BrokerPartitionAssignment `protobuf:"bytes,2,rep,name=broker_partition_assignments,json=brokerPartitionAssignments,proto3" json:"broker_partition_assignments,omitempty"` + IsLeader bool `protobuf:"varint,3,opt,name=is_leader,json=isLeader,proto3" json:"is_leader,omitempty"` } func (x *AssignTopicPartitionsRequest) Reset() { *x = AssignTopicPartitionsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[18] + mi := &file_mq_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1036,7 +973,7 @@ func (x *AssignTopicPartitionsRequest) String() string { func (*AssignTopicPartitionsRequest) ProtoMessage() {} func (x *AssignTopicPartitionsRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[18] + mi := &file_mq_proto_msgTypes[17] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1049,7 +986,7 @@ func (x *AssignTopicPartitionsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AssignTopicPartitionsRequest.ProtoReflect.Descriptor instead. func (*AssignTopicPartitionsRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{18} + return file_mq_proto_rawDescGZIP(), []int{17} } func (x *AssignTopicPartitionsRequest) GetTopic() *Topic { @@ -1059,9 +996,9 @@ func (x *AssignTopicPartitionsRequest) GetTopic() *Topic { return nil } -func (x *AssignTopicPartitionsRequest) GetTopicPartitionsAssignment() *TopicPartitionsAssignment { +func (x *AssignTopicPartitionsRequest) GetBrokerPartitionAssignments() []*BrokerPartitionAssignment { if x != nil { - return x.TopicPartitionsAssignment + return x.BrokerPartitionAssignments } return nil } @@ -1082,7 +1019,7 @@ type AssignTopicPartitionsResponse struct { func (x *AssignTopicPartitionsResponse) Reset() { *x = AssignTopicPartitionsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[19] + mi := &file_mq_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1095,7 +1032,7 @@ func (x *AssignTopicPartitionsResponse) String() string { func (*AssignTopicPartitionsResponse) ProtoMessage() {} func (x *AssignTopicPartitionsResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[19] + mi := &file_mq_proto_msgTypes[18] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1108,7 +1045,7 @@ func (x *AssignTopicPartitionsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use AssignTopicPartitionsResponse.ProtoReflect.Descriptor instead. func (*AssignTopicPartitionsResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{19} + return file_mq_proto_rawDescGZIP(), []int{18} } type CheckTopicPartitionsStatusRequest struct { @@ -1116,16 +1053,16 @@ type CheckTopicPartitionsStatusRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` - Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` - BrokerPartitionsAssignment *BrokerPartitionsAssignment `protobuf:"bytes,3,opt,name=broker_partitions_assignment,json=brokerPartitionsAssignment,proto3" json:"broker_partitions_assignment,omitempty"` - ShouldCancelIfNotMatch bool `protobuf:"varint,4,opt,name=should_cancel_if_not_match,json=shouldCancelIfNotMatch,proto3" json:"should_cancel_if_not_match,omitempty"` + Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"` + Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` + BrokerPartitionAssignment *BrokerPartitionAssignment `protobuf:"bytes,3,opt,name=broker_partition_assignment,json=brokerPartitionAssignment,proto3" json:"broker_partition_assignment,omitempty"` + ShouldCancelIfNotMatch bool `protobuf:"varint,4,opt,name=should_cancel_if_not_match,json=shouldCancelIfNotMatch,proto3" json:"should_cancel_if_not_match,omitempty"` } func (x *CheckTopicPartitionsStatusRequest) Reset() { *x = CheckTopicPartitionsStatusRequest{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[20] + mi := &file_mq_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1138,7 +1075,7 @@ func (x *CheckTopicPartitionsStatusRequest) String() string { func (*CheckTopicPartitionsStatusRequest) ProtoMessage() {} func (x *CheckTopicPartitionsStatusRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[20] + mi := &file_mq_proto_msgTypes[19] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1151,7 +1088,7 @@ func (x *CheckTopicPartitionsStatusRequest) ProtoReflect() protoreflect.Message // Deprecated: Use CheckTopicPartitionsStatusRequest.ProtoReflect.Descriptor instead. func (*CheckTopicPartitionsStatusRequest) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{20} + return file_mq_proto_rawDescGZIP(), []int{19} } func (x *CheckTopicPartitionsStatusRequest) GetNamespace() string { @@ -1168,9 +1105,9 @@ func (x *CheckTopicPartitionsStatusRequest) GetTopic() string { return "" } -func (x *CheckTopicPartitionsStatusRequest) GetBrokerPartitionsAssignment() *BrokerPartitionsAssignment { +func (x *CheckTopicPartitionsStatusRequest) GetBrokerPartitionAssignment() *BrokerPartitionAssignment { if x != nil { - return x.BrokerPartitionsAssignment + return x.BrokerPartitionAssignment } return nil } @@ -1187,13 +1124,13 @@ type CheckTopicPartitionsStatusResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - TopicPartitionsAssignment *TopicPartitionsAssignment `protobuf:"bytes,1,opt,name=topic_partitions_assignment,json=topicPartitionsAssignment,proto3" json:"topic_partitions_assignment,omitempty"` + BrokerPartitionAssignments []*BrokerPartitionAssignment `protobuf:"bytes,1,rep,name=broker_partition_assignments,json=brokerPartitionAssignments,proto3" json:"broker_partition_assignments,omitempty"` } func (x *CheckTopicPartitionsStatusResponse) Reset() { *x = CheckTopicPartitionsStatusResponse{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[21] + mi := &file_mq_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1206,7 +1143,7 @@ func (x *CheckTopicPartitionsStatusResponse) String() string { func (*CheckTopicPartitionsStatusResponse) ProtoMessage() {} func (x *CheckTopicPartitionsStatusResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[21] + mi := &file_mq_proto_msgTypes[20] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1219,17 +1156,72 @@ func (x *CheckTopicPartitionsStatusResponse) ProtoReflect() protoreflect.Message // Deprecated: Use CheckTopicPartitionsStatusResponse.ProtoReflect.Descriptor instead. func (*CheckTopicPartitionsStatusResponse) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{21} + return file_mq_proto_rawDescGZIP(), []int{20} } -func (x *CheckTopicPartitionsStatusResponse) GetTopicPartitionsAssignment() *TopicPartitionsAssignment { +func (x *CheckTopicPartitionsStatusResponse) GetBrokerPartitionAssignments() []*BrokerPartitionAssignment { if x != nil { - return x.TopicPartitionsAssignment + return x.BrokerPartitionAssignments } return nil } // //////////////////////////////////////////////// +type DataMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *DataMessage) Reset() { + *x = DataMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[21] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DataMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DataMessage) ProtoMessage() {} + +func (x *DataMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[21] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DataMessage.ProtoReflect.Descriptor instead. +func (*DataMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{21} +} + +func (x *DataMessage) GetKey() []byte { + if x != nil { + return x.Key + } + return nil +} + +func (x *DataMessage) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + type PublishRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1289,7 +1281,7 @@ func (x *PublishRequest) GetInit() *PublishRequest_InitMessage { return nil } -func (x *PublishRequest) GetData() *PublishRequest_DataMessage { +func (x *PublishRequest) GetData() *DataMessage { if x, ok := x.GetMessage().(*PublishRequest_Data); ok { return x.Data } @@ -1312,7 +1304,7 @@ type PublishRequest_Init struct { } type PublishRequest_Data struct { - Data *PublishRequest_DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"` + Data *DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"` } func (*PublishRequest_Init) isPublishRequest_Message() {} @@ -1324,9 +1316,9 @@ type PublishResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - AckSequence int64 `protobuf:"varint,1,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"` - Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` - IsClosed bool `protobuf:"varint,3,opt,name=is_closed,json=isClosed,proto3" json:"is_closed,omitempty"` + AckSequence int64 `protobuf:"varint,1,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + RedirectToBroker string `protobuf:"bytes,3,opt,name=redirect_to_broker,json=redirectToBroker,proto3" json:"redirect_to_broker,omitempty"` } func (x *PublishResponse) Reset() { @@ -1375,25 +1367,163 @@ func (x *PublishResponse) GetError() string { return "" } -func (x *PublishResponse) GetIsClosed() bool { +func (x *PublishResponse) GetRedirectToBroker() string { if x != nil { - return x.IsClosed + return x.RedirectToBroker } - return false + return "" +} + +type SubscribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Consumer *SubscribeRequest_Consumer `protobuf:"bytes,1,opt,name=consumer,proto3" json:"consumer,omitempty"` + Cursor *SubscribeRequest_Cursor `protobuf:"bytes,2,opt,name=cursor,proto3" json:"cursor,omitempty"` +} + +func (x *SubscribeRequest) Reset() { + *x = SubscribeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[24] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeRequest) ProtoMessage() {} + +func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[24] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRequest) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{24} +} + +func (x *SubscribeRequest) GetConsumer() *SubscribeRequest_Consumer { + if x != nil { + return x.Consumer + } + return nil } +func (x *SubscribeRequest) GetCursor() *SubscribeRequest_Cursor { + if x != nil { + return x.Cursor + } + return nil +} + +type SubscribeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Message: + // + // *SubscribeResponse_Ctrl + // *SubscribeResponse_Data + Message isSubscribeResponse_Message `protobuf_oneof:"message"` +} + +func (x *SubscribeResponse) Reset() { + *x = SubscribeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeResponse) ProtoMessage() {} + +func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[25] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. +func (*SubscribeResponse) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{25} +} + +func (m *SubscribeResponse) GetMessage() isSubscribeResponse_Message { + if m != nil { + return m.Message + } + return nil +} + +func (x *SubscribeResponse) GetCtrl() *SubscribeResponse_CtrlMessage { + if x, ok := x.GetMessage().(*SubscribeResponse_Ctrl); ok { + return x.Ctrl + } + return nil +} + +func (x *SubscribeResponse) GetData() *DataMessage { + if x, ok := x.GetMessage().(*SubscribeResponse_Data); ok { + return x.Data + } + return nil +} + +type isSubscribeResponse_Message interface { + isSubscribeResponse_Message() +} + +type SubscribeResponse_Ctrl struct { + Ctrl *SubscribeResponse_CtrlMessage `protobuf:"bytes,1,opt,name=ctrl,proto3,oneof"` +} + +type SubscribeResponse_Data struct { + Data *DataMessage `protobuf:"bytes,2,opt,name=data,proto3,oneof"` +} + +func (*SubscribeResponse_Ctrl) isSubscribeResponse_Message() {} + +func (*SubscribeResponse_Data) isSubscribeResponse_Message() {} + type PublishRequest_InitMessage struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Segment *Segment `protobuf:"bytes,1,opt,name=segment,proto3" json:"segment,omitempty"` + Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` + AckInterval int32 `protobuf:"varint,3,opt,name=ack_interval,json=ackInterval,proto3" json:"ack_interval,omitempty"` } func (x *PublishRequest_InitMessage) Reset() { *x = PublishRequest_InitMessage{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[24] + mi := &file_mq_proto_msgTypes[26] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1406,7 +1536,7 @@ func (x *PublishRequest_InitMessage) String() string { func (*PublishRequest_InitMessage) ProtoMessage() {} func (x *PublishRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[24] + mi := &file_mq_proto_msgTypes[26] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1422,39 +1552,54 @@ func (*PublishRequest_InitMessage) Descriptor() ([]byte, []int) { return file_mq_proto_rawDescGZIP(), []int{22, 0} } -func (x *PublishRequest_InitMessage) GetSegment() *Segment { +func (x *PublishRequest_InitMessage) GetTopic() *Topic { if x != nil { - return x.Segment + return x.Topic } return nil } -type PublishRequest_DataMessage struct { +func (x *PublishRequest_InitMessage) GetPartition() *Partition { + if x != nil { + return x.Partition + } + return nil +} + +func (x *PublishRequest_InitMessage) GetAckInterval() int32 { + if x != nil { + return x.AckInterval + } + return 0 +} + +type SubscribeRequest_Consumer struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"` + ConsumerId string `protobuf:"bytes,2,opt,name=consumer_id,json=consumerId,proto3" json:"consumer_id,omitempty"` + ClientId string `protobuf:"bytes,3,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` } -func (x *PublishRequest_DataMessage) Reset() { - *x = PublishRequest_DataMessage{} +func (x *SubscribeRequest_Consumer) Reset() { + *x = SubscribeRequest_Consumer{} if protoimpl.UnsafeEnabled { - mi := &file_mq_proto_msgTypes[25] + mi := &file_mq_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } -func (x *PublishRequest_DataMessage) String() string { +func (x *SubscribeRequest_Consumer) String() string { return protoimpl.X.MessageStringOf(x) } -func (*PublishRequest_DataMessage) ProtoMessage() {} +func (*SubscribeRequest_Consumer) ProtoMessage() {} -func (x *PublishRequest_DataMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_proto_msgTypes[25] +func (x *SubscribeRequest_Consumer) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[27] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1465,25 +1610,192 @@ func (x *PublishRequest_DataMessage) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use PublishRequest_DataMessage.ProtoReflect.Descriptor instead. -func (*PublishRequest_DataMessage) Descriptor() ([]byte, []int) { - return file_mq_proto_rawDescGZIP(), []int{22, 1} +// Deprecated: Use SubscribeRequest_Consumer.ProtoReflect.Descriptor instead. +func (*SubscribeRequest_Consumer) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{24, 0} } -func (x *PublishRequest_DataMessage) GetKey() []byte { +func (x *SubscribeRequest_Consumer) GetConsumerGroup() string { if x != nil { - return x.Key + return x.ConsumerGroup + } + return "" +} + +func (x *SubscribeRequest_Consumer) GetConsumerId() string { + if x != nil { + return x.ConsumerId + } + return "" +} + +func (x *SubscribeRequest_Consumer) GetClientId() string { + if x != nil { + return x.ClientId + } + return "" +} + +type SubscribeRequest_Cursor struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` + // Types that are assignable to Offset: + // + // *SubscribeRequest_Cursor_StartOffset + // *SubscribeRequest_Cursor_StartTimestampNs + Offset isSubscribeRequest_Cursor_Offset `protobuf_oneof:"offset"` + Filter string `protobuf:"bytes,5,opt,name=filter,proto3" json:"filter,omitempty"` +} + +func (x *SubscribeRequest_Cursor) Reset() { + *x = SubscribeRequest_Cursor{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[28] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeRequest_Cursor) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeRequest_Cursor) ProtoMessage() {} + +func (x *SubscribeRequest_Cursor) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[28] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeRequest_Cursor.ProtoReflect.Descriptor instead. +func (*SubscribeRequest_Cursor) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{24, 1} +} + +func (x *SubscribeRequest_Cursor) GetTopic() *Topic { + if x != nil { + return x.Topic } return nil } -func (x *PublishRequest_DataMessage) GetValue() []byte { +func (x *SubscribeRequest_Cursor) GetPartition() *Partition { if x != nil { - return x.Value + return x.Partition } return nil } +func (m *SubscribeRequest_Cursor) GetOffset() isSubscribeRequest_Cursor_Offset { + if m != nil { + return m.Offset + } + return nil +} + +func (x *SubscribeRequest_Cursor) GetStartOffset() int64 { + if x, ok := x.GetOffset().(*SubscribeRequest_Cursor_StartOffset); ok { + return x.StartOffset + } + return 0 +} + +func (x *SubscribeRequest_Cursor) GetStartTimestampNs() int64 { + if x, ok := x.GetOffset().(*SubscribeRequest_Cursor_StartTimestampNs); ok { + return x.StartTimestampNs + } + return 0 +} + +func (x *SubscribeRequest_Cursor) GetFilter() string { + if x != nil { + return x.Filter + } + return "" +} + +type isSubscribeRequest_Cursor_Offset interface { + isSubscribeRequest_Cursor_Offset() +} + +type SubscribeRequest_Cursor_StartOffset struct { + StartOffset int64 `protobuf:"varint,3,opt,name=start_offset,json=startOffset,proto3,oneof"` +} + +type SubscribeRequest_Cursor_StartTimestampNs struct { + StartTimestampNs int64 `protobuf:"varint,4,opt,name=start_timestamp_ns,json=startTimestampNs,proto3,oneof"` +} + +func (*SubscribeRequest_Cursor_StartOffset) isSubscribeRequest_Cursor_Offset() {} + +func (*SubscribeRequest_Cursor_StartTimestampNs) isSubscribeRequest_Cursor_Offset() {} + +type SubscribeResponse_CtrlMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + RedirectToBroker string `protobuf:"bytes,2,opt,name=redirect_to_broker,json=redirectToBroker,proto3" json:"redirect_to_broker,omitempty"` +} + +func (x *SubscribeResponse_CtrlMessage) Reset() { + *x = SubscribeResponse_CtrlMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_mq_proto_msgTypes[29] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeResponse_CtrlMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeResponse_CtrlMessage) ProtoMessage() {} + +func (x *SubscribeResponse_CtrlMessage) ProtoReflect() protoreflect.Message { + mi := &file_mq_proto_msgTypes[29] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeResponse_CtrlMessage.ProtoReflect.Descriptor instead. +func (*SubscribeResponse_CtrlMessage) Descriptor() ([]byte, []int) { + return file_mq_proto_rawDescGZIP(), []int{25, 0} +} + +func (x *SubscribeResponse_CtrlMessage) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *SubscribeResponse_CtrlMessage) GetRedirectToBroker() string { + if x != nil { + return x.RedirectToBroker + } + return "" +} + var File_mq_proto protoreflect.FileDescriptor var file_mq_proto_rawDesc = []byte{ @@ -1553,196 +1865,241 @@ var file_mq_proto_rawDesc = []byte{ 0x67, 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x6a, 0x0a, - 0x17, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, - 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, - 0x70, 0x69, 0x63, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, 0x66, 0x6f, 0x72, 0x5f, 0x70, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x46, - 0x6f, 0x72, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x22, 0xae, 0x01, 0x0a, 0x18, 0x46, 0x69, - 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, + 0x03, 0x52, 0x0a, 0x62, 0x79, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x6c, 0x0a, + 0x19, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, 0x66, 0x6f, 0x72, 0x5f, + 0x70, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, + 0x73, 0x46, 0x6f, 0x72, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x22, 0xb2, 0x01, 0x0a, 0x1a, + 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, + 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x69, 0x0a, 0x1c, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, + 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, + 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, + 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x1a, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x73, + 0x22, 0xa2, 0x01, 0x0a, 0x19, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x35, + 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0d, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x5f, + 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6c, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x12, 0x29, 0x0a, 0x10, 0x66, 0x6f, + 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x72, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x03, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x72, 0x42, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x22, 0x73, 0x0a, 0x1d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, - 0x63, 0x12, 0x67, 0x0a, 0x1b, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, - 0x19, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xbc, 0x01, 0x0a, 0x1a, 0x42, - 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, - 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, - 0x72, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, - 0x73, 0x74, 0x6f, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x6f, 0x70, 0x12, 0x23, 0x0a, 0x0d, 0x6c, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0c, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x12, 0x29, - 0x0a, 0x10, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x65, 0x72, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, - 0x65, 0x72, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x22, 0x9b, 0x01, 0x0a, 0x19, 0x54, 0x6f, - 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, - 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x0e, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x75, 0x6e, 0x74, - 0x12, 0x55, 0x0a, 0x11, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, - 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x10, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x73, 0x0a, 0x1d, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, - 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, - 0x70, 0x69, 0x63, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0e, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x89, 0x01, 0x0a, - 0x1e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x67, 0x0a, 0x1b, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x19, 0x74, - 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, - 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xcf, 0x01, 0x0a, 0x1c, 0x41, 0x73, 0x73, + 0x63, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, + 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0e, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x8b, 0x01, 0x0a, 0x1e, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x69, 0x0a, + 0x1c, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x1a, 0x62, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, + 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x22, 0xd1, 0x01, 0x0a, 0x1c, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, - 0x6f, 0x70, 0x69, 0x63, 0x12, 0x67, 0x0a, 0x1b, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x5f, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, - 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, - 0x6e, 0x74, 0x52, 0x19, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x1b, 0x0a, - 0x09, 0x69, 0x73, 0x5f, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, - 0x52, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, 0x1f, 0x0a, 0x1d, 0x41, 0x73, - 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xff, 0x01, 0x0a, 0x21, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, - 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x6a, 0x0a, 0x1c, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, - 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, - 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, - 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x1a, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, - 0x74, 0x12, 0x3a, 0x0a, 0x1a, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x5f, 0x63, 0x61, 0x6e, 0x63, - 0x65, 0x6c, 0x5f, 0x69, 0x66, 0x5f, 0x6e, 0x6f, 0x74, 0x5f, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x16, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x43, 0x61, 0x6e, - 0x63, 0x65, 0x6c, 0x49, 0x66, 0x4e, 0x6f, 0x74, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x22, 0x8d, 0x01, - 0x0a, 0x22, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x67, 0x0a, 0x1b, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x5f, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, - 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, - 0x6e, 0x74, 0x52, 0x19, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xae, 0x02, - 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x3e, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, - 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, - 0x12, 0x3e, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x44, 0x61, 0x74, - 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x3e, 0x0a, 0x0b, - 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x2f, 0x0a, 0x07, 0x73, - 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x65, 0x67, 0x6d, - 0x65, 0x6e, 0x74, 0x52, 0x07, 0x73, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x35, 0x0a, 0x0b, - 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, - 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x67, - 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b, 0x5f, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75, - 0x65, 0x6e, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, - 0x5f, 0x63, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, - 0x73, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x64, 0x32, 0xd7, 0x07, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, - 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, - 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, - 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, - 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, - 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, - 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, - 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, - 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, - 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, - 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, - 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x73, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, - 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x54, 0x6f, - 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, - 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2b, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, - 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, - 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, + 0x6f, 0x70, 0x69, 0x63, 0x12, 0x69, 0x0a, 0x1c, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, 0x70, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, + 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, + 0x65, 0x6e, 0x74, 0x52, 0x1a, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x12, + 0x1b, 0x0a, 0x09, 0x69, 0x73, 0x5f, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x22, 0x1f, 0x0a, 0x1d, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x81, 0x01, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2f, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xfc, 0x01, + 0x0a, 0x21, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x67, 0x0a, 0x1b, 0x62, 0x72, 0x6f, 0x6b, 0x65, + 0x72, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x61, 0x73, 0x73, 0x69, + 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, + 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x19, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, + 0x12, 0x3a, 0x0a, 0x1a, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x5f, 0x63, 0x61, 0x6e, 0x63, 0x65, + 0x6c, 0x5f, 0x69, 0x66, 0x5f, 0x6e, 0x6f, 0x74, 0x5f, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x16, 0x73, 0x68, 0x6f, 0x75, 0x6c, 0x64, 0x43, 0x61, 0x6e, 0x63, + 0x65, 0x6c, 0x49, 0x66, 0x4e, 0x6f, 0x74, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x22, 0x8f, 0x01, 0x0a, + 0x22, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x69, 0x0a, 0x1c, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x61, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, + 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, + 0x6e, 0x74, 0x52, 0x1a, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x35, + 0x0a, 0x0b, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0xbd, 0x02, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, + 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3e, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x48, 0x00, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, + 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, + 0x75, 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x92, 0x01, 0x0a, 0x0b, 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b, 0x5f, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x61, + 0x63, 0x6b, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x78, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x6b, 0x5f, + 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, + 0x61, 0x63, 0x6b, 0x53, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x12, 0x2c, 0x0a, 0x12, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, + 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, + 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x22, + 0xeb, 0x03, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x43, 0x0a, 0x08, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x52, + 0x08, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x06, 0x63, 0x75, 0x72, + 0x73, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, + 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x1a, 0x6f, 0x0a, 0x08, 0x43, 0x6f, 0x6e, 0x73, + 0x75, 0x6d, 0x65, 0x72, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, + 0x5f, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, + 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x63, + 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0a, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, + 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x1a, 0xe1, 0x01, 0x0a, 0x06, 0x43, 0x75, + 0x72, 0x73, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, + 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0c, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, + 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x48, 0x00, 0x52, 0x0b, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x2e, 0x0a, 0x12, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x6e, + 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x48, 0x00, 0x52, 0x10, 0x73, 0x74, 0x61, 0x72, 0x74, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x4e, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x66, + 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, 0x69, 0x6c, + 0x74, 0x65, 0x72, 0x42, 0x08, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0xe5, 0x01, + 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, + 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, + 0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, + 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x51, 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x12, + 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, + 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xaf, 0x08, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, + 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, + 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, + 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, + 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, + 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, + 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, + 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, + 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, + 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, + 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, - 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, - 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x1c, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, - 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, - 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, - 0x01, 0x42, 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, - 0x71, 0x42, 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, - 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, - 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, - 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, + 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, + 0x12, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, + 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, + 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, + 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x12, 0x81, 0x01, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, + 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, + 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, + 0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, + 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x50, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, + 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, + 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, + 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, + 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1757,7 +2114,7 @@ func file_mq_proto_rawDescGZIP() []byte { return file_mq_proto_rawDescData } -var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 26) +var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 30) var file_mq_proto_goTypes = []interface{}{ (*SegmentInfo)(nil), // 0: messaging_pb.SegmentInfo (*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest @@ -1771,62 +2128,75 @@ var file_mq_proto_goTypes = []interface{}{ (*CheckSegmentStatusResponse)(nil), // 9: messaging_pb.CheckSegmentStatusResponse (*CheckBrokerLoadRequest)(nil), // 10: messaging_pb.CheckBrokerLoadRequest (*CheckBrokerLoadResponse)(nil), // 11: messaging_pb.CheckBrokerLoadResponse - (*FindTopicBrokersRequest)(nil), // 12: messaging_pb.FindTopicBrokersRequest - (*FindTopicBrokersResponse)(nil), // 13: messaging_pb.FindTopicBrokersResponse - (*BrokerPartitionsAssignment)(nil), // 14: messaging_pb.BrokerPartitionsAssignment - (*TopicPartitionsAssignment)(nil), // 15: messaging_pb.TopicPartitionsAssignment - (*RequestTopicPartitionsRequest)(nil), // 16: messaging_pb.RequestTopicPartitionsRequest - (*RequestTopicPartitionsResponse)(nil), // 17: messaging_pb.RequestTopicPartitionsResponse - (*AssignTopicPartitionsRequest)(nil), // 18: messaging_pb.AssignTopicPartitionsRequest - (*AssignTopicPartitionsResponse)(nil), // 19: messaging_pb.AssignTopicPartitionsResponse - (*CheckTopicPartitionsStatusRequest)(nil), // 20: messaging_pb.CheckTopicPartitionsStatusRequest - (*CheckTopicPartitionsStatusResponse)(nil), // 21: messaging_pb.CheckTopicPartitionsStatusResponse + (*LookupTopicBrokersRequest)(nil), // 12: messaging_pb.LookupTopicBrokersRequest + (*LookupTopicBrokersResponse)(nil), // 13: messaging_pb.LookupTopicBrokersResponse + (*BrokerPartitionAssignment)(nil), // 14: messaging_pb.BrokerPartitionAssignment + (*RequestTopicPartitionsRequest)(nil), // 15: messaging_pb.RequestTopicPartitionsRequest + (*RequestTopicPartitionsResponse)(nil), // 16: messaging_pb.RequestTopicPartitionsResponse + (*AssignTopicPartitionsRequest)(nil), // 17: messaging_pb.AssignTopicPartitionsRequest + (*AssignTopicPartitionsResponse)(nil), // 18: messaging_pb.AssignTopicPartitionsResponse + (*CheckTopicPartitionsStatusRequest)(nil), // 19: messaging_pb.CheckTopicPartitionsStatusRequest + (*CheckTopicPartitionsStatusResponse)(nil), // 20: messaging_pb.CheckTopicPartitionsStatusResponse + (*DataMessage)(nil), // 21: messaging_pb.DataMessage (*PublishRequest)(nil), // 22: messaging_pb.PublishRequest (*PublishResponse)(nil), // 23: messaging_pb.PublishResponse - (*PublishRequest_InitMessage)(nil), // 24: messaging_pb.PublishRequest.InitMessage - (*PublishRequest_DataMessage)(nil), // 25: messaging_pb.PublishRequest.DataMessage + (*SubscribeRequest)(nil), // 24: messaging_pb.SubscribeRequest + (*SubscribeResponse)(nil), // 25: messaging_pb.SubscribeResponse + (*PublishRequest_InitMessage)(nil), // 26: messaging_pb.PublishRequest.InitMessage + (*SubscribeRequest_Consumer)(nil), // 27: messaging_pb.SubscribeRequest.Consumer + (*SubscribeRequest_Cursor)(nil), // 28: messaging_pb.SubscribeRequest.Cursor + (*SubscribeResponse_CtrlMessage)(nil), // 29: messaging_pb.SubscribeResponse.CtrlMessage } var file_mq_proto_depIdxs = []int32{ 5, // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment 4, // 1: messaging_pb.Segment.partition:type_name -> messaging_pb.Partition 5, // 2: messaging_pb.AssignSegmentBrokersRequest.segment:type_name -> messaging_pb.Segment 5, // 3: messaging_pb.CheckSegmentStatusRequest.segment:type_name -> messaging_pb.Segment - 3, // 4: messaging_pb.FindTopicBrokersRequest.topic:type_name -> messaging_pb.Topic - 3, // 5: messaging_pb.FindTopicBrokersResponse.topic:type_name -> messaging_pb.Topic - 15, // 6: messaging_pb.FindTopicBrokersResponse.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment - 14, // 7: messaging_pb.TopicPartitionsAssignment.broker_partitions:type_name -> messaging_pb.BrokerPartitionsAssignment + 3, // 4: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> messaging_pb.Topic + 3, // 5: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> messaging_pb.Topic + 14, // 6: messaging_pb.LookupTopicBrokersResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment + 4, // 7: messaging_pb.BrokerPartitionAssignment.partition:type_name -> messaging_pb.Partition 3, // 8: messaging_pb.RequestTopicPartitionsRequest.topic:type_name -> messaging_pb.Topic - 15, // 9: messaging_pb.RequestTopicPartitionsResponse.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment + 14, // 9: messaging_pb.RequestTopicPartitionsResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment 3, // 10: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> messaging_pb.Topic - 15, // 11: messaging_pb.AssignTopicPartitionsRequest.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment - 14, // 12: messaging_pb.CheckTopicPartitionsStatusRequest.broker_partitions_assignment:type_name -> messaging_pb.BrokerPartitionsAssignment - 15, // 13: messaging_pb.CheckTopicPartitionsStatusResponse.topic_partitions_assignment:type_name -> messaging_pb.TopicPartitionsAssignment - 24, // 14: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage - 25, // 15: messaging_pb.PublishRequest.data:type_name -> messaging_pb.PublishRequest.DataMessage - 5, // 16: messaging_pb.PublishRequest.InitMessage.segment:type_name -> messaging_pb.Segment - 1, // 17: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest - 6, // 18: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest - 8, // 19: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest - 10, // 20: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest - 12, // 21: messaging_pb.SeaweedMessaging.FindTopicBrokers:input_type -> messaging_pb.FindTopicBrokersRequest - 16, // 22: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest - 18, // 23: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest - 20, // 24: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest - 22, // 25: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest - 2, // 26: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse - 7, // 27: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse - 9, // 28: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse - 11, // 29: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse - 13, // 30: messaging_pb.SeaweedMessaging.FindTopicBrokers:output_type -> messaging_pb.FindTopicBrokersResponse - 17, // 31: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse - 19, // 32: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse - 21, // 33: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse - 23, // 34: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse - 26, // [26:35] is the sub-list for method output_type - 17, // [17:26] is the sub-list for method input_type - 17, // [17:17] is the sub-list for extension type_name - 17, // [17:17] is the sub-list for extension extendee - 0, // [0:17] is the sub-list for field type_name + 14, // 11: messaging_pb.AssignTopicPartitionsRequest.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment + 14, // 12: messaging_pb.CheckTopicPartitionsStatusRequest.broker_partition_assignment:type_name -> messaging_pb.BrokerPartitionAssignment + 14, // 13: messaging_pb.CheckTopicPartitionsStatusResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment + 26, // 14: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage + 21, // 15: messaging_pb.PublishRequest.data:type_name -> messaging_pb.DataMessage + 27, // 16: messaging_pb.SubscribeRequest.consumer:type_name -> messaging_pb.SubscribeRequest.Consumer + 28, // 17: messaging_pb.SubscribeRequest.cursor:type_name -> messaging_pb.SubscribeRequest.Cursor + 29, // 18: messaging_pb.SubscribeResponse.ctrl:type_name -> messaging_pb.SubscribeResponse.CtrlMessage + 21, // 19: messaging_pb.SubscribeResponse.data:type_name -> messaging_pb.DataMessage + 3, // 20: messaging_pb.PublishRequest.InitMessage.topic:type_name -> messaging_pb.Topic + 4, // 21: messaging_pb.PublishRequest.InitMessage.partition:type_name -> messaging_pb.Partition + 3, // 22: messaging_pb.SubscribeRequest.Cursor.topic:type_name -> messaging_pb.Topic + 4, // 23: messaging_pb.SubscribeRequest.Cursor.partition:type_name -> messaging_pb.Partition + 1, // 24: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest + 6, // 25: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest + 8, // 26: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest + 10, // 27: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest + 12, // 28: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest + 15, // 29: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest + 17, // 30: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest + 19, // 31: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest + 22, // 32: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest + 24, // 33: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscribeRequest + 2, // 34: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse + 7, // 35: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse + 9, // 36: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse + 11, // 37: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse + 13, // 38: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse + 16, // 39: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse + 18, // 40: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse + 20, // 41: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse + 23, // 42: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse + 25, // 43: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.SubscribeResponse + 34, // [34:44] is the sub-list for method output_type + 24, // [24:34] is the sub-list for method input_type + 24, // [24:24] is the sub-list for extension type_name + 24, // [24:24] is the sub-list for extension extendee + 0, // [0:24] is the sub-list for field type_name } func init() { file_mq_proto_init() } @@ -1980,7 +2350,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*FindTopicBrokersRequest); i { + switch v := v.(*LookupTopicBrokersRequest); i { case 0: return &v.state case 1: @@ -1992,7 +2362,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*FindTopicBrokersResponse); i { + switch v := v.(*LookupTopicBrokersResponse); i { case 0: return &v.state case 1: @@ -2004,7 +2374,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*BrokerPartitionsAssignment); i { + switch v := v.(*BrokerPartitionAssignment); i { case 0: return &v.state case 1: @@ -2016,7 +2386,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*TopicPartitionsAssignment); i { + switch v := v.(*RequestTopicPartitionsRequest); i { case 0: return &v.state case 1: @@ -2028,7 +2398,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*RequestTopicPartitionsRequest); i { + switch v := v.(*RequestTopicPartitionsResponse); i { case 0: return &v.state case 1: @@ -2040,7 +2410,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*RequestTopicPartitionsResponse); i { + switch v := v.(*AssignTopicPartitionsRequest); i { case 0: return &v.state case 1: @@ -2052,7 +2422,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[18].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AssignTopicPartitionsRequest); i { + switch v := v.(*AssignTopicPartitionsResponse); i { case 0: return &v.state case 1: @@ -2064,7 +2434,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[19].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AssignTopicPartitionsResponse); i { + switch v := v.(*CheckTopicPartitionsStatusRequest); i { case 0: return &v.state case 1: @@ -2076,7 +2446,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[20].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CheckTopicPartitionsStatusRequest); i { + switch v := v.(*CheckTopicPartitionsStatusResponse); i { case 0: return &v.state case 1: @@ -2088,7 +2458,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[21].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CheckTopicPartitionsStatusResponse); i { + switch v := v.(*DataMessage); i { case 0: return &v.state case 1: @@ -2124,7 +2494,7 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[24].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishRequest_InitMessage); i { + switch v := v.(*SubscribeRequest); i { case 0: return &v.state case 1: @@ -2136,7 +2506,55 @@ func file_mq_proto_init() { } } file_mq_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*PublishRequest_DataMessage); i { + switch v := v.(*SubscribeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[26].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PublishRequest_InitMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest_Consumer); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest_Cursor); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mq_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeResponse_CtrlMessage); i { case 0: return &v.state case 1: @@ -2152,13 +2570,21 @@ func file_mq_proto_init() { (*PublishRequest_Init)(nil), (*PublishRequest_Data)(nil), } + file_mq_proto_msgTypes[25].OneofWrappers = []interface{}{ + (*SubscribeResponse_Ctrl)(nil), + (*SubscribeResponse_Data)(nil), + } + file_mq_proto_msgTypes[28].OneofWrappers = []interface{}{ + (*SubscribeRequest_Cursor_StartOffset)(nil), + (*SubscribeRequest_Cursor_StartTimestampNs)(nil), + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_mq_proto_rawDesc, NumEnums: 0, - NumMessages: 26, + NumMessages: 30, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go index c60ecbe5b..75e93f832 100644 --- a/weed/pb/mq_pb/mq_grpc.pb.go +++ b/weed/pb/mq_pb/mq_grpc.pb.go @@ -28,13 +28,14 @@ type SeaweedMessagingClient interface { CheckSegmentStatus(ctx context.Context, in *CheckSegmentStatusRequest, opts ...grpc.CallOption) (*CheckSegmentStatusResponse, error) CheckBrokerLoad(ctx context.Context, in *CheckBrokerLoadRequest, opts ...grpc.CallOption) (*CheckBrokerLoadResponse, error) // control plane for topic partitions - FindTopicBrokers(ctx context.Context, in *FindTopicBrokersRequest, opts ...grpc.CallOption) (*FindTopicBrokersResponse, error) + LookupTopicBrokers(ctx context.Context, in *LookupTopicBrokersRequest, opts ...grpc.CallOption) (*LookupTopicBrokersResponse, error) // a pub client will call this to get the topic partitions assignment RequestTopicPartitions(ctx context.Context, in *RequestTopicPartitionsRequest, opts ...grpc.CallOption) (*RequestTopicPartitionsResponse, error) AssignTopicPartitions(ctx context.Context, in *AssignTopicPartitionsRequest, opts ...grpc.CallOption) (*AssignTopicPartitionsResponse, error) CheckTopicPartitionsStatus(ctx context.Context, in *CheckTopicPartitionsStatusRequest, opts ...grpc.CallOption) (*CheckTopicPartitionsStatusResponse, error) // data plane Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error) } type seaweedMessagingClient struct { @@ -81,9 +82,9 @@ func (c *seaweedMessagingClient) CheckBrokerLoad(ctx context.Context, in *CheckB return out, nil } -func (c *seaweedMessagingClient) FindTopicBrokers(ctx context.Context, in *FindTopicBrokersRequest, opts ...grpc.CallOption) (*FindTopicBrokersResponse, error) { - out := new(FindTopicBrokersResponse) - err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/FindTopicBrokers", in, out, opts...) +func (c *seaweedMessagingClient) LookupTopicBrokers(ctx context.Context, in *LookupTopicBrokersRequest, opts ...grpc.CallOption) (*LookupTopicBrokersResponse, error) { + out := new(LookupTopicBrokersResponse) + err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/LookupTopicBrokers", in, out, opts...) if err != nil { return nil, err } @@ -148,6 +149,38 @@ func (x *seaweedMessagingPublishClient) Recv() (*PublishResponse, error) { return m, nil } +func (c *seaweedMessagingClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error) { + stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[1], "/messaging_pb.SeaweedMessaging/Subscribe", opts...) + if err != nil { + return nil, err + } + x := &seaweedMessagingSubscribeClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type SeaweedMessaging_SubscribeClient interface { + Recv() (*SubscribeResponse, error) + grpc.ClientStream +} + +type seaweedMessagingSubscribeClient struct { + grpc.ClientStream +} + +func (x *seaweedMessagingSubscribeClient) Recv() (*SubscribeResponse, error) { + m := new(SubscribeResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // SeaweedMessagingServer is the server API for SeaweedMessaging service. // All implementations must embed UnimplementedSeaweedMessagingServer // for forward compatibility @@ -158,13 +191,14 @@ type SeaweedMessagingServer interface { CheckSegmentStatus(context.Context, *CheckSegmentStatusRequest) (*CheckSegmentStatusResponse, error) CheckBrokerLoad(context.Context, *CheckBrokerLoadRequest) (*CheckBrokerLoadResponse, error) // control plane for topic partitions - FindTopicBrokers(context.Context, *FindTopicBrokersRequest) (*FindTopicBrokersResponse, error) + LookupTopicBrokers(context.Context, *LookupTopicBrokersRequest) (*LookupTopicBrokersResponse, error) // a pub client will call this to get the topic partitions assignment RequestTopicPartitions(context.Context, *RequestTopicPartitionsRequest) (*RequestTopicPartitionsResponse, error) AssignTopicPartitions(context.Context, *AssignTopicPartitionsRequest) (*AssignTopicPartitionsResponse, error) CheckTopicPartitionsStatus(context.Context, *CheckTopicPartitionsStatusRequest) (*CheckTopicPartitionsStatusResponse, error) // data plane Publish(SeaweedMessaging_PublishServer) error + Subscribe(*SubscribeRequest, SeaweedMessaging_SubscribeServer) error mustEmbedUnimplementedSeaweedMessagingServer() } @@ -184,8 +218,8 @@ func (UnimplementedSeaweedMessagingServer) CheckSegmentStatus(context.Context, * func (UnimplementedSeaweedMessagingServer) CheckBrokerLoad(context.Context, *CheckBrokerLoadRequest) (*CheckBrokerLoadResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method CheckBrokerLoad not implemented") } -func (UnimplementedSeaweedMessagingServer) FindTopicBrokers(context.Context, *FindTopicBrokersRequest) (*FindTopicBrokersResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method FindTopicBrokers not implemented") +func (UnimplementedSeaweedMessagingServer) LookupTopicBrokers(context.Context, *LookupTopicBrokersRequest) (*LookupTopicBrokersResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method LookupTopicBrokers not implemented") } func (UnimplementedSeaweedMessagingServer) RequestTopicPartitions(context.Context, *RequestTopicPartitionsRequest) (*RequestTopicPartitionsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method RequestTopicPartitions not implemented") @@ -199,6 +233,9 @@ func (UnimplementedSeaweedMessagingServer) CheckTopicPartitionsStatus(context.Co func (UnimplementedSeaweedMessagingServer) Publish(SeaweedMessaging_PublishServer) error { return status.Errorf(codes.Unimplemented, "method Publish not implemented") } +func (UnimplementedSeaweedMessagingServer) Subscribe(*SubscribeRequest, SeaweedMessaging_SubscribeServer) error { + return status.Errorf(codes.Unimplemented, "method Subscribe not implemented") +} func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {} // UnsafeSeaweedMessagingServer may be embedded to opt out of forward compatibility for this service. @@ -284,20 +321,20 @@ func _SeaweedMessaging_CheckBrokerLoad_Handler(srv interface{}, ctx context.Cont return interceptor(ctx, in, info, handler) } -func _SeaweedMessaging_FindTopicBrokers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(FindTopicBrokersRequest) +func _SeaweedMessaging_LookupTopicBrokers_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LookupTopicBrokersRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SeaweedMessagingServer).FindTopicBrokers(ctx, in) + return srv.(SeaweedMessagingServer).LookupTopicBrokers(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/messaging_pb.SeaweedMessaging/FindTopicBrokers", + FullMethod: "/messaging_pb.SeaweedMessaging/LookupTopicBrokers", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SeaweedMessagingServer).FindTopicBrokers(ctx, req.(*FindTopicBrokersRequest)) + return srv.(SeaweedMessagingServer).LookupTopicBrokers(ctx, req.(*LookupTopicBrokersRequest)) } return interceptor(ctx, in, info, handler) } @@ -382,6 +419,27 @@ func (x *seaweedMessagingPublishServer) Recv() (*PublishRequest, error) { return m, nil } +func _SeaweedMessaging_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscribeRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(SeaweedMessagingServer).Subscribe(m, &seaweedMessagingSubscribeServer{stream}) +} + +type SeaweedMessaging_SubscribeServer interface { + Send(*SubscribeResponse) error + grpc.ServerStream +} + +type seaweedMessagingSubscribeServer struct { + grpc.ServerStream +} + +func (x *seaweedMessagingSubscribeServer) Send(m *SubscribeResponse) error { + return x.ServerStream.SendMsg(m) +} + // SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -406,8 +464,8 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ Handler: _SeaweedMessaging_CheckBrokerLoad_Handler, }, { - MethodName: "FindTopicBrokers", - Handler: _SeaweedMessaging_FindTopicBrokers_Handler, + MethodName: "LookupTopicBrokers", + Handler: _SeaweedMessaging_LookupTopicBrokers_Handler, }, { MethodName: "RequestTopicPartitions", @@ -429,6 +487,11 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ ServerStreams: true, ClientStreams: true, }, + { + StreamName: "Subscribe", + Handler: _SeaweedMessaging_Subscribe_Handler, + ServerStreams: true, + }, }, Metadata: "mq.proto", } |
