diff options
Diffstat (limited to 'weed/pb/mq_broker.proto')
| -rw-r--r-- | weed/pb/mq_broker.proto | 96 |
1 files changed, 78 insertions, 18 deletions
diff --git a/weed/pb/mq_broker.proto b/weed/pb/mq_broker.proto index 0f12edc85..ff6f95de8 100644 --- a/weed/pb/mq_broker.proto +++ b/weed/pb/mq_broker.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package messaging_pb; import "mq_schema.proto"; +import "filer.proto"; option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"; option java_package = "seaweedfs.mq"; @@ -25,6 +26,8 @@ service SeaweedMessaging { // control plane for topic partitions rpc ListTopics (ListTopicsRequest) returns (ListTopicsResponse) { } + rpc TopicExists (TopicExistsRequest) returns (TopicExistsResponse) { + } rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) { } rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) { @@ -62,6 +65,12 @@ service SeaweedMessaging { // SQL query support - get unflushed messages from broker's in-memory buffer (streaming) rpc GetUnflushedMessages (GetUnflushedMessagesRequest) returns (stream GetUnflushedMessagesResponse) { } + + // Get comprehensive partition range information (offsets, timestamps, and other fields) + rpc GetPartitionRangeInfo (GetPartitionRangeInfoRequest) returns (GetPartitionRangeInfoResponse) { + } + + // Removed Kafka Gateway Registration - no longer needed } ////////////////////////////////////////////////// @@ -114,19 +123,29 @@ message TopicRetention { message ConfigureTopicRequest { schema_pb.Topic topic = 1; int32 partition_count = 2; - schema_pb.RecordType record_type = 3; - TopicRetention retention = 4; + TopicRetention retention = 3; + schema_pb.RecordType message_record_type = 4; // Complete flat schema for the message + repeated string key_columns = 5; // Names of columns that form the key + string schema_format = 6; // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless } message ConfigureTopicResponse { repeated BrokerPartitionAssignment broker_partition_assignments = 2; - schema_pb.RecordType record_type = 3; - TopicRetention retention = 4; + TopicRetention retention = 3; + schema_pb.RecordType message_record_type = 4; // Complete flat schema for the message + repeated string key_columns = 5; // Names of columns that form the key + string schema_format = 6; // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless } message ListTopicsRequest { } message ListTopicsResponse { repeated schema_pb.Topic topics = 1; } +message TopicExistsRequest { + schema_pb.Topic topic = 1; +} +message TopicExistsResponse { + bool exists = 1; +} message LookupTopicBrokersRequest { schema_pb.Topic topic = 1; } @@ -145,11 +164,13 @@ message GetTopicConfigurationRequest { message GetTopicConfigurationResponse { schema_pb.Topic topic = 1; int32 partition_count = 2; - schema_pb.RecordType record_type = 3; - repeated BrokerPartitionAssignment broker_partition_assignments = 4; - int64 created_at_ns = 5; - int64 last_updated_ns = 6; - TopicRetention retention = 7; + repeated BrokerPartitionAssignment broker_partition_assignments = 3; + int64 created_at_ns = 4; + int64 last_updated_ns = 5; + TopicRetention retention = 6; + schema_pb.RecordType message_record_type = 7; // Complete flat schema for the message + repeated string key_columns = 8; // Names of columns that form the key + string schema_format = 9; // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless } message GetTopicPublishersRequest { @@ -266,9 +287,11 @@ message PublishMessageRequest { } } message PublishMessageResponse { - int64 ack_sequence = 1; + int64 ack_ts_ns = 1; // Acknowledgment timestamp in nanoseconds string error = 2; bool should_close = 3; + int32 error_code = 4; // Structured error code for reliable error mapping + int64 assigned_offset = 5; // The actual offset assigned by SeaweedMQ for this message } message PublishFollowMeRequest { message InitMessage { @@ -303,7 +326,7 @@ message SubscribeMessageRequest { int32 sliding_window_size = 12; } message AckMessage { - int64 sequence = 1; + int64 ts_ns = 1; // Timestamp in nanoseconds for acknowledgment tracking bytes key = 2; } oneof message { @@ -361,18 +384,55 @@ message CloseSubscribersResponse { message GetUnflushedMessagesRequest { schema_pb.Topic topic = 1; schema_pb.Partition partition = 2; - int64 start_buffer_index = 3; // Filter by buffer index (messages from buffers >= this index) + int64 start_buffer_offset = 3; // Filter by buffer offset (messages from buffers >= this offset) } message GetUnflushedMessagesResponse { - LogEntry message = 1; // Single message per response (streaming) + filer_pb.LogEntry message = 1; // Single message per response (streaming) string error = 2; // Error message if any bool end_of_stream = 3; // Indicates this is the final response } -message LogEntry { - int64 ts_ns = 1; - bytes key = 2; - bytes data = 3; - uint32 partition_key_hash = 4; +////////////////////////////////////////////////// +// Partition range information messages + +message GetPartitionRangeInfoRequest { + schema_pb.Topic topic = 1; + schema_pb.Partition partition = 2; +} + +message GetPartitionRangeInfoResponse { + // Offset range information + OffsetRangeInfo offset_range = 1; + + // Timestamp range information + TimestampRangeInfo timestamp_range = 2; + + // Future: ID range information (for ordered IDs, UUIDs, etc.) + // IdRangeInfo id_range = 3; + + // Partition metadata + int64 record_count = 10; + int64 active_subscriptions = 11; + string error = 12; } + +message OffsetRangeInfo { + int64 earliest_offset = 1; + int64 latest_offset = 2; + int64 high_water_mark = 3; +} + +message TimestampRangeInfo { + int64 earliest_timestamp_ns = 1; // Earliest message timestamp in nanoseconds + int64 latest_timestamp_ns = 2; // Latest message timestamp in nanoseconds +} + +// Future extension for ID ranges +// message IdRangeInfo { +// string earliest_id = 1; +// string latest_id = 2; +// string id_type = 3; // "uuid", "sequential", "custom", etc. +// } + +// Removed Kafka Gateway Registration messages - no longer needed |
