aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-04-25 23:59:30 -0700
committerchrislu <chris.lu@gmail.com>2024-04-25 23:59:30 -0700
commit72b50980f487f8ee3163f1fa4cd72fab0d8f484d (patch)
treed5270b314fb3e850cfa4e91437c719376136d415 /weed
parent8948fb51e363bf1011fb754cddd98038bce682cb (diff)
downloadseaweedfs-72b50980f487f8ee3163f1fa4cd72fab0d8f484d.tar.xz
seaweedfs-72b50980f487f8ee3163f1fa4cd72fab0d8f484d.zip
a little bit more efficient
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/schema/to_parquet_value.go6
-rw-r--r--weed/mq/schema/to_schema_value.go6
-rw-r--r--weed/mq/schema/write_parquet_test.go14
3 files changed, 14 insertions, 12 deletions
diff --git a/weed/mq/schema/to_parquet_value.go b/weed/mq/schema/to_parquet_value.go
index e80b96396..22a93de67 100644
--- a/weed/mq/schema/to_parquet_value.go
+++ b/weed/mq/schema/to_parquet_value.go
@@ -30,11 +30,7 @@ func rowBuilderVisit(rowBuilder *parquet.RowBuilder, fieldType *schema_pb.Type,
return
}
-func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, recordValue *schema_pb.RecordValue) error {
- parquetLevels, err := ToParquetLevels(recordType)
- if err != nil {
- return err
- }
+func AddRecordValue(rowBuilder *parquet.RowBuilder, recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, recordValue *schema_pb.RecordValue) error {
visitor := func(fieldType *schema_pb.Type, levels *ParquetLevels, fieldValue *schema_pb.Value) (err error) {
return rowBuilderVisit(rowBuilder, fieldType, levels, fieldValue)
}
diff --git a/weed/mq/schema/to_schema_value.go b/weed/mq/schema/to_schema_value.go
index ac57ca430..18c1c3b5c 100644
--- a/weed/mq/schema/to_schema_value.go
+++ b/weed/mq/schema/to_schema_value.go
@@ -9,11 +9,7 @@ import (
// ToRecordValue converts a parquet.Row to a schema_pb.RecordValue
// This does not work or did not test with nested structures.
// Using this may fail to convert the parquet.Row to schema_pb.RecordValue
-func ToRecordValue(recordType *schema_pb.RecordType, row parquet.Row) (*schema_pb.RecordValue, error) {
- parquetLevels, err := ToParquetLevels(recordType)
- if err != nil {
- return nil, err
- }
+func ToRecordValue(recordType *schema_pb.RecordType, parquetLevels *ParquetLevels, row parquet.Row) (*schema_pb.RecordValue, error) {
values := []parquet.Value(row)
recordValue, _, err := toRecordValue(recordType, parquetLevels, values, 0)
if err != nil {
diff --git a/weed/mq/schema/write_parquet_test.go b/weed/mq/schema/write_parquet_test.go
index 928dab1d3..ac2d6b154 100644
--- a/weed/mq/schema/write_parquet_test.go
+++ b/weed/mq/schema/write_parquet_test.go
@@ -52,6 +52,11 @@ func TestWriteReadParquet(t *testing.T) {
}
func testWritingParquetFile(t *testing.T, count int, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) {
+ parquetLevels, err := ToParquetLevels(recordType)
+ if err != nil {
+ t.Fatalf("ToParquetLevels failed: %v", err)
+ }
+
// create a parquet file
file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0664)
if err != nil {
@@ -75,7 +80,7 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch
fmt.Sprintf("john_%d@d.com", i),
fmt.Sprintf("john_%d@e.com", i))).
AddStringValue("Company", fmt.Sprintf("company_%d", i)).Build()
- AddRecordValue(rowBuilder, recordType, recordValue)
+ AddRecordValue(rowBuilder, recordType, parquetLevels, recordValue)
if count < 10 {
fmt.Printf("RecordValue: %v\n", recordValue)
@@ -101,6 +106,11 @@ func testWritingParquetFile(t *testing.T, count int, filename string, parquetSch
}
func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parquet.Schema, recordType *schema_pb.RecordType) (total int) {
+ parquetLevels, err := ToParquetLevels(recordType)
+ if err != nil {
+ t.Fatalf("ToParquetLevels failed: %v", err)
+ }
+
// read the parquet file
file, err := os.Open(filename)
if err != nil {
@@ -120,7 +130,7 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque
for i := 0; i < rowCount; i++ {
row := rows[i]
// convert parquet row to schema_pb.RecordValue
- recordValue, err := ToRecordValue(recordType, row)
+ recordValue, err := ToRecordValue(recordType, parquetLevels, row)
if err != nil {
t.Fatalf("ToRecordValue failed: %v", err)
}