aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/mq_broker.proto
diff options
context:
space:
mode:
Diffstat (limited to 'weed/pb/mq_broker.proto')
-rw-r--r--weed/pb/mq_broker.proto96
1 files changed, 78 insertions, 18 deletions
diff --git a/weed/pb/mq_broker.proto b/weed/pb/mq_broker.proto
index 0f12edc85..ff6f95de8 100644
--- a/weed/pb/mq_broker.proto
+++ b/weed/pb/mq_broker.proto
@@ -3,6 +3,7 @@ syntax = "proto3";
package messaging_pb;
import "mq_schema.proto";
+import "filer.proto";
option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb";
option java_package = "seaweedfs.mq";
@@ -25,6 +26,8 @@ service SeaweedMessaging {
// control plane for topic partitions
rpc ListTopics (ListTopicsRequest) returns (ListTopicsResponse) {
}
+ rpc TopicExists (TopicExistsRequest) returns (TopicExistsResponse) {
+ }
rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) {
}
rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) {
@@ -62,6 +65,12 @@ service SeaweedMessaging {
// SQL query support - get unflushed messages from broker's in-memory buffer (streaming)
rpc GetUnflushedMessages (GetUnflushedMessagesRequest) returns (stream GetUnflushedMessagesResponse) {
}
+
+ // Get comprehensive partition range information (offsets, timestamps, and other fields)
+ rpc GetPartitionRangeInfo (GetPartitionRangeInfoRequest) returns (GetPartitionRangeInfoResponse) {
+ }
+
+ // Removed Kafka Gateway Registration - no longer needed
}
//////////////////////////////////////////////////
@@ -114,19 +123,29 @@ message TopicRetention {
message ConfigureTopicRequest {
schema_pb.Topic topic = 1;
int32 partition_count = 2;
- schema_pb.RecordType record_type = 3;
- TopicRetention retention = 4;
+ TopicRetention retention = 3;
+ schema_pb.RecordType message_record_type = 4; // Complete flat schema for the message
+ repeated string key_columns = 5; // Names of columns that form the key
+ string schema_format = 6; // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless
}
message ConfigureTopicResponse {
repeated BrokerPartitionAssignment broker_partition_assignments = 2;
- schema_pb.RecordType record_type = 3;
- TopicRetention retention = 4;
+ TopicRetention retention = 3;
+ schema_pb.RecordType message_record_type = 4; // Complete flat schema for the message
+ repeated string key_columns = 5; // Names of columns that form the key
+ string schema_format = 6; // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless
}
message ListTopicsRequest {
}
message ListTopicsResponse {
repeated schema_pb.Topic topics = 1;
}
+message TopicExistsRequest {
+ schema_pb.Topic topic = 1;
+}
+message TopicExistsResponse {
+ bool exists = 1;
+}
message LookupTopicBrokersRequest {
schema_pb.Topic topic = 1;
}
@@ -145,11 +164,13 @@ message GetTopicConfigurationRequest {
message GetTopicConfigurationResponse {
schema_pb.Topic topic = 1;
int32 partition_count = 2;
- schema_pb.RecordType record_type = 3;
- repeated BrokerPartitionAssignment broker_partition_assignments = 4;
- int64 created_at_ns = 5;
- int64 last_updated_ns = 6;
- TopicRetention retention = 7;
+ repeated BrokerPartitionAssignment broker_partition_assignments = 3;
+ int64 created_at_ns = 4;
+ int64 last_updated_ns = 5;
+ TopicRetention retention = 6;
+ schema_pb.RecordType message_record_type = 7; // Complete flat schema for the message
+ repeated string key_columns = 8; // Names of columns that form the key
+ string schema_format = 9; // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless
}
message GetTopicPublishersRequest {
@@ -266,9 +287,11 @@ message PublishMessageRequest {
}
}
message PublishMessageResponse {
- int64 ack_sequence = 1;
+ int64 ack_ts_ns = 1; // Acknowledgment timestamp in nanoseconds
string error = 2;
bool should_close = 3;
+ int32 error_code = 4; // Structured error code for reliable error mapping
+ int64 assigned_offset = 5; // The actual offset assigned by SeaweedMQ for this message
}
message PublishFollowMeRequest {
message InitMessage {
@@ -303,7 +326,7 @@ message SubscribeMessageRequest {
int32 sliding_window_size = 12;
}
message AckMessage {
- int64 sequence = 1;
+ int64 ts_ns = 1; // Timestamp in nanoseconds for acknowledgment tracking
bytes key = 2;
}
oneof message {
@@ -361,18 +384,55 @@ message CloseSubscribersResponse {
message GetUnflushedMessagesRequest {
schema_pb.Topic topic = 1;
schema_pb.Partition partition = 2;
- int64 start_buffer_index = 3; // Filter by buffer index (messages from buffers >= this index)
+ int64 start_buffer_offset = 3; // Filter by buffer offset (messages from buffers >= this offset)
}
message GetUnflushedMessagesResponse {
- LogEntry message = 1; // Single message per response (streaming)
+ filer_pb.LogEntry message = 1; // Single message per response (streaming)
string error = 2; // Error message if any
bool end_of_stream = 3; // Indicates this is the final response
}
-message LogEntry {
- int64 ts_ns = 1;
- bytes key = 2;
- bytes data = 3;
- uint32 partition_key_hash = 4;
+//////////////////////////////////////////////////
+// Partition range information messages
+
+message GetPartitionRangeInfoRequest {
+ schema_pb.Topic topic = 1;
+ schema_pb.Partition partition = 2;
+}
+
+message GetPartitionRangeInfoResponse {
+ // Offset range information
+ OffsetRangeInfo offset_range = 1;
+
+ // Timestamp range information
+ TimestampRangeInfo timestamp_range = 2;
+
+ // Future: ID range information (for ordered IDs, UUIDs, etc.)
+ // IdRangeInfo id_range = 3;
+
+ // Partition metadata
+ int64 record_count = 10;
+ int64 active_subscriptions = 11;
+ string error = 12;
}
+
+message OffsetRangeInfo {
+ int64 earliest_offset = 1;
+ int64 latest_offset = 2;
+ int64 high_water_mark = 3;
+}
+
+message TimestampRangeInfo {
+ int64 earliest_timestamp_ns = 1; // Earliest message timestamp in nanoseconds
+ int64 latest_timestamp_ns = 2; // Latest message timestamp in nanoseconds
+}
+
+// Future extension for ID ranges
+// message IdRangeInfo {
+// string earliest_id = 1;
+// string latest_id = 2;
+// string id_type = 3; // "uuid", "sequential", "custom", etc.
+// }
+
+// Removed Kafka Gateway Registration messages - no longer needed