aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/schema/write_parquet_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/schema/write_parquet_test.go')
-rw-r--r--weed/mq/schema/write_parquet_test.go41
1 files changed, 31 insertions, 10 deletions
diff --git a/weed/mq/schema/write_parquet_test.go b/weed/mq/schema/write_parquet_test.go
index f7ab26860..b7ecdcfc7 100644
--- a/weed/mq/schema/write_parquet_test.go
+++ b/weed/mq/schema/write_parquet_test.go
@@ -2,12 +2,13 @@ package schema
import (
"fmt"
- "github.com/parquet-go/parquet-go"
- "github.com/parquet-go/parquet-go/compress/zstd"
- "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"io"
"os"
"testing"
+
+ "github.com/parquet-go/parquet-go"
+ "github.com/parquet-go/parquet-go/compress/zstd"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
func TestWriteReadParquet(t *testing.T) {
@@ -125,16 +126,25 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque
t.Fatalf("os.Open failed: %v", err)
}
defer file.Close()
- reader := parquet.NewReader(file, parquetSchema)
+
+ // Get file info to determine size
+ fileInfo, err := file.Stat()
+ if err != nil {
+ t.Fatalf("file.Stat failed: %v", err)
+ }
+
+ // Create a parquet file from the opened file
+ parquetFile, err := parquet.OpenFile(file, fileInfo.Size())
+ if err != nil {
+ t.Fatalf("parquet.OpenFile failed: %v", err)
+ }
+
+ reader := parquet.NewReader(parquetFile)
rows := make([]parquet.Row, 128)
for {
rowCount, err := reader.ReadRows(rows)
- if err != nil {
- if err == io.EOF {
- break
- }
- t.Fatalf("reader.Read failed: %v", err)
- }
+
+ // Process the rows first, even if EOF is returned
for i := 0; i < rowCount; i++ {
row := rows[i]
// convert parquet row to schema_pb.RecordValue
@@ -147,6 +157,17 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque
}
}
total += rowCount
+
+ // Check for end conditions after processing rows
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ t.Fatalf("reader.Read failed: %v", err)
+ }
+ if rowCount == 0 {
+ break
+ }
}
fmt.Printf("total: %v\n", total)
return