diff options
| author | dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> | 2025-06-23 10:25:51 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-06-23 10:25:51 -0700 |
| commit | 5f1d2a974567c4ab3b7417c322e79a504b0ce19a (patch) | |
| tree | 0d06055f433ef215a201d124260e0f511972cae8 /weed | |
| parent | b27ba8e984ef680ff63d54d4d734cf6edcce3329 (diff) | |
| download | seaweedfs-5f1d2a974567c4ab3b7417c322e79a504b0ce19a.tar.xz seaweedfs-5f1d2a974567c4ab3b7417c322e79a504b0ce19a.zip | |
chore(deps): bump github.com/parquet-go/parquet-go from 0.24.0 to 0.25.1 (#6851)
* chore(deps): bump github.com/parquet-go/parquet-go from 0.24.0 to 0.25.1
Bumps [github.com/parquet-go/parquet-go](https://github.com/parquet-go/parquet-go) from 0.24.0 to 0.25.1.
- [Release notes](https://github.com/parquet-go/parquet-go/releases)
- [Changelog](https://github.com/parquet-go/parquet-go/blob/main/CHANGELOG.md)
- [Commits](https://github.com/parquet-go/parquet-go/compare/v0.24.0...v0.25.1)
---
updated-dependencies:
- dependency-name: github.com/parquet-go/parquet-go
  dependency-version: 0.25.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
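* adjust to updated API
  - Fixed Reader Construction: the test now opens the file with parquet.OpenFile() and passes the result to NewReader(), instead of passing the io.Reader directly to NewReader(); read_parquet_to_log.go now calls NewReader(readerAt) without an explicit schema argument
  - Fixed EOF Handling: Changed the order of operations in the test's read loop to process the rows returned by ReadRows() before checking for EOF, so rows returned together with io.EOF are still counted
  - Added Zero Row Count Check: Added an explicit check for rowCount == 0 as an additional termination condition for the read loops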
---------
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com>
Co-authored-by: chrislu <chris.lu@gmail.com>
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/mq/logstore/read_parquet_to_log.go | 18 | ||||
| -rw-r--r-- | weed/mq/schema/write_parquet_test.go | 41 |
2 files changed, 41 insertions, 18 deletions
diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go index 3438af61a..1c53129f4 100644 --- a/weed/mq/logstore/read_parquet_to_log.go +++ b/weed/mq/logstore/read_parquet_to_log.go @@ -4,6 +4,10 @@ import ( "context" "encoding/binary" "fmt" + "io" + "math" + "strings" + "github.com/parquet-go/parquet-go" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/mq/schema" @@ -13,9 +17,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "google.golang.org/protobuf/proto" - "io" - "math" - "strings" ) var ( @@ -42,10 +43,6 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). RecordTypeEnd() - parquetSchema, err := schema.ToParquetSchema(t.Name, recordType) - if err != nil { - return nil - } parquetLevels, err := schema.ToParquetLevels(recordType) if err != nil { return nil @@ -61,11 +58,12 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize)) // create parquet reader - parquetReader := parquet.NewReader(readerAt, parquetSchema) + parquetReader := parquet.NewReader(readerAt) rows := make([]parquet.Row, 128) for { rowCount, readErr := parquetReader.ReadRows(rows) + // Process the rows first, even if EOF is returned for i := 0; i < rowCount; i++ { row := rows[i] // convert parquet row to schema_pb.RecordValue @@ -99,12 +97,16 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic } } + // Check for end conditions after processing rows if readErr != nil { if readErr == io.EOF { return processedTsNs, nil } return processedTsNs, readErr } + if rowCount == 0 { + return processedTsNs, nil + } } return } diff --git a/weed/mq/schema/write_parquet_test.go b/weed/mq/schema/write_parquet_test.go index f7ab26860..b7ecdcfc7 
100644 --- a/weed/mq/schema/write_parquet_test.go +++ b/weed/mq/schema/write_parquet_test.go @@ -2,12 +2,13 @@ package schema import ( "fmt" - "github.com/parquet-go/parquet-go" - "github.com/parquet-go/parquet-go/compress/zstd" - "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "io" "os" "testing" + + "github.com/parquet-go/parquet-go" + "github.com/parquet-go/parquet-go/compress/zstd" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" ) func TestWriteReadParquet(t *testing.T) { @@ -125,16 +126,25 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque t.Fatalf("os.Open failed: %v", err) } defer file.Close() - reader := parquet.NewReader(file, parquetSchema) + + // Get file info to determine size + fileInfo, err := file.Stat() + if err != nil { + t.Fatalf("file.Stat failed: %v", err) + } + + // Create a parquet file from the opened file + parquetFile, err := parquet.OpenFile(file, fileInfo.Size()) + if err != nil { + t.Fatalf("parquet.OpenFile failed: %v", err) + } + + reader := parquet.NewReader(parquetFile) rows := make([]parquet.Row, 128) for { rowCount, err := reader.ReadRows(rows) - if err != nil { - if err == io.EOF { - break - } - t.Fatalf("reader.Read failed: %v", err) - } + + // Process the rows first, even if EOF is returned for i := 0; i < rowCount; i++ { row := rows[i] // convert parquet row to schema_pb.RecordValue @@ -147,6 +157,17 @@ func testReadingParquetFile(t *testing.T, filename string, parquetSchema *parque } } total += rowCount + + // Check for end conditions after processing rows + if err != nil { + if err == io.EOF { + break + } + t.Fatalf("reader.Read failed: %v", err) + } + if rowCount == 0 { + break + } } fmt.Printf("total: %v\n", total) return |
