diff options
Diffstat (limited to 'weed/query/engine/hybrid_message_scanner.go')
| -rw-r--r-- | weed/query/engine/hybrid_message_scanner.go | 50 |
1 files changed, 50 insertions, 0 deletions
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index 2584b54a6..eee57bc23 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -3,6 +3,7 @@ package engine import ( "container/heap" "context" + "encoding/binary" "encoding/json" "fmt" "io" @@ -145,6 +146,46 @@ type ParquetFileStats struct { FileName string RowCount int64 ColumnStats map[string]*ParquetColumnStats + // Optional file-level timestamp range from filer extended attributes + MinTimestampNs int64 + MaxTimestampNs int64 +} + +// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns +func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) { + if fileStats == nil { + return 0, 0, false + } + // Prefer column stats for _ts_ns if present + if len(fileStats.ColumnStats) > 0 { + if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil { + if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin { + if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax { + return minNs, maxNs, true + } + } + } + } + // Fallback to file-level range if present in filer extended metadata + if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 { + return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true + } + return 0, 0, false +} + +// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns +func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) { + if v == nil { + return 0, false + } + switch k := v.Kind.(type) { + case *schema_pb.Value_Int64Value: + return k.Int64Value, true + case *schema_pb.Value_Int32Value: + return int64(k.Int32Value), true + default: + return 0, false + } } // StreamingDataSource provides a streaming interface for reading scan results @@ -1080,6 +1121,15 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo RowCount: fileView.NumRows(), ColumnStats: make(map[string]*ParquetColumnStats), } + // Populate optional min/max from filer extended attributes (writer stores ns timestamps) + if entry != nil && entry.Extended != nil { + if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 { + fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes)) + } + if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 { + fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes)) + } + } // Get schema information schema := fileView.Schema() |
