path: root/weed/mq/schema/to_schema_value.go
package schema

import (
	"bytes"
	"fmt"

	"github.com/parquet-go/parquet-go"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// ToRecordValue converts a parquet.Row to a schema_pb.RecordValue.
// Note: this has not been tested with nested structures and may fail
// to convert such rows correctly.
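//
// A minimal usage sketch (for illustration; it assumes the ToParquetLevels
// helper from this package, and the "id" field name is hypothetical):
//
//	levels, err := ToParquetLevels(recordType)
//	if err != nil {
//		return nil, err
//	}
//	recordValue, err := ToRecordValue(recordType, levels, row)
//	if err != nil {
//		return nil, err
//	}
//	idValue := recordValue.Fields["id"] // look up decoded fields by name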
func ToRecordValue(recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, row parquet.Row) (*schema_pb.RecordValue, error) {
	values := []parquet.Value(row)
	recordValue, _, err := toRecordValue(recordType, parquetLevels, values, 0)
	if err != nil {
		return nil, err
	}
	return recordValue.GetRecordValue(), nil
}

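// ToValue converts one value (or, for lists and records, a run of values) from
// the flattened parquet value slice into a schema_pb.Value of type t, and
// returns the index of the first unconsumed value.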
func ToValue(t *schema_pb.Type, levels *ParquetLevels, values []parquet.Value, valueIndex int) (value *schema_pb.Value, endValueIndex int, err error) {
	switch t.Kind.(type) {
	case *schema_pb.Type_ScalarType:
		return toScalarValue(t.GetScalarType(), levels, values, valueIndex)
	case *schema_pb.Type_ListType:
		return toListValue(t.GetListType(), levels, values, valueIndex)
	case *schema_pb.Type_RecordType:
		return toRecordValue(t.GetRecordType(), levels, values, valueIndex)
	}
	return nil, valueIndex, fmt.Errorf("unsupported type: %v", t)
}

func toRecordValue(recordType *schema_pb.RecordType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (*schema_pb.Value, int, error) {
	recordValue := schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
	for _, field := range recordType.Fields {
		fieldLevels := levels.levels[field.Name]
		fieldValue, endValueIndex, err := ToValue(field.Type, fieldLevels, values, valueIndex)
		if err != nil {
			return nil, 0, err
		}
		valueIndex = endValueIndex
		recordValue.Fields[field.Name] = fieldValue
	}
	return &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: &recordValue}}, valueIndex, nil
}

func toListValue(listType *schema_pb.ListType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (listValue *schema_pb.Value, endValueIndex int, err error) {
	listValues := make([]*schema_pb.Value, 0)
	var value *schema_pb.Value
	for valueIndex < len(values) {
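		// Values belonging to this list occupy the list's start column in the
		// flattened row; a value from any other column ends the list's run.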
		if values[valueIndex].Column() != levels.startColumnIndex {
			break
		}
		value, valueIndex, err = ToValue(listType.ElementType, levels, values, valueIndex)
		if err != nil {
			return nil, valueIndex, err
		}
		listValues = append(listValues, value)
	}
	return &schema_pb.Value{Kind: &schema_pb.Value_ListValue{ListValue: &schema_pb.ListValue{Values: listValues}}}, valueIndex, nil
}

func toScalarValue(scalarType schema_pb.ScalarType, levels *ParquetLevels, values []parquet.Value, valueIndex int) (*schema_pb.Value, int, error) {
	if valueIndex >= len(values) {
		return nil, valueIndex, fmt.Errorf("value index %d out of range (%d values)", valueIndex, len(values))
	}
	value := values[valueIndex]
	// A value from a different column means this field has no data at this
	// position; report a nil value without consuming anything.
	if value.Column() != levels.startColumnIndex {
		return nil, valueIndex, nil
	}
	switch scalarType {
	case schema_pb.ScalarType_BOOL:
		return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: value.Boolean()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_INT32:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: value.Int32()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_INT64:
		return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: value.Int64()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_FLOAT:
		return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: value.Float()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_DOUBLE:
		return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex + 1, nil
	case schema_pb.ScalarType_BYTES:
		// Handle nil byte arrays from parquet to prevent growslice panic
		byteData := value.ByteArray()
		if byteData == nil {
			byteData = []byte{} // Use empty slice instead of nil
		}
		return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: byteData}}, valueIndex + 1, nil
	case schema_pb.ScalarType_STRING:
		// Handle nil byte arrays from parquet to prevent string conversion issues
		byteData := value.ByteArray()
		if byteData == nil {
			byteData = []byte{} // Use empty slice instead of nil
		}
		return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(byteData)}}, valueIndex + 1, nil
	// Parquet logical types - convert from their physical storage back to logical values
	case schema_pb.ScalarType_TIMESTAMP:
		// Stored as INT64, convert back to TimestampValue
		return &schema_pb.Value{
			Kind: &schema_pb.Value_TimestampValue{
				TimestampValue: &schema_pb.TimestampValue{
					TimestampMicros: value.Int64(),
					IsUtc:           true, // Default to UTC for compatibility
				},
			},
		}, valueIndex + 1, nil
	case schema_pb.ScalarType_DATE:
		// Stored as INT32, convert back to DateValue
		return &schema_pb.Value{
			Kind: &schema_pb.Value_DateValue{
				DateValue: &schema_pb.DateValue{
					DaysSinceEpoch: value.Int32(),
				},
			},
		}, valueIndex + 1, nil
	case schema_pb.ScalarType_DECIMAL:
		// Stored as FixedLenByteArray, convert back to DecimalValue
		fixedBytes := value.ByteArray() // FixedLenByteArray also uses ByteArray() method
		if fixedBytes == nil {
			fixedBytes = []byte{} // Use empty slice instead of nil
		}
		// Strip leading 0x00 padding to get the minimal big-endian representation;
		// negative two's-complement values (padded with 0xFF) keep their full width
		trimmedBytes := bytes.TrimLeft(fixedBytes, "\x00")
		if len(trimmedBytes) == 0 {
			trimmedBytes = []byte{0} // Ensure we have at least one byte for zero
		}
		return &schema_pb.Value{
			Kind: &schema_pb.Value_DecimalValue{
				DecimalValue: &schema_pb.DecimalValue{
					Value:     trimmedBytes,
					Precision: 38, // Maximum precision supported by schema
					Scale:     18, // Maximum scale supported by schema
				},
			},
		}, valueIndex + 1, nil
	case schema_pb.ScalarType_TIME:
		// Stored as INT64, convert back to TimeValue
		return &schema_pb.Value{
			Kind: &schema_pb.Value_TimeValue{
				TimeValue: &schema_pb.TimeValue{
					TimeMicros: value.Int64(),
				},
			},
		}, valueIndex + 1, nil
	}
	return nil, valueIndex, fmt.Errorf("unsupported scalar type: %v", scalarType)
}
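
// debugRecordValue is an illustrative sketch (not part of the original file):
// it shows how a converted RecordValue is typically consumed by switching on
// each value's Kind. Iteration order over the Fields map is unspecified.
func debugRecordValue(recordValue *schema_pb.RecordValue) {
	for name, value := range recordValue.Fields {
		switch kind := value.Kind.(type) {
		case *schema_pb.Value_Int64Value:
			fmt.Printf("%s = %d\n", name, kind.Int64Value)
		case *schema_pb.Value_StringValue:
			fmt.Printf("%s = %q\n", name, kind.StringValue)
		default:
			fmt.Printf("%s = %v\n", name, kind)
		}
	}
}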