diff options
Diffstat (limited to 'weed/pb')
| -rw-r--r-- | weed/pb/filer.proto | 1 | ||||
| -rw-r--r-- | weed/pb/filer_pb/filer.pb.go | 13 | ||||
| -rw-r--r-- | weed/pb/grpc_client_server.go | 4 | ||||
| -rw-r--r-- | weed/pb/mq_agent.proto | 3 | ||||
| -rw-r--r-- | weed/pb/mq_agent_pb/mq_agent.pb.go | 37 | ||||
| -rw-r--r-- | weed/pb/mq_agent_pb/publish_response_test.go | 102 | ||||
| -rw-r--r-- | weed/pb/mq_broker.proto | 96 | ||||
| -rw-r--r-- | weed/pb/mq_pb/mq_broker.pb.go | 1224 | ||||
| -rw-r--r-- | weed/pb/mq_pb/mq_broker_grpc.pb.go | 78 | ||||
| -rw-r--r-- | weed/pb/mq_schema.proto | 4 | ||||
| -rw-r--r-- | weed/pb/schema_pb/mq_schema.pb.go | 182 | ||||
| -rw-r--r-- | weed/pb/schema_pb/offset_test.go | 93 |
12 files changed, 1302 insertions, 535 deletions
diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index 3eb3d3a14..9257996ed 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -390,6 +390,7 @@ message LogEntry { int32 partition_key_hash = 2; bytes data = 3; bytes key = 4; + int64 offset = 5; // Sequential offset within partition } message KeepConnectedRequest { diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index c8fbe4a43..31de4e652 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -3060,6 +3060,7 @@ type LogEntry struct { PartitionKeyHash int32 `protobuf:"varint,2,opt,name=partition_key_hash,json=partitionKeyHash,proto3" json:"partition_key_hash,omitempty"` Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` Key []byte `protobuf:"bytes,4,opt,name=key,proto3" json:"key,omitempty"` + Offset int64 `protobuf:"varint,5,opt,name=offset,proto3" json:"offset,omitempty"` // Sequential offset within partition unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -3122,6 +3123,13 @@ func (x *LogEntry) GetKey() []byte { return nil } +func (x *LogEntry) GetOffset() int64 { + if x != nil { + return x.Offset + } + return 0 +} + type KeepConnectedRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` @@ -4659,12 +4667,13 @@ const file_filer_proto_rawDesc = "" + "\x11excluded_prefixes\x18\x02 \x03(\tR\x10excludedPrefixes\"b\n" + "\x1bTraverseBfsMetadataResponse\x12\x1c\n" + "\tdirectory\x18\x01 \x01(\tR\tdirectory\x12%\n" + - "\x05entry\x18\x02 \x01(\v2\x0f.filer_pb.EntryR\x05entry\"s\n" + + "\x05entry\x18\x02 \x01(\v2\x0f.filer_pb.EntryR\x05entry\"\x8b\x01\n" + "\bLogEntry\x12\x13\n" + "\x05ts_ns\x18\x01 \x01(\x03R\x04tsNs\x12,\n" + "\x12partition_key_hash\x18\x02 \x01(\x05R\x10partitionKeyHash\x12\x12\n" + "\x04data\x18\x03 \x01(\fR\x04data\x12\x10\n" + - "\x03key\x18\x04 \x01(\fR\x03key\"e\n" + + "\x03key\x18\x04 \x01(\fR\x03key\x12\x16\n" + + "\x06offset\x18\x05 \x01(\x03R\x06offset\"e\n" + "\x14KeepConnectedRequest\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1b\n" + "\tgrpc_port\x18\x02 \x01(\rR\bgrpcPort\x12\x1c\n" + diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 26cdb4f37..e822c36c8 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -290,12 +290,12 @@ func WithFilerClient(streamingMode bool, signature int32, filer ServerAddress, g } -func WithGrpcFilerClient(streamingMode bool, signature int32, filerGrpcAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { +func WithGrpcFilerClient(streamingMode bool, signature int32, filerAddress ServerAddress, grpcDialOption grpc.DialOption, fn func(client filer_pb.SeaweedFilerClient) error) error { return WithGrpcClient(streamingMode, signature, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, filerGrpcAddress.ToGrpcAddress(), false, grpcDialOption) + }, filerAddress.ToGrpcAddress(), false, grpcDialOption) } diff --git a/weed/pb/mq_agent.proto b/weed/pb/mq_agent.proto index 91f5a4cfc..6457cbcd8 100644 --- a/weed/pb/mq_agent.proto +++ b/weed/pb/mq_agent.proto @@ -53,6 +53,8 @@ message PublishRecordRequest { message PublishRecordResponse { int64 ack_sequence = 1; string error = 2; + int64 base_offset = 3; // First offset assigned to this batch + int64 last_offset = 4; // Last offset assigned to this batch } ////////////////////////////////////////////////// message SubscribeRecordRequest { @@ -78,5 +80,6 @@ message SubscribeRecordResponse { string error = 5; bool is_end_of_stream = 6; bool is_end_of_topic = 7; + int64 offset = 8; // Sequential offset within partition } ////////////////////////////////////////////////// diff --git a/weed/pb/mq_agent_pb/mq_agent.pb.go b/weed/pb/mq_agent_pb/mq_agent.pb.go index 11f1ac551..bc321e957 100644 --- a/weed/pb/mq_agent_pb/mq_agent.pb.go +++ b/weed/pb/mq_agent_pb/mq_agent.pb.go @@ -296,6 +296,8 @@ type PublishRecordResponse struct { state protoimpl.MessageState `protogen:"open.v1"` AckSequence int64 `protobuf:"varint,1,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"` Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + BaseOffset int64 `protobuf:"varint,3,opt,name=base_offset,json=baseOffset,proto3" json:"base_offset,omitempty"` // First offset assigned to this batch + LastOffset int64 `protobuf:"varint,4,opt,name=last_offset,json=lastOffset,proto3" json:"last_offset,omitempty"` // Last offset assigned to this batch unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -344,6 +346,20 @@ func (x *PublishRecordResponse) GetError() string { return "" } +func (x *PublishRecordResponse) GetBaseOffset() int64 { + if x != nil { + return x.BaseOffset + } + return 0 +} + +func (x *PublishRecordResponse) GetLastOffset() int64 { + if x != nil { + return x.LastOffset + } + return 0 +} + // //////////////////////////////////////////////// type SubscribeRecordRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -413,6 +429,7 @@ type SubscribeRecordResponse struct { Error string `protobuf:"bytes,5,opt,name=error,proto3" json:"error,omitempty"` IsEndOfStream bool `protobuf:"varint,6,opt,name=is_end_of_stream,json=isEndOfStream,proto3" json:"is_end_of_stream,omitempty"` IsEndOfTopic bool `protobuf:"varint,7,opt,name=is_end_of_topic,json=isEndOfTopic,proto3" json:"is_end_of_topic,omitempty"` + Offset int64 `protobuf:"varint,8,opt,name=offset,proto3" json:"offset,omitempty"` // Sequential offset within partition unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -489,6 +506,13 @@ func (x *SubscribeRecordResponse) GetIsEndOfTopic() bool { return false } +func (x *SubscribeRecordResponse) GetOffset() int64 { + if x != nil { + return x.Offset + } + return 0 +} + type SubscribeRecordRequest_InitSubscribeRecordRequest struct { state protoimpl.MessageState `protogen:"open.v1"` ConsumerGroup string `protobuf:"bytes,1,opt,name=consumer_group,json=consumerGroup,proto3" json:"consumer_group,omitempty"` @@ -621,10 +645,14 @@ const file_mq_agent_proto_rawDesc = "" + "\n" + "session_id\x18\x01 \x01(\x03R\tsessionId\x12\x10\n" + "\x03key\x18\x02 \x01(\fR\x03key\x12,\n" + - "\x05value\x18\x03 \x01(\v2\x16.schema_pb.RecordValueR\x05value\"P\n" + + "\x05value\x18\x03 \x01(\v2\x16.schema_pb.RecordValueR\x05value\"\x92\x01\n" + "\x15PublishRecordResponse\x12!\n" + "\fack_sequence\x18\x01 \x01(\x03R\vackSequence\x12\x14\n" + - "\x05error\x18\x02 \x01(\tR\x05error\"\xfb\x04\n" + + "\x05error\x18\x02 \x01(\tR\x05error\x12\x1f\n" + + "\vbase_offset\x18\x03 \x01(\x03R\n" + + "baseOffset\x12\x1f\n" + + "\vlast_offset\x18\x04 \x01(\x03R\n" + + "lastOffset\"\xfb\x04\n" + "\x16SubscribeRecordRequest\x12S\n" + "\x04init\x18\x01 \x01(\v2?.messaging_pb.SubscribeRecordRequest.InitSubscribeRecordRequestR\x04init\x12!\n" + "\fack_sequence\x18\x02 \x01(\x03R\vackSequence\x12\x17\n" + @@ -641,14 +669,15 @@ const file_mq_agent_proto_rawDesc = "" + "\x06filter\x18\n" + " \x01(\tR\x06filter\x12:\n" + "\x19max_subscribed_partitions\x18\v \x01(\x05R\x17maxSubscribedPartitions\x12.\n" + - "\x13sliding_window_size\x18\f \x01(\x05R\x11slidingWindowSize\"\xd4\x01\n" + + "\x13sliding_window_size\x18\f \x01(\x05R\x11slidingWindowSize\"\xec\x01\n" + "\x17SubscribeRecordResponse\x12\x10\n" + "\x03key\x18\x02 \x01(\fR\x03key\x12,\n" + "\x05value\x18\x03 \x01(\v2\x16.schema_pb.RecordValueR\x05value\x12\x13\n" + "\x05ts_ns\x18\x04 \x01(\x03R\x04tsNs\x12\x14\n" + "\x05error\x18\x05 \x01(\tR\x05error\x12'\n" + "\x10is_end_of_stream\x18\x06 \x01(\bR\risEndOfStream\x12%\n" + - "\x0fis_end_of_topic\x18\a \x01(\bR\fisEndOfTopic2\xb9\x03\n" + + "\x0fis_end_of_topic\x18\a \x01(\bR\fisEndOfTopic\x12\x16\n" + + "\x06offset\x18\b \x01(\x03R\x06offset2\xb9\x03\n" + "\x15SeaweedMessagingAgent\x12l\n" + "\x13StartPublishSession\x12(.messaging_pb.StartPublishSessionRequest\x1a).messaging_pb.StartPublishSessionResponse\"\x00\x12l\n" + "\x13ClosePublishSession\x12(.messaging_pb.ClosePublishSessionRequest\x1a).messaging_pb.ClosePublishSessionResponse\"\x00\x12^\n" + diff --git a/weed/pb/mq_agent_pb/publish_response_test.go b/weed/pb/mq_agent_pb/publish_response_test.go new file mode 100644 index 000000000..1f2e767e4 --- /dev/null +++ b/weed/pb/mq_agent_pb/publish_response_test.go @@ -0,0 +1,102 @@ +package mq_agent_pb + +import ( + "testing" + "google.golang.org/protobuf/proto" +) + +func TestPublishRecordResponseSerialization(t *testing.T) { + // Test that PublishRecordResponse can serialize/deserialize with new offset fields + original := &PublishRecordResponse{ + AckSequence: 123, + Error: "", + BaseOffset: 1000, // New field + LastOffset: 1005, // New field + } + + // Test proto marshaling/unmarshaling + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal PublishRecordResponse: %v", err) + } + + restored := &PublishRecordResponse{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal PublishRecordResponse: %v", err) + } + + // Verify all fields are preserved + if restored.AckSequence != original.AckSequence { + t.Errorf("AckSequence = %d, want %d", restored.AckSequence, original.AckSequence) + } + if restored.BaseOffset != original.BaseOffset { + t.Errorf("BaseOffset = %d, want %d", restored.BaseOffset, original.BaseOffset) + } + if restored.LastOffset != original.LastOffset { + t.Errorf("LastOffset = %d, want %d", restored.LastOffset, original.LastOffset) + } +} + +func TestSubscribeRecordResponseSerialization(t *testing.T) { + // Test that SubscribeRecordResponse can serialize/deserialize with new offset field + original := &SubscribeRecordResponse{ + Key: []byte("test-key"), + TsNs: 1234567890, + Error: "", + IsEndOfStream: false, + IsEndOfTopic: false, + Offset: 42, // New field + } + + // Test proto marshaling/unmarshaling + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal SubscribeRecordResponse: %v", err) + } + + restored := &SubscribeRecordResponse{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal SubscribeRecordResponse: %v", err) + } + + // Verify all fields are preserved + if restored.TsNs != original.TsNs { + t.Errorf("TsNs = %d, want %d", restored.TsNs, original.TsNs) + } + if restored.Offset != original.Offset { + t.Errorf("Offset = %d, want %d", restored.Offset, original.Offset) + } + if string(restored.Key) != string(original.Key) { + t.Errorf("Key = %s, want %s", string(restored.Key), string(original.Key)) + } +} + +func TestPublishRecordResponseBackwardCompatibility(t *testing.T) { + // Test that PublishRecordResponse without offset fields still works + original := &PublishRecordResponse{ + AckSequence: 123, + Error: "", + // BaseOffset and LastOffset not set (defaults to 0) + } + + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal PublishRecordResponse: %v", err) + } + + restored := &PublishRecordResponse{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal PublishRecordResponse: %v", err) + } + + // Offset fields should default to 0 + if restored.BaseOffset != 0 { + t.Errorf("BaseOffset = %d, want 0", restored.BaseOffset) + } + if restored.LastOffset != 0 { + t.Errorf("LastOffset = %d, want 0", restored.LastOffset) + } +} diff --git a/weed/pb/mq_broker.proto b/weed/pb/mq_broker.proto index 0f12edc85..ff6f95de8 100644 --- a/weed/pb/mq_broker.proto +++ b/weed/pb/mq_broker.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package messaging_pb; import "mq_schema.proto"; +import "filer.proto"; option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"; option java_package = "seaweedfs.mq"; @@ -25,6 +26,8 @@ service SeaweedMessaging { // control plane for topic partitions rpc ListTopics (ListTopicsRequest) returns (ListTopicsResponse) { } + rpc TopicExists (TopicExistsRequest) returns (TopicExistsResponse) { + } rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) { } rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) { @@ -62,6 +65,12 @@ service SeaweedMessaging { // SQL query support - get unflushed messages from broker's in-memory buffer (streaming) rpc GetUnflushedMessages (GetUnflushedMessagesRequest) returns (stream GetUnflushedMessagesResponse) { } + + // Get comprehensive partition range information (offsets, timestamps, and other fields) + rpc GetPartitionRangeInfo (GetPartitionRangeInfoRequest) returns (GetPartitionRangeInfoResponse) { + } + + // Removed Kafka Gateway Registration - no longer needed } ////////////////////////////////////////////////// @@ -114,19 +123,29 @@ message TopicRetention { message ConfigureTopicRequest { schema_pb.Topic topic = 1; int32 partition_count = 2; - schema_pb.RecordType record_type = 3; - TopicRetention retention = 4; + TopicRetention retention = 3; + schema_pb.RecordType message_record_type = 4; // Complete flat schema for the message + repeated string key_columns = 5; // Names of columns that form the key + string schema_format = 6; // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless } message ConfigureTopicResponse { repeated BrokerPartitionAssignment broker_partition_assignments = 2; - schema_pb.RecordType record_type = 3; - TopicRetention retention = 4; + TopicRetention retention = 3; + schema_pb.RecordType message_record_type = 4; // Complete flat schema for the message + repeated string key_columns = 5; // Names of columns that form the key + string schema_format = 6; // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless } message ListTopicsRequest { } message ListTopicsResponse { repeated schema_pb.Topic topics = 1; } +message TopicExistsRequest { + schema_pb.Topic topic = 1; +} +message TopicExistsResponse { + bool exists = 1; +} message LookupTopicBrokersRequest { schema_pb.Topic topic = 1; } @@ -145,11 +164,13 @@ message GetTopicConfigurationRequest { message GetTopicConfigurationResponse { schema_pb.Topic topic = 1; int32 partition_count = 2; - schema_pb.RecordType record_type = 3; - repeated BrokerPartitionAssignment broker_partition_assignments = 4; - int64 created_at_ns = 5; - int64 last_updated_ns = 6; - TopicRetention retention = 7; + repeated BrokerPartitionAssignment broker_partition_assignments = 3; + int64 created_at_ns = 4; + int64 last_updated_ns = 5; + TopicRetention retention = 6; + schema_pb.RecordType message_record_type = 7; // Complete flat schema for the message + repeated string key_columns = 8; // Names of columns that form the key + string schema_format = 9; // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless } message GetTopicPublishersRequest { @@ -266,9 +287,11 @@ message PublishMessageRequest { } } message PublishMessageResponse { - int64 ack_sequence = 1; + int64 ack_ts_ns = 1; // Acknowledgment timestamp in nanoseconds string error = 2; bool should_close = 3; + int32 error_code = 4; // Structured error code for reliable error mapping + int64 assigned_offset = 5; // The actual offset assigned by SeaweedMQ for this message } message PublishFollowMeRequest { message InitMessage { @@ -303,7 +326,7 @@ message SubscribeMessageRequest { int32 sliding_window_size = 12; } message AckMessage { - int64 sequence = 1; + int64 ts_ns = 1; // Timestamp in nanoseconds for acknowledgment tracking bytes key = 2; } oneof message { @@ -361,18 +384,55 @@ message CloseSubscribersResponse { message GetUnflushedMessagesRequest { schema_pb.Topic topic = 1; schema_pb.Partition partition = 2; - int64 start_buffer_index = 3; // Filter by buffer index (messages from buffers >= this index) + int64 start_buffer_offset = 3; // Filter by buffer offset (messages from buffers >= this offset) } message GetUnflushedMessagesResponse { - LogEntry message = 1; // Single message per response (streaming) + filer_pb.LogEntry message = 1; // Single message per response (streaming) string error = 2; // Error message if any bool end_of_stream = 3; // Indicates this is the final response } -message LogEntry { - int64 ts_ns = 1; - bytes key = 2; - bytes data = 3; - uint32 partition_key_hash = 4; +////////////////////////////////////////////////// +// Partition range information messages + +message GetPartitionRangeInfoRequest { + schema_pb.Topic topic = 1; + schema_pb.Partition partition = 2; +} + +message GetPartitionRangeInfoResponse { + // Offset range information + OffsetRangeInfo offset_range = 1; + + // Timestamp range information + TimestampRangeInfo timestamp_range = 2; + + // Future: ID range information (for ordered IDs, UUIDs, etc.) + // IdRangeInfo id_range = 3; + + // Partition metadata + int64 record_count = 10; + int64 active_subscriptions = 11; + string error = 12; } + +message OffsetRangeInfo { + int64 earliest_offset = 1; + int64 latest_offset = 2; + int64 high_water_mark = 3; +} + +message TimestampRangeInfo { + int64 earliest_timestamp_ns = 1; // Earliest message timestamp in nanoseconds + int64 latest_timestamp_ns = 2; // Latest message timestamp in nanoseconds +} + +// Future extension for ID ranges +// message IdRangeInfo { +// string earliest_id = 1; +// string latest_id = 2; +// string id_type = 3; // "uuid", "sequential", "custom", etc. +// } + +// Removed Kafka Gateway Registration messages - no longer needed diff --git a/weed/pb/mq_pb/mq_broker.pb.go b/weed/pb/mq_pb/mq_broker.pb.go index 6b06f6cfa..ae174f224 100644 --- a/weed/pb/mq_pb/mq_broker.pb.go +++ b/weed/pb/mq_pb/mq_broker.pb.go @@ -7,6 +7,7 @@ package mq_pb import ( + filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" schema_pb "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" @@ -483,13 +484,15 @@ func (x *TopicRetention) GetEnabled() bool { } type ConfigureTopicRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - PartitionCount int32 `protobuf:"varint,2,opt,name=partition_count,json=partitionCount,proto3" json:"partition_count,omitempty"` - RecordType *schema_pb.RecordType `protobuf:"bytes,3,opt,name=record_type,json=recordType,proto3" json:"record_type,omitempty"` - Retention *TopicRetention `protobuf:"bytes,4,opt,name=retention,proto3" json:"retention,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + PartitionCount int32 `protobuf:"varint,2,opt,name=partition_count,json=partitionCount,proto3" json:"partition_count,omitempty"` + Retention *TopicRetention `protobuf:"bytes,3,opt,name=retention,proto3" json:"retention,omitempty"` + MessageRecordType *schema_pb.RecordType `protobuf:"bytes,4,opt,name=message_record_type,json=messageRecordType,proto3" json:"message_record_type,omitempty"` // Complete flat schema for the message + KeyColumns []string `protobuf:"bytes,5,rep,name=key_columns,json=keyColumns,proto3" json:"key_columns,omitempty"` // Names of columns that form the key + SchemaFormat string `protobuf:"bytes,6,opt,name=schema_format,json=schemaFormat,proto3" json:"schema_format,omitempty"` // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ConfigureTopicRequest) Reset() { @@ -536,25 +539,41 @@ func (x *ConfigureTopicRequest) GetPartitionCount() int32 { return 0 } -func (x *ConfigureTopicRequest) GetRecordType() *schema_pb.RecordType { +func (x *ConfigureTopicRequest) GetRetention() *TopicRetention { if x != nil { - return x.RecordType + return x.Retention } return nil } -func (x *ConfigureTopicRequest) GetRetention() *TopicRetention { +func (x *ConfigureTopicRequest) GetMessageRecordType() *schema_pb.RecordType { if x != nil { - return x.Retention + return x.MessageRecordType + } + return nil +} + +func (x *ConfigureTopicRequest) GetKeyColumns() []string { + if x != nil { + return x.KeyColumns } return nil } +func (x *ConfigureTopicRequest) GetSchemaFormat() string { + if x != nil { + return x.SchemaFormat + } + return "" +} + type ConfigureTopicResponse struct { state protoimpl.MessageState `protogen:"open.v1"` BrokerPartitionAssignments []*BrokerPartitionAssignment `protobuf:"bytes,2,rep,name=broker_partition_assignments,json=brokerPartitionAssignments,proto3" json:"broker_partition_assignments,omitempty"` - RecordType *schema_pb.RecordType `protobuf:"bytes,3,opt,name=record_type,json=recordType,proto3" json:"record_type,omitempty"` - Retention *TopicRetention `protobuf:"bytes,4,opt,name=retention,proto3" json:"retention,omitempty"` + Retention *TopicRetention `protobuf:"bytes,3,opt,name=retention,proto3" json:"retention,omitempty"` + MessageRecordType *schema_pb.RecordType `protobuf:"bytes,4,opt,name=message_record_type,json=messageRecordType,proto3" json:"message_record_type,omitempty"` // Complete flat schema for the message + KeyColumns []string `protobuf:"bytes,5,rep,name=key_columns,json=keyColumns,proto3" json:"key_columns,omitempty"` // Names of columns that form the key + SchemaFormat string `protobuf:"bytes,6,opt,name=schema_format,json=schemaFormat,proto3" json:"schema_format,omitempty"` // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -596,20 +615,34 @@ func (x *ConfigureTopicResponse) GetBrokerPartitionAssignments() []*BrokerPartit return nil } -func (x *ConfigureTopicResponse) GetRecordType() *schema_pb.RecordType { +func (x *ConfigureTopicResponse) GetRetention() *TopicRetention { if x != nil { - return x.RecordType + return x.Retention } return nil } -func (x *ConfigureTopicResponse) GetRetention() *TopicRetention { +func (x *ConfigureTopicResponse) GetMessageRecordType() *schema_pb.RecordType { if x != nil { - return x.Retention + return x.MessageRecordType } return nil } +func (x *ConfigureTopicResponse) GetKeyColumns() []string { + if x != nil { + return x.KeyColumns + } + return nil +} + +func (x *ConfigureTopicResponse) GetSchemaFormat() string { + if x != nil { + return x.SchemaFormat + } + return "" +} + type ListTopicsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields @@ -690,6 +723,94 @@ func (x *ListTopicsResponse) GetTopics() []*schema_pb.Topic { return nil } +type TopicExistsRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TopicExistsRequest) Reset() { + *x = TopicExistsRequest{} + mi := &file_mq_broker_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TopicExistsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TopicExistsRequest) ProtoMessage() {} + +func (x *TopicExistsRequest) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[13] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TopicExistsRequest.ProtoReflect.Descriptor instead. +func (*TopicExistsRequest) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{13} +} + +func (x *TopicExistsRequest) GetTopic() *schema_pb.Topic { + if x != nil { + return x.Topic + } + return nil +} + +type TopicExistsResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Exists bool `protobuf:"varint,1,opt,name=exists,proto3" json:"exists,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TopicExistsResponse) Reset() { + *x = TopicExistsResponse{} + mi := &file_mq_broker_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TopicExistsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TopicExistsResponse) ProtoMessage() {} + +func (x *TopicExistsResponse) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[14] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TopicExistsResponse.ProtoReflect.Descriptor instead. +func (*TopicExistsResponse) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{14} +} + +func (x *TopicExistsResponse) GetExists() bool { + if x != nil { + return x.Exists + } + return false +} + type LookupTopicBrokersRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` @@ -699,7 +820,7 @@ type LookupTopicBrokersRequest struct { func (x *LookupTopicBrokersRequest) Reset() { *x = LookupTopicBrokersRequest{} - mi := &file_mq_broker_proto_msgTypes[13] + mi := &file_mq_broker_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -711,7 +832,7 @@ func (x *LookupTopicBrokersRequest) String() string { func (*LookupTopicBrokersRequest) ProtoMessage() {} func (x *LookupTopicBrokersRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[13] + mi := &file_mq_broker_proto_msgTypes[15] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -724,7 +845,7 @@ func (x *LookupTopicBrokersRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use LookupTopicBrokersRequest.ProtoReflect.Descriptor instead. func (*LookupTopicBrokersRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{13} + return file_mq_broker_proto_rawDescGZIP(), []int{15} } func (x *LookupTopicBrokersRequest) GetTopic() *schema_pb.Topic { @@ -744,7 +865,7 @@ type LookupTopicBrokersResponse struct { func (x *LookupTopicBrokersResponse) Reset() { *x = LookupTopicBrokersResponse{} - mi := &file_mq_broker_proto_msgTypes[14] + mi := &file_mq_broker_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -756,7 +877,7 @@ func (x *LookupTopicBrokersResponse) String() string { func (*LookupTopicBrokersResponse) ProtoMessage() {} func (x *LookupTopicBrokersResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[14] + mi := &file_mq_broker_proto_msgTypes[16] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -769,7 +890,7 @@ func (x *LookupTopicBrokersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use LookupTopicBrokersResponse.ProtoReflect.Descriptor instead. func (*LookupTopicBrokersResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{14} + return file_mq_broker_proto_rawDescGZIP(), []int{16} } func (x *LookupTopicBrokersResponse) GetTopic() *schema_pb.Topic { @@ -797,7 +918,7 @@ type BrokerPartitionAssignment struct { func (x *BrokerPartitionAssignment) Reset() { *x = BrokerPartitionAssignment{} - mi := &file_mq_broker_proto_msgTypes[15] + mi := &file_mq_broker_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -809,7 +930,7 @@ func (x *BrokerPartitionAssignment) String() string { func (*BrokerPartitionAssignment) ProtoMessage() {} func (x *BrokerPartitionAssignment) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[15] + mi := &file_mq_broker_proto_msgTypes[17] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -822,7 +943,7 @@ func (x *BrokerPartitionAssignment) ProtoReflect() protoreflect.Message { // Deprecated: Use BrokerPartitionAssignment.ProtoReflect.Descriptor instead. func (*BrokerPartitionAssignment) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{15} + return file_mq_broker_proto_rawDescGZIP(), []int{17} } func (x *BrokerPartitionAssignment) GetPartition() *schema_pb.Partition { @@ -855,7 +976,7 @@ type GetTopicConfigurationRequest struct { func (x *GetTopicConfigurationRequest) Reset() { *x = GetTopicConfigurationRequest{} - mi := &file_mq_broker_proto_msgTypes[16] + mi := &file_mq_broker_proto_msgTypes[18] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -867,7 +988,7 @@ func (x *GetTopicConfigurationRequest) String() string { func (*GetTopicConfigurationRequest) ProtoMessage() {} func (x *GetTopicConfigurationRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[16] + mi := &file_mq_broker_proto_msgTypes[18] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -880,7 +1001,7 @@ func (x *GetTopicConfigurationRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetTopicConfigurationRequest.ProtoReflect.Descriptor instead. func (*GetTopicConfigurationRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{16} + return file_mq_broker_proto_rawDescGZIP(), []int{18} } func (x *GetTopicConfigurationRequest) GetTopic() *schema_pb.Topic { @@ -894,18 +1015,20 @@ type GetTopicConfigurationResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` PartitionCount int32 `protobuf:"varint,2,opt,name=partition_count,json=partitionCount,proto3" json:"partition_count,omitempty"` - RecordType *schema_pb.RecordType `protobuf:"bytes,3,opt,name=record_type,json=recordType,proto3" json:"record_type,omitempty"` - BrokerPartitionAssignments []*BrokerPartitionAssignment `protobuf:"bytes,4,rep,name=broker_partition_assignments,json=brokerPartitionAssignments,proto3" json:"broker_partition_assignments,omitempty"` - CreatedAtNs int64 `protobuf:"varint,5,opt,name=created_at_ns,json=createdAtNs,proto3" json:"created_at_ns,omitempty"` - LastUpdatedNs int64 `protobuf:"varint,6,opt,name=last_updated_ns,json=lastUpdatedNs,proto3" json:"last_updated_ns,omitempty"` - Retention *TopicRetention `protobuf:"bytes,7,opt,name=retention,proto3" json:"retention,omitempty"` + BrokerPartitionAssignments []*BrokerPartitionAssignment `protobuf:"bytes,3,rep,name=broker_partition_assignments,json=brokerPartitionAssignments,proto3" json:"broker_partition_assignments,omitempty"` + CreatedAtNs int64 `protobuf:"varint,4,opt,name=created_at_ns,json=createdAtNs,proto3" json:"created_at_ns,omitempty"` + LastUpdatedNs int64 `protobuf:"varint,5,opt,name=last_updated_ns,json=lastUpdatedNs,proto3" json:"last_updated_ns,omitempty"` + Retention *TopicRetention `protobuf:"bytes,6,opt,name=retention,proto3" json:"retention,omitempty"` + MessageRecordType *schema_pb.RecordType `protobuf:"bytes,7,opt,name=message_record_type,json=messageRecordType,proto3" json:"message_record_type,omitempty"` // Complete flat schema for the message + KeyColumns []string `protobuf:"bytes,8,rep,name=key_columns,json=keyColumns,proto3" json:"key_columns,omitempty"` // Names of columns that form the key + SchemaFormat string `protobuf:"bytes,9,opt,name=schema_format,json=schemaFormat,proto3" json:"schema_format,omitempty"` // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *GetTopicConfigurationResponse) Reset() { *x = GetTopicConfigurationResponse{} - mi := &file_mq_broker_proto_msgTypes[17] + mi := &file_mq_broker_proto_msgTypes[19] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -917,7 +1040,7 @@ func (x *GetTopicConfigurationResponse) String() string { func (*GetTopicConfigurationResponse) ProtoMessage() {} func (x *GetTopicConfigurationResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[17] + mi := &file_mq_broker_proto_msgTypes[19] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -930,7 +1053,7 @@ func (x *GetTopicConfigurationResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetTopicConfigurationResponse.ProtoReflect.Descriptor instead. func (*GetTopicConfigurationResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{17} + return file_mq_broker_proto_rawDescGZIP(), []int{19} } func (x *GetTopicConfigurationResponse) GetTopic() *schema_pb.Topic { @@ -947,13 +1070,6 @@ func (x *GetTopicConfigurationResponse) GetPartitionCount() int32 { return 0 } -func (x *GetTopicConfigurationResponse) GetRecordType() *schema_pb.RecordType { - if x != nil { - return x.RecordType - } - return nil -} - func (x *GetTopicConfigurationResponse) GetBrokerPartitionAssignments() []*BrokerPartitionAssignment { if x != nil { return x.BrokerPartitionAssignments @@ -982,6 +1098,27 @@ func (x *GetTopicConfigurationResponse) GetRetention() *TopicRetention { return nil } +func (x *GetTopicConfigurationResponse) GetMessageRecordType() *schema_pb.RecordType { + if x != nil { + return x.MessageRecordType + } + return nil +} + +func (x *GetTopicConfigurationResponse) GetKeyColumns() []string { + if x != nil { + return x.KeyColumns + } + return nil +} + +func (x *GetTopicConfigurationResponse) GetSchemaFormat() string { + if x != nil { + return x.SchemaFormat + } + return "" +} + type GetTopicPublishersRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` @@ -991,7 +1128,7 @@ type GetTopicPublishersRequest struct { func (x *GetTopicPublishersRequest) Reset() { *x = GetTopicPublishersRequest{} - mi := &file_mq_broker_proto_msgTypes[18] + mi := &file_mq_broker_proto_msgTypes[20] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1003,7 +1140,7 @@ func (x *GetTopicPublishersRequest) String() string { func (*GetTopicPublishersRequest) ProtoMessage() {} func (x *GetTopicPublishersRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[18] + mi := &file_mq_broker_proto_msgTypes[20] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1016,7 +1153,7 @@ func (x *GetTopicPublishersRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetTopicPublishersRequest.ProtoReflect.Descriptor instead. func (*GetTopicPublishersRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{18} + return file_mq_broker_proto_rawDescGZIP(), []int{20} } func (x *GetTopicPublishersRequest) GetTopic() *schema_pb.Topic { @@ -1035,7 +1172,7 @@ type GetTopicPublishersResponse struct { func (x *GetTopicPublishersResponse) Reset() { *x = GetTopicPublishersResponse{} - mi := &file_mq_broker_proto_msgTypes[19] + mi := &file_mq_broker_proto_msgTypes[21] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1047,7 +1184,7 @@ func (x *GetTopicPublishersResponse) String() string { func (*GetTopicPublishersResponse) ProtoMessage() {} func (x *GetTopicPublishersResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[19] + mi := &file_mq_broker_proto_msgTypes[21] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1060,7 +1197,7 @@ func (x *GetTopicPublishersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetTopicPublishersResponse.ProtoReflect.Descriptor instead. func (*GetTopicPublishersResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{19} + return file_mq_broker_proto_rawDescGZIP(), []int{21} } func (x *GetTopicPublishersResponse) GetPublishers() []*TopicPublisher { @@ -1079,7 +1216,7 @@ type GetTopicSubscribersRequest struct { func (x *GetTopicSubscribersRequest) Reset() { *x = GetTopicSubscribersRequest{} - mi := &file_mq_broker_proto_msgTypes[20] + mi := &file_mq_broker_proto_msgTypes[22] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1091,7 +1228,7 @@ func (x *GetTopicSubscribersRequest) String() string { func (*GetTopicSubscribersRequest) ProtoMessage() {} func (x *GetTopicSubscribersRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[20] + mi := &file_mq_broker_proto_msgTypes[22] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1104,7 +1241,7 @@ func (x *GetTopicSubscribersRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetTopicSubscribersRequest.ProtoReflect.Descriptor instead. func (*GetTopicSubscribersRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{20} + return file_mq_broker_proto_rawDescGZIP(), []int{22} } func (x *GetTopicSubscribersRequest) GetTopic() *schema_pb.Topic { @@ -1123,7 +1260,7 @@ type GetTopicSubscribersResponse struct { func (x *GetTopicSubscribersResponse) Reset() { *x = GetTopicSubscribersResponse{} - mi := &file_mq_broker_proto_msgTypes[21] + mi := &file_mq_broker_proto_msgTypes[23] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1135,7 +1272,7 @@ func (x *GetTopicSubscribersResponse) String() string { func (*GetTopicSubscribersResponse) ProtoMessage() {} func (x *GetTopicSubscribersResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[21] + mi := &file_mq_broker_proto_msgTypes[23] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1148,7 +1285,7 @@ func (x *GetTopicSubscribersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetTopicSubscribersResponse.ProtoReflect.Descriptor instead. func (*GetTopicSubscribersResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{21} + return file_mq_broker_proto_rawDescGZIP(), []int{23} } func (x *GetTopicSubscribersResponse) GetSubscribers() []*TopicSubscriber { @@ -1175,7 +1312,7 @@ type TopicPublisher struct { func (x *TopicPublisher) Reset() { *x = TopicPublisher{} - mi := &file_mq_broker_proto_msgTypes[22] + mi := &file_mq_broker_proto_msgTypes[24] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1187,7 +1324,7 @@ func (x *TopicPublisher) String() string { func (*TopicPublisher) ProtoMessage() {} func (x *TopicPublisher) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[22] + mi := &file_mq_broker_proto_msgTypes[24] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1200,7 +1337,7 @@ func (x *TopicPublisher) ProtoReflect() protoreflect.Message { // Deprecated: Use TopicPublisher.ProtoReflect.Descriptor instead. func (*TopicPublisher) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{22} + return file_mq_broker_proto_rawDescGZIP(), []int{24} } func (x *TopicPublisher) GetPublisherName() string { @@ -1284,7 +1421,7 @@ type TopicSubscriber struct { func (x *TopicSubscriber) Reset() { *x = TopicSubscriber{} - mi := &file_mq_broker_proto_msgTypes[23] + mi := &file_mq_broker_proto_msgTypes[25] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1296,7 +1433,7 @@ func (x *TopicSubscriber) String() string { func (*TopicSubscriber) ProtoMessage() {} func (x *TopicSubscriber) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[23] + mi := &file_mq_broker_proto_msgTypes[25] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1309,7 +1446,7 @@ func (x *TopicSubscriber) ProtoReflect() protoreflect.Message { // Deprecated: Use TopicSubscriber.ProtoReflect.Descriptor instead. func (*TopicSubscriber) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{23} + return file_mq_broker_proto_rawDescGZIP(), []int{25} } func (x *TopicSubscriber) GetConsumerGroup() string { @@ -1394,7 +1531,7 @@ type AssignTopicPartitionsRequest struct { func (x *AssignTopicPartitionsRequest) Reset() { *x = AssignTopicPartitionsRequest{} - mi := &file_mq_broker_proto_msgTypes[24] + mi := &file_mq_broker_proto_msgTypes[26] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1406,7 +1543,7 @@ func (x *AssignTopicPartitionsRequest) String() string { func (*AssignTopicPartitionsRequest) ProtoMessage() {} func (x *AssignTopicPartitionsRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[24] + mi := &file_mq_broker_proto_msgTypes[26] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1419,7 +1556,7 @@ func (x *AssignTopicPartitionsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use AssignTopicPartitionsRequest.ProtoReflect.Descriptor instead. func (*AssignTopicPartitionsRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{24} + return file_mq_broker_proto_rawDescGZIP(), []int{26} } func (x *AssignTopicPartitionsRequest) GetTopic() *schema_pb.Topic { @@ -1458,7 +1595,7 @@ type AssignTopicPartitionsResponse struct { func (x *AssignTopicPartitionsResponse) Reset() { *x = AssignTopicPartitionsResponse{} - mi := &file_mq_broker_proto_msgTypes[25] + mi := &file_mq_broker_proto_msgTypes[27] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1470,7 +1607,7 @@ func (x *AssignTopicPartitionsResponse) String() string { func (*AssignTopicPartitionsResponse) ProtoMessage() {} func (x *AssignTopicPartitionsResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[25] + mi := &file_mq_broker_proto_msgTypes[27] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1483,7 +1620,7 @@ func (x *AssignTopicPartitionsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use AssignTopicPartitionsResponse.ProtoReflect.Descriptor instead. func (*AssignTopicPartitionsResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{25} + return file_mq_broker_proto_rawDescGZIP(), []int{27} } type SubscriberToSubCoordinatorRequest struct { @@ -1500,7 +1637,7 @@ type SubscriberToSubCoordinatorRequest struct { func (x *SubscriberToSubCoordinatorRequest) Reset() { *x = SubscriberToSubCoordinatorRequest{} - mi := &file_mq_broker_proto_msgTypes[26] + mi := &file_mq_broker_proto_msgTypes[28] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1512,7 +1649,7 @@ func (x *SubscriberToSubCoordinatorRequest) String() string { func (*SubscriberToSubCoordinatorRequest) ProtoMessage() {} func (x *SubscriberToSubCoordinatorRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[26] + mi := &file_mq_broker_proto_msgTypes[28] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1525,7 +1662,7 @@ func (x *SubscriberToSubCoordinatorRequest) ProtoReflect() protoreflect.Message // Deprecated: Use SubscriberToSubCoordinatorRequest.ProtoReflect.Descriptor instead. func (*SubscriberToSubCoordinatorRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{26} + return file_mq_broker_proto_rawDescGZIP(), []int{28} } func (x *SubscriberToSubCoordinatorRequest) GetMessage() isSubscriberToSubCoordinatorRequest_Message { @@ -1599,7 +1736,7 @@ type SubscriberToSubCoordinatorResponse struct { func (x *SubscriberToSubCoordinatorResponse) Reset() { *x = SubscriberToSubCoordinatorResponse{} - mi := &file_mq_broker_proto_msgTypes[27] + mi := &file_mq_broker_proto_msgTypes[29] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1611,7 +1748,7 @@ func (x *SubscriberToSubCoordinatorResponse) String() string { func (*SubscriberToSubCoordinatorResponse) ProtoMessage() {} func (x *SubscriberToSubCoordinatorResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[27] + mi := &file_mq_broker_proto_msgTypes[29] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1624,7 +1761,7 @@ func (x *SubscriberToSubCoordinatorResponse) ProtoReflect() protoreflect.Message // Deprecated: Use SubscriberToSubCoordinatorResponse.ProtoReflect.Descriptor instead. func (*SubscriberToSubCoordinatorResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{27} + return file_mq_broker_proto_rawDescGZIP(), []int{29} } func (x *SubscriberToSubCoordinatorResponse) GetMessage() isSubscriberToSubCoordinatorResponse_Message { @@ -1681,7 +1818,7 @@ type ControlMessage struct { func (x *ControlMessage) Reset() { *x = ControlMessage{} - mi := &file_mq_broker_proto_msgTypes[28] + mi := &file_mq_broker_proto_msgTypes[30] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1693,7 +1830,7 @@ func (x *ControlMessage) String() string { func (*ControlMessage) ProtoMessage() {} func (x *ControlMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[28] + mi := &file_mq_broker_proto_msgTypes[30] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1706,7 +1843,7 @@ func (x *ControlMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use ControlMessage.ProtoReflect.Descriptor instead. func (*ControlMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{28} + return file_mq_broker_proto_rawDescGZIP(), []int{30} } func (x *ControlMessage) GetIsClose() bool { @@ -1735,7 +1872,7 @@ type DataMessage struct { func (x *DataMessage) Reset() { *x = DataMessage{} - mi := &file_mq_broker_proto_msgTypes[29] + mi := &file_mq_broker_proto_msgTypes[31] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1747,7 +1884,7 @@ func (x *DataMessage) String() string { func (*DataMessage) ProtoMessage() {} func (x *DataMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[29] + mi := &file_mq_broker_proto_msgTypes[31] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1760,7 +1897,7 @@ func (x *DataMessage) ProtoReflect() protoreflect.Message { // Deprecated: Use DataMessage.ProtoReflect.Descriptor instead. func (*DataMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{29} + return file_mq_broker_proto_rawDescGZIP(), []int{31} } func (x *DataMessage) GetKey() []byte { @@ -1804,7 +1941,7 @@ type PublishMessageRequest struct { func (x *PublishMessageRequest) Reset() { *x = PublishMessageRequest{} - mi := &file_mq_broker_proto_msgTypes[30] + mi := &file_mq_broker_proto_msgTypes[32] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1816,7 +1953,7 @@ func (x *PublishMessageRequest) String() string { func (*PublishMessageRequest) ProtoMessage() {} func (x *PublishMessageRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[30] + mi := &file_mq_broker_proto_msgTypes[32] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1829,7 +1966,7 @@ func (x *PublishMessageRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishMessageRequest.ProtoReflect.Descriptor instead. func (*PublishMessageRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{30} + return file_mq_broker_proto_rawDescGZIP(), []int{32} } func (x *PublishMessageRequest) GetMessage() isPublishMessageRequest_Message { @@ -1874,17 +2011,19 @@ func (*PublishMessageRequest_Init) isPublishMessageRequest_Message() {} func (*PublishMessageRequest_Data) isPublishMessageRequest_Message() {} type PublishMessageResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - AckSequence int64 `protobuf:"varint,1,opt,name=ack_sequence,json=ackSequence,proto3" json:"ack_sequence,omitempty"` - Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` - ShouldClose bool `protobuf:"varint,3,opt,name=should_close,json=shouldClose,proto3" json:"should_close,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + AckTsNs int64 `protobuf:"varint,1,opt,name=ack_ts_ns,json=ackTsNs,proto3" json:"ack_ts_ns,omitempty"` // Acknowledgment timestamp in nanoseconds + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + ShouldClose bool `protobuf:"varint,3,opt,name=should_close,json=shouldClose,proto3" json:"should_close,omitempty"` + ErrorCode int32 `protobuf:"varint,4,opt,name=error_code,json=errorCode,proto3" json:"error_code,omitempty"` // Structured error code for reliable error mapping + AssignedOffset int64 `protobuf:"varint,5,opt,name=assigned_offset,json=assignedOffset,proto3" json:"assigned_offset,omitempty"` // The actual offset assigned by SeaweedMQ for this message + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *PublishMessageResponse) Reset() { *x = PublishMessageResponse{} - mi := &file_mq_broker_proto_msgTypes[31] + mi := &file_mq_broker_proto_msgTypes[33] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1896,7 +2035,7 @@ func (x *PublishMessageResponse) String() string { func (*PublishMessageResponse) ProtoMessage() {} func (x *PublishMessageResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[31] + mi := &file_mq_broker_proto_msgTypes[33] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1909,12 +2048,12 @@ func (x *PublishMessageResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishMessageResponse.ProtoReflect.Descriptor instead. func (*PublishMessageResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{31} + return file_mq_broker_proto_rawDescGZIP(), []int{33} } -func (x *PublishMessageResponse) GetAckSequence() int64 { +func (x *PublishMessageResponse) GetAckTsNs() int64 { if x != nil { - return x.AckSequence + return x.AckTsNs } return 0 } @@ -1933,6 +2072,20 @@ func (x *PublishMessageResponse) GetShouldClose() bool { return false } +func (x *PublishMessageResponse) GetErrorCode() int32 { + if x != nil { + return x.ErrorCode + } + return 0 +} + +func (x *PublishMessageResponse) GetAssignedOffset() int64 { + if x != nil { + return x.AssignedOffset + } + return 0 +} + type PublishFollowMeRequest struct { state protoimpl.MessageState `protogen:"open.v1"` // Types that are valid to be assigned to Message: @@ -1948,7 +2101,7 @@ type PublishFollowMeRequest struct { func (x *PublishFollowMeRequest) Reset() { *x = PublishFollowMeRequest{} - mi := &file_mq_broker_proto_msgTypes[32] + mi := &file_mq_broker_proto_msgTypes[34] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1960,7 +2113,7 @@ func (x *PublishFollowMeRequest) String() string { func (*PublishFollowMeRequest) ProtoMessage() {} func (x *PublishFollowMeRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[32] + mi := &file_mq_broker_proto_msgTypes[34] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1973,7 +2126,7 @@ func (x *PublishFollowMeRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishFollowMeRequest.ProtoReflect.Descriptor instead. func (*PublishFollowMeRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{32} + return file_mq_broker_proto_rawDescGZIP(), []int{34} } func (x *PublishFollowMeRequest) GetMessage() isPublishFollowMeRequest_Message { @@ -2056,7 +2209,7 @@ type PublishFollowMeResponse struct { func (x *PublishFollowMeResponse) Reset() { *x = PublishFollowMeResponse{} - mi := &file_mq_broker_proto_msgTypes[33] + mi := &file_mq_broker_proto_msgTypes[35] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2068,7 +2221,7 @@ func (x *PublishFollowMeResponse) String() string { func (*PublishFollowMeResponse) ProtoMessage() {} func (x *PublishFollowMeResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[33] + mi := &file_mq_broker_proto_msgTypes[35] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2081,7 +2234,7 @@ func (x *PublishFollowMeResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishFollowMeResponse.ProtoReflect.Descriptor instead. func (*PublishFollowMeResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{33} + return file_mq_broker_proto_rawDescGZIP(), []int{35} } func (x *PublishFollowMeResponse) GetAckTsNs() int64 { @@ -2104,7 +2257,7 @@ type SubscribeMessageRequest struct { func (x *SubscribeMessageRequest) Reset() { *x = SubscribeMessageRequest{} - mi := &file_mq_broker_proto_msgTypes[34] + mi := &file_mq_broker_proto_msgTypes[36] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2116,7 +2269,7 @@ func (x *SubscribeMessageRequest) String() string { func (*SubscribeMessageRequest) ProtoMessage() {} func (x *SubscribeMessageRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[34] + mi := &file_mq_broker_proto_msgTypes[36] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2129,7 +2282,7 @@ func (x *SubscribeMessageRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SubscribeMessageRequest.ProtoReflect.Descriptor instead. func (*SubscribeMessageRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{34} + return file_mq_broker_proto_rawDescGZIP(), []int{36} } func (x *SubscribeMessageRequest) GetMessage() isSubscribeMessageRequest_Message { @@ -2186,7 +2339,7 @@ type SubscribeMessageResponse struct { func (x *SubscribeMessageResponse) Reset() { *x = SubscribeMessageResponse{} - mi := &file_mq_broker_proto_msgTypes[35] + mi := &file_mq_broker_proto_msgTypes[37] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2198,7 +2351,7 @@ func (x *SubscribeMessageResponse) String() string { func (*SubscribeMessageResponse) ProtoMessage() {} func (x *SubscribeMessageResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[35] + mi := &file_mq_broker_proto_msgTypes[37] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2211,7 +2364,7 @@ func (x *SubscribeMessageResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SubscribeMessageResponse.ProtoReflect.Descriptor instead. func (*SubscribeMessageResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{35} + return file_mq_broker_proto_rawDescGZIP(), []int{37} } func (x *SubscribeMessageResponse) GetMessage() isSubscribeMessageResponse_Message { @@ -2269,7 +2422,7 @@ type SubscribeFollowMeRequest struct { func (x *SubscribeFollowMeRequest) Reset() { *x = SubscribeFollowMeRequest{} - mi := &file_mq_broker_proto_msgTypes[36] + mi := &file_mq_broker_proto_msgTypes[38] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2281,7 +2434,7 @@ func (x *SubscribeFollowMeRequest) String() string { func (*SubscribeFollowMeRequest) ProtoMessage() {} func (x *SubscribeFollowMeRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[36] + mi := &file_mq_broker_proto_msgTypes[38] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2294,7 +2447,7 @@ func (x *SubscribeFollowMeRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SubscribeFollowMeRequest.ProtoReflect.Descriptor instead. func (*SubscribeFollowMeRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{36} + return file_mq_broker_proto_rawDescGZIP(), []int{38} } func (x *SubscribeFollowMeRequest) GetMessage() isSubscribeFollowMeRequest_Message { @@ -2362,7 +2515,7 @@ type SubscribeFollowMeResponse struct { func (x *SubscribeFollowMeResponse) Reset() { *x = SubscribeFollowMeResponse{} - mi := &file_mq_broker_proto_msgTypes[37] + mi := &file_mq_broker_proto_msgTypes[39] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2374,7 +2527,7 @@ func (x *SubscribeFollowMeResponse) String() string { func (*SubscribeFollowMeResponse) ProtoMessage() {} func (x *SubscribeFollowMeResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[37] + mi := &file_mq_broker_proto_msgTypes[39] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2387,7 +2540,7 @@ func (x *SubscribeFollowMeResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SubscribeFollowMeResponse.ProtoReflect.Descriptor instead. func (*SubscribeFollowMeResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{37} + return file_mq_broker_proto_rawDescGZIP(), []int{39} } func (x *SubscribeFollowMeResponse) GetAckTsNs() int64 { @@ -2407,7 +2560,7 @@ type ClosePublishersRequest struct { func (x *ClosePublishersRequest) Reset() { *x = ClosePublishersRequest{} - mi := &file_mq_broker_proto_msgTypes[38] + mi := &file_mq_broker_proto_msgTypes[40] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2419,7 +2572,7 @@ func (x *ClosePublishersRequest) String() string { func (*ClosePublishersRequest) ProtoMessage() {} func (x *ClosePublishersRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[38] + mi := &file_mq_broker_proto_msgTypes[40] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2432,7 +2585,7 @@ func (x *ClosePublishersRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ClosePublishersRequest.ProtoReflect.Descriptor instead. func (*ClosePublishersRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{38} + return file_mq_broker_proto_rawDescGZIP(), []int{40} } func (x *ClosePublishersRequest) GetTopic() *schema_pb.Topic { @@ -2457,7 +2610,7 @@ type ClosePublishersResponse struct { func (x *ClosePublishersResponse) Reset() { *x = ClosePublishersResponse{} - mi := &file_mq_broker_proto_msgTypes[39] + mi := &file_mq_broker_proto_msgTypes[41] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2469,7 +2622,7 @@ func (x *ClosePublishersResponse) String() string { func (*ClosePublishersResponse) ProtoMessage() {} func (x *ClosePublishersResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[39] + mi := &file_mq_broker_proto_msgTypes[41] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2482,7 +2635,7 @@ func (x *ClosePublishersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ClosePublishersResponse.ProtoReflect.Descriptor instead. func (*ClosePublishersResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{39} + return file_mq_broker_proto_rawDescGZIP(), []int{41} } type CloseSubscribersRequest struct { @@ -2495,7 +2648,7 @@ type CloseSubscribersRequest struct { func (x *CloseSubscribersRequest) Reset() { *x = CloseSubscribersRequest{} - mi := &file_mq_broker_proto_msgTypes[40] + mi := &file_mq_broker_proto_msgTypes[42] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2507,7 +2660,7 @@ func (x *CloseSubscribersRequest) String() string { func (*CloseSubscribersRequest) ProtoMessage() {} func (x *CloseSubscribersRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[40] + mi := &file_mq_broker_proto_msgTypes[42] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2520,7 +2673,7 @@ func (x *CloseSubscribersRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use CloseSubscribersRequest.ProtoReflect.Descriptor instead. func (*CloseSubscribersRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{40} + return file_mq_broker_proto_rawDescGZIP(), []int{42} } func (x *CloseSubscribersRequest) GetTopic() *schema_pb.Topic { @@ -2545,7 +2698,7 @@ type CloseSubscribersResponse struct { func (x *CloseSubscribersResponse) Reset() { *x = CloseSubscribersResponse{} - mi := &file_mq_broker_proto_msgTypes[41] + mi := &file_mq_broker_proto_msgTypes[43] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2557,7 +2710,7 @@ func (x *CloseSubscribersResponse) String() string { func (*CloseSubscribersResponse) ProtoMessage() {} func (x *CloseSubscribersResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[41] + mi := &file_mq_broker_proto_msgTypes[43] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2570,21 +2723,21 @@ func (x *CloseSubscribersResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CloseSubscribersResponse.ProtoReflect.Descriptor instead. func (*CloseSubscribersResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{41} + return file_mq_broker_proto_rawDescGZIP(), []int{43} } type GetUnflushedMessagesRequest struct { - state protoimpl.MessageState `protogen:"open.v1"` - Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` - StartBufferIndex int64 `protobuf:"varint,3,opt,name=start_buffer_index,json=startBufferIndex,proto3" json:"start_buffer_index,omitempty"` // Filter by buffer index (messages from buffers >= this index) - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` + StartBufferOffset int64 `protobuf:"varint,3,opt,name=start_buffer_offset,json=startBufferOffset,proto3" json:"start_buffer_offset,omitempty"` // Filter by buffer offset (messages from buffers >= this offset) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *GetUnflushedMessagesRequest) Reset() { *x = GetUnflushedMessagesRequest{} - mi := &file_mq_broker_proto_msgTypes[42] + mi := &file_mq_broker_proto_msgTypes[44] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2596,7 +2749,7 @@ func (x *GetUnflushedMessagesRequest) String() string { func (*GetUnflushedMessagesRequest) ProtoMessage() {} func (x *GetUnflushedMessagesRequest) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[42] + mi := &file_mq_broker_proto_msgTypes[44] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2609,7 +2762,7 @@ func (x *GetUnflushedMessagesRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetUnflushedMessagesRequest.ProtoReflect.Descriptor instead. func (*GetUnflushedMessagesRequest) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{42} + return file_mq_broker_proto_rawDescGZIP(), []int{44} } func (x *GetUnflushedMessagesRequest) GetTopic() *schema_pb.Topic { @@ -2626,16 +2779,16 @@ func (x *GetUnflushedMessagesRequest) GetPartition() *schema_pb.Partition { return nil } -func (x *GetUnflushedMessagesRequest) GetStartBufferIndex() int64 { +func (x *GetUnflushedMessagesRequest) GetStartBufferOffset() int64 { if x != nil { - return x.StartBufferIndex + return x.StartBufferOffset } return 0 } type GetUnflushedMessagesResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - Message *LogEntry `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` // Single message per response (streaming) + Message *filer_pb.LogEntry `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` // Single message per response (streaming) Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` // Error message if any EndOfStream bool `protobuf:"varint,3,opt,name=end_of_stream,json=endOfStream,proto3" json:"end_of_stream,omitempty"` // Indicates this is the final response unknownFields protoimpl.UnknownFields @@ -2644,7 +2797,7 @@ type GetUnflushedMessagesResponse struct { func (x *GetUnflushedMessagesResponse) Reset() { *x = GetUnflushedMessagesResponse{} - mi := &file_mq_broker_proto_msgTypes[43] + mi := &file_mq_broker_proto_msgTypes[45] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2656,7 +2809,7 @@ func (x *GetUnflushedMessagesResponse) String() string { func (*GetUnflushedMessagesResponse) ProtoMessage() {} func (x *GetUnflushedMessagesResponse) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[43] + mi := &file_mq_broker_proto_msgTypes[45] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2669,10 +2822,10 @@ func (x *GetUnflushedMessagesResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetUnflushedMessagesResponse.ProtoReflect.Descriptor instead. func (*GetUnflushedMessagesResponse) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{43} + return file_mq_broker_proto_rawDescGZIP(), []int{45} } -func (x *GetUnflushedMessagesResponse) GetMessage() *LogEntry { +func (x *GetUnflushedMessagesResponse) GetMessage() *filer_pb.LogEntry { if x != nil { return x.Message } @@ -2693,31 +2846,29 @@ func (x *GetUnflushedMessagesResponse) GetEndOfStream() bool { return false } -type LogEntry struct { - state protoimpl.MessageState `protogen:"open.v1"` - TsNs int64 `protobuf:"varint,1,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` - Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` - Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` - PartitionKeyHash uint32 `protobuf:"varint,4,opt,name=partition_key_hash,json=partitionKeyHash,proto3" json:"partition_key_hash,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache +type GetPartitionRangeInfoRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Topic *schema_pb.Topic `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Partition *schema_pb.Partition `protobuf:"bytes,2,opt,name=partition,proto3" json:"partition,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } -func (x *LogEntry) Reset() { - *x = LogEntry{} - mi := &file_mq_broker_proto_msgTypes[44] +func (x *GetPartitionRangeInfoRequest) Reset() { + *x = GetPartitionRangeInfoRequest{} + mi := &file_mq_broker_proto_msgTypes[46] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *LogEntry) String() string { +func (x *GetPartitionRangeInfoRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*LogEntry) ProtoMessage() {} +func (*GetPartitionRangeInfoRequest) ProtoMessage() {} -func (x *LogEntry) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[44] +func (x *GetPartitionRangeInfoRequest) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[46] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2728,35 +2879,212 @@ func (x *LogEntry) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use LogEntry.ProtoReflect.Descriptor instead. -func (*LogEntry) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{44} +// Deprecated: Use GetPartitionRangeInfoRequest.ProtoReflect.Descriptor instead. +func (*GetPartitionRangeInfoRequest) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{46} } -func (x *LogEntry) GetTsNs() int64 { +func (x *GetPartitionRangeInfoRequest) GetTopic() *schema_pb.Topic { if x != nil { - return x.TsNs + return x.Topic } - return 0 + return nil } -func (x *LogEntry) GetKey() []byte { +func (x *GetPartitionRangeInfoRequest) GetPartition() *schema_pb.Partition { if x != nil { - return x.Key + return x.Partition + } + return nil +} + +type GetPartitionRangeInfoResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Offset range information + OffsetRange *OffsetRangeInfo `protobuf:"bytes,1,opt,name=offset_range,json=offsetRange,proto3" json:"offset_range,omitempty"` + // Timestamp range information + TimestampRange *TimestampRangeInfo `protobuf:"bytes,2,opt,name=timestamp_range,json=timestampRange,proto3" json:"timestamp_range,omitempty"` + // Partition metadata + RecordCount int64 `protobuf:"varint,10,opt,name=record_count,json=recordCount,proto3" json:"record_count,omitempty"` + ActiveSubscriptions int64 `protobuf:"varint,11,opt,name=active_subscriptions,json=activeSubscriptions,proto3" json:"active_subscriptions,omitempty"` + Error string `protobuf:"bytes,12,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetPartitionRangeInfoResponse) Reset() { + *x = GetPartitionRangeInfoResponse{} + mi := &file_mq_broker_proto_msgTypes[47] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetPartitionRangeInfoResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetPartitionRangeInfoResponse) ProtoMessage() {} + +func (x *GetPartitionRangeInfoResponse) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[47] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetPartitionRangeInfoResponse.ProtoReflect.Descriptor instead. +func (*GetPartitionRangeInfoResponse) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{47} +} + +func (x *GetPartitionRangeInfoResponse) GetOffsetRange() *OffsetRangeInfo { + if x != nil { + return x.OffsetRange } return nil } -func (x *LogEntry) GetData() []byte { +func (x *GetPartitionRangeInfoResponse) GetTimestampRange() *TimestampRangeInfo { if x != nil { - return x.Data + return x.TimestampRange } return nil } -func (x *LogEntry) GetPartitionKeyHash() uint32 { +func (x *GetPartitionRangeInfoResponse) GetRecordCount() int64 { if x != nil { - return x.PartitionKeyHash + return x.RecordCount + } + return 0 +} + +func (x *GetPartitionRangeInfoResponse) GetActiveSubscriptions() int64 { + if x != nil { + return x.ActiveSubscriptions + } + return 0 +} + +func (x *GetPartitionRangeInfoResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +type OffsetRangeInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + EarliestOffset int64 `protobuf:"varint,1,opt,name=earliest_offset,json=earliestOffset,proto3" json:"earliest_offset,omitempty"` + LatestOffset int64 `protobuf:"varint,2,opt,name=latest_offset,json=latestOffset,proto3" json:"latest_offset,omitempty"` + HighWaterMark int64 `protobuf:"varint,3,opt,name=high_water_mark,json=highWaterMark,proto3" json:"high_water_mark,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *OffsetRangeInfo) Reset() { + *x = OffsetRangeInfo{} + mi := &file_mq_broker_proto_msgTypes[48] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *OffsetRangeInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*OffsetRangeInfo) ProtoMessage() {} + +func (x *OffsetRangeInfo) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[48] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use OffsetRangeInfo.ProtoReflect.Descriptor instead. +func (*OffsetRangeInfo) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{48} +} + +func (x *OffsetRangeInfo) GetEarliestOffset() int64 { + if x != nil { + return x.EarliestOffset + } + return 0 +} + +func (x *OffsetRangeInfo) GetLatestOffset() int64 { + if x != nil { + return x.LatestOffset + } + return 0 +} + +func (x *OffsetRangeInfo) GetHighWaterMark() int64 { + if x != nil { + return x.HighWaterMark + } + return 0 +} + +type TimestampRangeInfo struct { + state protoimpl.MessageState `protogen:"open.v1"` + EarliestTimestampNs int64 `protobuf:"varint,1,opt,name=earliest_timestamp_ns,json=earliestTimestampNs,proto3" json:"earliest_timestamp_ns,omitempty"` // Earliest message timestamp in nanoseconds + LatestTimestampNs int64 `protobuf:"varint,2,opt,name=latest_timestamp_ns,json=latestTimestampNs,proto3" json:"latest_timestamp_ns,omitempty"` // Latest message timestamp in nanoseconds + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *TimestampRangeInfo) Reset() { + *x = TimestampRangeInfo{} + mi := &file_mq_broker_proto_msgTypes[49] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TimestampRangeInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TimestampRangeInfo) ProtoMessage() {} + +func (x *TimestampRangeInfo) ProtoReflect() protoreflect.Message { + mi := &file_mq_broker_proto_msgTypes[49] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TimestampRangeInfo.ProtoReflect.Descriptor instead. +func (*TimestampRangeInfo) Descriptor() ([]byte, []int) { + return file_mq_broker_proto_rawDescGZIP(), []int{49} +} + +func (x *TimestampRangeInfo) GetEarliestTimestampNs() int64 { + if x != nil { + return x.EarliestTimestampNs + } + return 0 +} + +func (x *TimestampRangeInfo) GetLatestTimestampNs() int64 { + if x != nil { + return x.LatestTimestampNs } return 0 } @@ -2770,7 +3098,7 @@ type PublisherToPubBalancerRequest_InitMessage struct { func (x *PublisherToPubBalancerRequest_InitMessage) Reset() { *x = PublisherToPubBalancerRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[46] + mi := &file_mq_broker_proto_msgTypes[51] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2782,7 +3110,7 @@ func (x *PublisherToPubBalancerRequest_InitMessage) String() string { func (*PublisherToPubBalancerRequest_InitMessage) ProtoMessage() {} func (x *PublisherToPubBalancerRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[46] + mi := &file_mq_broker_proto_msgTypes[51] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2826,7 +3154,7 @@ type SubscriberToSubCoordinatorRequest_InitMessage struct { func (x *SubscriberToSubCoordinatorRequest_InitMessage) Reset() { *x = SubscriberToSubCoordinatorRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[47] + mi := &file_mq_broker_proto_msgTypes[52] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2838,7 +3166,7 @@ func (x *SubscriberToSubCoordinatorRequest_InitMessage) String() string { func (*SubscriberToSubCoordinatorRequest_InitMessage) ProtoMessage() {} func (x *SubscriberToSubCoordinatorRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[47] + mi := &file_mq_broker_proto_msgTypes[52] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2851,7 +3179,7 @@ func (x *SubscriberToSubCoordinatorRequest_InitMessage) ProtoReflect() protorefl // Deprecated: Use SubscriberToSubCoordinatorRequest_InitMessage.ProtoReflect.Descriptor instead. func (*SubscriberToSubCoordinatorRequest_InitMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{26, 0} + return file_mq_broker_proto_rawDescGZIP(), []int{28, 0} } func (x *SubscriberToSubCoordinatorRequest_InitMessage) GetConsumerGroup() string { @@ -2898,7 +3226,7 @@ type SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage struct { func (x *SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) Reset() { *x = SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage{} - mi := &file_mq_broker_proto_msgTypes[48] + mi := &file_mq_broker_proto_msgTypes[53] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2910,7 +3238,7 @@ func (x *SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) String() stri func (*SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) ProtoMessage() {} func (x *SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[48] + mi := &file_mq_broker_proto_msgTypes[53] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2923,7 +3251,7 @@ func (x *SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) ProtoReflect( // Deprecated: Use SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage.ProtoReflect.Descriptor instead. func (*SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{26, 1} + return file_mq_broker_proto_rawDescGZIP(), []int{28, 1} } func (x *SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) GetPartition() *schema_pb.Partition { @@ -2942,7 +3270,7 @@ type SubscriberToSubCoordinatorRequest_AckAssignmentMessage struct { func (x *SubscriberToSubCoordinatorRequest_AckAssignmentMessage) Reset() { *x = SubscriberToSubCoordinatorRequest_AckAssignmentMessage{} - mi := &file_mq_broker_proto_msgTypes[49] + mi := &file_mq_broker_proto_msgTypes[54] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2954,7 +3282,7 @@ func (x *SubscriberToSubCoordinatorRequest_AckAssignmentMessage) String() string func (*SubscriberToSubCoordinatorRequest_AckAssignmentMessage) ProtoMessage() {} func (x *SubscriberToSubCoordinatorRequest_AckAssignmentMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[49] + mi := &file_mq_broker_proto_msgTypes[54] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2967,7 +3295,7 @@ func (x *SubscriberToSubCoordinatorRequest_AckAssignmentMessage) ProtoReflect() // Deprecated: Use SubscriberToSubCoordinatorRequest_AckAssignmentMessage.ProtoReflect.Descriptor instead. func (*SubscriberToSubCoordinatorRequest_AckAssignmentMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{26, 2} + return file_mq_broker_proto_rawDescGZIP(), []int{28, 2} } func (x *SubscriberToSubCoordinatorRequest_AckAssignmentMessage) GetPartition() *schema_pb.Partition { @@ -2986,7 +3314,7 @@ type SubscriberToSubCoordinatorResponse_Assignment struct { func (x *SubscriberToSubCoordinatorResponse_Assignment) Reset() { *x = SubscriberToSubCoordinatorResponse_Assignment{} - mi := &file_mq_broker_proto_msgTypes[50] + mi := &file_mq_broker_proto_msgTypes[55] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2998,7 +3326,7 @@ func (x *SubscriberToSubCoordinatorResponse_Assignment) String() string { func (*SubscriberToSubCoordinatorResponse_Assignment) ProtoMessage() {} func (x *SubscriberToSubCoordinatorResponse_Assignment) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[50] + mi := &file_mq_broker_proto_msgTypes[55] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3011,7 +3339,7 @@ func (x *SubscriberToSubCoordinatorResponse_Assignment) ProtoReflect() protorefl // Deprecated: Use SubscriberToSubCoordinatorResponse_Assignment.ProtoReflect.Descriptor instead. func (*SubscriberToSubCoordinatorResponse_Assignment) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{27, 0} + return file_mq_broker_proto_rawDescGZIP(), []int{29, 0} } func (x *SubscriberToSubCoordinatorResponse_Assignment) GetPartitionAssignment() *BrokerPartitionAssignment { @@ -3030,7 +3358,7 @@ type SubscriberToSubCoordinatorResponse_UnAssignment struct { func (x *SubscriberToSubCoordinatorResponse_UnAssignment) Reset() { *x = SubscriberToSubCoordinatorResponse_UnAssignment{} - mi := &file_mq_broker_proto_msgTypes[51] + mi := &file_mq_broker_proto_msgTypes[56] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3042,7 +3370,7 @@ func (x *SubscriberToSubCoordinatorResponse_UnAssignment) String() string { func (*SubscriberToSubCoordinatorResponse_UnAssignment) ProtoMessage() {} func (x *SubscriberToSubCoordinatorResponse_UnAssignment) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[51] + mi := &file_mq_broker_proto_msgTypes[56] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3055,7 +3383,7 @@ func (x *SubscriberToSubCoordinatorResponse_UnAssignment) ProtoReflect() protore // Deprecated: Use SubscriberToSubCoordinatorResponse_UnAssignment.ProtoReflect.Descriptor instead. func (*SubscriberToSubCoordinatorResponse_UnAssignment) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{27, 1} + return file_mq_broker_proto_rawDescGZIP(), []int{29, 1} } func (x *SubscriberToSubCoordinatorResponse_UnAssignment) GetPartition() *schema_pb.Partition { @@ -3078,7 +3406,7 @@ type PublishMessageRequest_InitMessage struct { func (x *PublishMessageRequest_InitMessage) Reset() { *x = PublishMessageRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[52] + mi := &file_mq_broker_proto_msgTypes[57] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3090,7 +3418,7 @@ func (x *PublishMessageRequest_InitMessage) String() string { func (*PublishMessageRequest_InitMessage) ProtoMessage() {} func (x *PublishMessageRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[52] + mi := &file_mq_broker_proto_msgTypes[57] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3103,7 +3431,7 @@ func (x *PublishMessageRequest_InitMessage) ProtoReflect() protoreflect.Message // Deprecated: Use PublishMessageRequest_InitMessage.ProtoReflect.Descriptor instead. func (*PublishMessageRequest_InitMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{30, 0} + return file_mq_broker_proto_rawDescGZIP(), []int{32, 0} } func (x *PublishMessageRequest_InitMessage) GetTopic() *schema_pb.Topic { @@ -3151,7 +3479,7 @@ type PublishFollowMeRequest_InitMessage struct { func (x *PublishFollowMeRequest_InitMessage) Reset() { *x = PublishFollowMeRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[53] + mi := &file_mq_broker_proto_msgTypes[58] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3163,7 +3491,7 @@ func (x *PublishFollowMeRequest_InitMessage) String() string { func (*PublishFollowMeRequest_InitMessage) ProtoMessage() {} func (x *PublishFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[53] + mi := &file_mq_broker_proto_msgTypes[58] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3176,7 +3504,7 @@ func (x *PublishFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message // Deprecated: Use PublishFollowMeRequest_InitMessage.ProtoReflect.Descriptor instead. func (*PublishFollowMeRequest_InitMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{32, 0} + return file_mq_broker_proto_rawDescGZIP(), []int{34, 0} } func (x *PublishFollowMeRequest_InitMessage) GetTopic() *schema_pb.Topic { @@ -3202,7 +3530,7 @@ type PublishFollowMeRequest_FlushMessage struct { func (x *PublishFollowMeRequest_FlushMessage) Reset() { *x = PublishFollowMeRequest_FlushMessage{} - mi := &file_mq_broker_proto_msgTypes[54] + mi := &file_mq_broker_proto_msgTypes[59] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3214,7 +3542,7 @@ func (x *PublishFollowMeRequest_FlushMessage) String() string { func (*PublishFollowMeRequest_FlushMessage) ProtoMessage() {} func (x *PublishFollowMeRequest_FlushMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[54] + mi := &file_mq_broker_proto_msgTypes[59] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3227,7 +3555,7 @@ func (x *PublishFollowMeRequest_FlushMessage) ProtoReflect() protoreflect.Messag // Deprecated: Use PublishFollowMeRequest_FlushMessage.ProtoReflect.Descriptor instead. func (*PublishFollowMeRequest_FlushMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{32, 1} + return file_mq_broker_proto_rawDescGZIP(), []int{34, 1} } func (x *PublishFollowMeRequest_FlushMessage) GetTsNs() int64 { @@ -3245,7 +3573,7 @@ type PublishFollowMeRequest_CloseMessage struct { func (x *PublishFollowMeRequest_CloseMessage) Reset() { *x = PublishFollowMeRequest_CloseMessage{} - mi := &file_mq_broker_proto_msgTypes[55] + mi := &file_mq_broker_proto_msgTypes[60] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3257,7 +3585,7 @@ func (x *PublishFollowMeRequest_CloseMessage) String() string { func (*PublishFollowMeRequest_CloseMessage) ProtoMessage() {} func (x *PublishFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[55] + mi := &file_mq_broker_proto_msgTypes[60] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3270,7 +3598,7 @@ func (x *PublishFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Messag // Deprecated: Use PublishFollowMeRequest_CloseMessage.ProtoReflect.Descriptor instead. func (*PublishFollowMeRequest_CloseMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{32, 2} + return file_mq_broker_proto_rawDescGZIP(), []int{34, 2} } type SubscribeMessageRequest_InitMessage struct { @@ -3290,7 +3618,7 @@ type SubscribeMessageRequest_InitMessage struct { func (x *SubscribeMessageRequest_InitMessage) Reset() { *x = SubscribeMessageRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[56] + mi := &file_mq_broker_proto_msgTypes[61] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3302,7 +3630,7 @@ func (x *SubscribeMessageRequest_InitMessage) String() string { func (*SubscribeMessageRequest_InitMessage) ProtoMessage() {} func (x *SubscribeMessageRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[56] + mi := &file_mq_broker_proto_msgTypes[61] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3315,7 +3643,7 @@ func (x *SubscribeMessageRequest_InitMessage) ProtoReflect() protoreflect.Messag // Deprecated: Use SubscribeMessageRequest_InitMessage.ProtoReflect.Descriptor instead. func (*SubscribeMessageRequest_InitMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{34, 0} + return file_mq_broker_proto_rawDescGZIP(), []int{36, 0} } func (x *SubscribeMessageRequest_InitMessage) GetConsumerGroup() string { @@ -3383,7 +3711,7 @@ func (x *SubscribeMessageRequest_InitMessage) GetSlidingWindowSize() int32 { type SubscribeMessageRequest_AckMessage struct { state protoimpl.MessageState `protogen:"open.v1"` - Sequence int64 `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"` + TsNs int64 `protobuf:"varint,1,opt,name=ts_ns,json=tsNs,proto3" json:"ts_ns,omitempty"` // Timestamp in nanoseconds for acknowledgment tracking Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -3391,7 +3719,7 @@ type SubscribeMessageRequest_AckMessage struct { func (x *SubscribeMessageRequest_AckMessage) Reset() { *x = SubscribeMessageRequest_AckMessage{} - mi := &file_mq_broker_proto_msgTypes[57] + mi := &file_mq_broker_proto_msgTypes[62] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3403,7 +3731,7 @@ func (x *SubscribeMessageRequest_AckMessage) String() string { func (*SubscribeMessageRequest_AckMessage) ProtoMessage() {} func (x *SubscribeMessageRequest_AckMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[57] + mi := &file_mq_broker_proto_msgTypes[62] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3416,12 +3744,12 @@ func (x *SubscribeMessageRequest_AckMessage) ProtoReflect() protoreflect.Message // Deprecated: Use SubscribeMessageRequest_AckMessage.ProtoReflect.Descriptor instead. func (*SubscribeMessageRequest_AckMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{34, 1} + return file_mq_broker_proto_rawDescGZIP(), []int{36, 1} } -func (x *SubscribeMessageRequest_AckMessage) GetSequence() int64 { +func (x *SubscribeMessageRequest_AckMessage) GetTsNs() int64 { if x != nil { - return x.Sequence + return x.TsNs } return 0 } @@ -3444,7 +3772,7 @@ type SubscribeMessageResponse_SubscribeCtrlMessage struct { func (x *SubscribeMessageResponse_SubscribeCtrlMessage) Reset() { *x = SubscribeMessageResponse_SubscribeCtrlMessage{} - mi := &file_mq_broker_proto_msgTypes[58] + mi := &file_mq_broker_proto_msgTypes[63] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3456,7 +3784,7 @@ func (x *SubscribeMessageResponse_SubscribeCtrlMessage) String() string { func (*SubscribeMessageResponse_SubscribeCtrlMessage) ProtoMessage() {} func (x *SubscribeMessageResponse_SubscribeCtrlMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[58] + mi := &file_mq_broker_proto_msgTypes[63] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3469,7 +3797,7 @@ func (x *SubscribeMessageResponse_SubscribeCtrlMessage) ProtoReflect() protorefl // Deprecated: Use SubscribeMessageResponse_SubscribeCtrlMessage.ProtoReflect.Descriptor instead. func (*SubscribeMessageResponse_SubscribeCtrlMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{35, 0} + return file_mq_broker_proto_rawDescGZIP(), []int{37, 0} } func (x *SubscribeMessageResponse_SubscribeCtrlMessage) GetError() string { @@ -3504,7 +3832,7 @@ type SubscribeFollowMeRequest_InitMessage struct { func (x *SubscribeFollowMeRequest_InitMessage) Reset() { *x = SubscribeFollowMeRequest_InitMessage{} - mi := &file_mq_broker_proto_msgTypes[59] + mi := &file_mq_broker_proto_msgTypes[64] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3516,7 +3844,7 @@ func (x *SubscribeFollowMeRequest_InitMessage) String() string { func (*SubscribeFollowMeRequest_InitMessage) ProtoMessage() {} func (x *SubscribeFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[59] + mi := &file_mq_broker_proto_msgTypes[64] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3529,7 +3857,7 @@ func (x *SubscribeFollowMeRequest_InitMessage) ProtoReflect() protoreflect.Messa // Deprecated: Use SubscribeFollowMeRequest_InitMessage.ProtoReflect.Descriptor instead. func (*SubscribeFollowMeRequest_InitMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{36, 0} + return file_mq_broker_proto_rawDescGZIP(), []int{38, 0} } func (x *SubscribeFollowMeRequest_InitMessage) GetTopic() *schema_pb.Topic { @@ -3562,7 +3890,7 @@ type SubscribeFollowMeRequest_AckMessage struct { func (x *SubscribeFollowMeRequest_AckMessage) Reset() { *x = SubscribeFollowMeRequest_AckMessage{} - mi := &file_mq_broker_proto_msgTypes[60] + mi := &file_mq_broker_proto_msgTypes[65] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3574,7 +3902,7 @@ func (x *SubscribeFollowMeRequest_AckMessage) String() string { func (*SubscribeFollowMeRequest_AckMessage) ProtoMessage() {} func (x *SubscribeFollowMeRequest_AckMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[60] + mi := &file_mq_broker_proto_msgTypes[65] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3587,7 +3915,7 @@ func (x *SubscribeFollowMeRequest_AckMessage) ProtoReflect() protoreflect.Messag // Deprecated: Use SubscribeFollowMeRequest_AckMessage.ProtoReflect.Descriptor instead. func (*SubscribeFollowMeRequest_AckMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{36, 1} + return file_mq_broker_proto_rawDescGZIP(), []int{38, 1} } func (x *SubscribeFollowMeRequest_AckMessage) GetTsNs() int64 { @@ -3605,7 +3933,7 @@ type SubscribeFollowMeRequest_CloseMessage struct { func (x *SubscribeFollowMeRequest_CloseMessage) Reset() { *x = SubscribeFollowMeRequest_CloseMessage{} - mi := &file_mq_broker_proto_msgTypes[61] + mi := &file_mq_broker_proto_msgTypes[66] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3617,7 +3945,7 @@ func (x *SubscribeFollowMeRequest_CloseMessage) String() string { func (*SubscribeFollowMeRequest_CloseMessage) ProtoMessage() {} func (x *SubscribeFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Message { - mi := &file_mq_broker_proto_msgTypes[61] + mi := &file_mq_broker_proto_msgTypes[66] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3630,14 +3958,14 @@ func (x *SubscribeFollowMeRequest_CloseMessage) ProtoReflect() protoreflect.Mess // Deprecated: Use SubscribeFollowMeRequest_CloseMessage.ProtoReflect.Descriptor instead. func (*SubscribeFollowMeRequest_CloseMessage) Descriptor() ([]byte, []int) { - return file_mq_broker_proto_rawDescGZIP(), []int{36, 2} + return file_mq_broker_proto_rawDescGZIP(), []int{38, 2} } var File_mq_broker_proto protoreflect.FileDescriptor const file_mq_broker_proto_rawDesc = "" + "\n" + - "\x0fmq_broker.proto\x12\fmessaging_pb\x1a\x0fmq_schema.proto\":\n" + + "\x0fmq_broker.proto\x12\fmessaging_pb\x1a\x0fmq_schema.proto\x1a\vfiler.proto\":\n" + "\x17FindBrokerLeaderRequest\x12\x1f\n" + "\vfiler_group\x18\x01 \x01(\tR\n" + "filerGroup\"2\n" + @@ -3667,21 +3995,29 @@ const file_mq_broker_proto_rawDesc = "" + "\x15BalanceTopicsResponse\"W\n" + "\x0eTopicRetention\x12+\n" + "\x11retention_seconds\x18\x01 \x01(\x03R\x10retentionSeconds\x12\x18\n" + - "\aenabled\x18\x02 \x01(\bR\aenabled\"\xdc\x01\n" + + "\aenabled\x18\x02 \x01(\bR\aenabled\"\xb1\x02\n" + "\x15ConfigureTopicRequest\x12&\n" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x12'\n" + - "\x0fpartition_count\x18\x02 \x01(\x05R\x0epartitionCount\x126\n" + - "\vrecord_type\x18\x03 \x01(\v2\x15.schema_pb.RecordTypeR\n" + - "recordType\x12:\n" + - "\tretention\x18\x04 \x01(\v2\x1c.messaging_pb.TopicRetentionR\tretention\"\xf7\x01\n" + + "\x0fpartition_count\x18\x02 \x01(\x05R\x0epartitionCount\x12:\n" + + "\tretention\x18\x03 \x01(\v2\x1c.messaging_pb.TopicRetentionR\tretention\x12E\n" + + "\x13message_record_type\x18\x04 \x01(\v2\x15.schema_pb.RecordTypeR\x11messageRecordType\x12\x1f\n" + + "\vkey_columns\x18\x05 \x03(\tR\n" + + "keyColumns\x12#\n" + + "\rschema_format\x18\x06 \x01(\tR\fschemaFormat\"\xcc\x02\n" + "\x16ConfigureTopicResponse\x12i\n" + - "\x1cbroker_partition_assignments\x18\x02 \x03(\v2'.messaging_pb.BrokerPartitionAssignmentR\x1abrokerPartitionAssignments\x126\n" + - "\vrecord_type\x18\x03 \x01(\v2\x15.schema_pb.RecordTypeR\n" + - "recordType\x12:\n" + - "\tretention\x18\x04 \x01(\v2\x1c.messaging_pb.TopicRetentionR\tretention\"\x13\n" + + "\x1cbroker_partition_assignments\x18\x02 \x03(\v2'.messaging_pb.BrokerPartitionAssignmentR\x1abrokerPartitionAssignments\x12:\n" + + "\tretention\x18\x03 \x01(\v2\x1c.messaging_pb.TopicRetentionR\tretention\x12E\n" + + "\x13message_record_type\x18\x04 \x01(\v2\x15.schema_pb.RecordTypeR\x11messageRecordType\x12\x1f\n" + + "\vkey_columns\x18\x05 \x03(\tR\n" + + "keyColumns\x12#\n" + + "\rschema_format\x18\x06 \x01(\tR\fschemaFormat\"\x13\n" + "\x11ListTopicsRequest\">\n" + "\x12ListTopicsResponse\x12(\n" + - "\x06topics\x18\x01 \x03(\v2\x10.schema_pb.TopicR\x06topics\"C\n" + + "\x06topics\x18\x01 \x03(\v2\x10.schema_pb.TopicR\x06topics\"<\n" + + "\x12TopicExistsRequest\x12&\n" + + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\"-\n" + + "\x13TopicExistsResponse\x12\x16\n" + + "\x06exists\x18\x01 \x01(\bR\x06exists\"C\n" + "\x19LookupTopicBrokersRequest\x12&\n" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\"\xaf\x01\n" + "\x1aLookupTopicBrokersResponse\x12&\n" + @@ -3692,16 +4028,18 @@ const file_mq_broker_proto_rawDesc = "" + "\rleader_broker\x18\x02 \x01(\tR\fleaderBroker\x12'\n" + "\x0ffollower_broker\x18\x03 \x01(\tR\x0efollowerBroker\"F\n" + "\x1cGetTopicConfigurationRequest\x12&\n" + - "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\"\x9b\x03\n" + + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\"\xf0\x03\n" + "\x1dGetTopicConfigurationResponse\x12&\n" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x12'\n" + - "\x0fpartition_count\x18\x02 \x01(\x05R\x0epartitionCount\x126\n" + - "\vrecord_type\x18\x03 \x01(\v2\x15.schema_pb.RecordTypeR\n" + - "recordType\x12i\n" + - "\x1cbroker_partition_assignments\x18\x04 \x03(\v2'.messaging_pb.BrokerPartitionAssignmentR\x1abrokerPartitionAssignments\x12\"\n" + - "\rcreated_at_ns\x18\x05 \x01(\x03R\vcreatedAtNs\x12&\n" + - "\x0flast_updated_ns\x18\x06 \x01(\x03R\rlastUpdatedNs\x12:\n" + - "\tretention\x18\a \x01(\v2\x1c.messaging_pb.TopicRetentionR\tretention\"C\n" + + "\x0fpartition_count\x18\x02 \x01(\x05R\x0epartitionCount\x12i\n" + + "\x1cbroker_partition_assignments\x18\x03 \x03(\v2'.messaging_pb.BrokerPartitionAssignmentR\x1abrokerPartitionAssignments\x12\"\n" + + "\rcreated_at_ns\x18\x04 \x01(\x03R\vcreatedAtNs\x12&\n" + + "\x0flast_updated_ns\x18\x05 \x01(\x03R\rlastUpdatedNs\x12:\n" + + "\tretention\x18\x06 \x01(\v2\x1c.messaging_pb.TopicRetentionR\tretention\x12E\n" + + "\x13message_record_type\x18\a \x01(\v2\x15.schema_pb.RecordTypeR\x11messageRecordType\x12\x1f\n" + + "\vkey_columns\x18\b \x03(\tR\n" + + "keyColumns\x12#\n" + + "\rschema_format\x18\t \x01(\tR\fschemaFormat\"C\n" + "\x19GetTopicPublishersRequest\x12&\n" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\"Z\n" + "\x1aGetTopicPublishersResponse\x12<\n" + @@ -3785,11 +4123,14 @@ const file_mq_broker_proto_rawDesc = "" + "\fack_interval\x18\x03 \x01(\x05R\vackInterval\x12'\n" + "\x0ffollower_broker\x18\x04 \x01(\tR\x0efollowerBroker\x12%\n" + "\x0epublisher_name\x18\x05 \x01(\tR\rpublisherNameB\t\n" + - "\amessage\"t\n" + - "\x16PublishMessageResponse\x12!\n" + - "\fack_sequence\x18\x01 \x01(\x03R\vackSequence\x12\x14\n" + + "\amessage\"\xb5\x01\n" + + "\x16PublishMessageResponse\x12\x1a\n" + + "\tack_ts_ns\x18\x01 \x01(\x03R\aackTsNs\x12\x14\n" + "\x05error\x18\x02 \x01(\tR\x05error\x12!\n" + - "\fshould_close\x18\x03 \x01(\bR\vshouldClose\"\xd2\x03\n" + + "\fshould_close\x18\x03 \x01(\bR\vshouldClose\x12\x1d\n" + + "\n" + + "error_code\x18\x04 \x01(\x05R\terrorCode\x12'\n" + + "\x0fassigned_offset\x18\x05 \x01(\x03R\x0eassignedOffset\"\xd2\x03\n" + "\x16PublishFollowMeRequest\x12F\n" + "\x04init\x18\x01 \x01(\v20.messaging_pb.PublishFollowMeRequest.InitMessageH\x00R\x04init\x12/\n" + "\x04data\x18\x02 \x01(\v2\x19.messaging_pb.DataMessageH\x00R\x04data\x12I\n" + @@ -3803,7 +4144,7 @@ const file_mq_broker_proto_rawDesc = "" + "\fCloseMessageB\t\n" + "\amessage\"5\n" + "\x17PublishFollowMeResponse\x12\x1a\n" + - "\tack_ts_ns\x18\x01 \x01(\x03R\aackTsNs\"\xfc\x04\n" + + "\tack_ts_ns\x18\x01 \x01(\x03R\aackTsNs\"\xf5\x04\n" + "\x17SubscribeMessageRequest\x12G\n" + "\x04init\x18\x01 \x01(\v21.messaging_pb.SubscribeMessageRequest.InitMessageH\x00R\x04init\x12D\n" + "\x03ack\x18\x02 \x01(\v20.messaging_pb.SubscribeMessageRequest.AckMessageH\x00R\x03ack\x1a\x8a\x03\n" + @@ -3819,10 +4160,10 @@ const file_mq_broker_proto_rawDesc = "" + "\x06filter\x18\n" + " \x01(\tR\x06filter\x12'\n" + "\x0ffollower_broker\x18\v \x01(\tR\x0efollowerBroker\x12.\n" + - "\x13sliding_window_size\x18\f \x01(\x05R\x11slidingWindowSize\x1a:\n" + + "\x13sliding_window_size\x18\f \x01(\x05R\x11slidingWindowSize\x1a3\n" + "\n" + - "AckMessage\x12\x1a\n" + - "\bsequence\x18\x01 \x01(\x03R\bsequence\x12\x10\n" + + "AckMessage\x12\x13\n" + + "\x05ts_ns\x18\x01 \x01(\x03R\x04tsNs\x12\x10\n" + "\x03key\x18\x02 \x01(\fR\x03keyB\t\n" + "\amessage\"\xa7\x02\n" + "\x18SubscribeMessageResponse\x12Q\n" + @@ -3857,26 +4198,39 @@ const file_mq_broker_proto_rawDesc = "" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x12 \n" + "\funix_time_ns\x18\x02 \x01(\x03R\n" + "unixTimeNs\"\x1a\n" + - "\x18CloseSubscribersResponse\"\xa7\x01\n" + + "\x18CloseSubscribersResponse\"\xa9\x01\n" + "\x1bGetUnflushedMessagesRequest\x12&\n" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x122\n" + - "\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12,\n" + - "\x12start_buffer_index\x18\x03 \x01(\x03R\x10startBufferIndex\"\x8a\x01\n" + - "\x1cGetUnflushedMessagesResponse\x120\n" + - "\amessage\x18\x01 \x01(\v2\x16.messaging_pb.LogEntryR\amessage\x12\x14\n" + + "\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12.\n" + + "\x13start_buffer_offset\x18\x03 \x01(\x03R\x11startBufferOffset\"\x86\x01\n" + + "\x1cGetUnflushedMessagesResponse\x12,\n" + + "\amessage\x18\x01 \x01(\v2\x12.filer_pb.LogEntryR\amessage\x12\x14\n" + "\x05error\x18\x02 \x01(\tR\x05error\x12\"\n" + - "\rend_of_stream\x18\x03 \x01(\bR\vendOfStream\"s\n" + - "\bLogEntry\x12\x13\n" + - "\x05ts_ns\x18\x01 \x01(\x03R\x04tsNs\x12\x10\n" + - "\x03key\x18\x02 \x01(\fR\x03key\x12\x12\n" + - "\x04data\x18\x03 \x01(\fR\x04data\x12,\n" + - "\x12partition_key_hash\x18\x04 \x01(\rR\x10partitionKeyHash2\x8a\x0f\n" + + "\rend_of_stream\x18\x03 \x01(\bR\vendOfStream\"z\n" + + "\x1cGetPartitionRangeInfoRequest\x12&\n" + + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x122\n" + + "\tpartition\x18\x02 \x01(\v2\x14.schema_pb.PartitionR\tpartition\"\x98\x02\n" + + "\x1dGetPartitionRangeInfoResponse\x12@\n" + + "\foffset_range\x18\x01 \x01(\v2\x1d.messaging_pb.OffsetRangeInfoR\voffsetRange\x12I\n" + + "\x0ftimestamp_range\x18\x02 \x01(\v2 .messaging_pb.TimestampRangeInfoR\x0etimestampRange\x12!\n" + + "\frecord_count\x18\n" + + " \x01(\x03R\vrecordCount\x121\n" + + "\x14active_subscriptions\x18\v \x01(\x03R\x13activeSubscriptions\x12\x14\n" + + "\x05error\x18\f \x01(\tR\x05error\"\x87\x01\n" + + "\x0fOffsetRangeInfo\x12'\n" + + "\x0fearliest_offset\x18\x01 \x01(\x03R\x0eearliestOffset\x12#\n" + + "\rlatest_offset\x18\x02 \x01(\x03R\flatestOffset\x12&\n" + + "\x0fhigh_water_mark\x18\x03 \x01(\x03R\rhighWaterMark\"x\n" + + "\x12TimestampRangeInfo\x122\n" + + "\x15earliest_timestamp_ns\x18\x01 \x01(\x03R\x13earliestTimestampNs\x12.\n" + + "\x13latest_timestamp_ns\x18\x02 \x01(\x03R\x11latestTimestampNs2\xd4\x10\n" + "\x10SeaweedMessaging\x12c\n" + "\x10FindBrokerLeader\x12%.messaging_pb.FindBrokerLeaderRequest\x1a&.messaging_pb.FindBrokerLeaderResponse\"\x00\x12y\n" + "\x16PublisherToPubBalancer\x12+.messaging_pb.PublisherToPubBalancerRequest\x1a,.messaging_pb.PublisherToPubBalancerResponse\"\x00(\x010\x01\x12Z\n" + "\rBalanceTopics\x12\".messaging_pb.BalanceTopicsRequest\x1a#.messaging_pb.BalanceTopicsResponse\"\x00\x12Q\n" + "\n" + - "ListTopics\x12\x1f.messaging_pb.ListTopicsRequest\x1a .messaging_pb.ListTopicsResponse\"\x00\x12]\n" + + "ListTopics\x12\x1f.messaging_pb.ListTopicsRequest\x1a .messaging_pb.ListTopicsResponse\"\x00\x12T\n" + + "\vTopicExists\x12 .messaging_pb.TopicExistsRequest\x1a!.messaging_pb.TopicExistsResponse\"\x00\x12]\n" + "\x0eConfigureTopic\x12#.messaging_pb.ConfigureTopicRequest\x1a$.messaging_pb.ConfigureTopicResponse\"\x00\x12i\n" + "\x12LookupTopicBrokers\x12'.messaging_pb.LookupTopicBrokersRequest\x1a(.messaging_pb.LookupTopicBrokersResponse\"\x00\x12r\n" + "\x15GetTopicConfiguration\x12*.messaging_pb.GetTopicConfigurationRequest\x1a+.messaging_pb.GetTopicConfigurationResponse\"\x00\x12i\n" + @@ -3890,7 +4244,8 @@ const file_mq_broker_proto_rawDesc = "" + "\x10SubscribeMessage\x12%.messaging_pb.SubscribeMessageRequest\x1a&.messaging_pb.SubscribeMessageResponse\"\x00(\x010\x01\x12d\n" + "\x0fPublishFollowMe\x12$.messaging_pb.PublishFollowMeRequest\x1a%.messaging_pb.PublishFollowMeResponse\"\x00(\x010\x01\x12h\n" + "\x11SubscribeFollowMe\x12&.messaging_pb.SubscribeFollowMeRequest\x1a'.messaging_pb.SubscribeFollowMeResponse\"\x00(\x01\x12q\n" + - "\x14GetUnflushedMessages\x12).messaging_pb.GetUnflushedMessagesRequest\x1a*.messaging_pb.GetUnflushedMessagesResponse\"\x000\x01BO\n" + + "\x14GetUnflushedMessages\x12).messaging_pb.GetUnflushedMessagesRequest\x1a*.messaging_pb.GetUnflushedMessagesResponse\"\x000\x01\x12r\n" + + "\x15GetPartitionRangeInfo\x12*.messaging_pb.GetPartitionRangeInfoRequest\x1a+.messaging_pb.GetPartitionRangeInfoResponse\"\x00BO\n" + "\fseaweedfs.mqB\x11MessageQueueProtoZ,github.com/seaweedfs/seaweedfs/weed/pb/mq_pbb\x06proto3" var ( @@ -3905,7 +4260,7 @@ func file_mq_broker_proto_rawDescGZIP() []byte { return file_mq_broker_proto_rawDescData } -var file_mq_broker_proto_msgTypes = make([]protoimpl.MessageInfo, 62) +var file_mq_broker_proto_msgTypes = make([]protoimpl.MessageInfo, 67) var file_mq_broker_proto_goTypes = []any{ (*FindBrokerLeaderRequest)(nil), // 0: messaging_pb.FindBrokerLeaderRequest (*FindBrokerLeaderResponse)(nil), // 1: messaging_pb.FindBrokerLeaderResponse @@ -3920,171 +4275,186 @@ var file_mq_broker_proto_goTypes = []any{ (*ConfigureTopicResponse)(nil), // 10: messaging_pb.ConfigureTopicResponse (*ListTopicsRequest)(nil), // 11: messaging_pb.ListTopicsRequest (*ListTopicsResponse)(nil), // 12: messaging_pb.ListTopicsResponse - (*LookupTopicBrokersRequest)(nil), // 13: messaging_pb.LookupTopicBrokersRequest - (*LookupTopicBrokersResponse)(nil), // 14: messaging_pb.LookupTopicBrokersResponse - (*BrokerPartitionAssignment)(nil), // 15: messaging_pb.BrokerPartitionAssignment - (*GetTopicConfigurationRequest)(nil), // 16: messaging_pb.GetTopicConfigurationRequest - (*GetTopicConfigurationResponse)(nil), // 17: messaging_pb.GetTopicConfigurationResponse - (*GetTopicPublishersRequest)(nil), // 18: messaging_pb.GetTopicPublishersRequest - (*GetTopicPublishersResponse)(nil), // 19: messaging_pb.GetTopicPublishersResponse - (*GetTopicSubscribersRequest)(nil), // 20: messaging_pb.GetTopicSubscribersRequest - (*GetTopicSubscribersResponse)(nil), // 21: messaging_pb.GetTopicSubscribersResponse - (*TopicPublisher)(nil), // 22: messaging_pb.TopicPublisher - (*TopicSubscriber)(nil), // 23: messaging_pb.TopicSubscriber - (*AssignTopicPartitionsRequest)(nil), // 24: messaging_pb.AssignTopicPartitionsRequest - (*AssignTopicPartitionsResponse)(nil), // 25: messaging_pb.AssignTopicPartitionsResponse - (*SubscriberToSubCoordinatorRequest)(nil), // 26: messaging_pb.SubscriberToSubCoordinatorRequest - (*SubscriberToSubCoordinatorResponse)(nil), // 27: messaging_pb.SubscriberToSubCoordinatorResponse - (*ControlMessage)(nil), // 28: messaging_pb.ControlMessage - (*DataMessage)(nil), // 29: messaging_pb.DataMessage - (*PublishMessageRequest)(nil), // 30: messaging_pb.PublishMessageRequest - (*PublishMessageResponse)(nil), // 31: messaging_pb.PublishMessageResponse - (*PublishFollowMeRequest)(nil), // 32: messaging_pb.PublishFollowMeRequest - (*PublishFollowMeResponse)(nil), // 33: messaging_pb.PublishFollowMeResponse - (*SubscribeMessageRequest)(nil), // 34: messaging_pb.SubscribeMessageRequest - (*SubscribeMessageResponse)(nil), // 35: messaging_pb.SubscribeMessageResponse - (*SubscribeFollowMeRequest)(nil), // 36: messaging_pb.SubscribeFollowMeRequest - (*SubscribeFollowMeResponse)(nil), // 37: messaging_pb.SubscribeFollowMeResponse - (*ClosePublishersRequest)(nil), // 38: messaging_pb.ClosePublishersRequest - (*ClosePublishersResponse)(nil), // 39: messaging_pb.ClosePublishersResponse - (*CloseSubscribersRequest)(nil), // 40: messaging_pb.CloseSubscribersRequest - (*CloseSubscribersResponse)(nil), // 41: messaging_pb.CloseSubscribersResponse - (*GetUnflushedMessagesRequest)(nil), // 42: messaging_pb.GetUnflushedMessagesRequest - (*GetUnflushedMessagesResponse)(nil), // 43: messaging_pb.GetUnflushedMessagesResponse - (*LogEntry)(nil), // 44: messaging_pb.LogEntry - nil, // 45: messaging_pb.BrokerStats.StatsEntry - (*PublisherToPubBalancerRequest_InitMessage)(nil), // 46: messaging_pb.PublisherToPubBalancerRequest.InitMessage - (*SubscriberToSubCoordinatorRequest_InitMessage)(nil), // 47: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage - (*SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage)(nil), // 48: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage - (*SubscriberToSubCoordinatorRequest_AckAssignmentMessage)(nil), // 49: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage - (*SubscriberToSubCoordinatorResponse_Assignment)(nil), // 50: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment - (*SubscriberToSubCoordinatorResponse_UnAssignment)(nil), // 51: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment - (*PublishMessageRequest_InitMessage)(nil), // 52: messaging_pb.PublishMessageRequest.InitMessage - (*PublishFollowMeRequest_InitMessage)(nil), // 53: messaging_pb.PublishFollowMeRequest.InitMessage - (*PublishFollowMeRequest_FlushMessage)(nil), // 54: messaging_pb.PublishFollowMeRequest.FlushMessage - (*PublishFollowMeRequest_CloseMessage)(nil), // 55: messaging_pb.PublishFollowMeRequest.CloseMessage - (*SubscribeMessageRequest_InitMessage)(nil), // 56: messaging_pb.SubscribeMessageRequest.InitMessage - (*SubscribeMessageRequest_AckMessage)(nil), // 57: messaging_pb.SubscribeMessageRequest.AckMessage - (*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 58: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage - (*SubscribeFollowMeRequest_InitMessage)(nil), // 59: messaging_pb.SubscribeFollowMeRequest.InitMessage - (*SubscribeFollowMeRequest_AckMessage)(nil), // 60: messaging_pb.SubscribeFollowMeRequest.AckMessage - (*SubscribeFollowMeRequest_CloseMessage)(nil), // 61: messaging_pb.SubscribeFollowMeRequest.CloseMessage - (*schema_pb.Topic)(nil), // 62: schema_pb.Topic - (*schema_pb.Partition)(nil), // 63: schema_pb.Partition - (*schema_pb.RecordType)(nil), // 64: schema_pb.RecordType - (*schema_pb.PartitionOffset)(nil), // 65: schema_pb.PartitionOffset - (schema_pb.OffsetType)(0), // 66: schema_pb.OffsetType + (*TopicExistsRequest)(nil), // 13: messaging_pb.TopicExistsRequest + (*TopicExistsResponse)(nil), // 14: messaging_pb.TopicExistsResponse + (*LookupTopicBrokersRequest)(nil), // 15: messaging_pb.LookupTopicBrokersRequest + (*LookupTopicBrokersResponse)(nil), // 16: messaging_pb.LookupTopicBrokersResponse + (*BrokerPartitionAssignment)(nil), // 17: messaging_pb.BrokerPartitionAssignment + (*GetTopicConfigurationRequest)(nil), // 18: messaging_pb.GetTopicConfigurationRequest + (*GetTopicConfigurationResponse)(nil), // 19: messaging_pb.GetTopicConfigurationResponse + (*GetTopicPublishersRequest)(nil), // 20: messaging_pb.GetTopicPublishersRequest + (*GetTopicPublishersResponse)(nil), // 21: messaging_pb.GetTopicPublishersResponse + (*GetTopicSubscribersRequest)(nil), // 22: messaging_pb.GetTopicSubscribersRequest + (*GetTopicSubscribersResponse)(nil), // 23: messaging_pb.GetTopicSubscribersResponse + (*TopicPublisher)(nil), // 24: messaging_pb.TopicPublisher + (*TopicSubscriber)(nil), // 25: messaging_pb.TopicSubscriber + (*AssignTopicPartitionsRequest)(nil), // 26: messaging_pb.AssignTopicPartitionsRequest + (*AssignTopicPartitionsResponse)(nil), // 27: messaging_pb.AssignTopicPartitionsResponse + (*SubscriberToSubCoordinatorRequest)(nil), // 28: messaging_pb.SubscriberToSubCoordinatorRequest + (*SubscriberToSubCoordinatorResponse)(nil), // 29: messaging_pb.SubscriberToSubCoordinatorResponse + (*ControlMessage)(nil), // 30: messaging_pb.ControlMessage + (*DataMessage)(nil), // 31: messaging_pb.DataMessage + (*PublishMessageRequest)(nil), // 32: messaging_pb.PublishMessageRequest + (*PublishMessageResponse)(nil), // 33: messaging_pb.PublishMessageResponse + (*PublishFollowMeRequest)(nil), // 34: messaging_pb.PublishFollowMeRequest + (*PublishFollowMeResponse)(nil), // 35: messaging_pb.PublishFollowMeResponse + (*SubscribeMessageRequest)(nil), // 36: messaging_pb.SubscribeMessageRequest + (*SubscribeMessageResponse)(nil), // 37: messaging_pb.SubscribeMessageResponse + (*SubscribeFollowMeRequest)(nil), // 38: messaging_pb.SubscribeFollowMeRequest + (*SubscribeFollowMeResponse)(nil), // 39: messaging_pb.SubscribeFollowMeResponse + (*ClosePublishersRequest)(nil), // 40: messaging_pb.ClosePublishersRequest + (*ClosePublishersResponse)(nil), // 41: messaging_pb.ClosePublishersResponse + (*CloseSubscribersRequest)(nil), // 42: messaging_pb.CloseSubscribersRequest + (*CloseSubscribersResponse)(nil), // 43: messaging_pb.CloseSubscribersResponse + (*GetUnflushedMessagesRequest)(nil), // 44: messaging_pb.GetUnflushedMessagesRequest + (*GetUnflushedMessagesResponse)(nil), // 45: messaging_pb.GetUnflushedMessagesResponse + (*GetPartitionRangeInfoRequest)(nil), // 46: messaging_pb.GetPartitionRangeInfoRequest + (*GetPartitionRangeInfoResponse)(nil), // 47: messaging_pb.GetPartitionRangeInfoResponse + (*OffsetRangeInfo)(nil), // 48: messaging_pb.OffsetRangeInfo + (*TimestampRangeInfo)(nil), // 49: messaging_pb.TimestampRangeInfo + nil, // 50: messaging_pb.BrokerStats.StatsEntry + (*PublisherToPubBalancerRequest_InitMessage)(nil), // 51: messaging_pb.PublisherToPubBalancerRequest.InitMessage + (*SubscriberToSubCoordinatorRequest_InitMessage)(nil), // 52: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage + (*SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage)(nil), // 53: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage + (*SubscriberToSubCoordinatorRequest_AckAssignmentMessage)(nil), // 54: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage + (*SubscriberToSubCoordinatorResponse_Assignment)(nil), // 55: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment + (*SubscriberToSubCoordinatorResponse_UnAssignment)(nil), // 56: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment + (*PublishMessageRequest_InitMessage)(nil), // 57: messaging_pb.PublishMessageRequest.InitMessage + (*PublishFollowMeRequest_InitMessage)(nil), // 58: messaging_pb.PublishFollowMeRequest.InitMessage + (*PublishFollowMeRequest_FlushMessage)(nil), // 59: messaging_pb.PublishFollowMeRequest.FlushMessage + (*PublishFollowMeRequest_CloseMessage)(nil), // 60: messaging_pb.PublishFollowMeRequest.CloseMessage + (*SubscribeMessageRequest_InitMessage)(nil), // 61: messaging_pb.SubscribeMessageRequest.InitMessage + (*SubscribeMessageRequest_AckMessage)(nil), // 62: messaging_pb.SubscribeMessageRequest.AckMessage + (*SubscribeMessageResponse_SubscribeCtrlMessage)(nil), // 63: messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage + (*SubscribeFollowMeRequest_InitMessage)(nil), // 64: messaging_pb.SubscribeFollowMeRequest.InitMessage + (*SubscribeFollowMeRequest_AckMessage)(nil), // 65: messaging_pb.SubscribeFollowMeRequest.AckMessage + (*SubscribeFollowMeRequest_CloseMessage)(nil), // 66: messaging_pb.SubscribeFollowMeRequest.CloseMessage + (*schema_pb.Topic)(nil), // 67: schema_pb.Topic + (*schema_pb.Partition)(nil), // 68: schema_pb.Partition + (*schema_pb.RecordType)(nil), // 69: schema_pb.RecordType + (*filer_pb.LogEntry)(nil), // 70: filer_pb.LogEntry + (*schema_pb.PartitionOffset)(nil), // 71: schema_pb.PartitionOffset + (schema_pb.OffsetType)(0), // 72: schema_pb.OffsetType } var file_mq_broker_proto_depIdxs = []int32{ - 45, // 0: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry - 62, // 1: messaging_pb.TopicPartitionStats.topic:type_name -> schema_pb.Topic - 63, // 2: messaging_pb.TopicPartitionStats.partition:type_name -> schema_pb.Partition - 46, // 3: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage + 50, // 0: messaging_pb.BrokerStats.stats:type_name -> messaging_pb.BrokerStats.StatsEntry + 67, // 1: messaging_pb.TopicPartitionStats.topic:type_name -> schema_pb.Topic + 68, // 2: messaging_pb.TopicPartitionStats.partition:type_name -> schema_pb.Partition + 51, // 3: messaging_pb.PublisherToPubBalancerRequest.init:type_name -> messaging_pb.PublisherToPubBalancerRequest.InitMessage 2, // 4: messaging_pb.PublisherToPubBalancerRequest.stats:type_name -> messaging_pb.BrokerStats - 62, // 5: messaging_pb.ConfigureTopicRequest.topic:type_name -> schema_pb.Topic - 64, // 6: messaging_pb.ConfigureTopicRequest.record_type:type_name -> schema_pb.RecordType - 8, // 7: messaging_pb.ConfigureTopicRequest.retention:type_name -> messaging_pb.TopicRetention - 15, // 8: messaging_pb.ConfigureTopicResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 64, // 9: messaging_pb.ConfigureTopicResponse.record_type:type_name -> schema_pb.RecordType - 8, // 10: messaging_pb.ConfigureTopicResponse.retention:type_name -> messaging_pb.TopicRetention - 62, // 11: messaging_pb.ListTopicsResponse.topics:type_name -> schema_pb.Topic - 62, // 12: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> schema_pb.Topic - 62, // 13: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> schema_pb.Topic - 15, // 14: messaging_pb.LookupTopicBrokersResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 63, // 15: messaging_pb.BrokerPartitionAssignment.partition:type_name -> schema_pb.Partition - 62, // 16: messaging_pb.GetTopicConfigurationRequest.topic:type_name -> schema_pb.Topic - 62, // 17: messaging_pb.GetTopicConfigurationResponse.topic:type_name -> schema_pb.Topic - 64, // 18: messaging_pb.GetTopicConfigurationResponse.record_type:type_name -> schema_pb.RecordType - 15, // 19: messaging_pb.GetTopicConfigurationResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment + 67, // 5: messaging_pb.ConfigureTopicRequest.topic:type_name -> schema_pb.Topic + 8, // 6: messaging_pb.ConfigureTopicRequest.retention:type_name -> messaging_pb.TopicRetention + 69, // 7: messaging_pb.ConfigureTopicRequest.message_record_type:type_name -> schema_pb.RecordType + 17, // 8: messaging_pb.ConfigureTopicResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment + 8, // 9: messaging_pb.ConfigureTopicResponse.retention:type_name -> messaging_pb.TopicRetention + 69, // 10: messaging_pb.ConfigureTopicResponse.message_record_type:type_name -> schema_pb.RecordType + 67, // 11: messaging_pb.ListTopicsResponse.topics:type_name -> schema_pb.Topic + 67, // 12: messaging_pb.TopicExistsRequest.topic:type_name -> schema_pb.Topic + 67, // 13: messaging_pb.LookupTopicBrokersRequest.topic:type_name -> schema_pb.Topic + 67, // 14: messaging_pb.LookupTopicBrokersResponse.topic:type_name -> schema_pb.Topic + 17, // 15: messaging_pb.LookupTopicBrokersResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment + 68, // 16: messaging_pb.BrokerPartitionAssignment.partition:type_name -> schema_pb.Partition + 67, // 17: messaging_pb.GetTopicConfigurationRequest.topic:type_name -> schema_pb.Topic + 67, // 18: messaging_pb.GetTopicConfigurationResponse.topic:type_name -> schema_pb.Topic + 17, // 19: messaging_pb.GetTopicConfigurationResponse.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment 8, // 20: messaging_pb.GetTopicConfigurationResponse.retention:type_name -> messaging_pb.TopicRetention - 62, // 21: messaging_pb.GetTopicPublishersRequest.topic:type_name -> schema_pb.Topic - 22, // 22: messaging_pb.GetTopicPublishersResponse.publishers:type_name -> messaging_pb.TopicPublisher - 62, // 23: messaging_pb.GetTopicSubscribersRequest.topic:type_name -> schema_pb.Topic - 23, // 24: messaging_pb.GetTopicSubscribersResponse.subscribers:type_name -> messaging_pb.TopicSubscriber - 63, // 25: messaging_pb.TopicPublisher.partition:type_name -> schema_pb.Partition - 63, // 26: messaging_pb.TopicSubscriber.partition:type_name -> schema_pb.Partition - 62, // 27: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> schema_pb.Topic - 15, // 28: messaging_pb.AssignTopicPartitionsRequest.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment - 47, // 29: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage - 49, // 30: messaging_pb.SubscriberToSubCoordinatorRequest.ack_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage - 48, // 31: messaging_pb.SubscriberToSubCoordinatorRequest.ack_un_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage - 50, // 32: messaging_pb.SubscriberToSubCoordinatorResponse.assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.Assignment - 51, // 33: messaging_pb.SubscriberToSubCoordinatorResponse.un_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment - 28, // 34: messaging_pb.DataMessage.ctrl:type_name -> messaging_pb.ControlMessage - 52, // 35: messaging_pb.PublishMessageRequest.init:type_name -> messaging_pb.PublishMessageRequest.InitMessage - 29, // 36: messaging_pb.PublishMessageRequest.data:type_name -> messaging_pb.DataMessage - 53, // 37: messaging_pb.PublishFollowMeRequest.init:type_name -> messaging_pb.PublishFollowMeRequest.InitMessage - 29, // 38: messaging_pb.PublishFollowMeRequest.data:type_name -> messaging_pb.DataMessage - 54, // 39: messaging_pb.PublishFollowMeRequest.flush:type_name -> messaging_pb.PublishFollowMeRequest.FlushMessage - 55, // 40: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage - 56, // 41: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage - 57, // 42: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage - 58, // 43: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage - 29, // 44: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage - 59, // 45: messaging_pb.SubscribeFollowMeRequest.init:type_name -> messaging_pb.SubscribeFollowMeRequest.InitMessage - 60, // 46: messaging_pb.SubscribeFollowMeRequest.ack:type_name -> messaging_pb.SubscribeFollowMeRequest.AckMessage - 61, // 47: messaging_pb.SubscribeFollowMeRequest.close:type_name -> messaging_pb.SubscribeFollowMeRequest.CloseMessage - 62, // 48: messaging_pb.ClosePublishersRequest.topic:type_name -> schema_pb.Topic - 62, // 49: messaging_pb.CloseSubscribersRequest.topic:type_name -> schema_pb.Topic - 62, // 50: messaging_pb.GetUnflushedMessagesRequest.topic:type_name -> schema_pb.Topic - 63, // 51: messaging_pb.GetUnflushedMessagesRequest.partition:type_name -> schema_pb.Partition - 44, // 52: messaging_pb.GetUnflushedMessagesResponse.message:type_name -> messaging_pb.LogEntry - 3, // 53: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats - 62, // 54: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> schema_pb.Topic - 63, // 55: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage.partition:type_name -> schema_pb.Partition - 63, // 56: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage.partition:type_name -> schema_pb.Partition - 15, // 57: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignment:type_name -> messaging_pb.BrokerPartitionAssignment - 63, // 58: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment.partition:type_name -> schema_pb.Partition - 62, // 59: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic - 63, // 60: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> schema_pb.Partition - 62, // 61: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic - 63, // 62: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition - 62, // 63: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic - 65, // 64: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> schema_pb.PartitionOffset - 66, // 65: messaging_pb.SubscribeMessageRequest.InitMessage.offset_type:type_name -> schema_pb.OffsetType - 62, // 66: messaging_pb.SubscribeFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic - 63, // 67: messaging_pb.SubscribeFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition - 0, // 68: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest - 4, // 69: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest - 6, // 70: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest - 11, // 71: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest - 9, // 72: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest - 13, // 73: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest - 16, // 74: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest - 18, // 75: messaging_pb.SeaweedMessaging.GetTopicPublishers:input_type -> messaging_pb.GetTopicPublishersRequest - 20, // 76: messaging_pb.SeaweedMessaging.GetTopicSubscribers:input_type -> messaging_pb.GetTopicSubscribersRequest - 24, // 77: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest - 38, // 78: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest - 40, // 79: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest - 26, // 80: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest - 30, // 81: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest - 34, // 82: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest - 32, // 83: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest - 36, // 84: messaging_pb.SeaweedMessaging.SubscribeFollowMe:input_type -> messaging_pb.SubscribeFollowMeRequest - 42, // 85: messaging_pb.SeaweedMessaging.GetUnflushedMessages:input_type -> messaging_pb.GetUnflushedMessagesRequest - 1, // 86: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse - 5, // 87: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse - 7, // 88: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse - 12, // 89: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse - 10, // 90: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse - 14, // 91: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse - 17, // 92: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse - 19, // 93: messaging_pb.SeaweedMessaging.GetTopicPublishers:output_type -> messaging_pb.GetTopicPublishersResponse - 21, // 94: messaging_pb.SeaweedMessaging.GetTopicSubscribers:output_type -> messaging_pb.GetTopicSubscribersResponse - 25, // 95: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse - 39, // 96: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse - 41, // 97: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse - 27, // 98: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse - 31, // 99: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse - 35, // 100: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse - 33, // 101: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse - 37, // 102: messaging_pb.SeaweedMessaging.SubscribeFollowMe:output_type -> messaging_pb.SubscribeFollowMeResponse - 43, // 103: messaging_pb.SeaweedMessaging.GetUnflushedMessages:output_type -> messaging_pb.GetUnflushedMessagesResponse - 86, // [86:104] is the sub-list for method output_type - 68, // [68:86] is the sub-list for method input_type - 68, // [68:68] is the sub-list for extension type_name - 68, // [68:68] is the sub-list for extension extendee - 0, // [0:68] is the sub-list for field type_name + 69, // 21: messaging_pb.GetTopicConfigurationResponse.message_record_type:type_name -> schema_pb.RecordType + 67, // 22: messaging_pb.GetTopicPublishersRequest.topic:type_name -> schema_pb.Topic + 24, // 23: messaging_pb.GetTopicPublishersResponse.publishers:type_name -> messaging_pb.TopicPublisher + 67, // 24: messaging_pb.GetTopicSubscribersRequest.topic:type_name -> schema_pb.Topic + 25, // 25: messaging_pb.GetTopicSubscribersResponse.subscribers:type_name -> messaging_pb.TopicSubscriber + 68, // 26: messaging_pb.TopicPublisher.partition:type_name -> schema_pb.Partition + 68, // 27: messaging_pb.TopicSubscriber.partition:type_name -> schema_pb.Partition + 67, // 28: messaging_pb.AssignTopicPartitionsRequest.topic:type_name -> schema_pb.Topic + 17, // 29: messaging_pb.AssignTopicPartitionsRequest.broker_partition_assignments:type_name -> messaging_pb.BrokerPartitionAssignment + 52, // 30: messaging_pb.SubscriberToSubCoordinatorRequest.init:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage + 54, // 31: messaging_pb.SubscriberToSubCoordinatorRequest.ack_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage + 53, // 32: messaging_pb.SubscriberToSubCoordinatorRequest.ack_un_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage + 55, // 33: messaging_pb.SubscriberToSubCoordinatorResponse.assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.Assignment + 56, // 34: messaging_pb.SubscriberToSubCoordinatorResponse.un_assignment:type_name -> messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment + 30, // 35: messaging_pb.DataMessage.ctrl:type_name -> messaging_pb.ControlMessage + 57, // 36: messaging_pb.PublishMessageRequest.init:type_name -> messaging_pb.PublishMessageRequest.InitMessage + 31, // 37: messaging_pb.PublishMessageRequest.data:type_name -> messaging_pb.DataMessage + 58, // 38: messaging_pb.PublishFollowMeRequest.init:type_name -> messaging_pb.PublishFollowMeRequest.InitMessage + 31, // 39: messaging_pb.PublishFollowMeRequest.data:type_name -> messaging_pb.DataMessage + 59, // 40: messaging_pb.PublishFollowMeRequest.flush:type_name -> messaging_pb.PublishFollowMeRequest.FlushMessage + 60, // 41: messaging_pb.PublishFollowMeRequest.close:type_name -> messaging_pb.PublishFollowMeRequest.CloseMessage + 61, // 42: messaging_pb.SubscribeMessageRequest.init:type_name -> messaging_pb.SubscribeMessageRequest.InitMessage + 62, // 43: messaging_pb.SubscribeMessageRequest.ack:type_name -> messaging_pb.SubscribeMessageRequest.AckMessage + 63, // 44: messaging_pb.SubscribeMessageResponse.ctrl:type_name -> messaging_pb.SubscribeMessageResponse.SubscribeCtrlMessage + 31, // 45: messaging_pb.SubscribeMessageResponse.data:type_name -> messaging_pb.DataMessage + 64, // 46: messaging_pb.SubscribeFollowMeRequest.init:type_name -> messaging_pb.SubscribeFollowMeRequest.InitMessage + 65, // 47: messaging_pb.SubscribeFollowMeRequest.ack:type_name -> messaging_pb.SubscribeFollowMeRequest.AckMessage + 66, // 48: messaging_pb.SubscribeFollowMeRequest.close:type_name -> messaging_pb.SubscribeFollowMeRequest.CloseMessage + 67, // 49: messaging_pb.ClosePublishersRequest.topic:type_name -> schema_pb.Topic + 67, // 50: messaging_pb.CloseSubscribersRequest.topic:type_name -> schema_pb.Topic + 67, // 51: messaging_pb.GetUnflushedMessagesRequest.topic:type_name -> schema_pb.Topic + 68, // 52: messaging_pb.GetUnflushedMessagesRequest.partition:type_name -> schema_pb.Partition + 70, // 53: messaging_pb.GetUnflushedMessagesResponse.message:type_name -> filer_pb.LogEntry + 67, // 54: messaging_pb.GetPartitionRangeInfoRequest.topic:type_name -> schema_pb.Topic + 68, // 55: messaging_pb.GetPartitionRangeInfoRequest.partition:type_name -> schema_pb.Partition + 48, // 56: messaging_pb.GetPartitionRangeInfoResponse.offset_range:type_name -> messaging_pb.OffsetRangeInfo + 49, // 57: messaging_pb.GetPartitionRangeInfoResponse.timestamp_range:type_name -> messaging_pb.TimestampRangeInfo + 3, // 58: messaging_pb.BrokerStats.StatsEntry.value:type_name -> messaging_pb.TopicPartitionStats + 67, // 59: messaging_pb.SubscriberToSubCoordinatorRequest.InitMessage.topic:type_name -> schema_pb.Topic + 68, // 60: messaging_pb.SubscriberToSubCoordinatorRequest.AckUnAssignmentMessage.partition:type_name -> schema_pb.Partition + 68, // 61: messaging_pb.SubscriberToSubCoordinatorRequest.AckAssignmentMessage.partition:type_name -> schema_pb.Partition + 17, // 62: messaging_pb.SubscriberToSubCoordinatorResponse.Assignment.partition_assignment:type_name -> messaging_pb.BrokerPartitionAssignment + 68, // 63: messaging_pb.SubscriberToSubCoordinatorResponse.UnAssignment.partition:type_name -> schema_pb.Partition + 67, // 64: messaging_pb.PublishMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic + 68, // 65: messaging_pb.PublishMessageRequest.InitMessage.partition:type_name -> schema_pb.Partition + 67, // 66: messaging_pb.PublishFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic + 68, // 67: messaging_pb.PublishFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition + 67, // 68: messaging_pb.SubscribeMessageRequest.InitMessage.topic:type_name -> schema_pb.Topic + 71, // 69: messaging_pb.SubscribeMessageRequest.InitMessage.partition_offset:type_name -> schema_pb.PartitionOffset + 72, // 70: messaging_pb.SubscribeMessageRequest.InitMessage.offset_type:type_name -> schema_pb.OffsetType + 67, // 71: messaging_pb.SubscribeFollowMeRequest.InitMessage.topic:type_name -> schema_pb.Topic + 68, // 72: messaging_pb.SubscribeFollowMeRequest.InitMessage.partition:type_name -> schema_pb.Partition + 0, // 73: messaging_pb.SeaweedMessaging.FindBrokerLeader:input_type -> messaging_pb.FindBrokerLeaderRequest + 4, // 74: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:input_type -> messaging_pb.PublisherToPubBalancerRequest + 6, // 75: messaging_pb.SeaweedMessaging.BalanceTopics:input_type -> messaging_pb.BalanceTopicsRequest + 11, // 76: messaging_pb.SeaweedMessaging.ListTopics:input_type -> messaging_pb.ListTopicsRequest + 13, // 77: messaging_pb.SeaweedMessaging.TopicExists:input_type -> messaging_pb.TopicExistsRequest + 9, // 78: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest + 15, // 79: messaging_pb.SeaweedMessaging.LookupTopicBrokers:input_type -> messaging_pb.LookupTopicBrokersRequest + 18, // 80: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest + 20, // 81: messaging_pb.SeaweedMessaging.GetTopicPublishers:input_type -> messaging_pb.GetTopicPublishersRequest + 22, // 82: messaging_pb.SeaweedMessaging.GetTopicSubscribers:input_type -> messaging_pb.GetTopicSubscribersRequest + 26, // 83: messaging_pb.SeaweedMessaging.AssignTopicPartitions:input_type -> messaging_pb.AssignTopicPartitionsRequest + 40, // 84: messaging_pb.SeaweedMessaging.ClosePublishers:input_type -> messaging_pb.ClosePublishersRequest + 42, // 85: messaging_pb.SeaweedMessaging.CloseSubscribers:input_type -> messaging_pb.CloseSubscribersRequest + 28, // 86: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:input_type -> messaging_pb.SubscriberToSubCoordinatorRequest + 32, // 87: messaging_pb.SeaweedMessaging.PublishMessage:input_type -> messaging_pb.PublishMessageRequest + 36, // 88: messaging_pb.SeaweedMessaging.SubscribeMessage:input_type -> messaging_pb.SubscribeMessageRequest + 34, // 89: messaging_pb.SeaweedMessaging.PublishFollowMe:input_type -> messaging_pb.PublishFollowMeRequest + 38, // 90: messaging_pb.SeaweedMessaging.SubscribeFollowMe:input_type -> messaging_pb.SubscribeFollowMeRequest + 44, // 91: messaging_pb.SeaweedMessaging.GetUnflushedMessages:input_type -> messaging_pb.GetUnflushedMessagesRequest + 46, // 92: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:input_type -> messaging_pb.GetPartitionRangeInfoRequest + 1, // 93: messaging_pb.SeaweedMessaging.FindBrokerLeader:output_type -> messaging_pb.FindBrokerLeaderResponse + 5, // 94: messaging_pb.SeaweedMessaging.PublisherToPubBalancer:output_type -> messaging_pb.PublisherToPubBalancerResponse + 7, // 95: messaging_pb.SeaweedMessaging.BalanceTopics:output_type -> messaging_pb.BalanceTopicsResponse + 12, // 96: messaging_pb.SeaweedMessaging.ListTopics:output_type -> messaging_pb.ListTopicsResponse + 14, // 97: messaging_pb.SeaweedMessaging.TopicExists:output_type -> messaging_pb.TopicExistsResponse + 10, // 98: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse + 16, // 99: messaging_pb.SeaweedMessaging.LookupTopicBrokers:output_type -> messaging_pb.LookupTopicBrokersResponse + 19, // 100: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse + 21, // 101: messaging_pb.SeaweedMessaging.GetTopicPublishers:output_type -> messaging_pb.GetTopicPublishersResponse + 23, // 102: messaging_pb.SeaweedMessaging.GetTopicSubscribers:output_type -> messaging_pb.GetTopicSubscribersResponse + 27, // 103: messaging_pb.SeaweedMessaging.AssignTopicPartitions:output_type -> messaging_pb.AssignTopicPartitionsResponse + 41, // 104: messaging_pb.SeaweedMessaging.ClosePublishers:output_type -> messaging_pb.ClosePublishersResponse + 43, // 105: messaging_pb.SeaweedMessaging.CloseSubscribers:output_type -> messaging_pb.CloseSubscribersResponse + 29, // 106: messaging_pb.SeaweedMessaging.SubscriberToSubCoordinator:output_type -> messaging_pb.SubscriberToSubCoordinatorResponse + 33, // 107: messaging_pb.SeaweedMessaging.PublishMessage:output_type -> messaging_pb.PublishMessageResponse + 37, // 108: messaging_pb.SeaweedMessaging.SubscribeMessage:output_type -> messaging_pb.SubscribeMessageResponse + 35, // 109: messaging_pb.SeaweedMessaging.PublishFollowMe:output_type -> messaging_pb.PublishFollowMeResponse + 39, // 110: messaging_pb.SeaweedMessaging.SubscribeFollowMe:output_type -> messaging_pb.SubscribeFollowMeResponse + 45, // 111: messaging_pb.SeaweedMessaging.GetUnflushedMessages:output_type -> messaging_pb.GetUnflushedMessagesResponse + 47, // 112: messaging_pb.SeaweedMessaging.GetPartitionRangeInfo:output_type -> messaging_pb.GetPartitionRangeInfoResponse + 93, // [93:113] is the sub-list for method output_type + 73, // [73:93] is the sub-list for method input_type + 73, // [73:73] is the sub-list for extension type_name + 73, // [73:73] is the sub-list for extension extendee + 0, // [0:73] is the sub-list for field type_name } func init() { file_mq_broker_proto_init() } @@ -4096,34 +4466,34 @@ func file_mq_broker_proto_init() { (*PublisherToPubBalancerRequest_Init)(nil), (*PublisherToPubBalancerRequest_Stats)(nil), } - file_mq_broker_proto_msgTypes[26].OneofWrappers = []any{ + file_mq_broker_proto_msgTypes[28].OneofWrappers = []any{ (*SubscriberToSubCoordinatorRequest_Init)(nil), (*SubscriberToSubCoordinatorRequest_AckAssignment)(nil), (*SubscriberToSubCoordinatorRequest_AckUnAssignment)(nil), } - file_mq_broker_proto_msgTypes[27].OneofWrappers = []any{ + file_mq_broker_proto_msgTypes[29].OneofWrappers = []any{ (*SubscriberToSubCoordinatorResponse_Assignment_)(nil), (*SubscriberToSubCoordinatorResponse_UnAssignment_)(nil), } - file_mq_broker_proto_msgTypes[30].OneofWrappers = []any{ + file_mq_broker_proto_msgTypes[32].OneofWrappers = []any{ (*PublishMessageRequest_Init)(nil), (*PublishMessageRequest_Data)(nil), } - file_mq_broker_proto_msgTypes[32].OneofWrappers = []any{ + file_mq_broker_proto_msgTypes[34].OneofWrappers = []any{ (*PublishFollowMeRequest_Init)(nil), (*PublishFollowMeRequest_Data)(nil), (*PublishFollowMeRequest_Flush)(nil), (*PublishFollowMeRequest_Close)(nil), } - file_mq_broker_proto_msgTypes[34].OneofWrappers = []any{ + file_mq_broker_proto_msgTypes[36].OneofWrappers = []any{ (*SubscribeMessageRequest_Init)(nil), (*SubscribeMessageRequest_Ack)(nil), } - file_mq_broker_proto_msgTypes[35].OneofWrappers = []any{ + file_mq_broker_proto_msgTypes[37].OneofWrappers = []any{ (*SubscribeMessageResponse_Ctrl)(nil), (*SubscribeMessageResponse_Data)(nil), } - file_mq_broker_proto_msgTypes[36].OneofWrappers = []any{ + file_mq_broker_proto_msgTypes[38].OneofWrappers = []any{ (*SubscribeFollowMeRequest_Init)(nil), (*SubscribeFollowMeRequest_Ack)(nil), (*SubscribeFollowMeRequest_Close)(nil), @@ -4134,7 +4504,7 @@ func file_mq_broker_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_mq_broker_proto_rawDesc), len(file_mq_broker_proto_rawDesc)), NumEnums: 0, - NumMessages: 62, + NumMessages: 67, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/mq_pb/mq_broker_grpc.pb.go b/weed/pb/mq_pb/mq_broker_grpc.pb.go index 3a6c6dc59..e8544b57f 100644 --- a/weed/pb/mq_pb/mq_broker_grpc.pb.go +++ b/weed/pb/mq_pb/mq_broker_grpc.pb.go @@ -23,6 +23,7 @@ const ( SeaweedMessaging_PublisherToPubBalancer_FullMethodName = "/messaging_pb.SeaweedMessaging/PublisherToPubBalancer" SeaweedMessaging_BalanceTopics_FullMethodName = "/messaging_pb.SeaweedMessaging/BalanceTopics" SeaweedMessaging_ListTopics_FullMethodName = "/messaging_pb.SeaweedMessaging/ListTopics" + SeaweedMessaging_TopicExists_FullMethodName = "/messaging_pb.SeaweedMessaging/TopicExists" SeaweedMessaging_ConfigureTopic_FullMethodName = "/messaging_pb.SeaweedMessaging/ConfigureTopic" SeaweedMessaging_LookupTopicBrokers_FullMethodName = "/messaging_pb.SeaweedMessaging/LookupTopicBrokers" SeaweedMessaging_GetTopicConfiguration_FullMethodName = "/messaging_pb.SeaweedMessaging/GetTopicConfiguration" @@ -37,6 +38,7 @@ const ( SeaweedMessaging_PublishFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/PublishFollowMe" SeaweedMessaging_SubscribeFollowMe_FullMethodName = "/messaging_pb.SeaweedMessaging/SubscribeFollowMe" SeaweedMessaging_GetUnflushedMessages_FullMethodName = "/messaging_pb.SeaweedMessaging/GetUnflushedMessages" + SeaweedMessaging_GetPartitionRangeInfo_FullMethodName = "/messaging_pb.SeaweedMessaging/GetPartitionRangeInfo" ) // SeaweedMessagingClient is the client API for SeaweedMessaging service. @@ -50,6 +52,7 @@ type SeaweedMessagingClient interface { BalanceTopics(ctx context.Context, in *BalanceTopicsRequest, opts ...grpc.CallOption) (*BalanceTopicsResponse, error) // control plane for topic partitions ListTopics(ctx context.Context, in *ListTopicsRequest, opts ...grpc.CallOption) (*ListTopicsResponse, error) + TopicExists(ctx context.Context, in *TopicExistsRequest, opts ...grpc.CallOption) (*TopicExistsResponse, error) ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error) LookupTopicBrokers(ctx context.Context, in *LookupTopicBrokersRequest, opts ...grpc.CallOption) (*LookupTopicBrokersResponse, error) GetTopicConfiguration(ctx context.Context, in *GetTopicConfigurationRequest, opts ...grpc.CallOption) (*GetTopicConfigurationResponse, error) @@ -69,6 +72,8 @@ type SeaweedMessagingClient interface { SubscribeFollowMe(ctx context.Context, opts ...grpc.CallOption) (grpc.ClientStreamingClient[SubscribeFollowMeRequest, SubscribeFollowMeResponse], error) // SQL query support - get unflushed messages from broker's in-memory buffer (streaming) GetUnflushedMessages(ctx context.Context, in *GetUnflushedMessagesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[GetUnflushedMessagesResponse], error) + // Get comprehensive partition range information (offsets, timestamps, and other fields) + GetPartitionRangeInfo(ctx context.Context, in *GetPartitionRangeInfoRequest, opts ...grpc.CallOption) (*GetPartitionRangeInfoResponse, error) } type seaweedMessagingClient struct { @@ -122,6 +127,16 @@ func (c *seaweedMessagingClient) ListTopics(ctx context.Context, in *ListTopicsR return out, nil } +func (c *seaweedMessagingClient) TopicExists(ctx context.Context, in *TopicExistsRequest, opts ...grpc.CallOption) (*TopicExistsResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(TopicExistsResponse) + err := c.cc.Invoke(ctx, SeaweedMessaging_TopicExists_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(ConfigureTopicResponse) @@ -286,6 +301,16 @@ func (c *seaweedMessagingClient) GetUnflushedMessages(ctx context.Context, in *G // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type SeaweedMessaging_GetUnflushedMessagesClient = grpc.ServerStreamingClient[GetUnflushedMessagesResponse] +func (c *seaweedMessagingClient) GetPartitionRangeInfo(ctx context.Context, in *GetPartitionRangeInfoRequest, opts ...grpc.CallOption) (*GetPartitionRangeInfoResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetPartitionRangeInfoResponse) + err := c.cc.Invoke(ctx, SeaweedMessaging_GetPartitionRangeInfo_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // SeaweedMessagingServer is the server API for SeaweedMessaging service. // All implementations must embed UnimplementedSeaweedMessagingServer // for forward compatibility. @@ -297,6 +322,7 @@ type SeaweedMessagingServer interface { BalanceTopics(context.Context, *BalanceTopicsRequest) (*BalanceTopicsResponse, error) // control plane for topic partitions ListTopics(context.Context, *ListTopicsRequest) (*ListTopicsResponse, error) + TopicExists(context.Context, *TopicExistsRequest) (*TopicExistsResponse, error) ConfigureTopic(context.Context, *ConfigureTopicRequest) (*ConfigureTopicResponse, error) LookupTopicBrokers(context.Context, *LookupTopicBrokersRequest) (*LookupTopicBrokersResponse, error) GetTopicConfiguration(context.Context, *GetTopicConfigurationRequest) (*GetTopicConfigurationResponse, error) @@ -316,6 +342,8 @@ type SeaweedMessagingServer interface { SubscribeFollowMe(grpc.ClientStreamingServer[SubscribeFollowMeRequest, SubscribeFollowMeResponse]) error // SQL query support - get unflushed messages from broker's in-memory buffer (streaming) GetUnflushedMessages(*GetUnflushedMessagesRequest, grpc.ServerStreamingServer[GetUnflushedMessagesResponse]) error + // Get comprehensive partition range information (offsets, timestamps, and other fields) + GetPartitionRangeInfo(context.Context, *GetPartitionRangeInfoRequest) (*GetPartitionRangeInfoResponse, error) mustEmbedUnimplementedSeaweedMessagingServer() } @@ -338,6 +366,9 @@ func (UnimplementedSeaweedMessagingServer) BalanceTopics(context.Context, *Balan func (UnimplementedSeaweedMessagingServer) ListTopics(context.Context, *ListTopicsRequest) (*ListTopicsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ListTopics not implemented") } +func (UnimplementedSeaweedMessagingServer) TopicExists(context.Context, *TopicExistsRequest) (*TopicExistsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method TopicExists not implemented") +} func (UnimplementedSeaweedMessagingServer) ConfigureTopic(context.Context, *ConfigureTopicRequest) (*ConfigureTopicResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ConfigureTopic not implemented") } @@ -380,6 +411,9 @@ func (UnimplementedSeaweedMessagingServer) SubscribeFollowMe(grpc.ClientStreamin func (UnimplementedSeaweedMessagingServer) GetUnflushedMessages(*GetUnflushedMessagesRequest, grpc.ServerStreamingServer[GetUnflushedMessagesResponse]) error { return status.Errorf(codes.Unimplemented, "method GetUnflushedMessages not implemented") } +func (UnimplementedSeaweedMessagingServer) GetPartitionRangeInfo(context.Context, *GetPartitionRangeInfoRequest) (*GetPartitionRangeInfoResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetPartitionRangeInfo not implemented") +} func (UnimplementedSeaweedMessagingServer) mustEmbedUnimplementedSeaweedMessagingServer() {} func (UnimplementedSeaweedMessagingServer) testEmbeddedByValue() {} @@ -462,6 +496,24 @@ func _SeaweedMessaging_ListTopics_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _SeaweedMessaging_TopicExists_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(TopicExistsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SeaweedMessagingServer).TopicExists(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SeaweedMessaging_TopicExists_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SeaweedMessagingServer).TopicExists(ctx, req.(*TopicExistsRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _SeaweedMessaging_ConfigureTopic_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ConfigureTopicRequest) if err := dec(in); err != nil { @@ -652,6 +704,24 @@ func _SeaweedMessaging_GetUnflushedMessages_Handler(srv interface{}, stream grpc // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type SeaweedMessaging_GetUnflushedMessagesServer = grpc.ServerStreamingServer[GetUnflushedMessagesResponse] +func _SeaweedMessaging_GetPartitionRangeInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetPartitionRangeInfoRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SeaweedMessagingServer).GetPartitionRangeInfo(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SeaweedMessaging_GetPartitionRangeInfo_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SeaweedMessagingServer).GetPartitionRangeInfo(ctx, req.(*GetPartitionRangeInfoRequest)) + } + return interceptor(ctx, in, info, handler) +} + // SeaweedMessaging_ServiceDesc is the grpc.ServiceDesc for SeaweedMessaging service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -672,6 +742,10 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ Handler: _SeaweedMessaging_ListTopics_Handler, }, { + MethodName: "TopicExists", + Handler: _SeaweedMessaging_TopicExists_Handler, + }, + { MethodName: "ConfigureTopic", Handler: _SeaweedMessaging_ConfigureTopic_Handler, }, @@ -703,6 +777,10 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{ MethodName: "CloseSubscribers", Handler: _SeaweedMessaging_CloseSubscribers_Handler, }, + { + MethodName: "GetPartitionRangeInfo", + Handler: _SeaweedMessaging_GetPartitionRangeInfo_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/weed/pb/mq_schema.proto b/weed/pb/mq_schema.proto index 2deeadb55..81b523bcd 100644 --- a/weed/pb/mq_schema.proto +++ b/weed/pb/mq_schema.proto @@ -30,11 +30,15 @@ enum OffsetType { EXACT_TS_NS = 10; RESET_TO_LATEST = 15; RESUME_OR_LATEST = 20; + // Offset-based positioning + EXACT_OFFSET = 25; + RESET_TO_OFFSET = 30; } message PartitionOffset { Partition partition = 1; int64 start_ts_ns = 2; + int64 start_offset = 3; // For offset-based positioning } /////////////////////////// diff --git a/weed/pb/schema_pb/mq_schema.pb.go b/weed/pb/schema_pb/mq_schema.pb.go index 2cd2118bf..7fbf4a4e6 100644 --- a/weed/pb/schema_pb/mq_schema.pb.go +++ b/weed/pb/schema_pb/mq_schema.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.36.6 // protoc v5.29.3 -// source: weed/pb/mq_schema.proto +// source: mq_schema.proto package schema_pb @@ -29,6 +29,9 @@ const ( OffsetType_EXACT_TS_NS OffsetType = 10 OffsetType_RESET_TO_LATEST OffsetType = 15 OffsetType_RESUME_OR_LATEST OffsetType = 20 + // Offset-based positioning + OffsetType_EXACT_OFFSET OffsetType = 25 + OffsetType_RESET_TO_OFFSET OffsetType = 30 ) // Enum value maps for OffsetType. @@ -39,6 +42,8 @@ var ( 10: "EXACT_TS_NS", 15: "RESET_TO_LATEST", 20: "RESUME_OR_LATEST", + 25: "EXACT_OFFSET", + 30: "RESET_TO_OFFSET", } OffsetType_value = map[string]int32{ "RESUME_OR_EARLIEST": 0, @@ -46,6 +51,8 @@ var ( "EXACT_TS_NS": 10, "RESET_TO_LATEST": 15, "RESUME_OR_LATEST": 20, + "EXACT_OFFSET": 25, + "RESET_TO_OFFSET": 30, } ) @@ -60,11 +67,11 @@ func (x OffsetType) String() string { } func (OffsetType) Descriptor() protoreflect.EnumDescriptor { - return file_weed_pb_mq_schema_proto_enumTypes[0].Descriptor() + return file_mq_schema_proto_enumTypes[0].Descriptor() } func (OffsetType) Type() protoreflect.EnumType { - return &file_weed_pb_mq_schema_proto_enumTypes[0] + return &file_mq_schema_proto_enumTypes[0] } func (x OffsetType) Number() protoreflect.EnumNumber { @@ -73,7 +80,7 @@ func (x OffsetType) Number() protoreflect.EnumNumber { // Deprecated: Use OffsetType.Descriptor instead. func (OffsetType) EnumDescriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{0} + return file_mq_schema_proto_rawDescGZIP(), []int{0} } type ScalarType int32 @@ -134,11 +141,11 @@ func (x ScalarType) String() string { } func (ScalarType) Descriptor() protoreflect.EnumDescriptor { - return file_weed_pb_mq_schema_proto_enumTypes[1].Descriptor() + return file_mq_schema_proto_enumTypes[1].Descriptor() } func (ScalarType) Type() protoreflect.EnumType { - return &file_weed_pb_mq_schema_proto_enumTypes[1] + return &file_mq_schema_proto_enumTypes[1] } func (x ScalarType) Number() protoreflect.EnumNumber { @@ -147,7 +154,7 @@ func (x ScalarType) Number() protoreflect.EnumNumber { // Deprecated: Use ScalarType.Descriptor instead. func (ScalarType) EnumDescriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{1} + return file_mq_schema_proto_rawDescGZIP(), []int{1} } type Topic struct { @@ -160,7 +167,7 @@ type Topic struct { func (x *Topic) Reset() { *x = Topic{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[0] + mi := &file_mq_schema_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -172,7 +179,7 @@ func (x *Topic) String() string { func (*Topic) ProtoMessage() {} func (x *Topic) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[0] + mi := &file_mq_schema_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -185,7 +192,7 @@ func (x *Topic) ProtoReflect() protoreflect.Message { // Deprecated: Use Topic.ProtoReflect.Descriptor instead. func (*Topic) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{0} + return file_mq_schema_proto_rawDescGZIP(), []int{0} } func (x *Topic) GetNamespace() string { @@ -214,7 +221,7 @@ type Partition struct { func (x *Partition) Reset() { *x = Partition{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[1] + mi := &file_mq_schema_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -226,7 +233,7 @@ func (x *Partition) String() string { func (*Partition) ProtoMessage() {} func (x *Partition) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[1] + mi := &file_mq_schema_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -239,7 +246,7 @@ func (x *Partition) ProtoReflect() protoreflect.Message { // Deprecated: Use Partition.ProtoReflect.Descriptor instead. func (*Partition) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{1} + return file_mq_schema_proto_rawDescGZIP(), []int{1} } func (x *Partition) GetRingSize() int32 { @@ -280,7 +287,7 @@ type Offset struct { func (x *Offset) Reset() { *x = Offset{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[2] + mi := &file_mq_schema_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -292,7 +299,7 @@ func (x *Offset) String() string { func (*Offset) ProtoMessage() {} func (x *Offset) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[2] + mi := &file_mq_schema_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -305,7 +312,7 @@ func (x *Offset) ProtoReflect() protoreflect.Message { // Deprecated: Use Offset.ProtoReflect.Descriptor instead. func (*Offset) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{2} + return file_mq_schema_proto_rawDescGZIP(), []int{2} } func (x *Offset) GetTopic() *Topic { @@ -326,13 +333,14 @@ type PartitionOffset struct { state protoimpl.MessageState `protogen:"open.v1"` Partition *Partition `protobuf:"bytes,1,opt,name=partition,proto3" json:"partition,omitempty"` StartTsNs int64 `protobuf:"varint,2,opt,name=start_ts_ns,json=startTsNs,proto3" json:"start_ts_ns,omitempty"` + StartOffset int64 `protobuf:"varint,3,opt,name=start_offset,json=startOffset,proto3" json:"start_offset,omitempty"` // For offset-based positioning unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *PartitionOffset) Reset() { *x = PartitionOffset{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[3] + mi := &file_mq_schema_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -344,7 +352,7 @@ func (x *PartitionOffset) String() string { func (*PartitionOffset) ProtoMessage() {} func (x *PartitionOffset) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[3] + mi := &file_mq_schema_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -357,7 +365,7 @@ func (x *PartitionOffset) ProtoReflect() protoreflect.Message { // Deprecated: Use PartitionOffset.ProtoReflect.Descriptor instead. func (*PartitionOffset) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{3} + return file_mq_schema_proto_rawDescGZIP(), []int{3} } func (x *PartitionOffset) GetPartition() *Partition { @@ -374,6 +382,13 @@ func (x *PartitionOffset) GetStartTsNs() int64 { return 0 } +func (x *PartitionOffset) GetStartOffset() int64 { + if x != nil { + return x.StartOffset + } + return 0 +} + type RecordType struct { state protoimpl.MessageState `protogen:"open.v1"` Fields []*Field `protobuf:"bytes,1,rep,name=fields,proto3" json:"fields,omitempty"` @@ -383,7 +398,7 @@ type RecordType struct { func (x *RecordType) Reset() { *x = RecordType{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[4] + mi := &file_mq_schema_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -395,7 +410,7 @@ func (x *RecordType) String() string { func (*RecordType) ProtoMessage() {} func (x *RecordType) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[4] + mi := &file_mq_schema_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -408,7 +423,7 @@ func (x *RecordType) ProtoReflect() protoreflect.Message { // Deprecated: Use RecordType.ProtoReflect.Descriptor instead. func (*RecordType) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{4} + return file_mq_schema_proto_rawDescGZIP(), []int{4} } func (x *RecordType) GetFields() []*Field { @@ -431,7 +446,7 @@ type Field struct { func (x *Field) Reset() { *x = Field{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[5] + mi := &file_mq_schema_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -443,7 +458,7 @@ func (x *Field) String() string { func (*Field) ProtoMessage() {} func (x *Field) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[5] + mi := &file_mq_schema_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -456,7 +471,7 @@ func (x *Field) ProtoReflect() protoreflect.Message { // Deprecated: Use Field.ProtoReflect.Descriptor instead. func (*Field) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{5} + return file_mq_schema_proto_rawDescGZIP(), []int{5} } func (x *Field) GetName() string { @@ -508,7 +523,7 @@ type Type struct { func (x *Type) Reset() { *x = Type{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[6] + mi := &file_mq_schema_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -520,7 +535,7 @@ func (x *Type) String() string { func (*Type) ProtoMessage() {} func (x *Type) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[6] + mi := &file_mq_schema_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -533,7 +548,7 @@ func (x *Type) ProtoReflect() protoreflect.Message { // Deprecated: Use Type.ProtoReflect.Descriptor instead. func (*Type) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{6} + return file_mq_schema_proto_rawDescGZIP(), []int{6} } func (x *Type) GetKind() isType_Kind { @@ -601,7 +616,7 @@ type ListType struct { func (x *ListType) Reset() { *x = ListType{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[7] + mi := &file_mq_schema_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -613,7 +628,7 @@ func (x *ListType) String() string { func (*ListType) ProtoMessage() {} func (x *ListType) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[7] + mi := &file_mq_schema_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -626,7 +641,7 @@ func (x *ListType) ProtoReflect() protoreflect.Message { // Deprecated: Use ListType.ProtoReflect.Descriptor instead. func (*ListType) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{7} + return file_mq_schema_proto_rawDescGZIP(), []int{7} } func (x *ListType) GetElementType() *Type { @@ -648,7 +663,7 @@ type RecordValue struct { func (x *RecordValue) Reset() { *x = RecordValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[8] + mi := &file_mq_schema_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -660,7 +675,7 @@ func (x *RecordValue) String() string { func (*RecordValue) ProtoMessage() {} func (x *RecordValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[8] + mi := &file_mq_schema_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -673,7 +688,7 @@ func (x *RecordValue) ProtoReflect() protoreflect.Message { // Deprecated: Use RecordValue.ProtoReflect.Descriptor instead. func (*RecordValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{8} + return file_mq_schema_proto_rawDescGZIP(), []int{8} } func (x *RecordValue) GetFields() map[string]*Value { @@ -707,7 +722,7 @@ type Value struct { func (x *Value) Reset() { *x = Value{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[9] + mi := &file_mq_schema_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -719,7 +734,7 @@ func (x *Value) String() string { func (*Value) ProtoMessage() {} func (x *Value) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[9] + mi := &file_mq_schema_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -732,7 +747,7 @@ func (x *Value) ProtoReflect() protoreflect.Message { // Deprecated: Use Value.ProtoReflect.Descriptor instead. func (*Value) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{9} + return file_mq_schema_proto_rawDescGZIP(), []int{9} } func (x *Value) GetKind() isValue_Kind { @@ -954,7 +969,7 @@ type TimestampValue struct { func (x *TimestampValue) Reset() { *x = TimestampValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[10] + mi := &file_mq_schema_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -966,7 +981,7 @@ func (x *TimestampValue) String() string { func (*TimestampValue) ProtoMessage() {} func (x *TimestampValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[10] + mi := &file_mq_schema_proto_msgTypes[10] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -979,7 +994,7 @@ func (x *TimestampValue) ProtoReflect() protoreflect.Message { // Deprecated: Use TimestampValue.ProtoReflect.Descriptor instead. func (*TimestampValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{10} + return file_mq_schema_proto_rawDescGZIP(), []int{10} } func (x *TimestampValue) GetTimestampMicros() int64 { @@ -1005,7 +1020,7 @@ type DateValue struct { func (x *DateValue) Reset() { *x = DateValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[11] + mi := &file_mq_schema_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1017,7 +1032,7 @@ func (x *DateValue) String() string { func (*DateValue) ProtoMessage() {} func (x *DateValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[11] + mi := &file_mq_schema_proto_msgTypes[11] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1030,7 +1045,7 @@ func (x *DateValue) ProtoReflect() protoreflect.Message { // Deprecated: Use DateValue.ProtoReflect.Descriptor instead. func (*DateValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{11} + return file_mq_schema_proto_rawDescGZIP(), []int{11} } func (x *DateValue) GetDaysSinceEpoch() int32 { @@ -1051,7 +1066,7 @@ type DecimalValue struct { func (x *DecimalValue) Reset() { *x = DecimalValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[12] + mi := &file_mq_schema_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1063,7 +1078,7 @@ func (x *DecimalValue) String() string { func (*DecimalValue) ProtoMessage() {} func (x *DecimalValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[12] + mi := &file_mq_schema_proto_msgTypes[12] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1076,7 +1091,7 @@ func (x *DecimalValue) ProtoReflect() protoreflect.Message { // Deprecated: Use DecimalValue.ProtoReflect.Descriptor instead. func (*DecimalValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{12} + return file_mq_schema_proto_rawDescGZIP(), []int{12} } func (x *DecimalValue) GetValue() []byte { @@ -1109,7 +1124,7 @@ type TimeValue struct { func (x *TimeValue) Reset() { *x = TimeValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[13] + mi := &file_mq_schema_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1121,7 +1136,7 @@ func (x *TimeValue) String() string { func (*TimeValue) ProtoMessage() {} func (x *TimeValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[13] + mi := &file_mq_schema_proto_msgTypes[13] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1134,7 +1149,7 @@ func (x *TimeValue) ProtoReflect() protoreflect.Message { // Deprecated: Use TimeValue.ProtoReflect.Descriptor instead. func (*TimeValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{13} + return file_mq_schema_proto_rawDescGZIP(), []int{13} } func (x *TimeValue) GetTimeMicros() int64 { @@ -1153,7 +1168,7 @@ type ListValue struct { func (x *ListValue) Reset() { *x = ListValue{} - mi := &file_weed_pb_mq_schema_proto_msgTypes[14] + mi := &file_mq_schema_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1165,7 +1180,7 @@ func (x *ListValue) String() string { func (*ListValue) ProtoMessage() {} func (x *ListValue) ProtoReflect() protoreflect.Message { - mi := &file_weed_pb_mq_schema_proto_msgTypes[14] + mi := &file_mq_schema_proto_msgTypes[14] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1178,7 +1193,7 @@ func (x *ListValue) ProtoReflect() protoreflect.Message { // Deprecated: Use ListValue.ProtoReflect.Descriptor instead. func (*ListValue) Descriptor() ([]byte, []int) { - return file_weed_pb_mq_schema_proto_rawDescGZIP(), []int{14} + return file_mq_schema_proto_rawDescGZIP(), []int{14} } func (x *ListValue) GetValues() []*Value { @@ -1188,11 +1203,11 @@ func (x *ListValue) GetValues() []*Value { return nil } -var File_weed_pb_mq_schema_proto protoreflect.FileDescriptor +var File_mq_schema_proto protoreflect.FileDescriptor -const file_weed_pb_mq_schema_proto_rawDesc = "" + +const file_mq_schema_proto_rawDesc = "" + "\n" + - "\x17weed/pb/mq_schema.proto\x12\tschema_pb\"9\n" + + "\x0fmq_schema.proto\x12\tschema_pb\"9\n" + "\x05Topic\x12\x1c\n" + "\tnamespace\x18\x01 \x01(\tR\tnamespace\x12\x12\n" + "\x04name\x18\x02 \x01(\tR\x04name\"\x8a\x01\n" + @@ -1206,10 +1221,11 @@ const file_weed_pb_mq_schema_proto_rawDesc = "" + "unixTimeNs\"y\n" + "\x06Offset\x12&\n" + "\x05topic\x18\x01 \x01(\v2\x10.schema_pb.TopicR\x05topic\x12G\n" + - "\x11partition_offsets\x18\x02 \x03(\v2\x1a.schema_pb.PartitionOffsetR\x10partitionOffsets\"e\n" + + "\x11partition_offsets\x18\x02 \x03(\v2\x1a.schema_pb.PartitionOffsetR\x10partitionOffsets\"\x88\x01\n" + "\x0fPartitionOffset\x122\n" + "\tpartition\x18\x01 \x01(\v2\x14.schema_pb.PartitionR\tpartition\x12\x1e\n" + - "\vstart_ts_ns\x18\x02 \x01(\x03R\tstartTsNs\"6\n" + + "\vstart_ts_ns\x18\x02 \x01(\x03R\tstartTsNs\x12!\n" + + "\fstart_offset\x18\x03 \x01(\x03R\vstartOffset\"6\n" + "\n" + "RecordType\x12(\n" + "\x06fields\x18\x01 \x03(\v2\x10.schema_pb.FieldR\x06fields\"\xa3\x01\n" + @@ -1273,7 +1289,7 @@ const file_weed_pb_mq_schema_proto_rawDesc = "" + "\vtime_micros\x18\x01 \x01(\x03R\n" + "timeMicros\"5\n" + "\tListValue\x12(\n" + - "\x06values\x18\x01 \x03(\v2\x10.schema_pb.ValueR\x06values*w\n" + + "\x06values\x18\x01 \x03(\v2\x10.schema_pb.ValueR\x06values*\x9e\x01\n" + "\n" + "OffsetType\x12\x16\n" + "\x12RESUME_OR_EARLIEST\x10\x00\x12\x15\n" + @@ -1281,7 +1297,9 @@ const file_weed_pb_mq_schema_proto_rawDesc = "" + "\vEXACT_TS_NS\x10\n" + "\x12\x13\n" + "\x0fRESET_TO_LATEST\x10\x0f\x12\x14\n" + - "\x10RESUME_OR_LATEST\x10\x14*\x8a\x01\n" + + "\x10RESUME_OR_LATEST\x10\x14\x12\x10\n" + + "\fEXACT_OFFSET\x10\x19\x12\x13\n" + + "\x0fRESET_TO_OFFSET\x10\x1e*\x8a\x01\n" + "\n" + "ScalarType\x12\b\n" + "\x04BOOL\x10\x00\x12\t\n" + @@ -1300,20 +1318,20 @@ const file_weed_pb_mq_schema_proto_rawDesc = "" + "\x04TIME\x10\vB2Z0github.com/seaweedfs/seaweedfs/weed/pb/schema_pbb\x06proto3" var ( - file_weed_pb_mq_schema_proto_rawDescOnce sync.Once - file_weed_pb_mq_schema_proto_rawDescData []byte + file_mq_schema_proto_rawDescOnce sync.Once + file_mq_schema_proto_rawDescData []byte ) -func file_weed_pb_mq_schema_proto_rawDescGZIP() []byte { - file_weed_pb_mq_schema_proto_rawDescOnce.Do(func() { - file_weed_pb_mq_schema_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_weed_pb_mq_schema_proto_rawDesc), len(file_weed_pb_mq_schema_proto_rawDesc))) +func file_mq_schema_proto_rawDescGZIP() []byte { + file_mq_schema_proto_rawDescOnce.Do(func() { + file_mq_schema_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_mq_schema_proto_rawDesc), len(file_mq_schema_proto_rawDesc))) }) - return file_weed_pb_mq_schema_proto_rawDescData + return file_mq_schema_proto_rawDescData } -var file_weed_pb_mq_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_weed_pb_mq_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 16) -var file_weed_pb_mq_schema_proto_goTypes = []any{ +var file_mq_schema_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_mq_schema_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_mq_schema_proto_goTypes = []any{ (OffsetType)(0), // 0: schema_pb.OffsetType (ScalarType)(0), // 1: schema_pb.ScalarType (*Topic)(nil), // 2: schema_pb.Topic @@ -1333,7 +1351,7 @@ var file_weed_pb_mq_schema_proto_goTypes = []any{ (*ListValue)(nil), // 16: schema_pb.ListValue nil, // 17: schema_pb.RecordValue.FieldsEntry } -var file_weed_pb_mq_schema_proto_depIdxs = []int32{ +var file_mq_schema_proto_depIdxs = []int32{ 2, // 0: schema_pb.Offset.topic:type_name -> schema_pb.Topic 5, // 1: schema_pb.Offset.partition_offsets:type_name -> schema_pb.PartitionOffset 3, // 2: schema_pb.PartitionOffset.partition:type_name -> schema_pb.Partition @@ -1359,17 +1377,17 @@ var file_weed_pb_mq_schema_proto_depIdxs = []int32{ 0, // [0:18] is the sub-list for field type_name } -func init() { file_weed_pb_mq_schema_proto_init() } -func file_weed_pb_mq_schema_proto_init() { - if File_weed_pb_mq_schema_proto != nil { +func init() { file_mq_schema_proto_init() } +func file_mq_schema_proto_init() { + if File_mq_schema_proto != nil { return } - file_weed_pb_mq_schema_proto_msgTypes[6].OneofWrappers = []any{ + file_mq_schema_proto_msgTypes[6].OneofWrappers = []any{ (*Type_ScalarType)(nil), (*Type_RecordType)(nil), (*Type_ListType)(nil), } - file_weed_pb_mq_schema_proto_msgTypes[9].OneofWrappers = []any{ + file_mq_schema_proto_msgTypes[9].OneofWrappers = []any{ (*Value_BoolValue)(nil), (*Value_Int32Value)(nil), (*Value_Int64Value)(nil), @@ -1388,18 +1406,18 @@ func file_weed_pb_mq_schema_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: unsafe.Slice(unsafe.StringData(file_weed_pb_mq_schema_proto_rawDesc), len(file_weed_pb_mq_schema_proto_rawDesc)), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_mq_schema_proto_rawDesc), len(file_mq_schema_proto_rawDesc)), NumEnums: 2, NumMessages: 16, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_weed_pb_mq_schema_proto_goTypes, - DependencyIndexes: file_weed_pb_mq_schema_proto_depIdxs, - EnumInfos: file_weed_pb_mq_schema_proto_enumTypes, - MessageInfos: file_weed_pb_mq_schema_proto_msgTypes, + GoTypes: file_mq_schema_proto_goTypes, + DependencyIndexes: file_mq_schema_proto_depIdxs, + EnumInfos: file_mq_schema_proto_enumTypes, + MessageInfos: file_mq_schema_proto_msgTypes, }.Build() - File_weed_pb_mq_schema_proto = out.File - file_weed_pb_mq_schema_proto_goTypes = nil - file_weed_pb_mq_schema_proto_depIdxs = nil + File_mq_schema_proto = out.File + file_mq_schema_proto_goTypes = nil + file_mq_schema_proto_depIdxs = nil } diff --git a/weed/pb/schema_pb/offset_test.go b/weed/pb/schema_pb/offset_test.go new file mode 100644 index 000000000..28324836e --- /dev/null +++ b/weed/pb/schema_pb/offset_test.go @@ -0,0 +1,93 @@ +package schema_pb + +import ( + "testing" + "google.golang.org/protobuf/proto" +) + +func TestOffsetTypeEnums(t *testing.T) { + // Test that new offset-based enum values are defined + tests := []struct { + name string + value OffsetType + expected int32 + }{ + {"EXACT_OFFSET", OffsetType_EXACT_OFFSET, 25}, + {"RESET_TO_OFFSET", OffsetType_RESET_TO_OFFSET, 30}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if int32(tt.value) != tt.expected { + t.Errorf("OffsetType_%s = %d, want %d", tt.name, int32(tt.value), tt.expected) + } + }) + } +} + +func TestPartitionOffsetSerialization(t *testing.T) { + // Test that PartitionOffset can serialize/deserialize with new offset field + original := &PartitionOffset{ + Partition: &Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: 1234567890, + }, + StartTsNs: 1234567890, + StartOffset: 42, // New field + } + + // Test proto marshaling/unmarshaling + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal PartitionOffset: %v", err) + } + + restored := &PartitionOffset{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal PartitionOffset: %v", err) + } + + // Verify all fields are preserved + if restored.StartTsNs != original.StartTsNs { + t.Errorf("StartTsNs = %d, want %d", restored.StartTsNs, original.StartTsNs) + } + if restored.StartOffset != original.StartOffset { + t.Errorf("StartOffset = %d, want %d", restored.StartOffset, original.StartOffset) + } + if restored.Partition.RingSize != original.Partition.RingSize { + t.Errorf("Partition.RingSize = %d, want %d", restored.Partition.RingSize, original.Partition.RingSize) + } +} + +func TestPartitionOffsetBackwardCompatibility(t *testing.T) { + // Test that PartitionOffset without StartOffset still works + original := &PartitionOffset{ + Partition: &Partition{ + RingSize: 1024, + RangeStart: 0, + RangeStop: 31, + UnixTimeNs: 1234567890, + }, + StartTsNs: 1234567890, + // StartOffset not set (defaults to 0) + } + + data, err := proto.Marshal(original) + if err != nil { + t.Fatalf("Failed to marshal PartitionOffset: %v", err) + } + + restored := &PartitionOffset{} + err = proto.Unmarshal(data, restored) + if err != nil { + t.Fatalf("Failed to unmarshal PartitionOffset: %v", err) + } + + // StartOffset should default to 0 + if restored.StartOffset != 0 { + t.Errorf("StartOffset = %d, want 0", restored.StartOffset) + } +} |
