aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/schema/avro_decoder.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/schema/avro_decoder.go')
-rw-r--r--weed/mq/kafka/schema/avro_decoder.go719
1 files changed, 719 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/avro_decoder.go b/weed/mq/kafka/schema/avro_decoder.go
new file mode 100644
index 000000000..f40236a81
--- /dev/null
+++ b/weed/mq/kafka/schema/avro_decoder.go
@@ -0,0 +1,719 @@
+package schema
+
+import (
+ "encoding/json"
+ "fmt"
+ "reflect"
+ "time"
+
+ "github.com/linkedin/goavro/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// AvroDecoder handles Avro schema decoding and conversion to SeaweedMQ format
+type AvroDecoder struct {
+ codec *goavro.Codec
+}
+
+// NewAvroDecoder creates a new Avro decoder from a schema string
+func NewAvroDecoder(schemaStr string) (*AvroDecoder, error) {
+ codec, err := goavro.NewCodec(schemaStr)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create Avro codec: %w", err)
+ }
+
+ return &AvroDecoder{
+ codec: codec,
+ }, nil
+}
+
+// Decode decodes Avro binary data to a Go map
+func (ad *AvroDecoder) Decode(data []byte) (map[string]interface{}, error) {
+ native, _, err := ad.codec.NativeFromBinary(data)
+ if err != nil {
+ return nil, fmt.Errorf("failed to decode Avro data: %w", err)
+ }
+
+ // Convert to map[string]interface{} for easier processing
+ result, ok := native.(map[string]interface{})
+ if !ok {
+ return nil, fmt.Errorf("expected Avro record, got %T", native)
+ }
+
+ return result, nil
+}
+
+// DecodeToRecordValue decodes Avro data directly to SeaweedMQ RecordValue
+func (ad *AvroDecoder) DecodeToRecordValue(data []byte) (*schema_pb.RecordValue, error) {
+ nativeMap, err := ad.Decode(data)
+ if err != nil {
+ return nil, err
+ }
+
+ return MapToRecordValue(nativeMap), nil
+}
+
+// InferRecordType infers a SeaweedMQ RecordType from an Avro schema
+func (ad *AvroDecoder) InferRecordType() (*schema_pb.RecordType, error) {
+ schema := ad.codec.Schema()
+ return avroSchemaToRecordType(schema)
+}
+
+// MapToRecordValue converts a Go map to SeaweedMQ RecordValue
+func MapToRecordValue(m map[string]interface{}) *schema_pb.RecordValue {
+ fields := make(map[string]*schema_pb.Value)
+
+ for key, value := range m {
+ fields[key] = goValueToSchemaValue(value)
+ }
+
+ return &schema_pb.RecordValue{
+ Fields: fields,
+ }
+}
+
+// goValueToSchemaValue converts a Go value to a SeaweedMQ Value
+func goValueToSchemaValue(value interface{}) *schema_pb.Value {
+ if value == nil {
+ // For null values, use an empty string as default
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_StringValue{StringValue: ""},
+ }
+ }
+
+ switch v := value.(type) {
+ case bool:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_BoolValue{BoolValue: v},
+ }
+ case int32:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_Int32Value{Int32Value: v},
+ }
+ case int64:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: v},
+ }
+ case int:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: int64(v)},
+ }
+ case float32:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_FloatValue{FloatValue: v},
+ }
+ case float64:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_DoubleValue{DoubleValue: v},
+ }
+ case string:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_StringValue{StringValue: v},
+ }
+ case []byte:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: v},
+ }
+ case time.Time:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_TimestampValue{
+ TimestampValue: &schema_pb.TimestampValue{
+ TimestampMicros: v.UnixMicro(),
+ IsUtc: true,
+ },
+ },
+ }
+ case []interface{}:
+ // Handle arrays
+ listValues := make([]*schema_pb.Value, len(v))
+ for i, item := range v {
+ listValues[i] = goValueToSchemaValue(item)
+ }
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_ListValue{
+ ListValue: &schema_pb.ListValue{
+ Values: listValues,
+ },
+ },
+ }
+ case map[string]interface{}:
+ // Check if this is an Avro union type (single key-value pair with type name as key)
+ // Union types have keys that are typically Avro type names like "int", "string", etc.
+ // Regular nested records would have meaningful field names like "inner", "name", etc.
+ if len(v) == 1 {
+ for unionType, unionValue := range v {
+ // Handle common Avro union type patterns (only if key looks like a type name)
+ switch unionType {
+ case "int":
+ if intVal, ok := unionValue.(int32); ok {
+ // Store union as a record with the union type as field name
+ // This preserves the union information for re-encoding
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_RecordValue{
+ RecordValue: &schema_pb.RecordValue{
+ Fields: map[string]*schema_pb.Value{
+ "int": {
+ Kind: &schema_pb.Value_Int32Value{Int32Value: intVal},
+ },
+ },
+ },
+ },
+ }
+ }
+ case "long":
+ if longVal, ok := unionValue.(int64); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_RecordValue{
+ RecordValue: &schema_pb.RecordValue{
+ Fields: map[string]*schema_pb.Value{
+ "long": {
+ Kind: &schema_pb.Value_Int64Value{Int64Value: longVal},
+ },
+ },
+ },
+ },
+ }
+ }
+ case "float":
+ if floatVal, ok := unionValue.(float32); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_RecordValue{
+ RecordValue: &schema_pb.RecordValue{
+ Fields: map[string]*schema_pb.Value{
+ "float": {
+ Kind: &schema_pb.Value_FloatValue{FloatValue: floatVal},
+ },
+ },
+ },
+ },
+ }
+ }
+ case "double":
+ if doubleVal, ok := unionValue.(float64); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_RecordValue{
+ RecordValue: &schema_pb.RecordValue{
+ Fields: map[string]*schema_pb.Value{
+ "double": {
+ Kind: &schema_pb.Value_DoubleValue{DoubleValue: doubleVal},
+ },
+ },
+ },
+ },
+ }
+ }
+ case "string":
+ if strVal, ok := unionValue.(string); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_RecordValue{
+ RecordValue: &schema_pb.RecordValue{
+ Fields: map[string]*schema_pb.Value{
+ "string": {
+ Kind: &schema_pb.Value_StringValue{StringValue: strVal},
+ },
+ },
+ },
+ },
+ }
+ }
+ case "boolean":
+ if boolVal, ok := unionValue.(bool); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_RecordValue{
+ RecordValue: &schema_pb.RecordValue{
+ Fields: map[string]*schema_pb.Value{
+ "boolean": {
+ Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
+ },
+ },
+ },
+ },
+ }
+ }
+ }
+ // If it's not a recognized union type, fall through to treat as nested record
+ }
+ }
+
+ // Handle nested records (both single-field and multi-field maps)
+ fields := make(map[string]*schema_pb.Value)
+ for key, val := range v {
+ fields[key] = goValueToSchemaValue(val)
+ }
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_RecordValue{
+ RecordValue: &schema_pb.RecordValue{
+ Fields: fields,
+ },
+ },
+ }
+ default:
+ // Handle other types by converting to string
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_StringValue{
+ StringValue: fmt.Sprintf("%v", v),
+ },
+ }
+ }
+}
+
+// avroSchemaToRecordType converts an Avro schema to SeaweedMQ RecordType
+func avroSchemaToRecordType(schemaStr string) (*schema_pb.RecordType, error) {
+ // Validate the Avro schema by creating a codec (this ensures it's valid)
+ _, err := goavro.NewCodec(schemaStr)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse Avro schema: %w", err)
+ }
+
+ // Parse the schema JSON to extract field definitions
+ var avroSchema map[string]interface{}
+ if err := json.Unmarshal([]byte(schemaStr), &avroSchema); err != nil {
+ return nil, fmt.Errorf("failed to parse Avro schema JSON: %w", err)
+ }
+
+ // Extract fields from the Avro schema
+ fields, err := extractAvroFields(avroSchema)
+ if err != nil {
+ return nil, fmt.Errorf("failed to extract Avro fields: %w", err)
+ }
+
+ return &schema_pb.RecordType{
+ Fields: fields,
+ }, nil
+}
+
+// extractAvroFields extracts field definitions from parsed Avro schema JSON
+func extractAvroFields(avroSchema map[string]interface{}) ([]*schema_pb.Field, error) {
+ // Check if this is a record type
+ schemaType, ok := avroSchema["type"].(string)
+ if !ok || schemaType != "record" {
+ return nil, fmt.Errorf("expected record type, got %v", schemaType)
+ }
+
+ // Extract fields array
+ fieldsInterface, ok := avroSchema["fields"]
+ if !ok {
+ return nil, fmt.Errorf("no fields found in Avro record schema")
+ }
+
+ fieldsArray, ok := fieldsInterface.([]interface{})
+ if !ok {
+ return nil, fmt.Errorf("fields must be an array")
+ }
+
+ // Convert each Avro field to SeaweedMQ field
+ fields := make([]*schema_pb.Field, 0, len(fieldsArray))
+ for i, fieldInterface := range fieldsArray {
+ fieldMap, ok := fieldInterface.(map[string]interface{})
+ if !ok {
+ return nil, fmt.Errorf("field %d is not a valid object", i)
+ }
+
+ field, err := convertAvroFieldToSeaweedMQ(fieldMap, int32(i))
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert field %d: %w", i, err)
+ }
+
+ fields = append(fields, field)
+ }
+
+ return fields, nil
+}
+
+// convertAvroFieldToSeaweedMQ converts a single Avro field to SeaweedMQ Field
+func convertAvroFieldToSeaweedMQ(avroField map[string]interface{}, fieldIndex int32) (*schema_pb.Field, error) {
+ // Extract field name
+ name, ok := avroField["name"].(string)
+ if !ok {
+ return nil, fmt.Errorf("field name is required")
+ }
+
+ // Extract field type and check if it's an array
+ fieldType, isRepeated, err := convertAvroTypeToSeaweedMQWithRepeated(avroField["type"])
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert field type for %s: %w", name, err)
+ }
+
+ // Check if field has a default value (indicates it's optional)
+ _, hasDefault := avroField["default"]
+ isRequired := !hasDefault
+
+ return &schema_pb.Field{
+ Name: name,
+ FieldIndex: fieldIndex,
+ Type: fieldType,
+ IsRequired: isRequired,
+ IsRepeated: isRepeated,
+ }, nil
+}
+
+// convertAvroTypeToSeaweedMQ converts Avro type to SeaweedMQ Type
+func convertAvroTypeToSeaweedMQ(avroType interface{}) (*schema_pb.Type, error) {
+ fieldType, _, err := convertAvroTypeToSeaweedMQWithRepeated(avroType)
+ return fieldType, err
+}
+
+// convertAvroTypeToSeaweedMQWithRepeated converts Avro type to SeaweedMQ Type and returns if it's repeated
+func convertAvroTypeToSeaweedMQWithRepeated(avroType interface{}) (*schema_pb.Type, bool, error) {
+ switch t := avroType.(type) {
+ case string:
+ // Simple type
+ fieldType, err := convertAvroSimpleType(t)
+ return fieldType, false, err
+
+ case map[string]interface{}:
+ // Complex type (record, enum, array, map, fixed)
+ return convertAvroComplexTypeWithRepeated(t)
+
+ case []interface{}:
+ // Union type
+ fieldType, err := convertAvroUnionType(t)
+ return fieldType, false, err
+
+ default:
+ return nil, false, fmt.Errorf("unsupported Avro type: %T", avroType)
+ }
+}
+
+// convertAvroSimpleType converts simple Avro types to SeaweedMQ types
+func convertAvroSimpleType(avroType string) (*schema_pb.Type, error) {
+ switch avroType {
+ case "null":
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_BYTES, // Use bytes for null
+ },
+ }, nil
+ case "boolean":
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_BOOL,
+ },
+ }, nil
+ case "int":
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_INT32,
+ },
+ }, nil
+ case "long":
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_INT64,
+ },
+ }, nil
+ case "float":
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_FLOAT,
+ },
+ }, nil
+ case "double":
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_DOUBLE,
+ },
+ }, nil
+ case "bytes":
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_BYTES,
+ },
+ }, nil
+ case "string":
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_STRING,
+ },
+ }, nil
+ default:
+ return nil, fmt.Errorf("unsupported simple Avro type: %s", avroType)
+ }
+}
+
+// convertAvroComplexType converts complex Avro types to SeaweedMQ types
+func convertAvroComplexType(avroType map[string]interface{}) (*schema_pb.Type, error) {
+ fieldType, _, err := convertAvroComplexTypeWithRepeated(avroType)
+ return fieldType, err
+}
+
+// convertAvroComplexTypeWithRepeated converts complex Avro types to SeaweedMQ types and returns if it's repeated
+func convertAvroComplexTypeWithRepeated(avroType map[string]interface{}) (*schema_pb.Type, bool, error) {
+ typeStr, ok := avroType["type"].(string)
+ if !ok {
+ return nil, false, fmt.Errorf("complex type must have a type field")
+ }
+
+ // Handle logical types - they are based on underlying primitive types
+ if _, hasLogicalType := avroType["logicalType"]; hasLogicalType {
+ // For logical types, use the underlying primitive type
+ return convertAvroSimpleTypeWithLogical(typeStr, avroType)
+ }
+
+ switch typeStr {
+ case "record":
+ // Nested record type
+ fields, err := extractAvroFields(avroType)
+ if err != nil {
+ return nil, false, fmt.Errorf("failed to extract nested record fields: %w", err)
+ }
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_RecordType{
+ RecordType: &schema_pb.RecordType{
+ Fields: fields,
+ },
+ },
+ }, false, nil
+
+ case "enum":
+ // Enum type - treat as string for now
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_STRING,
+ },
+ }, false, nil
+
+ case "array":
+ // Array type
+ itemsType, err := convertAvroTypeToSeaweedMQ(avroType["items"])
+ if err != nil {
+ return nil, false, fmt.Errorf("failed to convert array items type: %w", err)
+ }
+ // For arrays, we return the item type and set IsRepeated=true
+ return itemsType, true, nil
+
+ case "map":
+ // Map type - treat as record with dynamic fields
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_RecordType{
+ RecordType: &schema_pb.RecordType{
+ Fields: []*schema_pb.Field{}, // Dynamic fields
+ },
+ },
+ }, false, nil
+
+ case "fixed":
+ // Fixed-length bytes
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_BYTES,
+ },
+ }, false, nil
+
+ default:
+ return nil, false, fmt.Errorf("unsupported complex Avro type: %s", typeStr)
+ }
+}
+
+// convertAvroSimpleTypeWithLogical handles logical types based on their underlying primitive types
+func convertAvroSimpleTypeWithLogical(primitiveType string, avroType map[string]interface{}) (*schema_pb.Type, bool, error) {
+ logicalType, _ := avroType["logicalType"].(string)
+
+ // Map logical types to appropriate SeaweedMQ types
+ switch logicalType {
+ case "decimal":
+ // Decimal logical type - use bytes for precision
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_BYTES,
+ },
+ }, false, nil
+ case "uuid":
+ // UUID logical type - use string
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_STRING,
+ },
+ }, false, nil
+ case "date":
+ // Date logical type (int) - use int32
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_INT32,
+ },
+ }, false, nil
+ case "time-millis":
+ // Time in milliseconds (int) - use int32
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_INT32,
+ },
+ }, false, nil
+ case "time-micros":
+ // Time in microseconds (long) - use int64
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_INT64,
+ },
+ }, false, nil
+ case "timestamp-millis":
+ // Timestamp in milliseconds (long) - use int64
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_INT64,
+ },
+ }, false, nil
+ case "timestamp-micros":
+ // Timestamp in microseconds (long) - use int64
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_INT64,
+ },
+ }, false, nil
+ default:
+ // For unknown logical types, fall back to the underlying primitive type
+ fieldType, err := convertAvroSimpleType(primitiveType)
+ return fieldType, false, err
+ }
+}
+
+// convertAvroUnionType converts Avro union types to SeaweedMQ types
+func convertAvroUnionType(unionTypes []interface{}) (*schema_pb.Type, error) {
+ // For unions, we'll use the first non-null type
+ // This is a simplification - in a full implementation, we might want to create a union type
+ for _, unionType := range unionTypes {
+ if typeStr, ok := unionType.(string); ok && typeStr == "null" {
+ continue // Skip null types
+ }
+
+ // Use the first non-null type
+ return convertAvroTypeToSeaweedMQ(unionType)
+ }
+
+ // If all types are null, return bytes type
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_BYTES,
+ },
+ }, nil
+}
+
+// InferRecordTypeFromMap infers a RecordType from a decoded map
+// This is useful when we don't have the original Avro schema
+func InferRecordTypeFromMap(m map[string]interface{}) *schema_pb.RecordType {
+ fields := make([]*schema_pb.Field, 0, len(m))
+ fieldIndex := int32(0)
+
+ for key, value := range m {
+ fieldType := inferTypeFromValue(value)
+
+ field := &schema_pb.Field{
+ Name: key,
+ FieldIndex: fieldIndex,
+ Type: fieldType,
+ IsRequired: value != nil, // Non-nil values are considered required
+ IsRepeated: false,
+ }
+
+ // Check if it's an array
+ if reflect.TypeOf(value).Kind() == reflect.Slice {
+ field.IsRepeated = true
+ }
+
+ fields = append(fields, field)
+ fieldIndex++
+ }
+
+ return &schema_pb.RecordType{
+ Fields: fields,
+ }
+}
+
+// inferTypeFromValue infers a SeaweedMQ Type from a Go value
+func inferTypeFromValue(value interface{}) *schema_pb.Type {
+ if value == nil {
+ // Default to string for null values
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_STRING,
+ },
+ }
+ }
+
+ switch v := value.(type) {
+ case bool:
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_BOOL,
+ },
+ }
+ case int32:
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_INT32,
+ },
+ }
+ case int64, int:
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_INT64,
+ },
+ }
+ case float32:
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_FLOAT,
+ },
+ }
+ case float64:
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_DOUBLE,
+ },
+ }
+ case string:
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_STRING,
+ },
+ }
+ case []byte:
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_BYTES,
+ },
+ }
+ case time.Time:
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_TIMESTAMP,
+ },
+ }
+ case []interface{}:
+ // Handle arrays - infer element type from first element
+ var elementType *schema_pb.Type
+ if len(v) > 0 {
+ elementType = inferTypeFromValue(v[0])
+ } else {
+ // Default to string for empty arrays
+ elementType = &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_STRING,
+ },
+ }
+ }
+
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ListType{
+ ListType: &schema_pb.ListType{
+ ElementType: elementType,
+ },
+ },
+ }
+ case map[string]interface{}:
+ // Handle nested records
+ nestedRecordType := InferRecordTypeFromMap(v)
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_RecordType{
+ RecordType: nestedRecordType,
+ },
+ }
+ default:
+ // Default to string for unknown types
+ return &schema_pb.Type{
+ Kind: &schema_pb.Type_ScalarType{
+ ScalarType: schema_pb.ScalarType_STRING,
+ },
+ }
+ }
+}