aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_pub.go1
-rw-r--r--weed/mq/broker/broker_grpc_sub.go10
-rw-r--r--weed/mq/client/cmd/weed_sub/subscriber.go56
-rw-r--r--weed/mq/client/pub_client/lookup.go8
-rw-r--r--weed/mq/client/pub_client/publisher.go4
-rw-r--r--weed/mq/client/sub_client/lookup.go54
-rw-r--r--weed/mq/client/sub_client/subscribe.go81
-rw-r--r--weed/mq/client/sub_client/subscriber.go36
-rw-r--r--weed/mq/topic/local_manager.go1
-rw-r--r--weed/pb/mq.proto14
-rw-r--r--weed/pb/mq_pb/mq.pb.go441
11 files changed, 433 insertions, 273 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index d8f33c2a5..79393332f 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -85,6 +85,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
+ broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
}
} else {
response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 8a673009d..9a7e53ca1 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -11,20 +11,22 @@ import (
func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
- localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Init.Topic),
- topic.FromPbPartition(req.Init.Partition))
+ localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Cursor.Topic),
+ topic.FromPbPartition(req.Cursor.Partition))
if localTopicPartition == nil {
stream.Send(&mq_pb.SubscribeResponse{
Message: &mq_pb.SubscribeResponse_Ctrl{
Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{
- Error: "not found",
+ Error: "not initialized",
},
},
})
return nil
}
- localTopicPartition.Subscribe("client", time.Now(), func(logEntry *filer_pb.LogEntry) error {
+ clientName := fmt.Sprintf("%s/%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId)
+
+ localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error {
value := logEntry.GetData()
if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
Data: &mq_pb.DataMessage{
diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go
index b7ae2fe10..529d09a4d 100644
--- a/weed/mq/client/cmd/weed_sub/subscriber.go
+++ b/weed/mq/client/cmd/weed_sub/subscriber.go
@@ -1,50 +1,30 @@
package main
import (
- "context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
+ "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
)
func main() {
- err := pb.WithBrokerGrpcClient(true,
- "localhost:17777",
- grpc.WithTransportCredentials(insecure.NewCredentials()),
- func(client mq_pb.SeaweedMessagingClient) error {
- subClient, err := client.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
- Init: &mq_pb.SubscribeRequest_InitMessage{
- Topic: &mq_pb.Topic{
- Namespace: "test",
- Name: "test",
- },
- },
- })
- if err != nil {
- return err
- }
- for {
- resp, err := subClient.Recv()
- if err != nil {
- return err
- }
- if resp.GetCtrl() != nil {
- if resp.GetCtrl().Error != "" {
- return fmt.Errorf("ctrl error: %v", resp.GetCtrl().Error)
- }
- }
- if resp.GetData() != nil {
- println(string(resp.GetData().Key), "=>", string(resp.GetData().Value))
- }
-
- }
- return nil
- })
+ subscriber := sub_client.NewTopicSubscriber(
+ &sub_client.SubscriberConfiguration{
+ ConsumerGroup: "test",
+ ConsumerId: "test",
+ },
+ "test", "test")
+ if err := subscriber.Connect("localhost:17777"); err != nil {
+ fmt.Println(err)
+ return
+ }
- if err != nil {
+ if err := subscriber.Subscribe(func(key, value []byte) bool {
+ println(string(key), "=>", string(value))
+ return true
+ }, func() {
+ println("done subscribing")
+ }); err != nil {
fmt.Println(err)
}
+
}
diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go
index c54b2687d..5a9376ab1 100644
--- a/weed/mq/client/pub_client/lookup.go
+++ b/weed/mq/client/pub_client/lookup.go
@@ -5,14 +5,12 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc"
)
-func (p *TopicPublisher) doLookup(
- brokerAddress string, grpcDialOption grpc.DialOption) error {
+func (p *TopicPublisher) doLookup(brokerAddress string) error {
err := pb.WithBrokerGrpcClient(true,
brokerAddress,
- grpcDialOption,
+ p.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
lookupResp, err := client.LookupTopicBrokers(context.Background(),
&mq_pb.LookupTopicBrokersRequest{
@@ -36,7 +34,7 @@ func (p *TopicPublisher) doLookup(
// send init message
// save the publishing client
brokerAddress := brokerPartitionAssignment.LeaderBroker
- grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption)
+ grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
if err != nil {
return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
}
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index 171b5ebd7..5963838ce 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -15,6 +15,7 @@ type TopicPublisher struct {
topic string
partition2Broker *interval.SearchTree[string, int32]
broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient]
+ grpcDialOption grpc.DialOption
}
func NewTopicPublisher(namespace, topic string) *TopicPublisher {
@@ -25,11 +26,12 @@ func NewTopicPublisher(namespace, topic string) *TopicPublisher {
return int(a - b)
}),
broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](),
+ grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
}
}
func (p *TopicPublisher) Connect(bootstrapBroker string) error {
- if err := p.doLookup(bootstrapBroker, grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
+ if err := p.doLookup(bootstrapBroker); err != nil {
return err
}
return nil
diff --git a/weed/mq/client/sub_client/lookup.go b/weed/mq/client/sub_client/lookup.go
index 89d3d2c45..e836c4864 100644
--- a/weed/mq/client/sub_client/lookup.go
+++ b/weed/mq/client/sub_client/lookup.go
@@ -5,70 +5,30 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc"
)
-func (p *TopicSubscriber) doLookup(
- brokerAddress string, grpcDialOption grpc.DialOption) error {
+func (sub *TopicSubscriber) doLookup(brokerAddress string) error {
err := pb.WithBrokerGrpcClient(true,
brokerAddress,
- grpcDialOption,
+ sub.grpcDialOption,
func(client mq_pb.SeaweedMessagingClient) error {
lookupResp, err := client.LookupTopicBrokers(context.Background(),
&mq_pb.LookupTopicBrokersRequest{
Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
+ Namespace: sub.namespace,
+ Name: sub.topic,
},
- IsForPublish: true,
+ IsForPublish: false,
})
if err != nil {
return err
}
- for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
- // partition => broker
- p.partition2Broker.Insert(
- brokerPartitionAssignment.Partition.RangeStart,
- brokerPartitionAssignment.Partition.RangeStop,
- brokerPartitionAssignment.LeaderBroker)
-
- // broker => publish client
- // send init message
- // save the publishing client
- brokerAddress := brokerPartitionAssignment.LeaderBroker
- grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, grpcDialOption)
- if err != nil {
- return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
- }
- brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- publishClient, err := brokerClient.Publish(context.Background())
- if err != nil {
- return fmt.Errorf("create publish client: %v", err)
- }
- p.broker2PublishClient.Set(brokerAddress, publishClient)
- if err = publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Init{
- Init: &mq_pb.PublishRequest_InitMessage{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- Partition: &mq_pb.Partition{
- RingSize: brokerPartitionAssignment.Partition.RingSize,
- RangeStart: brokerPartitionAssignment.Partition.RangeStart,
- RangeStop: brokerPartitionAssignment.Partition.RangeStop,
- },
- },
- },
- }); err != nil {
- return fmt.Errorf("send init message: %v", err)
- }
- }
+ sub.brokerPartitionAssignments = lookupResp.BrokerPartitionAssignments
return nil
})
if err != nil {
- return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
+ return fmt.Errorf("lookup topic %s/%s: %v", sub.namespace, sub.topic, err)
}
return nil
}
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index 158c93010..622b88828 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -1,28 +1,71 @@
package sub_client
import (
- cmap "github.com/orcaman/concurrent-map"
- "github.com/rdleal/intervalst/interval"
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "sync"
)
-type SubscriberConfiguration struct {
-}
-
-type TopicSubscriber struct {
- namespace string
- topic string
- partition2Broker *interval.SearchTree[string, int32]
- broker2PublishClient cmap.ConcurrentMap[string, mq_pb.SeaweedMessaging_PublishClient]
-}
+type EachMessageFunc func(key, value []byte) (shouldContinue bool)
+type FinalFunc func()
-func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber {
- return &TopicSubscriber{
- namespace: namespace,
- topic: topic,
- partition2Broker: interval.NewSearchTree[string](func(a, b int32) int {
- return int(a - b)
- }),
- broker2PublishClient: cmap.New[mq_pb.SeaweedMessaging_PublishClient](),
+func (sub *TopicSubscriber) Subscribe(eachMessageFn EachMessageFunc, finalFn FinalFunc) error {
+ var wg sync.WaitGroup
+ for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments {
+ brokerAddress := brokerPartitionAssignment.LeaderBroker
+ grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.grpcDialOption)
+ if err != nil {
+ return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ }
+ brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
+ Consumer: &mq_pb.SubscribeRequest_Consumer{
+ ConsumerGroup: sub.config.ConsumerGroup,
+ ConsumerId: sub.config.ConsumerId,
+ },
+ Cursor: &mq_pb.SubscribeRequest_Cursor{
+ Topic: &mq_pb.Topic{
+ Namespace: sub.namespace,
+ Name: sub.topic,
+ },
+ Partition: &mq_pb.Partition{
+ RingSize: brokerPartitionAssignment.Partition.RingSize,
+ RangeStart: brokerPartitionAssignment.Partition.RangeStart,
+ RangeStop: brokerPartitionAssignment.Partition.RangeStop,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("create subscribe client: %v", err)
+ }
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if finalFn != nil {
+ defer finalFn()
+ }
+ for {
+ resp, err := subscribeClient.Recv()
+ if err != nil {
+ fmt.Printf("subscribe error: %v\n", err)
+ return
+ }
+ if resp.Message == nil {
+ continue
+ }
+ switch m := resp.Message.(type) {
+ case *mq_pb.SubscribeResponse_Data:
+ if !eachMessageFn(m.Data.Key, m.Data.Value) {
+ return
+ }
+ case *mq_pb.SubscribeResponse_Ctrl:
+ // ignore
+ }
+ }
+ }()
}
+ wg.Wait()
+ return nil
}
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
new file mode 100644
index 000000000..a193730b0
--- /dev/null
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -0,0 +1,36 @@
+package sub_client
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+type SubscriberConfiguration struct {
+ ConsumerGroup string
+ ConsumerId string
+}
+
+type TopicSubscriber struct {
+ config *SubscriberConfiguration
+ namespace string
+ topic string
+ brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
+ grpcDialOption grpc.DialOption
+}
+
+func NewTopicSubscriber(config *SubscriberConfiguration, namespace, topic string) *TopicSubscriber {
+ return &TopicSubscriber{
+ config: config,
+ namespace: namespace,
+ topic: topic,
+ grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+ }
+}
+
+func (sub *TopicSubscriber) Connect(bootstrapBroker string) error {
+ if err := sub.doLookup(bootstrapBroker); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index 168e3d561..6e7db5d08 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -25,6 +25,7 @@ func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition
Partitions: make([]*LocalPartition, 0),
}
}
+ manager.topics.SetIfAbsent(topic.String(), localTopic)
if localTopic.findPartition(localPartition.Partition) != nil {
return
}
diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto
index 8b5422596..424dc4b4f 100644
--- a/weed/pb/mq.proto
+++ b/weed/pb/mq.proto
@@ -162,12 +162,20 @@ message PublishResponse {
string redirect_to_broker = 3;
}
message SubscribeRequest {
- message InitMessage {
+ message Consumer {
+ string consumer_group = 1;
+ string consumer_id = 2;
+ }
+ message Cursor {
Topic topic = 1;
Partition partition = 2;
+ oneof offset {
+ int64 start_offset = 3;
+ int64 start_timestamp_ns = 4;
+ }
}
- InitMessage init = 1;
- int64 sequence = 2;
+ Consumer consumer = 1;
+ Cursor cursor = 2;
}
message SubscribeResponse {
message CtrlMessage {
diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go
index 3cec8deaa..1394d01be 100644
--- a/weed/pb/mq_pb/mq.pb.go
+++ b/weed/pb/mq_pb/mq.pb.go
@@ -1379,8 +1379,8 @@ type SubscribeRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Init *SubscribeRequest_InitMessage `protobuf:"bytes,1,opt,name=init,proto3" json:"init,omitempty"`
- Sequence int64 `protobuf:"varint,2,opt,name=sequence,proto3" json:"sequence,omitempty"`
+ Consumer *SubscribeRequest_Consumer `protobuf:"bytes,1,opt,name=consumer,proto3" json:"consumer,omitempty"`
+ Cursor *SubscribeRequest_Cursor `protobuf:"bytes,2,opt,name=cursor,proto3" json:"cursor,omitempty"`
}
func (x *SubscribeRequest) Reset() {
@@ -1415,18 +1415,18 @@ func (*SubscribeRequest) Descriptor() ([]byte, []int) {
return file_mq_proto_rawDescGZIP(), []int{24}
}
-func (x *SubscribeRequest) GetInit() *SubscribeRequest_InitMessage {
+func (x *SubscribeRequest) GetConsumer() *SubscribeRequest_Consumer {
if x != nil {
- return x.Init
+ return x.Consumer
}
return nil
}
-func (x *SubscribeRequest) GetSequence() int64 {
+func (x *SubscribeRequest) GetCursor() *SubscribeRequest_Cursor {
if x != nil {
- return x.Sequence
+ return x.Cursor
}
- return 0
+ return nil
}
type SubscribeResponse struct {
@@ -1565,17 +1565,17 @@ func (x *PublishRequest_InitMessage) GetPartition() *Partition {
return nil
}
-type SubscribeRequest_InitMessage struct {
+type SubscribeRequest_Consumer struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
- Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"`
+ ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"`
+ ConsumerId string `protobuf:"bytes,2,opt,name=consumer_id,json=consumerId,proto3" json:"consumer_id,omitempty"`
}
-func (x *SubscribeRequest_InitMessage) Reset() {
- *x = SubscribeRequest_InitMessage{}
+func (x *SubscribeRequest_Consumer) Reset() {
+ *x = SubscribeRequest_Consumer{}
if protoimpl.UnsafeEnabled {
mi := &file_mq_proto_msgTypes[27]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -1583,13 +1583,13 @@ func (x *SubscribeRequest_InitMessage) Reset() {
}
}
-func (x *SubscribeRequest_InitMessage) String() string {
+func (x *SubscribeRequest_Consumer) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*SubscribeRequest_InitMessage) ProtoMessage() {}
+func (*SubscribeRequest_Consumer) ProtoMessage() {}
-func (x *SubscribeRequest_InitMessage) ProtoReflect() protoreflect.Message {
+func (x *SubscribeRequest_Consumer) ProtoReflect() protoreflect.Message {
mi := &file_mq_proto_msgTypes[27]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -1601,25 +1601,122 @@ func (x *SubscribeRequest_InitMessage) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
-// Deprecated: Use SubscribeRequest_InitMessage.ProtoReflect.Descriptor instead.
-func (*SubscribeRequest_InitMessage) Descriptor() ([]byte, []int) {
+// Deprecated: Use SubscribeRequest_Consumer.ProtoReflect.Descriptor instead.
+func (*SubscribeRequest_Consumer) Descriptor() ([]byte, []int) {
return file_mq_proto_rawDescGZIP(), []int{24, 0}
}
-func (x *SubscribeRequest_InitMessage) GetTopic() *Topic {
+func (x *SubscribeRequest_Consumer) GetConsumerGroup() string {
+ if x != nil {
+ return x.ConsumerGroup
+ }
+ return ""
+}
+
+func (x *SubscribeRequest_Consumer) GetConsumerId() string {
+ if x != nil {
+ return x.ConsumerId
+ }
+ return ""
+}
+
+type SubscribeRequest_Cursor struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Topic *Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
+ Partition *Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"`
+ // Types that are assignable to Offset:
+ //
+ // *SubscribeRequest_Cursor_StartOffset
+ // *SubscribeRequest_Cursor_StartTimestampNs
+ Offset isSubscribeRequest_Cursor_Offset `protobuf_oneof:"offset"`
+}
+
+func (x *SubscribeRequest_Cursor) Reset() {
+ *x = SubscribeRequest_Cursor{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_mq_proto_msgTypes[28]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SubscribeRequest_Cursor) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SubscribeRequest_Cursor) ProtoMessage() {}
+
+func (x *SubscribeRequest_Cursor) ProtoReflect() protoreflect.Message {
+ mi := &file_mq_proto_msgTypes[28]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SubscribeRequest_Cursor.ProtoReflect.Descriptor instead.
+func (*SubscribeRequest_Cursor) Descriptor() ([]byte, []int) {
+ return file_mq_proto_rawDescGZIP(), []int{24, 1}
+}
+
+func (x *SubscribeRequest_Cursor) GetTopic() *Topic {
if x != nil {
return x.Topic
}
return nil
}
-func (x *SubscribeRequest_InitMessage) GetPartition() *Partition {
+func (x *SubscribeRequest_Cursor) GetPartition() *Partition {
if x != nil {
return x.Partition
}
return nil
}
+func (m *SubscribeRequest_Cursor) GetOffset() isSubscribeRequest_Cursor_Offset {
+ if m != nil {
+ return m.Offset
+ }
+ return nil
+}
+
+func (x *SubscribeRequest_Cursor) GetStartOffset() int64 {
+ if x, ok := x.GetOffset().(*SubscribeRequest_Cursor_StartOffset); ok {
+ return x.StartOffset
+ }
+ return 0
+}
+
+func (x *SubscribeRequest_Cursor) GetStartTimestampNs() int64 {
+ if x, ok := x.GetOffset().(*SubscribeRequest_Cursor_StartTimestampNs); ok {
+ return x.StartTimestampNs
+ }
+ return 0
+}
+
+type isSubscribeRequest_Cursor_Offset interface {
+ isSubscribeRequest_Cursor_Offset()
+}
+
+type SubscribeRequest_Cursor_StartOffset struct {
+ StartOffset int64 `protobuf:"varint,3,opt,name=start_offset,json=startOffset,proto3,oneof"`
+}
+
+type SubscribeRequest_Cursor_StartTimestampNs struct {
+ StartTimestampNs int64 `protobuf:"varint,4,opt,name=start_timestamp_ns,json=startTimestampNs,proto3,oneof"`
+}
+
+func (*SubscribeRequest_Cursor_StartOffset) isSubscribeRequest_Cursor_Offset() {}
+
+func (*SubscribeRequest_Cursor_StartTimestampNs) isSubscribeRequest_Cursor_Offset() {}
+
type SubscribeResponse_CtrlMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -1632,7 +1729,7 @@ type SubscribeResponse_CtrlMessage struct {
func (x *SubscribeResponse_CtrlMessage) Reset() {
*x = SubscribeResponse_CtrlMessage{}
if protoimpl.UnsafeEnabled {
- mi := &file_mq_proto_msgTypes[28]
+ mi := &file_mq_proto_msgTypes[29]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -1645,7 +1742,7 @@ func (x *SubscribeResponse_CtrlMessage) String() string {
func (*SubscribeResponse_CtrlMessage) ProtoMessage() {}
func (x *SubscribeResponse_CtrlMessage) ProtoReflect() protoreflect.Message {
- mi := &file_mq_proto_msgTypes[28]
+ mi := &file_mq_proto_msgTypes[29]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -1858,108 +1955,122 @@ var file_mq_proto_rawDesc = []byte{
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a,
0x12, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f,
0x6b, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x64, 0x69, 0x72,
- 0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x22, 0xdf, 0x01, 0x0a, 0x10,
+ 0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x22, 0xb6, 0x03, 0x0a, 0x10,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
- 0x12, 0x3e, 0x0a, 0x04, 0x69, 0x6e, 0x69, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2a,
- 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75,
- 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x49,
- 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x04, 0x69, 0x6e, 0x69, 0x74,
- 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x18, 0x02, 0x20, 0x01,
- 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x6f, 0x0a, 0x0b,
- 0x49, 0x6e, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x05, 0x74,
- 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73,
- 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52,
- 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74,
- 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73,
- 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
- 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0xe5, 0x01,
- 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
- 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28,
- 0x0b, 0x32, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
- 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
- 0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00,
- 0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02,
- 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
- 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48,
- 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x51, 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d,
- 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18,
- 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x12,
- 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f, 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x6b,
- 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65,
- 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65,
- 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xaf, 0x08, 0x0a, 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65,
- 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69,
- 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25,
- 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69,
- 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65,
- 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
+ 0x12, 0x43, 0x0a, 0x08, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
+ 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
+ 0x73, 0x74, 0x2e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x52, 0x08, 0x63, 0x6f, 0x6e,
+ 0x73, 0x75, 0x6d, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18,
+ 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
+ 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65,
+ 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x52, 0x06, 0x63, 0x75,
+ 0x72, 0x73, 0x6f, 0x72, 0x1a, 0x52, 0x0a, 0x08, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72,
+ 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x5f, 0x67, 0x72, 0x6f,
+ 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6e, 0x73, 0x75, 0x6d,
+ 0x65, 0x72, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x73, 0x75,
+ 0x6d, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6f,
+ 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x49, 0x64, 0x1a, 0xc9, 0x01, 0x0a, 0x06, 0x43, 0x75, 0x72,
+ 0x73, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
+ 0x62, 0x2e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x35,
+ 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x0b, 0x32, 0x17, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62,
+ 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74,
+ 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x0a, 0x0c, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x6f,
+ 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x48, 0x00, 0x52, 0x0b, 0x73,
+ 0x74, 0x61, 0x72, 0x74, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x2e, 0x0a, 0x12, 0x73, 0x74,
+ 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x6e, 0x73,
+ 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x48, 0x00, 0x52, 0x10, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54,
+ 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x4e, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x6f, 0x66,
+ 0x66, 0x73, 0x65, 0x74, 0x22, 0xe5, 0x01, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
+ 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x41, 0x0a, 0x04, 0x63, 0x74,
+ 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
+ 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
+ 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x63, 0x74, 0x72, 0x6c, 0x12, 0x2f, 0x0a,
+ 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x4d,
+ 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x1a, 0x51,
+ 0x0a, 0x0b, 0x43, 0x74, 0x72, 0x6c, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a,
+ 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72,
+ 0x72, 0x6f, 0x72, 0x12, 0x2c, 0x0a, 0x12, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x5f,
+ 0x74, 0x6f, 0x5f, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x10, 0x72, 0x65, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x54, 0x6f, 0x42, 0x72, 0x6f, 0x6b, 0x65,
+ 0x72, 0x42, 0x09, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xaf, 0x08, 0x0a,
+ 0x10, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
+ 0x67, 0x12, 0x63, 0x0a, 0x10, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c,
+ 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c,
- 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
- 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74,
- 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
- 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67,
- 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65,
- 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
- 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42,
- 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
- 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74,
- 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
+ 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x6d,
+ 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6e, 0x64,
+ 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70,
+ 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x14, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e,
+ 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x29,
+ 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73,
+ 0x73, 0x69, 0x67, 0x6e, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65,
+ 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x53,
+ 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73,
+ 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x43, 0x68, 0x65, 0x63, 0x6b,
+ 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x27, 0x2e,
+ 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65,
+ 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65,
- 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
- 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43,
- 0x68, 0x65, 0x63, 0x6b, 0x53, 0x65, 0x67, 0x6d, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75,
- 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43,
- 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24,
- 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68,
- 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71,
- 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
- 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c,
- 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a,
- 0x12, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b,
- 0x65, 0x72, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
+ 0x6e, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
+ 0x22, 0x00, 0x12, 0x60, 0x0a, 0x0f, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65,
+ 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x12, 0x24, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e,
+ 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72,
+ 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x25, 0x2e, 0x6d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b,
+ 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+ 0x73, 0x65, 0x22, 0x00, 0x12, 0x69, 0x0a, 0x12, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f,
+ 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x27, 0x2e, 0x6d, 0x65, 0x73,
+ 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70,
+ 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75,
+ 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72,
- 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x6d,
- 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b,
- 0x75, 0x70, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65,
- 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75,
- 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f,
- 0x6e, 0x73, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
- 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61,
- 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
- 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52,
- 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69,
- 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
- 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61,
- 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61,
- 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f,
- 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71,
- 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67,
- 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50,
- 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
- 0x65, 0x22, 0x00, 0x12, 0x81, 0x01, 0x0a, 0x1a, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70,
- 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74,
- 0x75, 0x73, 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
- 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74,
- 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75,
- 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f,
- 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72,
- 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73,
- 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69,
- 0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
- 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
- 0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e,
- 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
- 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x50, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
- 0x62, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
- 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
- 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70,
- 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
- 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x4e, 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65,
- 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x10, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51,
- 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75,
- 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f,
- 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70,
- 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x6f, 0x6b, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12,
+ 0x75, 0x0a, 0x16, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50,
+ 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2b, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+ 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69,
+ 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x6f, 0x70,
+ 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70,
+ 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x72, 0x0a, 0x15, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e,
+ 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12,
+ 0x2a, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41,
+ 0x73, 0x73, 0x69, 0x67, 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74,
+ 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6d, 0x65,
+ 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67,
+ 0x6e, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73,
+ 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x81, 0x01, 0x0a, 0x1a, 0x43,
+ 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69,
+ 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2f, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54, 0x6f,
+ 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74, 0x61,
+ 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x30, 0x2e, 0x6d, 0x65, 0x73,
+ 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x54,
+ 0x6f, 0x70, 0x69, 0x63, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x53, 0x74,
+ 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c,
+ 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x1c, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68,
+ 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
+ 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65,
+ 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x50, 0x0a, 0x09,
+ 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
+ 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73,
+ 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69,
+ 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x4e,
+ 0x0a, 0x0c, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x6d, 0x71, 0x42, 0x10,
+ 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x51, 0x75, 0x65, 0x75, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f,
+ 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61,
+ 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73,
+ 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x71, 0x5f, 0x70, 0x62, 0x62, 0x06,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -1974,7 +2085,7 @@ func file_mq_proto_rawDescGZIP() []byte {
return file_mq_proto_rawDescData
}
-var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 29)
+var file_mq_proto_msgTypes = make([]protoimpl.MessageInfo, 30)
var file_mq_proto_goTypes = []interface{}{
(*SegmentInfo)(nil), // 0: messaging_pb.SegmentInfo
(*FindBrokerLeaderRequest)(nil), // 1: messaging_pb.FindBrokerLeaderRequest
@@ -2003,8 +2114,9 @@ var file_mq_proto_goTypes = []interface{}{
(*SubscribeRequest)(nil), // 24: messaging_pb.SubscribeRequest
(*SubscribeResponse)(nil), // 25: messaging_pb.SubscribeResponse
(*PublishRequest_InitMessage)(nil), // 26: messaging_pb.PublishRequest.InitMessage
- (*SubscribeRequest_InitMessage)(nil), // 27: messaging_pb.SubscribeRequest.InitMessage
- (*SubscribeResponse_CtrlMessage)(nil), // 28: messaging_pb.SubscribeResponse.CtrlMessage
+ (*SubscribeRequest_Consumer)(nil), // 27: messaging_pb.SubscribeRequest.Consumer
+ (*SubscribeRequest_Cursor)(nil), // 28: messaging_pb.SubscribeRequest.Cursor
+ (*SubscribeResponse_CtrlMessage)(nil), // 29: messaging_pb.SubscribeResponse.CtrlMessage
}
var file_mq_proto_depIdxs = []int32{
5, // 0: messaging_pb.SegmentInfo.segment:type_name -> messaging_pb.Segment
@@ -2023,38 +2135,39 @@ var file_mq_proto_depIdxs = []int32{
14, // 13: messaging_pb.CheckTopicPartitionsStatusResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment
26, // 14: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage
21, // 15: messaging_pb.PublishRequest.data:type_name -> messaging_pb.DataMessage
- 27, // 16: messaging_pb.SubscribeRequest.init:type_name -> messaging_pb.SubscribeRequest.InitMessage
- 28, // 17: messaging_pb.SubscribeResponse.ctrl:type_name -> messaging_pb.SubscribeResponse.CtrlMessage
- 21, // 18: messaging_pb.SubscribeResponse.data:type_name -> messaging_pb.DataMessage
- 3, // 19: messaging_pb.PublishRequest.InitMessage.topic:type_name -> messaging_pb.Topic
- 4, // 20: messaging_pb.PublishRequest.InitMessage.partition:type_name -> messaging_pb.Partition
- 3, // 21: messaging_pb.SubscribeRequest.InitMessage.topic:type_name -> messaging_pb.Topic
- 4, // 22: messaging_pb.SubscribeRequest.InitMessage.partition:type_name -> messaging_pb.Partition
- 1, // 23: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
- 6, // 24: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest
- 8, // 25: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest
- 10, // 26: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest
- 12, // 27: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest
- 15, // 28: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest
- 17, // 29: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest
- 19, // 30: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest
- 22, // 31: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest
- 24, // 32: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscribeRequest
- 2, // 33: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
- 7, // 34: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse
- 9, // 35: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse
- 11, // 36: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse
- 13, // 37: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse
- 16, // 38: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse
- 18, // 39: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse
- 20, // 40: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse
- 23, // 41: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse
- 25, // 42: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.SubscribeResponse
- 33, // [33:43] is the sub-list for method output_type
- 23, // [23:33] is the sub-list for method input_type
- 23, // [23:23] is the sub-list for extension type_name
- 23, // [23:23] is the sub-list for extension extendee
- 0, // [0:23] is the sub-list for field type_name
+ 27, // 16: messaging_pb.SubscribeRequest.consumer:type_name -> messaging_pb.SubscribeRequest.Consumer
+ 28, // 17: messaging_pb.SubscribeRequest.cursor:type_name -> messaging_pb.SubscribeRequest.Cursor
+ 29, // 18: messaging_pb.SubscribeResponse.ctrl:type_name -> messaging_pb.SubscribeResponse.CtrlMessage
+ 21, // 19: messaging_pb.SubscribeResponse.data:type_name -> messaging_pb.DataMessage
+ 3, // 20: messaging_pb.PublishRequest.InitMessage.topic:type_name -> messaging_pb.Topic
+ 4, // 21: messaging_pb.PublishRequest.InitMessage.partition:type_name -> messaging_pb.Partition
+ 3, // 22: messaging_pb.SubscribeRequest.Cursor.topic:type_name -> messaging_pb.Topic
+ 4, // 23: messaging_pb.SubscribeRequest.Cursor.partition:type_name -> messaging_pb.Partition
+ 1, // 24: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest
+ 6, // 25: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:input_type -> messaging_pb.AssignSegmentBrokersRequest
+ 8, // 26: messaging_pb.SeaweedMessaging.CheckSegmentStatus:input_type -> messaging_pb.CheckSegmentStatusRequest
+ 10, // 27: messaging_pb.SeaweedMessaging.CheckBrokerLoad:input_type -> messaging_pb.CheckBrokerLoadRequest
+ 12, // 28: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest
+ 15, // 29: messaging_pb.SeaweedMessaging.RequestTopicPartitions:input_type -> messaging_pb.RequestTopicPartitionsRequest
+ 17, // 30: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest
+ 19, // 31: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:input_type -> messaging_pb.CheckTopicPartitionsStatusRequest
+ 22, // 32: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest
+ 24, // 33: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscribeRequest
+ 2, // 34: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse
+ 7, // 35: messaging_pb.SeaweedMessaging.AssignSegmentBrokers:output_type -> messaging_pb.AssignSegmentBrokersResponse
+ 9, // 36: messaging_pb.SeaweedMessaging.CheckSegmentStatus:output_type -> messaging_pb.CheckSegmentStatusResponse
+ 11, // 37: messaging_pb.SeaweedMessaging.CheckBrokerLoad:output_type -> messaging_pb.CheckBrokerLoadResponse
+ 13, // 38: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse
+ 16, // 39: messaging_pb.SeaweedMessaging.RequestTopicPartitions:output_type -> messaging_pb.RequestTopicPartitionsResponse
+ 18, // 40: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse
+ 20, // 41: messaging_pb.SeaweedMessaging.CheckTopicPartitionsStatus:output_type -> messaging_pb.CheckTopicPartitionsStatusResponse
+ 23, // 42: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse
+ 25, // 43: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.SubscribeResponse
+ 34, // [34:44] is the sub-list for method output_type
+ 24, // [24:34] is the sub-list for method input_type
+ 24, // [24:24] is the sub-list for extension type_name
+ 24, // [24:24] is the sub-list for extension extendee
+ 0, // [0:24] is the sub-list for field type_name
}
func init() { file_mq_proto_init() }
@@ -2388,7 +2501,7 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*SubscribeRequest_InitMessage); i {
+ switch v := v.(*SubscribeRequest_Consumer); i {
case 0:
return &v.state
case 1:
@@ -2400,6 +2513,18 @@ func file_mq_proto_init() {
}
}
file_mq_proto_msgTypes[28].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SubscribeRequest_Cursor); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_mq_proto_msgTypes[29].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeResponse_CtrlMessage); i {
case 0:
return &v.state
@@ -2420,13 +2545,17 @@ func file_mq_proto_init() {
(*SubscribeResponse_Ctrl)(nil),
(*SubscribeResponse_Data)(nil),
}
+ file_mq_proto_msgTypes[28].OneofWrappers = []interface{}{
+ (*SubscribeRequest_Cursor_StartOffset)(nil),
+ (*SubscribeRequest_Cursor_StartTimestampNs)(nil),
+ }
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_mq_proto_rawDesc,
NumEnums: 0,
- NumMessages: 29,
+ NumMessages: 30,
NumExtensions: 0,
NumServices: 1,
},