Diffstat (limited to 'weed/query/engine/aggregations.go')
| -rw-r--r-- | weed/query/engine/aggregations.go | 166 |
1 file changed, 82 insertions, 84 deletions
diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go
index 623e489dd..6b58517e1 100644
--- a/weed/query/engine/aggregations.go
+++ b/weed/query/engine/aggregations.go
@@ -8,10 +8,8 @@ import (
 	"strings"
 
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
-	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 )
 
 // AggregationSpec defines an aggregation function to be computed
@@ -78,6 +76,12 @@ func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec)
 
 // CollectDataSources gathers information about available data sources for a topic
 func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
+	return opt.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, 0, 0)
+}
+
+// CollectDataSourcesWithTimeFilter gathers information about available data sources for a topic
+// with optional time filtering to skip irrelevant parquet files
+func (opt *FastPathOptimizer) CollectDataSourcesWithTimeFilter(ctx context.Context, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) (*TopicDataSources, error) {
 	dataSources := &TopicDataSources{
 		ParquetFiles:    make(map[string][]*ParquetFileStats),
 		ParquetRowCount: 0,
@@ -125,14 +129,16 @@ func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScan
 				fmt.Printf("  No parquet files found in partition\n")
 			}
 		} else {
-			dataSources.ParquetFiles[partitionPath] = parquetStats
+			// Prune by time range using parquet column statistics
+			filtered := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+			dataSources.ParquetFiles[partitionPath] = filtered
 			partitionParquetRows := int64(0)
-			for _, stat := range parquetStats {
+			for _, stat := range filtered {
 				partitionParquetRows += stat.RowCount
 				dataSources.ParquetRowCount += stat.RowCount
 			}
 			if isDebugMode(ctx) {
-				fmt.Printf("  Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows)
+				fmt.Printf("  Found %d parquet files with %d total rows\n", len(filtered), partitionParquetRows)
 			}
 		}
 
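The pruneParquetFilesByTime helper called above is not defined in this diff. A minimal sketch of the idea it implements, assuming each file's statistics carry min/max timestamps (the type and field names below are illustrative, not the actual ParquetFileStats layout):

package main

import "fmt"

// fileStats stands in for per-file parquet statistics: a row count
// plus min/max timestamps taken from parquet column metadata.
type fileStats struct {
	name             string
	minTsNs, maxTsNs int64
}

// pruneByTime keeps only files whose [minTsNs, maxTsNs] range overlaps
// the query window; a zero bound means that side is unbounded.
func pruneByTime(files []*fileStats, startNs, stopNs int64) []*fileStats {
	if startNs == 0 && stopNs == 0 {
		return files // no time filter, keep everything
	}
	var kept []*fileStats
	for _, f := range files {
		if stopNs > 0 && f.minTsNs > stopNs {
			continue // file starts after the window ends
		}
		if startNs > 0 && f.maxTsNs < startNs {
			continue // file ends before the window starts
		}
		kept = append(kept, f)
	}
	return kept
}

func main() {
	files := []*fileStats{
		{"2025-01-01.parquet", 100, 200},
		{"2025-01-02.parquet", 300, 400},
	}
	// A window starting at 250 overlaps only the second file.
	fmt.Println(len(pruneByTime(files, 250, 0))) // 1
}

Pruned files never reach the row-count accumulation, which is why the hunk above totals rows from the filtered slice rather than the raw parquetStats.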
@@ -452,20 +458,27 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
 		}
 	}
 
-	// Extract time filters for optimization
+	// Extract time filters and validate that WHERE clause contains only time-based predicates
 	startTimeNs, stopTimeNs := int64(0), int64(0)
+	onlyTimePredicates := true
 	if stmt.Where != nil {
-		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
+		startTimeNs, stopTimeNs, onlyTimePredicates = e.extractTimeFiltersWithValidation(stmt.Where.Expr)
 	}
 
-	// FAST PATH RE-ENABLED WITH DEBUG LOGGING:
-	// Added comprehensive debug logging to identify data counting issues
-	// This will help us understand why fast path was returning 0 when slow path returns 1803
-	if stmt.Where == nil {
+	// FAST PATH WITH TIME-BASED OPTIMIZATION:
+	// Allow fast path only for queries without WHERE clause or with time-only WHERE clauses
+	// This prevents incorrect results when non-time predicates are present
+	canAttemptFastPath := stmt.Where == nil || onlyTimePredicates
+
+	if canAttemptFastPath {
 		if isDebugMode(ctx) {
-			fmt.Printf("\nFast path optimization attempt...\n")
+			if stmt.Where == nil {
+				fmt.Printf("\nFast path optimization attempt (no WHERE clause)...\n")
+			} else {
+				fmt.Printf("\nFast path optimization attempt (time-only WHERE clause)...\n")
+			}
 		}
-		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan)
+		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan, startTimeNs, stopTimeNs, stmt)
 		if canOptimize {
 			if isDebugMode(ctx) {
 				fmt.Printf("Fast path optimization succeeded!\n")
@@ -478,7 +491,7 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
 		}
 	} else {
 		if isDebugMode(ctx) {
-			fmt.Printf("Fast path not applicable due to WHERE clause\n")
+			fmt.Printf("Fast path not applicable due to complex WHERE clause\n")
 		}
 	}
 
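extractTimeFiltersWithValidation is likewise not shown here; the key addition is its third return value, which must report whether the WHERE tree touches anything other than the timestamp column. A sketch of that check over a toy expression type (the engine's real AST differs, and the "_ts_ns" column name is illustrative):

package main

import "fmt"

// expr is a toy stand-in for a parsed WHERE clause: either a boolean
// combinator ("AND"/"OR") or a comparison leaf naming a column.
type expr struct {
	op          string
	left, right *expr
	column      string // set on comparison leaves
}

// onlyTimePredicates reports whether every comparison targets the
// timestamp column, so a time-range scan alone is a complete filter.
func onlyTimePredicates(e *expr, tsColumn string) bool {
	if e == nil {
		return true
	}
	switch e.op {
	case "AND", "OR":
		return onlyTimePredicates(e.left, tsColumn) && onlyTimePredicates(e.right, tsColumn)
	default:
		return e.column == tsColumn
	}
}

func main() {
	// WHERE _ts_ns >= x AND user_id = y: the second leaf disqualifies
	// the fast path, the "complex WHERE clause" case in the hunk above.
	w := &expr{op: "AND",
		left:  &expr{op: ">=", column: "_ts_ns"},
		right: &expr{op: "=", column: "user_id"},
	}
	fmt.Println(onlyTimePredicates(w, "_ts_ns")) // false
}

Gating on this flag is what makes re-enabling the fast path safe: metadata-only aggregation can honor a pure time range via file pruning, but it cannot evaluate arbitrary column predicates, which is why any WHERE clause previously disabled it entirely.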
@@ -605,23 +618,66 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
 
 	// Build execution tree for aggregation queries if plan is provided
 	if plan != nil {
+		// Populate detailed plan information for full scan (similar to fast path)
+		e.populateFullScanPlanDetails(ctx, plan, hybridScanner, stmt)
 		plan.RootNode = e.buildExecutionTree(plan, stmt)
 	}
 
 	return result, nil
 }
 
+// populateFullScanPlanDetails populates detailed plan information for full scan queries
+// This provides consistency with fast path execution plan details
+func (e *SQLEngine) populateFullScanPlanDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, stmt *SelectStatement) {
+	// plan.Details is initialized at the start of the SELECT execution
+
+	// Extract table information
+	var database, tableName string
+	if len(stmt.From) == 1 {
+		if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
+			if tableExpr, ok := table.Expr.(TableName); ok {
+				tableName = tableExpr.Name.String()
+				if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
+					database = tableExpr.Qualifier.String()
+				}
+			}
+		}
+	}
+
+	// Use current database if not specified
+	if database == "" {
+		database = e.catalog.currentDatabase
+		if database == "" {
+			database = "default"
+		}
+	}
+
+	// Discover partitions and populate file details
+	if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
+		// Add partition paths to execution plan details
+		plan.Details["partition_paths"] = partitions
+
+		// Populate detailed file information using shared helper
+		e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
+	} else {
+		// Record discovery error to plan for better diagnostics
+		plan.Details["error_partition_discovery"] = discoverErr.Error()
+	}
+}
+
 // tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
 // - Use parquet metadata for parquet files
 // - Count live log files for live data
 // - Combine both for accurate results per partition
 // Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
 func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
-	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil)
+	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil, 0, 0, nil)
 }
 
 // tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates execution plan if provided
-func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) {
+// startTimeNs, stopTimeNs: optional time range filters for parquet file optimization (0 means no filtering)
+// stmt: SELECT statement for column statistics pruning optimization (can be nil)
+func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan, startTimeNs, stopTimeNs int64, stmt *SelectStatement) (*QueryResult, bool) {
 	// Use the new modular components
 	optimizer := NewFastPathOptimizer(e)
 	computer := NewAggregationComputer(e)
@@ -632,8 +688,8 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
 		return nil, false
 	}
 
-	// Step 2: Collect data sources
-	dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
+	// Step 2: Collect data sources with time filtering for parquet file optimization
+	dataSources, err := optimizer.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, startTimeNs, stopTimeNs)
 	if err != nil {
 		return nil, false
 	}
@@ -725,9 +781,6 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
 			}
 
 			// Merge details while preserving existing ones
-			if plan.Details == nil {
-				plan.Details = make(map[string]interface{})
-			}
 			for key, value := range aggPlan.Details {
 				plan.Details[key] = value
 			}
@@ -735,51 +788,17 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
 			// Add file path information from the data collection
 			plan.Details["partition_paths"] = partitions
 
-			// Collect actual file information for each partition
-			var parquetFiles []string
-			var liveLogFiles []string
-			parquetSources := make(map[string]bool)
-
-			for _, partitionPath := range partitions {
-				// Get parquet files for this partition
-				if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
-					for _, stats := range parquetStats {
-						parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
-					}
-				}
-
-				// Merge accurate parquet sources from metadata (preferred over filename fallback)
-				if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
-					for src := range sources {
-						parquetSources[src] = true
-					}
-				}
+			// Populate detailed file information using shared helper, including time filters for pruning
+			plan.Details[PlanDetailStartTimeNs] = startTimeNs
+			plan.Details[PlanDetailStopTimeNs] = stopTimeNs
+			e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
 
-				// Get live log files for this partition
-				if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
-					for _, fileName := range liveFiles {
-						// Exclude live log files that have been converted to parquet (deduplicated)
-						if parquetSources[fileName] {
-							continue
-						}
-						liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
-					}
-				}
-			}
-
-			if len(parquetFiles) > 0 {
-				plan.Details["parquet_files"] = parquetFiles
-			}
-			if len(liveLogFiles) > 0 {
-				plan.Details["live_log_files"] = liveLogFiles
+			// Update counts to match discovered live log files
+			if liveLogFiles, ok := plan.Details["live_log_files"].([]string); ok {
+				dataSources.LiveLogFilesCount = len(liveLogFiles)
+				plan.LiveLogFilesScanned = len(liveLogFiles)
 			}
 
-			// Update the dataSources.LiveLogFilesCount to match the actual files found
-			dataSources.LiveLogFilesCount = len(liveLogFiles)
-
-			// Also update the plan's LiveLogFilesScanned to match
-			plan.LiveLogFilesScanned = len(liveLogFiles)
-
 			// Ensure PartitionsScanned is set so Statistics section appears
 			if plan.PartitionsScanned == 0 && len(partitions) > 0 {
 				plan.PartitionsScanned = len(partitions)
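Both paths now record file details through the shared populatePlanFileDetails helper, and the count reconciliation reads the helper's output back out of the untyped Details map. The comma-ok type assertion keeps that read safe: if no live log files were recorded (or the value has another type), the counts are simply left untouched. A tiny self-contained illustration of the pattern:

package main

import "fmt"

func main() {
	// details mirrors plan.Details; the file names are made up.
	details := map[string]interface{}{
		"live_log_files": []string{"p0000/2025-01-01-10", "p0000/2025-01-01-11"},
	}
	// Comma-ok assertion: ok is false when the key is absent or the
	// stored value is not a []string, so no panic and no update.
	if files, ok := details["live_log_files"].([]string); ok {
		fmt.Println("live log files scanned:", len(files)) // 2
	}
}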
@@ -912,24 +931,3 @@ func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, quer
 		fmt.Printf("==========================================\n")
 	}
 }
-
-// collectLiveLogFileNames collects the names of live log files in a partition
-func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
-	var fileNames []string
-
-	err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
-		// Skip directories and parquet files
-		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") {
-			return nil
-		}
-
-		// Only include files with actual content
-		if len(entry.Chunks) > 0 {
-			fileNames = append(fileNames, entry.Name)
-		}
-
-		return nil
-	})
-
-	return fileNames, err
-}
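Deleting this helper is also what allows the filer_pb and util imports to be dropped in the first hunk: once the function is gone, nothing else in this file appears to use either package, and Go rejects unused imports.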
