diff options
Diffstat (limited to 'weed/query/engine/engine.go')
| -rw-r--r-- | weed/query/engine/engine.go | 656 |
1 files changed, 389 insertions, 267 deletions
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index 84c238583..ffed03f35 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -495,69 +495,6 @@ func ParseSQL(sql string) (Statement, error) { } } -// extractFunctionArguments extracts the arguments from a function call expression using CockroachDB parser -func extractFunctionArguments(expr string) ([]SelectExpr, error) { - // Find the parentheses - startParen := strings.Index(expr, "(") - endParen := strings.LastIndex(expr, ")") - - if startParen == -1 || endParen == -1 || endParen <= startParen { - return nil, fmt.Errorf("invalid function syntax") - } - - // Extract arguments string - argsStr := strings.TrimSpace(expr[startParen+1 : endParen]) - - // Handle empty arguments - if argsStr == "" { - return []SelectExpr{}, nil - } - - // Handle single * argument (for COUNT(*)) - if argsStr == "*" { - return []SelectExpr{&StarExpr{}}, nil - } - - // Parse multiple arguments separated by commas - args := []SelectExpr{} - argParts := strings.Split(argsStr, ",") - - // Use CockroachDB parser to parse each argument as a SELECT expression - cockroachParser := NewCockroachSQLParser() - - for _, argPart := range argParts { - argPart = strings.TrimSpace(argPart) - if argPart == "*" { - args = append(args, &StarExpr{}) - } else { - // Create a dummy SELECT statement to parse the argument expression - dummySelect := fmt.Sprintf("SELECT %s", argPart) - - // Parse using CockroachDB parser - stmt, err := cockroachParser.ParseSQL(dummySelect) - if err != nil { - // If CockroachDB parser fails, fall back to simple column name - args = append(args, &AliasedExpr{ - Expr: &ColName{Name: stringValue(argPart)}, - }) - continue - } - - // Extract the expression from the parsed SELECT statement - if selectStmt, ok := stmt.(*SelectStatement); ok && len(selectStmt.SelectExprs) > 0 { - args = append(args, selectStmt.SelectExprs[0]) - } else { - // Fallback to column name if parsing fails - args = append(args, &AliasedExpr{ - Expr: &ColName{Name: stringValue(argPart)}, - }) - } - } - } - - return args, nil -} - // debugModeKey is used to store debug mode flag in context type debugModeKey struct{} @@ -1221,8 +1158,8 @@ func (e *SQLEngine) buildExecutionTree(plan *QueryExecutionPlan, stmt *SelectSta } } - // Create broker buffer node if queried - if plan.BrokerBufferQueried { + // Create broker buffer node only if queried AND has unflushed messages + if plan.BrokerBufferQueried && plan.BrokerBufferMessages > 0 { brokerBufferNodes = append(brokerBufferNodes, &FileSourceNode{ FilePath: "broker_memory_buffer", SourceType: "broker_buffer", @@ -1489,6 +1426,8 @@ func (e *SQLEngine) formatOptimization(opt string) string { return "Duplicate Data Avoidance" case "predicate_pushdown": return "WHERE Clause Pushdown" + case "column_statistics_pruning": + return "Column Statistics File Pruning" case "column_projection": return "Column Selection" case "limit_pushdown": @@ -1540,6 +1479,10 @@ func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *DDLStatement) // executeSelectStatementWithPlan handles SELECT queries with execution plan tracking func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) { + // Initialize plan details once + if plan != nil && plan.Details == nil { + plan.Details = make(map[string]interface{}) + } // Parse aggregations to populate plan var aggregations []AggregationSpec hasAggregations := false @@ -1577,7 +1520,7 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *Se if table, ok := stmt.From[0].(*AliasedTableExpr); ok { if tableExpr, ok := table.Expr.(TableName); ok { tableName = tableExpr.Name.String() - if tableExpr.Qualifier.String() != "" { + if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" { database = tableExpr.Qualifier.String() } } @@ -2290,18 +2233,51 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil { // Add partition paths to execution plan details plan.Details["partition_paths"] = partitions + // Persist time filter details for downstream pruning/diagnostics + plan.Details[PlanDetailStartTimeNs] = startTimeNs + plan.Details[PlanDetailStopTimeNs] = stopTimeNs + + if isDebugMode(ctx) { + fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs) + } // Collect actual file information for each partition var parquetFiles []string var liveLogFiles []string parquetSources := make(map[string]bool) + var parquetReadErrors []string + var liveLogListErrors []string for _, partitionPath := range partitions { // Get parquet files for this partition if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil { - for _, stats := range parquetStats { + // Prune files by time range with debug logging + filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs) + + // Further prune by column statistics from WHERE clause + if stmt.Where != nil { + beforeColumnPrune := len(filteredStats) + filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr) + columnPrunedCount := beforeColumnPrune - len(filteredStats) + + if columnPrunedCount > 0 { + if isDebugMode(ctx) { + fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath) + } + // Track column statistics optimization + if !contains(plan.OptimizationsUsed, "column_statistics_pruning") { + plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning") + } + } + } + for _, stats := range filteredStats { parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName)) } + } else { + parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err)) + if isDebugMode(ctx) { + fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err) + } } // Merge accurate parquet sources from metadata @@ -2320,6 +2296,11 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s } liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName)) } + } else { + liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err)) + if isDebugMode(ctx) { + fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err) + } } } @@ -2329,11 +2310,20 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s if len(liveLogFiles) > 0 { plan.Details["live_log_files"] = liveLogFiles } + if len(parquetReadErrors) > 0 { + plan.Details["error_parquet_statistics"] = parquetReadErrors + } + if len(liveLogListErrors) > 0 { + plan.Details["error_live_log_listing"] = liveLogListErrors + } // Update scan statistics for execution plan display plan.PartitionsScanned = len(partitions) plan.ParquetFilesScanned = len(parquetFiles) plan.LiveLogFilesScanned = len(liveLogFiles) + } else { + // Handle partition discovery error + plan.Details["error_partition_discovery"] = discoverErr.Error() } } else { // Normal mode - just get results @@ -2377,6 +2367,23 @@ func (e *SQLEngine) extractTimeFilters(expr ExprNode) (int64, int64) { return startTimeNs, stopTimeNs } +// extractTimeFiltersWithValidation extracts time filters and validates that WHERE clause contains only time-based predicates +// Returns (startTimeNs, stopTimeNs, onlyTimePredicates) where onlyTimePredicates indicates if fast path is safe +func (e *SQLEngine) extractTimeFiltersWithValidation(expr ExprNode) (int64, int64, bool) { + startTimeNs, stopTimeNs := int64(0), int64(0) + onlyTimePredicates := true + + // Recursively extract time filters and validate predicates + e.extractTimeFiltersWithValidationRecursive(expr, &startTimeNs, &stopTimeNs, &onlyTimePredicates) + + // Special case: if startTimeNs == stopTimeNs, treat it like an equality query + if startTimeNs != 0 && startTimeNs == stopTimeNs { + stopTimeNs = 0 + } + + return startTimeNs, stopTimeNs, onlyTimePredicates +} + // extractTimeFiltersRecursive recursively processes WHERE expressions to find time comparisons func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64) { switch exprType := expr.(type) { @@ -2396,6 +2403,39 @@ func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stop } } +// extractTimeFiltersWithValidationRecursive recursively processes WHERE expressions to find time comparisons and validate predicates +func (e *SQLEngine) extractTimeFiltersWithValidationRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64, onlyTimePredicates *bool) { + switch exprType := expr.(type) { + case *ComparisonExpr: + // Check if this is a time-based comparison + leftCol := e.getColumnName(exprType.Left) + rightCol := e.getColumnName(exprType.Right) + + isTimeComparison := e.isTimestampColumn(leftCol) || e.isTimestampColumn(rightCol) + if isTimeComparison { + // Extract time filter from this comparison + e.extractTimeFromComparison(exprType, startTimeNs, stopTimeNs) + } else { + // Non-time predicate found - fast path is not safe + *onlyTimePredicates = false + } + case *AndExpr: + // For AND expressions, both sides must be time-only for fast path to be safe + e.extractTimeFiltersWithValidationRecursive(exprType.Left, startTimeNs, stopTimeNs, onlyTimePredicates) + e.extractTimeFiltersWithValidationRecursive(exprType.Right, startTimeNs, stopTimeNs, onlyTimePredicates) + case *OrExpr: + // OR expressions are complex and not supported in fast path + *onlyTimePredicates = false + return + case *ParenExpr: + // Unwrap parentheses and continue + e.extractTimeFiltersWithValidationRecursive(exprType.Expr, startTimeNs, stopTimeNs, onlyTimePredicates) + default: + // Unknown expression type - not safe for fast path + *onlyTimePredicates = false + } +} + // extractTimeFromComparison extracts time bounds from comparison expressions // Handles comparisons against timestamp columns (system columns and schema-defined timestamp types) func (e *SQLEngine) extractTimeFromComparison(comp *ComparisonExpr, startTimeNs, stopTimeNs *int64) { @@ -2465,7 +2505,7 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool { } // System timestamp columns are always time columns - if columnName == SW_COLUMN_NAME_TIMESTAMP { + if columnName == SW_COLUMN_NAME_TIMESTAMP || columnName == SW_DISPLAY_NAME_TIMESTAMP { return true } @@ -2495,6 +2535,280 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool { return false } +// getTimeFiltersFromPlan extracts time filter values from execution plan details +func getTimeFiltersFromPlan(plan *QueryExecutionPlan) (startTimeNs, stopTimeNs int64) { + if plan == nil || plan.Details == nil { + return 0, 0 + } + if startNsVal, ok := plan.Details[PlanDetailStartTimeNs]; ok { + if startNs, ok2 := startNsVal.(int64); ok2 { + startTimeNs = startNs + } + } + if stopNsVal, ok := plan.Details[PlanDetailStopTimeNs]; ok { + if stopNs, ok2 := stopNsVal.(int64); ok2 { + stopTimeNs = stopNs + } + } + return +} + +// pruneParquetFilesByTime filters parquet files based on timestamp ranges, with optional debug logging +func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileStats, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) []*ParquetFileStats { + if startTimeNs == 0 && stopTimeNs == 0 { + return parquetStats + } + + debugEnabled := ctx != nil && isDebugMode(ctx) + qStart := startTimeNs + qStop := stopTimeNs + if qStop == 0 { + qStop = math.MaxInt64 + } + + n := 0 + for _, fs := range parquetStats { + if debugEnabled { + fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName) + } + if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok { + if debugEnabled { + fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop) + } + if qStop < minNs || (qStart != 0 && qStart > maxNs) { + if debugEnabled { + fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName) + } + continue + } + } else if debugEnabled { + fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName) + } + parquetStats[n] = fs + n++ + } + return parquetStats[:n] +} + +// pruneParquetFilesByColumnStats filters parquet files based on column statistics and WHERE predicates +func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetStats []*ParquetFileStats, whereExpr ExprNode) []*ParquetFileStats { + if whereExpr == nil { + return parquetStats + } + + debugEnabled := ctx != nil && isDebugMode(ctx) + n := 0 + for _, fs := range parquetStats { + if e.canSkipParquetFile(ctx, fs, whereExpr) { + if debugEnabled { + fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName) + } + continue + } + parquetStats[n] = fs + n++ + } + return parquetStats[:n] +} + +// canSkipParquetFile determines if a parquet file can be skipped based on column statistics +func (e *SQLEngine) canSkipParquetFile(ctx context.Context, fileStats *ParquetFileStats, whereExpr ExprNode) bool { + switch expr := whereExpr.(type) { + case *ComparisonExpr: + return e.canSkipFileByComparison(ctx, fileStats, expr) + case *AndExpr: + // For AND: skip if ANY condition allows skipping (more aggressive pruning) + return e.canSkipParquetFile(ctx, fileStats, expr.Left) || e.canSkipParquetFile(ctx, fileStats, expr.Right) + case *OrExpr: + // For OR: skip only if ALL conditions allow skipping (conservative) + return e.canSkipParquetFile(ctx, fileStats, expr.Left) && e.canSkipParquetFile(ctx, fileStats, expr.Right) + default: + // Unknown expression type - don't skip + return false + } +} + +// canSkipFileByComparison checks if a file can be skipped based on a comparison predicate +func (e *SQLEngine) canSkipFileByComparison(ctx context.Context, fileStats *ParquetFileStats, expr *ComparisonExpr) bool { + // Extract column name and comparison value + var columnName string + var compareSchemaValue *schema_pb.Value + var operator string = expr.Operator + + // Determine which side is the column and which is the value + if colRef, ok := expr.Left.(*ColName); ok { + columnName = colRef.Name.String() + if sqlVal, ok := expr.Right.(*SQLVal); ok { + compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal) + } else { + return false // Can't optimize complex expressions + } + } else if colRef, ok := expr.Right.(*ColName); ok { + columnName = colRef.Name.String() + if sqlVal, ok := expr.Left.(*SQLVal); ok { + compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal) + // Flip operator for reversed comparison + operator = e.flipOperator(operator) + } else { + return false + } + } else { + return false // No column reference found + } + + // Validate comparison value + if compareSchemaValue == nil { + return false + } + + // Get column statistics + colStats, exists := fileStats.ColumnStats[columnName] + if !exists || colStats == nil { + // Try case-insensitive lookup + for colName, stats := range fileStats.ColumnStats { + if strings.EqualFold(colName, columnName) { + colStats = stats + exists = true + break + } + } + } + + if !exists || colStats == nil || colStats.MinValue == nil || colStats.MaxValue == nil { + return false // No statistics available + } + + // Apply pruning logic based on operator + switch operator { + case ">": + // Skip if max(column) <= compareValue + return e.compareValues(colStats.MaxValue, compareSchemaValue) <= 0 + case ">=": + // Skip if max(column) < compareValue + return e.compareValues(colStats.MaxValue, compareSchemaValue) < 0 + case "<": + // Skip if min(column) >= compareValue + return e.compareValues(colStats.MinValue, compareSchemaValue) >= 0 + case "<=": + // Skip if min(column) > compareValue + return e.compareValues(colStats.MinValue, compareSchemaValue) > 0 + case "=": + // Skip if compareValue is outside [min, max] range + return e.compareValues(compareSchemaValue, colStats.MinValue) < 0 || + e.compareValues(compareSchemaValue, colStats.MaxValue) > 0 + case "!=", "<>": + // Skip if min == max == compareValue (all values are the same and equal to compareValue) + return e.compareValues(colStats.MinValue, colStats.MaxValue) == 0 && + e.compareValues(colStats.MinValue, compareSchemaValue) == 0 + default: + return false // Unknown operator + } +} + +// flipOperator flips comparison operators when operands are swapped +func (e *SQLEngine) flipOperator(op string) string { + switch op { + case ">": + return "<" + case ">=": + return "<=" + case "<": + return ">" + case "<=": + return ">=" + case "=", "!=", "<>": + return op // These are symmetric + default: + return op + } +} + +// populatePlanFileDetails populates execution plan with detailed file information for partitions +// Includes column statistics pruning optimization when WHERE clause is provided +func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) { + debugEnabled := ctx != nil && isDebugMode(ctx) + // Collect actual file information for each partition + var parquetFiles []string + var liveLogFiles []string + parquetSources := make(map[string]bool) + var parquetReadErrors []string + var liveLogListErrors []string + + // Extract time filters from plan details + startTimeNs, stopTimeNs := getTimeFiltersFromPlan(plan) + + for _, partitionPath := range partitions { + // Get parquet files for this partition + if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil { + // Prune files by time range + filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs) + + // Further prune by column statistics from WHERE clause + if stmt != nil && stmt.Where != nil { + beforeColumnPrune := len(filteredStats) + filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr) + columnPrunedCount := beforeColumnPrune - len(filteredStats) + + if columnPrunedCount > 0 { + if debugEnabled { + fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath) + } + // Track column statistics optimization + if !contains(plan.OptimizationsUsed, "column_statistics_pruning") { + plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning") + } + } + } + + for _, stats := range filteredStats { + parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName)) + } + } else { + parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err)) + if debugEnabled { + fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err) + } + } + + // Merge accurate parquet sources from metadata + if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil { + for src := range sources { + parquetSources[src] = true + } + } + + // Get live log files for this partition + if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil { + for _, fileName := range liveFiles { + // Exclude live log files that have been converted to parquet (deduplicated) + if parquetSources[fileName] { + continue + } + liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName)) + } + } else { + liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err)) + if debugEnabled { + fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err) + } + } + } + + // Add file lists to plan details + if len(parquetFiles) > 0 { + plan.Details["parquet_files"] = parquetFiles + } + if len(liveLogFiles) > 0 { + plan.Details["live_log_files"] = liveLogFiles + } + if len(parquetReadErrors) > 0 { + plan.Details["error_parquet_statistics"] = parquetReadErrors + } + if len(liveLogListErrors) > 0 { + plan.Details["error_live_log_listing"] = liveLogListErrors + } +} + // isSQLTypeTimestamp checks if a SQL type string represents a timestamp type func (e *SQLEngine) isSQLTypeTimestamp(sqlType string) bool { upperType := strings.ToUpper(strings.TrimSpace(sqlType)) @@ -2664,56 +2978,6 @@ func (e *SQLEngine) buildPredicateWithContext(expr ExprNode, selectExprs []Selec } } -// buildComparisonPredicateWithAliases creates a predicate for comparison operations with alias support -func (e *SQLEngine) buildComparisonPredicateWithAliases(expr *ComparisonExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { - var columnName string - var compareValue interface{} - var operator string - - // Extract the comparison details, resolving aliases if needed - leftCol := e.getColumnNameWithAliases(expr.Left, aliases) - rightCol := e.getColumnNameWithAliases(expr.Right, aliases) - operator = e.normalizeOperator(expr.Operator) - - if leftCol != "" && rightCol == "" { - // Left side is column, right side is value - columnName = e.getSystemColumnInternalName(leftCol) - val, err := e.extractValueFromExpr(expr.Right) - if err != nil { - return nil, err - } - compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right) - } else if rightCol != "" && leftCol == "" { - // Right side is column, left side is value - columnName = e.getSystemColumnInternalName(rightCol) - val, err := e.extractValueFromExpr(expr.Left) - if err != nil { - return nil, err - } - compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left) - // Reverse the operator when column is on the right - operator = e.reverseOperator(operator) - } else if leftCol != "" && rightCol != "" { - return nil, fmt.Errorf("column-to-column comparisons not yet supported") - } else { - return nil, fmt.Errorf("at least one side of comparison must be a column") - } - - return func(record *schema_pb.RecordValue) bool { - fieldValue, exists := record.Fields[columnName] - if !exists { - return false - } - return e.evaluateComparison(fieldValue, operator, compareValue) - }, nil -} - -// buildComparisonPredicate creates a predicate for comparison operations (=, <, >, etc.) -// Handles column names on both left and right sides of the comparison -func (e *SQLEngine) buildComparisonPredicate(expr *ComparisonExpr) (func(*schema_pb.RecordValue) bool, error) { - return e.buildComparisonPredicateWithContext(expr, nil) -} - // buildComparisonPredicateWithContext creates a predicate for comparison operations with alias support func (e *SQLEngine) buildComparisonPredicateWithContext(expr *ComparisonExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { var columnName string @@ -2836,54 +3100,6 @@ func (e *SQLEngine) buildBetweenPredicateWithContext(expr *BetweenExpr, selectEx }, nil } -// buildBetweenPredicateWithAliases creates a predicate for BETWEEN operations with alias support -func (e *SQLEngine) buildBetweenPredicateWithAliases(expr *BetweenExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { - var columnName string - var fromValue, toValue interface{} - - // Extract column name from left side with alias resolution - leftCol := e.getColumnNameWithAliases(expr.Left, aliases) - if leftCol == "" { - return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left) - } - columnName = e.getSystemColumnInternalName(leftCol) - - // Extract FROM value - fromVal, err := e.extractValueFromExpr(expr.From) - if err != nil { - return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err) - } - fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From) - - // Extract TO value - toVal, err := e.extractValueFromExpr(expr.To) - if err != nil { - return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err) - } - toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To) - - // Return the predicate function - return func(record *schema_pb.RecordValue) bool { - fieldValue, exists := record.Fields[columnName] - if !exists { - return false - } - - // Evaluate: fieldValue >= fromValue AND fieldValue <= toValue - greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue) - lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue) - - result := greaterThanOrEqualFrom && lessThanOrEqualTo - - // Handle NOT BETWEEN - if expr.Not { - result = !result - } - - return result - }, nil -} - // buildIsNullPredicateWithContext creates a predicate for IS NULL operations func (e *SQLEngine) buildIsNullPredicateWithContext(expr *IsNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { // Check if the expression is a column name @@ -2936,50 +3152,6 @@ func (e *SQLEngine) buildIsNotNullPredicateWithContext(expr *IsNotNullExpr, sele } } -// buildIsNullPredicateWithAliases creates a predicate for IS NULL operations with alias support -func (e *SQLEngine) buildIsNullPredicateWithAliases(expr *IsNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { - // Extract column name from expression with alias resolution - columnName := e.getColumnNameWithAliases(expr.Expr, aliases) - if columnName == "" { - return nil, fmt.Errorf("IS NULL operand must be a column name, got: %T", expr.Expr) - } - columnName = e.getSystemColumnInternalName(columnName) - - // Return the predicate function - return func(record *schema_pb.RecordValue) bool { - // Check if field exists and if it's null or missing - fieldValue, exists := record.Fields[columnName] - if !exists { - return true // Field doesn't exist = NULL - } - - // Check if the field value itself is null/empty - return e.isValueNull(fieldValue) - }, nil -} - -// buildIsNotNullPredicateWithAliases creates a predicate for IS NOT NULL operations with alias support -func (e *SQLEngine) buildIsNotNullPredicateWithAliases(expr *IsNotNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { - // Extract column name from expression with alias resolution - columnName := e.getColumnNameWithAliases(expr.Expr, aliases) - if columnName == "" { - return nil, fmt.Errorf("IS NOT NULL operand must be a column name, got: %T", expr.Expr) - } - columnName = e.getSystemColumnInternalName(columnName) - - // Return the predicate function - return func(record *schema_pb.RecordValue) bool { - // Check if field exists and if it's not null - fieldValue, exists := record.Fields[columnName] - if !exists { - return false // Field doesn't exist = NULL, so NOT NULL is false - } - - // Check if the field value itself is not null/empty - return !e.isValueNull(fieldValue) - }, nil -} - // isValueNull checks if a schema_pb.Value is null or represents a null value func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool { if value == nil { @@ -3019,33 +3191,6 @@ func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool { } } -// getColumnNameWithAliases extracts column name from expression, resolving aliases if needed -func (e *SQLEngine) getColumnNameWithAliases(expr ExprNode, aliases map[string]ExprNode) string { - switch exprType := expr.(type) { - case *ColName: - colName := exprType.Name.String() - // Check if this is an alias that should be resolved - if aliases != nil { - if actualExpr, exists := aliases[colName]; exists { - // Recursively resolve the aliased expression - return e.getColumnNameWithAliases(actualExpr, nil) // Don't recurse aliases - } - } - return colName - } - return "" -} - -// extractValueFromExpr extracts a value from an expression node (for alias support) -func (e *SQLEngine) extractValueFromExpr(expr ExprNode) (interface{}, error) { - return e.extractComparisonValue(expr) -} - -// normalizeOperator normalizes comparison operators -func (e *SQLEngine) normalizeOperator(op string) string { - return op // For now, just return as-is -} - // extractComparisonValue extracts the comparison value from a SQL expression func (e *SQLEngine) extractComparisonValue(expr ExprNode) (interface{}, error) { switch val := expr.(type) { @@ -4178,31 +4323,6 @@ func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 { return t.UnixNano() } -// countLiveLogRows counts the total number of rows in live log files (non-parquet files) in a partition -func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) { - filerClient, err := e.catalog.brokerClient.GetFilerClient() - if err != nil { - return 0, err - } - - totalRows := int64(0) - err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { - if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") { - return nil // Skip directories and parquet files - } - - // Count rows in live log file - rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry) - if err != nil { - fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err) - return nil // Continue with other files - } - totalRows += rowCount - return nil - }) - return totalRows, err -} - // extractParquetSourceFiles extracts source log file names from parquet file metadata for deduplication func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool { sourceFiles := make(map[string]bool) @@ -4226,6 +4346,7 @@ func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map // countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet and duplicate log buffer data func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) { + debugEnabled := ctx != nil && isDebugMode(ctx) filerClient, err := e.catalog.brokerClient.GetFilerClient() if err != nil { return 0, err @@ -4242,14 +4363,14 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, // Second, get duplicate files from log buffer metadata logBufferDuplicates, err := e.buildLogBufferDeduplicationMap(ctx, partitionPath) if err != nil { - if isDebugMode(ctx) { + if debugEnabled { fmt.Printf("Warning: failed to build log buffer deduplication map: %v\n", err) } logBufferDuplicates = make(map[string]bool) } // Debug: Show deduplication status (only in explain mode) - if isDebugMode(ctx) { + if debugEnabled { if len(actualSourceFiles) > 0 { fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath) } @@ -4266,7 +4387,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, // Skip files that have been converted to parquet if actualSourceFiles[entry.Name] { - if isDebugMode(ctx) { + if debugEnabled { fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name) } return nil @@ -4274,7 +4395,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, // Skip files that are duplicated due to log buffer metadata if logBufferDuplicates[entry.Name] { - if isDebugMode(ctx) { + if debugEnabled { fmt.Printf("Skipping %s (duplicate log buffer data)\n", entry.Name) } return nil @@ -4345,6 +4466,7 @@ func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBuffer // buildLogBufferDeduplicationMap creates a map to track duplicate files based on buffer ranges (ultra-efficient) func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) { + debugEnabled := ctx != nil && isDebugMode(ctx) if e.catalog.brokerClient == nil { return make(map[string]bool), nil } @@ -4390,7 +4512,7 @@ func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitio if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start { // Ranges overlap - this file contains duplicate buffer indexes isDuplicate = true - if isDebugMode(ctx) { + if debugEnabled { fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n", entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end) } |
