about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: chrislu <chris.lu@gmail.com> 2024-04-25 09:14:37 -0700
committer: chrislu <chris.lu@gmail.com> 2024-04-25 09:14:37 -0700
commit: 977e7988e6663e74f5e717545df8b75cd3d930a6 (patch)
tree: 401258c84fbf875fd5582f87fb78d02ee0b84635
parent: 9cb9d27b5b9a60d42f381e314000d5ead53cde0f (diff)
download: seaweedfs-977e7988e6663e74f5e717545df8b75cd3d930a6.tar.xz
seaweedfs-977e7988e6663e74f5e717545df8b75cd3d930a6.zip
toRow conversion with levels info
-rw-r--r-- weed/mq/schema/to_parquet_value.go 38
1 files changed, 18 insertions, 20 deletions
diff --git a/weed/mq/schema/to_parquet_value.go b/weed/mq/schema/to_parquet_value.go
index f3907a657..e80b96396 100644
--- a/weed/mq/schema/to_parquet_value.go
+++ b/weed/mq/schema/to_parquet_value.go
@@ -6,24 +6,23 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
-func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int) (endIndex int, err error) {
+func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
- endIndex = columnIndex+1
var parquetValue parquet.Value
parquetValue, err = toParquetValue(fieldValue)
if err != nil {
return
}
- rowBuilder.Add(columnIndex, parquetValue)
+ rowBuilder.Add(levels.startColumnIndex, parquetValue)
// fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue)
case *schema_pb.Type_ListType:
- rowBuilder.Next(columnIndex)
+ rowBuilder.Next(levels.startColumnIndex)
// fmt.Printf("rowBuilder.Next %d\n", columnIndex)
elementType := fieldType.GetListType().ElementType
for _, value := range fieldValue.GetListValue().Values {
- if endIndex, err = rowBuilderVisit(rowBuilder, elementType, value, columnIndex); err != nil {
+ if err = rowBuilderVisit(rowBuilder, elementType, levels, value); err != nil {
return
}
}
@@ -32,44 +31,43 @@ func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type,
}
func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error {
- visitor := func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error) {
- return rowBuilderVisit(rowBuilder, fieldType, fieldValue, index)
+ parquetLevels, err := ToParquetLevels(recordType)
+ if err != nil {
+ return err
+ }
+ visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
+ return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue)
}
fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}}
fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}}
- return visitValue(fieldType, fieldValue, visitor)
+ return doVisitValue(fieldType, parquetLevels, fieldValue, visitor)
}
// typeValueVisitor is a function that is called for each value in a schema_pb.Value
// Find the column index.
// intended to be used in RowBuilder.Add(columnIndex, value)
-type typeValueVisitor func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error)
-
-func visitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
- _, err = doVisitValue(fieldType, fieldValue, 0, visitor)
- return
-}
+type typeValueVisitor func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error)
// endIndex is exclusive
// same logic as RowBuilder.configure in row_builder.go
-func doVisitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int, visitor typeValueVisitor) (endIndex int, err error) {
+func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
- return visitor(fieldType, fieldValue, columnIndex)
+ return visitor(fieldType, levels, fieldValue)
case *schema_pb.Type_ListType:
- return visitor(fieldType, fieldValue, columnIndex)
+ return visitor(fieldType, levels, fieldValue)
case *schema_pb.Type_RecordType:
for _, field := range fieldType.GetRecordType().Fields {
fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name]
if !found {
// TODO check this if no such field found
- return columnIndex, nil
+ continue
}
- endIndex, err = doVisitValue(field.Type, fieldValue, columnIndex, visitor)
+ fieldLevels := levels.levels[field.Name]
+ err = doVisitValue(field.Type, fieldLevels, fieldValue, visitor)
if err != nil {
return
}
- columnIndex = endIndex
}
return
}