diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-09-10 11:04:42 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-09-10 11:04:42 -0700 |
| commit | 58e0c1b3301f5c5c0723f71726b6f1053e9152f9 (patch) | |
| tree | 2ee40273cb74587227052f908a0eb38c14c2cb56 /weed/query/engine/hybrid_message_scanner.go | |
| parent | 8ed1b104cea39856a51b67d31c6782031c89f642 (diff) | |
| download | seaweedfs-58e0c1b3301f5c5c0723f71726b6f1053e9152f9.tar.xz seaweedfs-58e0c1b3301f5c5c0723f71726b6f1053e9152f9.zip | |
Fix sql bugs (#7219)
* fix nil when explaining
* add plain details when running full scan
* skip files by timestamp
* skip file by timestamp
* refactor
* handle filter by time
* skip broker memory only if it has unflushed messages
* refactoring
* refactor
* address comments
* address comments
* filter by parquet stats
* simplify
* refactor
* prune old code
* optimize
* Update aggregations.go
* ensure non-time predicates are properly detected
* add stmt to populatePlanFileDetails
This helper function is a great way to centralize logic for populating file details. However, it's missing an optimization that is present in executeSelectStatementWithBrokerStats: pruning Parquet files based on column statistics from the WHERE clause.
Aggregation queries that fall back to the slow path could benefit from this optimization. Consider modifying the function signature to accept the *SelectStatement and adding the column statistics pruning logic here, similar to how it's done in executeSelectStatementWithBrokerStats.
* refactoring to work with *schema_pb.Value directly after the initial conversion
Diffstat (limited to 'weed/query/engine/hybrid_message_scanner.go')
| -rw-r--r-- | weed/query/engine/hybrid_message_scanner.go | 50 |
1 file changed, 50 insertions, 0 deletions
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 2584b54a6..eee57bc23 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -3,6 +3,7 @@ package engine import ( "container/heap" "context" + "encoding/binary" "encoding/json" "fmt" "io" @@ -145,6 +146,46 @@ type ParquetFileStats struct { FileName string RowCount int64 ColumnStats map[string]*ParquetColumnStats + // Optional file-level timestamp range from filer extended attributes + MinTimestampNs int64 + MaxTimestampNs int64 +} + +// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns +func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) { + if fileStats == nil { + return 0, 0, false + } + // Prefer column stats for _ts_ns if present + if len(fileStats.ColumnStats) > 0 { + if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil { + if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin { + if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax { + return minNs, maxNs, true + } + } + } + } + // Fallback to file-level range if present in filer extended metadata + if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 { + return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true + } + return 0, 0, false +} + +// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns +func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) { + if v == nil { + return 0, false + } + switch k := v.Kind.(type) { + case *schema_pb.Value_Int64Value: + return k.Int64Value, true + case *schema_pb.Value_Int32Value: + return int64(k.Int32Value), true + default: + return 0, false + } } // StreamingDataSource provides a streaming interface for reading scan results @@ -1080,6 +1121,15 @@ func (h 
*HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo RowCount: fileView.NumRows(), ColumnStats: make(map[string]*ParquetColumnStats), } + // Populate optional min/max from filer extended attributes (writer stores ns timestamps) + if entry != nil && entry.Extended != nil { + if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 { + fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes)) + } + if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 { + fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes)) + } + } // Get schema information schema := fileView.Schema() |
