diff options
Diffstat (limited to 'weed/mq/schema/to_parquet_value_test.go')
| -rw-r--r-- | weed/mq/schema/to_parquet_value_test.go | 666 |
1 files changed, 666 insertions, 0 deletions
diff --git a/weed/mq/schema/to_parquet_value_test.go b/weed/mq/schema/to_parquet_value_test.go new file mode 100644 index 000000000..71bd94ba5 --- /dev/null +++ b/weed/mq/schema/to_parquet_value_test.go @@ -0,0 +1,666 @@ +package schema + +import ( + "math/big" + "testing" + "time" + + "github.com/parquet-go/parquet-go" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +func TestToParquetValue_BasicTypes(t *testing.T) { + tests := []struct { + name string + value *schema_pb.Value + expected parquet.Value + wantErr bool + }{ + { + name: "BoolValue true", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_BoolValue{BoolValue: true}, + }, + expected: parquet.BooleanValue(true), + }, + { + name: "Int32Value", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_Int32Value{Int32Value: 42}, + }, + expected: parquet.Int32Value(42), + }, + { + name: "Int64Value", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: 12345678901234}, + }, + expected: parquet.Int64Value(12345678901234), + }, + { + name: "FloatValue", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_FloatValue{FloatValue: 3.14159}, + }, + expected: parquet.FloatValue(3.14159), + }, + { + name: "DoubleValue", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DoubleValue{DoubleValue: 2.718281828}, + }, + expected: parquet.DoubleValue(2.718281828), + }, + { + name: "BytesValue", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: []byte("hello world")}, + }, + expected: parquet.ByteArrayValue([]byte("hello world")), + }, + { + name: "BytesValue empty", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: []byte{}}, + }, + expected: parquet.ByteArrayValue([]byte{}), + }, + { + name: "StringValue", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{StringValue: "test string"}, + }, + expected: parquet.ByteArrayValue([]byte("test string")), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := toParquetValue(tt.value) + if (err != nil) != tt.wantErr { + t.Errorf("toParquetValue() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !parquetValuesEqual(result, tt.expected) { + t.Errorf("toParquetValue() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestToParquetValue_TimestampValue(t *testing.T) { + tests := []struct { + name string + value *schema_pb.Value + expected parquet.Value + wantErr bool + }{ + { + name: "Valid TimestampValue UTC", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_TimestampValue{ + TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: 1704067200000000, // 2024-01-01 00:00:00 UTC in microseconds + IsUtc: true, + }, + }, + }, + expected: parquet.Int64Value(1704067200000000), + }, + { + name: "Valid TimestampValue local", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_TimestampValue{ + TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: 1704067200000000, + IsUtc: false, + }, + }, + }, + expected: parquet.Int64Value(1704067200000000), + }, + { + name: "TimestampValue zero", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_TimestampValue{ + TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: 0, + IsUtc: true, + }, + }, + }, + expected: parquet.Int64Value(0), + }, + { + name: "TimestampValue negative (before epoch)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_TimestampValue{ + TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: -1000000, // 1 second before epoch + IsUtc: true, + }, + }, + }, + expected: parquet.Int64Value(-1000000), + }, + { + name: "TimestampValue nil pointer", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_TimestampValue{ + TimestampValue: nil, + }, + }, + expected: parquet.NullValue(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := toParquetValue(tt.value) + if (err != nil) != tt.wantErr { + t.Errorf("toParquetValue() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !parquetValuesEqual(result, tt.expected) { + t.Errorf("toParquetValue() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestToParquetValue_DateValue(t *testing.T) { + tests := []struct { + name string + value *schema_pb.Value + expected parquet.Value + wantErr bool + }{ + { + name: "Valid DateValue (2024-01-01)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DateValue{ + DateValue: &schema_pb.DateValue{ + DaysSinceEpoch: 19723, // 2024-01-01 = 19723 days since epoch + }, + }, + }, + expected: parquet.Int32Value(19723), + }, + { + name: "DateValue epoch (1970-01-01)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DateValue{ + DateValue: &schema_pb.DateValue{ + DaysSinceEpoch: 0, + }, + }, + }, + expected: parquet.Int32Value(0), + }, + { + name: "DateValue before epoch", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DateValue{ + DateValue: &schema_pb.DateValue{ + DaysSinceEpoch: -365, // 1969-01-01 + }, + }, + }, + expected: parquet.Int32Value(-365), + }, + { + name: "DateValue nil pointer", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DateValue{ + DateValue: nil, + }, + }, + expected: parquet.NullValue(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := toParquetValue(tt.value) + if (err != nil) != tt.wantErr { + t.Errorf("toParquetValue() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !parquetValuesEqual(result, tt.expected) { + t.Errorf("toParquetValue() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestToParquetValue_DecimalValue(t *testing.T) { + tests := []struct { + name string + value *schema_pb.Value + expected parquet.Value + wantErr bool + }{ + { + name: "Small Decimal (precision <= 9) - positive", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: encodeBigIntToBytes(big.NewInt(12345)), // 123.45 with scale 2 + Precision: 5, + Scale: 2, + }, + }, + }, + expected: createFixedLenByteArray(encodeBigIntToBytes(big.NewInt(12345))), // FixedLenByteArray conversion + }, + { + name: "Small Decimal (precision <= 9) - negative", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: encodeBigIntToBytes(big.NewInt(-12345)), + Precision: 5, + Scale: 2, + }, + }, + }, + expected: createFixedLenByteArray(encodeBigIntToBytes(big.NewInt(-12345))), // FixedLenByteArray conversion + }, + { + name: "Medium Decimal (9 < precision <= 18)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: encodeBigIntToBytes(big.NewInt(123456789012345)), + Precision: 15, + Scale: 2, + }, + }, + }, + expected: createFixedLenByteArray(encodeBigIntToBytes(big.NewInt(123456789012345))), // FixedLenByteArray conversion + }, + { + name: "Large Decimal (precision > 18)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: []byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF}, // Large number as bytes + Precision: 25, + Scale: 5, + }, + }, + }, + expected: createFixedLenByteArray([]byte{0x01, 0x23, 0x45, 0x67, 0x89, 0xAB, 0xCD, 0xEF}), // FixedLenByteArray conversion + }, + { + name: "Decimal with zero precision", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: encodeBigIntToBytes(big.NewInt(0)), + Precision: 0, + Scale: 0, + }, + }, + }, + expected: createFixedLenByteArray(encodeBigIntToBytes(big.NewInt(0))), // Zero as FixedLenByteArray + }, + { + name: "Decimal nil pointer", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: nil, + }, + }, + expected: parquet.NullValue(), + }, + { + name: "Decimal with nil Value bytes", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: nil, // This was the original panic cause + Precision: 5, + Scale: 2, + }, + }, + }, + expected: parquet.NullValue(), + }, + { + name: "Decimal with empty Value bytes", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: []byte{}, // Empty slice + Precision: 5, + Scale: 2, + }, + }, + }, + expected: parquet.NullValue(), // Returns null for empty bytes + }, + { + name: "Decimal out of int32 range (stored as binary)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: encodeBigIntToBytes(big.NewInt(999999999999)), // Too large for int32 + Precision: 5, // But precision says int32 + Scale: 0, + }, + }, + }, + expected: createFixedLenByteArray(encodeBigIntToBytes(big.NewInt(999999999999))), // FixedLenByteArray + }, + { + name: "Decimal out of int64 range (stored as binary)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: func() []byte { + // Create a number larger than int64 max + bigNum := new(big.Int) + bigNum.SetString("99999999999999999999999999999", 10) + return encodeBigIntToBytes(bigNum) + }(), + Precision: 15, // Says int64 but value is too large + Scale: 0, + }, + }, + }, + expected: createFixedLenByteArray(func() []byte { + bigNum := new(big.Int) + bigNum.SetString("99999999999999999999999999999", 10) + return encodeBigIntToBytes(bigNum) + }()), // Large number as FixedLenByteArray (truncated to 16 bytes) + }, + { + name: "Decimal extremely large value (should be rejected)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: make([]byte, 100), // 100 bytes > 64 byte limit + Precision: 100, + Scale: 0, + }, + }, + }, + expected: parquet.NullValue(), + wantErr: true, // Should return error instead of corrupting data + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := toParquetValue(tt.value) + if (err != nil) != tt.wantErr { + t.Errorf("toParquetValue() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !parquetValuesEqual(result, tt.expected) { + t.Errorf("toParquetValue() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestToParquetValue_TimeValue(t *testing.T) { + tests := []struct { + name string + value *schema_pb.Value + expected parquet.Value + wantErr bool + }{ + { + name: "Valid TimeValue (12:34:56.789)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_TimeValue{ + TimeValue: &schema_pb.TimeValue{ + TimeMicros: 45296789000, // 12:34:56.789 in microseconds since midnight + }, + }, + }, + expected: parquet.Int64Value(45296789000), + }, + { + name: "TimeValue midnight", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_TimeValue{ + TimeValue: &schema_pb.TimeValue{ + TimeMicros: 0, + }, + }, + }, + expected: parquet.Int64Value(0), + }, + { + name: "TimeValue end of day (23:59:59.999999)", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_TimeValue{ + TimeValue: &schema_pb.TimeValue{ + TimeMicros: 86399999999, // 23:59:59.999999 + }, + }, + }, + expected: parquet.Int64Value(86399999999), + }, + { + name: "TimeValue nil pointer", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_TimeValue{ + TimeValue: nil, + }, + }, + expected: parquet.NullValue(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := toParquetValue(tt.value) + if (err != nil) != tt.wantErr { + t.Errorf("toParquetValue() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !parquetValuesEqual(result, tt.expected) { + t.Errorf("toParquetValue() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestToParquetValue_EdgeCases(t *testing.T) { + tests := []struct { + name string + value *schema_pb.Value + expected parquet.Value + wantErr bool + }{ + { + name: "Nil value", + value: &schema_pb.Value{ + Kind: nil, + }, + wantErr: true, + }, + { + name: "Completely nil value", + value: nil, + wantErr: true, + }, + { + name: "BytesValue with nil slice", + value: &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: nil}, + }, + expected: parquet.ByteArrayValue([]byte{}), // Should convert nil to empty slice + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := toParquetValue(tt.value) + if (err != nil) != tt.wantErr { + t.Errorf("toParquetValue() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && !parquetValuesEqual(result, tt.expected) { + t.Errorf("toParquetValue() = %v, want %v", result, tt.expected) + } + }) + } +} + +// Helper function to encode a big.Int to bytes using two's complement representation +func encodeBigIntToBytes(n *big.Int) []byte { + if n.Sign() == 0 { + return []byte{0} + } + + // For positive numbers, just use Bytes() + if n.Sign() > 0 { + return n.Bytes() + } + + // For negative numbers, we need two's complement representation + bitLen := n.BitLen() + if bitLen%8 != 0 { + bitLen += 8 - (bitLen % 8) // Round up to byte boundary + } + byteLen := bitLen / 8 + if byteLen == 0 { + byteLen = 1 + } + + // Calculate 2^(byteLen*8) + modulus := new(big.Int).Lsh(big.NewInt(1), uint(byteLen*8)) + + // Convert negative to positive representation: n + 2^(byteLen*8) + positive := new(big.Int).Add(n, modulus) + + bytes := positive.Bytes() + + // Pad with leading zeros if needed + if len(bytes) < byteLen { + padded := make([]byte, byteLen) + copy(padded[byteLen-len(bytes):], bytes) + return padded + } + + return bytes +} + +// Helper function to create a FixedLenByteArray(16) matching our conversion logic +func createFixedLenByteArray(inputBytes []byte) parquet.Value { + fixedBytes := make([]byte, 16) + if len(inputBytes) <= 16 { + // Right-align the value (big-endian) - same as our conversion logic + copy(fixedBytes[16-len(inputBytes):], inputBytes) + } else { + // Truncate if too large, taking the least significant bytes + copy(fixedBytes, inputBytes[len(inputBytes)-16:]) + } + return parquet.FixedLenByteArrayValue(fixedBytes) +} + +// Helper function to compare parquet values +func parquetValuesEqual(a, b parquet.Value) bool { + // Handle both being null + if a.IsNull() && b.IsNull() { + return true + } + if a.IsNull() != b.IsNull() { + return false + } + + // Compare kind first + if a.Kind() != b.Kind() { + return false + } + + // Compare based on type + switch a.Kind() { + case parquet.Boolean: + return a.Boolean() == b.Boolean() + case parquet.Int32: + return a.Int32() == b.Int32() + case parquet.Int64: + return a.Int64() == b.Int64() + case parquet.Float: + return a.Float() == b.Float() + case parquet.Double: + return a.Double() == b.Double() + case parquet.ByteArray: + aBytes := a.ByteArray() + bBytes := b.ByteArray() + if len(aBytes) != len(bBytes) { + return false + } + for i, v := range aBytes { + if v != bBytes[i] { + return false + } + } + return true + case parquet.FixedLenByteArray: + aBytes := a.ByteArray() // FixedLenByteArray also uses ByteArray() method + bBytes := b.ByteArray() + if len(aBytes) != len(bBytes) { + return false + } + for i, v := range aBytes { + if v != bBytes[i] { + return false + } + } + return true + default: + return false + } +} + +// Benchmark tests +func BenchmarkToParquetValue_BasicTypes(b *testing.B) { + value := &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: 12345678901234}, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = toParquetValue(value) + } +} + +func BenchmarkToParquetValue_TimestampValue(b *testing.B) { + value := &schema_pb.Value{ + Kind: &schema_pb.Value_TimestampValue{ + TimestampValue: &schema_pb.TimestampValue{ + TimestampMicros: time.Now().UnixMicro(), + IsUtc: true, + }, + }, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = toParquetValue(value) + } +} + +func BenchmarkToParquetValue_DecimalValue(b *testing.B) { + value := &schema_pb.Value{ + Kind: &schema_pb.Value_DecimalValue{ + DecimalValue: &schema_pb.DecimalValue{ + Value: encodeBigIntToBytes(big.NewInt(123456789012345)), + Precision: 15, + Scale: 2, + }, + }, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = toParquetValue(value) + } +} |
