aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/integration/broker_error_mapping.go
blob: 61476eeb06739f0f6e06eb9567a270bdf81c8a79 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package integration

import (
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)

// Kafka protocol error codes (copied from the protocol package to avoid an
// import cycle).
//
// The values follow the error code table of the Apache Kafka protocol. Two of
// them are easy to get wrong: UNKNOWN_SERVER_ERROR is -1 (1 is
// OFFSET_OUT_OF_RANGE) and INVALID_RECORD is 87 (42 is INVALID_REQUEST).
const (
	kafkaErrorCodeNone                    int16 = 0
	kafkaErrorCodeUnknownServerError      int16 = -1
	kafkaErrorCodeUnknownTopicOrPartition int16 = 3
	kafkaErrorCodeNotLeaderOrFollower     int16 = 6
	kafkaErrorCodeRequestTimedOut         int16 = 7
	kafkaErrorCodeBrokerNotAvailable      int16 = 8
	kafkaErrorCodeMessageTooLarge         int16 = 10
	kafkaErrorCodeNetworkException        int16 = 13
	kafkaErrorCodeOffsetLoadInProgress    int16 = 14
	kafkaErrorCodeTopicAlreadyExists      int16 = 36
	kafkaErrorCodeInvalidPartitions       int16 = 37
	kafkaErrorCodeInvalidConfig           int16 = 40
	kafkaErrorCodeInvalidRecord           int16 = 87
)

// MapBrokerErrorToKafka translates a broker error code into the Kafka protocol
// error code that should be reported for it.
//
// Broker codes that share a Kafka equivalent are grouped in one case. Any code
// not listed below is reported as an unknown server error.
func MapBrokerErrorToKafka(brokerErrorCode int32) int16 {
	switch brokerErrorCode {
	case 0: // BrokerErrorNone
		return kafkaErrorCodeNone
	case 2, 3: // BrokerErrorTopicNotFound, BrokerErrorPartitionNotFound
		return kafkaErrorCodeUnknownTopicOrPartition
	case 6: // BrokerErrorNotLeaderOrFollower
		return kafkaErrorCodeNotLeaderOrFollower
	case 7: // BrokerErrorRequestTimedOut
		return kafkaErrorCodeRequestTimedOut
	case 8: // BrokerErrorBrokerNotAvailable
		return kafkaErrorCodeBrokerNotAvailable
	case 10: // BrokerErrorMessageTooLarge
		return kafkaErrorCodeMessageTooLarge
	case 13, 101, 102: // BrokerErrorNetworkException, BrokerErrorConnectionFailed, BrokerErrorFollowerConnectionFailed
		return kafkaErrorCodeNetworkException
	case 14: // BrokerErrorOffsetLoadInProgress
		return kafkaErrorCodeOffsetLoadInProgress
	case 36: // BrokerErrorTopicAlreadyExists
		return kafkaErrorCodeTopicAlreadyExists
	case 37: // BrokerErrorInvalidPartitions
		return kafkaErrorCodeInvalidPartitions
	case 40: // BrokerErrorInvalidConfig
		return kafkaErrorCodeInvalidConfig
	case 42: // BrokerErrorInvalidRecord
		return kafkaErrorCodeInvalidRecord
	}

	// Reached for 1 (BrokerErrorUnknownServerError), 100
	// (BrokerErrorPublisherNotFound) and every unrecognized broker code.
	return kafkaErrorCodeUnknownServerError
}

// HandleBrokerResponse converts a broker publish response into Kafka error
// information, returned as (kafkaErrorCode, errorMessage, error).
//
//   - A nil resp yields kafkaErrorCodeUnknownServerError with a descriptive
//     message instead of panicking on the field access.
//   - A response with neither ErrorCode nor Error set yields
//     (kafkaErrorCodeNone, "").
//   - A non-zero ErrorCode is translated with MapBrokerErrorToKafka.
//   - Otherwise the Error string is classified by parseStringErrorToKafkaCode,
//     for brokers that do not set ErrorCode.
//
// The error result is reserved for system errors; every path in this function
// currently returns nil for it.
func HandleBrokerResponse(resp *mq_pb.PublishMessageResponse) (int16, string, error) {
	if resp == nil {
		return kafkaErrorCodeUnknownServerError, "nil broker response", nil
	}

	switch {
	case resp.ErrorCode != 0:
		// Structured error code takes precedence over the message text.
		return MapBrokerErrorToKafka(resp.ErrorCode), resp.Error, nil
	case resp.Error != "":
		// Backward compatibility: only a message is available, so parse it.
		return parseStringErrorToKafkaCode(resp.Error), resp.Error, nil
	default:
		// No error
		return kafkaErrorCodeNone, "", nil
	}
}

// parseStringErrorToKafkaCode provides backward compatibility for string-based
// error parsing. This is the old brittle approach that structured error codes
// replace; it is kept only for responses that carry an error message without
// an error code.
//
// An empty message maps to kafkaErrorCodeNone. Otherwise the message is matched
// case-insensitively against known phrases, most specific first. A bare mention
// of "topic" or "partition" is checked last, so that a message such as
// "timeout publishing to topic X" is reported as a timeout rather than as an
// unknown topic. Messages matching nothing map to
// kafkaErrorCodeUnknownServerError.
func parseStringErrorToKafkaCode(errorMsg string) int16 {
	if errorMsg == "" {
		return kafkaErrorCodeNone
	}

	// Check for common error patterns (brittle string matching)
	switch {
	case containsAny(errorMsg, "not the leader", "not leader"):
		return kafkaErrorCodeNotLeaderOrFollower
	case containsAny(errorMsg, "not found", "does not exist"):
		return kafkaErrorCodeUnknownTopicOrPartition
	case containsAny(errorMsg, "timeout", "timed out"):
		return kafkaErrorCodeRequestTimedOut
	case containsAny(errorMsg, "network", "connection"):
		return kafkaErrorCodeNetworkException
	case containsAny(errorMsg, "too large", "size"):
		return kafkaErrorCodeMessageTooLarge
	case containsAny(errorMsg, "topic", "partition"):
		// Last resort: the message names a topic/partition but gives no more
		// specific cause, so keep the historical classification.
		return kafkaErrorCodeUnknownTopicOrPartition
	default:
		return kafkaErrorCodeUnknownServerError
	}
}

// containsAny reports whether text contains at least one of substrings,
// comparing case-insensitively. It returns false when no substrings are given.
func containsAny(text string, substrings ...string) bool {
	haystack := strings.ToLower(text)
	for i := range substrings {
		needle := strings.ToLower(substrings[i])
		if strings.Contains(haystack, needle) {
			return true
		}
	}
	return false
}