aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/aggregations.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/query/engine/aggregations.go')
-rw-r--r--weed/query/engine/aggregations.go935
1 files changed, 935 insertions, 0 deletions
diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go
new file mode 100644
index 000000000..623e489dd
--- /dev/null
+++ b/weed/query/engine/aggregations.go
@@ -0,0 +1,935 @@
+package engine
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "strconv"
+ "strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
// AggregationSpec defines a single aggregation function to be computed
// for a SELECT query, e.g. COUNT(*), MAX(ts), or MIN(price).
type AggregationSpec struct {
	Function string // Aggregation function name: COUNT, SUM, AVG, MIN, MAX
	Column   string // Column name to aggregate, or "*" for COUNT(*)
	Alias    string // Optional alias for the result column (used as the output column name)
	Distinct bool   // Support for DISTINCT keyword (parsed but not evaluated in this file)
}
+
// AggregationResult holds the computed result of one aggregation.
// Only the field relevant to the function is meaningful:
// COUNT uses Count, SUM uses Sum, MIN/MAX use Min/Max.
// NOTE: AVG stores its final average in Sum (see computeAggregations).
type AggregationResult struct {
	Count int64       // Row count for COUNT; sample count for AVG
	Sum   float64     // Sum for SUM; the computed average for AVG
	Min   interface{} // Raw minimum value for MIN (nil if no data)
	Max   interface{} // Raw maximum value for MAX (nil if no data)
}
+
// AggregationStrategy represents the decision of whether a set of
// aggregations can be answered from metadata alone (the "fast path")
// instead of scanning every row.
type AggregationStrategy struct {
	CanUseFastPath   bool              // true when every aggregation is metadata-answerable
	Reason           string            // Machine-readable reason for the decision
	UnsupportedSpecs []AggregationSpec // Aggregations that forced the slow path (empty when fast path is usable)
}
+
// TopicDataSources represents the data sources available for a topic:
// parquet files (with per-file statistics), live log files not yet
// converted to parquet, and unflushed messages still in the broker buffer.
type TopicDataSources struct {
	ParquetFiles         map[string][]*ParquetFileStats // partitionPath -> parquet file stats
	ParquetRowCount      int64                          // Total rows across all parquet files
	LiveLogRowCount      int64                          // Total live log rows, excluding rows already converted to parquet
	LiveLogFilesCount    int                            // Total count of live log files across all partitions
	PartitionsCount      int                            // Number of discovered partitions
	BrokerUnflushedCount int64                          // Messages buffered in the broker but not yet flushed to disk
}
+
// FastPathOptimizer handles fast path aggregation optimization decisions:
// deciding whether aggregations can be served from parquet statistics and
// collecting the data sources needed to do so.
type FastPathOptimizer struct {
	engine *SQLEngine // Owning engine, used for column checks and partition discovery
}
+
+// NewFastPathOptimizer creates a new fast path optimizer
+func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer {
+ return &FastPathOptimizer{engine: engine}
+}
+
+// DetermineStrategy analyzes aggregations and determines if fast path can be used
+func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy {
+ strategy := AggregationStrategy{
+ CanUseFastPath: true,
+ Reason: "all_aggregations_supported",
+ UnsupportedSpecs: []AggregationSpec{},
+ }
+
+ for _, spec := range aggregations {
+ if !opt.engine.canUseParquetStatsForAggregation(spec) {
+ strategy.CanUseFastPath = false
+ strategy.Reason = "unsupported_aggregation_functions"
+ strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec)
+ }
+ }
+
+ return strategy
+}
+
// CollectDataSources gathers information about available data sources for a topic:
// per-partition parquet statistics, live log row/file counts (deduplicated against
// rows already converted to parquet), and unflushed broker-buffer message counts.
//
// Failures inside a single partition (unreadable stats, broker errors) are
// tolerated: that source simply contributes zero and the walk continues. Only a
// failure to discover partitions at all returns an error.
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
	dataSources := &TopicDataSources{
		ParquetFiles:      make(map[string][]*ParquetFileStats),
		ParquetRowCount:   0,
		LiveLogRowCount:   0,
		LiveLogFilesCount: 0,
		PartitionsCount:   0,
	}

	if isDebugMode(ctx) {
		fmt.Printf("Collecting data sources for: %s/%s\n", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	}

	// Discover partitions for the topic
	partitionPaths, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	if err != nil {
		if isDebugMode(ctx) {
			fmt.Printf("ERROR: Partition discovery failed: %v\n", err)
		}
		// Return the partially-initialized dataSources alongside the error so
		// callers that ignore the error still see zeroed counts.
		return dataSources, DataSourceError{
			Source: "partition_discovery",
			Cause:  err,
		}
	}

	// DEBUG: Log discovered partitions
	if isDebugMode(ctx) {
		fmt.Printf("Discovered %d partitions: %v\n", len(partitionPaths), partitionPaths)
	}

	// Collect stats from each partition
	// Note: discoverTopicPartitions always returns absolute paths starting with "/topics/"
	for _, partitionPath := range partitionPaths {
		if isDebugMode(ctx) {
			fmt.Printf("\nProcessing partition: %s\n", partitionPath)
		}

		// Read parquet file statistics
		parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
		if err != nil {
			if isDebugMode(ctx) {
				fmt.Printf("  ERROR: Failed to read parquet statistics: %v\n", err)
			}
		} else if len(parquetStats) == 0 {
			if isDebugMode(ctx) {
				fmt.Printf("  No parquet files found in partition\n")
			}
		} else {
			dataSources.ParquetFiles[partitionPath] = parquetStats
			partitionParquetRows := int64(0)
			for _, stat := range parquetStats {
				partitionParquetRows += stat.RowCount
				dataSources.ParquetRowCount += stat.RowCount
			}
			if isDebugMode(ctx) {
				fmt.Printf("  Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows)
			}
		}

		// Count live log files (excluding those converted to parquet) to avoid
		// double-counting rows that exist both as live logs and parquet.
		parquetSources := opt.engine.extractParquetSourceFiles(dataSources.ParquetFiles[partitionPath])
		liveLogCount, liveLogErr := opt.engine.countLiveLogRowsExcludingParquetSources(ctx, partitionPath, parquetSources)
		if liveLogErr != nil {
			if isDebugMode(ctx) {
				fmt.Printf("  ERROR: Failed to count live log rows: %v\n", liveLogErr)
			}
		} else {
			dataSources.LiveLogRowCount += liveLogCount
			if isDebugMode(ctx) {
				fmt.Printf("  Found %d live log rows (excluding %d parquet sources)\n", liveLogCount, len(parquetSources))
			}
		}

		// Count live log files for partition with proper range values
		// Extract partition name from absolute path (e.g., "0000-2520" from "/topics/.../v2025.../0000-2520")
		partitionName := partitionPath[strings.LastIndex(partitionPath, "/")+1:]
		partitionParts := strings.Split(partitionName, "-")
		// Only directories matching the "<start>-<stop>" naming convention are
		// treated as partitions; anything else is silently skipped here.
		if len(partitionParts) == 2 {
			rangeStart, err1 := strconv.Atoi(partitionParts[0])
			rangeStop, err2 := strconv.Atoi(partitionParts[1])
			if err1 == nil && err2 == nil {
				partition := topic.Partition{
					RangeStart: int32(rangeStart),
					RangeStop:  int32(rangeStop),
				}
				liveLogFileCount, err := hybridScanner.countLiveLogFiles(partition)
				if err == nil {
					dataSources.LiveLogFilesCount += liveLogFileCount
				}

				// Count broker unflushed messages for this partition
				if hybridScanner.brokerClient != nil {
					entries, err := hybridScanner.brokerClient.GetUnflushedMessages(ctx, hybridScanner.topic.Namespace, hybridScanner.topic.Name, partition, 0)
					if err == nil {
						dataSources.BrokerUnflushedCount += int64(len(entries))
						if isDebugMode(ctx) {
							fmt.Printf("  Found %d unflushed broker messages\n", len(entries))
						}
					} else if isDebugMode(ctx) {
						fmt.Printf("  ERROR: Failed to get unflushed broker messages: %v\n", err)
					}
				}
			}
		}
	}

	dataSources.PartitionsCount = len(partitionPaths)

	if isDebugMode(ctx) {
		fmt.Printf("Data sources collected: %d partitions, %d parquet rows, %d live log rows, %d broker buffer rows\n",
			dataSources.PartitionsCount, dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount)
	}

	return dataSources, nil
}
+
// AggregationComputer handles the computation of aggregations using the fast
// path: combining parquet column statistics with live log and broker data.
type AggregationComputer struct {
	engine *SQLEngine // Owning engine, used for value comparison/extraction helpers
}
+
+// NewAggregationComputer creates a new aggregation computer
+func NewAggregationComputer(engine *SQLEngine) *AggregationComputer {
+ return &AggregationComputer{engine: engine}
+}
+
+// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data
+func (comp *AggregationComputer) ComputeFastPathAggregations(
+ ctx context.Context,
+ aggregations []AggregationSpec,
+ dataSources *TopicDataSources,
+ partitions []string,
+) ([]AggregationResult, error) {
+
+ aggResults := make([]AggregationResult, len(aggregations))
+
+ for i, spec := range aggregations {
+ switch spec.Function {
+ case FuncCOUNT:
+ if spec.Column == "*" {
+ aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
+ } else {
+ // For specific columns, we might need to account for NULLs in the future
+ aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
+ }
+
+ case FuncMIN:
+ globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions)
+ if err != nil {
+ return nil, AggregationError{
+ Operation: spec.Function,
+ Column: spec.Column,
+ Cause: err,
+ }
+ }
+ aggResults[i].Min = globalMin
+
+ case FuncMAX:
+ globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions)
+ if err != nil {
+ return nil, AggregationError{
+ Operation: spec.Function,
+ Column: spec.Column,
+ Cause: err,
+ }
+ }
+ aggResults[i].Max = globalMax
+
+ default:
+ return nil, OptimizationError{
+ Strategy: "fast_path_aggregation",
+ Reason: fmt.Sprintf("unsupported aggregation function: %s", spec.Function),
+ }
+ }
+ }
+
+ return aggResults, nil
+}
+
+// computeGlobalMin computes the global minimum value across all data sources
+func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
+ var globalMin interface{}
+ var globalMinValue *schema_pb.Value
+ hasParquetStats := false
+
+ // Step 1: Get minimum from parquet statistics
+ for _, fileStats := range dataSources.ParquetFiles {
+ for _, fileStat := range fileStats {
+ // Try case-insensitive column lookup
+ var colStats *ParquetColumnStats
+ var found bool
+
+ // First try exact match
+ if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
+ colStats = stats
+ found = true
+ } else {
+ // Try case-insensitive lookup
+ for colName, stats := range fileStat.ColumnStats {
+ if strings.EqualFold(colName, spec.Column) {
+ colStats = stats
+ found = true
+ break
+ }
+ }
+ }
+
+ if found && colStats != nil && colStats.MinValue != nil {
+ if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 {
+ globalMinValue = colStats.MinValue
+ extractedValue := comp.engine.extractRawValue(colStats.MinValue)
+ if extractedValue != nil {
+ globalMin = extractedValue
+ hasParquetStats = true
+ }
+ }
+ }
+ }
+ }
+
+ // Step 2: Get minimum from live log data (only if no live logs or if we need to compare)
+ if dataSources.LiveLogRowCount > 0 {
+ for _, partition := range partitions {
+ partitionParquetSources := make(map[string]bool)
+ if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
+ partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
+ }
+
+ liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
+ if err != nil {
+ continue // Skip partitions with errors
+ }
+
+ if liveLogMin != nil {
+ if globalMin == nil {
+ globalMin = liveLogMin
+ } else {
+ liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin)
+ if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
+ globalMin = liveLogMin
+ globalMinValue = liveLogSchemaValue
+ }
+ }
+ }
+ }
+ }
+
+ // Step 3: Handle system columns if no regular data found
+ if globalMin == nil && !hasParquetStats {
+ globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles)
+ }
+
+ return globalMin, nil
+}
+
+// computeGlobalMax computes the global maximum value across all data sources
+func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
+ var globalMax interface{}
+ var globalMaxValue *schema_pb.Value
+ hasParquetStats := false
+
+ // Step 1: Get maximum from parquet statistics
+ for _, fileStats := range dataSources.ParquetFiles {
+ for _, fileStat := range fileStats {
+ // Try case-insensitive column lookup
+ var colStats *ParquetColumnStats
+ var found bool
+
+ // First try exact match
+ if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
+ colStats = stats
+ found = true
+ } else {
+ // Try case-insensitive lookup
+ for colName, stats := range fileStat.ColumnStats {
+ if strings.EqualFold(colName, spec.Column) {
+ colStats = stats
+ found = true
+ break
+ }
+ }
+ }
+
+ if found && colStats != nil && colStats.MaxValue != nil {
+ if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 {
+ globalMaxValue = colStats.MaxValue
+ extractedValue := comp.engine.extractRawValue(colStats.MaxValue)
+ if extractedValue != nil {
+ globalMax = extractedValue
+ hasParquetStats = true
+ }
+ }
+ }
+ }
+ }
+
+ // Step 2: Get maximum from live log data (only if live logs exist)
+ if dataSources.LiveLogRowCount > 0 {
+ for _, partition := range partitions {
+ partitionParquetSources := make(map[string]bool)
+ if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
+ partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
+ }
+
+ _, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
+ if err != nil {
+ continue // Skip partitions with errors
+ }
+
+ if liveLogMax != nil {
+ if globalMax == nil {
+ globalMax = liveLogMax
+ } else {
+ liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax)
+ if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
+ globalMax = liveLogMax
+ globalMaxValue = liveLogSchemaValue
+ }
+ }
+ }
+ }
+ }
+
+ // Step 3: Handle system columns if no regular data found
+ if globalMax == nil && !hasParquetStats {
+ globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles)
+ }
+
+ return globalMax, nil
+}
+
// executeAggregationQuery handles SELECT queries with aggregation functions.
// It is the non-EXPLAIN entry point: a thin wrapper that delegates to
// executeAggregationQueryWithPlan with a nil execution plan.
func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement) (*QueryResult, error) {
	return e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, nil)
}
+
// executeAggregationQueryWithPlan handles SELECT queries with aggregation
// functions and, when plan is non-nil (EXPLAIN mode), populates the execution
// plan with scan statistics.
//
// Flow: parse LIMIT/OFFSET -> build WHERE predicate -> attempt the fast
// (metadata-only) path when there is no WHERE clause -> otherwise fall back
// to a full hybrid scan and compute aggregations over the scanned rows.
func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
	// Parse LIMIT and OFFSET for aggregation results (do this first)
	// Use -1 to distinguish "no LIMIT" from "LIMIT 0"
	limit := -1
	offset := 0
	if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
		if limitExpr, ok := stmt.Limit.Rowcount.(*SQLVal); ok && limitExpr.Type == IntVal {
			if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil {
				if limit64 > int64(math.MaxInt) || limit64 < 0 {
					return nil, fmt.Errorf("LIMIT value %d is out of range", limit64)
				}
				// Safe conversion after bounds check
				limit = int(limit64)
			}
		}
	}
	if stmt.Limit != nil && stmt.Limit.Offset != nil {
		if offsetExpr, ok := stmt.Limit.Offset.(*SQLVal); ok && offsetExpr.Type == IntVal {
			if offset64, err := strconv.ParseInt(string(offsetExpr.Val), 10, 64); err == nil {
				if offset64 > int64(math.MaxInt) || offset64 < 0 {
					return nil, fmt.Errorf("OFFSET value %d is out of range", offset64)
				}
				// Safe conversion after bounds check
				offset = int(offset64)
			}
		}
	}

	// Parse WHERE clause into a row predicate for filtering during the scan
	var predicate func(*schema_pb.RecordValue) bool
	var err error
	if stmt.Where != nil {
		predicate, err = e.buildPredicate(stmt.Where.Expr)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// Extract time filters so the scanner can skip out-of-range data
	startTimeNs, stopTimeNs := int64(0), int64(0)
	if stmt.Where != nil {
		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
	}

	// FAST PATH RE-ENABLED WITH DEBUG LOGGING:
	// Added comprehensive debug logging to identify data counting issues
	// This will help us understand why fast path was returning 0 when slow path returns 1803
	// NOTE(review): fast path is only attempted when there is no WHERE clause,
	// because metadata counts cannot reflect row-level filtering.
	if stmt.Where == nil {
		if isDebugMode(ctx) {
			fmt.Printf("\nFast path optimization attempt...\n")
		}
		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan)
		if canOptimize {
			if isDebugMode(ctx) {
				fmt.Printf("Fast path optimization succeeded!\n")
			}
			return fastResult, nil
		} else {
			if isDebugMode(ctx) {
				fmt.Printf("Fast path optimization failed, falling back to slow path\n")
			}
		}
	} else {
		if isDebugMode(ctx) {
			fmt.Printf("Fast path not applicable due to WHERE clause\n")
		}
	}

	// SLOW PATH: Fall back to full table scan
	if isDebugMode(ctx) {
		fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")
	}

	// Extract the set of columns the aggregation functions actually read
	columnsNeeded := make(map[string]bool)
	for _, spec := range aggregations {
		if spec.Column != "*" {
			columnsNeeded[spec.Column] = true
		}
	}

	// Convert to slice
	var scanColumns []string
	if len(columnsNeeded) > 0 {
		scanColumns = make([]string, 0, len(columnsNeeded))
		for col := range columnsNeeded {
			scanColumns = append(scanColumns, col)
		}
	}
	// If no specific columns needed (COUNT(*) only), don't specify columns (scan all)

	// Build scan options for full table scan (aggregations need all data during scanning)
	hybridScanOptions := HybridScanOptions{
		StartTimeNs: startTimeNs,
		StopTimeNs:  stopTimeNs,
		Limit:       -1, // Use -1 to mean "no limit" - need all data for aggregation
		Offset:      0,  // No offset during scanning - OFFSET applies to final results
		Predicate:   predicate,
		Columns:     scanColumns, // Include columns needed for aggregation functions
	}

	// DEBUG: Log scan options for aggregation
	debugHybridScanOptions(ctx, hybridScanOptions, "AGGREGATION")

	// Execute the hybrid scan to get all matching records
	var results []HybridScanResult
	if plan != nil {
		// EXPLAIN mode - capture broker buffer stats
		var stats *HybridScanStats
		results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}

		// Populate plan with broker buffer information
		if stats != nil {
			plan.BrokerBufferQueried = stats.BrokerBufferQueried
			plan.BrokerBufferMessages = stats.BrokerBufferMessages
			plan.BufferStartIndex = stats.BufferStartIndex

			// Add broker_buffer to data sources if buffer was queried
			if stats.BrokerBufferQueried {
				// Check if broker_buffer is already in data sources
				hasBrokerBuffer := false
				for _, source := range plan.DataSources {
					if source == "broker_buffer" {
						hasBrokerBuffer = true
						break
					}
				}
				if !hasBrokerBuffer {
					plan.DataSources = append(plan.DataSources, "broker_buffer")
				}
			}
		}
	} else {
		// Normal mode - just get results
		results, err = hybridScanner.Scan(ctx, hybridScanOptions)
		if err != nil {
			return &QueryResult{Error: err}, err
		}
	}

	// DEBUG: Log scan results
	if isDebugMode(ctx) {
		fmt.Printf("AGGREGATION SCAN RESULTS: %d rows returned\n", len(results))
	}

	// Compute aggregations over the scanned rows
	aggResults := e.computeAggregations(results, aggregations)

	// Build result set: one output column per aggregation, one row of values
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))

	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	// Apply OFFSET and LIMIT to aggregation results
	// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
	// (An ungrouped aggregation always produces exactly one row, so this mostly
	// matters for LIMIT 0 / OFFSET >= 1.)
	rows := [][]sqltypes.Value{row}
	if offset > 0 || limit >= 0 {
		// Handle LIMIT 0 first
		if limit == 0 {
			rows = [][]sqltypes.Value{}
		} else {
			// Apply OFFSET first
			if offset > 0 {
				if offset >= len(rows) {
					rows = [][]sqltypes.Value{}
				} else {
					rows = rows[offset:]
				}
			}

			// Apply LIMIT after OFFSET (only if limit > 0)
			if limit > 0 && len(rows) > limit {
				rows = rows[:limit]
			}
		}
	}

	result := &QueryResult{
		Columns: columns,
		Rows:    rows,
	}

	// Build execution tree for aggregation queries if plan is provided
	if plan != nil {
		plan.RootNode = e.buildExecutionTree(plan, stmt)
	}

	return result, nil
}
+
// tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
// - Use parquet metadata for parquet files
// - Count live log files for live data
// - Combine both for accurate results per partition
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used.
// Thin non-EXPLAIN wrapper: delegates with a nil execution plan.
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil)
}
+
// tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation
// but also populates the execution plan if provided (EXPLAIN mode).
//
// Returns (nil, false) whenever the fast path cannot be used — unsupported
// aggregation functions, data-source collection errors, or a failed
// COUNT(*) consistency validation — in which case the caller falls back to
// a full scan.
func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) {
	// Use the new modular components
	optimizer := NewFastPathOptimizer(e)
	computer := NewAggregationComputer(e)

	// Step 1: Determine strategy
	strategy := optimizer.DetermineStrategy(aggregations)
	if !strategy.CanUseFastPath {
		return nil, false
	}

	// Step 2: Collect data sources
	dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
	if err != nil {
		return nil, false
	}

	// Build partition list for aggregation computer
	// Note: discoverTopicPartitions always returns absolute paths
	// NOTE(review): this re-runs partition discovery already performed inside
	// CollectDataSources — presumably acceptable overhead; confirm before
	// deduplicating.
	partitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
	if err != nil {
		return nil, false
	}

	// Debug: Show the hybrid optimization results (only in explain mode)
	if isDebugMode(ctx) && (dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0) {
		partitionsWithLiveLogs := 0
		if dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0 {
			partitionsWithLiveLogs = 1 // Simplified for now
		}
		fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows + %d broker buffer rows from %d partitions\n",
			dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount, partitionsWithLiveLogs)
	}

	// Step 3: Compute aggregations using fast path
	aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
	if err != nil {
		return nil, false
	}

	// Step 3.5: Validate fast path results (safety check)
	// For simple COUNT(*) queries, ensure the metadata-derived COUNT agrees
	// with the sum of the individual data-source counts; any mismatch aborts
	// the fast path rather than returning a possibly-wrong result.
	if len(aggregations) == 1 && aggregations[0].Function == FuncCOUNT && aggregations[0].Column == "*" {
		totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
		countResult := aggResults[0].Count

		if isDebugMode(ctx) {
			fmt.Printf("Validating fast path: COUNT=%d, Sources=%d\n", countResult, totalRows)
		}

		if totalRows == 0 && countResult > 0 {
			// Fast path found data but data sources show 0 - this suggests a bug
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: COUNT=%d but sources=0\n", countResult)
			}
			return nil, false
		}
		if totalRows > 0 && countResult == 0 {
			// Data sources show data but COUNT is 0 - this also suggests a bug
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: sources=%d but COUNT=0\n", totalRows)
			}
			return nil, false
		}
		if countResult != totalRows {
			// Counts don't match - this suggests inconsistent logic
			if isDebugMode(ctx) {
				fmt.Printf("Fast path validation failed: COUNT=%d != sources=%d\n", countResult, totalRows)
			}
			return nil, false
		}
		if isDebugMode(ctx) {
			fmt.Printf("Fast path validation passed: COUNT=%d\n", countResult)
		}
	}

	// Step 4: Populate execution plan if provided (for EXPLAIN queries)
	if plan != nil {
		strategy := optimizer.DetermineStrategy(aggregations)
		builder := &ExecutionPlanBuilder{}

		// Create a minimal SELECT statement for the plan builder (avoid nil pointer)
		stmt := &SelectStatement{}

		// Build aggregation plan with fast path strategy
		aggPlan := builder.BuildAggregationPlan(stmt, aggregations, strategy, dataSources)

		// Copy relevant fields to the main plan
		plan.ExecutionStrategy = aggPlan.ExecutionStrategy
		plan.DataSources = aggPlan.DataSources
		plan.OptimizationsUsed = aggPlan.OptimizationsUsed
		plan.PartitionsScanned = aggPlan.PartitionsScanned
		plan.ParquetFilesScanned = aggPlan.ParquetFilesScanned
		plan.LiveLogFilesScanned = aggPlan.LiveLogFilesScanned
		plan.TotalRowsProcessed = aggPlan.TotalRowsProcessed
		plan.Aggregations = aggPlan.Aggregations

		// Indicate broker buffer participation for EXPLAIN tree rendering
		if dataSources.BrokerUnflushedCount > 0 {
			plan.BrokerBufferQueried = true
			plan.BrokerBufferMessages = int(dataSources.BrokerUnflushedCount)
		}

		// Merge details while preserving existing ones
		if plan.Details == nil {
			plan.Details = make(map[string]interface{})
		}
		for key, value := range aggPlan.Details {
			plan.Details[key] = value
		}

		// Add file path information from the data collection
		plan.Details["partition_paths"] = partitions

		// Collect actual file information for each partition
		var parquetFiles []string
		var liveLogFiles []string
		parquetSources := make(map[string]bool)

		for _, partitionPath := range partitions {
			// Get parquet files for this partition
			if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
				for _, stats := range parquetStats {
					parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
				}
			}

			// Merge accurate parquet sources from metadata (preferred over filename fallback)
			if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
				for src := range sources {
					parquetSources[src] = true
				}
			}

			// Get live log files for this partition
			if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
				for _, fileName := range liveFiles {
					// Exclude live log files that have been converted to parquet (deduplicated)
					if parquetSources[fileName] {
						continue
					}
					liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
				}
			}
		}

		if len(parquetFiles) > 0 {
			plan.Details["parquet_files"] = parquetFiles
		}
		if len(liveLogFiles) > 0 {
			plan.Details["live_log_files"] = liveLogFiles
		}

		// Update the dataSources.LiveLogFilesCount to match the actual files found
		dataSources.LiveLogFilesCount = len(liveLogFiles)

		// Also update the plan's LiveLogFilesScanned to match
		plan.LiveLogFilesScanned = len(liveLogFiles)

		// Ensure PartitionsScanned is set so Statistics section appears
		if plan.PartitionsScanned == 0 && len(partitions) > 0 {
			plan.PartitionsScanned = len(partitions)
		}

		if isDebugMode(ctx) {
			fmt.Printf("Populated execution plan with fast path strategy\n")
		}
	}

	// Step 5: Build final query result
	columns := make([]string, len(aggregations))
	row := make([]sqltypes.Value, len(aggregations))

	for i, spec := range aggregations {
		columns[i] = spec.Alias
		row[i] = e.formatAggregationResult(spec, aggResults[i])
	}

	result := &QueryResult{
		Columns: columns,
		Rows:    [][]sqltypes.Value{row},
	}

	return result, true
}
+
+// computeAggregations computes aggregation results from a full table scan
+func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult {
+ aggResults := make([]AggregationResult, len(aggregations))
+
+ for i, spec := range aggregations {
+ switch spec.Function {
+ case FuncCOUNT:
+ if spec.Column == "*" {
+ aggResults[i].Count = int64(len(results))
+ } else {
+ count := int64(0)
+ for _, result := range results {
+ if value := e.findColumnValue(result, spec.Column); value != nil && !e.isNullValue(value) {
+ count++
+ }
+ }
+ aggResults[i].Count = count
+ }
+
+ case FuncSUM:
+ sum := float64(0)
+ for _, result := range results {
+ if value := e.findColumnValue(result, spec.Column); value != nil {
+ if numValue := e.convertToNumber(value); numValue != nil {
+ sum += *numValue
+ }
+ }
+ }
+ aggResults[i].Sum = sum
+
+ case FuncAVG:
+ sum := float64(0)
+ count := int64(0)
+ for _, result := range results {
+ if value := e.findColumnValue(result, spec.Column); value != nil {
+ if numValue := e.convertToNumber(value); numValue != nil {
+ sum += *numValue
+ count++
+ }
+ }
+ }
+ if count > 0 {
+ aggResults[i].Sum = sum / float64(count) // Store average in Sum field
+ aggResults[i].Count = count
+ }
+
+ case FuncMIN:
+ var min interface{}
+ var minValue *schema_pb.Value
+ for _, result := range results {
+ if value := e.findColumnValue(result, spec.Column); value != nil {
+ if minValue == nil || e.compareValues(value, minValue) < 0 {
+ minValue = value
+ min = e.extractRawValue(value)
+ }
+ }
+ }
+ aggResults[i].Min = min
+
+ case FuncMAX:
+ var max interface{}
+ var maxValue *schema_pb.Value
+ for _, result := range results {
+ if value := e.findColumnValue(result, spec.Column); value != nil {
+ if maxValue == nil || e.compareValues(value, maxValue) > 0 {
+ maxValue = value
+ max = e.extractRawValue(value)
+ }
+ }
+ }
+ aggResults[i].Max = max
+ }
+ }
+
+ return aggResults
+}
+
+// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats
+func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
+ switch spec.Function {
+ case FuncCOUNT:
+ return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
+ case FuncMIN, FuncMAX:
+ return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
+ case FuncSUM, FuncAVG:
+ // These require scanning actual values, not just min/max
+ return false
+ default:
+ return false
+ }
+}
+
+// debugHybridScanOptions logs the exact scan options being used
+func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, queryType string) {
+ if isDebugMode(ctx) {
+ fmt.Printf("\n=== HYBRID SCAN OPTIONS DEBUG (%s) ===\n", queryType)
+ fmt.Printf("StartTimeNs: %d\n", options.StartTimeNs)
+ fmt.Printf("StopTimeNs: %d\n", options.StopTimeNs)
+ fmt.Printf("Limit: %d\n", options.Limit)
+ fmt.Printf("Offset: %d\n", options.Offset)
+ fmt.Printf("Predicate: %v\n", options.Predicate != nil)
+ fmt.Printf("Columns: %v\n", options.Columns)
+ fmt.Printf("==========================================\n")
+ }
+}
+
+// collectLiveLogFileNames collects the names of live log files in a partition
+func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
+ var fileNames []string
+
+ err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+ // Skip directories and parquet files
+ if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") {
+ return nil
+ }
+
+ // Only include files with actual content
+ if len(entry.Chunks) > 0 {
+ fileNames = append(fileNames, entry.Name)
+ }
+
+ return nil
+ })
+
+ return fileNames, err
+}