aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/schema/to_parquet_levels.go
blob: f9fc073bb550427c0d7d17c473e3b656d2912bba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package schema

import (
	"fmt"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// ParquetLevels describes where a schema node sits in the flattened sequence
// of Parquet leaf columns and how deeply it is nested.
type ParquetLevels struct {
	// startColumnIndex (inclusive) and endColumnIndex (exclusive) bound the
	// half-open range of leaf columns covered by this node. definitionDepth is
	// the node's nesting depth: the root record is 0 and every record field is
	// one deeper than its parent record.
	startColumnIndex, endColumnIndex, definitionDepth int
	// levels maps each field name of a record node to that field's levels.
	// It is left nil for scalar leaf nodes.
	levels map[string]*ParquetLevels
}

// ToParquetLevels computes the column layout of recordType. For every node it
// records the half-open range of Parquet leaf columns the node occupies and its
// definition depth, with record fields keyed by name. The top-level record
// starts at column 0 with definition depth 0.
func ToParquetLevels(recordType *schema_pb.RecordType) (*ParquetLevels, error) {
	const (
		rootStartColumn     = 0
		rootDefinitionDepth = 0
	)
	return toRecordTypeLevels(recordType, rootStartColumn, rootDefinitionDepth)
}

// toFieldTypeLevels dispatches on the kind of fieldType and returns the levels
// of the column(s) it occupies, beginning at startColumnIndex with the given
// definitionDepth.
//
// The kind is read through the nil-safe GetKind getter. A nil fieldType, or one
// whose kind is unset, therefore yields an "unknown field type" error instead
// of a nil-pointer panic.
func toFieldTypeLevels(fieldType *schema_pb.Type, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
	kind := fieldType.GetKind()
	switch kind.(type) {
	case *schema_pb.Type_ScalarType:
		return toFieldTypeScalarLevels(fieldType.GetScalarType(), startColumnIndex, definitionDepth)
	case *schema_pb.Type_RecordType:
		return toRecordTypeLevels(fieldType.GetRecordType(), startColumnIndex, definitionDepth)
	case *schema_pb.Type_ListType:
		return toFieldTypeListLevels(fieldType.GetListType(), startColumnIndex, definitionDepth)
	}
	return nil, fmt.Errorf("unknown field type: %T", kind)
}

// toFieldTypeListLevels returns the levels of a list field. The list occupies
// exactly the columns of its element type, so this delegates to the element
// with the same startColumnIndex and definitionDepth. The list itself adds no
// column and no definition level here.
//
// GetElementType is nil-safe, so a nil listType is no longer dereferenced here.
// The resulting nil element type is then handled by toFieldTypeLevels.
func toFieldTypeListLevels(listType *schema_pb.ListType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
	return toFieldTypeLevels(listType.GetElementType(), startColumnIndex, definitionDepth)
}

// toFieldTypeScalarLevels returns the levels of a scalar leaf. A scalar always
// occupies exactly one column, [startColumnIndex, startColumnIndex+1), and has
// no child levels. The concrete scalarType does not affect the layout.
func toFieldTypeScalarLevels(scalarType schema_pb.ScalarType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
	leaf := new(ParquetLevels)
	leaf.startColumnIndex = startColumnIndex
	leaf.endColumnIndex = startColumnIndex + 1 // one leaf column per scalar
	leaf.definitionDepth = definitionDepth
	return leaf, nil
}
// toRecordTypeLevels computes the levels of a record and, recursively, of each
// of its fields.
//
// Fields are laid out back to back in the order they appear in the record,
// starting at startColumnIndex. Each field is placed one definition level
// deeper than the record itself. The record's endColumnIndex is the end of its
// last field, or startColumnIndex if it has no fields.
//
// All message access goes through the nil-safe protobuf getters, so a nil
// recordType is treated as a record with no fields. An error from a nested
// field is wrapped with that field's name, which makes the failing path visible
// to the caller.
func toRecordTypeLevels(recordType *schema_pb.RecordType, startColumnIndex, definitionDepth int) (*ParquetLevels, error) {
	fields := recordType.GetFields()
	recordTypeLevels := &ParquetLevels{
		startColumnIndex: startColumnIndex,
		definitionDepth:  definitionDepth,
		levels:           make(map[string]*ParquetLevels, len(fields)),
	}
	nextColumnIndex := startColumnIndex
	for _, field := range fields {
		fieldTypeLevels, err := toFieldTypeLevels(field.GetType(), nextColumnIndex, definitionDepth+1)
		if err != nil {
			return nil, fmt.Errorf("field %q: %w", field.GetName(), err)
		}
		recordTypeLevels.levels[field.GetName()] = fieldTypeLevels
		// The next field begins where this one's columns end.
		nextColumnIndex = fieldTypeLevels.endColumnIndex
	}
	recordTypeLevels.endColumnIndex = nextColumnIndex
	return recordTypeLevels, nil
}