aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-04-18 22:41:12 -0700
committerchrislu <chris.lu@gmail.com>2024-04-18 22:41:12 -0700
commit57949f9959e3b9fcf4438641a8e499a706da0271 (patch)
tree2e59a38f5ef80d68633a8d6e55542fb2eed12eb6
parent05d18130b62ccc8e08bba93d5bc5d462a46c9cf6 (diff)
downloadseaweedfs-57949f9959e3b9fcf4438641a8e499a706da0271.tar.xz
seaweedfs-57949f9959e3b9fcf4438641a8e499a706da0271.zip
support list type
-rw-r--r--weed/mq/schema/to_parquet_schema.go26
1 files changed, 16 insertions, 10 deletions
diff --git a/weed/mq/schema/to_parquet_schema.go b/weed/mq/schema/to_parquet_schema.go
index f39692714..a5f3481e3 100644
--- a/weed/mq/schema/to_parquet_schema.go
+++ b/weed/mq/schema/to_parquet_schema.go
@@ -15,23 +15,29 @@ func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parqu
return parquet.NewSchema(topicName, rootNode), nil
}
-func toParquetFieldType(field *schema_pb.Field) (parquet.Node, error) {
- var (
- dataType parquet.Node
- err error
- )
- switch field.Type.Kind.(type) {
+func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
+ switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
- dataType, err = toParquetFieldTypeScalar(field.Type.GetScalarType())
+ dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
case *schema_pb.Type_RecordType:
- dataType, err = toParquetFieldTypeRecord(field.Type.GetRecordType())
+ dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
+ case *schema_pb.Type_ListType:
+ dataType, err = toParquetFieldTypeList(fieldType.GetListType())
default:
- return nil, fmt.Errorf("unknown field type: %T", field.Type.Kind)
+ return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
}
return dataType, err
}
+func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
+ elementType, err := toParquetFieldType(listType.ElementType)
+ if err != nil {
+ return nil, err
+ }
+ return parquet.List(elementType), nil
+}
+
func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, error) {
switch scalarType {
case schema_pb.ScalarType_BOOLEAN:
@@ -55,7 +61,7 @@ func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, er
func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
recordNode := parquet.Group{}
for _, field := range recordType.Fields {
- parquetFieldType, err := toParquetFieldType(field)
+ parquetFieldType, err := toParquetFieldType(field.Type)
if err != nil {
return nil, err
}