1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
|
package protocol
import (
	"context"
	"encoding/binary"
	"errors"
	"net"
	"time"
)
// Kafka protocol error codes, based on the Apache Kafka protocol specification.
//
// CORRUPT_MESSAGE is 2 and UNKNOWN_TOPIC_OR_PARTITION is 3 in the Kafka error
// table; the two constants must not share a value, otherwise a corrupt-message
// failure is reported to clients as a missing topic.
//
// Codes 60-63 are custom to this package (see the per-constant comments).
//
// NOTE(review): several other values below appear to differ from the upstream
// Kafka error table and should be confirmed before being relied on over the
// wire: upstream appears to list 15 as COORDINATOR_NOT_AVAILABLE, 17 as
// INVALID_TOPIC_EXCEPTION, 42 as INVALID_REQUEST, 44-51 for the range
// POLICY_VIOLATION..CONCURRENT_TRANSACTIONS, 81 as GROUP_MAX_SIZE_REACHED,
// 87 as INVALID_RECORD and 88 as UNSTABLE_OFFSET_COMMIT. They are left
// unchanged here because callers may depend on the current numbers.
const (
	// Success
	ErrorCodeNone int16 = 0

	// General server errors
	ErrorCodeUnknownServerError           int16 = -1
	ErrorCodeOffsetOutOfRange             int16 = 1
	ErrorCodeCorruptMessage               int16 = 2
	ErrorCodeUnknownTopicOrPartition      int16 = 3
	ErrorCodeInvalidFetchSize             int16 = 4
	ErrorCodeLeaderNotAvailable           int16 = 5
	ErrorCodeNotLeaderOrFollower          int16 = 6 // Formerly NOT_LEADER_FOR_PARTITION
	ErrorCodeRequestTimedOut              int16 = 7
	ErrorCodeBrokerNotAvailable           int16 = 8
	ErrorCodeReplicaNotAvailable          int16 = 9
	ErrorCodeMessageTooLarge              int16 = 10
	ErrorCodeStaleControllerEpoch         int16 = 11
	ErrorCodeOffsetMetadataTooLarge       int16 = 12
	ErrorCodeNetworkException             int16 = 13
	ErrorCodeOffsetLoadInProgress         int16 = 14
	ErrorCodeGroupLoadInProgress          int16 = 15
	ErrorCodeNotCoordinatorForGroup       int16 = 16
	ErrorCodeNotCoordinatorForTransaction int16 = 17

	// Consumer group coordination errors
	ErrorCodeIllegalGeneration          int16 = 22
	ErrorCodeInconsistentGroupProtocol  int16 = 23
	ErrorCodeInvalidGroupID             int16 = 24
	ErrorCodeUnknownMemberID            int16 = 25
	ErrorCodeInvalidSessionTimeout      int16 = 26
	ErrorCodeRebalanceInProgress        int16 = 27
	ErrorCodeInvalidCommitOffsetSize    int16 = 28
	ErrorCodeTopicAuthorizationFailed   int16 = 29
	ErrorCodeGroupAuthorizationFailed   int16 = 30
	ErrorCodeClusterAuthorizationFailed int16 = 31
	ErrorCodeInvalidTimestamp           int16 = 32
	ErrorCodeUnsupportedSASLMechanism   int16 = 33
	ErrorCodeIllegalSASLState           int16 = 34
	ErrorCodeUnsupportedVersion         int16 = 35

	// Topic management errors
	ErrorCodeTopicAlreadyExists        int16 = 36
	ErrorCodeInvalidPartitions         int16 = 37
	ErrorCodeInvalidReplicationFactor  int16 = 38
	ErrorCodeInvalidReplicaAssignment  int16 = 39
	ErrorCodeInvalidConfig             int16 = 40
	ErrorCodeNotController             int16 = 41
	ErrorCodeInvalidRecord             int16 = 42
	ErrorCodePolicyViolation           int16 = 43
	ErrorCodeOutOfOrderSequenceNumber  int16 = 44
	ErrorCodeDuplicateSequenceNumber   int16 = 45
	ErrorCodeInvalidProducerEpoch      int16 = 46
	ErrorCodeInvalidTxnState           int16 = 47
	ErrorCodeInvalidProducerIDMapping  int16 = 48
	ErrorCodeInvalidTransactionTimeout int16 = 49
	ErrorCodeConcurrentTransactions    int16 = 50

	// Connection and timeout errors
	ErrorCodeConnectionRefused int16 = 60 // Custom for connection issues
	ErrorCodeConnectionTimeout int16 = 61 // Custom for connection timeouts
	ErrorCodeReadTimeout       int16 = 62 // Custom for read timeouts
	ErrorCodeWriteTimeout      int16 = 63 // Custom for write timeouts

	// Consumer group specific errors
	ErrorCodeMemberIDRequired     int16 = 79
	ErrorCodeFencedInstanceID     int16 = 82
	ErrorCodeGroupMaxSizeReached  int16 = 84
	ErrorCodeUnstableOffsetCommit int16 = 95
)
// ErrorInfo describes a single Kafka error code.
type ErrorInfo struct {
	// Code is the numeric error code.
	Code int16
	// Name is the symbolic name of the error, e.g. "REQUEST_TIMED_OUT".
	Name string
	// Description is a short human-readable explanation of the error.
	Description string
	// Retriable is the flag reported by IsRetriableError for this code.
	Retriable bool
}
// KafkaErrors is the lookup table from error code to its ErrorInfo metadata.
// Codes that have no entry here are reported as "UNKNOWN" by GetErrorInfo.
var KafkaErrors = map[int16]ErrorInfo{
	// Success
	ErrorCodeNone: {
		Code:        ErrorCodeNone,
		Name:        "NONE",
		Description: "No error",
		Retriable:   false,
	},

	// General server errors
	ErrorCodeUnknownServerError: {
		Code:        ErrorCodeUnknownServerError,
		Name:        "UNKNOWN_SERVER_ERROR",
		Description: "Unknown server error",
		Retriable:   true,
	},
	ErrorCodeOffsetOutOfRange: {
		Code:        ErrorCodeOffsetOutOfRange,
		Name:        "OFFSET_OUT_OF_RANGE",
		Description: "Offset out of range",
		Retriable:   false,
	},
	ErrorCodeUnknownTopicOrPartition: {
		Code:        ErrorCodeUnknownTopicOrPartition,
		Name:        "UNKNOWN_TOPIC_OR_PARTITION",
		Description: "Topic or partition does not exist",
		Retriable:   false,
	},
	ErrorCodeInvalidFetchSize: {
		Code:        ErrorCodeInvalidFetchSize,
		Name:        "INVALID_FETCH_SIZE",
		Description: "Invalid fetch size",
		Retriable:   false,
	},
	ErrorCodeLeaderNotAvailable: {
		Code:        ErrorCodeLeaderNotAvailable,
		Name:        "LEADER_NOT_AVAILABLE",
		Description: "Leader not available",
		Retriable:   true,
	},
	ErrorCodeNotLeaderOrFollower: {
		Code:        ErrorCodeNotLeaderOrFollower,
		Name:        "NOT_LEADER_OR_FOLLOWER",
		Description: "Not leader or follower",
		Retriable:   true,
	},
	ErrorCodeRequestTimedOut: {
		Code:        ErrorCodeRequestTimedOut,
		Name:        "REQUEST_TIMED_OUT",
		Description: "Request timed out",
		Retriable:   true,
	},
	ErrorCodeBrokerNotAvailable: {
		Code:        ErrorCodeBrokerNotAvailable,
		Name:        "BROKER_NOT_AVAILABLE",
		Description: "Broker not available",
		Retriable:   true,
	},
	ErrorCodeMessageTooLarge: {
		Code:        ErrorCodeMessageTooLarge,
		Name:        "MESSAGE_TOO_LARGE",
		Description: "Message size exceeds limit",
		Retriable:   false,
	},
	ErrorCodeOffsetMetadataTooLarge: {
		Code:        ErrorCodeOffsetMetadataTooLarge,
		Name:        "OFFSET_METADATA_TOO_LARGE",
		Description: "Offset metadata too large",
		Retriable:   false,
	},
	ErrorCodeNetworkException: {
		Code:        ErrorCodeNetworkException,
		Name:        "NETWORK_EXCEPTION",
		Description: "Network error",
		Retriable:   true,
	},
	ErrorCodeOffsetLoadInProgress: {
		Code:        ErrorCodeOffsetLoadInProgress,
		Name:        "OFFSET_LOAD_IN_PROGRESS",
		Description: "Offset load in progress",
		Retriable:   true,
	},
	ErrorCodeNotCoordinatorForGroup: {
		Code:        ErrorCodeNotCoordinatorForGroup,
		Name:        "NOT_COORDINATOR_FOR_GROUP",
		Description: "Not coordinator for group",
		Retriable:   true,
	},

	// Consumer group coordination errors
	ErrorCodeIllegalGeneration: {
		Code:        ErrorCodeIllegalGeneration,
		Name:        "ILLEGAL_GENERATION",
		Description: "Illegal generation",
		Retriable:   false,
	},
	ErrorCodeInconsistentGroupProtocol: {
		Code:        ErrorCodeInconsistentGroupProtocol,
		Name:        "INCONSISTENT_GROUP_PROTOCOL",
		Description: "Inconsistent group protocol",
		Retriable:   false,
	},
	ErrorCodeInvalidGroupID: {
		Code:        ErrorCodeInvalidGroupID,
		Name:        "INVALID_GROUP_ID",
		Description: "Invalid group ID",
		Retriable:   false,
	},
	ErrorCodeUnknownMemberID: {
		Code:        ErrorCodeUnknownMemberID,
		Name:        "UNKNOWN_MEMBER_ID",
		Description: "Unknown member ID",
		Retriable:   false,
	},
	ErrorCodeInvalidSessionTimeout: {
		Code:        ErrorCodeInvalidSessionTimeout,
		Name:        "INVALID_SESSION_TIMEOUT",
		Description: "Invalid session timeout",
		Retriable:   false,
	},
	ErrorCodeRebalanceInProgress: {
		Code:        ErrorCodeRebalanceInProgress,
		Name:        "REBALANCE_IN_PROGRESS",
		Description: "Group rebalance in progress",
		Retriable:   true,
	},
	ErrorCodeInvalidCommitOffsetSize: {
		Code:        ErrorCodeInvalidCommitOffsetSize,
		Name:        "INVALID_COMMIT_OFFSET_SIZE",
		Description: "Invalid commit offset size",
		Retriable:   false,
	},
	ErrorCodeTopicAuthorizationFailed: {
		Code:        ErrorCodeTopicAuthorizationFailed,
		Name:        "TOPIC_AUTHORIZATION_FAILED",
		Description: "Topic authorization failed",
		Retriable:   false,
	},
	ErrorCodeGroupAuthorizationFailed: {
		Code:        ErrorCodeGroupAuthorizationFailed,
		Name:        "GROUP_AUTHORIZATION_FAILED",
		Description: "Group authorization failed",
		Retriable:   false,
	},
	ErrorCodeUnsupportedVersion: {
		Code:        ErrorCodeUnsupportedVersion,
		Name:        "UNSUPPORTED_VERSION",
		Description: "Unsupported version",
		Retriable:   false,
	},

	// Topic management errors
	ErrorCodeTopicAlreadyExists: {
		Code:        ErrorCodeTopicAlreadyExists,
		Name:        "TOPIC_ALREADY_EXISTS",
		Description: "Topic already exists",
		Retriable:   false,
	},
	ErrorCodeInvalidPartitions: {
		Code:        ErrorCodeInvalidPartitions,
		Name:        "INVALID_PARTITIONS",
		Description: "Invalid number of partitions",
		Retriable:   false,
	},
	ErrorCodeInvalidReplicationFactor: {
		Code:        ErrorCodeInvalidReplicationFactor,
		Name:        "INVALID_REPLICATION_FACTOR",
		Description: "Invalid replication factor",
		Retriable:   false,
	},
	ErrorCodeInvalidRecord: {
		Code:        ErrorCodeInvalidRecord,
		Name:        "INVALID_RECORD",
		Description: "Invalid record",
		Retriable:   false,
	},

	// Connection and timeout errors (custom codes)
	ErrorCodeConnectionRefused: {
		Code:        ErrorCodeConnectionRefused,
		Name:        "CONNECTION_REFUSED",
		Description: "Connection refused",
		Retriable:   true,
	},
	ErrorCodeConnectionTimeout: {
		Code:        ErrorCodeConnectionTimeout,
		Name:        "CONNECTION_TIMEOUT",
		Description: "Connection timeout",
		Retriable:   true,
	},
	ErrorCodeReadTimeout: {
		Code:        ErrorCodeReadTimeout,
		Name:        "READ_TIMEOUT",
		Description: "Read operation timeout",
		Retriable:   true,
	},
	ErrorCodeWriteTimeout: {
		Code:        ErrorCodeWriteTimeout,
		Name:        "WRITE_TIMEOUT",
		Description: "Write operation timeout",
		Retriable:   true,
	},

	// Consumer group specific errors
	ErrorCodeMemberIDRequired: {
		Code:        ErrorCodeMemberIDRequired,
		Name:        "MEMBER_ID_REQUIRED",
		Description: "Member ID required",
		Retriable:   false,
	},
	ErrorCodeFencedInstanceID: {
		Code:        ErrorCodeFencedInstanceID,
		Name:        "FENCED_INSTANCE_ID",
		Description: "Instance ID fenced",
		Retriable:   false,
	},
	ErrorCodeGroupMaxSizeReached: {
		Code:        ErrorCodeGroupMaxSizeReached,
		Name:        "GROUP_MAX_SIZE_REACHED",
		Description: "Group max size reached",
		Retriable:   false,
	},
	ErrorCodeUnstableOffsetCommit: {
		Code:        ErrorCodeUnstableOffsetCommit,
		Name:        "UNSTABLE_OFFSET_COMMIT",
		Description: "Offset commit during rebalance",
		Retriable:   true,
	},
}
// GetErrorInfo looks up the metadata registered for code in KafkaErrors.
//
// When code has no entry, a placeholder ErrorInfo is returned that carries the
// requested code, the name "UNKNOWN", and Retriable set to false.
func GetErrorInfo(code int16) ErrorInfo {
	known, found := KafkaErrors[code]
	if !found {
		return ErrorInfo{
			Code:        code,
			Name:        "UNKNOWN",
			Description: "Unknown error code",
			Retriable:   false,
		}
	}
	return known
}
// IsRetriableError reports whether code is marked retriable in its ErrorInfo.
// Codes unknown to GetErrorInfo are reported as not retriable.
func IsRetriableError(code int16) bool {
	info := GetErrorInfo(code)
	return info.Retriable
}
// BuildErrorResponse returns the body of a standard Kafka error response: the
// error code encoded as two big-endian bytes.
//
// correlationID is not written into the body. The correlation ID is handled by
// writeResponseWithCorrelationID and must NOT be included here.
func BuildErrorResponse(correlationID uint32, errorCode int16) []byte {
	// Two bytes of payload with a capacity of eight.
	body := make([]byte, 2, 8)
	binary.BigEndian.PutUint16(body, uint16(errorCode))
	return body
}
// BuildErrorResponseWithMessage returns the body produced by
// BuildErrorResponse followed by an error message encoded as a two-byte
// big-endian length and the message bytes.
//
// An empty message is encoded as a null string (length bytes 0xFF 0xFF).
//
// The null marker 0xFFFF is -1 as a signed 16-bit value, so the length prefix
// can only represent up to 32767 bytes. Longer messages are truncated to fit;
// without that, the prefix would wrap and no longer match the bytes that
// follow it. Truncation backs up to a UTF-8 rune boundary where possible.
func BuildErrorResponseWithMessage(correlationID uint32, errorCode int16, message string) []byte {
	response := BuildErrorResponse(correlationID, errorCode)

	if message == "" {
		return append(response, 0xFF, 0xFF) // Null string
	}

	// Largest length representable in a signed 16-bit prefix.
	const maxMessageLen = 1<<15 - 1
	if len(message) > maxMessageLen {
		cut := maxMessageLen
		// message[cut] is the first dropped byte; if it is a UTF-8 continuation
		// byte the cut splits a rune, so step back (a rune has at most three
		// continuation bytes) to keep the truncated text well-formed.
		for back := 0; back < 3 && message[cut]&0xC0 == 0x80; back++ {
			cut--
		}
		message = message[:cut]
	}

	var lenPrefix [2]byte
	binary.BigEndian.PutUint16(lenPrefix[:], uint16(len(message)))
	response = append(response, lenPrefix[:]...)
	response = append(response, message...)
	return response
}
// ClassifyNetworkError maps err to a Kafka error code.
//
//   - nil yields ErrorCodeNone.
//   - If err is, or wraps, a net.Error, a timeout yields
//     ErrorCodeRequestTimedOut and anything else ErrorCodeNetworkException.
//   - Otherwise the exact message "connection refused" yields
//     ErrorCodeConnectionRefused and "connection timeout" yields
//     ErrorCodeConnectionTimeout.
//   - Everything else yields ErrorCodeUnknownServerError.
func ClassifyNetworkError(err error) int16 {
	if err == nil {
		return ErrorCodeNone
	}

	// errors.As walks the wrap chain, so a net.Error that was wrapped with
	// additional context is still recognised.
	var netErr net.Error
	if errors.As(err, &netErr) {
		if netErr.Timeout() {
			return ErrorCodeRequestTimedOut
		}
		return ErrorCodeNetworkException
	}

	// Fallback for plain errors identified only by their exact message text.
	switch err.Error() {
	case "connection refused":
		return ErrorCodeConnectionRefused
	case "connection timeout":
		return ErrorCodeConnectionTimeout
	default:
		return ErrorCodeUnknownServerError
	}
}
// TimeoutConfig groups the timeouts applied to connections and operations.
type TimeoutConfig struct {
	// ConnectionTimeout bounds how long establishing a connection may take.
	ConnectionTimeout time.Duration
	// ReadTimeout bounds a single read operation.
	ReadTimeout time.Duration
	// WriteTimeout bounds a single write operation.
	WriteTimeout time.Duration
	// RequestTimeout is the overall timeout for a request.
	RequestTimeout time.Duration
}
// DefaultTimeoutConfig returns the default timeouts: 30s to connect, 10s per
// read, 10s per write, and 30s for a whole request.
func DefaultTimeoutConfig() TimeoutConfig {
	var cfg TimeoutConfig
	cfg.ConnectionTimeout = 30 * time.Second
	cfg.ReadTimeout = 10 * time.Second
	cfg.WriteTimeout = 10 * time.Second
	cfg.RequestTimeout = 30 * time.Second
	return cfg
}
// HandleTimeoutError maps err to a Kafka error code, choosing a
// timeout-specific code based on the operation that was in progress.
//
// nil yields ErrorCodeNone. If err is, or wraps, context.DeadlineExceeded or a
// net.Error whose Timeout method reports true, the result depends on
// operation:
//
//   - "read"    -> ErrorCodeReadTimeout
//   - "write"   -> ErrorCodeWriteTimeout
//   - "connect" -> ErrorCodeConnectionTimeout
//   - otherwise -> ErrorCodeRequestTimedOut
//
// Any error that is not a timeout is delegated to ClassifyNetworkError.
func HandleTimeoutError(err error, operation string) int16 {
	if err == nil {
		return ErrorCodeNone
	}

	// errors.Is / errors.As follow the wrap chain, so timeouts that were
	// wrapped with extra context are still detected.
	var netErr net.Error
	timedOut := errors.Is(err, context.DeadlineExceeded) ||
		(errors.As(err, &netErr) && netErr.Timeout())
	if !timedOut {
		return ClassifyNetworkError(err)
	}

	switch operation {
	case "read":
		return ErrorCodeReadTimeout
	case "write":
		return ErrorCodeWriteTimeout
	case "connect":
		return ErrorCodeConnectionTimeout
	default:
		return ErrorCodeRequestTimedOut
	}
}
|