diff options
| author | chrislu <chris.lu@gmail.com> | 2024-04-25 09:14:37 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-04-25 09:14:37 -0700 |
| commit | 977e7988e6663e74f5e717545df8b75cd3d930a6 (patch) | |
| tree | 401258c84fbf875fd5582f87fb78d02ee0b84635 | |
| parent | 9cb9d27b5b9a60d42f381e314000d5ead53cde0f (diff) | |
| download | seaweedfs-977e7988e6663e74f5e717545df8b75cd3d930a6.tar.xz seaweedfs-977e7988e6663e74f5e717545df8b75cd3d930a6.zip | |
toRow conversion with levels info
| -rw-r--r-- | weed/mq/schema/to_parquet_value.go | 38 |
1 files changed, 18 insertions, 20 deletions
diff --git a/weed/mq/schema/to_parquet_value.go b/weed/mq/schema/to_parquet_value.go index f3907a657..e80b96396 100644 --- a/weed/mq/schema/to_parquet_value.go +++ b/weed/mq/schema/to_parquet_value.go @@ -6,24 +6,23 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) -func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int) (endIndex int, err error) { +func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) { switch fieldType.Kind.(type) { case *schema_pb.Type_ScalarType: - endIndex = columnIndex+1 var parquetValue parquet.Value parquetValue, err = toParquetValue(fieldValue) if err != nil { return } - rowBuilder.Add(columnIndex, parquetValue) + rowBuilder.Add(levels.startColumnIndex, parquetValue) // fmt.Printf("rowBuilder.Add %d %v\n", columnIndex, parquetValue) case *schema_pb.Type_ListType: - rowBuilder.Next(columnIndex) + rowBuilder.Next(levels.startColumnIndex) // fmt.Printf("rowBuilder.Next %d\n", columnIndex) elementType := fieldType.GetListType().ElementType for _, value := range fieldValue.GetListValue().Values { - if endIndex, err = rowBuilderVisit(rowBuilder, elementType, value, columnIndex); err != nil { + if err = rowBuilderVisit(rowBuilder, elementType, levels, value); err != nil { return } } @@ -32,44 +31,43 @@ func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type, } func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error { - visitor := func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error) { - return rowBuilderVisit(rowBuilder, fieldType, fieldValue, index) + parquetLevels, err := ToParquetLevels(recordType) + if err != nil { + return err + } + visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) { + return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue) } fieldType := &schema_pb.Type{Kind: &schema_pb.Type_RecordType{RecordType: recordType}} fieldValue := &schema_pb.Value{Kind: &schema_pb.Value_RecordValue{RecordValue: recordValue}} - return visitValue(fieldType, fieldValue, visitor) + return doVisitValue(fieldType, parquetLevels, fieldValue, visitor) } // typeValueVisitor is a function that is called for each value in a schema_pb.Value // Find the column index. // intended to be used in RowBuilder.Add(columnIndex, value) -type typeValueVisitor func(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, index int) (endIndex int, err error) - -func visitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) { - _, err = doVisitValue(fieldType, fieldValue, 0, visitor) - return -} +type typeValueVisitor func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) // endIndex is exclusive // same logic as RowBuilder.configure in row_builder.go -func doVisitValue(fieldType *schema_pb.Type, fieldValue *schema_pb.Value, columnIndex int, visitor typeValueVisitor) (endIndex int, err error) { +func doVisitValue(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value, visitor typeValueVisitor) (err error) { switch fieldType.Kind.(type) { case *schema_pb.Type_ScalarType: - return visitor(fieldType, fieldValue, columnIndex) + return visitor(fieldType, levels, fieldValue) case *schema_pb.Type_ListType: - return visitor(fieldType, fieldValue, columnIndex) + return visitor(fieldType, levels, fieldValue) case *schema_pb.Type_RecordType: for _, field := range fieldType.GetRecordType().Fields { fieldValue, found := fieldValue.GetRecordValue().Fields[field.Name] if !found { // TODO check this if no such field found - return columnIndex, nil + continue } - endIndex, err = doVisitValue(field.Type, fieldValue, columnIndex, visitor) + fieldLevels := levels.levels[field.Name] + err = doVisitValue(field.Type, fieldLevels, fieldValue, visitor) if err != nil { return } - columnIndex = endIndex } return } |
