aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/hybrid_message_scanner.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/query/engine/hybrid_message_scanner.go')
-rw-r--r--weed/query/engine/hybrid_message_scanner.go50
1 files changed, 50 insertions, 0 deletions
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
index 2584b54a6..eee57bc23 100644
--- a/weed/query/engine/hybrid_message_scanner.go
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -3,6 +3,7 @@ package engine
import (
"container/heap"
"context"
+ "encoding/binary"
"encoding/json"
"fmt"
"io"
@@ -145,6 +146,46 @@ type ParquetFileStats struct {
FileName string
RowCount int64
ColumnStats map[string]*ParquetColumnStats
+ // Optional file-level timestamp range from filer extended attributes
+ MinTimestampNs int64
+ MaxTimestampNs int64
+}
+
+// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns
+func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) {
+ if fileStats == nil {
+ return 0, 0, false
+ }
+ // Prefer column stats for _ts_ns if present
+ if len(fileStats.ColumnStats) > 0 {
+ if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil {
+ if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin {
+ if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax {
+ return minNs, maxNs, true
+ }
+ }
+ }
+ }
+ // Fallback to file-level range if present in filer extended metadata
+ if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 {
+ return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true
+ }
+ return 0, 0, false
+}
+
+// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns
+func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) {
+ if v == nil {
+ return 0, false
+ }
+ switch k := v.Kind.(type) {
+ case *schema_pb.Value_Int64Value:
+ return k.Int64Value, true
+ case *schema_pb.Value_Int32Value:
+ return int64(k.Int32Value), true
+ default:
+ return 0, false
+ }
}
// StreamingDataSource provides a streaming interface for reading scan results
@@ -1080,6 +1121,15 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo
RowCount: fileView.NumRows(),
ColumnStats: make(map[string]*ParquetColumnStats),
}
+ // Populate optional min/max from filer extended attributes (writer stores ns timestamps)
+ if entry != nil && entry.Extended != nil {
+ if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
+ fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
+ }
+ if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
+ fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
+ }
+ }
// Get schema information
schema := fileView.Schema()