aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/mq.proto
blob: 23284b767f9fa6d7095e65760ad157a253363eba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
syntax = "proto3";

package messaging_pb;

import "schema.proto";

option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb";
option java_package = "seaweedfs.mq";
option java_outer_classname = "MessageQueueProto";

//////////////////////////////////////////////////

service SeaweedMessaging {

    // control plane
    rpc FindBrokerLeader (FindBrokerLeaderRequest) returns (FindBrokerLeaderResponse) {
    }

    // control plane for balancer
    rpc PublisherToPubBalancer (stream PublisherToPubBalancerRequest) returns (stream PublisherToPubBalancerResponse) {
    }
    rpc BalanceTopics (BalanceTopicsRequest) returns (BalanceTopicsResponse) {
    }

    // control plane for topic partitions
    rpc ListTopics (ListTopicsRequest) returns (ListTopicsResponse) {
    }
    rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) {
    }
    rpc LookupTopicBrokers (LookupTopicBrokersRequest) returns (LookupTopicBrokersResponse) {
    }

    // invoked by the balancer, running on each broker
    rpc AssignTopicPartitions (AssignTopicPartitionsRequest) returns (AssignTopicPartitionsResponse) {
    }
    rpc ClosePublishers(ClosePublishersRequest) returns (ClosePublishersResponse) {
    }
    rpc CloseSubscribers(CloseSubscribersRequest) returns (CloseSubscribersResponse) {
    }

    // subscriber connects to broker balancer, which coordinates with the subscribers
    rpc SubscriberToSubCoordinator (stream SubscriberToSubCoordinatorRequest) returns (stream SubscriberToSubCoordinatorResponse) {
    }

    // data plane for each topic partition
    rpc PublishMessage (stream PublishMessageRequest) returns (stream PublishMessageResponse) {
    }
    rpc SubscribeMessage (stream SubscribeMessageRequest) returns (stream SubscribeMessageResponse) {
    }
    // The lead broker asks a follower broker to follow itself
    rpc PublishFollowMe (stream PublishFollowMeRequest) returns (stream PublishFollowMeResponse) {
    }
    rpc SubscribeFollowMe (stream SubscribeFollowMeRequest) returns (SubscribeFollowMeResponse) {
    }
}

//////////////////////////////////////////////////

message FindBrokerLeaderRequest {
    string filer_group = 1;
}

message FindBrokerLeaderResponse {
    string broker = 1;
}

message Topic {
    string namespace = 1;
    string name = 2;
}
message Partition {
    int32 ring_size = 1;
    int32 range_start = 2;
    int32 range_stop = 3;
    int64 unix_time_ns = 4;
}

message Offset {
    Topic topic = 1;
    repeated PartitionOffset partition_offsets = 2;
}

enum PartitionOffsetStartType {
    EARLIEST = 0;
    EARLIEST_IN_MEMORY = 1;
    LATEST = 2;
}

message PartitionOffset {
    Partition partition = 1;
    int64 start_ts_ns = 2;
    int64 stop_ts_ns = 3;
    PartitionOffsetStartType start_type = 4;
}

//////////////////////////////////////////////////
message BrokerStats {
    int32 cpu_usage_percent = 1;
    map<string, TopicPartitionStats> stats = 2;
}
message TopicPartitionStats {
    Topic topic = 1;
    Partition partition = 2;
    int32 publisher_count = 3;
    int32 subscriber_count = 4;
    string follower = 5;
}


message PublisherToPubBalancerRequest {
    message InitMessage {
        string broker = 1;
    }
    oneof message {
        InitMessage init = 1;
        BrokerStats stats = 2;
    }
}
message PublisherToPubBalancerResponse {
}

message BalanceTopicsRequest {
}
message BalanceTopicsResponse {
}

//////////////////////////////////////////////////
message ConfigureTopicRequest {
    Topic topic = 1;
    int32 partition_count = 2;
    schema_pb.RecordType record_type = 3;
}
message ConfigureTopicResponse {
    repeated BrokerPartitionAssignment broker_partition_assignments = 2;
    schema_pb.RecordType record_type = 3;
}
message ListTopicsRequest {
}
message ListTopicsResponse {
    repeated Topic topics = 1;
}
message LookupTopicBrokersRequest {
    Topic topic = 1;
}
message LookupTopicBrokersResponse {
    Topic topic = 1;
    repeated BrokerPartitionAssignment broker_partition_assignments = 2;
}
message BrokerPartitionAssignment {
    Partition partition = 1;
    string leader_broker = 2;
    string follower_broker = 3;
}

message AssignTopicPartitionsRequest {
    Topic topic = 1;
    repeated BrokerPartitionAssignment broker_partition_assignments = 2;
    bool is_leader = 3;
    bool is_draining = 4;
}
message AssignTopicPartitionsResponse {
}

message SubscriberToSubCoordinatorRequest {
    message InitMessage {
        string consumer_group = 1;
        string consumer_group_instance_id = 2;
        Topic topic = 3;
        // The consumer group instance will be assigned at most max_partition_count partitions.
        // If the number of partitions is less than the sum of max_partition_count,
        // the consumer group instance may be assigned partitions less than max_partition_count.
        // Default is 1.
        int32 max_partition_count = 4;
        // If consumer group instance changes, wait for rebalance_seconds before reassigning partitions
        // Exception: if adding a new consumer group instance and sum of max_partition_count equals the number of partitions,
        // the rebalance will happen immediately.
        // Default is 10 seconds.
        int32 rebalance_seconds = 5;
    }
    message AckUnAssignmentMessage {
        Partition partition = 1;
    }
    message AckAssignmentMessage {
        Partition partition = 1;
    }
    oneof message {
        InitMessage init = 1;
        AckAssignmentMessage ack_assignment = 2;
        AckUnAssignmentMessage ack_un_assignment = 3;
    }
}
message SubscriberToSubCoordinatorResponse {
    message Assignment {
        BrokerPartitionAssignment partition_assignment = 1;
    }
    message UnAssignment {
        Partition partition = 1;
    }
    oneof message {
        Assignment assignment = 1;
        UnAssignment un_assignment = 2;
    }
}

//////////////////////////////////////////////////
message ControlMessage {
    bool is_close = 1;
    string publisher_name = 2;
}
message DataMessage {
    bytes key = 1;
    bytes value = 2;
    int64 ts_ns = 3;
    ControlMessage ctrl = 4;
}
message PublishMessageRequest {
    message InitMessage {
        Topic topic = 1;
        Partition partition = 2;
        int32 ack_interval = 3;
        string follower_broker = 4;
        string publisher_name = 5; // for debugging
    }
    oneof message {
        InitMessage init = 1;
        DataMessage data = 2;
    }
}
message PublishMessageResponse {
    int64 ack_sequence = 1;
    string error = 2;
    bool should_close = 3;
}
message PublishFollowMeRequest {
    message InitMessage {
        Topic topic = 1;
        Partition partition = 2;
    }
    message FlushMessage {
        int64 ts_ns = 1;
    }
    message CloseMessage {
    }
    oneof message {
        InitMessage init = 1;
        DataMessage data = 2;
        FlushMessage flush = 3;
        CloseMessage close = 4;
    }
}
message PublishFollowMeResponse {
    int64 ack_ts_ns = 1;
}
message SubscribeMessageRequest {
    message InitMessage {
        string consumer_group = 1;
        string consumer_id = 2;
        string client_id = 3;
        Topic topic = 4;
        PartitionOffset partition_offset = 5;
        string filter = 6;
        string follower_broker = 7;
        int32 concurrency = 8;
    }
    message AckMessage {
        int64 sequence = 1;
        bytes key = 2;
    }
    oneof message {
        InitMessage init = 1;
        AckMessage ack = 2;
    }
}
message SubscribeMessageResponse {
    message SubscribeCtrlMessage {
        string error = 1;
        bool is_end_of_stream = 2;
        bool is_end_of_topic = 3;
    }
    oneof message {
        SubscribeCtrlMessage ctrl = 1;
        DataMessage data = 2;
    }
}
message SubscribeFollowMeRequest {
    message InitMessage {
        Topic topic = 1;
        Partition partition = 2;
        string consumer_group = 3;
    }
    message AckMessage {
        int64 ts_ns = 1;
    }
    message CloseMessage {
    }
    oneof message {
        InitMessage init = 1;
        AckMessage ack = 2;
        CloseMessage close = 3;
    }
}
message SubscribeFollowMeResponse {
    int64 ack_ts_ns = 1;
}
message ClosePublishersRequest {
    Topic topic = 1;
    int64 unix_time_ns = 2;
}
message ClosePublishersResponse {
}
message CloseSubscribersRequest {
    Topic topic = 1;
    int64 unix_time_ns = 2;
}
message CloseSubscribersResponse {
}