Diffstat (limited to 'weed/query/engine/engine.go')
| -rw-r--r-- | weed/query/engine/engine.go | 5696 |
1 file changed, 5696 insertions, 0 deletions
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
new file mode 100644
index 000000000..84c238583
--- /dev/null
+++ b/weed/query/engine/engine.go
@@ -0,0 +1,5696 @@
package engine

import (
    "context"
    "encoding/binary"
    "encoding/json"
    "fmt"
    "io"
    "math"
    "math/big"
    "regexp"
    "strconv"
    "strings"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/filer"
    "github.com/seaweedfs/seaweedfs/weed/mq/schema"
    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
    "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
    "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
    "github.com/seaweedfs/seaweedfs/weed/util"
    util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
    "google.golang.org/protobuf/proto"
)

// SQL Function Name Constants
const (
    // Aggregation Functions
    FuncCOUNT = "COUNT"
    FuncSUM   = "SUM"
    FuncAVG   = "AVG"
    FuncMIN   = "MIN"
    FuncMAX   = "MAX"

    // String Functions
    FuncUPPER     = "UPPER"
    FuncLOWER     = "LOWER"
    FuncLENGTH    = "LENGTH"
    FuncTRIM      = "TRIM"
    FuncBTRIM     = "BTRIM" // CockroachDB's internal name for TRIM
    FuncLTRIM     = "LTRIM"
    FuncRTRIM     = "RTRIM"
    FuncSUBSTRING = "SUBSTRING"
    FuncLEFT      = "LEFT"
    FuncRIGHT     = "RIGHT"
    FuncCONCAT    = "CONCAT"

    // DateTime Functions
    FuncCURRENT_DATE      = "CURRENT_DATE"
    FuncCURRENT_TIME      = "CURRENT_TIME"
    FuncCURRENT_TIMESTAMP = "CURRENT_TIMESTAMP"
    FuncNOW               = "NOW"
    FuncEXTRACT           = "EXTRACT"
    FuncDATE_TRUNC        = "DATE_TRUNC"

    // PostgreSQL uses EXTRACT(part FROM date) instead of convenience functions like YEAR(), MONTH(), etc.
)

// PostgreSQL-compatible SQL AST types
type Statement interface {
    isStatement()
}

type ShowStatement struct {
    Type    string  // "databases", "tables", "columns"
    Table   string  // for SHOW COLUMNS FROM table
    Schema  string  // for database context
    OnTable NameRef // for compatibility with existing code that checks OnTable
}

func (s *ShowStatement) isStatement() {}

type UseStatement struct {
    Database string // database name to switch to
}

func (u *UseStatement) isStatement() {}

type DDLStatement struct {
    Action    string // "create", "alter", "drop"
    NewName   NameRef
    TableSpec *TableSpec
}

type NameRef struct {
    Name      StringGetter
    Qualifier StringGetter
}

type StringGetter interface {
    String() string
}

type stringValue string

func (s stringValue) String() string { return string(s) }

type TableSpec struct {
    Columns []ColumnDef
}

type ColumnDef struct {
    Name StringGetter
    Type TypeRef
}

type TypeRef struct {
    Type string
}

func (d *DDLStatement) isStatement() {}

type SelectStatement struct {
    SelectExprs     []SelectExpr
    From            []TableExpr
    Where           *WhereClause
    Limit           *LimitClause
    WindowFunctions []*WindowFunction
}

type WhereClause struct {
    Expr ExprNode
}

type LimitClause struct {
    Rowcount ExprNode
    Offset   ExprNode
}

func (s *SelectStatement) isStatement() {}

// Window function types for time-series analytics
type WindowSpec struct {
    PartitionBy []ExprNode
    OrderBy     []*OrderByClause
}

type WindowFunction struct {
    Function string     // ROW_NUMBER, RANK, LAG, LEAD
    Args     []ExprNode // Function arguments
    Over     *WindowSpec
    Alias    string // Column alias for the result
}

type OrderByClause struct {
    Column string
    Order  string // ASC or DESC
}

type SelectExpr interface {
    isSelectExpr()
}

type StarExpr struct{}

func (s *StarExpr) isSelectExpr() {}
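// As an illustration of how these AST types compose, a window expression such
// as ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ts DESC) could be
// modeled as follows (illustrative sketch, not part of the original change;
// ColName and stringValue are declared nearby in this file):
//
//	rowNum := &WindowFunction{
//		Function: "ROW_NUMBER",
//		Args:     nil, // ROW_NUMBER takes no arguments
//		Over: &WindowSpec{
//			PartitionBy: []ExprNode{&ColName{Name: stringValue("user_id")}},
//			OrderBy:     []*OrderByClause{{Column: "ts", Order: "DESC"}},
//		},
//		Alias: "row_num",
//	}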
type AliasedExpr struct {
    Expr ExprNode
    As   AliasRef
}

type AliasRef interface {
    IsEmpty() bool
    String() string
}

type aliasValue string

func (a aliasValue) IsEmpty() bool  { return string(a) == "" }
func (a aliasValue) String() string { return string(a) }

func (a *AliasedExpr) isSelectExpr() {}

type TableExpr interface {
    isTableExpr()
}

type AliasedTableExpr struct {
    Expr interface{}
}

func (a *AliasedTableExpr) isTableExpr() {}

type TableName struct {
    Name      StringGetter
    Qualifier StringGetter
}

type ExprNode interface {
    isExprNode()
}

type FuncExpr struct {
    Name  StringGetter
    Exprs []SelectExpr
}

func (f *FuncExpr) isExprNode() {}

type ColName struct {
    Name StringGetter
}

func (c *ColName) isExprNode() {}

// ArithmeticExpr represents arithmetic operations like id+user_id and string concatenation like name||suffix
type ArithmeticExpr struct {
    Left     ExprNode
    Right    ExprNode
    Operator string // +, -, *, /, %, ||
}

func (a *ArithmeticExpr) isExprNode() {}

type ComparisonExpr struct {
    Left     ExprNode
    Right    ExprNode
    Operator string
}

func (c *ComparisonExpr) isExprNode() {}

type AndExpr struct {
    Left  ExprNode
    Right ExprNode
}

func (a *AndExpr) isExprNode() {}

type OrExpr struct {
    Left  ExprNode
    Right ExprNode
}

func (o *OrExpr) isExprNode() {}

type ParenExpr struct {
    Expr ExprNode
}

func (p *ParenExpr) isExprNode() {}

type SQLVal struct {
    Type int
    Val  []byte
}

func (s *SQLVal) isExprNode() {}

type ValTuple []ExprNode

func (v ValTuple) isExprNode() {}

type IntervalExpr struct {
    Value string // The interval value (e.g., "1 hour", "30 minutes")
    Unit  string // The unit (parsed from value)
}

func (i *IntervalExpr) isExprNode() {}

type BetweenExpr struct {
    Left ExprNode // The expression to test
    From ExprNode // Lower bound (inclusive)
    To   ExprNode // Upper bound (inclusive)
    Not  bool     // true for NOT BETWEEN
}

func (b *BetweenExpr) isExprNode() {}

type IsNullExpr struct {
    Expr ExprNode // The expression to test for null
}

func (i *IsNullExpr) isExprNode() {}

type IsNotNullExpr struct {
    Expr ExprNode // The expression to test for not null
}

func (i *IsNotNullExpr) isExprNode() {}

// SQLVal types
const (
    IntVal = iota
    StrVal
    FloatVal
)

// Operator constants
const (
    CreateStr       = "create"
    AlterStr        = "alter"
    DropStr         = "drop"
    EqualStr        = "="
    LessThanStr     = "<"
    GreaterThanStr  = ">"
    LessEqualStr    = "<="
    GreaterEqualStr = ">="
    NotEqualStr     = "!="
)

// parseIdentifier properly parses a potentially quoted identifier (database/table name)
func parseIdentifier(identifier string) string {
    identifier = strings.TrimSpace(identifier)
    identifier = strings.TrimSuffix(identifier, ";") // Remove trailing semicolon

    // Handle double quotes (PostgreSQL standard)
    if len(identifier) >= 2 && identifier[0] == '"' && identifier[len(identifier)-1] == '"' {
        return identifier[1 : len(identifier)-1]
    }

    // Handle backticks (MySQL compatibility)
    if len(identifier) >= 2 && identifier[0] == '`' && identifier[len(identifier)-1] == '`' {
        return identifier[1 : len(identifier)-1]
    }

    return identifier
}
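// parseIdentifier strips at most one layer of quoting plus a trailing
// semicolon, so all three spellings below resolve to the same name
// (illustrative sketch):
//
//	fmt.Println(parseIdentifier("events"))    // events
//	fmt.Println(parseIdentifier(`"events";`)) // events
//	fmt.Println(parseIdentifier("`events`"))  // events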
// splitTableRef splits a possibly quoted database.table reference (e.g.
// `"db"."table"` or `db.table`) into its parts; database is "" when the
// reference is unqualified.
func splitTableRef(rawTableName string) (database, tableName string) {
    if strings.Contains(rawTableName, ".") {
        // Handle quoted database.table like "db"."table"
        if strings.HasPrefix(rawTableName, "\"") || strings.HasPrefix(rawTableName, "`") {
            quoteChar := byte('"')
            if rawTableName[0] == '`' {
                quoteChar = '`'
            }

            // Find the matching closing quote
            closingIndex := -1
            for i := 1; i < len(rawTableName); i++ {
                if rawTableName[i] == quoteChar {
                    closingIndex = i
                    break
                }
            }

            if closingIndex != -1 && closingIndex+1 < len(rawTableName) && rawTableName[closingIndex+1] == '.' {
                // Valid quoted database name
                return parseIdentifier(rawTableName[:closingIndex+1]), parseIdentifier(rawTableName[closingIndex+2:])
            }
            // Fall through to the simple split below
        }
        // Simple case: split on the first dot, then unquote each part
        dbTableParts := strings.SplitN(rawTableName, ".", 2)
        return parseIdentifier(dbTableParts[0]), parseIdentifier(dbTableParts[1])
    }
    // No database.table format, just parse the table name
    return "", parseIdentifier(rawTableName)
}

// ParseSQL parses PostgreSQL-compatible SQL statements, using the CockroachDB parser for SELECT queries
func ParseSQL(sql string) (Statement, error) {
    sql = strings.TrimSpace(sql)
    sqlUpper := strings.ToUpper(sql)

    // Handle USE statement
    if strings.HasPrefix(sqlUpper, "USE ") {
        parts := strings.Fields(sql)
        if len(parts) < 2 {
            return nil, fmt.Errorf("USE statement requires a database name")
        }
        // Parse the database name properly, handling quoted identifiers
        dbName := parseIdentifier(strings.Join(parts[1:], " "))
        return &UseStatement{Database: dbName}, nil
    }

    // Handle DESCRIBE/DESC statements as aliases for SHOW COLUMNS FROM
    if strings.HasPrefix(sqlUpper, "DESCRIBE ") || strings.HasPrefix(sqlUpper, "DESC ") {
        parts := strings.Fields(sql)
        if len(parts) < 2 {
            return nil, fmt.Errorf("DESCRIBE/DESC statement requires a table name")
        }

        // Get the raw table name, skipping an optional TABLE keyword
        var rawTableName string
        if len(parts) >= 3 && strings.ToUpper(parts[1]) == "TABLE" {
            rawTableName = parts[2]
        } else {
            rawTableName = parts[1]
        }

        database, tableName := splitTableRef(rawTableName)

        stmt := &ShowStatement{Type: "columns"}
        stmt.OnTable.Name = stringValue(tableName)
        if database != "" {
            stmt.OnTable.Qualifier = stringValue(database)
        }
        return stmt, nil
    }

    // Handle SHOW statements (keep custom parsing for these simple cases)
    if strings.HasPrefix(sqlUpper, "SHOW DATABASES") || strings.HasPrefix(sqlUpper, "SHOW SCHEMAS") {
        return &ShowStatement{Type: "databases"}, nil
    }
    if strings.HasPrefix(sqlUpper, "SHOW TABLES") {
        stmt := &ShowStatement{Type: "tables"}
        // Handle "SHOW TABLES FROM database" syntax
        if strings.Contains(sqlUpper, "FROM") {
            partsUpper := strings.Fields(sqlUpper)
            partsOriginal := strings.Fields(sql) // Use original casing
            for i, part := range partsUpper {
                if part == "FROM" && i+1 < len(partsOriginal) {
                    // Parse the database name properly
                    dbName := parseIdentifier(partsOriginal[i+1])
                    stmt.Schema = dbName                    // Set the Schema field for the test
                    stmt.OnTable.Name = stringValue(dbName) // Keep for compatibility
                    break
                }
            }
        }
        return stmt, nil
    }
    if strings.HasPrefix(sqlUpper, "SHOW COLUMNS FROM") {
        // Parse "SHOW COLUMNS FROM table" or "SHOW COLUMNS FROM database.table"
        parts := strings.Fields(sql)
        if len(parts) < 4 {
            return nil, fmt.Errorf("SHOW COLUMNS FROM statement requires a table name")
        }

        rawTableName := parts[3]
        database, tableName := splitTableRef(rawTableName)

        stmt := &ShowStatement{Type: "columns"}
        stmt.OnTable.Name = stringValue(tableName)
        if database != "" {
            stmt.OnTable.Qualifier = stringValue(database)
        }
        return stmt, nil
    }

    // Use the CockroachDB parser for SELECT statements
    if strings.HasPrefix(sqlUpper, "SELECT") {
        parser := NewCockroachSQLParser()
        return parser.ParseSQL(sql)
    }

    return nil, UnsupportedFeatureError{
        Feature: fmt.Sprintf("statement type: %s", strings.Fields(sqlUpper)[0]),
        Reason:  "statement parsing not implemented",
    }
}
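// A round-trip through ParseSQL for one of the hand-parsed statement forms
// (illustrative sketch; SELECT text is instead delegated to
// NewCockroachSQLParser, defined elsewhere in this package):
//
//	stmt, err := ParseSQL(`SHOW COLUMNS FROM "analytics"."events"`)
//	if err != nil {
//		panic(err)
//	}
//	show := stmt.(*ShowStatement)
//	fmt.Println(show.Type)              // columns
//	fmt.Println(show.OnTable.Qualifier) // analytics
//	fmt.Println(show.OnTable.Name)      // events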
// extractFunctionArguments extracts the arguments from a function call expression using the CockroachDB parser
func extractFunctionArguments(expr string) ([]SelectExpr, error) {
    // Find the parentheses
    startParen := strings.Index(expr, "(")
    endParen := strings.LastIndex(expr, ")")

    if startParen == -1 || endParen == -1 || endParen <= startParen {
        return nil, fmt.Errorf("invalid function syntax")
    }

    // Extract the arguments string
    argsStr := strings.TrimSpace(expr[startParen+1 : endParen])

    // Handle empty arguments
    if argsStr == "" {
        return []SelectExpr{}, nil
    }

    // Handle single * argument (for COUNT(*))
    if argsStr == "*" {
        return []SelectExpr{&StarExpr{}}, nil
    }

    // Parse multiple arguments separated by commas
    // (note: a plain comma split, so arguments that themselves contain commas,
    // e.g. nested function calls, are not handled here)
    args := []SelectExpr{}
    argParts := strings.Split(argsStr, ",")

    // Use the CockroachDB parser to parse each argument as a SELECT expression
    cockroachParser := NewCockroachSQLParser()

    for _, argPart := range argParts {
        argPart = strings.TrimSpace(argPart)
        if argPart == "*" {
            args = append(args, &StarExpr{})
        } else {
            // Create a dummy SELECT statement to parse the argument expression
            dummySelect := fmt.Sprintf("SELECT %s", argPart)

            // Parse using the CockroachDB parser
            stmt, err := cockroachParser.ParseSQL(dummySelect)
            if err != nil {
                // If the parser fails, fall back to a simple column name
                args = append(args, &AliasedExpr{
                    Expr: &ColName{Name: stringValue(argPart)},
                })
                continue
            }

            // Extract the expression from the parsed SELECT statement
            if selectStmt, ok := stmt.(*SelectStatement); ok && len(selectStmt.SelectExprs) > 0 {
                args = append(args, selectStmt.SelectExprs[0])
            } else {
                // Fall back to a column name if parsing fails
                args = append(args, &AliasedExpr{
                    Expr: &ColName{Name: stringValue(argPart)},
                })
            }
        }
    }

    return args, nil
}

// debugModeKey is used to store the debug mode flag in a context
type debugModeKey struct{}

// isDebugMode checks if we're in debug/explain mode
func isDebugMode(ctx context.Context) bool {
    debug, ok := ctx.Value(debugModeKey{}).(bool)
    return ok && debug
}

// withDebugMode returns a context with debug mode enabled
func withDebugMode(ctx context.Context) context.Context {
    return context.WithValue(ctx, debugModeKey{}, true)
}

// LogBufferStart tracks the starting buffer index for a file
// Buffer indexes are monotonically increasing, count = len(chunks)
type LogBufferStart struct {
    StartIndex int64 `json:"start_index"` // Starting buffer index (count = len(chunks))
}

// SQLEngine provides SQL query execution capabilities for SeaweedFS
// Assumptions:
// 1. MQ namespaces map directly to SQL databases
// 2. MQ topics map directly to SQL tables
// 3. Schema evolution is handled transparently with backward compatibility
// 4. Queries run against Parquet-stored MQ messages
type SQLEngine struct {
    catalog *SchemaCatalog
}

// NewSQLEngine creates a new SQL execution engine
// Uses the master address for service discovery and initialization
func NewSQLEngine(masterAddress string) *SQLEngine {
    // Initialize the global HTTP client if not already done
    // This is needed for reading partition data from the filer
    if util_http.GetGlobalHttpClient() == nil {
        util_http.InitGlobalHttpClient()
    }

    return &SQLEngine{
        catalog: NewSchemaCatalog(masterAddress),
    }
}

// NewSQLEngineWithCatalog creates a new SQL execution engine with a custom catalog
// Used for testing or when you want to provide a pre-configured catalog
func NewSQLEngineWithCatalog(catalog *SchemaCatalog) *SQLEngine {
    // Initialize the global HTTP client if not already done
    // This is needed for reading partition data from the filer
    if util_http.GetGlobalHttpClient() == nil {
        util_http.InitGlobalHttpClient()
    }

    return &SQLEngine{
        catalog: catalog,
    }
}

// GetCatalog returns the schema catalog for external access
func (e *SQLEngine) GetCatalog() *SchemaCatalog {
    return e.catalog
}
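// The two constructors differ only in where the catalog comes from; a typical
// embedding looks like this (illustrative sketch, with a hypothetical master
// address):
//
//	// Production: resolve services through a master
//	engine := NewSQLEngine("localhost:9333")
//
//	// Tests: inject a pre-configured catalog instead
//	testEngine := NewSQLEngineWithCatalog(NewSchemaCatalog("localhost:9333"))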
// ExecuteSQL parses and executes a SQL statement
// Assumptions:
// 1. All SQL statements are PostgreSQL-compatible (SELECT parsing is delegated to the CockroachDB SQL parser)
// 2. DDL operations (CREATE/ALTER/DROP) modify underlying MQ topics
// 3. DML operations (SELECT) query Parquet files directly
// 4. Error handling follows PostgreSQL conventions
func (e *SQLEngine) ExecuteSQL(ctx context.Context, sql string) (*QueryResult, error) {
    startTime := time.Now()

    // Handle EXPLAIN as a special case
    sqlTrimmed := strings.TrimSpace(sql)
    sqlUpper := strings.ToUpper(sqlTrimmed)
    if strings.HasPrefix(sqlUpper, "EXPLAIN") {
        // Extract the actual query after EXPLAIN
        actualSQL := strings.TrimSpace(sqlTrimmed[7:]) // Remove "EXPLAIN"
        return e.executeExplain(ctx, actualSQL, startTime)
    }

    // Parse the SQL statement
    stmt, err := ParseSQL(sql)
    if err != nil {
        return &QueryResult{
            Error: fmt.Errorf("SQL parse error: %v", err),
        }, err
    }

    // Route to the appropriate handler based on statement type
    switch stmt := stmt.(type) {
    case *ShowStatement:
        return e.executeShowStatementWithDescribe(ctx, stmt)
    case *UseStatement:
        return e.executeUseStatement(ctx, stmt)
    case *DDLStatement:
        return e.executeDDLStatement(ctx, stmt)
    case *SelectStatement:
        return e.executeSelectStatement(ctx, stmt)
    default:
        err := fmt.Errorf("unsupported SQL statement type: %T", stmt)
        return &QueryResult{Error: err}, err
    }
}

// executeExplain handles EXPLAIN statements by executing the query with plan tracking
func (e *SQLEngine) executeExplain(ctx context.Context, actualSQL string, startTime time.Time) (*QueryResult, error) {
    // Enable debug mode for EXPLAIN queries
    ctx = withDebugMode(ctx)

    // Parse the actual SQL statement
    stmt, err := ParseSQL(actualSQL)
    if err != nil {
        return &QueryResult{
            Error: fmt.Errorf("SQL parse error in EXPLAIN query: %v", err),
        }, err
    }

    // Create the execution plan
    plan := &QueryExecutionPlan{
        QueryType:         strings.ToUpper(strings.Fields(actualSQL)[0]),
        DataSources:       []string{},
        OptimizationsUsed: []string{},
        Details:           make(map[string]interface{}),
    }

    var result *QueryResult

    // Route to the appropriate handler based on statement type (with plan tracking)
    switch stmt := stmt.(type) {
    case *SelectStatement:
        result, err = e.executeSelectStatementWithPlan(ctx, stmt, plan)
        if err != nil {
            plan.Details["error"] = err.Error()
        }
    case *ShowStatement:
        plan.QueryType = "SHOW"
        plan.ExecutionStrategy = "metadata_only"
        result, err = e.executeShowStatementWithDescribe(ctx, stmt)
    default:
        err := fmt.Errorf("EXPLAIN not supported for statement type: %T", stmt)
        return &QueryResult{Error: err}, err
    }

    // Calculate the execution time
    plan.ExecutionTimeMs = float64(time.Since(startTime).Nanoseconds()) / 1e6

    // Format the execution plan as a result
    return e.formatExecutionPlan(plan, result, err)
}
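// Because the EXPLAIN check runs before parsing, any supported statement can
// be prefixed with it; each plan line comes back as one result row
// (illustrative sketch; assumes an engine from NewSQLEngine above and a
// Vitess-style sqltypes.Value.ToString accessor):
//
//	res, err := engine.ExecuteSQL(context.Background(), "EXPLAIN SELECT COUNT(*) FROM events")
//	if err == nil {
//		for _, row := range res.Rows {
//			fmt.Println(row[0].ToString()) // one execution-plan line per row
//		}
//	}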
// formatExecutionPlan converts an execution plan to a hierarchical tree format for display
func (e *SQLEngine) formatExecutionPlan(plan *QueryExecutionPlan, originalResult *QueryResult, originalErr error) (*QueryResult, error) {
    columns := []string{"Query Execution Plan"}
    rows := [][]sqltypes.Value{}

    var planLines []string

    // Use the new tree structure if available, otherwise fall back to the legacy format
    if plan.RootNode != nil {
        planLines = e.buildTreePlan(plan, originalErr)
    } else {
        // Build the legacy hierarchical plan display
        planLines = e.buildHierarchicalPlan(plan, originalErr)
    }

    for _, line := range planLines {
        rows = append(rows, []sqltypes.Value{
            sqltypes.NewVarChar(line),
        })
    }

    if originalErr != nil {
        return &QueryResult{
            Columns:       columns,
            Rows:          rows,
            ExecutionPlan: plan,
            Error:         originalErr,
        }, originalErr
    }

    return &QueryResult{
        Columns:       columns,
        Rows:          rows,
        ExecutionPlan: plan,
    }, nil
}

// buildTreePlan creates the new tree-based execution plan display
func (e *SQLEngine) buildTreePlan(plan *QueryExecutionPlan, err error) []string {
    var lines []string

    // Root header
    lines = append(lines, fmt.Sprintf("%s Query (%s)", plan.QueryType, plan.ExecutionStrategy))

    // Build the execution tree
    if plan.RootNode != nil {
        // The root execution node is always the last (and only) child of the query header
        treeLines := e.formatExecutionNode(plan.RootNode, "└── ", "    ", true)
        lines = append(lines, treeLines...)
    }

    // Add error information if present
    if err != nil {
        lines = append(lines, "")
        lines = append(lines, fmt.Sprintf("Error: %v", err))
    }

    return lines
}

// formatExecutionNode recursively formats execution tree nodes
func (e *SQLEngine) formatExecutionNode(node ExecutionNode, prefix, childPrefix string, isRoot bool) []string {
    var lines []string

    // Format the current node (root and child nodes share the same layout)
    lines = append(lines, fmt.Sprintf("%s%s", prefix, node.GetDescription()))

    // Add node-specific details
    switch n := node.(type) {
    case *FileSourceNode:
        lines = e.formatFileSourceDetails(lines, n, childPrefix, isRoot)
    case *ScanOperationNode:
        lines = e.formatScanOperationDetails(lines, n, childPrefix, isRoot)
    case *MergeOperationNode:
        lines = e.formatMergeOperationDetails(lines, n, childPrefix, isRoot)
    }

    // Format children
    children := node.GetChildren()
    if len(children) > 0 {
        for i, child := range children {
            isLastChild := i == len(children)-1

            var nextPrefix, nextChildPrefix string
            if isLastChild {
                nextPrefix = childPrefix + "└── "
                nextChildPrefix = childPrefix + "    "
            } else {
                nextPrefix = childPrefix + "├── "
                nextChildPrefix = childPrefix + "│   "
            }

            childLines := e.formatExecutionNode(child, nextPrefix, nextChildPrefix, false)
            lines = append(lines, childLines...)
        }
    }

    return lines
}
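// For a query that merges parquet and live-log sources, the recursion above
// renders roughly this shape (illustrative output; file names are examples):
//
//	SELECT Query (hybrid_fast_path)
//	└── Chronological Merge (time-ordered)
//	    ├── Strategy: timestamp_based
//	    ├── Parquet File Scan (2 files)
//	    │   ├── ...
//	    │   └── ...
//	    └── Live Log Scan (1 files)
//	        └── ...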
+ } + } + + return lines +} + +// formatFileSourceDetails adds details for file source nodes +func (e *SQLEngine) formatFileSourceDetails(lines []string, node *FileSourceNode, childPrefix string, isRoot bool) []string { + prefix := childPrefix + if isRoot { + prefix = "│ " + } + + // Add predicates + if len(node.Predicates) > 0 { + lines = append(lines, fmt.Sprintf("%s├── Predicates: %s", prefix, strings.Join(node.Predicates, " AND "))) + } + + // Add operations + if len(node.Operations) > 0 { + lines = append(lines, fmt.Sprintf("%s└── Operations: %s", prefix, strings.Join(node.Operations, " + "))) + } else if len(node.Predicates) == 0 { + lines = append(lines, fmt.Sprintf("%s└── Operation: full_scan", prefix)) + } + + return lines +} + +// formatScanOperationDetails adds details for scan operation nodes +func (e *SQLEngine) formatScanOperationDetails(lines []string, node *ScanOperationNode, childPrefix string, isRoot bool) []string { + prefix := childPrefix + if isRoot { + prefix = "│ " + } + + hasChildren := len(node.Children) > 0 + + // Add predicates if present + if len(node.Predicates) > 0 { + if hasChildren { + lines = append(lines, fmt.Sprintf("%s├── Predicates: %s", prefix, strings.Join(node.Predicates, " AND "))) + } else { + lines = append(lines, fmt.Sprintf("%s└── Predicates: %s", prefix, strings.Join(node.Predicates, " AND "))) + } + } + + return lines +} + +// formatMergeOperationDetails adds details for merge operation nodes +func (e *SQLEngine) formatMergeOperationDetails(lines []string, node *MergeOperationNode, childPrefix string, isRoot bool) []string { + hasChildren := len(node.Children) > 0 + + // Add merge strategy info only if we have children, with proper indentation + if strategy, exists := node.Details["merge_strategy"]; exists && hasChildren { + // Strategy should be indented as a detail of this node, before its children + lines = append(lines, fmt.Sprintf("%s├── Strategy: %v", childPrefix, strategy)) + } + + return lines +} + +// buildHierarchicalPlan creates a tree-like structure for the execution plan +func (e *SQLEngine) buildHierarchicalPlan(plan *QueryExecutionPlan, err error) []string { + var lines []string + + // Root node - Query type and strategy + lines = append(lines, fmt.Sprintf("%s Query (%s)", plan.QueryType, plan.ExecutionStrategy)) + + // Aggregations section (if present) + if len(plan.Aggregations) > 0 { + lines = append(lines, "├── Aggregations") + for i, agg := range plan.Aggregations { + if i == len(plan.Aggregations)-1 { + lines = append(lines, fmt.Sprintf("│ └── %s", agg)) + } else { + lines = append(lines, fmt.Sprintf("│ ├── %s", agg)) + } + } + } + + // Data Sources section + if len(plan.DataSources) > 0 { + hasMore := len(plan.OptimizationsUsed) > 0 || plan.TotalRowsProcessed > 0 || len(plan.Details) > 0 || err != nil + if hasMore { + lines = append(lines, "├── Data Sources") + } else { + lines = append(lines, "└── Data Sources") + } + + for i, source := range plan.DataSources { + prefix := "│ " + if !hasMore && i == len(plan.DataSources)-1 { + prefix = " " + } + + if i == len(plan.DataSources)-1 { + lines = append(lines, fmt.Sprintf("%s└── %s", prefix, e.formatDataSource(source))) + } else { + lines = append(lines, fmt.Sprintf("%s├── %s", prefix, e.formatDataSource(source))) + } + } + } + + // Optimizations section + if len(plan.OptimizationsUsed) > 0 { + hasMore := plan.TotalRowsProcessed > 0 || len(plan.Details) > 0 || err != nil + if hasMore { + lines = append(lines, "├── Optimizations") + } else { + lines = append(lines, "└── 
Optimizations") + } + + for i, opt := range plan.OptimizationsUsed { + prefix := "│ " + if !hasMore && i == len(plan.OptimizationsUsed)-1 { + prefix = " " + } + + if i == len(plan.OptimizationsUsed)-1 { + lines = append(lines, fmt.Sprintf("%s└── %s", prefix, e.formatOptimization(opt))) + } else { + lines = append(lines, fmt.Sprintf("%s├── %s", prefix, e.formatOptimization(opt))) + } + } + } + + // Check for data sources tree availability + partitionPaths, hasPartitions := plan.Details["partition_paths"].([]string) + parquetFiles, _ := plan.Details["parquet_files"].([]string) + liveLogFiles, _ := plan.Details["live_log_files"].([]string) + + // Statistics section + statisticsPresent := plan.PartitionsScanned > 0 || plan.ParquetFilesScanned > 0 || + plan.LiveLogFilesScanned > 0 || plan.TotalRowsProcessed > 0 + + if statisticsPresent { + // Check if there are sections after Statistics (Data Sources Tree, Details, Performance) + hasDataSourcesTree := hasPartitions && len(partitionPaths) > 0 + hasMoreAfterStats := hasDataSourcesTree || len(plan.Details) > 0 || err != nil || true // Performance is always present + if hasMoreAfterStats { + lines = append(lines, "├── Statistics") + } else { + lines = append(lines, "└── Statistics") + } + + stats := []string{} + if plan.PartitionsScanned > 0 { + stats = append(stats, fmt.Sprintf("Partitions Scanned: %d", plan.PartitionsScanned)) + } + if plan.ParquetFilesScanned > 0 { + stats = append(stats, fmt.Sprintf("Parquet Files: %d", plan.ParquetFilesScanned)) + } + if plan.LiveLogFilesScanned > 0 { + stats = append(stats, fmt.Sprintf("Live Log Files: %d", plan.LiveLogFilesScanned)) + } + // Always show row statistics for aggregations, even if 0 (to show fast path efficiency) + if resultsReturned, hasResults := plan.Details["results_returned"]; hasResults { + stats = append(stats, fmt.Sprintf("Rows Scanned: %d", plan.TotalRowsProcessed)) + stats = append(stats, fmt.Sprintf("Results Returned: %v", resultsReturned)) + + // Add fast path explanation when no rows were scanned + if plan.TotalRowsProcessed == 0 { + // Use the actual scan method from Details instead of hardcoding + if scanMethod, exists := plan.Details["scan_method"].(string); exists { + stats = append(stats, fmt.Sprintf("Scan Method: %s", scanMethod)) + } else { + stats = append(stats, "Scan Method: Metadata Only") + } + } + } else if plan.TotalRowsProcessed > 0 { + stats = append(stats, fmt.Sprintf("Rows Processed: %d", plan.TotalRowsProcessed)) + } + + // Broker buffer information + if plan.BrokerBufferQueried { + stats = append(stats, fmt.Sprintf("Broker Buffer Queried: Yes (%d messages)", plan.BrokerBufferMessages)) + if plan.BufferStartIndex > 0 { + stats = append(stats, fmt.Sprintf("Buffer Start Index: %d (deduplication enabled)", plan.BufferStartIndex)) + } + } + + for i, stat := range stats { + if hasMoreAfterStats { + // More sections after Statistics, so use │ prefix + if i == len(stats)-1 { + lines = append(lines, fmt.Sprintf("│ └── %s", stat)) + } else { + lines = append(lines, fmt.Sprintf("│ ├── %s", stat)) + } + } else { + // This is the last main section, so use space prefix for final item + if i == len(stats)-1 { + lines = append(lines, fmt.Sprintf(" └── %s", stat)) + } else { + lines = append(lines, fmt.Sprintf(" ├── %s", stat)) + } + } + } + } + + // Data Sources Tree section (if file paths are available) + if hasPartitions && len(partitionPaths) > 0 { + // Check if there are more sections after this + hasMore := len(plan.Details) > 0 || err != nil + if hasMore { + lines = 
append(lines, "├── Data Sources Tree") + } else { + lines = append(lines, "├── Data Sources Tree") // Performance always comes after + } + + // Build a tree structure for each partition + for i, partition := range partitionPaths { + isLastPartition := i == len(partitionPaths)-1 + + // Show partition directory + partitionPrefix := "├── " + if isLastPartition { + partitionPrefix = "└── " + } + lines = append(lines, fmt.Sprintf("│ %s%s/", partitionPrefix, partition)) + + // Show parquet files in this partition + partitionParquetFiles := make([]string, 0) + for _, file := range parquetFiles { + if strings.HasPrefix(file, partition+"/") { + fileName := file[len(partition)+1:] + partitionParquetFiles = append(partitionParquetFiles, fileName) + } + } + + // Show live log files in this partition + partitionLiveLogFiles := make([]string, 0) + for _, file := range liveLogFiles { + if strings.HasPrefix(file, partition+"/") { + fileName := file[len(partition)+1:] + partitionLiveLogFiles = append(partitionLiveLogFiles, fileName) + } + } + + // Display files with proper tree formatting + totalFiles := len(partitionParquetFiles) + len(partitionLiveLogFiles) + fileIndex := 0 + + // Display parquet files + for _, fileName := range partitionParquetFiles { + fileIndex++ + isLastFile := fileIndex == totalFiles && isLastPartition + + var filePrefix string + if isLastPartition { + if isLastFile { + filePrefix = " └── " + } else { + filePrefix = " ├── " + } + } else { + if isLastFile { + filePrefix = "│ └── " + } else { + filePrefix = "│ ├── " + } + } + lines = append(lines, fmt.Sprintf("│ %s%s (parquet)", filePrefix, fileName)) + } + + // Display live log files + for _, fileName := range partitionLiveLogFiles { + fileIndex++ + isLastFile := fileIndex == totalFiles && isLastPartition + + var filePrefix string + if isLastPartition { + if isLastFile { + filePrefix = " └── " + } else { + filePrefix = " ├── " + } + } else { + if isLastFile { + filePrefix = "│ └── " + } else { + filePrefix = "│ ├── " + } + } + lines = append(lines, fmt.Sprintf("│ %s%s (live log)", filePrefix, fileName)) + } + } + } + + // Details section + // Filter out details that are shown elsewhere + filteredDetails := make([]string, 0) + for key, value := range plan.Details { + // Skip keys that are already formatted and displayed in the Statistics section + if key != "results_returned" && key != "partition_paths" && key != "parquet_files" && key != "live_log_files" { + filteredDetails = append(filteredDetails, fmt.Sprintf("%s: %v", key, value)) + } + } + + if len(filteredDetails) > 0 { + // Performance is always present, so check if there are errors after Details + hasMore := err != nil + if hasMore { + lines = append(lines, "├── Details") + } else { + lines = append(lines, "├── Details") // Performance always comes after + } + + for i, detail := range filteredDetails { + if i == len(filteredDetails)-1 { + lines = append(lines, fmt.Sprintf("│ └── %s", detail)) + } else { + lines = append(lines, fmt.Sprintf("│ ├── %s", detail)) + } + } + } + + // Performance section (always present) + if err != nil { + lines = append(lines, "├── Performance") + lines = append(lines, fmt.Sprintf("│ └── Execution Time: %.3fms", plan.ExecutionTimeMs)) + lines = append(lines, "└── Error") + lines = append(lines, fmt.Sprintf(" └── %s", err.Error())) + } else { + lines = append(lines, "└── Performance") + lines = append(lines, fmt.Sprintf(" └── Execution Time: %.3fms", plan.ExecutionTimeMs)) + } + + return lines +} + +// formatDataSource provides user-friendly names 
// formatDataSource provides user-friendly names for data sources
func (e *SQLEngine) formatDataSource(source string) string {
    switch source {
    case "parquet_stats":
        return "Parquet Statistics (fast path)"
    case "parquet_files":
        return "Parquet Files (full scan)"
    case "live_logs":
        return "Live Log Files"
    case "broker_buffer":
        return "Broker Buffer (real-time)"
    default:
        return source
    }
}

// buildExecutionTree creates a tree representation of the query execution plan
func (e *SQLEngine) buildExecutionTree(plan *QueryExecutionPlan, stmt *SelectStatement) ExecutionNode {
    // Extract WHERE clause predicates for pushdown analysis
    var predicates []string
    if stmt.Where != nil {
        predicates = e.extractPredicateStrings(stmt.Where.Expr)
    }

    // Check if we have detailed file information
    partitionPaths, hasPartitions := plan.Details["partition_paths"].([]string)
    parquetFiles, hasParquetFiles := plan.Details["parquet_files"].([]string)
    liveLogFiles, hasLiveLogFiles := plan.Details["live_log_files"].([]string)

    if !hasPartitions || len(partitionPaths) == 0 {
        // Fallback: create a simple structure without file details
        return &ScanOperationNode{
            ScanType:    "hybrid_scan",
            Description: fmt.Sprintf("Hybrid Scan (%s)", plan.ExecutionStrategy),
            Predicates:  predicates,
            Details: map[string]interface{}{
                "note": "File details not available",
            },
        }
    }

    // Build file source nodes
    var parquetNodes []ExecutionNode
    var liveLogNodes []ExecutionNode
    var brokerBufferNodes []ExecutionNode

    // Create parquet file nodes
    if hasParquetFiles {
        for _, filePath := range parquetFiles {
            operations := e.determineParquetOperations(plan, filePath)
            parquetNodes = append(parquetNodes, &FileSourceNode{
                FilePath:         filePath,
                SourceType:       "parquet",
                Predicates:       predicates,
                Operations:       operations,
                OptimizationHint: e.determineOptimizationHint(plan, "parquet"),
                Details: map[string]interface{}{
                    "format": "parquet",
                },
            })
        }
    }

    // Create live log file nodes
    if hasLiveLogFiles {
        for _, filePath := range liveLogFiles {
            operations := e.determineLiveLogOperations(plan, filePath)
            liveLogNodes = append(liveLogNodes, &FileSourceNode{
                FilePath:         filePath,
                SourceType:       "live_log",
                Predicates:       predicates,
                Operations:       operations,
                OptimizationHint: e.determineOptimizationHint(plan, "live_log"),
                Details: map[string]interface{}{
                    "format": "log_entry",
                },
            })
        }
    }

    // Create a broker buffer node if queried
    if plan.BrokerBufferQueried {
        brokerBufferNodes = append(brokerBufferNodes, &FileSourceNode{
            FilePath:         "broker_memory_buffer",
            SourceType:       "broker_buffer",
            Predicates:       predicates,
            Operations:       []string{"memory_scan"},
            OptimizationHint: "real_time",
            Details: map[string]interface{}{
                "messages":         plan.BrokerBufferMessages,
                "buffer_start_idx": plan.BufferStartIndex,
            },
        })
    }

    // Build the tree structure based on data sources
    var scanNodes []ExecutionNode

    // Add a parquet scan node ONLY if there are actual parquet files
    if len(parquetNodes) > 0 {
        scanNodes = append(scanNodes, &ScanOperationNode{
            ScanType:    "parquet_scan",
            Description: fmt.Sprintf("Parquet File Scan (%d files)", len(parquetNodes)),
            Predicates:  predicates,
            Children:    parquetNodes,
            Details: map[string]interface{}{
                "files_count": len(parquetNodes),
                "pushdown":    "column_projection + predicate_filtering",
            },
        })
    }

    // Add a live log scan node ONLY if there are actual live log files
    if len(liveLogNodes) > 0 {
        scanNodes = append(scanNodes, &ScanOperationNode{
            ScanType:    "live_log_scan",
            Description: fmt.Sprintf("Live Log Scan (%d files)", len(liveLogNodes)),
            Predicates:  predicates,
            Children:    liveLogNodes,
            Details: map[string]interface{}{
                "files_count": len(liveLogNodes),
                "pushdown":    "predicate_filtering",
            },
        })
    }

    // Add a broker buffer scan node ONLY if the buffer was actually queried
    if len(brokerBufferNodes) > 0 {
        scanNodes = append(scanNodes, &ScanOperationNode{
            ScanType:    "broker_buffer_scan",
            Description: "Real-time Buffer Scan",
            Predicates:  predicates,
            Children:    brokerBufferNodes,
            Details: map[string]interface{}{
                "real_time": true,
            },
        })
    }

    // Debug: check what we actually have
    totalFileNodes := len(parquetNodes) + len(liveLogNodes) + len(brokerBufferNodes)
    if totalFileNodes == 0 {
        // No actual files found, return a simple fallback
        return &ScanOperationNode{
            ScanType:    "hybrid_scan",
            Description: fmt.Sprintf("Hybrid Scan (%s)", plan.ExecutionStrategy),
            Predicates:  predicates,
            Details: map[string]interface{}{
                "note": "No source files discovered",
            },
        }
    }

    // If no scan nodes, return a fallback structure
    if len(scanNodes) == 0 {
        return &ScanOperationNode{
            ScanType:    "hybrid_scan",
            Description: fmt.Sprintf("Hybrid Scan (%s)", plan.ExecutionStrategy),
            Predicates:  predicates,
            Details: map[string]interface{}{
                "note": "No file details available",
            },
        }
    }

    // If only one scan type, return it directly
    if len(scanNodes) == 1 {
        return scanNodes[0]
    }

    // Multiple scan types - need a merge operation
    return &MergeOperationNode{
        OperationType: "chronological_merge",
        Description:   "Chronological Merge (time-ordered)",
        Children:      scanNodes,
        Details: map[string]interface{}{
            "merge_strategy": "timestamp_based",
            "sources_count":  len(scanNodes),
        },
    }
}

// extractPredicateStrings extracts predicate descriptions from the WHERE clause
func (e *SQLEngine) extractPredicateStrings(expr ExprNode) []string {
    var predicates []string
    e.extractPredicateStringsRecursive(expr, &predicates)
    return predicates
}

func (e *SQLEngine) extractPredicateStringsRecursive(expr ExprNode, predicates *[]string) {
    switch exprType := expr.(type) {
    case *ComparisonExpr:
        *predicates = append(*predicates, fmt.Sprintf("%s %s %s",
            e.exprToString(exprType.Left), exprType.Operator, e.exprToString(exprType.Right)))
    case *IsNullExpr:
        *predicates = append(*predicates, fmt.Sprintf("%s IS NULL", e.exprToString(exprType.Expr)))
    case *IsNotNullExpr:
        *predicates = append(*predicates, fmt.Sprintf("%s IS NOT NULL", e.exprToString(exprType.Expr)))
    case *AndExpr:
        e.extractPredicateStringsRecursive(exprType.Left, predicates)
        e.extractPredicateStringsRecursive(exprType.Right, predicates)
    case *OrExpr:
        e.extractPredicateStringsRecursive(exprType.Left, predicates)
        e.extractPredicateStringsRecursive(exprType.Right, predicates)
    case *ParenExpr:
        e.extractPredicateStringsRecursive(exprType.Expr, predicates)
    }
}

func (e *SQLEngine) exprToString(expr ExprNode) string {
    switch exprType := expr.(type) {
    case *ColName:
        return exprType.Name.String()
    default:
        // For now, return a simplified representation
        return fmt.Sprintf("%T", expr)
    }
}
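// extractPredicateStrings needs only the AST, so it can be exercised in
// isolation (illustrative sketch; note that non-column operands currently
// stringify as their Go type):
//
//	where := &AndExpr{
//		Left: &ComparisonExpr{
//			Left:     &ColName{Name: stringValue("status")},
//			Operator: "=",
//			Right:    &SQLVal{Type: StrVal, Val: []byte("active")},
//		},
//		Right: &IsNotNullExpr{Expr: &ColName{Name: stringValue("user_id")}},
//	}
//	fmt.Println((&SQLEngine{}).extractPredicateStrings(where))
//	// [status = *engine.SQLVal user_id IS NOT NULL]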
// determineParquetOperations determines what operations will be performed on parquet files
func (e *SQLEngine) determineParquetOperations(plan *QueryExecutionPlan, filePath string) []string {
    var operations []string

    // Check for column projection
    if contains(plan.OptimizationsUsed, "column_projection") {
        operations = append(operations, "column_projection")
    }

    // Check for predicate pushdown
    if contains(plan.OptimizationsUsed, "predicate_pushdown") {
        operations = append(operations, "predicate_pushdown")
    }

    // Check for statistics usage
    if contains(plan.OptimizationsUsed, "parquet_statistics") || plan.ExecutionStrategy == "hybrid_fast_path" {
        operations = append(operations, "statistics_skip")
    } else {
        operations = append(operations, "row_group_scan")
    }

    if len(operations) == 0 {
        operations = append(operations, "full_scan")
    }

    return operations
}

// determineLiveLogOperations determines what operations will be performed on live log files
func (e *SQLEngine) determineLiveLogOperations(plan *QueryExecutionPlan, filePath string) []string {
    var operations []string

    // Live logs typically require a sequential scan
    operations = append(operations, "sequential_scan")

    // Check for predicate filtering
    if contains(plan.OptimizationsUsed, "predicate_pushdown") {
        operations = append(operations, "predicate_filtering")
    }

    return operations
}

// determineOptimizationHint determines the optimization hint for a data source
func (e *SQLEngine) determineOptimizationHint(plan *QueryExecutionPlan, sourceType string) string {
    switch plan.ExecutionStrategy {
    case "hybrid_fast_path":
        if sourceType == "parquet" {
            return "statistics_only"
        }
        return "minimal_scan"
    case "full_scan":
        return "full_scan"
    case "column_projection":
        return "column_filter"
    default:
        return ""
    }
}

// contains reports whether the slice contains the given string
func contains(slice []string, item string) bool {
    for _, s := range slice {
        if s == item {
            return true
        }
    }
    return false
}

// collectLiveLogFileNames collects live log file names from a partition directory
func (e *SQLEngine) collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
    var liveLogFiles []string

    err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
        // List all files in the partition directory
        request := &filer_pb.ListEntriesRequest{
            Directory:          partitionPath,
            Prefix:             "",
            StartFromFileName:  "",
            InclusiveStartFrom: false,
            Limit:              10000, // reasonable limit
        }

        stream, err := client.ListEntries(context.Background(), request)
        if err != nil {
            return err
        }

        for {
            resp, err := stream.Recv()
            if err != nil {
                if err == io.EOF {
                    break
                }
                return err
            }

            entry := resp.Entry
            if entry != nil && !entry.IsDirectory {
                // Check if this is a log file (not a parquet file)
                fileName := entry.Name
                if !strings.HasSuffix(fileName, ".parquet") && !strings.HasSuffix(fileName, ".metadata") {
                    liveLogFiles = append(liveLogFiles, fileName)
                }
            }
        }

        return nil
    })

    if err != nil {
        return nil, err
    }

    return liveLogFiles, nil
}

// formatOptimization provides user-friendly names for optimizations
func (e *SQLEngine) formatOptimization(opt string) string {
    switch opt {
    case "parquet_statistics":
        return "Parquet Statistics Usage"
    case "live_log_counting":
        return "Live Log Row Counting"
    case "deduplication":
        return "Duplicate Data Avoidance"
    case "predicate_pushdown":
        return "WHERE Clause Pushdown"
    case "column_projection":
        return "Column Selection"
    case "limit_pushdown":
        return "LIMIT Optimization"
    default:
        return opt
    }
}
// executeUseStatement handles USE database statements to switch the current database context
func (e *SQLEngine) executeUseStatement(ctx context.Context, stmt *UseStatement) (*QueryResult, error) {
    // Validate the database name
    if stmt.Database == "" {
        err := fmt.Errorf("database name cannot be empty")
        return &QueryResult{Error: err}, err
    }

    // Set the current database in the catalog
    e.catalog.SetCurrentDatabase(stmt.Database)

    // Return a success message
    result := &QueryResult{
        Columns: []string{"message"},
        Rows: [][]sqltypes.Value{
            {sqltypes.MakeString([]byte(fmt.Sprintf("Database changed to: %s", stmt.Database)))},
        },
        Error: nil,
    }
    return result, nil
}

// executeDDLStatement handles CREATE operations only
// Note: ALTER TABLE and DROP TABLE are not supported, to protect topic data
func (e *SQLEngine) executeDDLStatement(ctx context.Context, stmt *DDLStatement) (*QueryResult, error) {
    switch stmt.Action {
    case CreateStr:
        return e.createTable(ctx, stmt)
    case AlterStr:
        err := fmt.Errorf("ALTER TABLE is not supported")
        return &QueryResult{Error: err}, err
    case DropStr:
        err := fmt.Errorf("DROP TABLE is not supported")
        return &QueryResult{Error: err}, err
    default:
        err := fmt.Errorf("unsupported DDL action: %s", stmt.Action)
        return &QueryResult{Error: err}, err
    }
}
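// USE takes effect purely in the catalog, so the round trip is cheap
// (illustrative sketch; assumes an engine from NewSQLEngine above and a
// Vitess-style sqltypes.Value.ToString accessor):
//
//	res, err := engine.ExecuteSQL(ctx, `USE "analytics"`)
//	if err == nil {
//		fmt.Println(res.Rows[0][0].ToString()) // Database changed to: analytics
//	}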
// executeSelectStatementWithPlan handles SELECT queries with execution plan tracking
func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
    // Parse aggregations to populate the plan
    var aggregations []AggregationSpec
    hasAggregations := false
    selectAll := false

    for _, selectExpr := range stmt.SelectExprs {
        switch expr := selectExpr.(type) {
        case *StarExpr:
            selectAll = true
        case *AliasedExpr:
            switch col := expr.Expr.(type) {
            case *FuncExpr:
                // This is an aggregation function
                aggSpec, err := e.parseAggregationFunction(col, expr)
                if err != nil {
                    return &QueryResult{Error: err}, err
                }
                if aggSpec != nil {
                    aggregations = append(aggregations, *aggSpec)
                    hasAggregations = true
                    plan.Aggregations = append(plan.Aggregations, aggSpec.Function+"("+aggSpec.Column+")")
                }
            }
        }
    }

    // Execute the query (handle aggregations specially for plan tracking)
    var result *QueryResult
    var err error

    if hasAggregations {
        // Extract table information for aggregation execution
        var database, tableName string
        if len(stmt.From) == 1 {
            if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
                if tableExpr, ok := table.Expr.(TableName); ok {
                    tableName = tableExpr.Name.String()
                    if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
                        database = tableExpr.Qualifier.String()
                    }
                }
            }
        }

        // Use the current database if not specified
        if database == "" {
            database = e.catalog.currentDatabase
            if database == "" {
                database = "default"
            }
        }

        // Create a hybrid scanner for aggregation execution
        var filerClient filer_pb.FilerClient
        if e.catalog.brokerClient != nil {
            filerClient, err = e.catalog.brokerClient.GetFilerClient()
            if err != nil {
                return &QueryResult{Error: err}, err
            }
        }

        // Use a distinct error variable so the execution error below is not
        // captured by a shadowed err and lost on return
        hybridScanner, scanErr := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
        if scanErr != nil {
            return &QueryResult{Error: scanErr}, scanErr
        }

        // Execute the aggregation query with plan tracking
        result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan)
    } else {
        // Regular SELECT query with plan tracking
        result, err = e.executeSelectStatementWithBrokerStats(ctx, stmt, plan)
    }

    if err == nil && result != nil {
        // Extract the table reference for use in execution strategy determination,
        // falling back to the current database ("default" if unset)
        var database, tableName string
        if len(stmt.From) == 1 {
            if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
                if tableExpr, ok := table.Expr.(TableName); ok {
                    tableName = tableExpr.Name.String()
                    if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
                        database = tableExpr.Qualifier.String()
                    }
                }
            }
        }
        if database == "" {
            database = e.catalog.currentDatabase
            if database == "" {
                database = "default"
            }
        }

        // Try to get topic information for partition count and row processing stats
        if tableName != "" {
            // Try to discover partitions for statistics
            if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
                plan.PartitionsScanned = len(partitions)
            }

            // For aggregations, determine actual processing based on the execution strategy
            if hasAggregations {
                plan.Details["results_returned"] = len(result.Rows)

                // Determine actual work done based on the execution strategy
                if stmt.Where == nil {
                    // Use the same logic as actual execution to determine if the fast path was used
                    var filerClient filer_pb.FilerClient
                    if e.catalog.brokerClient != nil {
                        filerClient, _ = e.catalog.brokerClient.GetFilerClient()
                    }

                    hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
                    var canUseFastPath bool
                    if scannerErr == nil {
                        // Test if the fast path can be used (same as actual execution)
                        _, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
                        canUseFastPath = canOptimize
                    } else {
                        // Fall back to a simple check
                        canUseFastPath = true
                        for _, spec := range aggregations {
                            if !e.canUseParquetStatsForAggregation(spec) {
                                canUseFastPath = false
                                break
                            }
                        }
                    }

                    if canUseFastPath {
                        // Fast path: minimal scanning (only live logs that weren't converted)
                        if actualScanCount, countErr := e.getActualRowsScannedForFastPath(ctx, database, tableName); countErr == nil {
                            plan.TotalRowsProcessed = actualScanCount
                        } else {
                            plan.TotalRowsProcessed = 0 // Parquet stats only, no scanning
                        }
                    } else {
                        // Full scan: count all rows
                        if actualRowCount, countErr := e.getTopicTotalRowCount(ctx, database, tableName); countErr == nil {
                            plan.TotalRowsProcessed = actualRowCount
                        } else {
                            plan.TotalRowsProcessed = int64(len(result.Rows))
                            plan.Details["note"] = "scan_count_unavailable"
                        }
                    }
                } else {
                    // With a WHERE clause: full scan required
                    if actualRowCount, countErr := e.getTopicTotalRowCount(ctx, database, tableName); countErr == nil {
                        plan.TotalRowsProcessed = actualRowCount
                    } else {
                        plan.TotalRowsProcessed = int64(len(result.Rows))
                        plan.Details["note"] = "scan_count_unavailable"
                    }
                }
            } else {
                // For non-aggregations, the result count is meaningful
                plan.TotalRowsProcessed = int64(len(result.Rows))
            }
        }

        // Determine the execution strategy based on query type (reuse fast path detection from above)
        if hasAggregations {
            // Skip execution strategy determination if the plan was already populated by aggregation execution
            // This prevents overwriting the correctly built plan from BuildAggregationPlan
            if plan.ExecutionStrategy == "" {
                // For aggregations, determine if fast path conditions are met
                if stmt.Where == nil {
                    // Reuse the same logic used above for row counting
                    var canUseFastPath bool
                    if tableName != "" {
                        var filerClient filer_pb.FilerClient
                        if e.catalog.brokerClient != nil {
                            filerClient, _ = e.catalog.brokerClient.GetFilerClient()
                        }

                        if filerClient != nil {
                            hybridScanner, scannerErr := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
                            if scannerErr == nil {
                                // Test if the fast path can be used (same as actual execution)
                                _, canOptimize := e.tryFastParquetAggregation(ctx, hybridScanner, aggregations)
                                canUseFastPath = canOptimize
                            } else {
                                canUseFastPath = false
                            }
                        } else {
                            // Fallback check
                            canUseFastPath = true
                            for _, spec := range aggregations {
                                if !e.canUseParquetStatsForAggregation(spec) {
                                    canUseFastPath = false
                                    break
                                }
                            }
                        }
                    } else {
                        canUseFastPath = false
                    }

                    if canUseFastPath {
                        plan.ExecutionStrategy = "hybrid_fast_path"
                        plan.OptimizationsUsed = append(plan.OptimizationsUsed, "parquet_statistics", "live_log_counting", "deduplication")
                        plan.DataSources = []string{"parquet_stats", "live_logs"}
                    } else {
                        plan.ExecutionStrategy = "full_scan"
                        plan.DataSources = []string{"live_logs", "parquet_files"}
                    }
                } else {
                    plan.ExecutionStrategy = "full_scan"
                    plan.DataSources = []string{"live_logs", "parquet_files"}
                    plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
                }
            }
        } else {
            // For regular SELECT queries
            if selectAll {
                plan.ExecutionStrategy = "hybrid_scan"
                plan.DataSources = []string{"live_logs", "parquet_files"}
            } else {
                plan.ExecutionStrategy = "column_projection"
                plan.DataSources = []string{"live_logs", "parquet_files"}
                plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_projection")
            }
        }

        // Add WHERE clause information
        if stmt.Where != nil {
            // Only add predicate_pushdown if not already added
            alreadyHasPredicate := false
            for _, opt := range plan.OptimizationsUsed {
                if opt == "predicate_pushdown" {
                    alreadyHasPredicate = true
                    break
                }
            }
            if !alreadyHasPredicate {
                plan.OptimizationsUsed = append(plan.OptimizationsUsed, "predicate_pushdown")
            }
            plan.Details["where_clause"] = "present"
        }

        // Add LIMIT information
        if stmt.Limit != nil {
            plan.OptimizationsUsed = append(plan.OptimizationsUsed, "limit_pushdown")
            if stmt.Limit.Rowcount != nil {
                if limitExpr, ok := stmt.Limit.Rowcount.(*SQLVal); ok && limitExpr.Type == IntVal {
                    plan.Details["limit"] = string(limitExpr.Val)
                }
            }
        }
    }

    // Build the execution tree after all plan details are populated
    if err == nil && result != nil && plan != nil {
        plan.RootNode = e.buildExecutionTree(plan, stmt)
    }

    return result, err
}
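// The strategy choice above boils down to: no WHERE clause, and every
// aggregate answerable from parquet column metadata. Condensed (illustrative
// sketch; tryFastParquetAggregation and canUseParquetStatsForAggregation are
// defined elsewhere in this package):
//
//	canUseFastPath := stmt.Where == nil
//	if canUseFastPath {
//		for _, spec := range aggregations {
//			// COUNT/MIN/MAX-style aggregates can be served from parquet
//			// statistics; anything else forces a full scan
//			if !e.canUseParquetStatsForAggregation(spec) {
//				canUseFastPath = false
//				break
//			}
//		}
//	}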
Cross-topic joins are supported via partition-aware execution +func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *SelectStatement) (*QueryResult, error) { + // Parse FROM clause to get table (topic) information + if len(stmt.From) != 1 { + err := fmt.Errorf("SELECT supports single table queries only") + return &QueryResult{Error: err}, err + } + + // Extract table reference + var database, tableName string + switch table := stmt.From[0].(type) { + case *AliasedTableExpr: + switch tableExpr := table.Expr.(type) { + case TableName: + tableName = tableExpr.Name.String() + if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" { + database = tableExpr.Qualifier.String() + } + default: + err := fmt.Errorf("unsupported table expression: %T", tableExpr) + return &QueryResult{Error: err}, err + } + default: + err := fmt.Errorf("unsupported FROM clause: %T", table) + return &QueryResult{Error: err}, err + } + + // Use current database context if not specified + if database == "" { + database = e.catalog.GetCurrentDatabase() + if database == "" { + database = "default" + } + } + + // Auto-discover and register topic if not already in catalog + if _, err := e.catalog.GetTableInfo(database, tableName); err != nil { + // Topic not in catalog, try to discover and register it + if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil { + // Return error immediately for non-existent topics instead of falling back to sample data + return &QueryResult{Error: regErr}, regErr + } + } + + // Create HybridMessageScanner for the topic (reads both live logs + Parquet files) + // Get filerClient from broker connection (works with both real and mock brokers) + var filerClient filer_pb.FilerClient + var filerClientErr error + filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient() + if filerClientErr != nil { + // Return error if filer client is not available for topic access + return &QueryResult{Error: filerClientErr}, filerClientErr + } + + hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e) + if err != nil { + // Handle quiet topics gracefully: topics exist but have no active schema/brokers + if IsNoSchemaError(err) { + // Return empty result for quiet topics (normal in production environments) + return &QueryResult{ + Columns: []string{}, + Rows: [][]sqltypes.Value{}, + Database: database, + Table: tableName, + }, nil + } + // Return error for other access issues (truly non-existent topics, etc.) 
+ topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err) + return &QueryResult{Error: topicErr}, topicErr + } + + // Parse SELECT columns and detect aggregation functions + var columns []string + var aggregations []AggregationSpec + selectAll := false + hasAggregations := false + _ = hasAggregations // Used later in aggregation routing + // Track required base columns for arithmetic expressions + baseColumnsSet := make(map[string]bool) + + for _, selectExpr := range stmt.SelectExprs { + switch expr := selectExpr.(type) { + case *StarExpr: + selectAll = true + case *AliasedExpr: + switch col := expr.Expr.(type) { + case *ColName: + colName := col.Name.String() + + // Check if this "column" is actually an arithmetic expression with functions + if arithmeticExpr := e.parseColumnLevelCalculation(colName); arithmeticExpr != nil { + columns = append(columns, e.getArithmeticExpressionAlias(arithmeticExpr)) + e.extractBaseColumns(arithmeticExpr, baseColumnsSet) + } else { + columns = append(columns, colName) + baseColumnsSet[colName] = true + } + case *ArithmeticExpr: + // Handle arithmetic expressions like id+user_id and string concatenation like name||suffix + columns = append(columns, e.getArithmeticExpressionAlias(col)) + // Extract base columns needed for this arithmetic expression + e.extractBaseColumns(col, baseColumnsSet) + case *SQLVal: + // Handle string/numeric literals like 'good', 123, etc. + columns = append(columns, e.getSQLValAlias(col)) + case *FuncExpr: + // Distinguish between aggregation functions and string functions + funcName := strings.ToUpper(col.Name.String()) + if e.isAggregationFunction(funcName) { + // Handle aggregation functions + aggSpec, err := e.parseAggregationFunction(col, expr) + if err != nil { + return &QueryResult{Error: err}, err + } + aggregations = append(aggregations, *aggSpec) + hasAggregations = true + } else if e.isStringFunction(funcName) { + // Handle string functions like UPPER, LENGTH, etc. 
+ columns = append(columns, e.getStringFunctionAlias(col)) + // Extract base columns needed for this string function + e.extractBaseColumnsFromFunction(col, baseColumnsSet) + } else if e.isDateTimeFunction(funcName) { + // Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC + columns = append(columns, e.getDateTimeFunctionAlias(col)) + // Extract base columns needed for this datetime function + e.extractBaseColumnsFromFunction(col, baseColumnsSet) + } else { + return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName) + } + default: + err := fmt.Errorf("unsupported SELECT expression: %T", col) + return &QueryResult{Error: err}, err + } + default: + err := fmt.Errorf("unsupported SELECT expression: %T", expr) + return &QueryResult{Error: err}, err + } + } + + // If we have aggregations, use aggregation query path + if hasAggregations { + return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt) + } + + // Parse WHERE clause for predicate pushdown + var predicate func(*schema_pb.RecordValue) bool + if stmt.Where != nil { + predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs) + if err != nil { + return &QueryResult{Error: err}, err + } + } + + // Parse LIMIT and OFFSET clauses + // Use -1 to distinguish "no LIMIT" from "LIMIT 0" + limit := -1 + offset := 0 + if stmt.Limit != nil && stmt.Limit.Rowcount != nil { + switch limitExpr := stmt.Limit.Rowcount.(type) { + case *SQLVal: + if limitExpr.Type == IntVal { + var parseErr error + limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64) + if parseErr != nil { + return &QueryResult{Error: parseErr}, parseErr + } + if limit64 > math.MaxInt32 || limit64 < 0 { + return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64) + } + limit = int(limit64) + } + } + } + + // Parse OFFSET clause if present + if stmt.Limit != nil && stmt.Limit.Offset != nil { + switch offsetExpr := stmt.Limit.Offset.(type) { + case *SQLVal: + if offsetExpr.Type == IntVal { + var parseErr error + offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64) + if parseErr != nil { + return &QueryResult{Error: parseErr}, parseErr + } + if offset64 > math.MaxInt32 || offset64 < 0 { + return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64) + } + offset = int(offset64) + } + } + } + + // Build hybrid scan options + // Extract time filters from WHERE clause to optimize scanning + startTimeNs, stopTimeNs := int64(0), int64(0) + if stmt.Where != nil { + startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) + } + + hybridScanOptions := HybridScanOptions{ + StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons + StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons + Limit: limit, + Offset: offset, + Predicate: predicate, + } + + if !selectAll { + // Convert baseColumnsSet to slice for hybrid scan options + baseColumns := make([]string, 0, len(baseColumnsSet)) + for columnName := range baseColumnsSet { + baseColumns = append(baseColumns, columnName) + } + // Use base columns (not expression aliases) for data retrieval + if len(baseColumns) > 0 { + hybridScanOptions.Columns = baseColumns + } else { + // If no base columns found (shouldn't happen), use original columns + hybridScanOptions.Columns = columns + } + 
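+		// Note: only raw base columns are fetched by the scanner here; expression
+		// aliases are computed later in ConvertToSQLResultWithExpressions.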
} + + // Execute the hybrid scan (live logs + Parquet files) + results, err := hybridScanner.Scan(ctx, hybridScanOptions) + if err != nil { + return &QueryResult{Error: err}, err + } + + // Convert to SQL result format + if selectAll { + if len(columns) > 0 { + // SELECT *, specific_columns - include both auto-discovered and explicit columns + return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil + } else { + // SELECT * only - let converter determine all columns (excludes system columns) + columns = nil + return hybridScanner.ConvertToSQLResult(results, columns), nil + } + } + + // Handle custom column expressions (including arithmetic) + return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil +} + +// executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture +// This is used by EXPLAIN queries to capture complete data source information including broker memory +func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) { + // Parse FROM clause to get table (topic) information + if len(stmt.From) != 1 { + err := fmt.Errorf("SELECT supports single table queries only") + return &QueryResult{Error: err}, err + } + + // Extract table reference + var database, tableName string + switch table := stmt.From[0].(type) { + case *AliasedTableExpr: + switch tableExpr := table.Expr.(type) { + case TableName: + tableName = tableExpr.Name.String() + if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" { + database = tableExpr.Qualifier.String() + } + default: + err := fmt.Errorf("unsupported table expression: %T", tableExpr) + return &QueryResult{Error: err}, err + } + default: + err := fmt.Errorf("unsupported FROM clause: %T", table) + return &QueryResult{Error: err}, err + } + + // Use current database context if not specified + if database == "" { + database = e.catalog.GetCurrentDatabase() + if database == "" { + database = "default" + } + } + + // Auto-discover and register topic if not already in catalog + if _, err := e.catalog.GetTableInfo(database, tableName); err != nil { + // Topic not in catalog, try to discover and register it + if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil { + // Return error immediately for non-existent topics instead of falling back to sample data + return &QueryResult{Error: regErr}, regErr + } + } + + // Create HybridMessageScanner for the topic (reads both live logs + Parquet files) + // Get filerClient from broker connection (works with both real and mock brokers) + var filerClient filer_pb.FilerClient + var filerClientErr error + filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient() + if filerClientErr != nil { + // Return error if filer client is not available for topic access + return &QueryResult{Error: filerClientErr}, filerClientErr + } + + hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e) + if err != nil { + // Handle quiet topics gracefully: topics exist but have no active schema/brokers + if IsNoSchemaError(err) { + // Return empty result for quiet topics (normal in production environments) + return &QueryResult{ + Columns: []string{}, + Rows: [][]sqltypes.Value{}, + Database: database, + Table: tableName, + }, nil + } + // Return error for other access issues (truly non-existent topics, etc.) 
+ topicErr := fmt.Errorf("failed to access topic %s.%s: %v", database, tableName, err) + return &QueryResult{Error: topicErr}, topicErr + } + + // Parse SELECT columns and detect aggregation functions + var columns []string + var aggregations []AggregationSpec + selectAll := false + hasAggregations := false + _ = hasAggregations // Used later in aggregation routing + // Track required base columns for arithmetic expressions + baseColumnsSet := make(map[string]bool) + + for _, selectExpr := range stmt.SelectExprs { + switch expr := selectExpr.(type) { + case *StarExpr: + selectAll = true + case *AliasedExpr: + switch col := expr.Expr.(type) { + case *ColName: + colName := col.Name.String() + columns = append(columns, colName) + baseColumnsSet[colName] = true + case *ArithmeticExpr: + // Handle arithmetic expressions like id+user_id and string concatenation like name||suffix + columns = append(columns, e.getArithmeticExpressionAlias(col)) + // Extract base columns needed for this arithmetic expression + e.extractBaseColumns(col, baseColumnsSet) + case *SQLVal: + // Handle string/numeric literals like 'good', 123, etc. + columns = append(columns, e.getSQLValAlias(col)) + case *FuncExpr: + // Distinguish between aggregation functions and string functions + funcName := strings.ToUpper(col.Name.String()) + if e.isAggregationFunction(funcName) { + // Handle aggregation functions + aggSpec, err := e.parseAggregationFunction(col, expr) + if err != nil { + return &QueryResult{Error: err}, err + } + aggregations = append(aggregations, *aggSpec) + hasAggregations = true + } else if e.isStringFunction(funcName) { + // Handle string functions like UPPER, LENGTH, etc. + columns = append(columns, e.getStringFunctionAlias(col)) + // Extract base columns needed for this string function + e.extractBaseColumnsFromFunction(col, baseColumnsSet) + } else if e.isDateTimeFunction(funcName) { + // Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC + columns = append(columns, e.getDateTimeFunctionAlias(col)) + // Extract base columns needed for this datetime function + e.extractBaseColumnsFromFunction(col, baseColumnsSet) + } else { + return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName) + } + default: + err := fmt.Errorf("unsupported SELECT expression: %T", col) + return &QueryResult{Error: err}, err + } + default: + err := fmt.Errorf("unsupported SELECT expression: %T", expr) + return &QueryResult{Error: err}, err + } + } + + // If we have aggregations, use aggregation query path + if hasAggregations { + return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt) + } + + // Parse WHERE clause for predicate pushdown + var predicate func(*schema_pb.RecordValue) bool + if stmt.Where != nil { + predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs) + if err != nil { + return &QueryResult{Error: err}, err + } + } + + // Parse LIMIT and OFFSET clauses + // Use -1 to distinguish "no LIMIT" from "LIMIT 0" + limit := -1 + offset := 0 + if stmt.Limit != nil && stmt.Limit.Rowcount != nil { + switch limitExpr := stmt.Limit.Rowcount.(type) { + case *SQLVal: + if limitExpr.Type == IntVal { + var parseErr error + limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64) + if parseErr != nil { + return &QueryResult{Error: parseErr}, parseErr + } + if limit64 > math.MaxInt32 || limit64 < 0 { + return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, 
fmt.Errorf("LIMIT value %d is out of valid range", limit64) + } + limit = int(limit64) + } + } + } + + // Parse OFFSET clause if present + if stmt.Limit != nil && stmt.Limit.Offset != nil { + switch offsetExpr := stmt.Limit.Offset.(type) { + case *SQLVal: + if offsetExpr.Type == IntVal { + var parseErr error + offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64) + if parseErr != nil { + return &QueryResult{Error: parseErr}, parseErr + } + if offset64 > math.MaxInt32 || offset64 < 0 { + return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64) + } + offset = int(offset64) + } + } + } + + // Build hybrid scan options + // Extract time filters from WHERE clause to optimize scanning + startTimeNs, stopTimeNs := int64(0), int64(0) + if stmt.Where != nil { + startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) + } + + hybridScanOptions := HybridScanOptions{ + StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons + StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons + Limit: limit, + Offset: offset, + Predicate: predicate, + } + + if !selectAll { + // Convert baseColumnsSet to slice for hybrid scan options + baseColumns := make([]string, 0, len(baseColumnsSet)) + for columnName := range baseColumnsSet { + baseColumns = append(baseColumns, columnName) + } + // Use base columns (not expression aliases) for data retrieval + if len(baseColumns) > 0 { + hybridScanOptions.Columns = baseColumns + } else { + // If no base columns found (shouldn't happen), use original columns + hybridScanOptions.Columns = columns + } + } + + // Execute the hybrid scan with stats capture for EXPLAIN + var results []HybridScanResult + if plan != nil { + // EXPLAIN mode - capture broker buffer stats + var stats *HybridScanStats + results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions) + if err != nil { + return &QueryResult{Error: err}, err + } + + // Populate plan with broker buffer information + if stats != nil { + plan.BrokerBufferQueried = stats.BrokerBufferQueried + plan.BrokerBufferMessages = stats.BrokerBufferMessages + plan.BufferStartIndex = stats.BufferStartIndex + + // Add broker_buffer to data sources if buffer was queried + if stats.BrokerBufferQueried { + // Check if broker_buffer is already in data sources + hasBrokerBuffer := false + for _, source := range plan.DataSources { + if source == "broker_buffer" { + hasBrokerBuffer = true + break + } + } + if !hasBrokerBuffer { + plan.DataSources = append(plan.DataSources, "broker_buffer") + } + } + } + + // Populate execution plan details with source file information for Data Sources Tree + if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil { + // Add partition paths to execution plan details + plan.Details["partition_paths"] = partitions + + // Collect actual file information for each partition + var parquetFiles []string + var liveLogFiles []string + parquetSources := make(map[string]bool) + + for _, partitionPath := range partitions { + // Get parquet files for this partition + if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil { + for _, stats := range parquetStats { + parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName)) + } + } + + // Merge accurate parquet sources from metadata + if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil { + 
for src := range sources { + parquetSources[src] = true + } + } + + // Get live log files for this partition + if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil { + for _, fileName := range liveFiles { + // Exclude live log files that have been converted to parquet (deduplicated) + if parquetSources[fileName] { + continue + } + liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName)) + } + } + } + + if len(parquetFiles) > 0 { + plan.Details["parquet_files"] = parquetFiles + } + if len(liveLogFiles) > 0 { + plan.Details["live_log_files"] = liveLogFiles + } + + // Update scan statistics for execution plan display + plan.PartitionsScanned = len(partitions) + plan.ParquetFilesScanned = len(parquetFiles) + plan.LiveLogFilesScanned = len(liveLogFiles) + } + } else { + // Normal mode - just get results + results, err = hybridScanner.Scan(ctx, hybridScanOptions) + if err != nil { + return &QueryResult{Error: err}, err + } + } + + // Convert to SQL result format + if selectAll { + if len(columns) > 0 { + // SELECT *, specific_columns - include both auto-discovered and explicit columns + return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil + } else { + // SELECT * only - let converter determine all columns (excludes system columns) + columns = nil + return hybridScanner.ConvertToSQLResult(results, columns), nil + } + } + + // Handle custom column expressions (including arithmetic) + return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil +} + +// extractTimeFilters extracts time range filters from WHERE clause for optimization +// This allows push-down of time-based queries to improve scan performance +// Returns (startTimeNs, stopTimeNs) where 0 means unbounded +func (e *SQLEngine) extractTimeFilters(expr ExprNode) (int64, int64) { + startTimeNs, stopTimeNs := int64(0), int64(0) + + // Recursively extract time filters from expression tree + e.extractTimeFiltersRecursive(expr, &startTimeNs, &stopTimeNs) + + // Special case: if startTimeNs == stopTimeNs, treat it like an equality query + // to avoid premature scan termination. The predicate will handle exact matching. 
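+	// e.g. WHERE ts_col >= 1700000000000000000 AND ts_col <= 1700000000000000000
+	// (ts_col being any timestamp column) yields startTimeNs == stopTimeNs;
+	// clearing stopTimeNs keeps the scan open past that instant so the
+	// exact-match predicate can still see the row.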
+ if startTimeNs != 0 && startTimeNs == stopTimeNs { + stopTimeNs = 0 + } + + return startTimeNs, stopTimeNs +} + +// extractTimeFiltersRecursive recursively processes WHERE expressions to find time comparisons +func (e *SQLEngine) extractTimeFiltersRecursive(expr ExprNode, startTimeNs, stopTimeNs *int64) { + switch exprType := expr.(type) { + case *ComparisonExpr: + e.extractTimeFromComparison(exprType, startTimeNs, stopTimeNs) + case *AndExpr: + // For AND expressions, combine time filters (intersection) + e.extractTimeFiltersRecursive(exprType.Left, startTimeNs, stopTimeNs) + e.extractTimeFiltersRecursive(exprType.Right, startTimeNs, stopTimeNs) + case *OrExpr: + // For OR expressions, we can't easily optimize time ranges + // Skip time filter extraction for OR clauses to avoid incorrect results + return + case *ParenExpr: + // Unwrap parentheses and continue + e.extractTimeFiltersRecursive(exprType.Expr, startTimeNs, stopTimeNs) + } +} + +// extractTimeFromComparison extracts time bounds from comparison expressions +// Handles comparisons against timestamp columns (system columns and schema-defined timestamp types) +func (e *SQLEngine) extractTimeFromComparison(comp *ComparisonExpr, startTimeNs, stopTimeNs *int64) { + // Check if this is a time-related column comparison + leftCol := e.getColumnName(comp.Left) + rightCol := e.getColumnName(comp.Right) + + var valueExpr ExprNode + var reversed bool + + // Determine which side is the time column (using schema types) + if e.isTimestampColumn(leftCol) { + valueExpr = comp.Right + reversed = false + } else if e.isTimestampColumn(rightCol) { + valueExpr = comp.Left + reversed = true + } else { + // Not a time comparison + return + } + + // Extract the time value + timeValue := e.extractTimeValue(valueExpr) + if timeValue == 0 { + // Couldn't parse time value + return + } + + // Apply the comparison operator to determine time bounds + operator := comp.Operator + if reversed { + // Reverse the operator if column and value are swapped + operator = e.reverseOperator(operator) + } + + switch operator { + case GreaterThanStr: // timestamp > value + if *startTimeNs == 0 || timeValue > *startTimeNs { + *startTimeNs = timeValue + } + case GreaterEqualStr: // timestamp >= value + if *startTimeNs == 0 || timeValue >= *startTimeNs { + *startTimeNs = timeValue + } + case LessThanStr: // timestamp < value + if *stopTimeNs == 0 || timeValue < *stopTimeNs { + *stopTimeNs = timeValue + } + case LessEqualStr: // timestamp <= value + if *stopTimeNs == 0 || timeValue <= *stopTimeNs { + *stopTimeNs = timeValue + } + case EqualStr: // timestamp = value (point query) + // For exact matches, we set startTimeNs slightly before the target + // This works around a scan boundary bug where >= X starts after X instead of at X + // The predicate function will handle exact matching + *startTimeNs = timeValue - 1 + // Do NOT set stopTimeNs - let the predicate handle exact matching + } +} + +// isTimestampColumn checks if a column is a timestamp using schema type information +func (e *SQLEngine) isTimestampColumn(columnName string) bool { + if columnName == "" { + return false + } + + // System timestamp columns are always time columns + if columnName == SW_COLUMN_NAME_TIMESTAMP { + return true + } + + // For user-defined columns, check actual schema type information + if e.catalog != nil { + currentDB := e.catalog.GetCurrentDatabase() + if currentDB == "" { + currentDB = "default" + } + + // Get current table context from query execution + // Note: This is a limitation - we 
need table context here + // In a full implementation, this would be passed from the query context + tableInfo, err := e.getCurrentTableInfo(currentDB) + if err == nil && tableInfo != nil { + for _, col := range tableInfo.Columns { + if strings.EqualFold(col.Name, columnName) { + // Use actual SQL type to determine if this is a timestamp + return e.isSQLTypeTimestamp(col.Type) + } + } + } + } + + // Only return true if we have explicit type information + // No guessing based on column names + return false +} + +// isSQLTypeTimestamp checks if a SQL type string represents a timestamp type +func (e *SQLEngine) isSQLTypeTimestamp(sqlType string) bool { + upperType := strings.ToUpper(strings.TrimSpace(sqlType)) + + // Handle type with precision/length specifications + if idx := strings.Index(upperType, "("); idx != -1 { + upperType = upperType[:idx] + } + + switch upperType { + case "TIMESTAMP", "DATETIME": + return true + case "BIGINT": + // BIGINT could be a timestamp if it follows the pattern for timestamp storage + // This is a heuristic - in a better system, we'd have semantic type information + return false // Conservative approach - require explicit TIMESTAMP type + default: + return false + } +} + +// getCurrentTableInfo attempts to get table info for the current query context +// This is a simplified implementation - ideally table context would be passed explicitly +func (e *SQLEngine) getCurrentTableInfo(database string) (*TableInfo, error) { + // This is a limitation of the current architecture + // In practice, we'd need the table context from the current query + // For now, return nil to fallback to naming conventions + // TODO: Enhance architecture to pass table context through query execution + return nil, fmt.Errorf("table context not available in current architecture") +} + +// getColumnName extracts column name from expression (handles ColName types) +func (e *SQLEngine) getColumnName(expr ExprNode) string { + switch exprType := expr.(type) { + case *ColName: + return exprType.Name.String() + } + return "" +} + +// resolveColumnAlias tries to resolve a column name that might be an alias +func (e *SQLEngine) resolveColumnAlias(columnName string, selectExprs []SelectExpr) string { + if selectExprs == nil { + return columnName + } + + // Check if this column name is actually an alias in the SELECT list + for _, selectExpr := range selectExprs { + if aliasedExpr, ok := selectExpr.(*AliasedExpr); ok && aliasedExpr != nil { + // Check if the alias matches our column name + if aliasedExpr.As != nil && !aliasedExpr.As.IsEmpty() && aliasedExpr.As.String() == columnName { + // If the aliased expression is a column, return the actual column name + if colExpr, ok := aliasedExpr.Expr.(*ColName); ok && colExpr != nil { + return colExpr.Name.String() + } + } + } + } + + // If no alias found, return the original column name + return columnName +} + +// extractTimeValue parses time values from SQL expressions +// Supports nanosecond timestamps, ISO dates, and relative times +func (e *SQLEngine) extractTimeValue(expr ExprNode) int64 { + switch exprType := expr.(type) { + case *SQLVal: + switch exprType.Type { + case IntVal: + // Parse as nanosecond timestamp + if val, err := strconv.ParseInt(string(exprType.Val), 10, 64); err == nil { + return val + } + case StrVal: + // Parse as ISO date or other string formats + timeStr := string(exprType.Val) + + // Try parsing as RFC3339 (ISO 8601) + if t, err := time.Parse(time.RFC3339, timeStr); err == nil { + return t.UnixNano() + } + + // Try parsing 
as RFC3339 with nanoseconds + if t, err := time.Parse(time.RFC3339Nano, timeStr); err == nil { + return t.UnixNano() + } + + // Try parsing as date only (YYYY-MM-DD) + if t, err := time.Parse("2006-01-02", timeStr); err == nil { + return t.UnixNano() + } + + // Try parsing as datetime (YYYY-MM-DD HH:MM:SS) + if t, err := time.Parse("2006-01-02 15:04:05", timeStr); err == nil { + return t.UnixNano() + } + } + } + + return 0 // Couldn't parse +} + +// reverseOperator reverses comparison operators when column and value are swapped +func (e *SQLEngine) reverseOperator(op string) string { + switch op { + case GreaterThanStr: + return LessThanStr + case GreaterEqualStr: + return LessEqualStr + case LessThanStr: + return GreaterThanStr + case LessEqualStr: + return GreaterEqualStr + case EqualStr: + return EqualStr + case NotEqualStr: + return NotEqualStr + default: + return op + } +} + +// buildPredicate creates a predicate function from a WHERE clause expression +// This is a simplified implementation - a full implementation would be much more complex +func (e *SQLEngine) buildPredicate(expr ExprNode) (func(*schema_pb.RecordValue) bool, error) { + return e.buildPredicateWithContext(expr, nil) +} + +// buildPredicateWithContext creates a predicate function with SELECT context for alias resolution +func (e *SQLEngine) buildPredicateWithContext(expr ExprNode, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { + switch exprType := expr.(type) { + case *ComparisonExpr: + return e.buildComparisonPredicateWithContext(exprType, selectExprs) + case *BetweenExpr: + return e.buildBetweenPredicateWithContext(exprType, selectExprs) + case *IsNullExpr: + return e.buildIsNullPredicateWithContext(exprType, selectExprs) + case *IsNotNullExpr: + return e.buildIsNotNullPredicateWithContext(exprType, selectExprs) + case *AndExpr: + leftPred, err := e.buildPredicateWithContext(exprType.Left, selectExprs) + if err != nil { + return nil, err + } + rightPred, err := e.buildPredicateWithContext(exprType.Right, selectExprs) + if err != nil { + return nil, err + } + return func(record *schema_pb.RecordValue) bool { + return leftPred(record) && rightPred(record) + }, nil + case *OrExpr: + leftPred, err := e.buildPredicateWithContext(exprType.Left, selectExprs) + if err != nil { + return nil, err + } + rightPred, err := e.buildPredicateWithContext(exprType.Right, selectExprs) + if err != nil { + return nil, err + } + return func(record *schema_pb.RecordValue) bool { + return leftPred(record) || rightPred(record) + }, nil + default: + return nil, fmt.Errorf("unsupported WHERE expression: %T", expr) + } +} + +// buildComparisonPredicateWithAliases creates a predicate for comparison operations with alias support +func (e *SQLEngine) buildComparisonPredicateWithAliases(expr *ComparisonExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { + var columnName string + var compareValue interface{} + var operator string + + // Extract the comparison details, resolving aliases if needed + leftCol := e.getColumnNameWithAliases(expr.Left, aliases) + rightCol := e.getColumnNameWithAliases(expr.Right, aliases) + operator = e.normalizeOperator(expr.Operator) + + if leftCol != "" && rightCol == "" { + // Left side is column, right side is value + columnName = e.getSystemColumnInternalName(leftCol) + val, err := e.extractValueFromExpr(expr.Right) + if err != nil { + return nil, err + } + compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right) + } else if rightCol != "" && 
leftCol == "" { + // Right side is column, left side is value + columnName = e.getSystemColumnInternalName(rightCol) + val, err := e.extractValueFromExpr(expr.Left) + if err != nil { + return nil, err + } + compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left) + // Reverse the operator when column is on the right + operator = e.reverseOperator(operator) + } else if leftCol != "" && rightCol != "" { + return nil, fmt.Errorf("column-to-column comparisons not yet supported") + } else { + return nil, fmt.Errorf("at least one side of comparison must be a column") + } + + return func(record *schema_pb.RecordValue) bool { + fieldValue, exists := record.Fields[columnName] + if !exists { + return false + } + return e.evaluateComparison(fieldValue, operator, compareValue) + }, nil +} + +// buildComparisonPredicate creates a predicate for comparison operations (=, <, >, etc.) +// Handles column names on both left and right sides of the comparison +func (e *SQLEngine) buildComparisonPredicate(expr *ComparisonExpr) (func(*schema_pb.RecordValue) bool, error) { + return e.buildComparisonPredicateWithContext(expr, nil) +} + +// buildComparisonPredicateWithContext creates a predicate for comparison operations with alias support +func (e *SQLEngine) buildComparisonPredicateWithContext(expr *ComparisonExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { + var columnName string + var compareValue interface{} + var operator string + + // Check if column is on the left side (normal case: column > value) + if colName, ok := expr.Left.(*ColName); ok { + rawColumnName := colName.Name.String() + // Resolve potential alias to actual column name + columnName = e.resolveColumnAlias(rawColumnName, selectExprs) + // Map display names to internal names for system columns + columnName = e.getSystemColumnInternalName(columnName) + operator = expr.Operator + + // Extract comparison value from right side + val, err := e.extractComparisonValue(expr.Right) + if err != nil { + return nil, fmt.Errorf("failed to extract right-side value: %v", err) + } + compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Right) + + } else if colName, ok := expr.Right.(*ColName); ok { + // Column is on the right side (reversed case: value < column) + rawColumnName := colName.Name.String() + // Resolve potential alias to actual column name + columnName = e.resolveColumnAlias(rawColumnName, selectExprs) + // Map display names to internal names for system columns + columnName = e.getSystemColumnInternalName(columnName) + + // Reverse the operator when column is on right side + operator = e.reverseOperator(expr.Operator) + + // Extract comparison value from left side + val, err := e.extractComparisonValue(expr.Left) + if err != nil { + return nil, fmt.Errorf("failed to extract left-side value: %v", err) + } + compareValue = e.convertValueForTimestampColumn(columnName, val, expr.Left) + + } else { + // Handle literal-only comparisons like 1 = 0, 'a' = 'b', etc. 
+ leftVal, leftErr := e.extractComparisonValue(expr.Left) + rightVal, rightErr := e.extractComparisonValue(expr.Right) + + if leftErr != nil || rightErr != nil { + return nil, fmt.Errorf("no column name found in comparison expression, left: %T, right: %T", expr.Left, expr.Right) + } + + // Evaluate the literal comparison once + result := e.compareLiteralValues(leftVal, rightVal, expr.Operator) + + // Return a constant predicate + return func(record *schema_pb.RecordValue) bool { + return result + }, nil + } + + // Return the predicate function + return func(record *schema_pb.RecordValue) bool { + fieldValue, exists := record.Fields[columnName] + if !exists { + return false // Column doesn't exist in record + } + + // Use the comparison evaluation function + return e.evaluateComparison(fieldValue, operator, compareValue) + }, nil +} + +// buildBetweenPredicateWithContext creates a predicate for BETWEEN operations +func (e *SQLEngine) buildBetweenPredicateWithContext(expr *BetweenExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { + var columnName string + var fromValue, toValue interface{} + + // Check if left side is a column name + if colName, ok := expr.Left.(*ColName); ok { + rawColumnName := colName.Name.String() + // Resolve potential alias to actual column name + columnName = e.resolveColumnAlias(rawColumnName, selectExprs) + // Map display names to internal names for system columns + columnName = e.getSystemColumnInternalName(columnName) + + // Extract FROM value + fromVal, err := e.extractComparisonValue(expr.From) + if err != nil { + return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err) + } + fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From) + + // Extract TO value + toVal, err := e.extractComparisonValue(expr.To) + if err != nil { + return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err) + } + toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To) + } else { + return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left) + } + + // Return the predicate function + return func(record *schema_pb.RecordValue) bool { + fieldValue, exists := record.Fields[columnName] + if !exists { + return false + } + + // Evaluate: fieldValue >= fromValue AND fieldValue <= toValue + greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue) + lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue) + + result := greaterThanOrEqualFrom && lessThanOrEqualTo + + // Handle NOT BETWEEN + if expr.Not { + result = !result + } + + return result + }, nil +} + +// buildBetweenPredicateWithAliases creates a predicate for BETWEEN operations with alias support +func (e *SQLEngine) buildBetweenPredicateWithAliases(expr *BetweenExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { + var columnName string + var fromValue, toValue interface{} + + // Extract column name from left side with alias resolution + leftCol := e.getColumnNameWithAliases(expr.Left, aliases) + if leftCol == "" { + return nil, fmt.Errorf("BETWEEN left operand must be a column name, got: %T", expr.Left) + } + columnName = e.getSystemColumnInternalName(leftCol) + + // Extract FROM value + fromVal, err := e.extractValueFromExpr(expr.From) + if err != nil { + return nil, fmt.Errorf("failed to extract BETWEEN from value: %v", err) + } + fromValue = e.convertValueForTimestampColumn(columnName, fromVal, expr.From) + + // Extract TO value + toVal, err := 
e.extractValueFromExpr(expr.To) + if err != nil { + return nil, fmt.Errorf("failed to extract BETWEEN to value: %v", err) + } + toValue = e.convertValueForTimestampColumn(columnName, toVal, expr.To) + + // Return the predicate function + return func(record *schema_pb.RecordValue) bool { + fieldValue, exists := record.Fields[columnName] + if !exists { + return false + } + + // Evaluate: fieldValue >= fromValue AND fieldValue <= toValue + greaterThanOrEqualFrom := e.evaluateComparison(fieldValue, ">=", fromValue) + lessThanOrEqualTo := e.evaluateComparison(fieldValue, "<=", toValue) + + result := greaterThanOrEqualFrom && lessThanOrEqualTo + + // Handle NOT BETWEEN + if expr.Not { + result = !result + } + + return result + }, nil +} + +// buildIsNullPredicateWithContext creates a predicate for IS NULL operations +func (e *SQLEngine) buildIsNullPredicateWithContext(expr *IsNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { + // Check if the expression is a column name + if colName, ok := expr.Expr.(*ColName); ok { + rawColumnName := colName.Name.String() + // Resolve potential alias to actual column name + columnName := e.resolveColumnAlias(rawColumnName, selectExprs) + // Map display names to internal names for system columns + columnName = e.getSystemColumnInternalName(columnName) + + // Return the predicate function + return func(record *schema_pb.RecordValue) bool { + // Check if field exists and if it's null or missing + fieldValue, exists := record.Fields[columnName] + if !exists { + return true // Field doesn't exist = NULL + } + + // Check if the field value itself is null/empty + return e.isValueNull(fieldValue) + }, nil + } else { + return nil, fmt.Errorf("IS NULL left operand must be a column name, got: %T", expr.Expr) + } +} + +// buildIsNotNullPredicateWithContext creates a predicate for IS NOT NULL operations +func (e *SQLEngine) buildIsNotNullPredicateWithContext(expr *IsNotNullExpr, selectExprs []SelectExpr) (func(*schema_pb.RecordValue) bool, error) { + // Check if the expression is a column name + if colName, ok := expr.Expr.(*ColName); ok { + rawColumnName := colName.Name.String() + // Resolve potential alias to actual column name + columnName := e.resolveColumnAlias(rawColumnName, selectExprs) + // Map display names to internal names for system columns + columnName = e.getSystemColumnInternalName(columnName) + + // Return the predicate function + return func(record *schema_pb.RecordValue) bool { + // Check if field exists and if it's not null + fieldValue, exists := record.Fields[columnName] + if !exists { + return false // Field doesn't exist = NULL, so NOT NULL is false + } + + // Check if the field value itself is not null/empty + return !e.isValueNull(fieldValue) + }, nil + } else { + return nil, fmt.Errorf("IS NOT NULL left operand must be a column name, got: %T", expr.Expr) + } +} + +// buildIsNullPredicateWithAliases creates a predicate for IS NULL operations with alias support +func (e *SQLEngine) buildIsNullPredicateWithAliases(expr *IsNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { + // Extract column name from expression with alias resolution + columnName := e.getColumnNameWithAliases(expr.Expr, aliases) + if columnName == "" { + return nil, fmt.Errorf("IS NULL operand must be a column name, got: %T", expr.Expr) + } + columnName = e.getSystemColumnInternalName(columnName) + + // Return the predicate function + return func(record *schema_pb.RecordValue) bool { + // Check if field exists and if it's 
null or missing + fieldValue, exists := record.Fields[columnName] + if !exists { + return true // Field doesn't exist = NULL + } + + // Check if the field value itself is null/empty + return e.isValueNull(fieldValue) + }, nil +} + +// buildIsNotNullPredicateWithAliases creates a predicate for IS NOT NULL operations with alias support +func (e *SQLEngine) buildIsNotNullPredicateWithAliases(expr *IsNotNullExpr, aliases map[string]ExprNode) (func(*schema_pb.RecordValue) bool, error) { + // Extract column name from expression with alias resolution + columnName := e.getColumnNameWithAliases(expr.Expr, aliases) + if columnName == "" { + return nil, fmt.Errorf("IS NOT NULL operand must be a column name, got: %T", expr.Expr) + } + columnName = e.getSystemColumnInternalName(columnName) + + // Return the predicate function + return func(record *schema_pb.RecordValue) bool { + // Check if field exists and if it's not null + fieldValue, exists := record.Fields[columnName] + if !exists { + return false // Field doesn't exist = NULL, so NOT NULL is false + } + + // Check if the field value itself is not null/empty + return !e.isValueNull(fieldValue) + }, nil +} + +// isValueNull checks if a schema_pb.Value is null or represents a null value +func (e *SQLEngine) isValueNull(value *schema_pb.Value) bool { + if value == nil { + return true + } + + // Check the Kind field to see if it represents a null value + if value.Kind == nil { + return true + } + + // For different value types, check if they represent null/empty values + switch kind := value.Kind.(type) { + case *schema_pb.Value_StringValue: + // Empty string could be considered null depending on semantics + // For now, treat empty string as not null (SQL standard behavior) + return false + case *schema_pb.Value_BoolValue: + return false // Boolean values are never null + case *schema_pb.Value_Int32Value, *schema_pb.Value_Int64Value: + return false // Integer values are never null + case *schema_pb.Value_FloatValue, *schema_pb.Value_DoubleValue: + return false // Numeric values are never null + case *schema_pb.Value_BytesValue: + // Bytes could be null if empty, but for now treat as not null + return false + case *schema_pb.Value_TimestampValue: + // Check if timestamp is zero/uninitialized + return kind.TimestampValue == nil + case *schema_pb.Value_DateValue: + return kind.DateValue == nil + case *schema_pb.Value_TimeValue: + return kind.TimeValue == nil + default: + // Unknown type, consider it null to be safe + return true + } +} + +// getColumnNameWithAliases extracts column name from expression, resolving aliases if needed +func (e *SQLEngine) getColumnNameWithAliases(expr ExprNode, aliases map[string]ExprNode) string { + switch exprType := expr.(type) { + case *ColName: + colName := exprType.Name.String() + // Check if this is an alias that should be resolved + if aliases != nil { + if actualExpr, exists := aliases[colName]; exists { + // Recursively resolve the aliased expression + return e.getColumnNameWithAliases(actualExpr, nil) // Don't recurse aliases + } + } + return colName + } + return "" +} + +// extractValueFromExpr extracts a value from an expression node (for alias support) +func (e *SQLEngine) extractValueFromExpr(expr ExprNode) (interface{}, error) { + return e.extractComparisonValue(expr) +} + +// normalizeOperator normalizes comparison operators +func (e *SQLEngine) normalizeOperator(op string) string { + return op // For now, just return as-is +} + +// extractComparisonValue extracts the comparison value from a SQL expression 
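+// Examples of the mapping (per the cases below): 42 -> int64, 3.14 -> float64,
+// 'abc' -> string, and a parenthesized tuple (1, 2, 3) -> []interface{} for IN lists.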
+func (e *SQLEngine) extractComparisonValue(expr ExprNode) (interface{}, error) { + switch val := expr.(type) { + case *SQLVal: + switch val.Type { + case IntVal: + intVal, err := strconv.ParseInt(string(val.Val), 10, 64) + if err != nil { + return nil, err + } + return intVal, nil + case StrVal: + return string(val.Val), nil + case FloatVal: + floatVal, err := strconv.ParseFloat(string(val.Val), 64) + if err != nil { + return nil, err + } + return floatVal, nil + default: + return nil, fmt.Errorf("unsupported SQL value type: %v", val.Type) + } + case *ArithmeticExpr: + // Handle arithmetic expressions like CURRENT_TIMESTAMP - INTERVAL '1 hour' + return e.evaluateArithmeticExpressionForComparison(val) + case *FuncExpr: + // Handle function calls like NOW(), CURRENT_TIMESTAMP + return e.evaluateFunctionExpressionForComparison(val) + case *IntervalExpr: + // Handle standalone INTERVAL expressions + nanos, err := e.evaluateInterval(val.Value) + if err != nil { + return nil, err + } + return nanos, nil + case ValTuple: + // Handle IN expressions with multiple values: column IN (value1, value2, value3) + var inValues []interface{} + for _, tupleVal := range val { + switch v := tupleVal.(type) { + case *SQLVal: + switch v.Type { + case IntVal: + intVal, err := strconv.ParseInt(string(v.Val), 10, 64) + if err != nil { + return nil, err + } + inValues = append(inValues, intVal) + case StrVal: + inValues = append(inValues, string(v.Val)) + case FloatVal: + floatVal, err := strconv.ParseFloat(string(v.Val), 64) + if err != nil { + return nil, err + } + inValues = append(inValues, floatVal) + } + } + } + return inValues, nil + default: + return nil, fmt.Errorf("unsupported comparison value type: %T", expr) + } +} + +// evaluateArithmeticExpressionForComparison evaluates an arithmetic expression for WHERE clause comparisons +func (e *SQLEngine) evaluateArithmeticExpressionForComparison(expr *ArithmeticExpr) (interface{}, error) { + // Check if this is timestamp arithmetic with intervals + if e.isTimestampArithmetic(expr.Left, expr.Right) && (expr.Operator == "+" || expr.Operator == "-") { + // Evaluate timestamp arithmetic and return the result as nanoseconds + result, err := e.evaluateTimestampArithmetic(expr.Left, expr.Right, expr.Operator) + if err != nil { + return nil, err + } + + // Extract the timestamp value as nanoseconds for comparison + if result.Kind != nil { + switch resultKind := result.Kind.(type) { + case *schema_pb.Value_Int64Value: + return resultKind.Int64Value, nil + case *schema_pb.Value_StringValue: + // If it's a formatted timestamp string, parse it back to nanoseconds + if timestamp, err := time.Parse("2006-01-02T15:04:05.000000000Z", resultKind.StringValue); err == nil { + return timestamp.UnixNano(), nil + } + return nil, fmt.Errorf("could not parse timestamp string: %s", resultKind.StringValue) + } + } + return nil, fmt.Errorf("invalid timestamp arithmetic result") + } + + // For other arithmetic operations, we'd need to evaluate them differently + // For now, return an error for unsupported arithmetic + return nil, fmt.Errorf("unsupported arithmetic expression in WHERE clause: %s", expr.Operator) +} + +// evaluateFunctionExpressionForComparison evaluates a function expression for WHERE clause comparisons +func (e *SQLEngine) evaluateFunctionExpressionForComparison(expr *FuncExpr) (interface{}, error) { + funcName := strings.ToUpper(expr.Name.String()) + + switch funcName { + case "NOW", "CURRENT_TIMESTAMP": + result, err := e.Now() + if err != nil { + return nil, err + } + 
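+		// e.Now() reports microsecond precision (TimestampMicros); the branch
+		// below scales it by 1000 to match the engine's nanosecond comparisons.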
// Return as nanoseconds for comparison + if result.Kind != nil { + if resultKind, ok := result.Kind.(*schema_pb.Value_TimestampValue); ok { + // Convert microseconds to nanoseconds + return resultKind.TimestampValue.TimestampMicros * 1000, nil + } + } + return nil, fmt.Errorf("invalid NOW() result: expected TimestampValue, got %T", result.Kind) + + case "CURRENT_DATE": + result, err := e.CurrentDate() + if err != nil { + return nil, err + } + // Convert date to nanoseconds (start of day) + if result.Kind != nil { + if resultKind, ok := result.Kind.(*schema_pb.Value_StringValue); ok { + if date, err := time.Parse("2006-01-02", resultKind.StringValue); err == nil { + return date.UnixNano(), nil + } + } + } + return nil, fmt.Errorf("invalid CURRENT_DATE result") + + case "CURRENT_TIME": + result, err := e.CurrentTime() + if err != nil { + return nil, err + } + // For time comparison, we might need special handling + // For now, just return the string value + if result.Kind != nil { + if resultKind, ok := result.Kind.(*schema_pb.Value_StringValue); ok { + return resultKind.StringValue, nil + } + } + return nil, fmt.Errorf("invalid CURRENT_TIME result") + + default: + return nil, fmt.Errorf("unsupported function in WHERE clause: %s", funcName) + } +} + +// evaluateComparison performs the actual comparison +func (e *SQLEngine) evaluateComparison(fieldValue *schema_pb.Value, operator string, compareValue interface{}) bool { + // This is a simplified implementation + // A full implementation would handle type coercion and all comparison operators + + switch operator { + case "=": + return e.valuesEqual(fieldValue, compareValue) + case "<": + return e.valueLessThan(fieldValue, compareValue) + case ">": + return e.valueGreaterThan(fieldValue, compareValue) + case "<=": + return e.valuesEqual(fieldValue, compareValue) || e.valueLessThan(fieldValue, compareValue) + case ">=": + return e.valuesEqual(fieldValue, compareValue) || e.valueGreaterThan(fieldValue, compareValue) + case "!=", "<>": + return !e.valuesEqual(fieldValue, compareValue) + case "LIKE", "like": + return e.valueLike(fieldValue, compareValue) + case "IN", "in": + return e.valueIn(fieldValue, compareValue) + default: + return false + } +} + +// Helper functions for value comparison with proper type coercion +func (e *SQLEngine) valuesEqual(fieldValue *schema_pb.Value, compareValue interface{}) bool { + // Handle string comparisons first + if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok { + if strVal, ok := compareValue.(string); ok { + return strField.StringValue == strVal + } + return false + } + + // Handle boolean comparisons + if boolField, ok := fieldValue.Kind.(*schema_pb.Value_BoolValue); ok { + if boolVal, ok := compareValue.(bool); ok { + return boolField.BoolValue == boolVal + } + return false + } + + // Handle logical type comparisons + if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok { + if timestampVal, ok := compareValue.(int64); ok { + return timestampField.TimestampValue.TimestampMicros == timestampVal + } + return false + } + + if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok { + if dateVal, ok := compareValue.(int32); ok { + return dateField.DateValue.DaysSinceEpoch == dateVal + } + return false + } + + // Handle DecimalValue comparison (convert to string for comparison) + if decimalField, ok := fieldValue.Kind.(*schema_pb.Value_DecimalValue); ok { + if decimalStr, ok := compareValue.(string); ok { + // Convert decimal bytes back to string for 
comparison + decimalValue := e.decimalToString(decimalField.DecimalValue) + return decimalValue == decimalStr + } + return false + } + + if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok { + if timeVal, ok := compareValue.(int64); ok { + return timeField.TimeValue.TimeMicros == timeVal + } + return false + } + + // Handle direct int64 comparisons for timestamp precision (before float64 conversion) + if int64Field, ok := fieldValue.Kind.(*schema_pb.Value_Int64Value); ok { + if int64Val, ok := compareValue.(int64); ok { + return int64Field.Int64Value == int64Val + } + if intVal, ok := compareValue.(int); ok { + return int64Field.Int64Value == int64(intVal) + } + } + + // Handle direct int32 comparisons + if int32Field, ok := fieldValue.Kind.(*schema_pb.Value_Int32Value); ok { + if int32Val, ok := compareValue.(int32); ok { + return int32Field.Int32Value == int32Val + } + if intVal, ok := compareValue.(int); ok { + return int32Field.Int32Value == int32(intVal) + } + if int64Val, ok := compareValue.(int64); ok && int64Val >= math.MinInt32 && int64Val <= math.MaxInt32 { + return int32Field.Int32Value == int32(int64Val) + } + } + + // Handle numeric comparisons with type coercion (fallback for other numeric types) + fieldNum := e.convertToNumber(fieldValue) + compareNum := e.convertCompareValueToNumber(compareValue) + + if fieldNum != nil && compareNum != nil { + return *fieldNum == *compareNum + } + + return false +} + +// convertCompareValueToNumber converts compare values from SQL queries to float64 +func (e *SQLEngine) convertCompareValueToNumber(compareValue interface{}) *float64 { + switch v := compareValue.(type) { + case int: + result := float64(v) + return &result + case int32: + result := float64(v) + return &result + case int64: + result := float64(v) + return &result + case float32: + result := float64(v) + return &result + case float64: + return &v + case string: + // Try to parse string as number for flexible comparisons + if parsed, err := strconv.ParseFloat(v, 64); err == nil { + return &parsed + } + } + return nil +} + +// decimalToString converts a DecimalValue back to string representation +func (e *SQLEngine) decimalToString(decimalValue *schema_pb.DecimalValue) string { + if decimalValue == nil || decimalValue.Value == nil { + return "0" + } + + // Convert bytes back to big.Int + intValue := new(big.Int).SetBytes(decimalValue.Value) + + // Convert to string with proper decimal placement + str := intValue.String() + + // Handle decimal placement based on scale + scale := int(decimalValue.Scale) + if scale > 0 && len(str) > scale { + // Insert decimal point + decimalPos := len(str) - scale + return str[:decimalPos] + "." 
+ str[decimalPos:] + } + + return str +} + +func (e *SQLEngine) valueLessThan(fieldValue *schema_pb.Value, compareValue interface{}) bool { + // Handle string comparisons lexicographically + if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok { + if strVal, ok := compareValue.(string); ok { + return strField.StringValue < strVal + } + return false + } + + // Handle logical type comparisons + if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok { + if timestampVal, ok := compareValue.(int64); ok { + return timestampField.TimestampValue.TimestampMicros < timestampVal + } + return false + } + + if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok { + if dateVal, ok := compareValue.(int32); ok { + return dateField.DateValue.DaysSinceEpoch < dateVal + } + return false + } + + if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok { + if timeVal, ok := compareValue.(int64); ok { + return timeField.TimeValue.TimeMicros < timeVal + } + return false + } + + // Handle direct int64 comparisons for timestamp precision (before float64 conversion) + if int64Field, ok := fieldValue.Kind.(*schema_pb.Value_Int64Value); ok { + if int64Val, ok := compareValue.(int64); ok { + return int64Field.Int64Value < int64Val + } + if intVal, ok := compareValue.(int); ok { + return int64Field.Int64Value < int64(intVal) + } + } + + // Handle direct int32 comparisons + if int32Field, ok := fieldValue.Kind.(*schema_pb.Value_Int32Value); ok { + if int32Val, ok := compareValue.(int32); ok { + return int32Field.Int32Value < int32Val + } + if intVal, ok := compareValue.(int); ok { + return int32Field.Int32Value < int32(intVal) + } + if int64Val, ok := compareValue.(int64); ok && int64Val >= math.MinInt32 && int64Val <= math.MaxInt32 { + return int32Field.Int32Value < int32(int64Val) + } + } + + // Handle numeric comparisons with type coercion (fallback for other numeric types) + fieldNum := e.convertToNumber(fieldValue) + compareNum := e.convertCompareValueToNumber(compareValue) + + if fieldNum != nil && compareNum != nil { + return *fieldNum < *compareNum + } + + return false +} + +func (e *SQLEngine) valueGreaterThan(fieldValue *schema_pb.Value, compareValue interface{}) bool { + // Handle string comparisons lexicographically + if strField, ok := fieldValue.Kind.(*schema_pb.Value_StringValue); ok { + if strVal, ok := compareValue.(string); ok { + return strField.StringValue > strVal + } + return false + } + + // Handle logical type comparisons + if timestampField, ok := fieldValue.Kind.(*schema_pb.Value_TimestampValue); ok { + if timestampVal, ok := compareValue.(int64); ok { + return timestampField.TimestampValue.TimestampMicros > timestampVal + } + return false + } + + if dateField, ok := fieldValue.Kind.(*schema_pb.Value_DateValue); ok { + if dateVal, ok := compareValue.(int32); ok { + return dateField.DateValue.DaysSinceEpoch > dateVal + } + return false + } + + if timeField, ok := fieldValue.Kind.(*schema_pb.Value_TimeValue); ok { + if timeVal, ok := compareValue.(int64); ok { + return timeField.TimeValue.TimeMicros > timeVal + } + return false + } + + // Handle direct int64 comparisons for timestamp precision (before float64 conversion) + if int64Field, ok := fieldValue.Kind.(*schema_pb.Value_Int64Value); ok { + if int64Val, ok := compareValue.(int64); ok { + return int64Field.Int64Value > int64Val + } + if intVal, ok := compareValue.(int); ok { + return int64Field.Int64Value > int64(intVal) + } + } + + // Handle direct int32 comparisons + if 
int32Field, ok := fieldValue.Kind.(*schema_pb.Value_Int32Value); ok {
+		if int32Val, ok := compareValue.(int32); ok {
+			return int32Field.Int32Value > int32Val
+		}
+		if intVal, ok := compareValue.(int); ok {
+			return int32Field.Int32Value > int32(intVal)
+		}
+		if int64Val, ok := compareValue.(int64); ok && int64Val >= math.MinInt32 && int64Val <= math.MaxInt32 {
+			return int32Field.Int32Value > int32(int64Val)
+		}
+	}
+
+	// Handle numeric comparisons with type coercion (fallback for other numeric types)
+	fieldNum := e.convertToNumber(fieldValue)
+	compareNum := e.convertCompareValueToNumber(compareValue)
+
+	if fieldNum != nil && compareNum != nil {
+		return *fieldNum > *compareNum
+	}
+
+	return false
+}
+
+// valueLike implements SQL LIKE pattern matching with % and _ wildcards
+func (e *SQLEngine) valueLike(fieldValue *schema_pb.Value, compareValue interface{}) bool {
+	// Only support LIKE for string values
+	stringVal, ok := fieldValue.Kind.(*schema_pb.Value_StringValue)
+	if !ok {
+		return false
+	}
+
+	pattern, ok := compareValue.(string)
+	if !ok {
+		return false
+	}
+
+	// Convert SQL LIKE pattern to Go regex pattern.
+	// Escape regex metacharacters first so literal ".", "(", etc. match themselves,
+	// then translate the SQL wildcards: % matches any sequence (.*), _ matches a single character (.)
+	regexPattern := regexp.QuoteMeta(pattern)
+	regexPattern = strings.ReplaceAll(regexPattern, "%", ".*")
+	regexPattern = strings.ReplaceAll(regexPattern, "_", ".")
+	regexPattern = "^" + regexPattern + "$" // Anchor to match entire string
+
+	// Compile and match regex
+	regex, err := regexp.Compile(regexPattern)
+	if err != nil {
+		return false // Invalid pattern
+	}
+
+	return regex.MatchString(stringVal.StringValue)
+}
+
+// valueIn implements SQL IN operator for checking if value exists in a list
+func (e *SQLEngine) valueIn(fieldValue *schema_pb.Value, compareValue interface{}) bool {
+	// For now, handle simple case where compareValue is a slice of values
+	// In a full implementation, this would handle SQL IN expressions properly
+	values, ok := compareValue.([]interface{})
+	if !ok {
+		return false
+	}
+
+	// Check if fieldValue matches any value in the list
+	for _, value := range values {
+		if e.valuesEqual(fieldValue, value) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// Helper methods for specific operations
+
+func (e *SQLEngine) showDatabases(ctx context.Context) (*QueryResult, error) {
+	databases := e.catalog.ListDatabases()
+
+	result := &QueryResult{
+		Columns: []string{"Database"},
+		Rows:    make([][]sqltypes.Value, len(databases)),
+	}
+
+	for i, db := range databases {
+		result.Rows[i] = []sqltypes.Value{
+			sqltypes.NewVarChar(db),
+		}
+	}
+
+	return result, nil
+}
+
+func (e *SQLEngine) showTables(ctx context.Context, dbName string) (*QueryResult, error) {
+	// Use current database context if no database specified
+	if dbName == "" {
+		dbName = e.catalog.GetCurrentDatabase()
+		if dbName == "" {
+			dbName = "default"
+		}
+	}
+
+	tables, err := e.catalog.ListTables(dbName)
+	if err != nil {
+		return &QueryResult{Error: err}, err
+	}
+
+	result := &QueryResult{
+		Columns: []string{"Tables_in_" + dbName},
+		Rows:    make([][]sqltypes.Value, len(tables)),
+	}
+
+	for i, table := range tables {
+		result.Rows[i] = []sqltypes.Value{
+			sqltypes.NewVarChar(table),
+		}
+	}
+
+	return result, nil
+}
+
+// compareLiteralValues compares two literal values with the given operator
+func (e *SQLEngine) compareLiteralValues(left, right interface{}, operator string) bool {
+	switch operator {
+	case "=", "==":
+		return e.literalValuesEqual(left, right)
+	case "!=", "<>":
+		return !e.literalValuesEqual(left, right)
+	case "<":
+		return
e.compareLiteralNumber(left, right) < 0 + case "<=": + return e.compareLiteralNumber(left, right) <= 0 + case ">": + return e.compareLiteralNumber(left, right) > 0 + case ">=": + return e.compareLiteralNumber(left, right) >= 0 + default: + // For unsupported operators, default to false + return false + } +} + +// literalValuesEqual checks if two literal values are equal +func (e *SQLEngine) literalValuesEqual(left, right interface{}) bool { + // Convert both to strings for comparison + leftStr := fmt.Sprintf("%v", left) + rightStr := fmt.Sprintf("%v", right) + return leftStr == rightStr +} + +// compareLiteralNumber compares two values as numbers +func (e *SQLEngine) compareLiteralNumber(left, right interface{}) int { + leftNum, leftOk := e.convertToFloat64(left) + rightNum, rightOk := e.convertToFloat64(right) + + if !leftOk || !rightOk { + // Fall back to string comparison if not numeric + leftStr := fmt.Sprintf("%v", left) + rightStr := fmt.Sprintf("%v", right) + if leftStr < rightStr { + return -1 + } else if leftStr > rightStr { + return 1 + } else { + return 0 + } + } + + if leftNum < rightNum { + return -1 + } else if leftNum > rightNum { + return 1 + } else { + return 0 + } +} + +// convertToFloat64 attempts to convert a value to float64 +func (e *SQLEngine) convertToFloat64(value interface{}) (float64, bool) { + switch v := value.(type) { + case int64: + return float64(v), true + case int32: + return float64(v), true + case int: + return float64(v), true + case float64: + return v, true + case float32: + return float64(v), true + case string: + if num, err := strconv.ParseFloat(v, 64); err == nil { + return num, true + } + return 0, false + default: + return 0, false + } +} + +func (e *SQLEngine) createTable(ctx context.Context, stmt *DDLStatement) (*QueryResult, error) { + // Parse CREATE TABLE statement + // Assumption: Table name format is [database.]table_name + tableName := stmt.NewName.Name.String() + database := "" + + // Check if database is specified in table name + if stmt.NewName.Qualifier.String() != "" { + database = stmt.NewName.Qualifier.String() + } else { + // Use current database context or default + database = e.catalog.GetCurrentDatabase() + if database == "" { + database = "default" + } + } + + // Parse column definitions from CREATE TABLE + // Assumption: stmt.TableSpec contains column definitions + if stmt.TableSpec == nil || len(stmt.TableSpec.Columns) == 0 { + err := fmt.Errorf("CREATE TABLE requires column definitions") + return &QueryResult{Error: err}, err + } + + // Convert SQL columns to MQ schema fields + fields := make([]*schema_pb.Field, len(stmt.TableSpec.Columns)) + for i, col := range stmt.TableSpec.Columns { + fieldType, err := e.convertSQLTypeToMQ(col.Type) + if err != nil { + return &QueryResult{Error: err}, err + } + + fields[i] = &schema_pb.Field{ + Name: col.Name.String(), + Type: fieldType, + } + } + + // Create record type for the topic + recordType := &schema_pb.RecordType{ + Fields: fields, + } + + // Create the topic via broker using configurable partition count + partitionCount := e.catalog.GetDefaultPartitionCount() + err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType) + if err != nil { + return &QueryResult{Error: err}, err + } + + // Register the new topic in catalog + mqSchema := &schema.Schema{ + Namespace: database, + Name: tableName, + RecordType: recordType, + RevisionId: 1, // Initial revision + } + + err = e.catalog.RegisterTopic(database, tableName, mqSchema) + if err != nil { 
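+ // The topic was already configured on the broker above, so a catalog + // registration failure is surfaced to the caller rather than silently ignored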
+ return &QueryResult{Error: err}, err + } + + // Return success result + result := &QueryResult{ + Columns: []string{"Result"}, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarChar(fmt.Sprintf("Table '%s.%s' created successfully", database, tableName))}, + }, + } + + return result, nil +} + +// ExecutionPlanBuilder handles building execution plans for queries +type ExecutionPlanBuilder struct { + engine *SQLEngine +} + +// NewExecutionPlanBuilder creates a new execution plan builder +func NewExecutionPlanBuilder(engine *SQLEngine) *ExecutionPlanBuilder { + return &ExecutionPlanBuilder{engine: engine} +} + +// BuildAggregationPlan builds an execution plan for aggregation queries +func (builder *ExecutionPlanBuilder) BuildAggregationPlan( + stmt *SelectStatement, + aggregations []AggregationSpec, + strategy AggregationStrategy, + dataSources *TopicDataSources, +) *QueryExecutionPlan { + + plan := &QueryExecutionPlan{ + QueryType: "SELECT", + ExecutionStrategy: builder.determineExecutionStrategy(stmt, strategy), + DataSources: builder.buildDataSourcesList(strategy, dataSources), + PartitionsScanned: dataSources.PartitionsCount, + ParquetFilesScanned: builder.countParquetFiles(dataSources), + LiveLogFilesScanned: builder.countLiveLogFiles(dataSources), + OptimizationsUsed: builder.buildOptimizationsList(stmt, strategy, dataSources), + Aggregations: builder.buildAggregationsList(aggregations), + Details: make(map[string]interface{}), + } + + // Set row counts based on strategy + if strategy.CanUseFastPath { + // Only live logs and broker buffer rows are actually scanned; parquet uses metadata + plan.TotalRowsProcessed = dataSources.LiveLogRowCount + if dataSources.BrokerUnflushedCount > 0 { + plan.TotalRowsProcessed += dataSources.BrokerUnflushedCount + } + // Set scan method based on what data sources actually exist + if dataSources.ParquetRowCount > 0 && (dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0) { + plan.Details["scan_method"] = "Parquet Metadata + Live Log/Broker Counting" + } else if dataSources.ParquetRowCount > 0 { + plan.Details["scan_method"] = "Parquet Metadata Only" + } else { + plan.Details["scan_method"] = "Live Log/Broker Counting Only" + } + } else { + plan.TotalRowsProcessed = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + plan.Details["scan_method"] = "Full Data Scan" + } + + return plan +} + +// determineExecutionStrategy determines the execution strategy based on query characteristics +func (builder *ExecutionPlanBuilder) determineExecutionStrategy(stmt *SelectStatement, strategy AggregationStrategy) string { + if stmt.Where != nil { + return "full_scan" + } + + if strategy.CanUseFastPath { + return "hybrid_fast_path" + } + + return "full_scan" +} + +// buildDataSourcesList builds the list of data sources used +func (builder *ExecutionPlanBuilder) buildDataSourcesList(strategy AggregationStrategy, dataSources *TopicDataSources) []string { + sources := []string{} + + if strategy.CanUseFastPath { + // Only show parquet stats if there are actual parquet files + if dataSources.ParquetRowCount > 0 { + sources = append(sources, "parquet_stats") + } + if dataSources.LiveLogRowCount > 0 { + sources = append(sources, "live_logs") + } + if dataSources.BrokerUnflushedCount > 0 { + sources = append(sources, "broker_buffer") + } + } else { + sources = append(sources, "live_logs", "parquet_files") + } + + // Note: broker_buffer is added dynamically during execution when broker is queried + // See aggregations.go lines 397-409 for the broker buffer 
data source addition logic + + return sources +} + +// countParquetFiles counts the total number of parquet files across all partitions +func (builder *ExecutionPlanBuilder) countParquetFiles(dataSources *TopicDataSources) int { + count := 0 + for _, fileStats := range dataSources.ParquetFiles { + count += len(fileStats) + } + return count +} + +// countLiveLogFiles returns the total number of live log files across all partitions +func (builder *ExecutionPlanBuilder) countLiveLogFiles(dataSources *TopicDataSources) int { + return dataSources.LiveLogFilesCount +} + +// buildOptimizationsList builds the list of optimizations used +func (builder *ExecutionPlanBuilder) buildOptimizationsList(stmt *SelectStatement, strategy AggregationStrategy, dataSources *TopicDataSources) []string { + optimizations := []string{} + + if strategy.CanUseFastPath { + // Only include parquet statistics if there are actual parquet files + if dataSources.ParquetRowCount > 0 { + optimizations = append(optimizations, "parquet_statistics") + } + if dataSources.LiveLogRowCount > 0 { + optimizations = append(optimizations, "live_log_counting") + } + // Always include deduplication when using fast path + optimizations = append(optimizations, "deduplication") + } + + if stmt.Where != nil { + // Check if "predicate_pushdown" is already in the list + found := false + for _, opt := range optimizations { + if opt == "predicate_pushdown" { + found = true + break + } + } + if !found { + optimizations = append(optimizations, "predicate_pushdown") + } + } + + return optimizations +} + +// buildAggregationsList builds the list of aggregations for display +func (builder *ExecutionPlanBuilder) buildAggregationsList(aggregations []AggregationSpec) []string { + aggList := make([]string, len(aggregations)) + for i, spec := range aggregations { + aggList[i] = fmt.Sprintf("%s(%s)", spec.Function, spec.Column) + } + return aggList +} + +// parseAggregationFunction parses an aggregation function expression +func (e *SQLEngine) parseAggregationFunction(funcExpr *FuncExpr, aliasExpr *AliasedExpr) (*AggregationSpec, error) { + funcName := strings.ToUpper(funcExpr.Name.String()) + + spec := &AggregationSpec{ + Function: funcName, + } + + // Parse function arguments + switch funcName { + case FuncCOUNT: + if len(funcExpr.Exprs) != 1 { + return nil, fmt.Errorf("COUNT function expects exactly 1 argument") + } + + switch arg := funcExpr.Exprs[0].(type) { + case *StarExpr: + spec.Column = "*" + spec.Alias = "COUNT(*)" + case *AliasedExpr: + if colName, ok := arg.Expr.(*ColName); ok { + spec.Column = colName.Name.String() + spec.Alias = fmt.Sprintf("COUNT(%s)", spec.Column) + } else { + return nil, fmt.Errorf("COUNT argument must be a column name or *") + } + default: + return nil, fmt.Errorf("unsupported COUNT argument: %T", arg) + } + + case FuncSUM, FuncAVG, FuncMIN, FuncMAX: + if len(funcExpr.Exprs) != 1 { + return nil, fmt.Errorf("%s function expects exactly 1 argument", funcName) + } + + switch arg := funcExpr.Exprs[0].(type) { + case *AliasedExpr: + if colName, ok := arg.Expr.(*ColName); ok { + spec.Column = colName.Name.String() + spec.Alias = fmt.Sprintf("%s(%s)", funcName, spec.Column) + } else { + return nil, fmt.Errorf("%s argument must be a column name", funcName) + } + default: + return nil, fmt.Errorf("unsupported %s argument: %T", funcName, arg) + } + + default: + return nil, fmt.Errorf("unsupported aggregation function: %s", funcName) + } + + // Override with user-specified alias if provided + if aliasExpr != nil && aliasExpr.As != 
nil && !aliasExpr.As.IsEmpty() { + spec.Alias = aliasExpr.As.String() + } + + return spec, nil +} + +// computeLiveLogMinMax scans live log files to find MIN/MAX values for a specific column +func (e *SQLEngine) computeLiveLogMinMax(partitionPath string, columnName string, parquetSourceFiles map[string]bool) (interface{}, interface{}, error) { + if e.catalog.brokerClient == nil { + return nil, nil, fmt.Errorf("no broker client available") + } + + filerClient, err := e.catalog.brokerClient.GetFilerClient() + if err != nil { + return nil, nil, fmt.Errorf("failed to get filer client: %v", err) + } + + var minValue, maxValue interface{} + var minSchemaValue, maxSchemaValue *schema_pb.Value + + // Process each live log file + err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { + // Skip parquet files and directories + if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") { + return nil + } + // Skip files that have been converted to parquet (deduplication) + if parquetSourceFiles[entry.Name] { + return nil + } + + filePath := partitionPath + "/" + entry.Name + + // Scan this log file for MIN/MAX values + fileMin, fileMax, err := e.computeFileMinMax(filerClient, filePath, columnName) + if err != nil { + fmt.Printf("Warning: failed to compute min/max for file %s: %v\n", filePath, err) + return nil // Continue with other files + } + + // Update global min/max + if fileMin != nil { + if minSchemaValue == nil || e.compareValues(fileMin, minSchemaValue) < 0 { + minSchemaValue = fileMin + minValue = e.extractRawValue(fileMin) + } + } + + if fileMax != nil { + if maxSchemaValue == nil || e.compareValues(fileMax, maxSchemaValue) > 0 { + maxSchemaValue = fileMax + maxValue = e.extractRawValue(fileMax) + } + } + + return nil + }) + + if err != nil { + return nil, nil, fmt.Errorf("failed to process partition directory %s: %v", partitionPath, err) + } + + return minValue, maxValue, nil +} + +// computeFileMinMax scans a single log file to find MIN/MAX values for a specific column +func (e *SQLEngine) computeFileMinMax(filerClient filer_pb.FilerClient, filePath string, columnName string) (*schema_pb.Value, *schema_pb.Value, error) { + var minValue, maxValue *schema_pb.Value + + err := e.eachLogEntryInFile(filerClient, filePath, func(logEntry *filer_pb.LogEntry) error { + // Convert log entry to record value + recordValue, _, err := e.convertLogEntryToRecordValue(logEntry) + if err != nil { + return err // This will stop processing this file but not fail the overall query + } + + // Extract the requested column value + var columnValue *schema_pb.Value + if e.isSystemColumn(columnName) { + // Handle system columns + switch strings.ToLower(columnName) { + case SW_COLUMN_NAME_TIMESTAMP: + columnValue = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}} + case SW_COLUMN_NAME_KEY: + columnValue = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}} + case SW_COLUMN_NAME_SOURCE: + columnValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "live_log"}} + } + } else { + // Handle regular data columns + if value, exists := recordValue.Fields[columnName]; exists { + columnValue = value + } + } + + if columnValue == nil { + return nil // Skip this record + } + + // Update min/max + if minValue == nil || e.compareValues(columnValue, minValue) < 0 { + minValue = columnValue + } + if maxValue == nil || e.compareValues(columnValue, maxValue) > 
0 { + maxValue = columnValue + } + + return nil + }) + + return minValue, maxValue, err +} + +// eachLogEntryInFile reads a log file and calls the provided function for each log entry +func (e *SQLEngine) eachLogEntryInFile(filerClient filer_pb.FilerClient, filePath string, fn func(*filer_pb.LogEntry) error) error { + // Extract directory and filename + // filePath is like "partitionPath/filename" + lastSlash := strings.LastIndex(filePath, "/") + if lastSlash == -1 { + return fmt.Errorf("invalid file path: %s", filePath) + } + + dirPath := filePath[:lastSlash] + fileName := filePath[lastSlash+1:] + + // Get file entry + var fileEntry *filer_pb.Entry + err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(dirPath), "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.Name == fileName { + fileEntry = entry + } + return nil + }) + + if err != nil { + return fmt.Errorf("failed to find file %s: %v", filePath, err) + } + + if fileEntry == nil { + return fmt.Errorf("file not found: %s", filePath) + } + + lookupFileIdFn := filer.LookupFn(filerClient) + + // eachChunkFn processes each chunk's data (pattern from countRowsInLogFile) + eachChunkFn := func(buf []byte) error { + for pos := 0; pos+4 < len(buf); { + size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + break + } + + entryData := buf[pos+4 : pos+4+int(size)] + + logEntry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, logEntry); err != nil { + pos += 4 + int(size) + continue // Skip corrupted entries + } + + // Call the provided function for each log entry + if err := fn(logEntry); err != nil { + return err + } + + pos += 4 + int(size) + } + return nil + } + + // Read file chunks and process them (pattern from countRowsInLogFile) + fileSize := filer.FileSize(fileEntry) + visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, fileEntry.Chunks, 0, int64(fileSize)) + chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) + + for x := chunkViews.Front(); x != nil; x = x.Next { + chunk := x.Value + urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId) + if err != nil { + fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err) + continue + } + + if len(urlStrings) == 0 { + continue + } + + // Read chunk data + // urlStrings[0] is already a complete URL (http://server:port/fileId) + data, _, err := util_http.Get(urlStrings[0]) + if err != nil { + fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err) + continue + } + + // Process this chunk + if err := eachChunkFn(data); err != nil { + return err + } + } + + return nil +} + +// convertLogEntryToRecordValue helper method (reuse existing logic) +func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { + // Parse the log entry data as Protocol Buffer (not JSON!) 
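+ // Illustrative sketch (hypothetical record): a producer-side message {"user_id": 42} + // reaches this point as the protobuf encoding of + // rv := &schema_pb.RecordValue{Fields: map[string]*schema_pb.Value{"user_id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 42}}}} + // data, _ := proto.Marshal(rv) // == logEntry.Data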
+ recordValue := &schema_pb.RecordValue{} + if err := proto.Unmarshal(logEntry.Data, recordValue); err != nil { + return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %v", err) + } + + // Ensure Fields map exists + if recordValue.Fields == nil { + recordValue.Fields = make(map[string]*schema_pb.Value) + } + + // Add system columns + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, + } + recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, + } + + // User data fields are already present in the protobuf-deserialized recordValue + // No additional processing needed since proto.Unmarshal already populated the Fields map + + return recordValue, "live_log", nil +} + +// extractTimestampFromFilename extracts timestamp from parquet filename +// Format: YYYY-MM-DD-HH-MM-SS.parquet +func (e *SQLEngine) extractTimestampFromFilename(filename string) int64 { + // Remove .parquet extension + filename = strings.TrimSuffix(filename, ".parquet") + + // Parse timestamp format: 2006-01-02-15-04-05 + t, err := time.Parse("2006-01-02-15-04-05", filename) + if err != nil { + return 0 + } + + return t.UnixNano() +} + +// countLiveLogRows counts the total number of rows in live log files (non-parquet files) in a partition +func (e *SQLEngine) countLiveLogRows(partitionPath string) (int64, error) { + filerClient, err := e.catalog.brokerClient.GetFilerClient() + if err != nil { + return 0, err + } + + totalRows := int64(0) + err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") { + return nil // Skip directories and parquet files + } + + // Count rows in live log file + rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry) + if err != nil { + fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err) + return nil // Continue with other files + } + totalRows += rowCount + return nil + }) + return totalRows, err +} + +// extractParquetSourceFiles extracts source log file names from parquet file metadata for deduplication +func (e *SQLEngine) extractParquetSourceFiles(fileStats []*ParquetFileStats) map[string]bool { + sourceFiles := make(map[string]bool) + + for _, fileStat := range fileStats { + // Each ParquetFileStats should have a reference to the original file entry + // but we need to get it through the hybrid scanner to access Extended metadata + // This is a simplified approach - in practice we'd need to access the filer entry + + // For now, we'll use filename-based deduplication as a fallback + // Extract timestamp from parquet filename (YYYY-MM-DD-HH-MM-SS.parquet) + if strings.HasSuffix(fileStat.FileName, ".parquet") { + timeStr := strings.TrimSuffix(fileStat.FileName, ".parquet") + // Mark this timestamp range as covered by parquet + sourceFiles[timeStr] = true + } + } + + return sourceFiles +} + +// countLiveLogRowsExcludingParquetSources counts live log rows but excludes files that were converted to parquet and duplicate log buffer data +func (e *SQLEngine) countLiveLogRowsExcludingParquetSources(ctx context.Context, partitionPath string, parquetSourceFiles map[string]bool) (int64, error) { + filerClient, err := e.catalog.brokerClient.GetFilerClient() + if err != nil { + return 0, err + } + + // First, get the actual source files 
from parquet metadata + actualSourceFiles, err := e.getParquetSourceFilesFromMetadata(partitionPath) + if err != nil { + // If we can't read parquet metadata, use filename-based fallback + fmt.Printf("Warning: failed to read parquet metadata, using filename-based deduplication: %v\n", err) + actualSourceFiles = parquetSourceFiles + } + + // Second, get duplicate files from log buffer metadata + logBufferDuplicates, err := e.buildLogBufferDeduplicationMap(ctx, partitionPath) + if err != nil { + if isDebugMode(ctx) { + fmt.Printf("Warning: failed to build log buffer deduplication map: %v\n", err) + } + logBufferDuplicates = make(map[string]bool) + } + + // Debug: Show deduplication status (only in explain mode) + if isDebugMode(ctx) { + if len(actualSourceFiles) > 0 { + fmt.Printf("Excluding %d converted log files from %s\n", len(actualSourceFiles), partitionPath) + } + if len(logBufferDuplicates) > 0 { + fmt.Printf("Excluding %d duplicate log buffer files from %s\n", len(logBufferDuplicates), partitionPath) + } + } + + totalRows := int64(0) + err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") { + return nil // Skip directories and parquet files + } + + // Skip files that have been converted to parquet + if actualSourceFiles[entry.Name] { + if isDebugMode(ctx) { + fmt.Printf("Skipping %s (already converted to parquet)\n", entry.Name) + } + return nil + } + + // Skip files that are duplicated due to log buffer metadata + if logBufferDuplicates[entry.Name] { + if isDebugMode(ctx) { + fmt.Printf("Skipping %s (duplicate log buffer data)\n", entry.Name) + } + return nil + } + + // Count rows in live log file + rowCount, err := e.countRowsInLogFile(filerClient, partitionPath, entry) + if err != nil { + fmt.Printf("Warning: failed to count rows in %s/%s: %v\n", partitionPath, entry.Name, err) + return nil // Continue with other files + } + totalRows += rowCount + return nil + }) + return totalRows, err +} + +// getParquetSourceFilesFromMetadata reads parquet file metadata to get actual source log files +func (e *SQLEngine) getParquetSourceFilesFromMetadata(partitionPath string) (map[string]bool, error) { + filerClient, err := e.catalog.brokerClient.GetFilerClient() + if err != nil { + return nil, err + } + + sourceFiles := make(map[string]bool) + + err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") { + return nil + } + + // Read source files from Extended metadata + if entry.Extended != nil && entry.Extended["sources"] != nil { + var sources []string + if err := json.Unmarshal(entry.Extended["sources"], &sources); err == nil { + for _, source := range sources { + sourceFiles[source] = true + } + } + } + + return nil + }) + + return sourceFiles, err +} + +// getLogBufferStartFromFile reads buffer start from file extended attributes +func (e *SQLEngine) getLogBufferStartFromFile(entry *filer_pb.Entry) (*LogBufferStart, error) { + if entry.Extended == nil { + return nil, nil + } + + // Only support binary buffer_start format + if startData, exists := entry.Extended["buffer_start"]; exists { + if len(startData) == 8 { + startIndex := int64(binary.BigEndian.Uint64(startData)) + if startIndex > 0 { + return &LogBufferStart{StartIndex: startIndex}, nil + } + } else { + 
return nil, fmt.Errorf("invalid buffer_start format: expected 8 bytes, got %d", len(startData)) + } + } + + return nil, nil +} + +// buildLogBufferDeduplicationMap creates a map to track duplicate files based on buffer ranges (ultra-efficient) +func (e *SQLEngine) buildLogBufferDeduplicationMap(ctx context.Context, partitionPath string) (map[string]bool, error) { + if e.catalog.brokerClient == nil { + return make(map[string]bool), nil + } + + filerClient, err := e.catalog.brokerClient.GetFilerClient() + if err != nil { + return make(map[string]bool), nil // Don't fail the query, just skip deduplication + } + + // Track buffer ranges instead of individual indexes (much more efficient) + type BufferRange struct { + start, end int64 + } + + processedRanges := make([]BufferRange, 0) + duplicateFiles := make(map[string]bool) + + err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { + if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") { + return nil // Skip directories and parquet files + } + + // Get buffer start for this file (most efficient) + bufferStart, err := e.getLogBufferStartFromFile(entry) + if err != nil || bufferStart == nil { + return nil // No buffer info, can't deduplicate + } + + // Calculate range for this file: [start, start + chunkCount - 1] + chunkCount := int64(len(entry.GetChunks())) + if chunkCount == 0 { + return nil // Empty file, skip + } + + fileRange := BufferRange{ + start: bufferStart.StartIndex, + end: bufferStart.StartIndex + chunkCount - 1, + } + + // Check if this range overlaps with any processed range + isDuplicate := false + for _, processedRange := range processedRanges { + if fileRange.start <= processedRange.end && fileRange.end >= processedRange.start { + // Ranges overlap - this file contains duplicate buffer indexes + isDuplicate = true + if isDebugMode(ctx) { + fmt.Printf("Marking %s as duplicate (buffer range [%d-%d] overlaps with [%d-%d])\n", + entry.Name, fileRange.start, fileRange.end, processedRange.start, processedRange.end) + } + break + } + } + + if isDuplicate { + duplicateFiles[entry.Name] = true + } else { + // Add this range to processed ranges + processedRanges = append(processedRanges, fileRange) + } + + return nil + }) + + if err != nil { + return make(map[string]bool), nil // Don't fail the query + } + + return duplicateFiles, nil +} + +// countRowsInLogFile counts rows in a single log file using SeaweedFS patterns +func (e *SQLEngine) countRowsInLogFile(filerClient filer_pb.FilerClient, partitionPath string, entry *filer_pb.Entry) (int64, error) { + lookupFileIdFn := filer.LookupFn(filerClient) + + rowCount := int64(0) + + // eachChunkFn processes each chunk's data (pattern from read_log_from_disk.go) + eachChunkFn := func(buf []byte) error { + for pos := 0; pos+4 < len(buf); { + size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + break + } + + entryData := buf[pos+4 : pos+4+int(size)] + + logEntry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, logEntry); err != nil { + pos += 4 + int(size) + continue // Skip corrupted entries + } + + // Skip control messages (publisher control, empty key, or no data) + if isControlLogEntry(logEntry) { + pos += 4 + int(size) + continue + } + + rowCount++ + pos += 4 + int(size) + } + return nil + } + + // Read file chunks and process them (pattern from read_log_from_disk.go) + fileSize := filer.FileSize(entry) + visibleIntervals, _ := 
filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) + chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) + + for x := chunkViews.Front(); x != nil; x = x.Next { + chunk := x.Value + urlStrings, err := lookupFileIdFn(context.Background(), chunk.FileId) + if err != nil { + fmt.Printf("Warning: failed to lookup chunk %s: %v\n", chunk.FileId, err) + continue + } + + if len(urlStrings) == 0 { + continue + } + + // Read chunk data + // urlStrings[0] is already a complete URL (http://server:port/fileId) + data, _, err := util_http.Get(urlStrings[0]) + if err != nil { + fmt.Printf("Warning: failed to read chunk %s from %s: %v\n", chunk.FileId, urlStrings[0], err) + continue + } + + // Process this chunk + if err := eachChunkFn(data); err != nil { + return rowCount, err + } + } + + return rowCount, nil +} + +// isControlLogEntry checks if a log entry is a control entry without actual user data +// Control entries include: +// - DataMessages with populated Ctrl field (publisher control signals) +// - Entries with empty keys (filtered by subscriber) +// - Entries with no data +func isControlLogEntry(logEntry *filer_pb.LogEntry) bool { + // No data: control or placeholder + if len(logEntry.Data) == 0 { + return true + } + + // Empty keys are treated as control entries (consistent with subscriber filtering) + if len(logEntry.Key) == 0 { + return true + } + + // Check if the payload is a DataMessage carrying a control signal + dataMessage := &mq_pb.DataMessage{} + if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil { + if dataMessage.Ctrl != nil { + return true + } + } + + return false +} + +// discoverTopicPartitions discovers all partitions for a given topic using centralized logic +func (e *SQLEngine) discoverTopicPartitions(namespace, topicName string) ([]string, error) { + // Use centralized topic partition discovery + t := topic.NewTopic(namespace, topicName) + + // Get FilerClient from BrokerClient + filerClient, err := e.catalog.brokerClient.GetFilerClient() + if err != nil { + return nil, err + } + + return t.DiscoverPartitions(context.Background(), filerClient) +} + +// getTopicTotalRowCount returns the total number of rows in a topic (combining parquet and live logs) +func (e *SQLEngine) getTopicTotalRowCount(ctx context.Context, namespace, topicName string) (int64, error) { + // Create a hybrid scanner to access parquet statistics + var filerClient filer_pb.FilerClient + if e.catalog.brokerClient != nil { + var filerClientErr error + filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient() + if filerClientErr != nil { + return 0, filerClientErr + } + } + + hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, namespace, topicName, e) + if err != nil { + return 0, err + } + + // Get all partitions for this topic + // Note: discoverTopicPartitions always returns absolute paths + partitions, err := e.discoverTopicPartitions(namespace, topicName) + if err != nil { + return 0, err + } + + totalRowCount := int64(0) + + // For each partition, count both parquet and live log rows + for _, partition := range partitions { + // Count parquet rows + parquetStats, parquetErr := hybridScanner.ReadParquetStatistics(partition) + if parquetErr == nil { + for _, stats := range parquetStats { + totalRowCount += stats.RowCount + } + } + + // Count live log rows (with deduplication) + parquetSourceFiles := make(map[string]bool) + if parquetErr == nil { + 
parquetSourceFiles = e.extractParquetSourceFiles(parquetStats) + } + + liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(ctx, partition, parquetSourceFiles) + if liveLogErr == nil { + totalRowCount += liveLogCount + } + } + + return totalRowCount, nil +} + +// getActualRowsScannedForFastPath returns only the rows that need to be scanned for fast path aggregations +// (i.e., live log rows that haven't been converted to parquet - parquet uses metadata only) +func (e *SQLEngine) getActualRowsScannedForFastPath(ctx context.Context, namespace, topicName string) (int64, error) { + // Create a hybrid scanner to access parquet statistics + var filerClient filer_pb.FilerClient + if e.catalog.brokerClient != nil { + var filerClientErr error + filerClient, filerClientErr = e.catalog.brokerClient.GetFilerClient() + if filerClientErr != nil { + return 0, filerClientErr + } + } + + hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, namespace, topicName, e) + if err != nil { + return 0, err + } + + // Get all partitions for this topic + // Note: discoverTopicPartitions always returns absolute paths + partitions, err := e.discoverTopicPartitions(namespace, topicName) + if err != nil { + return 0, err + } + + totalScannedRows := int64(0) + + // For each partition, count ONLY the live log rows that need scanning + // (parquet files use metadata/statistics, so they contribute 0 to scan count) + for _, partition := range partitions { + // Get parquet files to determine what was converted + parquetStats, parquetErr := hybridScanner.ReadParquetStatistics(partition) + parquetSourceFiles := make(map[string]bool) + if parquetErr == nil { + parquetSourceFiles = e.extractParquetSourceFiles(parquetStats) + } + + // Count only live log rows that haven't been converted to parquet + liveLogCount, liveLogErr := e.countLiveLogRowsExcludingParquetSources(ctx, partition, parquetSourceFiles) + if liveLogErr == nil { + totalScannedRows += liveLogCount + } + + // Note: Parquet files contribute 0 to scan count since we use their metadata/statistics + } + + return totalScannedRows, nil +} + +// findColumnValue performs case-insensitive lookup of column values +// Now includes support for system columns stored in HybridScanResult +func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) *schema_pb.Value { + // Check system columns first (stored separately in HybridScanResult) + lowerColumnName := strings.ToLower(columnName) + switch lowerColumnName { + case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP: + // For timestamp column, format as proper timestamp instead of raw nanoseconds + timestamp := time.Unix(result.Timestamp/1e9, result.Timestamp%1e9) + timestampStr := timestamp.UTC().Format("2006-01-02T15:04:05.000000000Z") + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: timestampStr}} + case SW_COLUMN_NAME_KEY: + return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}} + case SW_COLUMN_NAME_SOURCE: + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: result.Source}} + } + + // Then check regular columns in Values map + // First try exact match + if value, exists := result.Values[columnName]; exists { + return value + } + + // Then try case-insensitive match + for key, value := range result.Values { + if strings.ToLower(key) == lowerColumnName { + return value + } + } + + return nil +} + +// discoverAndRegisterTopic attempts to discover an existing topic and register it in the 
SQL catalog +func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error { + // First, check if topic exists by trying to get its schema from the broker/filer + recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName) + if err != nil { + return fmt.Errorf("topic %s.%s not found or no schema available: %v", database, tableName, err) + } + + // Create a schema object from the discovered record type + mqSchema := &schema.Schema{ + Namespace: database, + Name: tableName, + RecordType: recordType, + RevisionId: 1, // Default to revision 1 for discovered topics + } + + // Register the topic in the SQL catalog + err = e.catalog.RegisterTopic(database, tableName, mqSchema) + if err != nil { + return fmt.Errorf("failed to register discovered topic %s.%s: %v", database, tableName, err) + } + + // Discovery and registration succeeded; the topic is now resolvable by the SQL catalog + return nil +} + +// getArithmeticExpressionAlias generates a display alias for arithmetic expressions +func (e *SQLEngine) getArithmeticExpressionAlias(expr *ArithmeticExpr) string { + leftAlias := e.getExpressionAlias(expr.Left) + rightAlias := e.getExpressionAlias(expr.Right) + return leftAlias + expr.Operator + rightAlias +} + +// getExpressionAlias generates an alias for any expression node +func (e *SQLEngine) getExpressionAlias(expr ExprNode) string { + switch exprType := expr.(type) { + case *ColName: + return exprType.Name.String() + case *ArithmeticExpr: + return e.getArithmeticExpressionAlias(exprType) + case *SQLVal: + return e.getSQLValAlias(exprType) + default: + return "expr" + } +} + +// evaluateArithmeticExpression evaluates an arithmetic expression for a given record +func (e *SQLEngine) evaluateArithmeticExpression(expr *ArithmeticExpr, result HybridScanResult) (*schema_pb.Value, error) { + // Check for timestamp arithmetic with intervals first + if e.isTimestampArithmetic(expr.Left, expr.Right) && (expr.Operator == "+" || expr.Operator == "-") { + return e.evaluateTimestampArithmetic(expr.Left, expr.Right, expr.Operator) + } + + // Get left operand value + leftValue, err := e.evaluateExpressionValue(expr.Left, result) + if err != nil { + return nil, fmt.Errorf("error evaluating left operand: %v", err) + } + + // Get right operand value + rightValue, err := e.evaluateExpressionValue(expr.Right, result) + if err != nil { + return nil, fmt.Errorf("error evaluating right operand: %v", err) + } + + // Handle string concatenation operator + if expr.Operator == "||" { + return e.Concat(leftValue, rightValue) + } + + // Perform arithmetic operation + var op ArithmeticOperator + switch expr.Operator { + case "+": + op = OpAdd + case "-": + op = OpSub + case "*": + op = OpMul + case "/": + op = OpDiv + case "%": + op = OpMod + default: + return nil, fmt.Errorf("unsupported arithmetic operator: %s", expr.Operator) + } + + return e.EvaluateArithmeticExpression(leftValue, rightValue, op) +} + +// isTimestampArithmetic checks if an arithmetic operation involves timestamps and intervals +func (e *SQLEngine) isTimestampArithmetic(left, right ExprNode) bool { + // Check if left is a timestamp function (NOW, CURRENT_TIMESTAMP, etc.)
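+ // e.g. for NOW() - INTERVAL '1 hour', the left operand is the FuncExpr NOW() + // and the right operand is an IntervalExpr, so both checks below hold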
+ leftIsTimestamp := e.isTimestampFunction(left) + + // Check if right is an interval + rightIsInterval := e.isIntervalExpression(right) + + return leftIsTimestamp && rightIsInterval +} + +// isTimestampFunction checks if an expression is a timestamp function +func (e *SQLEngine) isTimestampFunction(expr ExprNode) bool { + if funcExpr, ok := expr.(*FuncExpr); ok { + funcName := strings.ToUpper(funcExpr.Name.String()) + return funcName == "NOW" || funcName == "CURRENT_TIMESTAMP" || funcName == "CURRENT_DATE" || funcName == "CURRENT_TIME" + } + return false +} + +// isIntervalExpression checks if an expression is an interval +func (e *SQLEngine) isIntervalExpression(expr ExprNode) bool { + _, ok := expr.(*IntervalExpr) + return ok +} + +// evaluateExpressionValue evaluates any expression to get its value from a record +func (e *SQLEngine) evaluateExpressionValue(expr ExprNode, result HybridScanResult) (*schema_pb.Value, error) { + switch exprType := expr.(type) { + case *ColName: + columnName := exprType.Name.String() + upperColumnName := strings.ToUpper(columnName) + + // Check if this is actually a string literal that was parsed as ColName + if (strings.HasPrefix(columnName, "'") && strings.HasSuffix(columnName, "'")) || + (strings.HasPrefix(columnName, "\"") && strings.HasSuffix(columnName, "\"")) { + // This is a string literal that was incorrectly parsed as a column name + literal := strings.Trim(strings.Trim(columnName, "'"), "\"") + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: literal}}, nil + } + + // Check if this is actually a function call that was parsed as ColName + if strings.Contains(columnName, "(") && strings.Contains(columnName, ")") { + // This is a function call that was parsed incorrectly as a column name + // We need to manually evaluate it as a function + return e.evaluateColumnNameAsFunction(columnName, result) + } + + // Check if this is a datetime constant + if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME || + upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW { + switch upperColumnName { + case FuncCURRENT_DATE: + return e.CurrentDate() + case FuncCURRENT_TIME: + return e.CurrentTime() + case FuncCURRENT_TIMESTAMP: + return e.CurrentTimestamp() + case FuncNOW: + return e.Now() + } + } + + // Check if this is actually a numeric literal disguised as a column name + if val, err := strconv.ParseInt(columnName, 10, 64); err == nil { + return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: val}}, nil + } + if val, err := strconv.ParseFloat(columnName, 64); err == nil { + return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: val}}, nil + } + + // Otherwise, treat as a regular column lookup + value := e.findColumnValue(result, columnName) + if value == nil { + return nil, nil + } + return value, nil + case *ArithmeticExpr: + return e.evaluateArithmeticExpression(exprType, result) + case *SQLVal: + // Handle literal values + return e.convertSQLValToSchemaValue(exprType), nil + case *FuncExpr: + // Handle function calls that are part of arithmetic expressions + funcName := strings.ToUpper(exprType.Name.String()) + + // Route to appropriate function evaluator based on function type + if e.isDateTimeFunction(funcName) { + // Use datetime function evaluator + return e.evaluateDateTimeFunction(exprType, result) + } else { + // Use string function evaluator + return e.evaluateStringFunction(exprType, result) + } + case *IntervalExpr: + // Handle interval expressions - 
evaluate as duration in nanoseconds + nanos, err := e.evaluateInterval(exprType.Value) + if err != nil { + return nil, err + } + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: nanos}, + }, nil + default: + return nil, fmt.Errorf("unsupported expression type: %T", expr) + } +} + +// convertSQLValToSchemaValue converts SQLVal literal to schema_pb.Value +func (e *SQLEngine) convertSQLValToSchemaValue(sqlVal *SQLVal) *schema_pb.Value { + switch sqlVal.Type { + case IntVal: + if val, err := strconv.ParseInt(string(sqlVal.Val), 10, 64); err == nil { + return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: val}} + } + case FloatVal: + if val, err := strconv.ParseFloat(string(sqlVal.Val), 64); err == nil { + return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: val}} + } + case StrVal: + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(sqlVal.Val)}} + } + // Default to string if parsing fails + return &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: string(sqlVal.Val)}} +} + +// ConvertToSQLResultWithExpressions converts HybridScanResults to SQL query results with expression evaluation +func (e *SQLEngine) ConvertToSQLResultWithExpressions(hms *HybridMessageScanner, results []HybridScanResult, selectExprs []SelectExpr) *QueryResult { + if len(results) == 0 { + columns := make([]string, 0, len(selectExprs)) + for _, selectExpr := range selectExprs { + switch expr := selectExpr.(type) { + case *AliasedExpr: + // Check if alias is available and use it + if expr.As != nil && !expr.As.IsEmpty() { + columns = append(columns, expr.As.String()) + } else { + // Fall back to expression-based column naming + switch col := expr.Expr.(type) { + case *ColName: + columnName := col.Name.String() + upperColumnName := strings.ToUpper(columnName) + + // Check if this is an arithmetic expression embedded in a ColName + if arithmeticExpr := e.parseColumnLevelCalculation(columnName); arithmeticExpr != nil { + columns = append(columns, e.getArithmeticExpressionAlias(arithmeticExpr)) + } else if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME || + upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW { + // Use lowercase for datetime constants in column headers + columns = append(columns, strings.ToLower(columnName)) + } else { + // Use display name for system columns + displayName := e.getSystemColumnDisplayName(columnName) + columns = append(columns, displayName) + } + case *ArithmeticExpr: + columns = append(columns, e.getArithmeticExpressionAlias(col)) + case *FuncExpr: + columns = append(columns, e.getStringFunctionAlias(col)) + case *SQLVal: + columns = append(columns, e.getSQLValAlias(col)) + default: + columns = append(columns, "expr") + } + } + } + } + + return &QueryResult{ + Columns: columns, + Rows: [][]sqltypes.Value{}, + Database: hms.topic.Namespace, + Table: hms.topic.Name, + } + } + + // Build columns from SELECT expressions + columns := make([]string, 0, len(selectExprs)) + for _, selectExpr := range selectExprs { + switch expr := selectExpr.(type) { + case *AliasedExpr: + // Check if alias is available and use it + if expr.As != nil && !expr.As.IsEmpty() { + columns = append(columns, expr.As.String()) + } else { + // Fall back to expression-based column naming + switch col := expr.Expr.(type) { + case *ColName: + columnName := col.Name.String() + upperColumnName := strings.ToUpper(columnName) + + // Check if this is an arithmetic expression embedded in a 
ColName + if arithmeticExpr := e.parseColumnLevelCalculation(columnName); arithmeticExpr != nil { + columns = append(columns, e.getArithmeticExpressionAlias(arithmeticExpr)) + } else if upperColumnName == FuncCURRENT_DATE || upperColumnName == FuncCURRENT_TIME || + upperColumnName == FuncCURRENT_TIMESTAMP || upperColumnName == FuncNOW { + // Use lowercase for datetime constants in column headers + columns = append(columns, strings.ToLower(columnName)) + } else { + columns = append(columns, columnName) + } + case *ArithmeticExpr: + columns = append(columns, e.getArithmeticExpressionAlias(col)) + case *FuncExpr: + columns = append(columns, e.getStringFunctionAlias(col)) + case *SQLVal: + columns = append(columns, e.getSQLValAlias(col)) + default: + columns = append(columns, "expr") + } + } + } + } + + // Convert to SQL rows with expression evaluation + rows := make([][]sqltypes.Value, len(results)) + for i, result := range results { + row := make([]sqltypes.Value, len(selectExprs)) + for j, selectExpr := range selectExprs { + switch expr := selectExpr.(type) { + case *AliasedExpr: + switch col := expr.Expr.(type) { + case *ColName: + // Handle regular column, datetime constants, or arithmetic expressions + columnName := col.Name.String() + upperColumnName := strings.ToUpper(columnName) + + // Check if this is an arithmetic expression embedded in a ColName + if arithmeticExpr := e.parseColumnLevelCalculation(columnName); arithmeticExpr != nil { + // Handle as arithmetic expression + if value, err := e.evaluateArithmeticExpression(arithmeticExpr, result); err == nil && value != nil { + row[j] = convertSchemaValueToSQL(value) + } else { + row[j] = sqltypes.NULL + } + } else if upperColumnName == "CURRENT_DATE" || upperColumnName == "CURRENT_TIME" || + upperColumnName == "CURRENT_TIMESTAMP" || upperColumnName == "NOW" { + // Handle as datetime function + var value *schema_pb.Value + var err error + switch upperColumnName { + case FuncCURRENT_DATE: + value, err = e.CurrentDate() + case FuncCURRENT_TIME: + value, err = e.CurrentTime() + case FuncCURRENT_TIMESTAMP: + value, err = e.CurrentTimestamp() + case FuncNOW: + value, err = e.Now() + } + + if err == nil && value != nil { + row[j] = convertSchemaValueToSQL(value) + } else { + row[j] = sqltypes.NULL + } + } else { + // Handle as regular column + if value := e.findColumnValue(result, columnName); value != nil { + row[j] = convertSchemaValueToSQL(value) + } else { + row[j] = sqltypes.NULL + } + } + case *ArithmeticExpr: + // Handle arithmetic expression + if value, err := e.evaluateArithmeticExpression(col, result); err == nil && value != nil { + row[j] = convertSchemaValueToSQL(value) + } else { + row[j] = sqltypes.NULL + } + case *FuncExpr: + // Handle function - route to appropriate evaluator + funcName := strings.ToUpper(col.Name.String()) + var value *schema_pb.Value + var err error + + // Check if it's a datetime function + if e.isDateTimeFunction(funcName) { + value, err = e.evaluateDateTimeFunction(col, result) + } else { + // Default to string function evaluator + value, err = e.evaluateStringFunction(col, result) + } + + if err == nil && value != nil { + row[j] = convertSchemaValueToSQL(value) + } else { + row[j] = sqltypes.NULL + } + case *SQLVal: + // Handle literal value + value := e.convertSQLValToSchemaValue(col) + row[j] = convertSchemaValueToSQL(value) + default: + row[j] = sqltypes.NULL + } + default: + row[j] = sqltypes.NULL + } + } + rows[i] = row + } + + return &QueryResult{ + Columns: columns, + Rows: rows, + Database: 
hms.topic.Namespace, + Table: hms.topic.Name, + } +} + +// extractBaseColumns recursively extracts base column names from arithmetic expressions +func (e *SQLEngine) extractBaseColumns(expr *ArithmeticExpr, baseColumnsSet map[string]bool) { + // Extract columns from left operand + e.extractBaseColumnsFromExpression(expr.Left, baseColumnsSet) + // Extract columns from right operand + e.extractBaseColumnsFromExpression(expr.Right, baseColumnsSet) +} + +// extractBaseColumnsFromExpression extracts base column names from any expression node +func (e *SQLEngine) extractBaseColumnsFromExpression(expr ExprNode, baseColumnsSet map[string]bool) { + switch exprType := expr.(type) { + case *ColName: + columnName := exprType.Name.String() + // Check if it's a literal number disguised as a column name + if _, err := strconv.ParseInt(columnName, 10, 64); err != nil { + if _, err := strconv.ParseFloat(columnName, 64); err != nil { + // Not a numeric literal, treat as actual column name + baseColumnsSet[columnName] = true + } + } + case *ArithmeticExpr: + // Recursively handle nested arithmetic expressions + e.extractBaseColumns(exprType, baseColumnsSet) + } +} + +// isAggregationFunction checks if a function name is an aggregation function +func (e *SQLEngine) isAggregationFunction(funcName string) bool { + // Convert to uppercase for case-insensitive comparison + upperFuncName := strings.ToUpper(funcName) + switch upperFuncName { + case FuncCOUNT, FuncSUM, FuncAVG, FuncMIN, FuncMAX: + return true + default: + return false + } +} + +// isStringFunction checks if a function name is a string function +func (e *SQLEngine) isStringFunction(funcName string) bool { + switch funcName { + case FuncUPPER, FuncLOWER, FuncLENGTH, FuncTRIM, FuncBTRIM, FuncLTRIM, FuncRTRIM, FuncSUBSTRING, FuncLEFT, FuncRIGHT, FuncCONCAT: + return true + default: + return false + } +} + +// isDateTimeFunction checks if a function name is a datetime function +func (e *SQLEngine) isDateTimeFunction(funcName string) bool { + switch funcName { + case FuncCURRENT_DATE, FuncCURRENT_TIME, FuncCURRENT_TIMESTAMP, FuncNOW, FuncEXTRACT, FuncDATE_TRUNC: + return true + default: + return false + } +} + +// getStringFunctionAlias generates an alias for string functions +func (e *SQLEngine) getStringFunctionAlias(funcExpr *FuncExpr) string { + funcName := funcExpr.Name.String() + if len(funcExpr.Exprs) == 1 { + if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok { + if colName, ok := aliasedExpr.Expr.(*ColName); ok { + return fmt.Sprintf("%s(%s)", funcName, colName.Name.String()) + } + } + } + return fmt.Sprintf("%s(...)", funcName) +} + +// getDateTimeFunctionAlias generates an alias for datetime functions +func (e *SQLEngine) getDateTimeFunctionAlias(funcExpr *FuncExpr) string { + funcName := funcExpr.Name.String() + + // Handle zero-argument functions like CURRENT_DATE, NOW + if len(funcExpr.Exprs) == 0 { + // Use lowercase for datetime constants in column headers + return strings.ToLower(funcName) + } + + // Handle EXTRACT function specially to create unique aliases + if strings.ToUpper(funcName) == "EXTRACT" && len(funcExpr.Exprs) == 2 { + // Try to extract the date part to make the alias unique + if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok { + if sqlVal, ok := aliasedExpr.Expr.(*SQLVal); ok && sqlVal.Type == StrVal { + datePart := strings.ToLower(string(sqlVal.Val)) + return fmt.Sprintf("extract_%s", datePart) + } + } + // Fallback to generic if we can't extract the date part + return fmt.Sprintf("%s(...)", funcName) + } 
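+ // e.g. extracting the YEAR part yields the alias "extract_year", keeping + // column headers distinct when one SELECT contains several EXTRACT calls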
+ + // Handle other multi-argument functions like DATE_TRUNC + if len(funcExpr.Exprs) == 2 { + return fmt.Sprintf("%s(...)", funcName) + } + + return fmt.Sprintf("%s(...)", funcName) +} + +// extractBaseColumnsFromFunction extracts base columns needed by a string function +func (e *SQLEngine) extractBaseColumnsFromFunction(funcExpr *FuncExpr, baseColumnsSet map[string]bool) { + for _, expr := range funcExpr.Exprs { + if aliasedExpr, ok := expr.(*AliasedExpr); ok { + e.extractBaseColumnsFromExpression(aliasedExpr.Expr, baseColumnsSet) + } + } +} + +// getSQLValAlias generates an alias for SQL literal values +func (e *SQLEngine) getSQLValAlias(sqlVal *SQLVal) string { + switch sqlVal.Type { + case StrVal: + // Escape single quotes by replacing ' with '' (SQL standard escaping) + escapedVal := strings.ReplaceAll(string(sqlVal.Val), "'", "''") + return fmt.Sprintf("'%s'", escapedVal) + case IntVal: + return string(sqlVal.Val) + case FloatVal: + return string(sqlVal.Val) + default: + return "literal" + } +} + +// evaluateStringFunction evaluates a string function for a given record +func (e *SQLEngine) evaluateStringFunction(funcExpr *FuncExpr, result HybridScanResult) (*schema_pb.Value, error) { + funcName := strings.ToUpper(funcExpr.Name.String()) + + // Most string functions require exactly 1 argument + if len(funcExpr.Exprs) != 1 { + return nil, fmt.Errorf("function %s expects exactly 1 argument", funcName) + } + + // Get the argument value + var argValue *schema_pb.Value + if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok { + var err error + argValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result) + if err != nil { + return nil, fmt.Errorf("error evaluating function argument: %v", err) + } + } else { + return nil, fmt.Errorf("unsupported function argument type") + } + + if argValue == nil { + return nil, nil // NULL input produces NULL output + } + + // Call the appropriate string function + switch funcName { + case FuncUPPER: + return e.Upper(argValue) + case FuncLOWER: + return e.Lower(argValue) + case FuncLENGTH: + return e.Length(argValue) + case FuncTRIM, FuncBTRIM: // CockroachDB converts TRIM to BTRIM + return e.Trim(argValue) + case FuncLTRIM: + return e.LTrim(argValue) + case FuncRTRIM: + return e.RTrim(argValue) + default: + return nil, fmt.Errorf("unsupported string function: %s", funcName) + } +} + +// evaluateDateTimeFunction evaluates a datetime function for a given record +func (e *SQLEngine) evaluateDateTimeFunction(funcExpr *FuncExpr, result HybridScanResult) (*schema_pb.Value, error) { + funcName := strings.ToUpper(funcExpr.Name.String()) + + switch funcName { + case FuncEXTRACT: + // EXTRACT requires exactly 2 arguments: date part and value + if len(funcExpr.Exprs) != 2 { + return nil, fmt.Errorf("EXTRACT function expects exactly 2 arguments (date_part, value), got %d", len(funcExpr.Exprs)) + } + + // Get the first argument (date part) + var datePartValue *schema_pb.Value + if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok { + var err error + datePartValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result) + if err != nil { + return nil, fmt.Errorf("error evaluating EXTRACT date part argument: %v", err) + } + } else { + return nil, fmt.Errorf("unsupported EXTRACT date part argument type") + } + + if datePartValue == nil { + return nil, fmt.Errorf("EXTRACT date part cannot be NULL") + } + + // Convert date part to string + var datePart string + if stringVal, ok := datePartValue.Kind.(*schema_pb.Value_StringValue); ok { + datePart = 
strings.ToUpper(stringVal.StringValue) + } else { + return nil, fmt.Errorf("EXTRACT date part must be a string") + } + + // Get the second argument (value to extract from) + var extractValue *schema_pb.Value + if aliasedExpr, ok := funcExpr.Exprs[1].(*AliasedExpr); ok { + var err error + extractValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result) + if err != nil { + return nil, fmt.Errorf("error evaluating EXTRACT value argument: %v", err) + } + } else { + return nil, fmt.Errorf("unsupported EXTRACT value argument type") + } + + if extractValue == nil { + return nil, nil // NULL input produces NULL output + } + + // Call the Extract function + return e.Extract(DatePart(datePart), extractValue) + + case FuncDATE_TRUNC: + // DATE_TRUNC requires exactly 2 arguments: precision and value + if len(funcExpr.Exprs) != 2 { + return nil, fmt.Errorf("DATE_TRUNC function expects exactly 2 arguments (precision, value), got %d", len(funcExpr.Exprs)) + } + + // Get the first argument (precision) + var precisionValue *schema_pb.Value + if aliasedExpr, ok := funcExpr.Exprs[0].(*AliasedExpr); ok { + var err error + precisionValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result) + if err != nil { + return nil, fmt.Errorf("error evaluating DATE_TRUNC precision argument: %v", err) + } + } else { + return nil, fmt.Errorf("unsupported DATE_TRUNC precision argument type") + } + + if precisionValue == nil { + return nil, fmt.Errorf("DATE_TRUNC precision cannot be NULL") + } + + // Convert precision to string + var precision string + if stringVal, ok := precisionValue.Kind.(*schema_pb.Value_StringValue); ok { + precision = stringVal.StringValue + } else { + return nil, fmt.Errorf("DATE_TRUNC precision must be a string") + } + + // Get the second argument (value to truncate) + var truncateValue *schema_pb.Value + if aliasedExpr, ok := funcExpr.Exprs[1].(*AliasedExpr); ok { + var err error + truncateValue, err = e.evaluateExpressionValue(aliasedExpr.Expr, result) + if err != nil { + return nil, fmt.Errorf("error evaluating DATE_TRUNC value argument: %v", err) + } + } else { + return nil, fmt.Errorf("unsupported DATE_TRUNC value argument type") + } + + if truncateValue == nil { + return nil, nil // NULL input produces NULL output + } + + // Call the DateTrunc function + return e.DateTrunc(precision, truncateValue) + + case FuncCURRENT_DATE: + // CURRENT_DATE is a zero-argument function + if len(funcExpr.Exprs) != 0 { + return nil, fmt.Errorf("CURRENT_DATE function expects no arguments, got %d", len(funcExpr.Exprs)) + } + return e.CurrentDate() + + case FuncCURRENT_TIME: + // CURRENT_TIME is a zero-argument function + if len(funcExpr.Exprs) != 0 { + return nil, fmt.Errorf("CURRENT_TIME function expects no arguments, got %d", len(funcExpr.Exprs)) + } + return e.CurrentTime() + + case FuncCURRENT_TIMESTAMP: + // CURRENT_TIMESTAMP is a zero-argument function + if len(funcExpr.Exprs) != 0 { + return nil, fmt.Errorf("CURRENT_TIMESTAMP function expects no arguments, got %d", len(funcExpr.Exprs)) + } + return e.CurrentTimestamp() + + case FuncNOW: + // NOW is a zero-argument function (but often used with () syntax) + if len(funcExpr.Exprs) != 0 { + return nil, fmt.Errorf("NOW function expects no arguments, got %d", len(funcExpr.Exprs)) + } + return e.Now() + + // PostgreSQL uses EXTRACT(part FROM date) instead of convenience functions like YEAR(date) + + default: + return nil, fmt.Errorf("unsupported datetime function: %s", funcName) + } +} + +// evaluateInterval parses an interval string and returns 
duration in nanoseconds +func (e *SQLEngine) evaluateInterval(intervalValue string) (int64, error) { + // Parse interval strings like "1 hour", "30 minutes", "2 days" + parts := strings.Fields(strings.TrimSpace(intervalValue)) + if len(parts) != 2 { + return 0, fmt.Errorf("invalid interval format: %s (expected 'number unit')", intervalValue) + } + + // Parse the numeric value + value, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid interval value: %s", parts[0]) + } + + // Parse the unit and convert to nanoseconds + unit := strings.ToLower(parts[1]) + var multiplier int64 + + switch unit { + case "nanosecond", "nanoseconds", "ns": + multiplier = 1 + case "microsecond", "microseconds", "us": + multiplier = 1000 + case "millisecond", "milliseconds", "ms": + multiplier = 1000000 + case "second", "seconds", "s": + multiplier = 1000000000 + case "minute", "minutes", "m": + multiplier = 60 * 1000000000 + case "hour", "hours", "h": + multiplier = 60 * 60 * 1000000000 + case "day", "days", "d": + multiplier = 24 * 60 * 60 * 1000000000 + case "week", "weeks", "w": + multiplier = 7 * 24 * 60 * 60 * 1000000000 + default: + return 0, fmt.Errorf("unsupported interval unit: %s", unit) + } + + return value * multiplier, nil +} + +// convertValueForTimestampColumn converts string timestamp values to nanoseconds for system timestamp columns +func (e *SQLEngine) convertValueForTimestampColumn(columnName string, value interface{}, expr ExprNode) interface{} { + // Special handling for timestamp system columns + if columnName == SW_COLUMN_NAME_TIMESTAMP { + if _, ok := value.(string); ok { + if timeNanos := e.extractTimeValue(expr); timeNanos != 0 { + return timeNanos + } + } + } + return value +} + +// evaluateTimestampArithmetic performs arithmetic operations with timestamps and intervals +func (e *SQLEngine) evaluateTimestampArithmetic(left, right ExprNode, operator string) (*schema_pb.Value, error) { + // Handle timestamp arithmetic: NOW() - INTERVAL '1 hour' + // For timestamp arithmetic, we don't need the result context, so we pass an empty one + emptyResult := HybridScanResult{} + + leftValue, err := e.evaluateExpressionValue(left, emptyResult) + if err != nil { + return nil, fmt.Errorf("failed to evaluate left operand: %v", err) + } + + rightValue, err := e.evaluateExpressionValue(right, emptyResult) + if err != nil { + return nil, fmt.Errorf("failed to evaluate right operand: %v", err) + } + + // Convert left operand (should be timestamp) + var leftTimestamp int64 + if leftValue.Kind != nil { + switch leftKind := leftValue.Kind.(type) { + case *schema_pb.Value_Int64Value: + leftTimestamp = leftKind.Int64Value + case *schema_pb.Value_TimestampValue: + // Convert microseconds to nanoseconds + leftTimestamp = leftKind.TimestampValue.TimestampMicros * 1000 + case *schema_pb.Value_StringValue: + // Parse timestamp string + if ts, err := time.Parse(time.RFC3339, leftKind.StringValue); err == nil { + leftTimestamp = ts.UnixNano() + } else if ts, err := time.Parse("2006-01-02 15:04:05", leftKind.StringValue); err == nil { + leftTimestamp = ts.UnixNano() + } else { + return nil, fmt.Errorf("invalid timestamp format: %s", leftKind.StringValue) + } + default: + return nil, fmt.Errorf("left operand must be a timestamp, got: %T", leftKind) + } + } else { + return nil, fmt.Errorf("left operand value is nil") + } + + // Convert right operand (should be interval in nanoseconds) + var intervalNanos int64 + if rightValue.Kind != nil { + switch rightKind := 
rightValue.Kind.(type) { + case *schema_pb.Value_Int64Value: + intervalNanos = rightKind.Int64Value + default: + return nil, fmt.Errorf("right operand must be an interval duration") + } + } else { + return nil, fmt.Errorf("right operand value is nil") + } + + // Perform arithmetic + var resultTimestamp int64 + switch operator { + case "+": + resultTimestamp = leftTimestamp + intervalNanos + case "-": + resultTimestamp = leftTimestamp - intervalNanos + default: + return nil, fmt.Errorf("unsupported timestamp arithmetic operator: %s", operator) + } + + // Return as timestamp + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: resultTimestamp}, + }, nil +} + +// evaluateColumnNameAsFunction handles function calls that were incorrectly parsed as column names +func (e *SQLEngine) evaluateColumnNameAsFunction(columnName string, result HybridScanResult) (*schema_pb.Value, error) { + // Simple parser for basic function calls like TRIM('hello world') + // Extract function name and argument + parenPos := strings.Index(columnName, "(") + if parenPos == -1 { + return nil, fmt.Errorf("invalid function format: %s", columnName) + } + + funcName := strings.ToUpper(strings.TrimSpace(columnName[:parenPos])) + argsString := columnName[parenPos+1:] + + // Find the closing parenthesis (handling nested quotes) + closeParen := strings.LastIndex(argsString, ")") + if closeParen == -1 { + return nil, fmt.Errorf("missing closing parenthesis in function: %s", columnName) + } + + argString := strings.TrimSpace(argsString[:closeParen]) + + // Parse the argument - for now handle simple cases + var argValue *schema_pb.Value + var err error + + if strings.HasPrefix(argString, "'") && strings.HasSuffix(argString, "'") { + // String literal argument + literal := strings.Trim(argString, "'") + argValue = &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: literal}} + } else if strings.Contains(argString, "(") && strings.Contains(argString, ")") { + // Nested function call - recursively evaluate it + argValue, err = e.evaluateColumnNameAsFunction(argString, result) + if err != nil { + return nil, fmt.Errorf("error evaluating nested function argument: %v", err) + } + } else { + // Column name or other expression + return nil, fmt.Errorf("unsupported argument type in function: %s", argString) + } + + if argValue == nil { + return nil, nil + } + + // Call the appropriate function + switch funcName { + case FuncUPPER: + return e.Upper(argValue) + case FuncLOWER: + return e.Lower(argValue) + case FuncLENGTH: + return e.Length(argValue) + case FuncTRIM, FuncBTRIM: // CockroachDB converts TRIM to BTRIM + return e.Trim(argValue) + case FuncLTRIM: + return e.LTrim(argValue) + case FuncRTRIM: + return e.RTrim(argValue) + // PostgreSQL-only: Use EXTRACT(YEAR FROM date) instead of YEAR(date) + default: + return nil, fmt.Errorf("unsupported function in column name: %s", funcName) + } +} + +// parseColumnLevelCalculation detects and parses arithmetic expressions that contain function calls +// This handles cases where the SQL parser incorrectly treats "LENGTH('hello') + 10" as a single ColName +func (e *SQLEngine) parseColumnLevelCalculation(expression string) *ArithmeticExpr { + // First check if this looks like an arithmetic expression + if !e.containsArithmeticOperator(expression) { + return nil + } + + // Build AST for the arithmetic expression + return e.buildArithmeticAST(expression) +} + +// containsArithmeticOperator checks if the expression contains arithmetic operators outside of function 
calls
+func (e *SQLEngine) containsArithmeticOperator(expr string) bool {
+ operators := []string{"+", "-", "*", "/", "%", "||"}
+
+ parenLevel := 0
+ quoteLevel := false
+
+ for i, char := range expr {
+ switch char {
+ case '(':
+ if !quoteLevel {
+ parenLevel++
+ }
+ case ')':
+ if !quoteLevel {
+ parenLevel--
+ }
+ case '\'':
+ quoteLevel = !quoteLevel
+ default:
+ // Only check for operators outside of parentheses and quotes
+ if parenLevel == 0 && !quoteLevel {
+ for _, op := range operators {
+ if strings.HasPrefix(expr[i:], op) {
+ return true
+ }
+ }
+ }
+ }
+ }
+
+ return false
+}
+
+// buildArithmeticAST builds an Abstract Syntax Tree for arithmetic expressions containing function calls
+func (e *SQLEngine) buildArithmeticAST(expr string) *ArithmeticExpr {
+ // Remove leading/trailing spaces
+ expr = strings.TrimSpace(expr)
+
+ // Split at the main operator (outside of parentheses), trying the
+ // lowest-precedence operators first so they become the root of the tree
+ operators := []string{"||", "+", "-", "*", "/", "%"} // Order matters for precedence
+
+ for _, op := range operators {
+ opPos := e.findMainOperator(expr, op)
+ if opPos != -1 {
+ leftExpr := strings.TrimSpace(expr[:opPos])
+ rightExpr := strings.TrimSpace(expr[opPos+len(op):])
+
+ if leftExpr != "" && rightExpr != "" {
+ return &ArithmeticExpr{
+ Left: e.parseASTExpressionNode(leftExpr),
+ Right: e.parseASTExpressionNode(rightExpr),
+ Operator: op,
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+// findMainOperator finds the position of the rightmost top-level occurrence of an
+// operator (one that's not inside parentheses or quotes); taking the rightmost
+// occurrence makes same-precedence chains like "a - b - c" split
+// left-associatively as "(a - b) - c"
+func (e *SQLEngine) findMainOperator(expr string, operator string) int {
+ parenLevel := 0
+ quoteLevel := false
+ lastPos := -1
+
+ for i := 0; i <= len(expr)-len(operator); i++ {
+ char := expr[i]
+
+ switch char {
+ case '(':
+ if !quoteLevel {
+ parenLevel++
+ }
+ case ')':
+ if !quoteLevel {
+ parenLevel--
+ }
+ case '\'':
+ quoteLevel = !quoteLevel
+ default:
+ // Record operator positions only at top level (not inside parentheses or quotes)
+ if parenLevel == 0 && !quoteLevel && strings.HasPrefix(expr[i:], operator) {
+ // Skip unary "+"/"-" (at the start, or right after another operator
+ // or an open paren) so negative literals like "-5" keep their sign
+ prev := strings.TrimRight(expr[:i], " ")
+ if (operator == "+" || operator == "-") && (prev == "" || strings.ContainsAny(prev[len(prev)-1:], "+-*/%|(")) {
+ continue
+ }
+ lastPos = i
+ }
+ }
+ }
+
+ return lastPos
+}
+
+// parseASTExpressionNode parses an expression into the appropriate ExprNode type
+func (e *SQLEngine) parseASTExpressionNode(expr string) ExprNode {
+ expr = strings.TrimSpace(expr)
+
+ // Check if it's a function call (contains parentheses)
+ if strings.Contains(expr, "(") && strings.Contains(expr, ")") {
+ // This should be parsed as a function expression, but since our SQL parser
+ // has limitations, we'll create a special ColName that represents the function
+ return &ColName{Name: stringValue(expr)}
+ }
+
+ // Check if it's a numeric literal
+ if _, err := strconv.ParseInt(expr, 10, 64); err == nil {
+ return &SQLVal{Type: IntVal, Val: []byte(expr)}
+ }
+
+ if _, err := strconv.ParseFloat(expr, 64); err == nil {
+ return &SQLVal{Type: FloatVal, Val: []byte(expr)}
+ }
+
+ // Check if it's a string literal
+ if strings.HasPrefix(expr, "'") && strings.HasSuffix(expr, "'") {
+ return &SQLVal{Type: StrVal, Val: []byte(strings.Trim(expr, "'"))}
+ }
+
+ // Check for nested arithmetic expressions
+ if nestedArithmetic := e.buildArithmeticAST(expr); nestedArithmetic != nil {
+ return nestedArithmetic
+ }
+
+ // Default to column name
+ return &ColName{Name: stringValue(expr)}
+}
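+
+// exampleIntervalCutoff is an illustrative sketch (added for documentation, not
+// part of the upstream engine; the function name is hypothetical): it shows how
+// evaluateInterval's nanosecond result combines with Unix-nano timestamps,
+// mirroring the "-" case of evaluateTimestampArithmetic for a predicate such as
+// "NOW() - INTERVAL '1 hour'" against the system timestamp column.
+func exampleIntervalCutoff(e *SQLEngine) int64 {
+ nanos, err := e.evaluateInterval("1 hour") // 3600000000000 nanoseconds
+ if err != nil {
+ return 0
+ }
+ // Subtracting the interval from the current Unix-nano time yields the cutoff
+ // that a "greater than one hour ago" filter would compare timestamps against
+ return time.Now().UnixNano() - nanos
+}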

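+
+// exampleArithmeticAST is an illustrative sketch (added for documentation, not
+// part of the upstream engine; the function name is hypothetical): it shows the
+// tree parseColumnLevelCalculation builds when the SQL parser leaves a
+// function-bearing calculation behind as a single column name. The node shapes
+// follow the ArithmeticExpr, ColName, and SQLVal types defined in this file.
+func exampleArithmeticAST(e *SQLEngine) {
+ expr := e.parseColumnLevelCalculation("LENGTH('hello') + 10")
+ if expr != nil {
+ // expr.Operator is "+"
+ // expr.Left is &ColName{Name: stringValue("LENGTH('hello')")}, which is
+ // resolved later by evaluateColumnNameAsFunction
+ // expr.Right is &SQLVal{Type: IntVal, Val: []byte("10")}
+ _ = expr.Operator
+ }
+}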