aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/mq.proto
diff options
context:
space:
mode:
Diffstat (limited to 'weed/pb/mq.proto')
-rw-r--r--weed/pb/mq.proto71
1 files changed, 28 insertions, 43 deletions
diff --git a/weed/pb/mq.proto b/weed/pb/mq.proto
index 369f82fb3..9090dc5e8 100644
--- a/weed/pb/mq.proto
+++ b/weed/pb/mq.proto
@@ -46,9 +46,7 @@ service SeaweedMessaging {
rpc SubscribeMessage (SubscribeMessageRequest) returns (stream SubscribeMessageResponse) {
}
// The lead broker asks a follower broker to follow itself
- rpc PublishFollowMe (PublishFollowMeRequest) returns (PublishFollowMeResponse) {
- }
- rpc FollowInMemoryMessages (FollowInMemoryMessagesRequest) returns (stream FollowInMemoryMessagesResponse) {
+ rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) {
}
}
@@ -99,8 +97,8 @@ message BrokerStats {
message TopicPartitionStats {
Topic topic = 1;
Partition partition = 2;
- int32 consumer_count = 3;
- bool is_leader = 4;
+ int32 publisher_count = 3;
+ int32 subscriber_count = 4;
}
@@ -172,14 +170,9 @@ message SubscriberToSubCoordinatorRequest {
}
}
message SubscriberToSubCoordinatorResponse {
- message AssignedPartition {
- Partition partition = 1;
- int64 ts_ns = 2;
- string broker = 3;
- }
message Assignment {
int64 generation = 1;
- repeated AssignedPartition assigned_partitions = 2;
+ repeated BrokerPartitionAssignment partition_assignments = 2;
}
oneof message {
Assignment assignment = 1;
@@ -187,10 +180,14 @@ message SubscriberToSubCoordinatorResponse {
}
//////////////////////////////////////////////////
+message ControlMessage {
+ bool is_close = 1;
+}
message DataMessage {
bytes key = 1;
bytes value = 2;
int64 ts_ns = 3;
+ ControlMessage ctrl = 4;
}
message PublishMessageRequest {
message InitMessage {
@@ -198,6 +195,7 @@ message PublishMessageRequest {
Partition partition = 2;
int32 ack_interval = 3;
repeated string follower_brokers = 4;
+ string publisher_name = 5; // for debugging
}
oneof message {
InitMessage init = 1;
@@ -211,12 +209,24 @@ message PublishMessageResponse {
bool should_close = 3;
}
message PublishFollowMeRequest {
- Topic topic = 1;
- Partition partition = 2;
- string broker_self = 3;
+ message InitMessage {
+ Topic topic = 1;
+ Partition partition = 2;
+ }
+ message FlushMessage {
+ int64 ts_ns = 1;
+ }
+ message CloseMessage {
+ }
+ oneof message {
+ InitMessage init = 1;
+ DataMessage data = 2;
+ FlushMessage flush = 3;
+ CloseMessage close = 4;
+ }
}
message PublishFollowMeResponse {
- string error = 1;
+ int64 ack_ts_ns = 1;
}
message SubscribeMessageRequest {
message InitMessage {
@@ -226,6 +236,7 @@ message SubscribeMessageRequest {
Topic topic = 4;
PartitionOffset partition_offset = 5;
string filter = 6;
+ repeated string follower_brokers = 7;
}
message AckMessage {
int64 sequence = 1;
@@ -236,39 +247,13 @@ message SubscribeMessageRequest {
}
}
message SubscribeMessageResponse {
- message CtrlMessage {
+ message SubscribeCtrlMessage {
string error = 1;
bool is_end_of_stream = 2;
bool is_end_of_topic = 3;
}
oneof message {
- CtrlMessage ctrl = 1;
- DataMessage data = 2;
- }
-}
-message FollowInMemoryMessagesRequest {
- message InitMessage {
- string consumer_group = 1;
- string consumer_id = 2;
- int32 follower_id = 3;
- Topic topic = 4;
- PartitionOffset partition_offset = 5;
- }
- message AckMessage {
- int64 sequence = 1;
- }
- oneof message {
- InitMessage init = 1;
- AckMessage ack = 2;
- }
-}
-message FollowInMemoryMessagesResponse {
- message CtrlMessage {
- int64 flushed_sequence = 1;
- int32 follower_changed_to_id = 2;
- }
- oneof message {
- CtrlMessage ctrl = 1;
+ SubscribeCtrlMessage ctrl = 1;
DataMessage data = 2;
}
}