aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/schema/to_parquet_schema.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/schema/to_parquet_schema.go')
-rw-r--r--weed/mq/schema/to_parquet_schema.go72
1 files changed, 57 insertions, 15 deletions
diff --git a/weed/mq/schema/to_parquet_schema.go b/weed/mq/schema/to_parquet_schema.go
index 036acc153..71bbf81ed 100644
--- a/weed/mq/schema/to_parquet_schema.go
+++ b/weed/mq/schema/to_parquet_schema.go
@@ -2,6 +2,7 @@ package schema
import (
"fmt"
+
parquet "github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
@@ -18,20 +19,8 @@ func ToParquetSchema(topicName string, recordType *schema_pb.RecordType) (*parqu
}
func toParquetFieldType(fieldType *schema_pb.Type) (dataType parquet.Node, err error) {
- switch fieldType.Kind.(type) {
- case *schema_pb.Type_ScalarType:
- dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
- dataType = parquet.Optional(dataType)
- case *schema_pb.Type_RecordType:
- dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
- dataType = parquet.Optional(dataType)
- case *schema_pb.Type_ListType:
- dataType, err = toParquetFieldTypeList(fieldType.GetListType())
- default:
- return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
- }
-
- return dataType, err
+ // Legacy entry point kept for existing callers: passes isRequired=false, so scalar and record types are still wrapped in Optional.
+ return toParquetFieldTypeWithRequirement(fieldType, false)
}
func toParquetFieldTypeList(listType *schema_pb.ListType) (parquet.Node, error) {
@@ -58,6 +47,22 @@ func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, er
return parquet.Leaf(parquet.ByteArrayType), nil
case schema_pb.ScalarType_STRING:
return parquet.Leaf(parquet.ByteArrayType), nil
+ // Parquet logical types - map to their physical storage types
+ case schema_pb.ScalarType_TIMESTAMP:
+ // Stored as INT64 (microseconds since Unix epoch)
+ return parquet.Leaf(parquet.Int64Type), nil
+ case schema_pb.ScalarType_DATE:
+ // Stored as INT32 (days since Unix epoch)
+ return parquet.Leaf(parquet.Int32Type), nil
+ case schema_pb.ScalarType_DECIMAL:
+ // Fixed, schema-wide decimal type: precision=38, scale=18 (parquet.Decimal takes scale first).
+ // Per Parquet spec: precision ≤9→INT32, ≤18→INT64, >18→FixedLenByteArray,
+ // so precision 38 is backed by a 16-byte FixedLenByteArray.
+ // Individual values may use fewer digits, but the column is declared with this precision and scale.
+ return parquet.Decimal(18, 38, parquet.FixedLenByteArrayType(16)), nil
+ case schema_pb.ScalarType_TIME:
+ // Stored as INT64 (microseconds since midnight)
+ return parquet.Leaf(parquet.Int64Type), nil
default:
return nil, fmt.Errorf("unknown scalar type: %v", scalarType)
}
@@ -65,7 +70,7 @@ func toParquetFieldTypeScalar(scalarType schema_pb.ScalarType) (parquet.Node, er
func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, error) {
recordNode := parquet.Group{}
for _, field := range recordType.Fields {
- parquetFieldType, err := toParquetFieldType(field.Type)
+ parquetFieldType, err := toParquetFieldTypeWithRequirement(field.Type, field.IsRequired)
if err != nil {
return nil, err
}
@@ -73,3 +78,40 @@ func toParquetFieldTypeRecord(recordType *schema_pb.RecordType) (parquet.Node, e
}
return recordNode, nil
}
+
+// toParquetFieldTypeWithRequirement converts fieldType into a parquet node.
+//
+// Scalar and record nodes are returned bare when isRequired is true and are
+// wrapped in parquet.Optional otherwise. List nodes are returned exactly as
+// toParquetFieldTypeList builds them, so isRequired has no effect on lists.
+//
+// It returns an error when fieldType is nil, when its Kind is not one of the
+// handled variants, or when the underlying scalar/record/list conversion fails.
+func toParquetFieldTypeWithRequirement(fieldType *schema_pb.Type, isRequired bool) (dataType parquet.Node, err error) {
+	// Guard the fieldType.Kind dereference below: a record field declared
+	// without a type would otherwise cause a nil-pointer panic.
+	if fieldType == nil {
+		return nil, fmt.Errorf("field type is nil")
+	}
+
+	switch fieldType.Kind.(type) {
+	case *schema_pb.Type_ScalarType:
+		dataType, err = toParquetFieldTypeScalar(fieldType.GetScalarType())
+	case *schema_pb.Type_RecordType:
+		dataType, err = toParquetFieldTypeRecord(fieldType.GetRecordType())
+	case *schema_pb.Type_ListType:
+		// Lists are never wrapped in Optional here, whatever isRequired says.
+		dataType, err = toParquetFieldTypeList(fieldType.GetListType())
+		if err != nil {
+			return nil, err
+		}
+		return dataType, nil
+	default:
+		return nil, fmt.Errorf("unknown field type: %T", fieldType.Kind)
+	}
+
+	// Shared tail for scalar and record types.
+	if err != nil {
+		return nil, err
+	}
+	if isRequired {
+		// Required fields are NOT wrapped in Optional.
+		return dataType, nil
+	}
+	return parquet.Optional(dataType), nil
+}