aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/schema/to_schema_value.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/schema/to_schema_value.go')
-rw-r--r--weed/mq/schema/to_schema_value.go65
1 files changed, 63 insertions, 2 deletions
diff --git a/weed/mq/schema/to_schema_value.go b/weed/mq/schema/to_schema_value.go
index 947a84310..50e86d233 100644
--- a/weed/mq/schema/to_schema_value.go
+++ b/weed/mq/schema/to_schema_value.go
@@ -1,7 +1,9 @@
package schema
import (
+ "bytes"
"fmt"
+
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
@@ -77,9 +79,68 @@ func toScalarValue(scalarType schema_pb.ScalarType, levels *ParquetLevels, value
case schema_pb.ScalarType_DOUBLE:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: value.Double()}}, valueIndex + 1, nil
case schema_pb.ScalarType_BYTES:
- return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: value.ByteArray()}}, valueIndex + 1, nil
+ // Handle nil byte arrays from parquet to prevent growslice panic
+ byteData := value.ByteArray()
+ if byteData == nil {
+ byteData = []byte{} // Use empty slice instead of nil
+ }
+ return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: byteData}}, valueIndex + 1, nil
case schema_pb.ScalarType_STRING:
- return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(value.ByteArray())}}, valueIndex + 1, nil
+ // Handle nil byte arrays from parquet to prevent string conversion issues
+ byteData := value.ByteArray()
+ if byteData == nil {
+ byteData = []byte{} // Use empty slice instead of nil
+ }
+ return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(byteData)}}, valueIndex + 1, nil
+ // Parquet logical types - convert from their physical storage back to logical values
+ case schema_pb.ScalarType_TIMESTAMP:
+ // Stored as INT64, convert back to TimestampValue
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_TimestampValue{
+ TimestampValue: &schema_pb.TimestampValue{
+ TimestampMicros: value.Int64(),
+ IsUtc: true, // Default to UTC for compatibility
+ },
+ },
+ }, valueIndex + 1, nil
+ case schema_pb.ScalarType_DATE:
+ // Stored as INT32, convert back to DateValue
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_DateValue{
+ DateValue: &schema_pb.DateValue{
+ DaysSinceEpoch: value.Int32(),
+ },
+ },
+ }, valueIndex + 1, nil
+ case schema_pb.ScalarType_DECIMAL:
+ // Stored as FixedLenByteArray, convert back to DecimalValue
+ fixedBytes := value.ByteArray() // FixedLenByteArray also uses ByteArray() method
+ if fixedBytes == nil {
+ fixedBytes = []byte{} // Use empty slice instead of nil
+ }
+ // Remove leading zeros to get the minimal representation
+ trimmedBytes := bytes.TrimLeft(fixedBytes, "\x00")
+ if len(trimmedBytes) == 0 {
+ trimmedBytes = []byte{0} // Ensure we have at least one byte for zero
+ }
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_DecimalValue{
+ DecimalValue: &schema_pb.DecimalValue{
+ Value: trimmedBytes,
+ Precision: 38, // Maximum precision supported by schema
+ Scale: 18, // Maximum scale supported by schema
+ },
+ },
+ }, valueIndex + 1, nil
+ case schema_pb.ScalarType_TIME:
+ // Stored as INT64, convert back to TimeValue
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_TimeValue{
+ TimeValue: &schema_pb.TimeValue{
+ TimeMicros: value.Int64(),
+ },
+ },
+ }, valueIndex + 1, nil
}
return nil, valueIndex, fmt.Errorf("unsupported scalar type: %v", scalarType)
}