diff options
Diffstat (limited to 'weed/mq/kafka/schema/avro_decoder_test.go')
| -rw-r--r-- | weed/mq/kafka/schema/avro_decoder_test.go | 542 |
1 files changed, 542 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/avro_decoder_test.go b/weed/mq/kafka/schema/avro_decoder_test.go new file mode 100644 index 000000000..f34a0a800 --- /dev/null +++ b/weed/mq/kafka/schema/avro_decoder_test.go @@ -0,0 +1,542 @@ +package schema + +import ( + "reflect" + "testing" + "time" + + "github.com/linkedin/goavro/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func TestNewAvroDecoder(t *testing.T) { + tests := []struct { + name string + schema string + expectErr bool + }{ + { + name: "valid record schema", + schema: `{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"} + ] + }`, + expectErr: false, + }, + { + name: "valid enum schema", + schema: `{ + "type": "enum", + "name": "Color", + "symbols": ["RED", "GREEN", "BLUE"] + }`, + expectErr: false, + }, + { + name: "invalid schema", + schema: `{"invalid": "schema"}`, + expectErr: true, + }, + { + name: "empty schema", + schema: "", + expectErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + decoder, err := NewAvroDecoder(tt.schema) + + if (err != nil) != tt.expectErr { + t.Errorf("NewAvroDecoder() error = %v, expectErr %v", err, tt.expectErr) + return + } + + if !tt.expectErr && decoder == nil { + t.Error("Expected non-nil decoder for valid schema") + } + }) + } +} + +func TestAvroDecoder_Decode(t *testing.T) { + schema := `{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"}, + {"name": "email", "type": ["null", "string"], "default": null} + ] + }` + + decoder, err := NewAvroDecoder(schema) + if err != nil { + t.Fatalf("Failed to create decoder: %v", err) + } + + // Create test data + codec, _ := goavro.NewCodec(schema) + testRecord := map[string]interface{}{ + "id": int32(123), + "name": "John Doe", + "email": map[string]interface{}{ + "string": "john@example.com", // Avro union format + }, + } + + // Encode to binary + binary, err := codec.BinaryFromNative(nil, testRecord) + if err != nil { + t.Fatalf("Failed to encode test data: %v", err) + } + + // Test decoding + result, err := decoder.Decode(binary) + if err != nil { + t.Fatalf("Failed to decode: %v", err) + } + + // Verify results + if result["id"] != int32(123) { + t.Errorf("Expected id=123, got %v", result["id"]) + } + + if result["name"] != "John Doe" { + t.Errorf("Expected name='John Doe', got %v", result["name"]) + } + + // For union types, Avro returns a map with the type name as key + if emailMap, ok := result["email"].(map[string]interface{}); ok { + if emailMap["string"] != "john@example.com" { + t.Errorf("Expected email='john@example.com', got %v", emailMap["string"]) + } + } else { + t.Errorf("Expected email to be a union map, got %v", result["email"]) + } +} + +func TestAvroDecoder_DecodeToRecordValue(t *testing.T) { + schema := `{ + "type": "record", + "name": "SimpleRecord", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"} + ] + }` + + decoder, err := NewAvroDecoder(schema) + if err != nil { + t.Fatalf("Failed to create decoder: %v", err) + } + + // Create and encode test data + codec, _ := goavro.NewCodec(schema) + testRecord := map[string]interface{}{ + "id": int32(456), + "name": "Jane Smith", + } + + binary, err := codec.BinaryFromNative(nil, testRecord) + if err != nil { + t.Fatalf("Failed to encode test data: %v", err) + } + + // Test decoding to RecordValue + recordValue, err := decoder.DecodeToRecordValue(binary) + if err != nil { + t.Fatalf("Failed to decode to RecordValue: %v", err) + } + + // Verify RecordValue structure + if recordValue.Fields == nil { + t.Fatal("Expected non-nil fields") + } + + idValue := recordValue.Fields["id"] + if idValue == nil { + t.Fatal("Expected id field") + } + + if idValue.GetInt32Value() != 456 { + t.Errorf("Expected id=456, got %v", idValue.GetInt32Value()) + } + + nameValue := recordValue.Fields["name"] + if nameValue == nil { + t.Fatal("Expected name field") + } + + if nameValue.GetStringValue() != "Jane Smith" { + t.Errorf("Expected name='Jane Smith', got %v", nameValue.GetStringValue()) + } +} + +func TestMapToRecordValue(t *testing.T) { + testMap := map[string]interface{}{ + "bool_field": true, + "int32_field": int32(123), + "int64_field": int64(456), + "float_field": float32(1.23), + "double_field": float64(4.56), + "string_field": "hello", + "bytes_field": []byte("world"), + "null_field": nil, + "array_field": []interface{}{"a", "b", "c"}, + "nested_field": map[string]interface{}{ + "inner": "value", + }, + } + + recordValue := MapToRecordValue(testMap) + + // Test each field type + if !recordValue.Fields["bool_field"].GetBoolValue() { + t.Error("Expected bool_field=true") + } + + if recordValue.Fields["int32_field"].GetInt32Value() != 123 { + t.Error("Expected int32_field=123") + } + + if recordValue.Fields["int64_field"].GetInt64Value() != 456 { + t.Error("Expected int64_field=456") + } + + if recordValue.Fields["float_field"].GetFloatValue() != 1.23 { + t.Error("Expected float_field=1.23") + } + + if recordValue.Fields["double_field"].GetDoubleValue() != 4.56 { + t.Error("Expected double_field=4.56") + } + + if recordValue.Fields["string_field"].GetStringValue() != "hello" { + t.Error("Expected string_field='hello'") + } + + if string(recordValue.Fields["bytes_field"].GetBytesValue()) != "world" { + t.Error("Expected bytes_field='world'") + } + + // Test null value (converted to empty string) + if recordValue.Fields["null_field"].GetStringValue() != "" { + t.Error("Expected null_field to be empty string") + } + + // Test array + arrayValue := recordValue.Fields["array_field"].GetListValue() + if arrayValue == nil || len(arrayValue.Values) != 3 { + t.Error("Expected array with 3 elements") + } + + // Test nested record + nestedValue := recordValue.Fields["nested_field"].GetRecordValue() + if nestedValue == nil { + t.Fatal("Expected nested record") + } + + if nestedValue.Fields["inner"].GetStringValue() != "value" { + t.Error("Expected nested inner='value'") + } +} + +func TestGoValueToSchemaValue(t *testing.T) { + tests := []struct { + name string + input interface{} + expected func(*schema_pb.Value) bool + }{ + { + name: "nil value", + input: nil, + expected: func(v *schema_pb.Value) bool { + return v.GetStringValue() == "" + }, + }, + { + name: "bool value", + input: true, + expected: func(v *schema_pb.Value) bool { + return v.GetBoolValue() == true + }, + }, + { + name: "int32 value", + input: int32(123), + expected: func(v *schema_pb.Value) bool { + return v.GetInt32Value() == 123 + }, + }, + { + name: "int64 value", + input: int64(456), + expected: func(v *schema_pb.Value) bool { + return v.GetInt64Value() == 456 + }, + }, + { + name: "string value", + input: "test", + expected: func(v *schema_pb.Value) bool { + return v.GetStringValue() == "test" + }, + }, + { + name: "bytes value", + input: []byte("data"), + expected: func(v *schema_pb.Value) bool { + return string(v.GetBytesValue()) == "data" + }, + }, + { + name: "time value", + input: time.Unix(1234567890, 0), + expected: func(v *schema_pb.Value) bool { + return v.GetTimestampValue() != nil + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := goValueToSchemaValue(tt.input) + if !tt.expected(result) { + t.Errorf("goValueToSchemaValue() failed for %v", tt.input) + } + }) + } +} + +func TestInferRecordTypeFromMap(t *testing.T) { + testMap := map[string]interface{}{ + "id": int64(123), + "name": "test", + "active": true, + "score": float64(95.5), + "tags": []interface{}{"tag1", "tag2"}, + "metadata": map[string]interface{}{"key": "value"}, + } + + recordType := InferRecordTypeFromMap(testMap) + + if len(recordType.Fields) != 6 { + t.Errorf("Expected 6 fields, got %d", len(recordType.Fields)) + } + + // Create a map for easier field lookup + fieldMap := make(map[string]*schema_pb.Field) + for _, field := range recordType.Fields { + fieldMap[field.Name] = field + } + + // Test field types + if fieldMap["id"].Type.GetScalarType() != schema_pb.ScalarType_INT64 { + t.Error("Expected id field to be INT64") + } + + if fieldMap["name"].Type.GetScalarType() != schema_pb.ScalarType_STRING { + t.Error("Expected name field to be STRING") + } + + if fieldMap["active"].Type.GetScalarType() != schema_pb.ScalarType_BOOL { + t.Error("Expected active field to be BOOL") + } + + if fieldMap["score"].Type.GetScalarType() != schema_pb.ScalarType_DOUBLE { + t.Error("Expected score field to be DOUBLE") + } + + // Test array field + if fieldMap["tags"].Type.GetListType() == nil { + t.Error("Expected tags field to be LIST") + } + + // Test nested record field + if fieldMap["metadata"].Type.GetRecordType() == nil { + t.Error("Expected metadata field to be RECORD") + } +} + +func TestInferTypeFromValue(t *testing.T) { + tests := []struct { + name string + input interface{} + expected schema_pb.ScalarType + }{ + {"nil", nil, schema_pb.ScalarType_STRING}, // Default for nil + {"bool", true, schema_pb.ScalarType_BOOL}, + {"int32", int32(123), schema_pb.ScalarType_INT32}, + {"int64", int64(456), schema_pb.ScalarType_INT64}, + {"int", int(789), schema_pb.ScalarType_INT64}, + {"float32", float32(1.23), schema_pb.ScalarType_FLOAT}, + {"float64", float64(4.56), schema_pb.ScalarType_DOUBLE}, + {"string", "test", schema_pb.ScalarType_STRING}, + {"bytes", []byte("data"), schema_pb.ScalarType_BYTES}, + {"time", time.Now(), schema_pb.ScalarType_TIMESTAMP}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := inferTypeFromValue(tt.input) + + // Handle special cases + if tt.input == nil || reflect.TypeOf(tt.input).Kind() == reflect.Slice || + reflect.TypeOf(tt.input).Kind() == reflect.Map { + // Skip scalar type check for complex types + return + } + + if result.GetScalarType() != tt.expected { + t.Errorf("inferTypeFromValue() = %v, want %v", result.GetScalarType(), tt.expected) + } + }) + } +} + +// Integration test with real Avro data +func TestAvroDecoder_Integration(t *testing.T) { + // Complex Avro schema with nested records and arrays + schema := `{ + "type": "record", + "name": "Order", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "customer_id", "type": "int"}, + {"name": "total", "type": "double"}, + {"name": "items", "type": { + "type": "array", + "items": { + "type": "record", + "name": "Item", + "fields": [ + {"name": "product_id", "type": "string"}, + {"name": "quantity", "type": "int"}, + {"name": "price", "type": "double"} + ] + } + }}, + {"name": "metadata", "type": { + "type": "record", + "name": "Metadata", + "fields": [ + {"name": "source", "type": "string"}, + {"name": "timestamp", "type": "long"} + ] + }} + ] + }` + + decoder, err := NewAvroDecoder(schema) + if err != nil { + t.Fatalf("Failed to create decoder: %v", err) + } + + // Create complex test data + codec, _ := goavro.NewCodec(schema) + testOrder := map[string]interface{}{ + "id": "order-123", + "customer_id": int32(456), + "total": float64(99.99), + "items": []interface{}{ + map[string]interface{}{ + "product_id": "prod-1", + "quantity": int32(2), + "price": float64(29.99), + }, + map[string]interface{}{ + "product_id": "prod-2", + "quantity": int32(1), + "price": float64(39.99), + }, + }, + "metadata": map[string]interface{}{ + "source": "web", + "timestamp": int64(1234567890), + }, + } + + // Encode to binary + binary, err := codec.BinaryFromNative(nil, testOrder) + if err != nil { + t.Fatalf("Failed to encode test data: %v", err) + } + + // Decode to RecordValue + recordValue, err := decoder.DecodeToRecordValue(binary) + if err != nil { + t.Fatalf("Failed to decode to RecordValue: %v", err) + } + + // Verify complex structure + if recordValue.Fields["id"].GetStringValue() != "order-123" { + t.Error("Expected order ID to be preserved") + } + + if recordValue.Fields["customer_id"].GetInt32Value() != 456 { + t.Error("Expected customer ID to be preserved") + } + + // Check array handling + itemsArray := recordValue.Fields["items"].GetListValue() + if itemsArray == nil || len(itemsArray.Values) != 2 { + t.Fatal("Expected items array with 2 elements") + } + + // Check nested record handling + metadataRecord := recordValue.Fields["metadata"].GetRecordValue() + if metadataRecord == nil { + t.Fatal("Expected metadata record") + } + + if metadataRecord.Fields["source"].GetStringValue() != "web" { + t.Error("Expected metadata source to be preserved") + } +} + +// Benchmark tests +func BenchmarkAvroDecoder_Decode(b *testing.B) { + schema := `{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"} + ] + }` + + decoder, _ := NewAvroDecoder(schema) + codec, _ := goavro.NewCodec(schema) + + testRecord := map[string]interface{}{ + "id": int32(123), + "name": "John Doe", + } + + binary, _ := codec.BinaryFromNative(nil, testRecord) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = decoder.Decode(binary) + } +} + +func BenchmarkMapToRecordValue(b *testing.B) { + testMap := map[string]interface{}{ + "id": int64(123), + "name": "test", + "active": true, + "score": float64(95.5), + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = MapToRecordValue(testMap) + } +} |
