Diffstat (limited to 'weed/query')
-rw-r--r--  weed/query/engine/aggregations.go                         166
-rw-r--r--  weed/query/engine/engine.go                               656
-rw-r--r--  weed/query/engine/fast_path_predicate_validation_test.go  272
-rw-r--r--  weed/query/engine/hybrid_message_scanner.go                50
-rw-r--r--  weed/query/engine/types.go                                  6
5 files changed, 799 insertions, 351 deletions
diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go
index 623e489dd..6b58517e1 100644
--- a/weed/query/engine/aggregations.go
+++ b/weed/query/engine/aggregations.go
@@ -8,10 +8,8 @@ import (
"strings"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
- "github.com/seaweedfs/seaweedfs/weed/util"
)
// AggregationSpec defines an aggregation function to be computed
@@ -78,6 +76,12 @@ func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec)
// CollectDataSources gathers information about available data sources for a topic
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
+ return opt.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, 0, 0)
+}
+
+// CollectDataSourcesWithTimeFilter gathers information about available data sources for a topic
+// with optional time filtering to skip irrelevant parquet files
+func (opt *FastPathOptimizer) CollectDataSourcesWithTimeFilter(ctx context.Context, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) (*TopicDataSources, error) {
dataSources := &TopicDataSources{
ParquetFiles: make(map[string][]*ParquetFileStats),
ParquetRowCount: 0,
@@ -125,14 +129,16 @@ func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScan
fmt.Printf(" No parquet files found in partition\n")
}
} else {
- dataSources.ParquetFiles[partitionPath] = parquetStats
+ // Prune by time range using parquet column statistics
+ filtered := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+ dataSources.ParquetFiles[partitionPath] = filtered
partitionParquetRows := int64(0)
- for _, stat := range parquetStats {
+ for _, stat := range filtered {
partitionParquetRows += stat.RowCount
dataSources.ParquetRowCount += stat.RowCount
}
if isDebugMode(ctx) {
- fmt.Printf(" Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows)
+ fmt.Printf(" Found %d parquet files with %d total rows\n", len(filtered), partitionParquetRows)
}
}
@@ -452,20 +458,27 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
}
}
- // Extract time filters for optimization
+ // Extract time filters and validate that the WHERE clause contains only time-based predicates
startTimeNs, stopTimeNs := int64(0), int64(0)
+ onlyTimePredicates := true
if stmt.Where != nil {
- startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
+ startTimeNs, stopTimeNs, onlyTimePredicates = e.extractTimeFiltersWithValidation(stmt.Where.Expr)
}
- // FAST PATH RE-ENABLED WITH DEBUG LOGGING:
- // Added comprehensive debug logging to identify data counting issues
- // This will help us understand why fast path was returning 0 when slow path returns 1803
- if stmt.Where == nil {
+ // FAST PATH WITH TIME-BASED OPTIMIZATION:
+ // Allow fast path only for queries without a WHERE clause or with a time-only WHERE clause
+ // This prevents incorrect results when non-time predicates are present
+ canAttemptFastPath := stmt.Where == nil || onlyTimePredicates
+
+ if canAttemptFastPath {
if isDebugMode(ctx) {
- fmt.Printf("\nFast path optimization attempt...\n")
+ if stmt.Where == nil {
+ fmt.Printf("\nFast path optimization attempt (no WHERE clause)...\n")
+ } else {
+ fmt.Printf("\nFast path optimization attempt (time-only WHERE clause)...\n")
+ }
}
- fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan)
+ fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan, startTimeNs, stopTimeNs, stmt)
if canOptimize {
if isDebugMode(ctx) {
fmt.Printf("Fast path optimization succeeded!\n")
@@ -478,7 +491,7 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
}
} else {
if isDebugMode(ctx) {
- fmt.Printf("Fast path not applicable due to WHERE clause\n")
+ fmt.Printf("Fast path not applicable due to complex WHERE clause\n")
}
}
@@ -605,23 +618,66 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
// Build execution tree for aggregation queries if plan is provided
if plan != nil {
+ // Populate detailed plan information for full scan (similar to fast path)
+ e.populateFullScanPlanDetails(ctx, plan, hybridScanner, stmt)
plan.RootNode = e.buildExecutionTree(plan, stmt)
}
return result, nil
}
+// populateFullScanPlanDetails populates detailed plan information for full scan queries
+// This provides consistency with fast path execution plan details
+func (e *SQLEngine) populateFullScanPlanDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, stmt *SelectStatement) {
+ // plan.Details is initialized at the start of the SELECT execution
+
+ // Extract table information
+ var database, tableName string
+ if len(stmt.From) == 1 {
+ if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
+ if tableExpr, ok := table.Expr.(TableName); ok {
+ tableName = tableExpr.Name.String()
+ if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
+ database = tableExpr.Qualifier.String()
+ }
+ }
+ }
+ }
+
+ // Use current database if not specified
+ if database == "" {
+ database = e.catalog.currentDatabase
+ if database == "" {
+ database = "default"
+ }
+ }
+
+ // Discover partitions and populate file details
+ if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
+ // Add partition paths to execution plan details
+ plan.Details["partition_paths"] = partitions
+
+ // Populate detailed file information using shared helper
+ e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
+ } else {
+ // Record discovery error to plan for better diagnostics
+ plan.Details["error_partition_discovery"] = discoverErr.Error()
+ }
+}
+
// tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
// - Use parquet metadata for parquet files
// - Count live log files for live data
// - Combine both for accurate results per partition
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
- return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil)
+ return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil, 0, 0, nil)
}
// tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates execution plan if provided
-func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) {
+// startTimeNs, stopTimeNs: optional time range filters for parquet file optimization (0 means no filtering)
+// stmt: SELECT statement for column statistics pruning optimization (can be nil)
+func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan, startTimeNs, stopTimeNs int64, stmt *SelectStatement) (*QueryResult, bool) {
// Use the new modular components
optimizer := NewFastPathOptimizer(e)
computer := NewAggregationComputer(e)
@@ -632,8 +688,8 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
return nil, false
}
- // Step 2: Collect data sources
- dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
+ // Step 2: Collect data sources with time filtering for parquet file optimization
+ dataSources, err := optimizer.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, startTimeNs, stopTimeNs)
if err != nil {
return nil, false
}
@@ -725,9 +781,6 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
}
// Merge details while preserving existing ones
- if plan.Details == nil {
- plan.Details = make(map[string]interface{})
- }
for key, value := range aggPlan.Details {
plan.Details[key] = value
}
@@ -735,51 +788,17 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
// Add file path information from the data collection
plan.Details["partition_paths"] = partitions
- // Collect actual file information for each partition
- var parquetFiles []string
- var liveLogFiles []string
- parquetSources := make(map[string]bool)
-
- for _, partitionPath := range partitions {
- // Get parquet files for this partition
- if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
- for _, stats := range parquetStats {
- parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
- }
- }
-
- // Merge accurate parquet sources from metadata (preferred over filename fallback)
- if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
- for src := range sources {
- parquetSources[src] = true
- }
- }
+ // Populate detailed file information using shared helper, including time filters for pruning
+ plan.Details[PlanDetailStartTimeNs] = startTimeNs
+ plan.Details[PlanDetailStopTimeNs] = stopTimeNs
+ e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
- // Get live log files for this partition
- if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
- for _, fileName := range liveFiles {
- // Exclude live log files that have been converted to parquet (deduplicated)
- if parquetSources[fileName] {
- continue
- }
- liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
- }
- }
- }
-
- if len(parquetFiles) > 0 {
- plan.Details["parquet_files"] = parquetFiles
- }
- if len(liveLogFiles) > 0 {
- plan.Details["live_log_files"] = liveLogFiles
+ // Update counts to match discovered live log files
+ if liveLogFiles, ok := plan.Details["live_log_files"].([]string); ok {
+ dataSources.LiveLogFilesCount = len(liveLogFiles)
+ plan.LiveLogFilesScanned = len(liveLogFiles)
}
- // Update the dataSources.LiveLogFilesCount to match the actual files found
- dataSources.LiveLogFilesCount = len(liveLogFiles)
-
- // Also update the plan's LiveLogFilesScanned to match
- plan.LiveLogFilesScanned = len(liveLogFiles)
-
// Ensure PartitionsScanned is set so Statistics section appears
if plan.PartitionsScanned == 0 && len(partitions) > 0 {
plan.PartitionsScanned = len(partitions)
@@ -912,24 +931,3 @@ func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, quer
fmt.Printf("==========================================\n")
}
}
-
-// collectLiveLogFileNames collects the names of live log files in a partition
-func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
- var fileNames []string
-
- err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- // Skip directories and parquet files
- if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") {
- return nil
- }
-
- // Only include files with actual content
- if len(entry.Chunks) > 0 {
- fileNames = append(fileNames, entry.Name)
- }
-
- return nil
- })
-
- return fileNames, err
-}
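
The pruning added in CollectDataSourcesWithTimeFilter and pruneParquetFilesByTime reduces to an interval-overlap test in which a zero bound means "unbounded" on that side. A standalone sketch of just that test (fileSpan and overlapsQuery are illustrative names, not part of the patch):

package main

import (
	"fmt"
	"math"
)

// fileSpan stands in for the min/max timestamps reported by a parquet
// file's statistics; the real code gets these via getTimestampRangeFromStats.
type fileSpan struct {
	name         string
	minNs, maxNs int64
}

// overlapsQuery mirrors the keep/skip condition in pruneParquetFilesByTime:
// a zero start or stop bound is open-ended, and a file is skipped only when
// its [minNs, maxNs] range cannot intersect the query window.
func overlapsQuery(f fileSpan, startNs, stopNs int64) bool {
	qStop := stopNs
	if qStop == 0 {
		qStop = math.MaxInt64
	}
	if qStop < f.minNs {
		return false // query window ends before the file begins
	}
	if startNs != 0 && startNs > f.maxNs {
		return false // query window starts after the file ends
	}
	return true
}

func main() {
	files := []fileSpan{
		{"2023-11-14.parquet", 1_700_000_000_000_000_000, 1_700_000_999_000_000_000},
		{"2024-06-01.parquet", 1_717_200_000_000_000_000, 1_717_200_999_000_000_000},
	}
	startNs := int64(1_710_000_000_000_000_000) // only the second file overlaps
	for _, f := range files {
		fmt.Printf("%s kept=%v\n", f.name, overlapsQuery(f, startNs, 0))
	}
}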
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index 84c238583..ffed03f35 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -495,69 +495,6 @@ func ParseSQL(sql string) (Statement, error) {
}
}
-// extractFunctionArguments extracts the arguments from a function call expression using CockroachDB parser
-func extractFunctionArguments(expr string) ([]SelectExpr, error) {
- // Find the parentheses
- startParen := strings.Index(expr, "(")
- endParen := strings.LastIndex(expr, ")")
-
- if startParen == -1 || endParen == -1 || endParen <= startParen {
- return nil, fmt.Errorf("invalid function syntax")
- }
-
- // Extract arguments string
- argsStr := strings.TrimSpace(expr[startParen+1 : endParen])
-
- // Handle empty arguments
- if argsStr == "" {
- return []SelectExpr{}, nil
- }
-
- // Handle single * argument (for COUNT(*))
- if argsStr == "*" {
- return []SelectExpr{&StarExpr{}}, nil
- }
-
- // Parse multiple arguments separated by commas
- args := []SelectExpr{}
- argParts := strings.Split(argsStr, ",")
-
- // Use CockroachDB parser to parse each argument as a SELECT expression
- cockroachParser := NewCockroachSQLParser()
-
- for _, argPart := range argParts {
- argPart = strings.TrimSpace(argPart)
- if argPart == "*" {
- args = append(args, &StarExpr{})
- } else {
- // Create a dummy SELECT statement to parse the argument expression
- dummySelect := fmt.Sprintf("SELECT %s", argPart)
-
- // Parse using CockroachDB parser
- stmt, err := cockroachParser.ParseSQL(dummySelect)
- if err != nil {
- // If CockroachDB parser fails, fall back to simple column name
- args = append(args, &AliasedExpr{
- Expr: &ColName{Name: stringValue(argPart)},
- })
- continue
- }
-
- // Extract the expression from the parsed SELECT statement
- if selectStmt, ok := stmt.(*SelectStatement); ok && len(selectStmt.SelectExprs) > 0 {
- args = append(args, selectStmt.SelectExprs[0])
- } else {
- // Fallback to column name if parsing fails
- args = append(args, &AliasedExpr{
- Expr: &ColName{Name: stringValue(argPart)},
- })
- }
- }
- }
-
- return args, nil
-}
-
// debugModeKey is used to store debug mode flag in context
type debugModeKey struct{}
@@ -1221,8 +1158,8 @@ func (e *SQLEngine) buildExecutionTree(plan *QueryExecutionPlan, stmt *SelectSta
}
}
- // Create broker buffer node if queried
- if plan.BrokerBufferQueried {
+ // Create broker buffer node only if queried AND has unflushed messages
+ if plan.BrokerBufferQueried && plan.BrokerBufferMessages > 0 {
brokerBufferNodes = append(brokerBufferNodes, &FileSourceNode{
FilePath: "broker_memory_buffer",
SourceType: "broker_buffer",
@@ -1489,6 +1426,8 @@ func (e *SQLEngine) formatOptimization(opt string) string {
return "Duplicate Data Avoidance"
case "predicate_pushdown":
return "WHERE Clause Pushdown"
+ case "column_statistics_pruning":
+ return "Column Statistics File Pruning"
case "column_projection":
return "Column Selection"
case "limit_pushdown":
@@ -1540,6 +1479,10 @@ func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *DDLStatement)
// executeSelectStatementWithPlan handles SELECT queries with execution plan tracking
func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
+ // Initialize plan details once
+ if plan != nil && plan.Details == nil {
+ plan.Details = make(map[string]interface{})
+ }
// Parse aggregations to populate plan
var aggregations []AggregationSpec
hasAggregations := false
@@ -1577,7 +1520,7 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *Se
if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
if tableExpr, ok := table.Expr.(TableName); ok {
tableName = tableExpr.Name.String()
- if tableExpr.Qualifier.String() != "" {
+ if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
database = tableExpr.Qualifier.String()
}
}
@@ -2290,18 +2233,51 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
// Add partition paths to execution plan details
plan.Details["partition_paths"] = partitions
+ // Persist time filter details for downstream pruning/diagnostics
+ plan.Details[PlanDetailStartTimeNs] = startTimeNs
+ plan.Details[PlanDetailStopTimeNs] = stopTimeNs
+
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs)
+ }
// Collect actual file information for each partition
var parquetFiles []string
var liveLogFiles []string
parquetSources := make(map[string]bool)
+ var parquetReadErrors []string
+ var liveLogListErrors []string
for _, partitionPath := range partitions {
// Get parquet files for this partition
if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
- for _, stats := range parquetStats {
+ // Prune files by time range with debug logging
+ filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+
+ // Further prune by column statistics from WHERE clause
+ if stmt.Where != nil {
+ beforeColumnPrune := len(filteredStats)
+ filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr)
+ columnPrunedCount := beforeColumnPrune - len(filteredStats)
+
+ if columnPrunedCount > 0 {
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
+ }
+ // Track column statistics optimization
+ if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
+ plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
+ }
+ }
+ }
+ for _, stats := range filteredStats {
parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
}
+ } else {
+ parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
+ }
}
// Merge accurate parquet sources from metadata
@@ -2320,6 +2296,11 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
}
liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
}
+ } else {
+ liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
+ }
}
}
@@ -2329,11 +2310,20 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
if len(liveLogFiles) > 0 {
plan.Details["live_log_files"] = liveLogFiles
}
+ if len(parquetReadErrors) > 0 {
+ plan.Details["error_parquet_statistics"] = parquetReadErrors
+ }
+ if len(liveLogListErrors) > 0 {
+ plan.Details["error_live_log_listing"] = liveLogListErrors
+ }
// Update scan statistics for execution plan display
plan.PartitionsScanned = len(partitions)
plan.ParquetFilesScanned = len(parquetFiles)
plan.LiveLogFilesScanned = len(liveLogFiles)
+ } else {
+ // Handle partition discovery error
+ plan.Details["error_partition_discovery"] = discoverErr.Error()
}
} else {
// Normal mode - just get results
@@ -2377,6 +2367,23 @@ func (e *SQLEngine) extractTimeFilters(expr ExprNode) (int64, int64) {
return startTimeNs, stopTimeNs
}
+// extractTimeFiltersWithValidation extracts time filters and validates that the WHERE clause contains only time-based predicates
+// Returns (startTimeNs, stopTimeNs, onlyTimePredicates), where onlyTimePredicates indicates whether the fast path is safe
+func (e *SQLEngine) extractTimeFiltersWithValidation(expr ExprNode) (int64, int64, bool) {
+ startTimeNs, stopTimeNs := int64(0), int64(0)
+ onlyTimePredicates := true
+
+ // Recursively extract time filters and validate predicates
+ e.extractTimeFiltersWithValidationRecursive(expr, &startTimeNs, &stopTimeNs, &onlyTimePredicates)
+
+ // Special case: if startTimeNs == stopTimeNs, treat it like an equality query
+ if startTimeNs != 0 && startTimeNs == stopTimeNs {
+ stopTimeNs = 0
+ }
+
+ return startTimeNs, stopTimeNs, onlyTimePredicates
+}
+
// extractTimeFiltersRecursive recursively processes WHERE expressions to find time comparisons
func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64) {
switch exprType := expr.(type) {
@@ -2396,6 +2403,39 @@ func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stop
}
}
+// extractTimeFiltersWithValidationRecursive recursively processes WHERE expressions to find time comparisons and validate predicates
+func (e *SQLEngine) extractTimeFiltersWithValidationRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64, onlyTimePredicates *bool) {
+ switch exprType := expr.(type) {
+ case *ComparisonExpr:
+ // Check if this is a time-based comparison
+ leftCol := e.getColumnName(exprType.Left)
+ rightCol := e.getColumnName(exprType.Right)
+
+ isTimeComparison := e.isTimestampColumn(leftCol) || e.isTimestampColumn(rightCol)
+ if isTimeComparison {
+ // Extract time filter from this comparison
+ e.extractTimeFromComparison(exprType, startTimeNs, stopTimeNs)
+ } else {
+ // Non-time predicate found - fast path is not safe
+ *onlyTimePredicates = false
+ }
+ case *AndExpr:
+ // For AND expressions, both sides must be time-only for fast path to be safe
+ e.extractTimeFiltersWithValidationRecursive(exprType.Left, startTimeNs, stopTimeNs, onlyTimePredicates)
+ e.extractTimeFiltersWithValidationRecursive(exprType.Right, startTimeNs, stopTimeNs, onlyTimePredicates)
+ case *OrExpr:
+ // OR expressions are complex and not supported in fast path
+ *onlyTimePredicates = false
+ return
+ case *ParenExpr:
+ // Unwrap parentheses and continue
+ e.extractTimeFiltersWithValidationRecursive(exprType.Expr, startTimeNs, stopTimeNs, onlyTimePredicates)
+ default:
+ // Unknown expression type - not safe for fast path
+ *onlyTimePredicates = false
+ }
+}
+
// extractTimeFromComparison extracts time bounds from comparison expressions
// Handles comparisons against timestamp columns (system columns and schema-defined timestamp types)
func (e *SQLEngine) extractTimeFromComparison(comp *ComparisonExpr, startTimeNs, stopTimeNs *int64) {
@@ -2465,7 +2505,7 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool {
}
// System timestamp columns are always time columns
- if columnName == SW_COLUMN_NAME_TIMESTAMP {
+ if columnName == SW_COLUMN_NAME_TIMESTAMP || columnName == SW_DISPLAY_NAME_TIMESTAMP {
return true
}
@@ -2495,6 +2535,280 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool {
return false
}
+// getTimeFiltersFromPlan extracts time filter values from execution plan details
+func getTimeFiltersFromPlan(plan *QueryExecutionPlan) (startTimeNs, stopTimeNs int64) {
+ if plan == nil || plan.Details == nil {
+ return 0, 0
+ }
+ if startNsVal, ok := plan.Details[PlanDetailStartTimeNs]; ok {
+ if startNs, ok2 := startNsVal.(int64); ok2 {
+ startTimeNs = startNs
+ }
+ }
+ if stopNsVal, ok := plan.Details[PlanDetailStopTimeNs]; ok {
+ if stopNs, ok2 := stopNsVal.(int64); ok2 {
+ stopTimeNs = stopNs
+ }
+ }
+ return
+}
+
+// pruneParquetFilesByTime filters parquet files based on timestamp ranges, with optional debug logging
+func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileStats, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) []*ParquetFileStats {
+ if startTimeNs == 0 && stopTimeNs == 0 {
+ return parquetStats
+ }
+
+ debugEnabled := ctx != nil && isDebugMode(ctx)
+ qStart := startTimeNs
+ qStop := stopTimeNs
+ if qStop == 0 {
+ qStop = math.MaxInt64
+ }
+
+ n := 0
+ for _, fs := range parquetStats {
+ if debugEnabled {
+ fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName)
+ }
+ if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok {
+ if debugEnabled {
+ fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop)
+ }
+ if qStop < minNs || (qStart != 0 && qStart > maxNs) {
+ if debugEnabled {
+ fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName)
+ }
+ continue
+ }
+ } else if debugEnabled {
+ fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName)
+ }
+ parquetStats[n] = fs
+ n++
+ }
+ return parquetStats[:n]
+}
+
+// pruneParquetFilesByColumnStats filters parquet files based on column statistics and WHERE predicates
+func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetStats []*ParquetFileStats, whereExpr ExprNode) []*ParquetFileStats {
+ if whereExpr == nil {
+ return parquetStats
+ }
+
+ debugEnabled := ctx != nil && isDebugMode(ctx)
+ n := 0
+ for _, fs := range parquetStats {
+ if e.canSkipParquetFile(ctx, fs, whereExpr) {
+ if debugEnabled {
+ fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName)
+ }
+ continue
+ }
+ parquetStats[n] = fs
+ n++
+ }
+ return parquetStats[:n]
+}
+
+// canSkipParquetFile determines if a parquet file can be skipped based on column statistics
+func (e *SQLEngine) canSkipParquetFile(ctx context.Context, fileStats *ParquetFileStats, whereExpr ExprNode) bool {
+ switch expr := whereExpr.(type) {
+ case *ComparisonExpr:
+ return e.canSkipFileByComparison(ctx, fileStats, expr)
+ case *AndExpr:
+ // For AND: skip if ANY condition allows skipping (more aggressive pruning)
+ return e.canSkipParquetFile(ctx, fileStats, expr.Left) || e.canSkipParquetFile(ctx, fileStats, expr.Right)
+ case *OrExpr:
+ // For OR: skip only if ALL conditions allow skipping (conservative)
+ return e.canSkipParquetFile(ctx, fileStats, expr.Left) && e.canSkipParquetFile(ctx, fileStats, expr.Right)
+ default:
+ // Unknown expression type - don't skip
+ return false
+ }
+}
+
+// canSkipFileByComparison checks if a file can be skipped based on a comparison predicate
+func (e *SQLEngine) canSkipFileByComparison(ctx context.Context, fileStats *ParquetFileStats, expr *ComparisonExpr) bool {
+ // Extract column name and comparison value
+ var columnName string
+ var compareSchemaValue *schema_pb.Value
+ var operator string = expr.Operator
+
+ // Determine which side is the column and which is the value
+ if colRef, ok := expr.Left.(*ColName); ok {
+ columnName = colRef.Name.String()
+ if sqlVal, ok := expr.Right.(*SQLVal); ok {
+ compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal)
+ } else {
+ return false // Can't optimize complex expressions
+ }
+ } else if colRef, ok := expr.Right.(*ColName); ok {
+ columnName = colRef.Name.String()
+ if sqlVal, ok := expr.Left.(*SQLVal); ok {
+ compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal)
+ // Flip operator for reversed comparison
+ operator = e.flipOperator(operator)
+ } else {
+ return false
+ }
+ } else {
+ return false // No column reference found
+ }
+
+ // Validate comparison value
+ if compareSchemaValue == nil {
+ return false
+ }
+
+ // Get column statistics
+ colStats, exists := fileStats.ColumnStats[columnName]
+ if !exists || colStats == nil {
+ // Try case-insensitive lookup
+ for colName, stats := range fileStats.ColumnStats {
+ if strings.EqualFold(colName, columnName) {
+ colStats = stats
+ exists = true
+ break
+ }
+ }
+ }
+
+ if !exists || colStats == nil || colStats.MinValue == nil || colStats.MaxValue == nil {
+ return false // No statistics available
+ }
+
+ // Apply pruning logic based on operator
+ switch operator {
+ case ">":
+ // Skip if max(column) <= compareValue
+ return e.compareValues(colStats.MaxValue, compareSchemaValue) <= 0
+ case ">=":
+ // Skip if max(column) < compareValue
+ return e.compareValues(colStats.MaxValue, compareSchemaValue) < 0
+ case "<":
+ // Skip if min(column) >= compareValue
+ return e.compareValues(colStats.MinValue, compareSchemaValue) >= 0
+ case "<=":
+ // Skip if min(column) > compareValue
+ return e.compareValues(colStats.MinValue, compareSchemaValue) > 0
+ case "=":
+ // Skip if compareValue is outside [min, max] range
+ return e.compareValues(compareSchemaValue, colStats.MinValue) < 0 ||
+ e.compareValues(compareSchemaValue, colStats.MaxValue) > 0
+ case "!=", "<>":
+ // Skip if min == max == compareValue (all values are the same and equal to compareValue)
+ return e.compareValues(colStats.MinValue, colStats.MaxValue) == 0 &&
+ e.compareValues(colStats.MinValue, compareSchemaValue) == 0
+ default:
+ return false // Unknown operator
+ }
+}
+
+// flipOperator flips comparison operators when operands are swapped
+func (e *SQLEngine) flipOperator(op string) string {
+ switch op {
+ case ">":
+ return "<"
+ case ">=":
+ return "<="
+ case "<":
+ return ">"
+ case "<=":
+ return ">="
+ case "=", "!=", "<>":
+ return op // These are symmetric
+ default:
+ return op
+ }
+}
+
+// populatePlanFileDetails populates execution plan with detailed file information for partitions
+// Includes column statistics pruning optimization when WHERE clause is provided
+func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) {
+ debugEnabled := ctx != nil && isDebugMode(ctx)
+ // Collect actual file information for each partition
+ var parquetFiles []string
+ var liveLogFiles []string
+ parquetSources := make(map[string]bool)
+ var parquetReadErrors []string
+ var liveLogListErrors []string
+
+ // Extract time filters from plan details
+ startTimeNs, stopTimeNs := getTimeFiltersFromPlan(plan)
+
+ for _, partitionPath := range partitions {
+ // Get parquet files for this partition
+ if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
+ // Prune files by time range
+ filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+
+ // Further prune by column statistics from WHERE clause
+ if stmt != nil && stmt.Where != nil {
+ beforeColumnPrune := len(filteredStats)
+ filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr)
+ columnPrunedCount := beforeColumnPrune - len(filteredStats)
+
+ if columnPrunedCount > 0 {
+ if debugEnabled {
+ fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
+ }
+ // Track column statistics optimization
+ if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
+ plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
+ }
+ }
+ }
+
+ for _, stats := range filteredStats {
+ parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
+ }
+ } else {
+ parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+ if debugEnabled {
+ fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
+ }
+ }
+
+ // Merge accurate parquet sources from metadata
+ if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
+ for src := range sources {
+ parquetSources[src] = true
+ }
+ }
+
+ // Get live log files for this partition
+ if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
+ for _, fileName := range liveFiles {
+ // Exclude live log files that have been converted to parquet (deduplicated)
+ if parquetSources[fileName] {
+ continue
+ }
+ liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
+ }
+ } else {
+ liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+ if debugEnabled {
+ fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
+ }
+ }
+ }
+
+ // Add file lists to plan details
+ if len(parquetFiles) > 0 {
+ plan.Details["parquet_files"] = parquetFiles
+ }
+ if len(liveLogFiles) > 0 {
+ plan.Details["live_log_files"] = liveLogFiles
+ }
+ if len(parquetReadErrors) > 0 {
+ plan.Details["error_parquet_statistics"] = parquetReadErrors
+ }
+ if len(liveLogListErrors) > 0 {
+ plan.Details["error_live_log_listing"] = liveLogListErrors
+ }
+}
+
// isSQLTypeTimestamp checks if a SQL type string represents a timestamp type
func (e *SQLEngine) isSQLTypeTimestamp(sqlType string) bool {
upperType := strings.ToUpper(strings.TrimSpace(sqlType))
@@ -2664,56 +2978,6 @@ func (e *SQLEngine) buildPredicateWithContext(expr ExprNode, selectExprs []Selec
}
}
-// buildComparisonPredicateWithAliases creates a predicate for comparison operations with alias support
-func (e *SQLEngine) buildComparisonPredicateWithAliases(expr *ComparisonExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
- var columnName string
- var compareValue interface{}
- var operator string
-
- // Extract the comparison details, resolving aliases if needed
- leftCol := e.getColumnNameWithAliases(expr.Left, aliases)
- rightCol := e.getColumnNameWithAliases(expr.Right, aliases)
- operator = e.normalizeOperator(expr.Operator)
-
- if leftCol != "" && rightCol == "" {
- // Left side is column, right side is value
- columnName = e.getSystemColumnInternalName(leftCol)
- val, err := e.extractValueFromExpr(expr.Right)
- if err != nil {
- return nil, err
- }
- compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right)
- } else if rightCol != "" && leftCol == "" {
- // Right side is column, left side is value
- columnName = e.getSystemColumnInternalName(rightCol)
- val, err := e.extractValueFromExpr(expr.Left)
- if err != nil {
- return nil, err
- }
- compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left)
- // Reverse the operator when column is on the right
- operator = e.reverseOperator(operator)
- } else if leftCol != "" && rightCol != "" {
- return nil, fmt.Errorf("column-to-column comparisons not yet supported")
- } else {
- return nil, fmt.Errorf("at least one side of comparison must be a column")
- }
-
- return func(record *schema_pb.RecordValue) bool {
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return false
- }
- return e.evaluateComparison(fieldValue, operator, compareValue)
- }, nil
-}
-
-// buildComparisonPredicate creates a predicate for comparison operations (=, <, >, etc.)
-// Handles column names on both left and right sides of the comparison
-func (e *SQLEngine) buildComparisonPredicate(expr *ComparisonExpr) (func(*schema_pb.RecordValue) bool, error) {
- return e.buildComparisonPredicateWithContext(expr, nil)
-}
-
// buildComparisonPredicateWithContext creates a predicate for comparison operations with alias support
func (e *SQLEngine) buildComparisonPredicateWithContext(expr *ComparisonExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
var columnName string
@@ -2836,54 +3100,6 @@ func (e *SQLEngine) buildBetweenPredicateWithContext(expr *BetweenExpr, selectEx
}, nil
}
-// buildBetweenPredicateWithAliases creates a predicate for BETWEEN operations with alias support
-func (e *SQLEngine) buildBetweenPredicateWithAliases(expr *BetweenExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
- var columnName string
- var fromValue, toValue interface{}
-
- // Extract column name from left side with alias resolution
- leftCol := e.getColumnNameWithAliases(expr.Left, aliases)
- if leftCol == "" {
- return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left)
- }
- columnName = e.getSystemColumnInternalName(leftCol)
-
- // Extract FROM value
- fromVal, err := e.extractValueFromExpr(expr.From)
- if err != nil {
- return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err)
- }
- fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From)
-
- // Extract TO value
- toVal, err := e.extractValueFromExpr(expr.To)
- if err != nil {
- return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err)
- }
- toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To)
-
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return false
- }
-
- // Evaluate: fieldValue >= fromValue AND fieldValue <= toValue
- greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue)
- lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue)
-
- result := greaterThanOrEqualFrom && lessThanOrEqualTo
-
- // Handle NOT BETWEEN
- if expr.Not {
- result = !result
- }
-
- return result
- }, nil
-}
-
// buildIsNullPredicateWithContext creates a predicate for IS NULL operations
func (e *SQLEngine) buildIsNullPredicateWithContext(expr *IsNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
// Check if the expression is a column name
@@ -2936,50 +3152,6 @@ func (e *SQLEngine) buildIsNotNullPredicateWithContext(expr *IsNotNullExpr, sele
}
}
-// buildIsNullPredicateWithAliases creates a predicate for IS NULL operations with alias support
-func (e *SQLEngine) buildIsNullPredicateWithAliases(expr *IsNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
- // Extract column name from expression with alias resolution
- columnName := e.getColumnNameWithAliases(expr.Expr, aliases)
- if columnName == "" {
- return nil, fmt.Errorf("IS NULL operand must be a column name, got: %T", expr.Expr)
- }
- columnName = e.getSystemColumnInternalName(columnName)
-
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- // Check if field exists and if it's null or missing
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return true // Field doesn't exist = NULL
- }
-
- // Check if the field value itself is null/empty
- return e.isValueNull(fieldValue)
- }, nil
-}
-
-// buildIsNotNullPredicateWithAliases creates a predicate for IS NOT NULL operations with alias support
-func (e *SQLEngine) buildIsNotNullPredicateWithAliases(expr *IsNotNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
- // Extract column name from expression with alias resolution
- columnName := e.getColumnNameWithAliases(expr.Expr, aliases)
- if columnName == "" {
- return nil, fmt.Errorf("IS NOT NULL operand must be a column name, got: %T", expr.Expr)
- }
- columnName = e.getSystemColumnInternalName(columnName)
-
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- // Check if field exists and if it's not null
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return false // Field doesn't exist = NULL, so NOT NULL is false
- }
-
- // Check if the field value itself is not null/empty
- return !e.isValueNull(fieldValue)
- }, nil
-}
-
// isValueNull checks if a schema_pb.Value is null or represents a null value
func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool {
if value == nil {
@@ -3019,33 +3191,6 @@ func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool {
}
}
-// getColumnNameWithAliases extracts column name from expression, resolving aliases if needed
-func (e *SQLEngine) getColumnNameWithAliases(expr ExprNode, aliases map[string]ExprNode) string {
- switch exprType := expr.(type) {
- case *ColName:
- colName := exprType.Name.String()
- // Check if this is an alias that should be resolved
- if aliases != nil {
- if actualExpr, exists := aliases[colName]; exists {
- // Recursively resolve the aliased expression
- return e.getColumnNameWithAliases(actualExpr, nil) // Don't recurse aliases
- }
- }
- return colName
- }
- return ""
-}
-
-// extractValueFromExpr extracts a value from an expression node (for alias support)
-func (e *SQLEngine) extractValueFromExpr(expr ExprNode) (interface{}, error) {
- return e.extractComparisonValue(expr)
-}
-
-// normalizeOperator normalizes comparison operators
-func (e *SQLEngine) normalizeOperator(op string) string {
- return op // For now, just return as-is
-}
-
// extractComparisonValue extracts the comparison value from a SQL expression
func (e *SQLEngine) extractComparisonValue(expr ExprNode) (interface{}, error) {
switch val := expr.(type) {
@@ -4178,31 +4323,6 @@ func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
return t.UnixNano()
}
-// countLiveLogRows counts the total number of rows in live log files (non-parquet files) in a partition
-func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) {
- filerClient, err := e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return 0, err
- }
-
- totalRows := int64(0)
- err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
- return nil // Skip directories and parquet files
- }
-
- // Count rows in live log file
- rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
- if err != nil {
- fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err)
- return nil // Continue with other files
- }
- totalRows += rowCount
- return nil
- })
- return totalRows, err
-}
-
// extractParquetSourceFiles extracts source log file names from parquet file metadata for deduplication
func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool {
sourceFiles := make(map[string]bool)
@@ -4226,6 +4346,7 @@ func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map
// countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet and duplicate log buffer data
func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
+ debugEnabled := ctx != nil && isDebugMode(ctx)
filerClient, err := e.catalog.brokerClient.GetFilerClient()
if err != nil {
return 0, err
@@ -4242,14 +4363,14 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
// Second, get duplicate files from log buffer metadata
logBufferDuplicates, err := e.buildLogBufferDeduplicationMap(ctx, partitionPath)
if err != nil {
- if isDebugMode(ctx) {
+ if debugEnabled {
fmt.Printf("Warning: failed to build log buffer deduplication map: %v\n", err)
}
logBufferDuplicates = make(map[string]bool)
}
// Debug: Show deduplication status (only in explain mode)
- if isDebugMode(ctx) {
+ if debugEnabled {
if len(actualSourceFiles) > 0 {
fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
}
@@ -4266,7 +4387,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
// Skip files that have been converted to parquet
if actualSourceFiles[entry.Name] {
- if isDebugMode(ctx) {
+ if debugEnabled {
fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name)
}
return nil
@@ -4274,7 +4395,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
// Skip files that are duplicated due to log buffer metadata
if logBufferDuplicates[entry.Name] {
- if isDebugMode(ctx) {
+ if debugEnabled {
fmt.Printf("Skipping %s (duplicate log buffer data)\n", entry.Name)
}
return nil
@@ -4345,6 +4466,7 @@ func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBuffer
// buildLogBufferDeduplicationMap creates a map to track duplicate files based on buffer ranges (ultra-efficient)
func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) {
+ debugEnabled := ctx != nil && isDebugMode(ctx)
if e.catalog.brokerClient == nil {
return make(map[string]bool), nil
}
@@ -4390,7 +4512,7 @@ func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitio
if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start {
// Ranges overlap - this file contains duplicate buffer indexes
isDuplicate = true
- if isDebugMode(ctx) {
+ if debugEnabled {
fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n",
entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end)
}
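
The six skip rules in canSkipFileByComparison come down to min/max arithmetic against the file's column statistics. A simplified sketch over plain int64 stats (the patch compares schema_pb.Value pairs via compareValues; canSkip here is an illustrative reduction, not the engine's code):

package main

import "fmt"

// canSkip reports whether a file whose column values lie in [min, max]
// can be skipped for the predicate `column <op> v`, using the same rules
// as canSkipFileByComparison.
func canSkip(min, max, v int64, op string) bool {
	switch op {
	case ">":
		return max <= v // no row can exceed v
	case ">=":
		return max < v
	case "<":
		return min >= v
	case "<=":
		return min > v
	case "=":
		return v < min || v > max // v falls outside [min, max]
	case "!=", "<>":
		return min == max && min == v // every row equals v
	default:
		return false // unknown operator: never skip
	}
}

func main() {
	// A file with id values in [100, 500]: WHERE id > 1000 can skip it,
	// WHERE id > 300 cannot, because rows in (300, 500] may match.
	fmt.Println(canSkip(100, 500, 1000, ">")) // true
	fmt.Println(canSkip(100, 500, 300, ">"))  // false
}

The composition in canSkipParquetFile then errs in the safe direction for each connective: under AND a file may be skipped when either side alone justifies it, under OR only when both sides do.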
diff --git a/weed/query/engine/fast_path_predicate_validation_test.go b/weed/query/engine/fast_path_predicate_validation_test.go
new file mode 100644
index 000000000..3322ed51f
--- /dev/null
+++ b/weed/query/engine/fast_path_predicate_validation_test.go
@@ -0,0 +1,272 @@
+package engine
+
+import (
+ "testing"
+)
+
+// TestFastPathPredicateValidation tests the critical fix for fast-path aggregation
+// to ensure non-time predicates are properly detected and fast-path is blocked
+func TestFastPathPredicateValidation(t *testing.T) {
+ engine := NewTestSQLEngine()
+
+ testCases := []struct {
+ name string
+ whereClause string
+ expectedTimeOnly bool
+ expectedStartTimeNs int64
+ expectedStopTimeNs int64
+ description string
+ }{
+ {
+ name: "No WHERE clause",
+ whereClause: "",
+ expectedTimeOnly: true, // No WHERE means time-only is true
+ description: "Queries without WHERE clause should allow fast path",
+ },
+ {
+ name: "Time-only predicate (greater than)",
+ whereClause: "_ts > 1640995200000000000",
+ expectedTimeOnly: true,
+ expectedStartTimeNs: 1640995200000000000,
+ expectedStopTimeNs: 0,
+ description: "Pure time predicates should allow fast path",
+ },
+ {
+ name: "Time-only predicate (less than)",
+ whereClause: "_ts < 1640995200000000000",
+ expectedTimeOnly: true,
+ expectedStartTimeNs: 0,
+ expectedStopTimeNs: 1640995200000000000,
+ description: "Pure time predicates should allow fast path",
+ },
+ {
+ name: "Time-only predicate (range with AND)",
+ whereClause: "_ts > 1640995200000000000 AND _ts < 1641081600000000000",
+ expectedTimeOnly: true,
+ expectedStartTimeNs: 1640995200000000000,
+ expectedStopTimeNs: 1641081600000000000,
+ description: "Time range predicates should allow fast path",
+ },
+ {
+ name: "Mixed predicate (time + non-time)",
+ whereClause: "_ts > 1640995200000000000 AND user_id = 'user123'",
+ expectedTimeOnly: false,
+ description: "CRITICAL: Mixed predicates must block fast path to prevent incorrect results",
+ },
+ {
+ name: "Non-time predicate only",
+ whereClause: "user_id = 'user123'",
+ expectedTimeOnly: false,
+ description: "Non-time predicates must block fast path",
+ },
+ {
+ name: "Multiple non-time predicates",
+ whereClause: "user_id = 'user123' AND status = 'active'",
+ expectedTimeOnly: false,
+ description: "Multiple non-time predicates must block fast path",
+ },
+ {
+ name: "OR with time predicate (unsafe)",
+ whereClause: "_ts > 1640995200000000000 OR user_id = 'user123'",
+ expectedTimeOnly: false,
+ description: "OR expressions are complex and must block fast path",
+ },
+ {
+ name: "OR with only time predicates (still unsafe)",
+ whereClause: "_ts > 1640995200000000000 OR _ts < 1640908800000000000",
+ expectedTimeOnly: false,
+ description: "Even time-only OR expressions must block fast path due to complexity",
+ },
+ // Note: Parenthesized expressions are not supported by the current parser
+ // These test cases are commented out until parser support is added
+ {
+ name: "String column comparison",
+ whereClause: "event_type = 'click'",
+ expectedTimeOnly: false,
+ description: "String column comparisons must block fast path",
+ },
+ {
+ name: "Numeric column comparison",
+ whereClause: "id > 1000",
+ expectedTimeOnly: false,
+ description: "Numeric column comparisons must block fast path",
+ },
+ {
+ name: "Internal timestamp column",
+ whereClause: "_timestamp_ns > 1640995200000000000",
+ expectedTimeOnly: true,
+ expectedStartTimeNs: 1640995200000000000,
+ description: "Internal timestamp column should allow fast path",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Parse the WHERE clause if present
+ var whereExpr ExprNode
+ if tc.whereClause != "" {
+ sql := "SELECT COUNT(*) FROM test WHERE " + tc.whereClause
+ stmt, err := ParseSQL(sql)
+ if err != nil {
+ t.Fatalf("Failed to parse SQL: %v", err)
+ }
+ selectStmt := stmt.(*SelectStatement)
+ whereExpr = selectStmt.Where.Expr
+ }
+
+ // Test the validation function
+ var startTimeNs, stopTimeNs int64
+ var onlyTimePredicates bool
+
+ if whereExpr == nil {
+ // No WHERE clause case
+ onlyTimePredicates = true
+ } else {
+ startTimeNs, stopTimeNs, onlyTimePredicates = engine.SQLEngine.extractTimeFiltersWithValidation(whereExpr)
+ }
+
+ // Verify the results
+ if onlyTimePredicates != tc.expectedTimeOnly {
+ t.Errorf("Expected onlyTimePredicates=%v, got %v. %s",
+ tc.expectedTimeOnly, onlyTimePredicates, tc.description)
+ }
+
+ // Check time filters if expected
+ if tc.expectedStartTimeNs != 0 && startTimeNs != tc.expectedStartTimeNs {
+ t.Errorf("Expected startTimeNs=%d, got %d", tc.expectedStartTimeNs, startTimeNs)
+ }
+ if tc.expectedStopTimeNs != 0 && stopTimeNs != tc.expectedStopTimeNs {
+ t.Errorf("Expected stopTimeNs=%d, got %d", tc.expectedStopTimeNs, stopTimeNs)
+ }
+
+ t.Logf("✅ %s: onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d",
+ tc.name, onlyTimePredicates, startTimeNs, stopTimeNs)
+ })
+ }
+}
+
+// TestFastPathAggregationSafety tests that fast-path aggregation is only attempted
+// when it's safe to do so (no non-time predicates)
+func TestFastPathAggregationSafety(t *testing.T) {
+ engine := NewTestSQLEngine()
+
+ testCases := []struct {
+ name string
+ sql string
+ shouldUseFastPath bool
+ description string
+ }{
+ {
+ name: "No WHERE - should use fast path",
+ sql: "SELECT COUNT(*) FROM test",
+ shouldUseFastPath: true,
+ description: "Queries without WHERE should use fast path",
+ },
+ {
+ name: "Time-only WHERE - should use fast path",
+ sql: "SELECT COUNT(*) FROM test WHERE _ts > 1640995200000000000",
+ shouldUseFastPath: true,
+ description: "Time-only predicates should use fast path",
+ },
+ {
+ name: "Mixed WHERE - should NOT use fast path",
+ sql: "SELECT COUNT(*) FROM test WHERE _ts > 1640995200000000000 AND user_id = 'user123'",
+ shouldUseFastPath: false,
+ description: "CRITICAL: Mixed predicates must NOT use fast path to prevent wrong results",
+ },
+ {
+ name: "Non-time WHERE - should NOT use fast path",
+ sql: "SELECT COUNT(*) FROM test WHERE user_id = 'user123'",
+ shouldUseFastPath: false,
+ description: "Non-time predicates must NOT use fast path",
+ },
+ {
+ name: "OR expression - should NOT use fast path",
+ sql: "SELECT COUNT(*) FROM test WHERE _ts > 1640995200000000000 OR user_id = 'user123'",
+ shouldUseFastPath: false,
+ description: "OR expressions must NOT use fast path due to complexity",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Parse the SQL
+ stmt, err := ParseSQL(tc.sql)
+ if err != nil {
+ t.Fatalf("Failed to parse SQL: %v", err)
+ }
+ selectStmt := stmt.(*SelectStatement)
+
+ // Test the fast path decision logic
+ startTimeNs, stopTimeNs := int64(0), int64(0)
+ onlyTimePredicates := true
+ if selectStmt.Where != nil {
+ startTimeNs, stopTimeNs, onlyTimePredicates = engine.SQLEngine.extractTimeFiltersWithValidation(selectStmt.Where.Expr)
+ }
+
+ canAttemptFastPath := selectStmt.Where == nil || onlyTimePredicates
+
+ // Verify the decision
+ if canAttemptFastPath != tc.shouldUseFastPath {
+ t.Errorf("Expected canAttemptFastPath=%v, got %v. %s",
+ tc.shouldUseFastPath, canAttemptFastPath, tc.description)
+ }
+
+ t.Logf("✅ %s: canAttemptFastPath=%v (onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d)",
+ tc.name, canAttemptFastPath, onlyTimePredicates, startTimeNs, stopTimeNs)
+ })
+ }
+}
+
+// TestTimestampColumnDetection tests that the engine correctly identifies timestamp columns
+func TestTimestampColumnDetection(t *testing.T) {
+ engine := NewTestSQLEngine()
+
+ testCases := []struct {
+ columnName string
+ isTimestamp bool
+ description string
+ }{
+ {
+ columnName: "_ts",
+ isTimestamp: true,
+ description: "System timestamp display column should be detected",
+ },
+ {
+ columnName: "_timestamp_ns",
+ isTimestamp: true,
+ description: "Internal timestamp column should be detected",
+ },
+ {
+ columnName: "user_id",
+ isTimestamp: false,
+ description: "Non-timestamp column should not be detected as timestamp",
+ },
+ {
+ columnName: "id",
+ isTimestamp: false,
+ description: "ID column should not be detected as timestamp",
+ },
+ {
+ columnName: "status",
+ isTimestamp: false,
+ description: "Status column should not be detected as timestamp",
+ },
+ {
+ columnName: "event_type",
+ isTimestamp: false,
+ description: "Event type column should not be detected as timestamp",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.columnName, func(t *testing.T) {
+ isTimestamp := engine.SQLEngine.isTimestampColumn(tc.columnName)
+ if isTimestamp != tc.isTimestamp {
+ t.Errorf("Expected isTimestampColumn(%s)=%v, got %v. %s",
+ tc.columnName, tc.isTimestamp, isTimestamp, tc.description)
+ }
+ t.Logf("✅ Column '%s': isTimestamp=%v", tc.columnName, isTimestamp)
+ })
+ }
+}
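
The safety walk these tests exercise, extractTimeFiltersWithValidationRecursive, is a poison-flag traversal: AND recurses into both sides, while OR and any unrecognized node mark the whole tree unsafe. A toy reduction over an illustrative expression type (cmpNode, andNode, and orNode are stand-ins, not the engine's AST):

package main

import "fmt"

type node interface{}

type cmpNode struct{ column string }    // column <op> value
type andNode struct{ left, right node } // left AND right
type orNode struct{ left, right node }  // left OR right

// timeOnly mirrors the patch's rules: a comparison is safe only when it
// touches a timestamp column, AND needs both sides safe, OR is always
// unsafe, and unknown node kinds are conservatively unsafe.
func timeOnly(n node, isTimestamp func(string) bool) bool {
	switch t := n.(type) {
	case cmpNode:
		return isTimestamp(t.column)
	case andNode:
		return timeOnly(t.left, isTimestamp) && timeOnly(t.right, isTimestamp)
	case orNode:
		return false
	default:
		return false
	}
}

func main() {
	isTs := func(c string) bool { return c == "_ts" || c == "_timestamp_ns" }
	fmt.Println(timeOnly(andNode{cmpNode{"_ts"}, cmpNode{"user_id"}}, isTs)) // false: mixed predicates block the fast path
	fmt.Println(timeOnly(andNode{cmpNode{"_ts"}, cmpNode{"_ts"}}, isTs))    // true: time-only range is safe
}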
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
index 2584b54a6..eee57bc23 100644
--- a/weed/query/engine/hybrid_message_scanner.go
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -3,6 +3,7 @@ package engine
import (
"container/heap"
"context"
+ "encoding/binary"
"encoding/json"
"fmt"
"io"
@@ -145,6 +146,46 @@ type ParquetFileStats struct {
FileName string
RowCount int64
ColumnStats map[string]*ParquetColumnStats
+ // Optional file-level timestamp range from filer extended attributes
+ MinTimestampNs int64
+ MaxTimestampNs int64
+}
+
+// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns
+func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) {
+ if fileStats == nil {
+ return 0, 0, false
+ }
+ // Prefer column stats for _ts_ns if present
+ if len(fileStats.ColumnStats) > 0 {
+ if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil {
+ if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin {
+ if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax {
+ return minNs, maxNs, true
+ }
+ }
+ }
+ }
+ // Fallback to file-level range if present in filer extended metadata
+ if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 {
+ return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true
+ }
+ return 0, 0, false
+}
+
+// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns
+func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) {
+ if v == nil {
+ return 0, false
+ }
+ switch k := v.Kind.(type) {
+ case *schema_pb.Value_Int64Value:
+ return k.Int64Value, true
+ case *schema_pb.Value_Int32Value:
+ return int64(k.Int32Value), true
+ default:
+ return 0, false
+ }
}
// StreamingDataSource provides a streaming interface for reading scan results
@@ -1080,6 +1121,15 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo
RowCount: fileView.NumRows(),
ColumnStats: make(map[string]*ParquetColumnStats),
}
+ // Populate optional min/max from filer extended attributes (writer stores ns timestamps)
+ if entry != nil && entry.Extended != nil {
+ if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
+ fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
+ }
+ if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
+ fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
+ }
+ }
// Get schema information
schema := fileView.Schema()
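
The new MinTimestampNs/MaxTimestampNs fallback relies on the writer storing each bound as an 8-byte big-endian nanosecond value under the "min"/"max" extended-attribute keys, as the comment in the hunk above states. A standalone round trip of that encoding (assuming the writer's layout matches):

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

func main() {
	ts := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano()

	// Encode the way the writer is assumed to: 8 bytes, big-endian.
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(ts))

	// Decode the way extractParquetFileStats does, accepting only
	// exactly 8 bytes before trusting the value.
	if len(buf) == 8 {
		decoded := int64(binary.BigEndian.Uint64(buf))
		fmt.Println(decoded == ts) // true
	}
}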
diff --git a/weed/query/engine/types.go b/weed/query/engine/types.go
index 08be17fc0..edcd5bd9a 100644
--- a/weed/query/engine/types.go
+++ b/weed/query/engine/types.go
@@ -87,6 +87,12 @@ type QueryExecutionPlan struct {
BufferStartIndex int64 `json:"buffer_start_index,omitempty"`
}
+// Plan detail keys
+const (
+ PlanDetailStartTimeNs = "StartTimeNs"
+ PlanDetailStopTimeNs = "StopTimeNs"
+)
+
// QueryResult represents the result of a SQL query execution
type QueryResult struct {
Columns []string `json:"columns"`
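
Because plan.Details is a map[string]interface{}, reading these keys back requires type assertions, which is what getTimeFiltersFromPlan does. A minimal sketch of that read-back (the lowercase constants mirror the exported PlanDetail* names so the example is self-contained):

package main

import "fmt"

const (
	planDetailStartTimeNs = "StartTimeNs" // mirrors PlanDetailStartTimeNs
	planDetailStopTimeNs  = "StopTimeNs"  // mirrors PlanDetailStopTimeNs
)

func main() {
	details := map[string]interface{}{
		planDetailStartTimeNs: int64(1_640_995_200_000_000_000),
		// stop key intentionally absent: reads fall back to 0 ("no bound")
	}

	var startNs, stopNs int64
	if v, ok := details[planDetailStartTimeNs]; ok {
		if ns, ok := v.(int64); ok {
			startNs = ns
		}
	}
	if v, ok := details[planDetailStopTimeNs]; ok {
		if ns, ok := v.(int64); ok {
			stopNs = ns
		}
	}
	fmt.Println(startNs, stopNs) // 1640995200000000000 0
}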