aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/integration/broker_error_mapping.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/integration/broker_error_mapping.go')
-rw-r--r--weed/mq/kafka/integration/broker_error_mapping.go124
1 files changed, 124 insertions, 0 deletions
diff --git a/weed/mq/kafka/integration/broker_error_mapping.go b/weed/mq/kafka/integration/broker_error_mapping.go
new file mode 100644
index 000000000..61476eeb0
--- /dev/null
+++ b/weed/mq/kafka/integration/broker_error_mapping.go
@@ -0,0 +1,124 @@
+package integration
+
+import (
+ "strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+// Kafka Protocol Error Codes (copied from protocol package to avoid import cycle)
+const (
+ kafkaErrorCodeNone int16 = 0
+ kafkaErrorCodeUnknownServerError int16 = 1
+ kafkaErrorCodeUnknownTopicOrPartition int16 = 3
+ kafkaErrorCodeNotLeaderOrFollower int16 = 6
+ kafkaErrorCodeRequestTimedOut int16 = 7
+ kafkaErrorCodeBrokerNotAvailable int16 = 8
+ kafkaErrorCodeMessageTooLarge int16 = 10
+ kafkaErrorCodeNetworkException int16 = 13
+ kafkaErrorCodeOffsetLoadInProgress int16 = 14
+ kafkaErrorCodeTopicAlreadyExists int16 = 36
+ kafkaErrorCodeInvalidPartitions int16 = 37
+ kafkaErrorCodeInvalidConfig int16 = 40
+ kafkaErrorCodeInvalidRecord int16 = 42
+)
+
+// MapBrokerErrorToKafka maps a broker error code to the corresponding Kafka protocol error code
+func MapBrokerErrorToKafka(brokerErrorCode int32) int16 {
+ switch brokerErrorCode {
+ case 0: // BrokerErrorNone
+ return kafkaErrorCodeNone
+ case 1: // BrokerErrorUnknownServerError
+ return kafkaErrorCodeUnknownServerError
+ case 2: // BrokerErrorTopicNotFound
+ return kafkaErrorCodeUnknownTopicOrPartition
+ case 3: // BrokerErrorPartitionNotFound
+ return kafkaErrorCodeUnknownTopicOrPartition
+ case 6: // BrokerErrorNotLeaderOrFollower
+ return kafkaErrorCodeNotLeaderOrFollower
+ case 7: // BrokerErrorRequestTimedOut
+ return kafkaErrorCodeRequestTimedOut
+ case 8: // BrokerErrorBrokerNotAvailable
+ return kafkaErrorCodeBrokerNotAvailable
+ case 10: // BrokerErrorMessageTooLarge
+ return kafkaErrorCodeMessageTooLarge
+ case 13: // BrokerErrorNetworkException
+ return kafkaErrorCodeNetworkException
+ case 14: // BrokerErrorOffsetLoadInProgress
+ return kafkaErrorCodeOffsetLoadInProgress
+ case 42: // BrokerErrorInvalidRecord
+ return kafkaErrorCodeInvalidRecord
+ case 36: // BrokerErrorTopicAlreadyExists
+ return kafkaErrorCodeTopicAlreadyExists
+ case 37: // BrokerErrorInvalidPartitions
+ return kafkaErrorCodeInvalidPartitions
+ case 40: // BrokerErrorInvalidConfig
+ return kafkaErrorCodeInvalidConfig
+ case 100: // BrokerErrorPublisherNotFound
+ return kafkaErrorCodeUnknownServerError
+ case 101: // BrokerErrorConnectionFailed
+ return kafkaErrorCodeNetworkException
+ case 102: // BrokerErrorFollowerConnectionFailed
+ return kafkaErrorCodeNetworkException
+ default:
+ // Unknown broker error code, default to unknown server error
+ return kafkaErrorCodeUnknownServerError
+ }
+}
+
+// HandleBrokerResponse processes a broker response and returns appropriate error information
+// Returns (kafkaErrorCode, errorMessage, error) where error is non-nil for system errors
+func HandleBrokerResponse(resp *mq_pb.PublishMessageResponse) (int16, string, error) {
+ if resp.Error == "" && resp.ErrorCode == 0 {
+ // No error
+ return kafkaErrorCodeNone, "", nil
+ }
+
+ // Use structured error code if available, otherwise fall back to string parsing
+ if resp.ErrorCode != 0 {
+ kafkaErrorCode := MapBrokerErrorToKafka(resp.ErrorCode)
+ return kafkaErrorCode, resp.Error, nil
+ }
+
+ // Fallback: parse string error for backward compatibility
+ // This handles cases where older brokers might not set ErrorCode
+ kafkaErrorCode := parseStringErrorToKafkaCode(resp.Error)
+ return kafkaErrorCode, resp.Error, nil
+}
+
+// parseStringErrorToKafkaCode provides backward compatibility for string-based error parsing
+// This is the old brittle approach that we're replacing with structured error codes
+func parseStringErrorToKafkaCode(errorMsg string) int16 {
+ if errorMsg == "" {
+ return kafkaErrorCodeNone
+ }
+
+ // Check for common error patterns (brittle string matching)
+ switch {
+ case containsAny(errorMsg, "not the leader", "not leader"):
+ return kafkaErrorCodeNotLeaderOrFollower
+ case containsAny(errorMsg, "topic", "not found", "does not exist"):
+ return kafkaErrorCodeUnknownTopicOrPartition
+ case containsAny(errorMsg, "partition", "not found"):
+ return kafkaErrorCodeUnknownTopicOrPartition
+ case containsAny(errorMsg, "timeout", "timed out"):
+ return kafkaErrorCodeRequestTimedOut
+ case containsAny(errorMsg, "network", "connection"):
+ return kafkaErrorCodeNetworkException
+ case containsAny(errorMsg, "too large", "size"):
+ return kafkaErrorCodeMessageTooLarge
+ default:
+ return kafkaErrorCodeUnknownServerError
+ }
+}
+
+// containsAny checks if the text contains any of the given substrings (case-insensitive)
+func containsAny(text string, substrings ...string) bool {
+ textLower := strings.ToLower(text)
+ for _, substr := range substrings {
+ if strings.Contains(textLower, strings.ToLower(substr)) {
+ return true
+ }
+ }
+ return false
+}