aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/errors.go
blob: 93bc85c80d62ff47505ce51adb0546baac4a8667 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
package protocol

import (
	"context"
	"encoding/binary"
	"errors"
	"net"
	"time"
)

// Kafka protocol error codes.
//
// Values follow the "Error Codes" table of the Apache Kafka protocol guide.
// They are written into responses as-is (see BuildErrorResponse), so a wrong
// number is reported to clients as a different error.
const (
	// Success
	ErrorCodeNone int16 = 0

	// General server errors
	ErrorCodeUnknownServerError      int16 = -1
	ErrorCodeOffsetOutOfRange        int16 = 1
	ErrorCodeCorruptMessage          int16 = 2
	ErrorCodeUnknownTopicOrPartition int16 = 3
	ErrorCodeInvalidFetchSize        int16 = 4
	ErrorCodeLeaderNotAvailable      int16 = 5
	ErrorCodeNotLeaderOrFollower     int16 = 6 // Formerly NOT_LEADER_FOR_PARTITION
	ErrorCodeRequestTimedOut         int16 = 7
	ErrorCodeBrokerNotAvailable      int16 = 8
	ErrorCodeReplicaNotAvailable     int16 = 9
	ErrorCodeMessageTooLarge         int16 = 10
	ErrorCodeStaleControllerEpoch    int16 = 11
	ErrorCodeOffsetMetadataTooLarge  int16 = 12
	ErrorCodeNetworkException        int16 = 13
	ErrorCodeOffsetLoadInProgress    int16 = 14 // COORDINATOR_LOAD_IN_PROGRESS
	// NOTE(review): on the wire 15 is COORDINATOR_NOT_AVAILABLE; the legacy
	// GROUP_LOAD_IN_PROGRESS meaning is code 14 (ErrorCodeOffsetLoadInProgress).
	// Not renumbered to avoid duplicate cases in existing switch statements.
	ErrorCodeGroupLoadInProgress    int16 = 15
	ErrorCodeNotCoordinatorForGroup int16 = 16 // NOT_COORDINATOR
	// NOTE(review): on the wire 17 is INVALID_TOPIC_EXCEPTION; Kafka reports a
	// wrong transaction coordinator with NOT_COORDINATOR (16). Not renumbered
	// to avoid duplicate cases in existing switch statements.
	ErrorCodeNotCoordinatorForTransaction int16 = 17

	// Consumer group coordination errors
	ErrorCodeIllegalGeneration          int16 = 22
	ErrorCodeInconsistentGroupProtocol  int16 = 23
	ErrorCodeInvalidGroupID             int16 = 24
	ErrorCodeUnknownMemberID            int16 = 25
	ErrorCodeInvalidSessionTimeout      int16 = 26
	ErrorCodeRebalanceInProgress        int16 = 27
	ErrorCodeInvalidCommitOffsetSize    int16 = 28
	ErrorCodeTopicAuthorizationFailed   int16 = 29
	ErrorCodeGroupAuthorizationFailed   int16 = 30
	ErrorCodeClusterAuthorizationFailed int16 = 31
	ErrorCodeInvalidTimestamp           int16 = 32
	ErrorCodeUnsupportedSASLMechanism   int16 = 33
	ErrorCodeIllegalSASLState           int16 = 34
	ErrorCodeUnsupportedVersion         int16 = 35

	// Topic management, producer and transaction errors.
	// 42 (INVALID_REQUEST) and 43 (UNSUPPORTED_FOR_MESSAGE_FORMAT) are not
	// declared here; INVALID_RECORD is code 87 and lives further below.
	ErrorCodeTopicAlreadyExists        int16 = 36
	ErrorCodeInvalidPartitions         int16 = 37
	ErrorCodeInvalidReplicationFactor  int16 = 38
	ErrorCodeInvalidReplicaAssignment  int16 = 39
	ErrorCodeInvalidConfig             int16 = 40
	ErrorCodeNotController             int16 = 41
	ErrorCodePolicyViolation           int16 = 44
	ErrorCodeOutOfOrderSequenceNumber  int16 = 45
	ErrorCodeDuplicateSequenceNumber   int16 = 46
	ErrorCodeInvalidProducerEpoch      int16 = 47
	ErrorCodeInvalidTxnState           int16 = 48
	ErrorCodeInvalidProducerIDMapping  int16 = 49
	ErrorCodeInvalidTransactionTimeout int16 = 50
	ErrorCodeConcurrentTransactions    int16 = 51

	// Connection and timeout errors.
	//
	// NOTE(review): these are gateway-specific codes, but 60-63 are also real
	// Kafka codes (REASSIGNMENT_IN_PROGRESS, DELEGATION_TOKEN_AUTH_DISABLED,
	// DELEGATION_TOKEN_NOT_FOUND, DELEGATION_TOKEN_OWNER_MISMATCH). A Kafka
	// client receiving one would misinterpret it; avoid sending them on the wire.
	ErrorCodeConnectionRefused int16 = 60 // Custom for connection issues
	ErrorCodeConnectionTimeout int16 = 61 // Custom for connection timeouts
	ErrorCodeReadTimeout       int16 = 62 // Custom for read timeouts
	ErrorCodeWriteTimeout      int16 = 63 // Custom for write timeouts

	// Newer consumer group and record validation errors
	ErrorCodeMemberIDRequired     int16 = 79
	ErrorCodeGroupMaxSizeReached  int16 = 81
	ErrorCodeFencedInstanceID     int16 = 82
	ErrorCodeInvalidRecord        int16 = 87
	ErrorCodeUnstableOffsetCommit int16 = 88
)

// ErrorInfo contains metadata about a Kafka error code: the numeric code,
// its protocol name, a human-readable description, and whether
// IsRetriableError reports it as retriable.
type ErrorInfo struct {
	Code        int16  // numeric Kafka error code
	Name        string // upper-case protocol name, e.g. "UNKNOWN_TOPIC_OR_PARTITION"
	Description string // short human-readable explanation
	Retriable   bool   // value returned by IsRetriableError for this code
}

// KafkaErrors is the metadata table consulted by GetErrorInfo, keyed by
// error code. Declared codes that have no entry here (for example
// ErrorCodeReplicaNotAvailable) are reported by GetErrorInfo as "UNKNOWN"
// and not retriable.
var KafkaErrors = map[int16]ErrorInfo{
	// Success and general broker errors.
	ErrorCodeUnknownServerError: {
		Code:        ErrorCodeUnknownServerError,
		Name:        "UNKNOWN_SERVER_ERROR",
		Description: "Unknown server error",
		Retriable:   true,
	},
	ErrorCodeNone: {
		Code:        ErrorCodeNone,
		Name:        "NONE",
		Description: "No error",
		Retriable:   false,
	},
	ErrorCodeOffsetOutOfRange: {
		Code:        ErrorCodeOffsetOutOfRange,
		Name:        "OFFSET_OUT_OF_RANGE",
		Description: "Offset out of range",
		Retriable:   false,
	},
	ErrorCodeUnknownTopicOrPartition: {
		Code:        ErrorCodeUnknownTopicOrPartition,
		Name:        "UNKNOWN_TOPIC_OR_PARTITION",
		Description: "Topic or partition does not exist",
		Retriable:   false,
	},
	ErrorCodeInvalidFetchSize: {
		Code:        ErrorCodeInvalidFetchSize,
		Name:        "INVALID_FETCH_SIZE",
		Description: "Invalid fetch size",
		Retriable:   false,
	},
	ErrorCodeLeaderNotAvailable: {
		Code:        ErrorCodeLeaderNotAvailable,
		Name:        "LEADER_NOT_AVAILABLE",
		Description: "Leader not available",
		Retriable:   true,
	},
	ErrorCodeNotLeaderOrFollower: {
		Code:        ErrorCodeNotLeaderOrFollower,
		Name:        "NOT_LEADER_OR_FOLLOWER",
		Description: "Not leader or follower",
		Retriable:   true,
	},
	ErrorCodeRequestTimedOut: {
		Code:        ErrorCodeRequestTimedOut,
		Name:        "REQUEST_TIMED_OUT",
		Description: "Request timed out",
		Retriable:   true,
	},
	ErrorCodeBrokerNotAvailable: {
		Code:        ErrorCodeBrokerNotAvailable,
		Name:        "BROKER_NOT_AVAILABLE",
		Description: "Broker not available",
		Retriable:   true,
	},
	ErrorCodeMessageTooLarge: {
		Code:        ErrorCodeMessageTooLarge,
		Name:        "MESSAGE_TOO_LARGE",
		Description: "Message size exceeds limit",
		Retriable:   false,
	},
	ErrorCodeOffsetMetadataTooLarge: {
		Code:        ErrorCodeOffsetMetadataTooLarge,
		Name:        "OFFSET_METADATA_TOO_LARGE",
		Description: "Offset metadata too large",
		Retriable:   false,
	},
	ErrorCodeNetworkException: {
		Code:        ErrorCodeNetworkException,
		Name:        "NETWORK_EXCEPTION",
		Description: "Network error",
		Retriable:   true,
	},
	ErrorCodeOffsetLoadInProgress: {
		Code:        ErrorCodeOffsetLoadInProgress,
		Name:        "OFFSET_LOAD_IN_PROGRESS",
		Description: "Offset load in progress",
		Retriable:   true,
	},
	ErrorCodeNotCoordinatorForGroup: {
		Code:        ErrorCodeNotCoordinatorForGroup,
		Name:        "NOT_COORDINATOR_FOR_GROUP",
		Description: "Not coordinator for group",
		Retriable:   true,
	},

	// Consumer group membership and coordination.
	ErrorCodeIllegalGeneration: {
		Code:        ErrorCodeIllegalGeneration,
		Name:        "ILLEGAL_GENERATION",
		Description: "Illegal generation",
		Retriable:   false,
	},
	ErrorCodeInconsistentGroupProtocol: {
		Code:        ErrorCodeInconsistentGroupProtocol,
		Name:        "INCONSISTENT_GROUP_PROTOCOL",
		Description: "Inconsistent group protocol",
		Retriable:   false,
	},
	ErrorCodeInvalidGroupID: {
		Code:        ErrorCodeInvalidGroupID,
		Name:        "INVALID_GROUP_ID",
		Description: "Invalid group ID",
		Retriable:   false,
	},
	ErrorCodeUnknownMemberID: {
		Code:        ErrorCodeUnknownMemberID,
		Name:        "UNKNOWN_MEMBER_ID",
		Description: "Unknown member ID",
		Retriable:   false,
	},
	ErrorCodeInvalidSessionTimeout: {
		Code:        ErrorCodeInvalidSessionTimeout,
		Name:        "INVALID_SESSION_TIMEOUT",
		Description: "Invalid session timeout",
		Retriable:   false,
	},
	ErrorCodeRebalanceInProgress: {
		Code:        ErrorCodeRebalanceInProgress,
		Name:        "REBALANCE_IN_PROGRESS",
		Description: "Group rebalance in progress",
		Retriable:   true,
	},
	ErrorCodeInvalidCommitOffsetSize: {
		Code:        ErrorCodeInvalidCommitOffsetSize,
		Name:        "INVALID_COMMIT_OFFSET_SIZE",
		Description: "Invalid commit offset size",
		Retriable:   false,
	},
	ErrorCodeMemberIDRequired: {
		Code:        ErrorCodeMemberIDRequired,
		Name:        "MEMBER_ID_REQUIRED",
		Description: "Member ID required",
		Retriable:   false,
	},
	ErrorCodeFencedInstanceID: {
		Code:        ErrorCodeFencedInstanceID,
		Name:        "FENCED_INSTANCE_ID",
		Description: "Instance ID fenced",
		Retriable:   false,
	},
	ErrorCodeGroupMaxSizeReached: {
		Code:        ErrorCodeGroupMaxSizeReached,
		Name:        "GROUP_MAX_SIZE_REACHED",
		Description: "Group max size reached",
		Retriable:   false,
	},
	ErrorCodeUnstableOffsetCommit: {
		Code:        ErrorCodeUnstableOffsetCommit,
		Name:        "UNSTABLE_OFFSET_COMMIT",
		Description: "Offset commit during rebalance",
		Retriable:   true,
	},

	// Authorization and protocol version.
	ErrorCodeTopicAuthorizationFailed: {
		Code:        ErrorCodeTopicAuthorizationFailed,
		Name:        "TOPIC_AUTHORIZATION_FAILED",
		Description: "Topic authorization failed",
		Retriable:   false,
	},
	ErrorCodeGroupAuthorizationFailed: {
		Code:        ErrorCodeGroupAuthorizationFailed,
		Name:        "GROUP_AUTHORIZATION_FAILED",
		Description: "Group authorization failed",
		Retriable:   false,
	},
	ErrorCodeUnsupportedVersion: {
		Code:        ErrorCodeUnsupportedVersion,
		Name:        "UNSUPPORTED_VERSION",
		Description: "Unsupported version",
		Retriable:   false,
	},

	// Topic and record validation.
	ErrorCodeTopicAlreadyExists: {
		Code:        ErrorCodeTopicAlreadyExists,
		Name:        "TOPIC_ALREADY_EXISTS",
		Description: "Topic already exists",
		Retriable:   false,
	},
	ErrorCodeInvalidPartitions: {
		Code:        ErrorCodeInvalidPartitions,
		Name:        "INVALID_PARTITIONS",
		Description: "Invalid number of partitions",
		Retriable:   false,
	},
	ErrorCodeInvalidReplicationFactor: {
		Code:        ErrorCodeInvalidReplicationFactor,
		Name:        "INVALID_REPLICATION_FACTOR",
		Description: "Invalid replication factor",
		Retriable:   false,
	},
	ErrorCodeInvalidRecord: {
		Code:        ErrorCodeInvalidRecord,
		Name:        "INVALID_RECORD",
		Description: "Invalid record",
		Retriable:   false,
	},

	// Gateway-specific connection and timeout codes.
	ErrorCodeConnectionRefused: {
		Code:        ErrorCodeConnectionRefused,
		Name:        "CONNECTION_REFUSED",
		Description: "Connection refused",
		Retriable:   true,
	},
	ErrorCodeConnectionTimeout: {
		Code:        ErrorCodeConnectionTimeout,
		Name:        "CONNECTION_TIMEOUT",
		Description: "Connection timeout",
		Retriable:   true,
	},
	ErrorCodeReadTimeout: {
		Code:        ErrorCodeReadTimeout,
		Name:        "READ_TIMEOUT",
		Description: "Read operation timeout",
		Retriable:   true,
	},
	ErrorCodeWriteTimeout: {
		Code:        ErrorCodeWriteTimeout,
		Name:        "WRITE_TIMEOUT",
		Description: "Write operation timeout",
		Retriable:   true,
	},
}

// GetErrorInfo looks up the metadata registered for code in KafkaErrors.
// A code missing from the table yields a placeholder ErrorInfo that carries
// the requested code, the name "UNKNOWN" and Retriable set to false.
func GetErrorInfo(code int16) ErrorInfo {
	info, known := KafkaErrors[code]
	if !known {
		info = ErrorInfo{
			Code:        code,
			Name:        "UNKNOWN",
			Description: "Unknown error code",
			Retriable:   false,
		}
	}
	return info
}

// IsRetriableError reports whether code is flagged as retriable in the
// KafkaErrors table. Codes without an entry are reported as not retriable.
func IsRetriableError(code int16) bool {
	info := GetErrorInfo(code)
	return info.Retriable
}

// BuildErrorResponse returns a response body consisting of errorCode encoded
// as a big-endian 16-bit value.
//
// correlationID is intentionally not encoded. It is handled by
// writeResponseWithCorrelationID, so the body must not contain it.
func BuildErrorResponse(correlationID uint32, errorCode int16) []byte {
	var code [2]byte
	binary.BigEndian.PutUint16(code[:], uint16(errorCode))

	// Leave spare capacity so callers can append further fields (such as an
	// error message) without an immediate reallocation.
	return append(make([]byte, 0, 8), code[:]...)
}

// BuildErrorResponseWithMessage builds a Kafka error response: the 2-byte
// error code produced by BuildErrorResponse, followed by message encoded as a
// nullable STRING (big-endian 16-bit length, then the raw bytes).
//
// An empty message is encoded as a null string (length 0xFFFF, i.e. -1).
// Kafka STRING lengths are int16, so messages longer than 32767 bytes are
// truncated, at a UTF-8 rune boundary, rather than emitting a length prefix
// that does not match the payload.
func BuildErrorResponseWithMessage(correlationID uint32, errorCode int16, message string) []byte {
	response := BuildErrorResponse(correlationID, errorCode)

	if message == "" {
		return append(response, 0xFF, 0xFF) // Null string
	}

	// A length above 0x7FFF would be decoded as negative by Kafka clients, and
	// above 0xFFFF the uint16 conversion below would silently wrap.
	const maxStringLen = 0x7FFF
	if len(message) > maxStringLen {
		cut := maxStringLen
		// Step back over UTF-8 continuation bytes (10xxxxxx) so the cut never
		// splits a multi-byte rune.
		for cut > 0 && message[cut]&0xC0 == 0x80 {
			cut--
		}
		message = message[:cut]
	}

	var lenBuf [2]byte
	binary.BigEndian.PutUint16(lenBuf[:], uint16(len(message)))
	response = append(response, lenBuf[:]...)
	return append(response, message...)
}

// ClassifyNetworkError maps err onto a Kafka error code:
//
//   - nil: ErrorCodeNone
//   - a net.Error anywhere in the wrap chain whose Timeout() is true:
//     ErrorCodeRequestTimedOut
//   - any other net.Error in the wrap chain: ErrorCodeNetworkException
//   - an error whose message is exactly "connection refused" or
//     "connection timeout": ErrorCodeConnectionRefused or
//     ErrorCodeConnectionTimeout (gateway-specific codes)
//   - anything else: ErrorCodeUnknownServerError
//
// errors.As is used instead of a type assertion so that errors wrapped with
// fmt.Errorf("...: %w", err) are still classified as network errors.
func ClassifyNetworkError(err error) int16 {
	if err == nil {
		return ErrorCodeNone
	}

	var netErr net.Error
	if errors.As(err, &netErr) {
		if netErr.Timeout() {
			return ErrorCodeRequestTimedOut
		}
		return ErrorCodeNetworkException
	}

	// Exact-message fallbacks for plain errors that carry no type information.
	switch err.Error() {
	case "connection refused":
		return ErrorCodeConnectionRefused
	case "connection timeout":
		return ErrorCodeConnectionTimeout
	default:
		return ErrorCodeUnknownServerError
	}
}

// TimeoutConfig bundles the time limits used when establishing connections,
// performing reads and writes, and serving complete requests.
type TimeoutConfig struct {
	ConnectionTimeout time.Duration // limit for establishing a connection
	ReadTimeout       time.Duration // limit for read operations
	WriteTimeout      time.Duration // limit for write operations
	RequestTimeout    time.Duration // limit for an entire request
}

// DefaultTimeoutConfig returns the stock limits: 30s to establish a
// connection, 10s for reads and for writes, and 30s for a whole request.
func DefaultTimeoutConfig() TimeoutConfig {
	const (
		connectLimit = 30 * time.Second
		ioLimit      = 10 * time.Second
		requestLimit = 30 * time.Second
	)

	var cfg TimeoutConfig
	cfg.ConnectionTimeout = connectLimit
	cfg.ReadTimeout = ioLimit
	cfg.WriteTimeout = ioLimit
	cfg.RequestTimeout = requestLimit
	return cfg
}

// HandleTimeoutError maps err onto a Kafka error code, distinguishing
// timeouts by the operation that was in progress.
//
// When err is (or wraps) context.DeadlineExceeded, or wraps a net.Error whose
// Timeout() reports true, operation selects the code: "read" gives
// ErrorCodeReadTimeout, "write" gives ErrorCodeWriteTimeout, "connect" gives
// ErrorCodeConnectionTimeout, and anything else gives ErrorCodeRequestTimedOut.
// Non-timeout errors are delegated to ClassifyNetworkError; nil yields
// ErrorCodeNone.
func HandleTimeoutError(err error, operation string) int16 {
	if err == nil {
		return ErrorCodeNone
	}

	// errors.Is / errors.As also match errors wrapped with %w, which a plain
	// == comparison or type assertion would miss.
	var netErr net.Error
	timedOut := errors.Is(err, context.DeadlineExceeded) ||
		(errors.As(err, &netErr) && netErr.Timeout())
	if !timedOut {
		return ClassifyNetworkError(err)
	}

	switch operation {
	case "read":
		return ErrorCodeReadTimeout
	case "write":
		return ErrorCodeWriteTimeout
	case "connect":
		return ErrorCodeConnectionTimeout
	default:
		return ErrorCodeRequestTimedOut
	}
}