Diffstat (limited to 'weed/query/engine/aggregations.go')
-rw-r--r--  weed/query/engine/aggregations.go  166
1 file changed, 82 insertions(+), 84 deletions(-)
diff --git a/weed/query/engine/aggregations.go b/weed/query/engine/aggregations.go
index 623e489dd..6b58517e1 100644
--- a/weed/query/engine/aggregations.go
+++ b/weed/query/engine/aggregations.go
@@ -8,10 +8,8 @@ import (
"strings"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
- "github.com/seaweedfs/seaweedfs/weed/util"
)
// AggregationSpec defines an aggregation function to be computed
@@ -78,6 +76,12 @@ func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec)
// CollectDataSources gathers information about available data sources for a topic
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
+ return opt.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, 0, 0)
+}
+
+// CollectDataSourcesWithTimeFilter gathers information about available data sources for a topic
+// with optional time filtering to skip irrelevant parquet files
+func (opt *FastPathOptimizer) CollectDataSourcesWithTimeFilter(ctx context.Context, hybridScanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) (*TopicDataSources, error) {
dataSources := &TopicDataSources{
ParquetFiles: make(map[string][]*ParquetFileStats),
ParquetRowCount: 0,
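For orientation: the TopicDataSources accumulator filled in here is defined elsewhere in the package. A minimal sketch consistent with the fields this diff actually touches (field set inferred from usage, not from this hunk):

// Sketch only: inferred from usage in this diff; the real struct may have more fields.
type TopicDataSources struct {
	ParquetFiles      map[string][]*ParquetFileStats // per-partition parquet file statistics
	ParquetRowCount   int64                          // total rows across counted parquet files
	LiveLogFilesCount int                            // live log files not yet converted to parquet
}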
@@ -125,14 +129,16 @@ func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScan
fmt.Printf(" No parquet files found in partition\n")
}
} else {
- dataSources.ParquetFiles[partitionPath] = parquetStats
+ // Prune by time range using parquet column statistics
+ filtered := pruneParquetFilesByTime(ctx, parquetStats, hybridScanner, startTimeNs, stopTimeNs)
+ dataSources.ParquetFiles[partitionPath] = filtered
partitionParquetRows := int64(0)
- for _, stat := range parquetStats {
+ for _, stat := range filtered {
partitionParquetRows += stat.RowCount
dataSources.ParquetRowCount += stat.RowCount
}
if isDebugMode(ctx) {
- fmt.Printf(" Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows)
+ fmt.Printf(" Found %d parquet files with %d total rows\n", len(filtered), partitionParquetRows)
}
}
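pruneParquetFilesByTime is called above but defined outside these hunks. A plausible sketch of the pruning it performs, assuming ParquetFileStats exposes per-file timestamp bounds (the MinTimestampNs/MaxTimestampNs field names are assumptions, not taken from this diff):

// Sketch only: keep a file unless its [min, max] timestamp range lies
// entirely outside the requested window; 0 means "no bound on this side".
func pruneParquetFilesByTime(ctx context.Context, stats []*ParquetFileStats,
	scanner *HybridMessageScanner, startTimeNs, stopTimeNs int64) []*ParquetFileStats {
	if startTimeNs == 0 && stopTimeNs == 0 {
		return stats // no time filter, nothing to prune
	}
	kept := make([]*ParquetFileStats, 0, len(stats))
	for _, s := range stats {
		if stopTimeNs > 0 && s.MinTimestampNs > stopTimeNs {
			continue // file starts after the window ends
		}
		if startTimeNs > 0 && s.MaxTimestampNs < startTimeNs {
			continue // file ends before the window starts
		}
		kept = append(kept, s)
	}
	return kept
}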
@@ -452,20 +458,27 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
}
}
- // Extract time filters for optimization
+ // Extract time filters and validate that the WHERE clause contains only time-based predicates
startTimeNs, stopTimeNs := int64(0), int64(0)
+ onlyTimePredicates := true
if stmt.Where != nil {
- startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
+ startTimeNs, stopTimeNs, onlyTimePredicates = e.extractTimeFiltersWithValidation(stmt.Where.Expr)
}
- // FAST PATH RE-ENABLED WITH DEBUG LOGGING:
- // Added comprehensive debug logging to identify data counting issues
- // This will help us understand why fast path was returning 0 when slow path returns 1803
- if stmt.Where == nil {
+ // FAST PATH WITH TIME-BASED OPTIMIZATION:
+ // Allow fast path only for queries without WHERE clause or with time-only WHERE clauses
+ // This prevents incorrect results when non-time predicates are present
+ canAttemptFastPath := stmt.Where == nil || onlyTimePredicates
+
+ if canAttemptFastPath {
if isDebugMode(ctx) {
- fmt.Printf("\nFast path optimization attempt...\n")
+ if stmt.Where == nil {
+ fmt.Printf("\nFast path optimization attempt (no WHERE clause)...\n")
+ } else {
+ fmt.Printf("\nFast path optimization attempt (time-only WHERE clause)...\n")
+ }
}
- fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan)
+ fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan, startTimeNs, stopTimeNs, stmt)
if canOptimize {
if isDebugMode(ctx) {
fmt.Printf("Fast path optimization succeeded!\n")
@@ -478,7 +491,7 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
}
} else {
if isDebugMode(ctx) {
- fmt.Printf("Fast path not applicable due to WHERE clause\n")
+ fmt.Printf("Fast path not applicable due to complex WHERE clause\n")
}
}
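extractTimeFiltersWithValidation is new in this change and its body sits outside the hunk. The key part of its contract is the third return value: it must go false the moment any predicate touches a non-timestamp column, otherwise the fast path would silently drop filters it cannot evaluate. A simplified, self-contained sketch over an assumed pre-flattened predicate list (the real code walks the SQL AST; the _ts_ns column name and all types below are illustrative):

// Sketch only: illustrates the validation contract, not the engine's AST walk.
type predicate struct {
	Column  string // column referenced by the comparison
	Op      string // ">=", ">", "<=", "<"
	ValueNs int64  // comparison value in nanoseconds
}

func extractTimeFiltersWithValidation(preds []predicate) (startNs, stopNs int64, onlyTime bool) {
	onlyTime = true
	for _, p := range preds {
		if p.Column != "_ts_ns" { // assumed system timestamp column
			onlyTime = false // non-time predicate: fast path must not be used
			continue
		}
		switch p.Op {
		case ">=", ">":
			if p.ValueNs > startNs {
				startNs = p.ValueNs
			}
		case "<=", "<":
			if stopNs == 0 || p.ValueNs < stopNs {
				stopNs = p.ValueNs
			}
		}
	}
	return
}

Under this contract, a query like SELECT COUNT(*) FROM t WHERE _ts_ns >= 1700000000000000000 keeps the fast path available, while WHERE user_id = 42 forces the full scan.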
@@ -605,23 +618,66 @@ func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridS
// Build execution tree for aggregation queries if plan is provided
if plan != nil {
+ // Populate detailed plan information for full scan (similar to fast path)
+ e.populateFullScanPlanDetails(ctx, plan, hybridScanner, stmt)
plan.RootNode = e.buildExecutionTree(plan, stmt)
}
return result, nil
}
+// populateFullScanPlanDetails populates detailed plan information for full scan queries
+// This provides consistency with fast path execution plan details
+func (e *SQLEngine) populateFullScanPlanDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, stmt *SelectStatement) {
+ // plan.Details is initialized at the start of the SELECT execution
+
+ // Extract table information
+ var database, tableName string
+ if len(stmt.From) == 1 {
+ if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
+ if tableExpr, ok := table.Expr.(TableName); ok {
+ tableName = tableExpr.Name.String()
+ if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
+ database = tableExpr.Qualifier.String()
+ }
+ }
+ }
+ }
+
+ // Use current database if not specified
+ if database == "" {
+ database = e.catalog.currentDatabase
+ if database == "" {
+ database = "default"
+ }
+ }
+
+ // Discover partitions and populate file details
+ if partitions, discoverErr := e.discoverTopicPartitions(database, tableName); discoverErr == nil {
+ // Add partition paths to execution plan details
+ plan.Details["partition_paths"] = partitions
+
+ // Populate detailed file information using shared helper
+ e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
+ } else {
+ // Record discovery error to plan for better diagnostics
+ plan.Details["error_partition_discovery"] = discoverErr.Error()
+ }
+}
+
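Both this new function and the fast path below lean on populatePlanFileDetails, a shared helper whose body is not part of this diff. A sketch of its likely shape, with the signature inferred from the two call sites and the body condensed from the inline code this change removes further down:

// Sketch only: signature inferred from call sites; body condensed from the
// removed inline logic (parquet file listing plus deduplicated live logs).
func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan,
	scanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) {
	var parquetFiles, liveLogFiles []string
	for _, partitionPath := range partitions {
		if stats, err := scanner.ReadParquetStatistics(partitionPath); err == nil {
			for _, s := range stats {
				parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, s.FileName))
			}
		}
		// live log enumeration and parquet-source deduplication elided;
		// see the removed block near the end of this diff for the rules
	}
	if len(parquetFiles) > 0 {
		plan.Details["parquet_files"] = parquetFiles
	}
	if len(liveLogFiles) > 0 {
		plan.Details["live_log_files"] = liveLogFiles
	}
}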
// tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
// - Use parquet metadata for parquet files
// - Count live log files for live data
// - Combine both for accurate results per partition
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
- return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil)
+ return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil, 0, 0, nil)
}
// tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates execution plan if provided
-func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) {
+// startTimeNs, stopTimeNs: optional time range filters for parquet file optimization (0 means no filtering)
+// stmt: SELECT statement for column statistics pruning optimization (can be nil)
+func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan, startTimeNs, stopTimeNs int64, stmt *SelectStatement) (*QueryResult, bool) {
// Use the new modular components
optimizer := NewFastPathOptimizer(e)
computer := NewAggregationComputer(e)
@@ -632,8 +688,8 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
return nil, false
}
- // Step 2: Collect data sources
- dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
+ // Step 2: Collect data sources with time filtering for parquet file optimization
+ dataSources, err := optimizer.CollectDataSourcesWithTimeFilter(ctx, hybridScanner, startTimeNs, stopTimeNs)
if err != nil {
return nil, false
}
@@ -725,9 +781,6 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
}
// Merge details while preserving existing ones
- if plan.Details == nil {
- plan.Details = make(map[string]interface{})
- }
for key, value := range aggPlan.Details {
plan.Details[key] = value
}
@@ -735,51 +788,17 @@ func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybri
// Add file path information from the data collection
plan.Details["partition_paths"] = partitions
- // Collect actual file information for each partition
- var parquetFiles []string
- var liveLogFiles []string
- parquetSources := make(map[string]bool)
-
- for _, partitionPath := range partitions {
- // Get parquet files for this partition
- if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
- for _, stats := range parquetStats {
- parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
- }
- }
-
- // Merge accurate parquet sources from metadata (preferred over filename fallback)
- if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
- for src := range sources {
- parquetSources[src] = true
- }
- }
+ // Populate detailed file information using shared helper, including time filters for pruning
+ plan.Details[PlanDetailStartTimeNs] = startTimeNs
+ plan.Details[PlanDetailStopTimeNs] = stopTimeNs
+ e.populatePlanFileDetails(ctx, plan, hybridScanner, partitions, stmt)
- // Get live log files for this partition
- if liveFiles, err := e.collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
- for _, fileName := range liveFiles {
- // Exclude live log files that have been converted to parquet (deduplicated)
- if parquetSources[fileName] {
- continue
- }
- liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
- }
- }
- }
-
- if len(parquetFiles) > 0 {
- plan.Details["parquet_files"] = parquetFiles
- }
- if len(liveLogFiles) > 0 {
- plan.Details["live_log_files"] = liveLogFiles
+ // Update counts to match discovered live log files
+ if liveLogFiles, ok := plan.Details["live_log_files"].([]string); ok {
+ dataSources.LiveLogFilesCount = len(liveLogFiles)
+ plan.LiveLogFilesScanned = len(liveLogFiles)
}
- // Update the dataSources.LiveLogFilesCount to match the actual files found
- dataSources.LiveLogFilesCount = len(liveLogFiles)
-
- // Also update the plan's LiveLogFilesScanned to match
- plan.LiveLogFilesScanned = len(liveLogFiles)
-
// Ensure PartitionsScanned is set so Statistics section appears
if plan.PartitionsScanned == 0 && len(partitions) > 0 {
plan.PartitionsScanned = len(partitions)
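PlanDetailStartTimeNs and PlanDetailStopTimeNs are referenced above but declared elsewhere; presumably they are plain string keys for plan.Details, along the lines of:

// Sketch only: assumed definitions; the actual constants live elsewhere in the package.
const (
	PlanDetailStartTimeNs = "start_time_ns"
	PlanDetailStopTimeNs  = "stop_time_ns"
)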
@@ -912,24 +931,3 @@ func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, quer
fmt.Printf("==========================================\n")
}
}
-
-// collectLiveLogFileNames collects the names of live log files in a partition
-func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
- var fileNames []string
-
- err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
- // Skip directories and parquet files
- if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") {
- return nil
- }
-
- // Only include files with actual content
- if len(entry.Chunks) > 0 {
- fileNames = append(fileNames, entry.Name)
- }
-
- return nil
- })
-
- return fileNames, err
-}
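The removed helper's filtering rules (skip directories, .parquet and .offset entries, and empty files) presumably now live behind populatePlanFileDetails rather than disappearing. Condensed, the per-entry predicate it applied was:

// Sketch only: the skip rules the removed helper applied to each directory entry.
func isLiveLogEntry(entry *filer_pb.Entry) bool {
	if entry.IsDirectory ||
		strings.HasSuffix(entry.Name, ".parquet") ||
		strings.HasSuffix(entry.Name, ".offset") {
		return false
	}
	return len(entry.Chunks) > 0 // only count files with actual content
}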