diff options
Diffstat (limited to 'weed/query/engine/aggregations.go')
| -rw-r--r-- | weed/query/engine/aggregations.go | 935 |
1 files changed, 935 insertions, 0 deletions
diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go new file mode 100644 index 000000000..623e489dd --- /dev/null +++ b/weed/query/engine/aggregations.go @@ -0,0 +1,935 @@ +package engine + +import ( + "context" + "fmt" + "math" + "strconv" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +// AggregationSpec defines an aggregation function to be computed +type AggregationSpec struct { + Function string // COUNT, SUM, AVG, MIN, MAX + Column string // Column name, or "*" for COUNT(*) + Alias string // Optional alias for the result column + Distinct bool // Support for DISTINCT keyword +} + +// AggregationResult holds the computed result of an aggregation +type AggregationResult struct { + Count int64 + Sum float64 + Min interface{} + Max interface{} +} + +// AggregationStrategy represents the strategy for executing aggregations +type AggregationStrategy struct { + CanUseFastPath bool + Reason string + UnsupportedSpecs []AggregationSpec +} + +// TopicDataSources represents the data sources available for a topic +type TopicDataSources struct { + ParquetFiles map[string][]*ParquetFileStats // partitionPath -> parquet file stats + ParquetRowCount int64 + LiveLogRowCount int64 + LiveLogFilesCount int // Total count of live log files across all partitions + PartitionsCount int + BrokerUnflushedCount int64 +} + +// FastPathOptimizer handles fast path aggregation optimization decisions +type FastPathOptimizer struct { + engine *SQLEngine +} + +// NewFastPathOptimizer creates a new fast path optimizer +func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer { + return &FastPathOptimizer{engine: engine} +} + +// DetermineStrategy analyzes aggregations and determines if fast path can be used +func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy { + strategy := AggregationStrategy{ + CanUseFastPath: true, + Reason: "all_aggregations_supported", + UnsupportedSpecs: []AggregationSpec{}, + } + + for _, spec := range aggregations { + if !opt.engine.canUseParquetStatsForAggregation(spec) { + strategy.CanUseFastPath = false + strategy.Reason = "unsupported_aggregation_functions" + strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec) + } + } + + return strategy +} + +// CollectDataSources gathers information about available data sources for a topic +func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) { + dataSources := &TopicDataSources{ + ParquetFiles: make(map[string][]*ParquetFileStats), + ParquetRowCount: 0, + LiveLogRowCount: 0, + LiveLogFilesCount: 0, + PartitionsCount: 0, + } + + if isDebugMode(ctx) { + fmt.Printf("Collecting data sources for: %s/%s\n", hybridScanner.topic.Namespace, hybridScanner.topic.Name) + } + + // Discover partitions for the topic + partitionPaths, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name) + if err != nil { + if isDebugMode(ctx) { + fmt.Printf("ERROR: Partition discovery failed: %v\n", err) + } + return dataSources, DataSourceError{ + Source: "partition_discovery", + Cause: err, + } + } + + // DEBUG: Log discovered partitions + if isDebugMode(ctx) { + fmt.Printf("Discovered %d partitions: %v\n", len(partitionPaths), partitionPaths) + } + + // Collect stats from each partition + // Note: discoverTopicPartitions always returns absolute paths starting with "/topics/" + for _, partitionPath := range partitionPaths { + if isDebugMode(ctx) { + fmt.Printf("\nProcessing partition: %s\n", partitionPath) + } + + // Read parquet file statistics + parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath) + if err != nil { + if isDebugMode(ctx) { + fmt.Printf(" ERROR: Failed to read parquet statistics: %v\n", err) + } + } else if len(parquetStats) == 0 { + if isDebugMode(ctx) { + fmt.Printf(" No parquet files found in partition\n") + } + } else { + dataSources.ParquetFiles[partitionPath] = parquetStats + partitionParquetRows := int64(0) + for _, stat := range parquetStats { + partitionParquetRows += stat.RowCount + dataSources.ParquetRowCount += stat.RowCount + } + if isDebugMode(ctx) { + fmt.Printf(" Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows) + } + } + + // Count live log files (excluding those converted to parquet) + parquetSources := opt.engine.extractParquetSourceFiles(dataSources.ParquetFiles[partitionPath]) + liveLogCount, liveLogErr := opt.engine.countLiveLogRowsExcludingParquetSources(ctx, partitionPath, parquetSources) + if liveLogErr != nil { + if isDebugMode(ctx) { + fmt.Printf(" ERROR: Failed to count live log rows: %v\n", liveLogErr) + } + } else { + dataSources.LiveLogRowCount += liveLogCount + if isDebugMode(ctx) { + fmt.Printf(" Found %d live log rows (excluding %d parquet sources)\n", liveLogCount, len(parquetSources)) + } + } + + // Count live log files for partition with proper range values + // Extract partition name from absolute path (e.g., "0000-2520" from "/topics/.../v2025.../0000-2520") + partitionName := partitionPath[strings.LastIndex(partitionPath, "/")+1:] + partitionParts := strings.Split(partitionName, "-") + if len(partitionParts) == 2 { + rangeStart, err1 := strconv.Atoi(partitionParts[0]) + rangeStop, err2 := strconv.Atoi(partitionParts[1]) + if err1 == nil && err2 == nil { + partition := topic.Partition{ + RangeStart: int32(rangeStart), + RangeStop: int32(rangeStop), + } + liveLogFileCount, err := hybridScanner.countLiveLogFiles(partition) + if err == nil { + dataSources.LiveLogFilesCount += liveLogFileCount + } + + // Count broker unflushed messages for this partition + if hybridScanner.brokerClient != nil { + entries, err := hybridScanner.brokerClient.GetUnflushedMessages(ctx, hybridScanner.topic.Namespace, hybridScanner.topic.Name, partition, 0) + if err == nil { + dataSources.BrokerUnflushedCount += int64(len(entries)) + if isDebugMode(ctx) { + fmt.Printf(" Found %d unflushed broker messages\n", len(entries)) + } + } else if isDebugMode(ctx) { + fmt.Printf(" ERROR: Failed to get unflushed broker messages: %v\n", err) + } + } + } + } + } + + dataSources.PartitionsCount = len(partitionPaths) + + if isDebugMode(ctx) { + fmt.Printf("Data sources collected: %d partitions, %d parquet rows, %d live log rows, %d broker buffer rows\n", + dataSources.PartitionsCount, dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount) + } + + return dataSources, nil +} + +// AggregationComputer handles the computation of aggregations using fast path +type AggregationComputer struct { + engine *SQLEngine +} + +// NewAggregationComputer creates a new aggregation computer +func NewAggregationComputer(engine *SQLEngine) *AggregationComputer { + return &AggregationComputer{engine: engine} +} + +// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data +func (comp *AggregationComputer) ComputeFastPathAggregations( + ctx context.Context, + aggregations []AggregationSpec, + dataSources *TopicDataSources, + partitions []string, +) ([]AggregationResult, error) { + + aggResults := make([]AggregationResult, len(aggregations)) + + for i, spec := range aggregations { + switch spec.Function { + case FuncCOUNT: + if spec.Column == "*" { + aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount + } else { + // For specific columns, we might need to account for NULLs in the future + aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount + } + + case FuncMIN: + globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions) + if err != nil { + return nil, AggregationError{ + Operation: spec.Function, + Column: spec.Column, + Cause: err, + } + } + aggResults[i].Min = globalMin + + case FuncMAX: + globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions) + if err != nil { + return nil, AggregationError{ + Operation: spec.Function, + Column: spec.Column, + Cause: err, + } + } + aggResults[i].Max = globalMax + + default: + return nil, OptimizationError{ + Strategy: "fast_path_aggregation", + Reason: fmt.Sprintf("unsupported aggregation function: %s", spec.Function), + } + } + } + + return aggResults, nil +} + +// computeGlobalMin computes the global minimum value across all data sources +func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) { + var globalMin interface{} + var globalMinValue *schema_pb.Value + hasParquetStats := false + + // Step 1: Get minimum from parquet statistics + for _, fileStats := range dataSources.ParquetFiles { + for _, fileStat := range fileStats { + // Try case-insensitive column lookup + var colStats *ParquetColumnStats + var found bool + + // First try exact match + if stats, exists := fileStat.ColumnStats[spec.Column]; exists { + colStats = stats + found = true + } else { + // Try case-insensitive lookup + for colName, stats := range fileStat.ColumnStats { + if strings.EqualFold(colName, spec.Column) { + colStats = stats + found = true + break + } + } + } + + if found && colStats != nil && colStats.MinValue != nil { + if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 { + globalMinValue = colStats.MinValue + extractedValue := comp.engine.extractRawValue(colStats.MinValue) + if extractedValue != nil { + globalMin = extractedValue + hasParquetStats = true + } + } + } + } + } + + // Step 2: Get minimum from live log data (only if no live logs or if we need to compare) + if dataSources.LiveLogRowCount > 0 { + for _, partition := range partitions { + partitionParquetSources := make(map[string]bool) + if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists { + partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats) + } + + liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) + if err != nil { + continue // Skip partitions with errors + } + + if liveLogMin != nil { + if globalMin == nil { + globalMin = liveLogMin + } else { + liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin) + if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 { + globalMin = liveLogMin + globalMinValue = liveLogSchemaValue + } + } + } + } + } + + // Step 3: Handle system columns if no regular data found + if globalMin == nil && !hasParquetStats { + globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles) + } + + return globalMin, nil +} + +// computeGlobalMax computes the global maximum value across all data sources +func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) { + var globalMax interface{} + var globalMaxValue *schema_pb.Value + hasParquetStats := false + + // Step 1: Get maximum from parquet statistics + for _, fileStats := range dataSources.ParquetFiles { + for _, fileStat := range fileStats { + // Try case-insensitive column lookup + var colStats *ParquetColumnStats + var found bool + + // First try exact match + if stats, exists := fileStat.ColumnStats[spec.Column]; exists { + colStats = stats + found = true + } else { + // Try case-insensitive lookup + for colName, stats := range fileStat.ColumnStats { + if strings.EqualFold(colName, spec.Column) { + colStats = stats + found = true + break + } + } + } + + if found && colStats != nil && colStats.MaxValue != nil { + if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 { + globalMaxValue = colStats.MaxValue + extractedValue := comp.engine.extractRawValue(colStats.MaxValue) + if extractedValue != nil { + globalMax = extractedValue + hasParquetStats = true + } + } + } + } + } + + // Step 2: Get maximum from live log data (only if live logs exist) + if dataSources.LiveLogRowCount > 0 { + for _, partition := range partitions { + partitionParquetSources := make(map[string]bool) + if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists { + partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats) + } + + _, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources) + if err != nil { + continue // Skip partitions with errors + } + + if liveLogMax != nil { + if globalMax == nil { + globalMax = liveLogMax + } else { + liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax) + if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 { + globalMax = liveLogMax + globalMaxValue = liveLogSchemaValue + } + } + } + } + } + + // Step 3: Handle system columns if no regular data found + if globalMax == nil && !hasParquetStats { + globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles) + } + + return globalMax, nil +} + +// executeAggregationQuery handles SELECT queries with aggregation functions +func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement) (*QueryResult, error) { + return e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, nil) +} + +// executeAggregationQueryWithPlan handles SELECT queries with aggregation functions and populates execution plan +func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) { + // Parse LIMIT and OFFSET for aggregation results (do this first) + // Use -1 to distinguish "no LIMIT" from "LIMIT 0" + limit := -1 + offset := 0 + if stmt.Limit != nil && stmt.Limit.Rowcount != nil { + if limitExpr, ok := stmt.Limit.Rowcount.(*SQLVal); ok && limitExpr.Type == IntVal { + if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil { + if limit64 > int64(math.MaxInt) || limit64 < 0 { + return nil, fmt.Errorf("LIMIT value %d is out of range", limit64) + } + // Safe conversion after bounds check + limit = int(limit64) + } + } + } + if stmt.Limit != nil && stmt.Limit.Offset != nil { + if offsetExpr, ok := stmt.Limit.Offset.(*SQLVal); ok && offsetExpr.Type == IntVal { + if offset64, err := strconv.ParseInt(string(offsetExpr.Val), 10, 64); err == nil { + if offset64 > int64(math.MaxInt) || offset64 < 0 { + return nil, fmt.Errorf("OFFSET value %d is out of range", offset64) + } + // Safe conversion after bounds check + offset = int(offset64) + } + } + } + + // Parse WHERE clause for filtering + var predicate func(*schema_pb.RecordValue) bool + var err error + if stmt.Where != nil { + predicate, err = e.buildPredicate(stmt.Where.Expr) + if err != nil { + return &QueryResult{Error: err}, err + } + } + + // Extract time filters for optimization + startTimeNs, stopTimeNs := int64(0), int64(0) + if stmt.Where != nil { + startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) + } + + // FAST PATH RE-ENABLED WITH DEBUG LOGGING: + // Added comprehensive debug logging to identify data counting issues + // This will help us understand why fast path was returning 0 when slow path returns 1803 + if stmt.Where == nil { + if isDebugMode(ctx) { + fmt.Printf("\nFast path optimization attempt...\n") + } + fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan) + if canOptimize { + if isDebugMode(ctx) { + fmt.Printf("Fast path optimization succeeded!\n") + } + return fastResult, nil + } else { + if isDebugMode(ctx) { + fmt.Printf("Fast path optimization failed, falling back to slow path\n") + } + } + } else { + if isDebugMode(ctx) { + fmt.Printf("Fast path not applicable due to WHERE clause\n") + } + } + + // SLOW PATH: Fall back to full table scan + if isDebugMode(ctx) { + fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n") + } + + // Extract columns needed for aggregations + columnsNeeded := make(map[string]bool) + for _, spec := range aggregations { + if spec.Column != "*" { + columnsNeeded[spec.Column] = true + } + } + + // Convert to slice + var scanColumns []string + if len(columnsNeeded) > 0 { + scanColumns = make([]string, 0, len(columnsNeeded)) + for col := range columnsNeeded { + scanColumns = append(scanColumns, col) + } + } + // If no specific columns needed (COUNT(*) only), don't specify columns (scan all) + + // Build scan options for full table scan (aggregations need all data during scanning) + hybridScanOptions := HybridScanOptions{ + StartTimeNs: startTimeNs, + StopTimeNs: stopTimeNs, + Limit: -1, // Use -1 to mean "no limit" - need all data for aggregation + Offset: 0, // No offset during scanning - OFFSET applies to final results + Predicate: predicate, + Columns: scanColumns, // Include columns needed for aggregation functions + } + + // DEBUG: Log scan options for aggregation + debugHybridScanOptions(ctx, hybridScanOptions, "AGGREGATION") + + // Execute the hybrid scan to get all matching records + var results []HybridScanResult + if plan != nil { + // EXPLAIN mode - capture broker buffer stats + var stats *HybridScanStats + results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions) + if err != nil { + return &QueryResult{Error: err}, err + } + + // Populate plan with broker buffer information + if stats != nil { + plan.BrokerBufferQueried = stats.BrokerBufferQueried + plan.BrokerBufferMessages = stats.BrokerBufferMessages + plan.BufferStartIndex = stats.BufferStartIndex + + // Add broker_buffer to data sources if buffer was queried + if stats.BrokerBufferQueried { + // Check if broker_buffer is already in data sources + hasBrokerBuffer := false + for _, source := range plan.DataSources { + if source == "broker_buffer" { + hasBrokerBuffer = true + break + } + } + if !hasBrokerBuffer { + plan.DataSources = append(plan.DataSources, "broker_buffer") + } + } + } + } else { + // Normal mode - just get results + results, err = hybridScanner.Scan(ctx, hybridScanOptions) + if err != nil { + return &QueryResult{Error: err}, err + } + } + + // DEBUG: Log scan results + if isDebugMode(ctx) { + fmt.Printf("AGGREGATION SCAN RESULTS: %d rows returned\n", len(results)) + } + + // Compute aggregations + aggResults := e.computeAggregations(results, aggregations) + + // Build result set + columns := make([]string, len(aggregations)) + row := make([]sqltypes.Value, len(aggregations)) + + for i, spec := range aggregations { + columns[i] = spec.Alias + row[i] = e.formatAggregationResult(spec, aggResults[i]) + } + + // Apply OFFSET and LIMIT to aggregation results + // Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows + rows := [][]sqltypes.Value{row} + if offset > 0 || limit >= 0 { + // Handle LIMIT 0 first + if limit == 0 { + rows = [][]sqltypes.Value{} + } else { + // Apply OFFSET first + if offset > 0 { + if offset >= len(rows) { + rows = [][]sqltypes.Value{} + } else { + rows = rows[offset:] + } + } + + // Apply LIMIT after OFFSET (only if limit > 0) + if limit > 0 && len(rows) > limit { + rows = rows[:limit] + } + } + } + + result := &QueryResult{ + Columns: columns, + Rows: rows, + } + + // Build execution tree for aggregation queries if plan is provided + if plan != nil { + plan.RootNode = e.buildExecutionTree(plan, stmt) + } + + return result, nil +} + +// tryFastParquetAggregation attempts to compute aggregations using hybrid approach: +// - Use parquet metadata for parquet files +// - Count live log files for live data +// - Combine both for accurate results per partition +// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used +func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) { + return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil) +} + +// tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates execution plan if provided +func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) { + // Use the new modular components + optimizer := NewFastPathOptimizer(e) + computer := NewAggregationComputer(e) + + // Step 1: Determine strategy + strategy := optimizer.DetermineStrategy(aggregations) + if !strategy.CanUseFastPath { + return nil, false + } + + // Step 2: Collect data sources + dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner) + if err != nil { + return nil, false + } + + // Build partition list for aggregation computer + // Note: discoverTopicPartitions always returns absolute paths + partitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name) + if err != nil { + return nil, false + } + + // Debug: Show the hybrid optimization results (only in explain mode) + if isDebugMode(ctx) && (dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0) { + partitionsWithLiveLogs := 0 + if dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0 { + partitionsWithLiveLogs = 1 // Simplified for now + } + fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows + %d broker buffer rows from %d partitions\n", + dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount, partitionsWithLiveLogs) + } + + // Step 3: Compute aggregations using fast path + aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions) + if err != nil { + return nil, false + } + + // Step 3.5: Validate fast path results (safety check) + // For simple COUNT(*) queries, ensure we got a reasonable result + if len(aggregations) == 1 && aggregations[0].Function == FuncCOUNT && aggregations[0].Column == "*" { + totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount + countResult := aggResults[0].Count + + if isDebugMode(ctx) { + fmt.Printf("Validating fast path: COUNT=%d, Sources=%d\n", countResult, totalRows) + } + + if totalRows == 0 && countResult > 0 { + // Fast path found data but data sources show 0 - this suggests a bug + if isDebugMode(ctx) { + fmt.Printf("Fast path validation failed: COUNT=%d but sources=0\n", countResult) + } + return nil, false + } + if totalRows > 0 && countResult == 0 { + // Data sources show data but COUNT is 0 - this also suggests a bug + if isDebugMode(ctx) { + fmt.Printf("Fast path validation failed: sources=%d but COUNT=0\n", totalRows) + } + return nil, false + } + if countResult != totalRows { + // Counts don't match - this suggests inconsistent logic + if isDebugMode(ctx) { + fmt.Printf("Fast path validation failed: COUNT=%d != sources=%d\n", countResult, totalRows) + } + return nil, false + } + if isDebugMode(ctx) { + fmt.Printf("Fast path validation passed: COUNT=%d\n", countResult) + } + } + + // Step 4: Populate execution plan if provided (for EXPLAIN queries) + if plan != nil { + strategy := optimizer.DetermineStrategy(aggregations) + builder := &ExecutionPlanBuilder{} + + // Create a minimal SELECT statement for the plan builder (avoid nil pointer) + stmt := &SelectStatement{} + + // Build aggregation plan with fast path strategy + aggPlan := builder.BuildAggregationPlan(stmt, aggregations, strategy, dataSources) + + // Copy relevant fields to the main plan + plan.ExecutionStrategy = aggPlan.ExecutionStrategy + plan.DataSources = aggPlan.DataSources + plan.OptimizationsUsed = aggPlan.OptimizationsUsed + plan.PartitionsScanned = aggPlan.PartitionsScanned + plan.ParquetFilesScanned = aggPlan.ParquetFilesScanned + plan.LiveLogFilesScanned = aggPlan.LiveLogFilesScanned + plan.TotalRowsProcessed = aggPlan.TotalRowsProcessed + plan.Aggregations = aggPlan.Aggregations + + // Indicate broker buffer participation for EXPLAIN tree rendering + if dataSources.BrokerUnflushedCount > 0 { + plan.BrokerBufferQueried = true + plan.BrokerBufferMessages = int(dataSources.BrokerUnflushedCount) + } + + // Merge details while preserving existing ones + if plan.Details == nil { + plan.Details = make(map[string]interface{}) + } + for key, value := range aggPlan.Details { + plan.Details[key] = value + } + + // Add file path information from the data collection + plan.Details["partition_paths"] = partitions + + // Collect actual file information for each partition + var parquetFiles []string + var liveLogFiles []string + parquetSources := make(map[string]bool) + + for _, partitionPath := range partitions { + // Get parquet files for this partition + if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil { + for _, stats := range parquetStats { + parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName)) + } + } + + // Merge accurate parquet sources from metadata (preferred over filename fallback) + if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil { + for src := range sources { + parquetSources[src] = true + } + } + + // Get live log files for this partition + if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil { + for _, fileName := range liveFiles { + // Exclude live log files that have been converted to parquet (deduplicated) + if parquetSources[fileName] { + continue + } + liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName)) + } + } + } + + if len(parquetFiles) > 0 { + plan.Details["parquet_files"] = parquetFiles + } + if len(liveLogFiles) > 0 { + plan.Details["live_log_files"] = liveLogFiles + } + + // Update the dataSources.LiveLogFilesCount to match the actual files found + dataSources.LiveLogFilesCount = len(liveLogFiles) + + // Also update the plan's LiveLogFilesScanned to match + plan.LiveLogFilesScanned = len(liveLogFiles) + + // Ensure PartitionsScanned is set so Statistics section appears + if plan.PartitionsScanned == 0 && len(partitions) > 0 { + plan.PartitionsScanned = len(partitions) + } + + if isDebugMode(ctx) { + fmt.Printf("Populated execution plan with fast path strategy\n") + } + } + + // Step 5: Build final query result + columns := make([]string, len(aggregations)) + row := make([]sqltypes.Value, len(aggregations)) + + for i, spec := range aggregations { + columns[i] = spec.Alias + row[i] = e.formatAggregationResult(spec, aggResults[i]) + } + + result := &QueryResult{ + Columns: columns, + Rows: [][]sqltypes.Value{row}, + } + + return result, true +} + +// computeAggregations computes aggregation results from a full table scan +func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult { + aggResults := make([]AggregationResult, len(aggregations)) + + for i, spec := range aggregations { + switch spec.Function { + case FuncCOUNT: + if spec.Column == "*" { + aggResults[i].Count = int64(len(results)) + } else { + count := int64(0) + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil && !e.isNullValue(value) { + count++ + } + } + aggResults[i].Count = count + } + + case FuncSUM: + sum := float64(0) + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil { + if numValue := e.convertToNumber(value); numValue != nil { + sum += *numValue + } + } + } + aggResults[i].Sum = sum + + case FuncAVG: + sum := float64(0) + count := int64(0) + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil { + if numValue := e.convertToNumber(value); numValue != nil { + sum += *numValue + count++ + } + } + } + if count > 0 { + aggResults[i].Sum = sum / float64(count) // Store average in Sum field + aggResults[i].Count = count + } + + case FuncMIN: + var min interface{} + var minValue *schema_pb.Value + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil { + if minValue == nil || e.compareValues(value, minValue) < 0 { + minValue = value + min = e.extractRawValue(value) + } + } + } + aggResults[i].Min = min + + case FuncMAX: + var max interface{} + var maxValue *schema_pb.Value + for _, result := range results { + if value := e.findColumnValue(result, spec.Column); value != nil { + if maxValue == nil || e.compareValues(value, maxValue) > 0 { + maxValue = value + max = e.extractRawValue(value) + } + } + } + aggResults[i].Max = max + } + } + + return aggResults +} + +// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats +func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool { + switch spec.Function { + case FuncCOUNT: + return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column) + case FuncMIN, FuncMAX: + return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column) + case FuncSUM, FuncAVG: + // These require scanning actual values, not just min/max + return false + default: + return false + } +} + +// debugHybridScanOptions logs the exact scan options being used +func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, queryType string) { + if isDebugMode(ctx) { + fmt.Printf("\n=== HYBRID SCAN OPTIONS DEBUG (%s) ===\n", queryType) + fmt.Printf("StartTimeNs: %d\n", options.StartTimeNs) + fmt.Printf("StopTimeNs: %d\n", options.StopTimeNs) + fmt.Printf("Limit: %d\n", options.Limit) + fmt.Printf("Offset: %d\n", options.Offset) + fmt.Printf("Predicate: %v\n", options.Predicate != nil) + fmt.Printf("Columns: %v\n", options.Columns) + fmt.Printf("==========================================\n") + } +} + +// collectLiveLogFileNames collects the names of live log files in a partition +func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) { + var fileNames []string + + err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { + // Skip directories and parquet files + if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") { + return nil + } + + // Only include files with actual content + if len(entry.Chunks) > 0 { + fileNames = append(fileNames, entry.Name) + } + + return nil + }) + + return fileNames, err +} |
