diff options
Diffstat (limited to 'weed/mq/schema/to_schema_value.go')
| -rw-r--r-- | weed/mq/schema/to_schema_value.go | 65 |
1 files changed, 63 insertions, 2 deletions
diff --git a/weed/mq/schema/to_schema_value.go b/weed/mq/schema/to_schema_value.go index 947a84310..50e86d233 100644 --- a/weed/mq/schema/to_schema_value.go +++ b/weed/mq/schema/to_schema_value.go @@ -1,7 +1,9 @@ package schema import ( + "bytes" "fmt" + "github.com/parquet-go/parquet-go" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) @@ -77,9 +79,68 @@ func toScalarValue(scalarType schema_pb.ScalarType, levels *ParquetLevels, value case schema_pb.ScalarType_DOUBLE: return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex + 1, nil case schema_pb.ScalarType_BYTES: - return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex + 1, nil + // Handle nil byte arrays from parquet to prevent growslice panic + byteData := value.ByteArray() + if byteData == nil { + byteData = []byte{} // Use empty slice instead of nil + } + return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: byteData}}, valueIndex + 1, nil case schema_pb.ScalarType_STRING: - return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(value.ByteArray())}}, valueIndex + 1, nil + // Handle nil byte arrays from parquet to prevent string conversion issues + byteData := value.ByteArray() + if byteData == nil { + byteData = []byte{} // Use empty slice instead of nil + } + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(byteData)}}, valueIndex + 1, nil + // Parquet logical types - convert from their physical storage back to logical values + case schema_pb.ScalarType_TIMESTAMP: + // Stored as INT64, convert back to TimestampValue + return &schema_pb.Value{ + Kind: &schema_pb.Value_TimestampValue{ + TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: value.Int64(), + IsUtc: true, // Default to UTC for compatibility + }, + }, + }, valueIndex + 1, nil + case schema_pb.ScalarType_DATE: + // Stored as INT32, convert back to DateValue + return &schema_pb.Value{ + Kind: &schema_pb.Value_DateValue{ + DateValue: &schema_pb.DateValue{ + DaysSinceEpoch: value.Int32(), + }, + }, + }, valueIndex + 1, nil + case schema_pb.ScalarType_DECIMAL: + // Stored as FixedLenByteArray, convert back to DecimalValue + fixedBytes := value.ByteArray() // FixedLenByteArray also uses ByteArray() method + if fixedBytes == nil { + fixedBytes = []byte{} // Use empty slice instead of nil + } + // Remove leading zeros to get the minimal representation + trimmedBytes := bytes.TrimLeft(fixedBytes, "\x00") + if len(trimmedBytes) == 0 { + trimmedBytes = []byte{0} // Ensure we have at least one byte for zero + } + return &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: trimmedBytes, + Precision: 38, // Maximum precision supported by schema + Scale: 18, // Maximum scale supported by schema + }, + }, + }, valueIndex + 1, nil + case schema_pb.ScalarType_TIME: + // Stored as INT64, convert back to TimeValue + return &schema_pb.Value{ + Kind: &schema_pb.Value_TimeValue{ + TimeValue: &schema_pb.TimeValue{ + TimeMicros: value.Int64(), + }, + }, + }, valueIndex + 1, nil } return nil, valueIndex, fmt.Errorf("unsupported scalar type: %v", scalarType) } |
