path: root/weed/query/engine/engine.go
author    Chris Lu <chrislusf@users.noreply.github.com>  2025-09-10 11:04:42 -0700
committer GitHub <noreply@github.com>  2025-09-10 11:04:42 -0700
commit    58e0c1b3301f5c5c0723f71726b6f1053e9152f9 (patch)
tree      2ee40273cb74587227052f908a0eb38c14c2cb56 /weed/query/engine/engine.go
parent    8ed1b104cea39856a51b67d31c6782031c89f642 (diff)
Fix SQL bugs (#7219)

* fix nil when explaining
* add plan details when running full scan
* skip files by timestamp
* skip file by timestamp
* refactor
* handle filter by time
* skip broker memory only if it has unflushed messages
* refactoring
* refactor
* address comments
* address comments
* filter by parquet stats
* simplify
* refactor
* prune old code
* optimize
* Update aggregations.go
* ensure non-time predicates are properly detected
* add stmt to populatePlanFileDetails

  This helper function is a good place to centralize the logic for populating file details, but it was missing an optimization present in executeSelectStatementWithBrokerStats: pruning parquet files based on column statistics from the WHERE clause. Aggregation queries that fall back to the slow path benefit from this as well, so the signature now accepts the *SelectStatement and applies the same column-statistics pruning there.

* refactoring to work with *schema_pb.Value directly after the initial conversion
Diffstat (limited to 'weed/query/engine/engine.go')
-rw-r--r--  weed/query/engine/engine.go  656
1 file changed, 389 insertions, 267 deletions
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index 84c238583..ffed03f35 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -495,69 +495,6 @@ func ParseSQL(sql string) (Statement, error) {
}
}
-// extractFunctionArguments extracts the arguments from a function call expression using CockroachDB parser
-func extractFunctionArguments(expr string) ([]SelectExpr, error) {
- // Find the parentheses
- startParen := strings.Index(expr, "(")
- endParen := strings.LastIndex(expr, ")")
-
- if startParen == -1 || endParen == -1 || endParen <= startParen {
- return nil, fmt.Errorf("invalid function syntax")
- }
-
- // Extract arguments string
- argsStr := strings.TrimSpace(expr[startParen+1 : endParen])
-
- // Handle empty arguments
- if argsStr == "" {
- return []SelectExpr{}, nil
- }
-
- // Handle single * argument (for COUNT(*))
- if argsStr == "*" {
- return []SelectExpr{&StarExpr{}}, nil
- }
-
- // Parse multiple arguments separated by commas
- args := []SelectExpr{}
- argParts := strings.Split(argsStr, ",")
-
- // Use CockroachDB parser to parse each argument as a SELECT expression
- cockroachParser := NewCockroachSQLParser()
-
- for _, argPart := range argParts {
- argPart = strings.TrimSpace(argPart)
- if argPart == "*" {
- args = append(args, &StarExpr{})
- } else {
- // Create a dummy SELECT statement to parse the argument expression
- dummySelect := fmt.Sprintf("SELECT %s", argPart)
-
- // Parse using CockroachDB parser
- stmt, err := cockroachParser.ParseSQL(dummySelect)
- if err != nil {
- // If CockroachDB parser fails, fall back to simple column name
- args = append(args, &AliasedExpr{
- Expr: &ColName{Name: stringValue(argPart)},
- })
- continue
- }
-
- // Extract the expression from the parsed SELECT statement
- if selectStmt, ok := stmt.(*SelectStatement); ok && len(selectStmt.SelectExprs) > 0 {
- args = append(args, selectStmt.SelectExprs[0])
- } else {
- // Fallback to column name if parsing fails
- args = append(args, &AliasedExpr{
- Expr: &ColName{Name: stringValue(argPart)},
- })
- }
- }
- }
-
- return args, nil
-}
-
// debugModeKey is used to store debug mode flag in context
type debugModeKey struct{}
@@ -1221,8 +1158,8 @@ func (e *SQLEngine) buildExecutionTree(plan *QueryExecutionPlan, stmt *SelectSta
}
}
- // Create broker buffer node if queried
- if plan.BrokerBufferQueried {
+ // Create broker buffer node only if queried AND has unflushed messages
+ if plan.BrokerBufferQueried && plan.BrokerBufferMessages > 0 {
brokerBufferNodes = append(brokerBufferNodes, &FileSourceNode{
FilePath: "broker_memory_buffer",
SourceType: "broker_buffer",
@@ -1489,6 +1426,8 @@ func (e *SQLEngine) formatOptimization(opt string) string {
return "Duplicate Data Avoidance"
case "predicate_pushdown":
return "WHERE Clause Pushdown"
+ case "column_statistics_pruning":
+ return "Column Statistics File Pruning"
case "column_projection":
return "Column Selection"
case "limit_pushdown":
@@ -1540,6 +1479,10 @@ func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *DDLStatement)
// executeSelectStatementWithPlan handles SELECT queries with execution plan tracking
func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
+ // Initialize plan details once
+ if plan != nil && plan.Details == nil {
+ plan.Details = make(map[string]interface{})
+ }
// Parse aggregations to populate plan
var aggregations []AggregationSpec
hasAggregations := false
@@ -1577,7 +1520,7 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *Se
if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
if tableExpr, ok := table.Expr.(TableName); ok {
tableName = tableExpr.Name.String()
- if tableExpr.Qualifier.String() != "" {
+ if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
database = tableExpr.Qualifier.String()
}
}
@@ -2290,18 +2233,51 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
// Add partition paths to execution plan details
plan.Details["partition_paths"] = partitions
+ // Persist time filter details for downstream pruning/diagnostics
+ plan.Details[PlanDetailStartTimeNs] = startTimeNs
+ plan.Details[PlanDetailStopTimeNs] = stopTimeNs
+
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs)
+ }
// Collect actual file information for each partition
var parquetFiles []string
var liveLogFiles []string
parquetSources := make(map[string]bool)
+ var parquetReadErrors []string
+ var liveLogListErrors []string
for _, partitionPath := range partitions {
// Get parquet files for this partition
if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
- for _, stats := range parquetStats {
+ // Prune files by time range with debug logging
+ filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+
+ // Further prune by column statistics from WHERE clause
+ if stmt.Where != nil {
+ beforeColumnPrune := len(filteredStats)
+ filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr)
+ columnPrunedCount := beforeColumnPrune - len(filteredStats)
+
+ if columnPrunedCount > 0 {
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
+ }
+ // Track column statistics optimization
+ if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
+ plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
+ }
+ }
+ }
+ for _, stats := range filteredStats {
parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
}
+ } else {
+ parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
+ }
}
// Merge accurate parquet sources from metadata
@@ -2320,6 +2296,11 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
}
liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
}
+ } else {
+ liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
+ }
}
}
@@ -2329,11 +2310,20 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
if len(liveLogFiles) > 0 {
plan.Details["live_log_files"] = liveLogFiles
}
+ if len(parquetReadErrors) > 0 {
+ plan.Details["error_parquet_statistics"] = parquetReadErrors
+ }
+ if len(liveLogListErrors) > 0 {
+ plan.Details["error_live_log_listing"] = liveLogListErrors
+ }
// Update scan statistics for execution plan display
plan.PartitionsScanned = len(partitions)
plan.ParquetFilesScanned = len(parquetFiles)
plan.LiveLogFilesScanned = len(liveLogFiles)
+ } else {
+ // Handle partition discovery error
+ plan.Details["error_partition_discovery"] = discoverErr.Error()
}
} else {
// Normal mode - just get results
@@ -2377,6 +2367,23 @@ func (e *SQLEngine) extractTimeFilters(expr ExprNode) (int64, int64) {
return startTimeNs, stopTimeNs
}
+// extractTimeFiltersWithValidation extracts time filters and validates that the WHERE clause contains only time-based predicates
+// Returns (startTimeNs, stopTimeNs, onlyTimePredicates) where onlyTimePredicates indicates if fast path is safe
+func (e *SQLEngine) extractTimeFiltersWithValidation(expr ExprNode) (int64, int64, bool) {
+ startTimeNs, stopTimeNs := int64(0), int64(0)
+ onlyTimePredicates := true
+
+ // Recursively extract time filters and validate predicates
+ e.extractTimeFiltersWithValidationRecursive(expr, &startTimeNs, &stopTimeNs, &onlyTimePredicates)
+
+ // Special case: if startTimeNs == stopTimeNs, treat it like an equality query
+ if startTimeNs != 0 && startTimeNs == stopTimeNs {
+ stopTimeNs = 0
+ }
+
+ return startTimeNs, stopTimeNs, onlyTimePredicates
+}
+
// extractTimeFiltersRecursive recursively processes WHERE expressions to find time comparisons
func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64) {
switch exprType := expr.(type) {
@@ -2396,6 +2403,39 @@ func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stop
}
}
+// extractTimeFiltersWithValidationRecursive recursively processes WHERE expressions to find time comparisons and validate predicates
+func (e *SQLEngine) extractTimeFiltersWithValidationRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64, onlyTimePredicates *bool) {
+ switch exprType := expr.(type) {
+ case *ComparisonExpr:
+ // Check if this is a time-based comparison
+ leftCol := e.getColumnName(exprType.Left)
+ rightCol := e.getColumnName(exprType.Right)
+
+ isTimeComparison := e.isTimestampColumn(leftCol) || e.isTimestampColumn(rightCol)
+ if isTimeComparison {
+ // Extract time filter from this comparison
+ e.extractTimeFromComparison(exprType, startTimeNs, stopTimeNs)
+ } else {
+ // Non-time predicate found - fast path is not safe
+ *onlyTimePredicates = false
+ }
+ case *AndExpr:
+ // For AND expressions, both sides must be time-only for fast path to be safe
+ e.extractTimeFiltersWithValidationRecursive(exprType.Left, startTimeNs, stopTimeNs, onlyTimePredicates)
+ e.extractTimeFiltersWithValidationRecursive(exprType.Right, startTimeNs, stopTimeNs, onlyTimePredicates)
+ case *OrExpr:
+ // OR expressions are complex and not supported in fast path
+ *onlyTimePredicates = false
+ return
+ case *ParenExpr:
+ // Unwrap parentheses and continue
+ e.extractTimeFiltersWithValidationRecursive(exprType.Expr, startTimeNs, stopTimeNs, onlyTimePredicates)
+ default:
+ // Unknown expression type - not safe for fast path
+ *onlyTimePredicates = false
+ }
+}
+
// extractTimeFromComparison extracts time bounds from comparison expressions
// Handles comparisons against timestamp columns (system columns and schema-defined timestamp types)
func (e *SQLEngine) extractTimeFromComparison(comp *ComparisonExpr, startTimeNs, stopTimeNs *int64) {
@@ -2465,7 +2505,7 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool {
}
// System timestamp columns are always time columns
- if columnName == SW_COLUMN_NAME_TIMESTAMP {
+ if columnName == SW_COLUMN_NAME_TIMESTAMP || columnName == SW_DISPLAY_NAME_TIMESTAMP {
return true
}
@@ -2495,6 +2535,280 @@ func (e *SQLEngine) isTimestampColumn(columnName string) bool {
return false
}
+// getTimeFiltersFromPlan extracts time filter values from execution plan details
+func getTimeFiltersFromPlan(plan *QueryExecutionPlan) (startTimeNs, stopTimeNs int64) {
+ if plan == nil || plan.Details == nil {
+ return 0, 0
+ }
+ if startNsVal, ok := plan.Details[PlanDetailStartTimeNs]; ok {
+ if startNs, ok2 := startNsVal.(int64); ok2 {
+ startTimeNs = startNs
+ }
+ }
+ if stopNsVal, ok := plan.Details[PlanDetailStopTimeNs]; ok {
+ if stopNs, ok2 := stopNsVal.(int64); ok2 {
+ stopTimeNs = stopNs
+ }
+ }
+ return
+}
+
+// pruneParquetFilesByTime filters parquet files based on timestamp ranges, with optional debug logging
+func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileStats, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) []*ParquetFileStats {
+ if startTimeNs == 0 && stopTimeNs == 0 {
+ return parquetStats
+ }
+
+ debugEnabled := ctx != nil && isDebugMode(ctx)
+ qStart := startTimeNs
+ qStop := stopTimeNs
+ if qStop == 0 {
+ qStop = math.MaxInt64
+ }
+
+ n := 0
+ for _, fs := range parquetStats {
+ if debugEnabled {
+ fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName)
+ }
+ if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok {
+ if debugEnabled {
+ fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop)
+ }
+ if qStop < minNs || (qStart != 0 && qStart > maxNs) {
+ if debugEnabled {
+ fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName)
+ }
+ continue
+ }
+ } else if debugEnabled {
+ fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName)
+ }
+ parquetStats[n] = fs
+ n++
+ }
+ return parquetStats[:n]
+}
+
+// pruneParquetFilesByColumnStats filters parquet files based on column statistics and WHERE predicates
+func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetStats []*ParquetFileStats, whereExpr ExprNode) []*ParquetFileStats {
+ if whereExpr == nil {
+ return parquetStats
+ }
+
+ debugEnabled := ctx != nil && isDebugMode(ctx)
+ n := 0
+ for _, fs := range parquetStats {
+ if e.canSkipParquetFile(ctx, fs, whereExpr) {
+ if debugEnabled {
+ fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName)
+ }
+ continue
+ }
+ parquetStats[n] = fs
+ n++
+ }
+ return parquetStats[:n]
+}
+
+// canSkipParquetFile determines if a parquet file can be skipped based on column statistics
+func (e *SQLEngine) canSkipParquetFile(ctx context.Context, fileStats *ParquetFileStats, whereExpr ExprNode) bool {
+ switch expr := whereExpr.(type) {
+ case *ComparisonExpr:
+ return e.canSkipFileByComparison(ctx, fileStats, expr)
+ case *AndExpr:
+ // For AND: skip if ANY condition allows skipping (more aggressive pruning)
+ return e.canSkipParquetFile(ctx, fileStats, expr.Left) || e.canSkipParquetFile(ctx, fileStats, expr.Right)
+ case *OrExpr:
+ // For OR: skip only if ALL conditions allow skipping (conservative)
+ return e.canSkipParquetFile(ctx, fileStats, expr.Left) && e.canSkipParquetFile(ctx, fileStats, expr.Right)
+ default:
+ // Unknown expression type - don't skip
+ return false
+ }
+}
+
+// canSkipFileByComparison checks if a file can be skipped based on a comparison predicate
+func (e *SQLEngine) canSkipFileByComparison(ctx context.Context, fileStats *ParquetFileStats, expr *ComparisonExpr) bool {
+ // Extract column name and comparison value
+ var columnName string
+ var compareSchemaValue *schema_pb.Value
+ var operator string = expr.Operator
+
+ // Determine which side is the column and which is the value
+ if colRef, ok := expr.Left.(*ColName); ok {
+ columnName = colRef.Name.String()
+ if sqlVal, ok := expr.Right.(*SQLVal); ok {
+ compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal)
+ } else {
+ return false // Can't optimize complex expressions
+ }
+ } else if colRef, ok := expr.Right.(*ColName); ok {
+ columnName = colRef.Name.String()
+ if sqlVal, ok := expr.Left.(*SQLVal); ok {
+ compareSchemaValue = e.convertSQLValToSchemaValue(sqlVal)
+ // Flip operator for reversed comparison
+ operator = e.flipOperator(operator)
+ } else {
+ return false
+ }
+ } else {
+ return false // No column reference found
+ }
+
+ // Validate comparison value
+ if compareSchemaValue == nil {
+ return false
+ }
+
+ // Get column statistics
+ colStats, exists := fileStats.ColumnStats[columnName]
+ if !exists || colStats == nil {
+ // Try case-insensitive lookup
+ for colName, stats := range fileStats.ColumnStats {
+ if strings.EqualFold(colName, columnName) {
+ colStats = stats
+ exists = true
+ break
+ }
+ }
+ }
+
+ if !exists || colStats == nil || colStats.MinValue == nil || colStats.MaxValue == nil {
+ return false // No statistics available
+ }
+
+ // Apply pruning logic based on operator
+ switch operator {
+ case ">":
+ // Skip if max(column) <= compareValue
+ return e.compareValues(colStats.MaxValue, compareSchemaValue) <= 0
+ case ">=":
+ // Skip if max(column) < compareValue
+ return e.compareValues(colStats.MaxValue, compareSchemaValue) < 0
+ case "<":
+ // Skip if min(column) >= compareValue
+ return e.compareValues(colStats.MinValue, compareSchemaValue) >= 0
+ case "<=":
+ // Skip if min(column) > compareValue
+ return e.compareValues(colStats.MinValue, compareSchemaValue) > 0
+ case "=":
+ // Skip if compareValue is outside [min, max] range
+ return e.compareValues(compareSchemaValue, colStats.MinValue) < 0 ||
+ e.compareValues(compareSchemaValue, colStats.MaxValue) > 0
+ case "!=", "<>":
+ // Skip if min == max == compareValue (all values are the same and equal to compareValue)
+ return e.compareValues(colStats.MinValue, colStats.MaxValue) == 0 &&
+ e.compareValues(colStats.MinValue, compareSchemaValue) == 0
+ default:
+ return false // Unknown operator
+ }
+}
+
+// flipOperator flips comparison operators when operands are swapped
+func (e *SQLEngine) flipOperator(op string) string {
+ switch op {
+ case ">":
+ return "<"
+ case ">=":
+ return "<="
+ case "<":
+ return ">"
+ case "<=":
+ return ">="
+ case "=", "!=", "<>":
+ return op // These are symmetric
+ default:
+ return op
+ }
+}
+
+// populatePlanFileDetails populates execution plan with detailed file information for partitions
+// Includes the column statistics pruning optimization when a WHERE clause is provided
+func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) {
+ debugEnabled := ctx != nil && isDebugMode(ctx)
+ // Collect actual file information for each partition
+ var parquetFiles []string
+ var liveLogFiles []string
+ parquetSources := make(map[string]bool)
+ var parquetReadErrors []string
+ var liveLogListErrors []string
+
+ // Extract time filters from plan details
+ startTimeNs, stopTimeNs := getTimeFiltersFromPlan(plan)
+
+ for _, partitionPath := range partitions {
+ // Get parquet files for this partition
+ if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
+ // Prune files by time range
+ filteredStats := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+
+ // Further prune by column statistics from WHERE clause
+ if stmt != nil && stmt.Where != nil {
+ beforeColumnPrune := len(filteredStats)
+ filteredStats = e.pruneParquetFilesByColumnStats(ctx, filteredStats, stmt.Where.Expr)
+ columnPrunedCount := beforeColumnPrune - len(filteredStats)
+
+ if columnPrunedCount > 0 {
+ if debugEnabled {
+ fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
+ }
+ // Track column statistics optimization
+ if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
+ plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
+ }
+ }
+ }
+
+ for _, stats := range filteredStats {
+ parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
+ }
+ } else {
+ parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+ if debugEnabled {
+ fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
+ }
+ }
+
+ // Merge accurate parquet sources from metadata
+ if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
+ for src := range sources {
+ parquetSources[src] = true
+ }
+ }
+
+ // Get live log files for this partition
+ if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
+ for _, fileName := range liveFiles {
+ // Exclude live log files that have been converted to parquet (deduplicated)
+ if parquetSources[fileName] {
+ continue
+ }
+ liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
+ }
+ } else {
+ liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
+ if debugEnabled {
+ fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
+ }
+ }
+ }
+
+ // Add file lists to plan details
+ if len(parquetFiles) > 0 {
+ plan.Details["parquet_files"] = parquetFiles
+ }
+ if len(liveLogFiles) > 0 {
+ plan.Details["live_log_files"] = liveLogFiles
+ }
+ if len(parquetReadErrors) > 0 {
+ plan.Details["error_parquet_statistics"] = parquetReadErrors
+ }
+ if len(liveLogListErrors) > 0 {
+ plan.Details["error_live_log_listing"] = liveLogListErrors
+ }
+}
+
// isSQLTypeTimestamp checks if a SQL type string represents a timestamp type
func (e *SQLEngine) isSQLTypeTimestamp(sqlType string) bool {
upperType := strings.ToUpper(strings.TrimSpace(sqlType))
@@ -2664,56 +2978,6 @@ func (e *SQLEngine) buildPredicateWithContext(expr ExprNode, selectExprs []Selec
}
}
-// buildComparisonPredicateWithAliases creates a predicate for comparison operations with alias support
-func (e *SQLEngine) buildComparisonPredicateWithAliases(expr *ComparisonExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
- var columnName string
- var compareValue interface{}
- var operator string
-
- // Extract the comparison details, resolving aliases if needed
- leftCol := e.getColumnNameWithAliases(expr.Left, aliases)
- rightCol := e.getColumnNameWithAliases(expr.Right, aliases)
- operator = e.normalizeOperator(expr.Operator)
-
- if leftCol != "" && rightCol == "" {
- // Left side is column, right side is value
- columnName = e.getSystemColumnInternalName(leftCol)
- val, err := e.extractValueFromExpr(expr.Right)
- if err != nil {
- return nil, err
- }
- compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right)
- } else if rightCol != "" && leftCol == "" {
- // Right side is column, left side is value
- columnName = e.getSystemColumnInternalName(rightCol)
- val, err := e.extractValueFromExpr(expr.Left)
- if err != nil {
- return nil, err
- }
- compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left)
- // Reverse the operator when column is on the right
- operator = e.reverseOperator(operator)
- } else if leftCol != "" && rightCol != "" {
- return nil, fmt.Errorf("column-to-column comparisons not yet supported")
- } else {
- return nil, fmt.Errorf("at least one side of comparison must be a column")
- }
-
- return func(record *schema_pb.RecordValue) bool {
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return false
- }
- return e.evaluateComparison(fieldValue, operator, compareValue)
- }, nil
-}
-
-// buildComparisonPredicate creates a predicate for comparison operations (=, <, >, etc.)
-// Handles column names on both left and right sides of the comparison
-func (e *SQLEngine) buildComparisonPredicate(expr *ComparisonExpr) (func(*schema_pb.RecordValue) bool, error) {
- return e.buildComparisonPredicateWithContext(expr, nil)
-}
-
// buildComparisonPredicateWithContext creates a predicate for comparison operations with alias support
func (e *SQLEngine) buildComparisonPredicateWithContext(expr *ComparisonExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
var columnName string
@@ -2836,54 +3100,6 @@ func (e *SQLEngine) buildBetweenPredicateWithContext(expr *BetweenExpr, selectEx
}, nil
}
-// buildBetweenPredicateWithAliases creates a predicate for BETWEEN operations with alias support
-func (e *SQLEngine) buildBetweenPredicateWithAliases(expr *BetweenExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
- var columnName string
- var fromValue, toValue interface{}
-
- // Extract column name from left side with alias resolution
- leftCol := e.getColumnNameWithAliases(expr.Left, aliases)
- if leftCol == "" {
- return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left)
- }
- columnName = e.getSystemColumnInternalName(leftCol)
-
- // Extract FROM value
- fromVal, err := e.extractValueFromExpr(expr.From)
- if err != nil {
- return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err)
- }
- fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From)
-
- // Extract TO value
- toVal, err := e.extractValueFromExpr(expr.To)
- if err != nil {
- return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err)
- }
- toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To)
-
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return false
- }
-
- // Evaluate: fieldValue >= fromValue AND fieldValue <= toValue
- greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue)
- lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue)
-
- result := greaterThanOrEqualFrom && lessThanOrEqualTo
-
- // Handle NOT BETWEEN
- if expr.Not {
- result = !result
- }
-
- return result
- }, nil
-}
-
// buildIsNullPredicateWithContext creates a predicate for IS NULL operations
func (e *SQLEngine) buildIsNullPredicateWithContext(expr *IsNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) {
// Check if the expression is a column name
@@ -2936,50 +3152,6 @@ func (e *SQLEngine) buildIsNotNullPredicateWithContext(expr *IsNotNullExpr, sele
}
}
-// buildIsNullPredicateWithAliases creates a predicate for IS NULL operations with alias support
-func (e *SQLEngine) buildIsNullPredicateWithAliases(expr *IsNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
- // Extract column name from expression with alias resolution
- columnName := e.getColumnNameWithAliases(expr.Expr, aliases)
- if columnName == "" {
- return nil, fmt.Errorf("IS NULL operand must be a column name, got: %T", expr.Expr)
- }
- columnName = e.getSystemColumnInternalName(columnName)
-
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- // Check if field exists and if it's null or missing
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return true // Field doesn't exist = NULL
- }
-
- // Check if the field value itself is null/empty
- return e.isValueNull(fieldValue)
- }, nil
-}
-
-// buildIsNotNullPredicateWithAliases creates a predicate for IS NOT NULL operations with alias support
-func (e *SQLEngine) buildIsNotNullPredicateWithAliases(expr *IsNotNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) {
- // Extract column name from expression with alias resolution
- columnName := e.getColumnNameWithAliases(expr.Expr, aliases)
- if columnName == "" {
- return nil, fmt.Errorf("IS NOT NULL operand must be a column name, got: %T", expr.Expr)
- }
- columnName = e.getSystemColumnInternalName(columnName)
-
- // Return the predicate function
- return func(record *schema_pb.RecordValue) bool {
- // Check if field exists and if it's not null
- fieldValue, exists := record.Fields[columnName]
- if !exists {
- return false // Field doesn't exist = NULL, so NOT NULL is false
- }
-
- // Check if the field value itself is not null/empty
- return !e.isValueNull(fieldValue)
- }, nil
-}
-
// isValueNull checks if a schema_pb.Value is null or represents a null value
func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool {
if value == nil {
@@ -3019,33 +3191,6 @@ func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool {
}
}
-// getColumnNameWithAliases extracts column name from expression, resolving aliases if needed
-func (e *SQLEngine) getColumnNameWithAliases(expr ExprNode, aliases map[string]ExprNode) string {
- switch exprType := expr.(type) {
- case *ColName:
- colName := exprType.Name.String()
- // Check if this is an alias that should be resolved
- if aliases != nil {
- if actualExpr, exists := aliases[colName]; exists {
- // Recursively resolve the aliased expression
- return e.getColumnNameWithAliases(actualExpr, nil) // Don't recurse aliases
- }
- }
- return colName
- }
- return ""
-}
-
-// extractValueFromExpr extracts a value from an expression node (for alias support)
-func (e *SQLEngine) extractValueFromExpr(expr ExprNode) (interface{}, error) {
- return e.extractComparisonValue(expr)
-}
-
-// normalizeOperator normalizes comparison operators
-func (e *SQLEngine) normalizeOperator(op string) string {
- return op // For now, just return as-is
-}
-
// extractComparisonValue extracts the comparison value from a SQL expression
func (e *SQLEngine) extractComparisonValue(expr ExprNode) (interface{}, error) {
switch val := expr.(type) {
@@ -4178,31 +4323,6 @@ func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 {
return t.UnixNano()
}
-// countLiveLogRows counts the total number of rows in live log files (non-parquet files) in a partition
-func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) {
- filerClient, err := e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return 0, err
- }
-
- totalRows := int64(0)
- err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") {
- return nil // Skip directories and parquet files
- }
-
- // Count rows in live log file
- rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry)
- if err != nil {
- fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err)
- return nil // Continue with other files
- }
- totalRows += rowCount
- return nil
- })
- return totalRows, err
-}
-
// extractParquetSourceFiles extracts source log file names from parquet file metadata for deduplication
func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool {
sourceFiles := make(map[string]bool)
@@ -4226,6 +4346,7 @@ func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map
// countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet and duplicate log buffer data
func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) {
+ debugEnabled := ctx != nil && isDebugMode(ctx)
filerClient, err := e.catalog.brokerClient.GetFilerClient()
if err != nil {
return 0, err
@@ -4242,14 +4363,14 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
// Second, get duplicate files from log buffer metadata
logBufferDuplicates, err := e.buildLogBufferDeduplicationMap(ctx, partitionPath)
if err != nil {
- if isDebugMode(ctx) {
+ if debugEnabled {
fmt.Printf("Warning: failed to build log buffer deduplication map: %v\n", err)
}
logBufferDuplicates = make(map[string]bool)
}
// Debug: Show deduplication status (only in explain mode)
- if isDebugMode(ctx) {
+ if debugEnabled {
if len(actualSourceFiles) > 0 {
fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath)
}
@@ -4266,7 +4387,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
// Skip files that have been converted to parquet
if actualSourceFiles[entry.Name] {
- if isDebugMode(ctx) {
+ if debugEnabled {
fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name)
}
return nil
@@ -4274,7 +4395,7 @@ func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context,
// Skip files that are duplicated due to log buffer metadata
if logBufferDuplicates[entry.Name] {
- if isDebugMode(ctx) {
+ if debugEnabled {
fmt.Printf("Skipping %s (duplicate log buffer data)\n", entry.Name)
}
return nil
@@ -4345,6 +4466,7 @@ func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBuffer
// buildLogBufferDeduplicationMap creates a map to track duplicate files based on buffer ranges (ultra-efficient)
func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) {
+ debugEnabled := ctx != nil && isDebugMode(ctx)
if e.catalog.brokerClient == nil {
return make(map[string]bool), nil
}
@@ -4390,7 +4512,7 @@ func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitio
if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start {
// Ranges overlap - this file contains duplicate buffer indexes
isDuplicate = true
- if isDebugMode(ctx) {
+ if debugEnabled {
fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n",
entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end)
}