diff options
Diffstat (limited to 'weed/mq/schema')
| -rw-r--r-- | weed/mq/schema/write_parquet_test.go | 41 |
1 files changed, 31 insertions, 10 deletions
diff --git a/weed/mq/schema/write_parquet_test.go b/weed/mq/schema/write_parquet_test.go index f7ab26860..b7ecdcfc7 100644 --- a/weed/mq/schema/write_parquet_test.go +++ b/weed/mq/schema/write_parquet_test.go @@ -2,12 +2,13 @@ package schema import ( "fmt" - "github.com/parquet-go/parquet-go" - "github.com/parquet-go/parquet-go/compress/zstd" - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "io" "os" "testing" + + "github.com/parquet-go/parquet-go" + "github.com/parquet-go/parquet-go/compress/zstd" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) func TestWriteReadParquet(t *testing.T) { @@ -125,16 +126,25 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque t.Fatalf("os.Open failed: %v", err) } defer file.Close() - reader := parquet.NewReader(file, parquetSchema) + + // Get file info to determine size + fileInfo, err := file.Stat() + if err != nil { + t.Fatalf("file.Stat failed: %v", err) + } + + // Create a parquet file from the opened file + parquetFile, err := parquet.OpenFile(file, fileInfo.Size()) + if err != nil { + t.Fatalf("parquet.OpenFile failed: %v", err) + } + + reader := parquet.NewReader(parquetFile) rows := make([]parquet.Row, 128) for { rowCount, err := reader.ReadRows(rows) - if err != nil { - if err == io.EOF { - break - } - t.Fatalf("reader.Read failed: %v", err) - } + + // Process the rows first, even if EOF is returned for i := 0; i < rowCount; i++ { row := rows[i] // convert parquet row to schema_pb.RecordValue @@ -147,6 +157,17 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque } } total += rowCount + + // Check for end conditions after processing rows + if err != nil { + if err == io.EOF { + break + } + t.Fatalf("reader.Read failed: %v", err) + } + if rowCount == 0 { + break + } } fmt.Printf("total: %v\n", total) return |
