aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/schema/to_parquet_schema.go
blob: 036acc153dea0aad18f2f8249a0e9947f3e3b30f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package schema

import (
	"fmt"
	parquet "github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parquet.Schema, error) {
	rootNode, err := toParquetFieldTypeRecord(recordType)
	if err != nil {
		return nil, fmt.Errorf("failed to convert record type to parquet schema: %w", err)
	}

	// Fields are sorted by name, so the value should be sorted also
	// the sorting is inside parquet.`func (g Group) Fields() []Field`
	return parquet.NewSchema(topicName, rootNode), nil
}

func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
	switch fieldType.Kind.(type) {
	case *schema_pb.Type_ScalarType:
		dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
		dataType = parquet.Optional(dataType)
	case *schema_pb.Type_RecordType:
		dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
		dataType = parquet.Optional(dataType)
	case *schema_pb.Type_ListType:
		dataType, err = toParquetFieldTypeList(fieldType.GetListType())
	default:
		return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
	}

	return dataType, err
}

func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
	elementType, err := toParquetFieldType(listType.ElementType)
	if err != nil {
		return nil, err
	}
	return parquet.Repeated(elementType), nil
}

func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
	switch scalarType {
	case schema_pb.ScalarType_BOOL:
		return parquet.Leaf(parquet.BooleanType), nil
	case schema_pb.ScalarType_INT32:
		return parquet.Leaf(parquet.Int32Type), nil
	case schema_pb.ScalarType_INT64:
		return parquet.Leaf(parquet.Int64Type), nil
	case schema_pb.ScalarType_FLOAT:
		return parquet.Leaf(parquet.FloatType), nil
	case schema_pb.ScalarType_DOUBLE:
		return parquet.Leaf(parquet.DoubleType), nil
	case schema_pb.ScalarType_BYTES:
		return parquet.Leaf(parquet.ByteArrayType), nil
	case schema_pb.ScalarType_STRING:
		return parquet.Leaf(parquet.ByteArrayType), nil
	default:
		return nil, fmt.Errorf("unknown scalar type: %v", scalarType)
	}
}
func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
	recordNode := parquet.Group{}
	for _, field := range recordType.Fields {
		parquetFieldType, err := toParquetFieldType(field.Type)
		if err != nil {
			return nil, err
		}
		recordNode[field.Name] = parquetFieldType
	}
	return recordNode, nil
}