aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/messaging.proto
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-16 03:29:57 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-16 03:29:57 -0700
commit508f3490a0a9a2eb0a44ebd45cb5ef03a931c7f9 (patch)
treee38bf6c505151536d67d35568e289559fdf3f081 /weed/pb/messaging.proto
parentf5a748d33c52a2874dbde92746a03140ef379296 (diff)
downloadseaweedfs-508f3490a0a9a2eb0a44ebd45cb5ef03a931c7f9.tar.xz
seaweedfs-508f3490a0a9a2eb0a44ebd45cb5ef03a931c7f9.zip
update messaging proto
Diffstat (limited to 'weed/pb/messaging.proto')
-rw-r--r--weed/pb/messaging.proto65
1 files changed, 47 insertions, 18 deletions
diff --git a/weed/pb/messaging.proto b/weed/pb/messaging.proto
index 050c6fb17..10e317221 100644
--- a/weed/pb/messaging.proto
+++ b/weed/pb/messaging.proto
@@ -9,7 +9,7 @@ option java_outer_classname = "MessagingProto";
service SeaweedMessaging {
- rpc Subscribe (SubscribeRequest) returns (stream Message) {
+ rpc Subscribe (stream SubscriberMessage) returns (stream BrokerMessage) {
}
rpc Publish (stream PublishRequest) returns (stream PublishResponse) {
@@ -25,17 +25,25 @@ service SeaweedMessaging {
//////////////////////////////////////////////////
-message SubscribeRequest {
- string namespace = 1;
- string topic = 2;
- int32 partition = 3;
- enum StartPosition {
- LATEST = 0; // Start at the newest message
- EARLIEST = 1; // Start at the oldest message
- TIMESTAMP = 2; // Start after a specified timestamp, exclusive
+message SubscriberMessage {
+ message InitMessage {
+ string namespace = 1;
+ string topic = 2;
+ int32 partition = 3;
+ enum StartPosition {
+ LATEST = 0; // Start at the newest message
+ EARLIEST = 1; // Start at the oldest message
+ TIMESTAMP = 2; // Start after a specified timestamp, exclusive
+ }
+ StartPosition startPosition = 4; // Where to begin consuming from
+ int64 timestampNs = 5; // timestamp in nano seconds
+ string subscriber_id = 6; // uniquely identify a subscriber to track consumption
+ }
+ InitMessage init = 1;
+ message AckMessage {
+ int64 message_id = 1;
}
- StartPosition startPosition = 4; // Where to begin consuming from
- int64 timestampNs = 5; // timestamp in nano seconds
+ AckMessage ack = 2;
}
message Message {
@@ -45,17 +53,38 @@ message Message {
map<string, bytes> headers = 4; // Message headers
}
+message BrokerMessage {
+ Message data = 1;
+ message RedirectMessage {
+ string new_broker = 1;
+ }
+ RedirectMessage redirect = 2;
+}
+
message PublishRequest {
- string namespace = 1; // only needed on the initial request
- string topic = 2; // only needed on the initial request
- int32 partition = 4;
- bytes key = 5; // Message key
- bytes value = 6; // Message payload
- map<string, bytes> headers = 7; // Message headers
+ message InitMessage {
+ string namespace = 1; // only needed on the initial request
+ string topic = 2; // only needed on the initial request
+ int32 partition = 3;
+ }
+ InitMessage init = 1;
+ message DataMessage {
+ bytes key = 1; // Message key
+ bytes value = 2; // Message payload
+ map<string, bytes> headers = 3; // Message headers
+ }
+ DataMessage data = 2;
}
message PublishResponse {
- int32 partition_count = 1;
+ message ConfigMessage {
+ int32 partition_count = 1;
+ }
+ ConfigMessage config = 1;
+ message RedirectMessage {
+ string new_broker = 1;
+ }
+ RedirectMessage redirect = 2;
}
message ConfigureTopicRequest {