path: root/weed/query/engine/hybrid_message_scanner.go
author     Chris Lu <chrislusf@users.noreply.github.com>  2025-09-10 11:04:42 -0700
committer  GitHub <noreply@github.com>  2025-09-10 11:04:42 -0700
commit     58e0c1b3301f5c5c0723f71726b6f1053e9152f9 (patch)
tree       2ee40273cb74587227052f908a0eb38c14c2cb56  /weed/query/engine/hybrid_message_scanner.go
parent     8ed1b104cea39856a51b67d31c6782031c89f642 (diff)
Fix sql bugs (#7219)
* fix nil when explaining
* add plain details when running full scan
* skip files by timestamp
* skip file by timestamp
* refactor
* handle filter by time
* skip broker memory only if it has unflushed messages
* refactoring
* refactor
* address comments
* address comments
* filter by parquet stats
* simplify
* refactor
* prune old code
* optimize
* Update aggregations.go
* ensure non-time predicates are properly detected
* add stmt to populatePlanFileDetails

  This helper function is a great way to centralize logic for populating file details. However, it's missing an optimization that is present in executeSelectStatementWithBrokerStats: pruning Parquet files based on column statistics from the WHERE clause. Aggregation queries that fall back to the slow path could benefit from this optimization. Consider modifying the function signature to accept the *SelectStatement and adding the column statistics pruning logic here, similar to how it's done in executeSelectStatementWithBrokerStats.

* refactoring to work with *schema_pb.Value directly after the initial conversion
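The "skip files by timestamp" and Parquet-stats pruning items above reduce to a range check: if a file's recorded min/max timestamps fall entirely outside the query's time window, the file can be skipped during planning. The following is a minimal, self-contained sketch of that check; the type and function names (fileTimeRange, canSkipFile) are illustrative and not part of this commit.

package main

import "fmt"

// fileTimeRange mirrors the idea of per-file MinTimestampNs/MaxTimestampNs
// bounds; the names here are hypothetical, for illustration only.
type fileTimeRange struct {
	MinNs int64
	MaxNs int64
}

// canSkipFile reports whether a file's timestamp range lies entirely outside
// the query window [startNs, stopNs], so the file can be pruned without reading it.
func canSkipFile(r fileTimeRange, startNs, stopNs int64) bool {
	if r.MinNs == 0 && r.MaxNs == 0 {
		return false // no stats recorded; the file must still be scanned
	}
	return r.MaxNs < startNs || r.MinNs > stopNs
}

func main() {
	f := fileTimeRange{MinNs: 1_000, MaxNs: 2_000}
	fmt.Println(canSkipFile(f, 3_000, 4_000)) // true: file ends before the query window
	fmt.Println(canSkipFile(f, 1_500, 4_000)) // false: ranges overlap, cannot skip
}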
Diffstat (limited to 'weed/query/engine/hybrid_message_scanner.go')
-rw-r--r--  weed/query/engine/hybrid_message_scanner.go | 50
1 file changed, 50 insertions(+), 0 deletions(-)
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
index 2584b54a6..eee57bc23 100644
--- a/weed/query/engine/hybrid_message_scanner.go
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -3,6 +3,7 @@ package engine
import (
"container/heap"
"context"
+ "encoding/binary"
"encoding/json"
"fmt"
"io"
@@ -145,6 +146,46 @@ type ParquetFileStats struct {
FileName string
RowCount int64
ColumnStats map[string]*ParquetColumnStats
+ // Optional file-level timestamp range from filer extended attributes
+ MinTimestampNs int64
+ MaxTimestampNs int64
+}
+
+// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns
+func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) {
+ if fileStats == nil {
+ return 0, 0, false
+ }
+ // Prefer column stats for _ts_ns if present
+ if len(fileStats.ColumnStats) > 0 {
+ if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil {
+ if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin {
+ if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax {
+ return minNs, maxNs, true
+ }
+ }
+ }
+ }
+ // Fallback to file-level range if present in filer extended metadata
+ if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 {
+ return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true
+ }
+ return 0, 0, false
+}
+
+// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns
+func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) {
+ if v == nil {
+ return 0, false
+ }
+ switch k := v.Kind.(type) {
+ case *schema_pb.Value_Int64Value:
+ return k.Int64Value, true
+ case *schema_pb.Value_Int32Value:
+ return int64(k.Int32Value), true
+ default:
+ return 0, false
+ }
}
// StreamingDataSource provides a streaming interface for reading scan results
@@ -1080,6 +1121,15 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo
RowCount: fileView.NumRows(),
ColumnStats: make(map[string]*ParquetColumnStats),
}
+ // Populate optional min/max from filer extended attributes (writer stores ns timestamps)
+ if entry != nil && entry.Extended != nil {
+ if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
+ fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
+ }
+ if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
+ fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
+ }
+ }
// Get schema information
schema := fileView.Schema()
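For reference, a minimal, self-contained sketch of the byte layout the reader side in extractParquetFileStats assumes for the filer "min"/"max" extended attributes: 8 bytes, big-endian, holding a nanosecond timestamp. The writer-side encoding shown here is an assumption for illustration, not taken from this diff.

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

func main() {
	// Writer side (assumed): store UnixNano as an 8-byte big-endian value.
	ts := time.Date(2025, 9, 10, 11, 4, 42, 0, time.UTC).UnixNano()
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(ts))

	// Reader side, as in the diff above: 8 bytes -> int64 nanoseconds.
	decoded := int64(binary.BigEndian.Uint64(buf[:]))
	fmt.Println(decoded == ts) // true
}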