Diffstat (limited to 'weed/query/engine/engine.go')
-rw-r--r--  weed/query/engine/engine.go  327
1 file changed, 241 insertions, 86 deletions
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index ffed03f35..e00fd78ca 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -1513,47 +1513,49 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *Se
var result *QueryResult
var err error
- if hasAggregations {
- // Extract table information for aggregation execution
- var database, tableName string
- if len(stmt.From) == 1 {
- if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
- if tableExpr, ok := table.Expr.(TableName); ok {
- tableName = tableExpr.Name.String()
- if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
- database = tableExpr.Qualifier.String()
- }
+ // Extract table information for execution (needed for both aggregation and regular queries)
+ var database, tableName string
+ if len(stmt.From) == 1 {
+ if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
+ if tableExpr, ok := table.Expr.(TableName); ok {
+ tableName = tableExpr.Name.String()
+ if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
+ database = tableExpr.Qualifier.String()
}
}
}
+ }
- // Use current database if not specified
+ // Use current database if not specified
+ if database == "" {
+ database = e.catalog.currentDatabase
if database == "" {
- database = e.catalog.currentDatabase
- if database == "" {
- database = "default"
- }
- }
-
- // Create hybrid scanner for aggregation execution
- var filerClient filer_pb.FilerClient
- if e.catalog.brokerClient != nil {
- filerClient, err = e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return &QueryResult{Error: err}, err
- }
+ database = "default"
}
+ }
- hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
+ // CRITICAL FIX: Always use HybridMessageScanner for ALL queries to read both flushed and unflushed data
+ // Create hybrid scanner for both aggregation and regular SELECT queries
+ var filerClient filer_pb.FilerClient
+ if e.catalog.brokerClient != nil {
+ filerClient, err = e.catalog.brokerClient.GetFilerClient()
if err != nil {
return &QueryResult{Error: err}, err
}
+ }
+ hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+
+ if hasAggregations {
// Execute aggregation query with plan tracking
result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan)
} else {
- // Regular SELECT query with plan tracking
- result, err = e.executeSelectStatementWithBrokerStats(ctx, stmt, plan)
+ // CRITICAL FIX: Use HybridMessageScanner for regular SELECT queries too
+ // This ensures both flushed and unflushed data are read
+ result, err = e.executeRegularSelectWithHybridScanner(ctx, hybridScanner, stmt, plan)
}
if err == nil && result != nil {
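This hunk hoists table-name resolution and HybridMessageScanner construction out of the aggregation-only branch, so aggregation and regular SELECT queries now share one scanner. The fix matters because a plain file scan misses messages still buffered in the broker. A minimal sketch of the "hybrid" idea, where both function parameters are hypothetical stand-ins for the scanner's two data paths (the scanner itself is not part of this diff):

// hybridScan merges rows already flushed to disk with rows still sitting
// in broker memory. Both callbacks are hypothetical, not real APIs.
func hybridScan(
	scanFlushed func() ([]HybridScanResult, error), // parquet + live log files
	scanBuffer func() ([]HybridScanResult, error), // unflushed broker buffer
) ([]HybridScanResult, error) {
	flushed, err := scanFlushed()
	if err != nil {
		return nil, err
	}
	unflushed, err := scanBuffer()
	if err != nil {
		return nil, err
	}
	return append(flushed, unflushed...), nil
}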
@@ -1981,6 +1983,198 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *SelectStat
return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
}
+// executeRegularSelectWithHybridScanner handles regular SELECT queries using HybridMessageScanner
+// This ensures both flushed and unflushed data are read, fixing the SQL empty results issue
+func (e *SQLEngine) executeRegularSelectWithHybridScanner(ctx context.Context, hybridScanner *HybridMessageScanner, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
+ // Parse SELECT expressions to determine columns and detect aggregations
+ var columns []string
+ var aggregations []AggregationSpec
+ var hasAggregations bool
+ selectAll := false
+ baseColumnsSet := make(map[string]bool) // Track base columns needed for expressions
+
+ for _, selectExpr := range stmt.SelectExprs {
+ switch expr := selectExpr.(type) {
+ case *StarExpr:
+ selectAll = true
+ case *AliasedExpr:
+ switch col := expr.Expr.(type) {
+ case *ColName:
+ columnName := col.Name.String()
+ columns = append(columns, columnName)
+ baseColumnsSet[columnName] = true
+ case *FuncExpr:
+ funcName := strings.ToLower(col.Name.String())
+ if e.isAggregationFunction(funcName) {
+ // Handle aggregation functions
+ aggSpec, err := e.parseAggregationFunction(col, expr)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+ aggregations = append(aggregations, *aggSpec)
+ hasAggregations = true
+ } else if e.isStringFunction(funcName) {
+ // Handle string functions like UPPER, LENGTH, etc.
+ columns = append(columns, e.getStringFunctionAlias(col))
+ // Extract base columns needed for this string function
+ e.extractBaseColumnsFromFunction(col, baseColumnsSet)
+ } else if e.isDateTimeFunction(funcName) {
+ // Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC
+ columns = append(columns, e.getDateTimeFunctionAlias(col))
+ // Extract base columns needed for this datetime function
+ e.extractBaseColumnsFromFunction(col, baseColumnsSet)
+ } else {
+ return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName)
+ }
+ default:
+ err := fmt.Errorf("unsupported SELECT expression: %T", col)
+ return &QueryResult{Error: err}, err
+ }
+ default:
+ err := fmt.Errorf("unsupported SELECT expression: %T", expr)
+ return &QueryResult{Error: err}, err
+ }
+ }
+
+ // If we have aggregations, delegate to aggregation handler
+ if hasAggregations {
+ return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt)
+ }
+
+ // Parse WHERE clause for predicate pushdown
+ var predicate func(*schema_pb.RecordValue) bool
+ var err error
+ if stmt.Where != nil {
+ predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+ }
+
+ // Parse LIMIT and OFFSET clauses
+ // Use -1 to distinguish "no LIMIT" from "LIMIT 0"
+ limit := -1
+ offset := 0
+ if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
+ switch limitExpr := stmt.Limit.Rowcount.(type) {
+ case *SQLVal:
+ if limitExpr.Type == IntVal {
+ var parseErr error
+ limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64)
+ if parseErr != nil {
+ return &QueryResult{Error: parseErr}, parseErr
+ }
+ if limit64 > math.MaxInt32 || limit64 < 0 {
+ return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
+ }
+ limit = int(limit64)
+ }
+ }
+ }
+
+ // Parse OFFSET clause if present
+ if stmt.Limit != nil && stmt.Limit.Offset != nil {
+ switch offsetExpr := stmt.Limit.Offset.(type) {
+ case *SQLVal:
+ if offsetExpr.Type == IntVal {
+ var parseErr error
+ offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64)
+ if parseErr != nil {
+ return &QueryResult{Error: parseErr}, parseErr
+ }
+ if offset64 > math.MaxInt32 || offset64 < 0 {
+ return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64)
+ }
+ offset = int(offset64)
+ }
+ }
+ }
+
+ // Build hybrid scan options
+ // Extract time filters from WHERE clause to optimize scanning
+ startTimeNs, stopTimeNs := int64(0), int64(0)
+ if stmt.Where != nil {
+ startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
+ }
+
+ hybridScanOptions := HybridScanOptions{
+ StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
+ StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons
+ Limit: limit,
+ Offset: offset,
+ Predicate: predicate,
+ }
+
+ if !selectAll {
+ // Convert baseColumnsSet to slice for hybrid scan options
+ baseColumns := make([]string, 0, len(baseColumnsSet))
+ for columnName := range baseColumnsSet {
+ baseColumns = append(baseColumns, columnName)
+ }
+ // Use base columns (not expression aliases) for data retrieval
+ if len(baseColumns) > 0 {
+ hybridScanOptions.Columns = baseColumns
+ } else {
+ // If no base columns found (shouldn't happen), use original columns
+ hybridScanOptions.Columns = columns
+ }
+ }
+
+ // Execute the hybrid scan (both flushed and unflushed data)
+ var results []HybridScanResult
+ if plan != nil {
+ // EXPLAIN mode - capture broker buffer stats
+ var stats *HybridScanStats
+ results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+
+ // Populate plan with broker buffer information
+ if stats != nil {
+ plan.BrokerBufferQueried = stats.BrokerBufferQueried
+ plan.BrokerBufferMessages = stats.BrokerBufferMessages
+ plan.BufferStartIndex = stats.BufferStartIndex
+
+ // Add broker_buffer to data sources if buffer was queried
+ if stats.BrokerBufferQueried {
+ // Check if broker_buffer is already in data sources
+ hasBrokerBuffer := false
+ for _, source := range plan.DataSources {
+ if source == "broker_buffer" {
+ hasBrokerBuffer = true
+ break
+ }
+ }
+ if !hasBrokerBuffer {
+ plan.DataSources = append(plan.DataSources, "broker_buffer")
+ }
+ }
+ }
+ } else {
+ // Normal mode - just get results
+ results, err = hybridScanner.Scan(ctx, hybridScanOptions)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+ }
+
+ // Convert to SQL result format
+ if selectAll {
+ if len(columns) > 0 {
+ // SELECT *, specific_columns - include both auto-discovered and explicit columns
+ return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
+ } else {
+ // SELECT * only - let converter determine all columns (excludes system columns)
+ columns = nil
+ return hybridScanner.ConvertToSQLResult(results, columns), nil
+ }
+ }
+
+ // Handle custom column expressions (including arithmetic)
+ return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
+}
+
// executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture
// This is used by EXPLAIN queries to capture complete data source information including broker memory
func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
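The new executeRegularSelectWithHybridScanner above encodes a subtle LIMIT/OFFSET convention: limit starts at -1 so that "no LIMIT" can be told apart from "LIMIT 0", and both parsed values are bounds-checked against math.MaxInt32 before narrowing to int. A self-contained sketch of the semantics; in the real code both values are pushed down into HybridScanOptions rather than applied as a post-filter:

// applyLimitOffset illustrates the convention: limit == -1 means no LIMIT,
// limit == 0 means LIMIT 0 (zero rows). Post-filtering shown for clarity only.
func applyLimitOffset(rows []HybridScanResult, limit, offset int) []HybridScanResult {
	if offset >= len(rows) {
		return nil
	}
	rows = rows[offset:]
	if limit >= 0 && limit < len(rows) {
		rows = rows[:limit]
	}
	return rows
}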
@@ -2237,10 +2431,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
plan.Details[PlanDetailStartTimeNs] = startTimeNs
plan.Details[PlanDetailStopTimeNs] = stopTimeNs
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs)
- }
-
// Collect actual file information for each partition
var parquetFiles []string
var liveLogFiles []string
@@ -2261,9 +2451,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
columnPrunedCount := beforeColumnPrune - len(filteredStats)
if columnPrunedCount > 0 {
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
- }
// Track column statistics optimization
if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
@@ -2275,9 +2462,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
}
} else {
parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
- }
}
// Merge accurate parquet sources from metadata
@@ -2298,9 +2482,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
}
} else {
liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
- }
}
}
@@ -2559,7 +2740,6 @@ func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileSta
return parquetStats
}
- debugEnabled := ctx != nil && isDebugMode(ctx)
qStart := startTimeNs
qStop := stopTimeNs
if qStop == 0 {
@@ -2568,21 +2748,10 @@ func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileSta
n := 0
for _, fs := range parquetStats {
- if debugEnabled {
- fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName)
- }
if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok {
- if debugEnabled {
- fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop)
- }
if qStop < minNs || (qStart != 0 && qStart > maxNs) {
- if debugEnabled {
- fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName)
- }
continue
}
- } else if debugEnabled {
- fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName)
}
parquetStats[n] = fs
n++
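The pruning loop above keeps a parquet file unless its timestamp range provably misses the query window; note that qStart == 0 is treated as "no lower bound", and qStop == 0 is rewritten to an upper bound before the loop. A self-contained sketch of the overlap test, assuming qStop has already been resolved:

// overlaps reports whether a file's [minNs, maxNs] timestamp range can
// intersect the query window. qStart == 0 means the window has no lower bound.
func overlaps(minNs, maxNs, qStart, qStop int64) bool {
	if qStop < minNs {
		return false // the window ends before the file begins
	}
	if qStart != 0 && qStart > maxNs {
		return false // the window starts after the file ends
	}
	return true
}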
@@ -2596,13 +2765,9 @@ func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetS
return parquetStats
}
- debugEnabled := ctx != nil && isDebugMode(ctx)
n := 0
for _, fs := range parquetStats {
if e.canSkipParquetFile(ctx, fs, whereExpr) {
- if debugEnabled {
- fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName)
- }
continue
}
parquetStats[n] = fs
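canSkipParquetFile's body is not shown in this diff, but the idea behind column-statistics pruning is standard: if a file's per-column min/max prove a WHERE comparison can never match, the file is dropped without being read. A hedged, self-contained sketch for integer columns (the operator set and semantics here are illustrative, not the engine's actual rules):

// canSkipByStats returns true when no value in [min, max] can satisfy
// "column <op> literal". Unknown operators conservatively keep the file.
func canSkipByStats(min, max int64, op string, literal int64) bool {
	switch op {
	case ">":
		return max <= literal // no value exceeds the literal
	case "<":
		return min >= literal // no value is below the literal
	case "=":
		return literal < min || literal > max // literal outside the range
	}
	return false
}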
@@ -2726,7 +2891,6 @@ func (e *SQLEngine) flipOperator(op string) string {
// populatePlanFileDetails populates execution plan with detailed file information for partitions
// Includes column statistics pruning optimization when WHERE clause is provided
func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) {
- debugEnabled := ctx != nil && isDebugMode(ctx)
// Collect actual file information for each partition
var parquetFiles []string
var liveLogFiles []string
@@ -2750,9 +2914,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec
columnPrunedCount := beforeColumnPrune - len(filteredStats)
if columnPrunedCount > 0 {
- if debugEnabled {
- fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
- }
// Track column statistics optimization
if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
@@ -2765,9 +2926,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec
}
} else {
parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if debugEnabled {
- fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
- }
}
// Merge accurate parquet sources from metadata
@@ -2788,9 +2946,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec
}
} else {
liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if debugEnabled {
- fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
- }
}
}
@@ -3848,7 +4003,7 @@ func (e *SQLEngine) createTable(ctx context.Context, stmt *DDLStatement) (*Query
// Create the topic via broker using configurable partition count
partitionCount := e.catalog.GetDefaultPartitionCount()
- err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType)
+ err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType, nil)
if err != nil {
return &QueryResult{Error: err}, err
}
@@ -4283,29 +4438,29 @@ func (e *SQLEngine) eachLogEntryInFile(filerClient filer_pb.FilerClient, filePat
// convertLogEntryToRecordValue helper method (reuse existing logic)
func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
- // Parse the log entry data as Protocol Buffer (not JSON!)
+ // Try to unmarshal as RecordValue first (schematized data)
recordValue := &schema_pb.RecordValue{}
- if err := proto.Unmarshal(logEntry.Data, recordValue); err != nil {
- return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %v", err)
- }
+ err := proto.Unmarshal(logEntry.Data, recordValue)
+ if err == nil {
+ // Successfully unmarshaled as RecordValue (valid protobuf)
+ // Initialize Fields map if nil
+ if recordValue.Fields == nil {
+ recordValue.Fields = make(map[string]*schema_pb.Value)
+ }
- // Ensure Fields map exists
- if recordValue.Fields == nil {
- recordValue.Fields = make(map[string]*schema_pb.Value)
- }
+ // Add system columns from LogEntry
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
- // Add system columns
- recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
- }
- recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
- Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ return recordValue, "live_log", nil
}
- // User data fields are already present in the protobuf-deserialized recordValue
- // No additional processing needed since proto.Unmarshal already populated the Fields map
-
- return recordValue, "live_log", nil
+ // Failed to unmarshal as RecordValue - invalid protobuf data
+ return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %w", err)
}
// extractTimestampFromFilename extracts timestamp from parquet filename
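One caveat on the convertLogEntryToRecordValue hunk: the comment "valid protobuf" overstates what a nil error proves. proto.Unmarshal succeeds for any payload that parses as wire format, including a zero-length payload, so the error branch is reached only for actively malformed bytes. A minimal illustration, assuming the same proto and schema_pb imports as the file above:

// unmarshalIsLenient shows that an empty payload unmarshals without error,
// yielding an empty RecordValue rather than a failure.
func unmarshalIsLenient() bool {
	rv := &schema_pb.RecordValue{}
	err := proto.Unmarshal(nil, rv) // zero-length input: no error
	return err == nil               // true
}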
@@ -4782,7 +4937,7 @@ func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string)
// discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog
func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error {
// First, check if topic exists by trying to get its schema from the broker/filer
- recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
+ recordType, _, _, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
if err != nil {
return fmt.Errorf("topic %s.%s not found or no schema available: %v", database, tableName, err)
}