diff options
Diffstat (limited to 'weed/mq/kafka/schema/avro_decoder.go')
| -rw-r--r-- | weed/mq/kafka/schema/avro_decoder.go | 719 |
1 files changed, 719 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/avro_decoder.go b/weed/mq/kafka/schema/avro_decoder.go new file mode 100644 index 000000000..f40236a81 --- /dev/null +++ b/weed/mq/kafka/schema/avro_decoder.go @@ -0,0 +1,719 @@ +package schema + +import ( + "encoding/json" + "fmt" + "reflect" + "time" + + "github.com/linkedin/goavro/v2" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// AvroDecoder handles Avro schema decoding and conversion to SeaweedMQ format +type AvroDecoder struct { + codec *goavro.Codec +} + +// NewAvroDecoder creates a new Avro decoder from a schema string +func NewAvroDecoder(schemaStr string) (*AvroDecoder, error) { + codec, err := goavro.NewCodec(schemaStr) + if err != nil { + return nil, fmt.Errorf("failed to create Avro codec: %w", err) + } + + return &AvroDecoder{ + codec: codec, + }, nil +} + +// Decode decodes Avro binary data to a Go map +func (ad *AvroDecoder) Decode(data []byte) (map[string]interface{}, error) { + native, _, err := ad.codec.NativeFromBinary(data) + if err != nil { + return nil, fmt.Errorf("failed to decode Avro data: %w", err) + } + + // Convert to map[string]interface{} for easier processing + result, ok := native.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("expected Avro record, got %T", native) + } + + return result, nil +} + +// DecodeToRecordValue decodes Avro data directly to SeaweedMQ RecordValue +func (ad *AvroDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) { + nativeMap, err := ad.Decode(data) + if err != nil { + return nil, err + } + + return MapToRecordValue(nativeMap), nil +} + +// InferRecordType infers a SeaweedMQ RecordType from an Avro schema +func (ad *AvroDecoder) InferRecordType() (*schema_pb.RecordType, error) { + schema := ad.codec.Schema() + return avroSchemaToRecordType(schema) +} + +// MapToRecordValue converts a Go map to SeaweedMQ RecordValue +func MapToRecordValue(m map[string]interface{}) *schema_pb.RecordValue { + fields := make(map[string]*schema_pb.Value) + + for key, value := range m { + fields[key] = goValueToSchemaValue(value) + } + + return &schema_pb.RecordValue{ + Fields: fields, + } +} + +// goValueToSchemaValue converts a Go value to a SeaweedMQ Value +func goValueToSchemaValue(value interface{}) *schema_pb.Value { + if value == nil { + // For null values, use an empty string as default + return &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{StringValue: ""}, + } + } + + switch v := value.(type) { + case bool: + return &schema_pb.Value{ + Kind: &schema_pb.Value_BoolValue{BoolValue: v}, + } + case int32: + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int32Value{Int32Value: v}, + } + case int64: + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: v}, + } + case int: + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)}, + } + case float32: + return &schema_pb.Value{ + Kind: &schema_pb.Value_FloatValue{FloatValue: v}, + } + case float64: + return &schema_pb.Value{ + Kind: &schema_pb.Value_DoubleValue{DoubleValue: v}, + } + case string: + return &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{StringValue: v}, + } + case []byte: + return &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: v}, + } + case time.Time: + return &schema_pb.Value{ + Kind: &schema_pb.Value_TimestampValue{ + TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: v.UnixMicro(), + IsUtc: true, + }, + }, + } + case []interface{}: + // Handle arrays + listValues := make([]*schema_pb.Value, len(v)) + for i, item := range v { + listValues[i] = goValueToSchemaValue(item) + } + return &schema_pb.Value{ + Kind: &schema_pb.Value_ListValue{ + ListValue: &schema_pb.ListValue{ + Values: listValues, + }, + }, + } + case map[string]interface{}: + // Check if this is an Avro union type (single key-value pair with type name as key) + // Union types have keys that are typically Avro type names like "int", "string", etc. + // Regular nested records would have meaningful field names like "inner", "name", etc. + if len(v) == 1 { + for unionType, unionValue := range v { + // Handle common Avro union type patterns (only if key looks like a type name) + switch unionType { + case "int": + if intVal, ok := unionValue.(int32); ok { + // Store union as a record with the union type as field name + // This preserves the union information for re-encoding + return &schema_pb.Value{ + Kind: &schema_pb.Value_RecordValue{ + RecordValue: &schema_pb.RecordValue{ + Fields: map[string]*schema_pb.Value{ + "int": { + Kind: &schema_pb.Value_Int32Value{Int32Value: intVal}, + }, + }, + }, + }, + } + } + case "long": + if longVal, ok := unionValue.(int64); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_RecordValue{ + RecordValue: &schema_pb.RecordValue{ + Fields: map[string]*schema_pb.Value{ + "long": { + Kind: &schema_pb.Value_Int64Value{Int64Value: longVal}, + }, + }, + }, + }, + } + } + case "float": + if floatVal, ok := unionValue.(float32); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_RecordValue{ + RecordValue: &schema_pb.RecordValue{ + Fields: map[string]*schema_pb.Value{ + "float": { + Kind: &schema_pb.Value_FloatValue{FloatValue: floatVal}, + }, + }, + }, + }, + } + } + case "double": + if doubleVal, ok := unionValue.(float64); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_RecordValue{ + RecordValue: &schema_pb.RecordValue{ + Fields: map[string]*schema_pb.Value{ + "double": { + Kind: &schema_pb.Value_DoubleValue{DoubleValue: doubleVal}, + }, + }, + }, + }, + } + } + case "string": + if strVal, ok := unionValue.(string); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_RecordValue{ + RecordValue: &schema_pb.RecordValue{ + Fields: map[string]*schema_pb.Value{ + "string": { + Kind: &schema_pb.Value_StringValue{StringValue: strVal}, + }, + }, + }, + }, + } + } + case "boolean": + if boolVal, ok := unionValue.(bool); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_RecordValue{ + RecordValue: &schema_pb.RecordValue{ + Fields: map[string]*schema_pb.Value{ + "boolean": { + Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal}, + }, + }, + }, + }, + } + } + } + // If it's not a recognized union type, fall through to treat as nested record + } + } + + // Handle nested records (both single-field and multi-field maps) + fields := make(map[string]*schema_pb.Value) + for key, val := range v { + fields[key] = goValueToSchemaValue(val) + } + return &schema_pb.Value{ + Kind: &schema_pb.Value_RecordValue{ + RecordValue: &schema_pb.RecordValue{ + Fields: fields, + }, + }, + } + default: + // Handle other types by converting to string + return &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{ + StringValue: fmt.Sprintf("%v", v), + }, + } + } +} + +// avroSchemaToRecordType converts an Avro schema to SeaweedMQ RecordType +func avroSchemaToRecordType(schemaStr string) (*schema_pb.RecordType, error) { + // Validate the Avro schema by creating a codec (this ensures it's valid) + _, err := goavro.NewCodec(schemaStr) + if err != nil { + return nil, fmt.Errorf("failed to parse Avro schema: %w", err) + } + + // Parse the schema JSON to extract field definitions + var avroSchema map[string]interface{} + if err := json.Unmarshal([]byte(schemaStr), &avroSchema); err != nil { + return nil, fmt.Errorf("failed to parse Avro schema JSON: %w", err) + } + + // Extract fields from the Avro schema + fields, err := extractAvroFields(avroSchema) + if err != nil { + return nil, fmt.Errorf("failed to extract Avro fields: %w", err) + } + + return &schema_pb.RecordType{ + Fields: fields, + }, nil +} + +// extractAvroFields extracts field definitions from parsed Avro schema JSON +func extractAvroFields(avroSchema map[string]interface{}) ([]*schema_pb.Field, error) { + // Check if this is a record type + schemaType, ok := avroSchema["type"].(string) + if !ok || schemaType != "record" { + return nil, fmt.Errorf("expected record type, got %v", schemaType) + } + + // Extract fields array + fieldsInterface, ok := avroSchema["fields"] + if !ok { + return nil, fmt.Errorf("no fields found in Avro record schema") + } + + fieldsArray, ok := fieldsInterface.([]interface{}) + if !ok { + return nil, fmt.Errorf("fields must be an array") + } + + // Convert each Avro field to SeaweedMQ field + fields := make([]*schema_pb.Field, 0, len(fieldsArray)) + for i, fieldInterface := range fieldsArray { + fieldMap, ok := fieldInterface.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("field %d is not a valid object", i) + } + + field, err := convertAvroFieldToSeaweedMQ(fieldMap, int32(i)) + if err != nil { + return nil, fmt.Errorf("failed to convert field %d: %w", i, err) + } + + fields = append(fields, field) + } + + return fields, nil +} + +// convertAvroFieldToSeaweedMQ converts a single Avro field to SeaweedMQ Field +func convertAvroFieldToSeaweedMQ(avroField map[string]interface{}, fieldIndex int32) (*schema_pb.Field, error) { + // Extract field name + name, ok := avroField["name"].(string) + if !ok { + return nil, fmt.Errorf("field name is required") + } + + // Extract field type and check if it's an array + fieldType, isRepeated, err := convertAvroTypeToSeaweedMQWithRepeated(avroField["type"]) + if err != nil { + return nil, fmt.Errorf("failed to convert field type for %s: %w", name, err) + } + + // Check if field has a default value (indicates it's optional) + _, hasDefault := avroField["default"] + isRequired := !hasDefault + + return &schema_pb.Field{ + Name: name, + FieldIndex: fieldIndex, + Type: fieldType, + IsRequired: isRequired, + IsRepeated: isRepeated, + }, nil +} + +// convertAvroTypeToSeaweedMQ converts Avro type to SeaweedMQ Type +func convertAvroTypeToSeaweedMQ(avroType interface{}) (*schema_pb.Type, error) { + fieldType, _, err := convertAvroTypeToSeaweedMQWithRepeated(avroType) + return fieldType, err +} + +// convertAvroTypeToSeaweedMQWithRepeated converts Avro type to SeaweedMQ Type and returns if it's repeated +func convertAvroTypeToSeaweedMQWithRepeated(avroType interface{}) (*schema_pb.Type, bool, error) { + switch t := avroType.(type) { + case string: + // Simple type + fieldType, err := convertAvroSimpleType(t) + return fieldType, false, err + + case map[string]interface{}: + // Complex type (record, enum, array, map, fixed) + return convertAvroComplexTypeWithRepeated(t) + + case []interface{}: + // Union type + fieldType, err := convertAvroUnionType(t) + return fieldType, false, err + + default: + return nil, false, fmt.Errorf("unsupported Avro type: %T", avroType) + } +} + +// convertAvroSimpleType converts simple Avro types to SeaweedMQ types +func convertAvroSimpleType(avroType string) (*schema_pb.Type, error) { + switch avroType { + case "null": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BYTES, // Use bytes for null + }, + }, nil + case "boolean": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BOOL, + }, + }, nil + case "int": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT32, + }, + }, nil + case "long": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT64, + }, + }, nil + case "float": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_FLOAT, + }, + }, nil + case "double": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_DOUBLE, + }, + }, nil + case "bytes": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BYTES, + }, + }, nil + case "string": + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_STRING, + }, + }, nil + default: + return nil, fmt.Errorf("unsupported simple Avro type: %s", avroType) + } +} + +// convertAvroComplexType converts complex Avro types to SeaweedMQ types +func convertAvroComplexType(avroType map[string]interface{}) (*schema_pb.Type, error) { + fieldType, _, err := convertAvroComplexTypeWithRepeated(avroType) + return fieldType, err +} + +// convertAvroComplexTypeWithRepeated converts complex Avro types to SeaweedMQ types and returns if it's repeated +func convertAvroComplexTypeWithRepeated(avroType map[string]interface{}) (*schema_pb.Type, bool, error) { + typeStr, ok := avroType["type"].(string) + if !ok { + return nil, false, fmt.Errorf("complex type must have a type field") + } + + // Handle logical types - they are based on underlying primitive types + if _, hasLogicalType := avroType["logicalType"]; hasLogicalType { + // For logical types, use the underlying primitive type + return convertAvroSimpleTypeWithLogical(typeStr, avroType) + } + + switch typeStr { + case "record": + // Nested record type + fields, err := extractAvroFields(avroType) + if err != nil { + return nil, false, fmt.Errorf("failed to extract nested record fields: %w", err) + } + return &schema_pb.Type{ + Kind: &schema_pb.Type_RecordType{ + RecordType: &schema_pb.RecordType{ + Fields: fields, + }, + }, + }, false, nil + + case "enum": + // Enum type - treat as string for now + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_STRING, + }, + }, false, nil + + case "array": + // Array type + itemsType, err := convertAvroTypeToSeaweedMQ(avroType["items"]) + if err != nil { + return nil, false, fmt.Errorf("failed to convert array items type: %w", err) + } + // For arrays, we return the item type and set IsRepeated=true + return itemsType, true, nil + + case "map": + // Map type - treat as record with dynamic fields + return &schema_pb.Type{ + Kind: &schema_pb.Type_RecordType{ + RecordType: &schema_pb.RecordType{ + Fields: []*schema_pb.Field{}, // Dynamic fields + }, + }, + }, false, nil + + case "fixed": + // Fixed-length bytes + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BYTES, + }, + }, false, nil + + default: + return nil, false, fmt.Errorf("unsupported complex Avro type: %s", typeStr) + } +} + +// convertAvroSimpleTypeWithLogical handles logical types based on their underlying primitive types +func convertAvroSimpleTypeWithLogical(primitiveType string, avroType map[string]interface{}) (*schema_pb.Type, bool, error) { + logicalType, _ := avroType["logicalType"].(string) + + // Map logical types to appropriate SeaweedMQ types + switch logicalType { + case "decimal": + // Decimal logical type - use bytes for precision + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BYTES, + }, + }, false, nil + case "uuid": + // UUID logical type - use string + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_STRING, + }, + }, false, nil + case "date": + // Date logical type (int) - use int32 + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT32, + }, + }, false, nil + case "time-millis": + // Time in milliseconds (int) - use int32 + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT32, + }, + }, false, nil + case "time-micros": + // Time in microseconds (long) - use int64 + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT64, + }, + }, false, nil + case "timestamp-millis": + // Timestamp in milliseconds (long) - use int64 + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT64, + }, + }, false, nil + case "timestamp-micros": + // Timestamp in microseconds (long) - use int64 + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT64, + }, + }, false, nil + default: + // For unknown logical types, fall back to the underlying primitive type + fieldType, err := convertAvroSimpleType(primitiveType) + return fieldType, false, err + } +} + +// convertAvroUnionType converts Avro union types to SeaweedMQ types +func convertAvroUnionType(unionTypes []interface{}) (*schema_pb.Type, error) { + // For unions, we'll use the first non-null type + // This is a simplification - in a full implementation, we might want to create a union type + for _, unionType := range unionTypes { + if typeStr, ok := unionType.(string); ok && typeStr == "null" { + continue // Skip null types + } + + // Use the first non-null type + return convertAvroTypeToSeaweedMQ(unionType) + } + + // If all types are null, return bytes type + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BYTES, + }, + }, nil +} + +// InferRecordTypeFromMap infers a RecordType from a decoded map +// This is useful when we don't have the original Avro schema +func InferRecordTypeFromMap(m map[string]interface{}) *schema_pb.RecordType { + fields := make([]*schema_pb.Field, 0, len(m)) + fieldIndex := int32(0) + + for key, value := range m { + fieldType := inferTypeFromValue(value) + + field := &schema_pb.Field{ + Name: key, + FieldIndex: fieldIndex, + Type: fieldType, + IsRequired: value != nil, // Non-nil values are considered required + IsRepeated: false, + } + + // Check if it's an array + if reflect.TypeOf(value).Kind() == reflect.Slice { + field.IsRepeated = true + } + + fields = append(fields, field) + fieldIndex++ + } + + return &schema_pb.RecordType{ + Fields: fields, + } +} + +// inferTypeFromValue infers a SeaweedMQ Type from a Go value +func inferTypeFromValue(value interface{}) *schema_pb.Type { + if value == nil { + // Default to string for null values + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_STRING, + }, + } + } + + switch v := value.(type) { + case bool: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BOOL, + }, + } + case int32: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT32, + }, + } + case int64, int: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_INT64, + }, + } + case float32: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_FLOAT, + }, + } + case float64: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_DOUBLE, + }, + } + case string: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_STRING, + }, + } + case []byte: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_BYTES, + }, + } + case time.Time: + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_TIMESTAMP, + }, + } + case []interface{}: + // Handle arrays - infer element type from first element + var elementType *schema_pb.Type + if len(v) > 0 { + elementType = inferTypeFromValue(v[0]) + } else { + // Default to string for empty arrays + elementType = &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_STRING, + }, + } + } + + return &schema_pb.Type{ + Kind: &schema_pb.Type_ListType{ + ListType: &schema_pb.ListType{ + ElementType: elementType, + }, + }, + } + case map[string]interface{}: + // Handle nested records + nestedRecordType := InferRecordTypeFromMap(v) + return &schema_pb.Type{ + Kind: &schema_pb.Type_RecordType{ + RecordType: nestedRecordType, + }, + } + default: + // Default to string for unknown types + return &schema_pb.Type{ + Kind: &schema_pb.Type_ScalarType{ + ScalarType: schema_pb.ScalarType_STRING, + }, + } + } +} |
