aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/integration/broker_error_mapping_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/integration/broker_error_mapping_test.go')
-rw-r--r--weed/mq/kafka/integration/broker_error_mapping_test.go169
1 files changed, 169 insertions, 0 deletions
diff --git a/weed/mq/kafka/integration/broker_error_mapping_test.go b/weed/mq/kafka/integration/broker_error_mapping_test.go
new file mode 100644
index 000000000..2f4849833
--- /dev/null
+++ b/weed/mq/kafka/integration/broker_error_mapping_test.go
@@ -0,0 +1,169 @@
+package integration
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+func TestMapBrokerErrorToKafka(t *testing.T) {
+ tests := []struct {
+ name string
+ brokerErrorCode int32
+ expectedKafka int16
+ }{
+ {"No error", 0, kafkaErrorCodeNone},
+ {"Unknown server error", 1, kafkaErrorCodeUnknownServerError},
+ {"Topic not found", 2, kafkaErrorCodeUnknownTopicOrPartition},
+ {"Partition not found", 3, kafkaErrorCodeUnknownTopicOrPartition},
+ {"Not leader or follower", 6, kafkaErrorCodeNotLeaderOrFollower},
+ {"Request timed out", 7, kafkaErrorCodeRequestTimedOut},
+ {"Broker not available", 8, kafkaErrorCodeBrokerNotAvailable},
+ {"Message too large", 10, kafkaErrorCodeMessageTooLarge},
+ {"Network exception", 13, kafkaErrorCodeNetworkException},
+ {"Offset load in progress", 14, kafkaErrorCodeOffsetLoadInProgress},
+ {"Invalid record", 42, kafkaErrorCodeInvalidRecord},
+ {"Topic already exists", 36, kafkaErrorCodeTopicAlreadyExists},
+ {"Invalid partitions", 37, kafkaErrorCodeInvalidPartitions},
+ {"Invalid config", 40, kafkaErrorCodeInvalidConfig},
+ {"Publisher not found", 100, kafkaErrorCodeUnknownServerError},
+ {"Connection failed", 101, kafkaErrorCodeNetworkException},
+ {"Follower connection failed", 102, kafkaErrorCodeNetworkException},
+ {"Unknown error code", 999, kafkaErrorCodeUnknownServerError},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := MapBrokerErrorToKafka(tt.brokerErrorCode)
+ if result != tt.expectedKafka {
+ t.Errorf("MapBrokerErrorToKafka(%d) = %d, want %d", tt.brokerErrorCode, result, tt.expectedKafka)
+ }
+ })
+ }
+}
+
+func TestHandleBrokerResponse(t *testing.T) {
+ tests := []struct {
+ name string
+ response *mq_pb.PublishMessageResponse
+ expectedKafkaCode int16
+ expectedError string
+ expectSystemError bool
+ }{
+ {
+ name: "No error",
+ response: &mq_pb.PublishMessageResponse{
+ AckTsNs: 123,
+ Error: "",
+ ErrorCode: 0,
+ },
+ expectedKafkaCode: kafkaErrorCodeNone,
+ expectedError: "",
+ expectSystemError: false,
+ },
+ {
+ name: "Structured error - Not leader",
+ response: &mq_pb.PublishMessageResponse{
+ AckTsNs: 0,
+ Error: "not the leader for this partition, leader is: broker2:9092",
+ ErrorCode: 6, // BrokerErrorNotLeaderOrFollower
+ },
+ expectedKafkaCode: kafkaErrorCodeNotLeaderOrFollower,
+ expectedError: "not the leader for this partition, leader is: broker2:9092",
+ expectSystemError: false,
+ },
+ {
+ name: "Structured error - Topic not found",
+ response: &mq_pb.PublishMessageResponse{
+ AckTsNs: 0,
+ Error: "topic test-topic not found",
+ ErrorCode: 2, // BrokerErrorTopicNotFound
+ },
+ expectedKafkaCode: kafkaErrorCodeUnknownTopicOrPartition,
+ expectedError: "topic test-topic not found",
+ expectSystemError: false,
+ },
+ {
+ name: "Fallback string parsing - Not leader",
+ response: &mq_pb.PublishMessageResponse{
+ AckTsNs: 0,
+ Error: "not the leader for this partition",
+ ErrorCode: 0, // No structured error code
+ },
+ expectedKafkaCode: kafkaErrorCodeNotLeaderOrFollower,
+ expectedError: "not the leader for this partition",
+ expectSystemError: false,
+ },
+ {
+ name: "Fallback string parsing - Topic not found",
+ response: &mq_pb.PublishMessageResponse{
+ AckTsNs: 0,
+ Error: "topic does not exist",
+ ErrorCode: 0, // No structured error code
+ },
+ expectedKafkaCode: kafkaErrorCodeUnknownTopicOrPartition,
+ expectedError: "topic does not exist",
+ expectSystemError: false,
+ },
+ {
+ name: "Fallback string parsing - Unknown error",
+ response: &mq_pb.PublishMessageResponse{
+ AckTsNs: 0,
+ Error: "some unknown error occurred",
+ ErrorCode: 0, // No structured error code
+ },
+ expectedKafkaCode: kafkaErrorCodeUnknownServerError,
+ expectedError: "some unknown error occurred",
+ expectSystemError: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ kafkaCode, errorMsg, systemErr := HandleBrokerResponse(tt.response)
+
+ if kafkaCode != tt.expectedKafkaCode {
+ t.Errorf("HandleBrokerResponse() kafkaCode = %d, want %d", kafkaCode, tt.expectedKafkaCode)
+ }
+
+ if errorMsg != tt.expectedError {
+ t.Errorf("HandleBrokerResponse() errorMsg = %q, want %q", errorMsg, tt.expectedError)
+ }
+
+ if (systemErr != nil) != tt.expectSystemError {
+ t.Errorf("HandleBrokerResponse() systemErr = %v, expectSystemError = %v", systemErr, tt.expectSystemError)
+ }
+ })
+ }
+}
+
+func TestParseStringErrorToKafkaCode(t *testing.T) {
+ tests := []struct {
+ name string
+ errorMsg string
+ expectedCode int16
+ }{
+ {"Empty error", "", kafkaErrorCodeNone},
+ {"Not leader error", "not the leader for this partition", kafkaErrorCodeNotLeaderOrFollower},
+ {"Not leader error variant", "not leader", kafkaErrorCodeNotLeaderOrFollower},
+ {"Topic not found", "topic not found", kafkaErrorCodeUnknownTopicOrPartition},
+ {"Topic does not exist", "topic does not exist", kafkaErrorCodeUnknownTopicOrPartition},
+ {"Partition not found", "partition not found", kafkaErrorCodeUnknownTopicOrPartition},
+ {"Timeout error", "request timed out", kafkaErrorCodeRequestTimedOut},
+ {"Timeout error variant", "timeout occurred", kafkaErrorCodeRequestTimedOut},
+ {"Network error", "network exception", kafkaErrorCodeNetworkException},
+ {"Connection error", "connection failed", kafkaErrorCodeNetworkException},
+ {"Message too large", "message too large", kafkaErrorCodeMessageTooLarge},
+ {"Size error", "size exceeds limit", kafkaErrorCodeMessageTooLarge},
+ {"Unknown error", "some random error", kafkaErrorCodeUnknownServerError},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := parseStringErrorToKafkaCode(tt.errorMsg)
+ if result != tt.expectedCode {
+ t.Errorf("parseStringErrorToKafkaCode(%q) = %d, want %d", tt.errorMsg, result, tt.expectedCode)
+ }
+ })
+ }
+}