Diffstat (limited to 'weed/query/engine/engine.go')
| -rw-r--r-- | weed/query/engine/engine.go | 327 |
1 file changed, 241 insertions(+), 86 deletions(-)
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index ffed03f35..e00fd78ca 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -1513,47 +1513,49 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *Se
 	var result *QueryResult
 	var err error
 
-	if hasAggregations {
-		// Extract table information for aggregation execution
-		var database, tableName string
-		if len(stmt.From) == 1 {
-			if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
-				if tableExpr, ok := table.Expr.(TableName); ok {
-					tableName = tableExpr.Name.String()
-					if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
-						database = tableExpr.Qualifier.String()
-					}
+	// Extract table information for execution (needed for both aggregation and regular queries)
+	var database, tableName string
+	if len(stmt.From) == 1 {
+		if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
+			if tableExpr, ok := table.Expr.(TableName); ok {
+				tableName = tableExpr.Name.String()
+				if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
+					database = tableExpr.Qualifier.String()
 				}
 			}
 		}
+	}
 
-		// Use current database if not specified
+	// Use current database if not specified
+	if database == "" {
+		database = e.catalog.currentDatabase
 		if database == "" {
-			database = e.catalog.currentDatabase
-			if database == "" {
-				database = "default"
-			}
-		}
-
-		// Create hybrid scanner for aggregation execution
-		var filerClient filer_pb.FilerClient
-		if e.catalog.brokerClient != nil {
-			filerClient, err = e.catalog.brokerClient.GetFilerClient()
-			if err != nil {
-				return &QueryResult{Error: err}, err
-			}
+			database = "default"
 		}
+	}
 
-		hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
+	// CRITICAL FIX: Always use HybridMessageScanner for ALL queries to read both flushed and unflushed data
+	// Create hybrid scanner for both aggregation and regular SELECT queries
+	var filerClient filer_pb.FilerClient
+	if e.catalog.brokerClient != nil {
+		filerClient, err = e.catalog.brokerClient.GetFilerClient()
 		if err != nil {
 			return &QueryResult{Error: err}, err
 		}
+	}
 
+	hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
+	if err != nil {
+		return &QueryResult{Error: err}, err
+	}
+
+	if hasAggregations {
 		// Execute aggregation query with plan tracking
 		result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan)
 	} else {
-		// Regular SELECT query with plan tracking
-		result, err = e.executeSelectStatementWithBrokerStats(ctx, stmt, plan)
+		// CRITICAL FIX: Use HybridMessageScanner for regular SELECT queries too
+		// This ensures both flushed and unflushed data are read
+		result, err = e.executeRegularSelectWithHybridScanner(ctx, hybridScanner, stmt, plan)
 	}
 
 	if err == nil && result != nil {
@@ -1981,6 +1983,198 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *SelectStat
 	return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
 }
 
+// executeRegularSelectWithHybridScanner handles regular SELECT queries using HybridMessageScanner
+// This ensures both flushed and unflushed data are read, fixing the SQL empty results issue
+func (e *SQLEngine) executeRegularSelectWithHybridScanner(ctx context.Context, hybridScanner *HybridMessageScanner, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
+	// Parse SELECT expressions to determine columns and detect aggregations
+	var columns []string
+	var aggregations []AggregationSpec
+	var hasAggregations bool
+	selectAll := false
+	baseColumnsSet := make(map[string]bool) // Track base columns needed for expressions
+
+	for _, selectExpr := range stmt.SelectExprs {
+		switch expr := selectExpr.(type) {
+		case *StarExpr:
+			selectAll = true
+		case *AliasedExpr:
+			switch col := expr.Expr.(type) {
+			case *ColName:
+				columnName := col.Name.String()
+				columns = append(columns, columnName)
+				baseColumnsSet[columnName] = true
+			case *FuncExpr:
+				funcName := strings.ToLower(col.Name.String())
+				if e.isAggregationFunction(funcName) {
+					// Handle aggregation functions
+					aggSpec, err := e.parseAggregationFunction(col, expr)
+					if err != nil {
+						return &QueryResult{Error: err}, err
+					}
+					aggregations = append(aggregations, *aggSpec)
+					hasAggregations = true
+				} else if e.isStringFunction(funcName) {
+					// Handle string functions like UPPER, LENGTH, etc.
+					columns = append(columns, e.getStringFunctionAlias(col))
+					// Extract base columns needed for this string function
+					e.extractBaseColumnsFromFunction(col, baseColumnsSet)
+				} else if e.isDateTimeFunction(funcName) {
+					// Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC
+					columns = append(columns, e.getDateTimeFunctionAlias(col))
+					// Extract base columns needed for this datetime function
+					e.extractBaseColumnsFromFunction(col, baseColumnsSet)
+				} else {
+					return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName)
+				}
+			default:
+				err := fmt.Errorf("unsupported SELECT expression: %T", col)
+				return &QueryResult{Error: err}, err
+			}
+		default:
+			err := fmt.Errorf("unsupported SELECT expression: %T", expr)
+			return &QueryResult{Error: err}, err
+		}
+	}
+
+	// If we have aggregations, delegate to aggregation handler
+	if hasAggregations {
+		return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt)
+	}
+
+	// Parse WHERE clause for predicate pushdown
+	var predicate func(*schema_pb.RecordValue) bool
+	var err error
+	if stmt.Where != nil {
+		predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs)
+		if err != nil {
+			return &QueryResult{Error: err}, err
+		}
+	}
+
+	// Parse LIMIT and OFFSET clauses
+	// Use -1 to distinguish "no LIMIT" from "LIMIT 0"
+	limit := -1
+	offset := 0
+	if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
+		switch limitExpr := stmt.Limit.Rowcount.(type) {
+		case *SQLVal:
+			if limitExpr.Type == IntVal {
+				var parseErr error
+				limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64)
+				if parseErr != nil {
+					return &QueryResult{Error: parseErr}, parseErr
+				}
+				if limit64 > math.MaxInt32 || limit64 < 0 {
+					return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
+				}
+				limit = int(limit64)
+			}
+		}
+	}
+
+	// Parse OFFSET clause if present
+	if stmt.Limit != nil && stmt.Limit.Offset != nil {
+		switch offsetExpr := stmt.Limit.Offset.(type) {
+		case *SQLVal:
+			if offsetExpr.Type == IntVal {
+				var parseErr error
+				offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64)
+				if parseErr != nil {
+					return &QueryResult{Error: parseErr}, parseErr
+				}
+				if offset64 > math.MaxInt32 || offset64 < 0 {
+					return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64)
+				}
+				offset = int(offset64)
+			}
+		}
+	}
+
+	// Build hybrid scan options
+	// Extract time filters from WHERE clause to optimize scanning
+	startTimeNs, stopTimeNs := int64(0), int64(0)
+	if stmt.Where != nil {
+		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
+	}
+
+	hybridScanOptions := HybridScanOptions{
+		StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
+		StopTimeNs:  stopTimeNs,  // Extracted from WHERE clause time comparisons
+		Limit:       limit,
+		Offset:      offset,
+		Predicate:   predicate,
+	}
+
+	if !selectAll {
+		// Convert baseColumnsSet to slice for hybrid scan options
+		baseColumns := make([]string, 0, len(baseColumnsSet))
+		for columnName := range baseColumnsSet {
+			baseColumns = append(baseColumns, columnName)
+		}
+		// Use base columns (not expression aliases) for data retrieval
+		if len(baseColumns) > 0 {
+			hybridScanOptions.Columns = baseColumns
+		} else {
+			// If no base columns found (shouldn't happen), use original columns
+			hybridScanOptions.Columns = columns
+		}
+	}
+
+	// Execute the hybrid scan (both flushed and unflushed data)
+	var results []HybridScanResult
+	if plan != nil {
+		// EXPLAIN mode - capture broker buffer stats
+		var stats *HybridScanStats
+		results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
+		if err != nil {
+			return &QueryResult{Error: err}, err
+		}
+
+		// Populate plan with broker buffer information
+		if stats != nil {
+			plan.BrokerBufferQueried = stats.BrokerBufferQueried
+			plan.BrokerBufferMessages = stats.BrokerBufferMessages
+			plan.BufferStartIndex = stats.BufferStartIndex
+
+			// Add broker_buffer to data sources if buffer was queried
+			if stats.BrokerBufferQueried {
+				// Check if broker_buffer is already in data sources
+				hasBrokerBuffer := false
+				for _, source := range plan.DataSources {
+					if source == "broker_buffer" {
+						hasBrokerBuffer = true
+						break
+					}
+				}
+				if !hasBrokerBuffer {
+					plan.DataSources = append(plan.DataSources, "broker_buffer")
+				}
+			}
+		}
+	} else {
+		// Normal mode - just get results
+		results, err = hybridScanner.Scan(ctx, hybridScanOptions)
+		if err != nil {
+			return &QueryResult{Error: err}, err
+		}
+	}
+
+	// Convert to SQL result format
+	if selectAll {
+		if len(columns) > 0 {
+			// SELECT *, specific_columns - include both auto-discovered and explicit columns
+			return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
+		} else {
+			// SELECT * only - let converter determine all columns (excludes system columns)
+			columns = nil
+			return hybridScanner.ConvertToSQLResult(results, columns), nil
+		}
+	}
+
+	// Handle custom column expressions (including arithmetic)
+	return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
+}
+
 // executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture
 // This is used by EXPLAIN queries to capture complete data source information including broker memory
 func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
@@ -2237,10 +2431,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
 	plan.Details[PlanDetailStartTimeNs] = startTimeNs
 	plan.Details[PlanDetailStopTimeNs] = stopTimeNs
 
-	if isDebugMode(ctx) {
-		fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs)
-	}
-
 	// Collect actual file information for each partition
 	var parquetFiles []string
 	var liveLogFiles []string
@@ -2261,9 +2451,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
 				columnPrunedCount := beforeColumnPrune - len(filteredStats)
 				if columnPrunedCount > 0 {
-					if isDebugMode(ctx) {
-						fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
-					}
 					// Track column statistics optimization
 					if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
 						plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
@@ -2275,9 +2462,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
 			}
 		} else {
 			parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
-			if isDebugMode(ctx) {
-				fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
-			}
 		}
 
 		// Merge accurate parquet sources from metadata
@@ -2298,9 +2482,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
 			}
 		} else {
 			liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
-			if isDebugMode(ctx) {
-				fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
-			}
 		}
 	}
 
@@ -2559,7 +2740,6 @@ func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileSta
 		return parquetStats
 	}
 
-	debugEnabled := ctx != nil && isDebugMode(ctx)
 	qStart := startTimeNs
 	qStop := stopTimeNs
 	if qStop == 0 {
@@ -2568,21 +2748,10 @@ func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileSta
 	n := 0
 	for _, fs := range parquetStats {
-		if debugEnabled {
-			fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName)
-		}
 		if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok {
-			if debugEnabled {
-				fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop)
-			}
 			if qStop < minNs || (qStart != 0 && qStart > maxNs) {
-				if debugEnabled {
-					fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName)
-				}
 				continue
 			}
-		} else if debugEnabled {
-			fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName)
 		}
 		parquetStats[n] = fs
 		n++
@@ -2596,13 +2765,9 @@ func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetS
 		return parquetStats
 	}
 
-	debugEnabled := ctx != nil && isDebugMode(ctx)
 	n := 0
 	for _, fs := range parquetStats {
 		if e.canSkipParquetFile(ctx, fs, whereExpr) {
-			if debugEnabled {
-				fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName)
-			}
 			continue
 		}
 		parquetStats[n] = fs
@@ -2726,7 +2891,6 @@ func (e *SQLEngine) flipOperator(op string) string {
 // populatePlanFileDetails populates execution plan with detailed file information for partitions
 // Includes column statistics pruning optimization when WHERE clause is provided
 func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) {
-	debugEnabled := ctx != nil && isDebugMode(ctx)
 	// Collect actual file information for each partition
 	var parquetFiles []string
 	var liveLogFiles []string
@@ -2750,9 +2914,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec
 				columnPrunedCount := beforeColumnPrune - len(filteredStats)
 				if columnPrunedCount > 0 {
-					if debugEnabled {
-						fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
-					}
 					// Track column statistics optimization
 					if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
 						plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
@@ -2765,9 +2926,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec
 			}
 		} else {
 			parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
-			if debugEnabled {
-				fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
-			}
 		}
 
 		// Merge accurate parquet sources from metadata
@@ -2788,9 +2946,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec
 			}
 		} else {
 			liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
-			if debugEnabled {
-				fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
-			}
 		}
 	}
 
@@ -3848,7 +4003,7 @@ func (e *SQLEngine) createTable(ctx context.Context, stmt *DDLStatement) (*Query
 	// Create the topic via broker using configurable partition count
 	partitionCount := e.catalog.GetDefaultPartitionCount()
 
-	err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType)
+	err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType, nil)
 	if err != nil {
 		return &QueryResult{Error: err}, err
 	}
@@ -4283,29 +4438,29 @@ func (e *SQLEngine) eachLogEntryInFile(filerClient filer_pb.FilerClient, filePat
 // convertLogEntryToRecordValue helper method (reuse existing logic)
 func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
-	// Parse the log entry data as Protocol Buffer (not JSON!)
+	// Try to unmarshal as RecordValue first (schematized data)
 	recordValue := &schema_pb.RecordValue{}
-	if err := proto.Unmarshal(logEntry.Data, recordValue); err != nil {
-		return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %v", err)
-	}
+	err := proto.Unmarshal(logEntry.Data, recordValue)
+	if err == nil {
+		// Successfully unmarshaled as RecordValue (valid protobuf)
+		// Initialize Fields map if nil
+		if recordValue.Fields == nil {
+			recordValue.Fields = make(map[string]*schema_pb.Value)
+		}
 
-	// Ensure Fields map exists
-	if recordValue.Fields == nil {
-		recordValue.Fields = make(map[string]*schema_pb.Value)
-	}
+		// Add system columns from LogEntry
+		recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+			Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+		}
+		recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+		}
 
-	// Add system columns
-	recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
-		Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
-	}
-	recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
-		Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+		return recordValue, "live_log", nil
 	}
 
-	// User data fields are already present in the protobuf-deserialized recordValue
-	// No additional processing needed since proto.Unmarshal already populated the Fields map
-
-	return recordValue, "live_log", nil
+	// Failed to unmarshal as RecordValue - invalid protobuf data
+	return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %w", err)
 }
@@ -4782,7 +4937,7 @@ func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string)
 // discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog
 func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error {
 	// First, check if topic exists by trying to get its schema from the broker/filer
-	recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
+	recordType, _, _, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
 	if err != nil {
 		return fmt.Errorf("topic %s.%s not found or no schema available: %v", database, tableName, err)
 	}
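
Reviewer note: the new executeRegularSelectWithHybridScanner parses LIMIT and OFFSET with strconv.ParseInt and rejects anything outside [0, math.MaxInt32] before narrowing to int, with -1 reserved as the "no LIMIT" sentinel. A minimal standalone sketch of that bounds check (the helper name parseBound is illustrative, not part of this patch):

package main

import (
	"fmt"
	"math"
	"strconv"
)

// parseBound mirrors the patch's LIMIT/OFFSET validation: parse as int64,
// reject values outside [0, math.MaxInt32], then narrow to int.
func parseBound(kind string, raw []byte) (int, error) {
	v, err := strconv.ParseInt(string(raw), 10, 64)
	if err != nil {
		return -1, err
	}
	if v > math.MaxInt32 || v < 0 {
		return -1, fmt.Errorf("%s value %d is out of valid range", kind, v)
	}
	return int(v), nil
}

func main() {
	fmt.Println(parseBound("LIMIT", []byte("100")))          // 100 <nil>
	fmt.Println(parseBound("OFFSET", []byte("99999999999"))) // -1 and a range error
}

Parsing into int64 first avoids platform-dependent overflow on 32-bit builds, which is why the patch checks the bound before converting to int.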
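Reviewer note: convertLogEntryToRecordValue is restructured so the success path (err == nil) handles the decoded RecordValue, while the failure path now wraps the unmarshal error with %w instead of %v, making it inspectable via errors.Unwrap and errors.As. A self-contained sketch of the same control flow, using structpb.Struct as a stand-in for schema_pb.RecordValue so the example compiles outside the repository:

package main

import (
	"errors"
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"
)

// decode follows the reworked flow: unmarshal first, initialize the Fields
// map on success, and wrap the protobuf error with %w on failure.
func decode(data []byte) (*structpb.Struct, error) {
	rec := &structpb.Struct{}
	if err := proto.Unmarshal(data, rec); err != nil {
		return nil, fmt.Errorf("failed to unmarshal log entry protobuf: %w", err)
	}
	if rec.Fields == nil {
		rec.Fields = map[string]*structpb.Value{}
	}
	return rec, nil
}

func main() {
	_, err := decode([]byte{0xff}) // invalid wire type, so Unmarshal fails
	fmt.Println(err, errors.Unwrap(err) != nil)
}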
