| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-09-10 11:04:42 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-09-10 11:04:42 -0700 |
| commit | 58e0c1b3301f5c5c0723f71726b6f1053e9152f9 (patch) | |
| tree | 2ee40273cb74587227052f908a0eb38c14c2cb56 /weed/query/engine/aggregations.go | |
| parent | 8ed1b104cea39856a51b67d31c6782031c89f642 (diff) | |
| download | seaweedfs-58e0c1b3301f5c5c0723f71726b6f1053e9152f9.tar.xz seaweedfs-58e0c1b3301f5c5c0723f71726b6f1053e9152f9.zip | |
Fix sql bugs (#7219)
* fix nil when explaining
* add plan details when running full scan
* skip files by timestamp
* skip file by timestamp
* refactor
* handle filter by time
* skip broker memory only if it has unflushed messages
* refactoring
* refactor
* address comments
* address comments
* filter by parquet stats
* simplify
* refactor
* prune old code
* optimize
* Update aggregations.go
* ensure non-time predicates are properly detected
* add stmt to populatePlanFileDetails
This helper function is a great way to centralize logic for populating file details. However, it's missing an optimization that is present in executeSelectStatementWithBrokerStats: pruning Parquet files based on column statistics from the WHERE clause.
Aggregation queries that fall back to the slow path could benefit from this optimization. Consider modifying the function signature to accept the *SelectStatement and adding the column statistics pruning logic here, similar to how it's done in executeSelectStatementWithBrokerStats. (A rough sketch of this pruning idea follows the list below.)
* refactoring to work with *schema_pb.Value directly after the initial conversion
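The column-statistics pruning requested in the review comment above can be pictured with a minimal sketch. This is not the engine's actual code: `ColumnStats`, `ParquetFileStats`, and `EqualsPredicate` here are simplified stand-ins (per the last bullet, the real statistics carry `*schema_pb.Value` bounds rather than `int64`), and a real implementation would also handle range predicates and non-numeric column types.

```go
package main

import "fmt"

// Simplified stand-ins for illustration only; the engine's real
// ParquetFileStats holds per-column min/max as *schema_pb.Value.
type ColumnStats struct{ Min, Max int64 }

type ParquetFileStats struct {
	FileName    string
	ColumnStats map[string]ColumnStats
}

// EqualsPredicate models a `col = value` term pulled from a WHERE clause.
type EqualsPredicate struct {
	Column string
	Value  int64
}

// pruneByColumnStats drops files whose recorded min/max range provably
// cannot contain a row matching every predicate. Files without stats for
// a predicate's column are conservatively kept.
func pruneByColumnStats(files []*ParquetFileStats, preds []EqualsPredicate) []*ParquetFileStats {
	var kept []*ParquetFileStats
	for _, f := range files {
		skip := false
		for _, p := range preds {
			if st, ok := f.ColumnStats[p.Column]; ok && (p.Value < st.Min || p.Value > st.Max) {
				skip = true // value lies outside this file's range: safe to skip the file
				break
			}
		}
		if !skip {
			kept = append(kept, f)
		}
	}
	return kept
}

func main() {
	files := []*ParquetFileStats{
		{FileName: "a.parquet", ColumnStats: map[string]ColumnStats{"user_id": {Min: 1, Max: 100}}},
		{FileName: "b.parquet", ColumnStats: map[string]ColumnStats{"user_id": {Min: 200, Max: 300}}},
	}
	kept := pruneByColumnStats(files, []EqualsPredicate{{Column: "user_id", Value: 42}})
	fmt.Println(len(kept), kept[0].FileName) // 1 a.parquet
}
```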
Diffstat (limited to 'weed/query/engine/aggregations.go')
| -rw-r--r-- | weed/query/engine/aggregations.go | 166 |
1 file changed, 82 insertions, 84 deletions
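The central behavioral change in the diff below is the fast-path gate: an aggregation query may take the fast path only when there is no WHERE clause or when every predicate is time-based, as detected by the new `extractTimeFiltersWithValidation`. A rough illustration of such a gate, using a hypothetical simplified AST (the engine's real parser types differ, and `_ts_ns` stands in for whatever column the engine treats as the timestamp):

```go
package main

import "fmt"

// Hypothetical, simplified WHERE-clause AST for illustration.
type Expr interface{}

type Comparison struct {
	Column  string
	Op      string
	ValueNs int64
}

type And struct{ Left, Right Expr }

// walk tightens the [startNs, stopNs] window from timestamp comparisons
// and flips onlyTime to false as soon as any non-time predicate appears,
// since parquet metadata alone cannot evaluate such predicates.
func walk(e Expr, startNs, stopNs *int64, onlyTime *bool) {
	switch v := e.(type) {
	case And:
		walk(v.Left, startNs, stopNs, onlyTime)
		walk(v.Right, startNs, stopNs, onlyTime)
	case Comparison:
		if v.Column != "_ts_ns" {
			*onlyTime = false
			return
		}
		switch v.Op {
		case ">", ">=":
			if v.ValueNs > *startNs {
				*startNs = v.ValueNs
			}
		case "<", "<=":
			if *stopNs == 0 || v.ValueNs < *stopNs {
				*stopNs = v.ValueNs
			}
		default:
			*onlyTime = false // treat anything else conservatively
		}
	}
}

func main() {
	where := And{
		Left:  Comparison{Column: "_ts_ns", Op: ">=", ValueNs: 100},
		Right: Comparison{Column: "status", Op: "=", ValueNs: 1},
	}
	start, stop, onlyTime := int64(0), int64(0), true
	walk(where, &start, &stop, &onlyTime)
	canAttemptFastPath := onlyTime // mirrors `canAttemptFastPath` in the diff
	fmt.Println(start, stop, canAttemptFastPath) // 100 0 false -> slow path
}
```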
```diff
diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go
index 623e489dd..6b58517e1 100644
--- a/weed/query/engine/aggregations.go
+++ b/weed/query/engine/aggregations.go
@@ -8,10 +8,8 @@ import (
 	"strings"

 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
-	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
 	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
-	"github.com/seaweedfs/seaweedfs/weed/util"
 )

 // AggregationSpec defines an aggregation function to be computed
@@ -78,6 +76,12 @@ func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec)
 // CollectDataSources gathers information about available data sources for a topic
 func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
+	return opt.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, 0, 0)
+}
+
+// CollectDataSourcesWithTimeFilter gathers information about available data sources for a topic
+// with optional time filtering to skip irrelevant parquet files
+func (opt *FastPathOptimizer) CollectDataSourcesWithTimeFilter(ctx context.Context, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) (*TopicDataSources, error) {
 	dataSources := &TopicDataSources{
 		ParquetFiles:    make(map[string][]*ParquetFileStats),
 		ParquetRowCount: 0,
@@ -125,14 +129,16 @@ func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScan
 				fmt.Printf(" No parquet files found in partition\n")
 			}
 		} else {
-			dataSources.ParquetFiles[partitionPath] = parquetStats
+			// Prune by time range using parquet column statistics
+			filtered := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+			dataSources.ParquetFiles[partitionPath] = filtered
 			partitionParquetRows := int64(0)
-			for _, stat := range parquetStats {
+			for _, stat := range filtered {
 				partitionParquetRows += stat.RowCount
 				dataSources.ParquetRowCount += stat.RowCount
 			}
 			if isDebugMode(ctx) {
-				fmt.Printf(" Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows)
+				fmt.Printf(" Found %d parquet files with %d total rows\n", len(filtered), partitionParquetRows)
 			}
 		}
@@ -452,20 +458,27 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
 		}
 	}

-	// Extract time filters for optimization
+	// Extract time filters and validate that WHERE clause contains only time-based predicates
 	startTimeNs, stopTimeNs := int64(0), int64(0)
+	onlyTimePredicates := true
 	if stmt.Where != nil {
-		startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
+		startTimeNs, stopTimeNs, onlyTimePredicates = e.extractTimeFiltersWithValidation(stmt.Where.Expr)
 	}

-	// FAST PATH RE-ENABLED WITH DEBUG LOGGING:
-	// Added comprehensive debug logging to identify data counting issues
-	// This will help us understand why fast path was returning 0 when slow path returns 1803
-	if stmt.Where == nil {
+	// FAST PATH WITH TIME-BASED OPTIMIZATION:
+	// Allow fast path only for queries without WHERE clause or with time-only WHERE clauses
+	// This prevents incorrect results when non-time predicates are present
+	canAttemptFastPath := stmt.Where == nil || onlyTimePredicates
+
+	if canAttemptFastPath {
 		if isDebugMode(ctx) {
-			fmt.Printf("\nFast path optimization attempt...\n")
+			if stmt.Where == nil {
+				fmt.Printf("\nFast path optimization attempt (no WHERE clause)...\n")
+			} else {
+				fmt.Printf("\nFast path optimization attempt (time-only WHERE clause)...\n")
+			}
 		}
-		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan)
+		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan, startTimeNs, stopTimeNs, stmt)
 		if canOptimize {
 			if isDebugMode(ctx) {
 				fmt.Printf("Fast path optimization succeeded!\n")
@@ -478,7 +491,7 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
 		}
 	} else {
 		if isDebugMode(ctx) {
-			fmt.Printf("Fast path not applicable due to WHERE clause\n")
+			fmt.Printf("Fast path not applicable due to complex WHERE clause\n")
 		}
 	}
@@ -605,23 +618,66 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
 	// Build execution tree for aggregation queries if plan is provided
 	if plan != nil {
+		// Populate detailed plan information for full scan (similar to fast path)
+		e.populateFullScanPlanDetails(ctx, plan, hybridScanner, stmt)
 		plan.RootNode = e.buildExecutionTree(plan, stmt)
 	}

 	return result, nil
 }

+// populateFullScanPlanDetails populates detailed plan information for full scan queries
+// This provides consistency with fast path execution plan details
+func (e *SQLEngine) populateFullScanPlanDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, stmt *SelectStatement) {
+	// plan.Details is initialized at the start of the SELECT execution
+
+	// Extract table information
+	var database, tableName string
+	if len(stmt.From) == 1 {
+		if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
+			if tableExpr, ok := table.Expr.(TableName); ok {
+				tableName = tableExpr.Name.String()
+				if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
+					database = tableExpr.Qualifier.String()
+				}
+			}
+		}
+	}
+
+	// Use current database if not specified
+	if database == "" {
+		database = e.catalog.currentDatabase
+		if database == "" {
+			database = "default"
+		}
+	}
+
+	// Discover partitions and populate file details
+	if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
+		// Add partition paths to execution plan details
+		plan.Details["partition_paths"] = partitions
+
+		// Populate detailed file information using shared helper
+		e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
+	} else {
+		// Record discovery error to plan for better diagnostics
+		plan.Details["error_partition_discovery"] = discoverErr.Error()
+	}
+}
+
 // tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
 // - Use parquet metadata for parquet files
 // - Count live log files for live data
 // - Combine both for accurate results per partition
 // Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
 func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
-	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil)
+	return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil, 0, 0, nil)
 }

 // tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates execution plan if provided
-func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) {
+// startTimeNs, stopTimeNs: optional time range filters for parquet file optimization (0 means no filtering)
+// stmt: SELECT statement for column statistics pruning optimization (can be nil)
+func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan, startTimeNs, stopTimeNs int64, stmt *SelectStatement) (*QueryResult, bool) {
 	// Use the new modular components
 	optimizer := NewFastPathOptimizer(e)
 	computer := NewAggregationComputer(e)
@@ -632,8 +688,8 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
 		return nil, false
 	}

-	// Step 2: Collect data sources
-	dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
+	// Step 2: Collect data sources with time filtering for parquet file optimization
+	dataSources, err := optimizer.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, startTimeNs, stopTimeNs)
 	if err != nil {
 		return nil, false
 	}
@@ -725,9 +781,6 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
 	}

 	// Merge details while preserving existing ones
-	if plan.Details == nil {
-		plan.Details = make(map[string]interface{})
-	}
 	for key, value := range aggPlan.Details {
 		plan.Details[key] = value
 	}
@@ -735,51 +788,17 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
 	// Add file path information from the data collection
 	plan.Details["partition_paths"] = partitions

-	// Collect actual file information for each partition
-	var parquetFiles []string
-	var liveLogFiles []string
-	parquetSources := make(map[string]bool)
-
-	for _, partitionPath := range partitions {
-		// Get parquet files for this partition
-		if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
-			for _, stats := range parquetStats {
-				parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
-			}
-		}
-
-		// Merge accurate parquet sources from metadata (preferred over filename fallback)
-		if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
-			for src := range sources {
-				parquetSources[src] = true
-			}
-		}
+	// Populate detailed file information using shared helper, including time filters for pruning
+	plan.Details[PlanDetailStartTimeNs] = startTimeNs
+	plan.Details[PlanDetailStopTimeNs] = stopTimeNs
+	e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)

-		// Get live log files for this partition
-		if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
-			for _, fileName := range liveFiles {
-				// Exclude live log files that have been converted to parquet (deduplicated)
-				if parquetSources[fileName] {
-					continue
-				}
-				liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
-			}
-		}
-	}
-
-	if len(parquetFiles) > 0 {
-		plan.Details["parquet_files"] = parquetFiles
-	}
-	if len(liveLogFiles) > 0 {
-		plan.Details["live_log_files"] = liveLogFiles
+	// Update counts to match discovered live log files
+	if liveLogFiles, ok := plan.Details["live_log_files"].([]string); ok {
+		dataSources.LiveLogFilesCount = len(liveLogFiles)
+		plan.LiveLogFilesScanned = len(liveLogFiles)
 	}

-	// Update the dataSources.LiveLogFilesCount to match the actual files found
-	dataSources.LiveLogFilesCount = len(liveLogFiles)
-
-	// Also update the plan's LiveLogFilesScanned to match
-	plan.LiveLogFilesScanned = len(liveLogFiles)
-
 	// Ensure PartitionsScanned is set so Statistics section appears
 	if plan.PartitionsScanned == 0 && len(partitions) > 0 {
 		plan.PartitionsScanned = len(partitions)
@@ -912,24 +931,3 @@ func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, quer
 		fmt.Printf("==========================================\n")
 	}
 }
-
-// collectLiveLogFileNames collects the names of live log files in a partition
-func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
-	var fileNames []string
-
-	err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
-		// Skip directories and parquet files
-		if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") {
-			return nil
-		}
-
-		// Only include files with actual content
-		if len(entry.Chunks) > 0 {
-			fileNames = append(fileNames, entry.Name)
-		}
-
-		return nil
-	})
-
-	return fileNames, err
-}
```
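The diff calls `pruneParquetFilesByTime` but its definition lands elsewhere in the PR. Below is a plausible minimal version under stated assumptions: file-level min/max timestamps are modeled as plain fields (the real helper derives them from parquet column statistics and also takes `ctx` and the scanner as arguments), and a zero bound means unbounded, matching the "0 means no filtering" comment in the diff.

```go
package main

import "fmt"

// Simplified stand-in; the real ParquetFileStats derives these bounds
// from parquet column statistics rather than storing them directly.
type ParquetFileStats struct {
	FileName string
	MinTsNs  int64
	MaxTsNs  int64
	RowCount int64
}

// pruneParquetFilesByTime keeps only files whose [MinTsNs, MaxTsNs] range
// overlaps the query window; a zero bound means "unbounded" on that side.
func pruneParquetFilesByTime(files []*ParquetFileStats, startTimeNs, stopTimeNs int64) []*ParquetFileStats {
	var kept []*ParquetFileStats
	for _, f := range files {
		if startTimeNs > 0 && f.MaxTsNs < startTimeNs {
			continue // file ends before the window starts
		}
		if stopTimeNs > 0 && f.MinTsNs > stopTimeNs {
			continue // file starts after the window ends
		}
		kept = append(kept, f)
	}
	return kept
}

func main() {
	files := []*ParquetFileStats{
		{FileName: "old.parquet", MinTsNs: 100, MaxTsNs: 199, RowCount: 1000},
		{FileName: "new.parquet", MinTsNs: 200, MaxTsNs: 299, RowCount: 1200},
	}
	for _, f := range pruneParquetFilesByTime(files, 250, 0) {
		fmt.Println(f.FileName) // only new.parquet survives
	}
}
```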
