package engine

import (
	"context"
	"fmt"
	"math"
	"strconv"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
// AggregationSpec defines an aggregation function to be computed
type AggregationSpec struct {
Function string // COUNT, SUM, AVG, MIN, MAX
Column string // Column name, or "*" for COUNT(*)
Alias string // Optional alias for the result column
Distinct bool // Support for DISTINCT keyword
}
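
// For example, a query like "SELECT MAX(ts) AS latest FROM t" would be
// described by (illustrative values, not taken from a real plan):
//
//	AggregationSpec{Function: FuncMAX, Column: "ts", Alias: "latest"}
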
// AggregationResult holds the computed result of an aggregation
type AggregationResult struct {
Count int64
Sum float64
Min interface{}
Max interface{}
}
// AggregationStrategy represents the strategy for executing aggregations
type AggregationStrategy struct {
CanUseFastPath bool
Reason string
UnsupportedSpecs []AggregationSpec
}
// TopicDataSources represents the data sources available for a topic
type TopicDataSources struct {
ParquetFiles map[string][]*ParquetFileStats // partitionPath -> parquet file stats
ParquetRowCount int64
LiveLogRowCount int64
LiveLogFilesCount int // Total count of live log files across all partitions
PartitionsCount int
BrokerUnflushedCount int64
}
// FastPathOptimizer handles fast path aggregation optimization decisions
type FastPathOptimizer struct {
engine *SQLEngine
}
// NewFastPathOptimizer creates a new fast path optimizer
func NewFastPathOptimizer(engine *SQLEngine) *FastPathOptimizer {
return &FastPathOptimizer{engine: engine}
}
// DetermineStrategy analyzes aggregations and determines if fast path can be used
func (opt *FastPathOptimizer) DetermineStrategy(aggregations []AggregationSpec) AggregationStrategy {
strategy := AggregationStrategy{
CanUseFastPath: true,
Reason: "all_aggregations_supported",
UnsupportedSpecs: []AggregationSpec{},
}
for _, spec := range aggregations {
if !opt.engine.canUseParquetStatsForAggregation(spec) {
strategy.CanUseFastPath = false
strategy.Reason = "unsupported_aggregation_functions"
strategy.UnsupportedSpecs = append(strategy.UnsupportedSpecs, spec)
}
}
return strategy
}
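
// Usage sketch (assumes an initialized *SQLEngine named engine and a parsed
// list of aggregation specs named specs):
//
//	opt := NewFastPathOptimizer(engine)
//	strategy := opt.DetermineStrategy(specs)
//	if !strategy.CanUseFastPath {
//		// fall back to the slow path; strategy.UnsupportedSpecs lists the offenders
//	}
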
// CollectDataSources gathers information about available data sources for a topic
func (opt *FastPathOptimizer) CollectDataSources(ctx context.Context, hybridScanner *HybridMessageScanner) (*TopicDataSources, error) {
dataSources := &TopicDataSources{
ParquetFiles: make(map[string][]*ParquetFileStats),
ParquetRowCount: 0,
LiveLogRowCount: 0,
LiveLogFilesCount: 0,
PartitionsCount: 0,
}
if isDebugMode(ctx) {
fmt.Printf("Collecting data sources for: %s/%s\n", hybridScanner.topic.Namespace, hybridScanner.topic.Name)
}
// Discover partitions for the topic
partitionPaths, err := opt.engine.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
if err != nil {
if isDebugMode(ctx) {
fmt.Printf("ERROR: Partition discovery failed: %v\n", err)
}
return dataSources, DataSourceError{
Source: "partition_discovery",
Cause: err,
}
}
// DEBUG: Log discovered partitions
if isDebugMode(ctx) {
fmt.Printf("Discovered %d partitions: %v\n", len(partitionPaths), partitionPaths)
}
// Collect stats from each partition
// Note: discoverTopicPartitions always returns absolute paths starting with "/topics/"
for _, partitionPath := range partitionPaths {
if isDebugMode(ctx) {
fmt.Printf("\nProcessing partition: %s\n", partitionPath)
}
// Read parquet file statistics
parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath)
if err != nil {
if isDebugMode(ctx) {
fmt.Printf(" ERROR: Failed to read parquet statistics: %v\n", err)
}
} else if len(parquetStats) == 0 {
if isDebugMode(ctx) {
fmt.Printf(" No parquet files found in partition\n")
}
} else {
dataSources.ParquetFiles[partitionPath] = parquetStats
partitionParquetRows := int64(0)
for _, stat := range parquetStats {
partitionParquetRows += stat.RowCount
dataSources.ParquetRowCount += stat.RowCount
}
if isDebugMode(ctx) {
fmt.Printf(" Found %d parquet files with %d total rows\n", len(parquetStats), partitionParquetRows)
}
}
		// Count live log rows (excluding files already converted to parquet)
parquetSources := opt.engine.extractParquetSourceFiles(dataSources.ParquetFiles[partitionPath])
liveLogCount, liveLogErr := opt.engine.countLiveLogRowsExcludingParquetSources(ctx, partitionPath, parquetSources)
if liveLogErr != nil {
if isDebugMode(ctx) {
fmt.Printf(" ERROR: Failed to count live log rows: %v\n", liveLogErr)
}
} else {
dataSources.LiveLogRowCount += liveLogCount
if isDebugMode(ctx) {
fmt.Printf(" Found %d live log rows (excluding %d parquet sources)\n", liveLogCount, len(parquetSources))
}
}
// Count live log files for partition with proper range values
// Extract partition name from absolute path (e.g., "0000-2520" from "/topics/.../v2025.../0000-2520")
partitionName := partitionPath[strings.LastIndex(partitionPath, "/")+1:]
partitionParts := strings.Split(partitionName, "-")
if len(partitionParts) == 2 {
rangeStart, err1 := strconv.Atoi(partitionParts[0])
rangeStop, err2 := strconv.Atoi(partitionParts[1])
if err1 == nil && err2 == nil {
partition := topic.Partition{
RangeStart: int32(rangeStart),
RangeStop: int32(rangeStop),
}
liveLogFileCount, err := hybridScanner.countLiveLogFiles(partition)
if err == nil {
dataSources.LiveLogFilesCount += liveLogFileCount
}
// Count broker unflushed messages for this partition
if hybridScanner.brokerClient != nil {
entries, err := hybridScanner.brokerClient.GetUnflushedMessages(ctx, hybridScanner.topic.Namespace, hybridScanner.topic.Name, partition, 0)
if err == nil {
dataSources.BrokerUnflushedCount += int64(len(entries))
if isDebugMode(ctx) {
fmt.Printf(" Found %d unflushed broker messages\n", len(entries))
}
} else if isDebugMode(ctx) {
fmt.Printf(" ERROR: Failed to get unflushed broker messages: %v\n", err)
}
}
}
}
}
dataSources.PartitionsCount = len(partitionPaths)
if isDebugMode(ctx) {
fmt.Printf("Data sources collected: %d partitions, %d parquet rows, %d live log rows, %d broker buffer rows\n",
dataSources.PartitionsCount, dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount)
}
return dataSources, nil
}
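
// The three row counts collected above combine additively; a fast-path
// COUNT(*) therefore reduces to (sketch, with ds the *TopicDataSources
// returned by CollectDataSources):
//
//	total := ds.ParquetRowCount + ds.LiveLogRowCount + ds.BrokerUnflushedCount
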
// AggregationComputer handles the computation of aggregations using fast path
type AggregationComputer struct {
engine *SQLEngine
}
// NewAggregationComputer creates a new aggregation computer
func NewAggregationComputer(engine *SQLEngine) *AggregationComputer {
return &AggregationComputer{engine: engine}
}
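
// Usage sketch (assumes specs, dataSources, and partitions were produced by
// the FastPathOptimizer above):
//
//	comp := NewAggregationComputer(engine)
//	results, err := comp.ComputeFastPathAggregations(ctx, specs, dataSources, partitions)
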
// ComputeFastPathAggregations computes aggregations using parquet statistics and live log data
func (comp *AggregationComputer) ComputeFastPathAggregations(
ctx context.Context,
aggregations []AggregationSpec,
dataSources *TopicDataSources,
partitions []string,
) ([]AggregationResult, error) {
aggResults := make([]AggregationResult, len(aggregations))
for i, spec := range aggregations {
switch spec.Function {
		case FuncCOUNT:
			// COUNT(*) and COUNT(column) are treated identically on the fast
			// path; per-column NULL accounting may be added here in the future.
			aggResults[i].Count = dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
case FuncMIN:
globalMin, err := comp.computeGlobalMin(spec, dataSources, partitions)
if err != nil {
return nil, AggregationError{
Operation: spec.Function,
Column: spec.Column,
Cause: err,
}
}
aggResults[i].Min = globalMin
case FuncMAX:
globalMax, err := comp.computeGlobalMax(spec, dataSources, partitions)
if err != nil {
return nil, AggregationError{
Operation: spec.Function,
Column: spec.Column,
Cause: err,
}
}
aggResults[i].Max = globalMax
default:
return nil, OptimizationError{
Strategy: "fast_path_aggregation",
Reason: fmt.Sprintf("unsupported aggregation function: %s", spec.Function),
}
}
}
return aggResults, nil
}
// computeGlobalMin computes the global minimum value across all data sources
func (comp *AggregationComputer) computeGlobalMin(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
var globalMin interface{}
var globalMinValue *schema_pb.Value
hasParquetStats := false
// Step 1: Get minimum from parquet statistics
for _, fileStats := range dataSources.ParquetFiles {
for _, fileStat := range fileStats {
// Try case-insensitive column lookup
var colStats *ParquetColumnStats
var found bool
// First try exact match
if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
colStats = stats
found = true
} else {
// Try case-insensitive lookup
for colName, stats := range fileStat.ColumnStats {
if strings.EqualFold(colName, spec.Column) {
colStats = stats
found = true
break
}
}
}
if found && colStats != nil && colStats.MinValue != nil {
if globalMinValue == nil || comp.engine.compareValues(colStats.MinValue, globalMinValue) < 0 {
globalMinValue = colStats.MinValue
extractedValue := comp.engine.extractRawValue(colStats.MinValue)
if extractedValue != nil {
globalMin = extractedValue
hasParquetStats = true
}
}
}
}
}
	// Step 2: Merge in the minimum from live log data (only when live log rows exist)
if dataSources.LiveLogRowCount > 0 {
for _, partition := range partitions {
partitionParquetSources := make(map[string]bool)
if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
}
liveLogMin, _, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
if err != nil {
continue // Skip partitions with errors
}
			if liveLogMin != nil {
				liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMin)
				if globalMin == nil {
					// First candidate: record both representations so later
					// comparisons have a schema value to compare against.
					globalMin = liveLogMin
					globalMinValue = liveLogSchemaValue
				} else if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMinValue) < 0 {
					globalMin = liveLogMin
					globalMinValue = liveLogSchemaValue
				}
			}
}
}
// Step 3: Handle system columns if no regular data found
if globalMin == nil && !hasParquetStats {
globalMin = comp.engine.getSystemColumnGlobalMin(spec.Column, dataSources.ParquetFiles)
}
return globalMin, nil
}
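
// Illustrative resolution order for computeGlobalMin: if parquet stats yield
// a minimum of 5 and a live log row yields 3, the live log value wins; the
// system-column fallback in Step 3 runs only when neither source produced a
// value.
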
// computeGlobalMax computes the global maximum value across all data sources
func (comp *AggregationComputer) computeGlobalMax(spec AggregationSpec, dataSources *TopicDataSources, partitions []string) (interface{}, error) {
var globalMax interface{}
var globalMaxValue *schema_pb.Value
hasParquetStats := false
// Step 1: Get maximum from parquet statistics
for _, fileStats := range dataSources.ParquetFiles {
for _, fileStat := range fileStats {
// Try case-insensitive column lookup
var colStats *ParquetColumnStats
var found bool
// First try exact match
if stats, exists := fileStat.ColumnStats[spec.Column]; exists {
colStats = stats
found = true
} else {
// Try case-insensitive lookup
for colName, stats := range fileStat.ColumnStats {
if strings.EqualFold(colName, spec.Column) {
colStats = stats
found = true
break
}
}
}
if found && colStats != nil && colStats.MaxValue != nil {
if globalMaxValue == nil || comp.engine.compareValues(colStats.MaxValue, globalMaxValue) > 0 {
globalMaxValue = colStats.MaxValue
extractedValue := comp.engine.extractRawValue(colStats.MaxValue)
if extractedValue != nil {
globalMax = extractedValue
hasParquetStats = true
}
}
}
}
}
// Step 2: Get maximum from live log data (only if live logs exist)
if dataSources.LiveLogRowCount > 0 {
for _, partition := range partitions {
partitionParquetSources := make(map[string]bool)
if partitionFileStats, exists := dataSources.ParquetFiles[partition]; exists {
partitionParquetSources = comp.engine.extractParquetSourceFiles(partitionFileStats)
}
_, liveLogMax, err := comp.engine.computeLiveLogMinMax(partition, spec.Column, partitionParquetSources)
if err != nil {
continue // Skip partitions with errors
}
			if liveLogMax != nil {
				liveLogSchemaValue := comp.engine.convertRawValueToSchemaValue(liveLogMax)
				if globalMax == nil {
					// First candidate: record both representations so later
					// comparisons have a schema value to compare against.
					globalMax = liveLogMax
					globalMaxValue = liveLogSchemaValue
				} else if liveLogSchemaValue != nil && comp.engine.compareValues(liveLogSchemaValue, globalMaxValue) > 0 {
					globalMax = liveLogMax
					globalMaxValue = liveLogSchemaValue
				}
			}
}
}
// Step 3: Handle system columns if no regular data found
if globalMax == nil && !hasParquetStats {
globalMax = comp.engine.getSystemColumnGlobalMax(spec.Column, dataSources.ParquetFiles)
}
return globalMax, nil
}
// executeAggregationQuery handles SELECT queries with aggregation functions
func (e *SQLEngine) executeAggregationQuery(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement) (*QueryResult, error) {
return e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, nil)
}
// executeAggregationQueryWithPlan handles SELECT queries with aggregation functions and populates execution plan
func (e *SQLEngine) executeAggregationQueryWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
// Parse LIMIT and OFFSET for aggregation results (do this first)
// Use -1 to distinguish "no LIMIT" from "LIMIT 0"
limit := -1
offset := 0
if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
if limitExpr, ok := stmt.Limit.Rowcount.(*SQLVal); ok && limitExpr.Type == IntVal {
if limit64, err := strconv.ParseInt(string(limitExpr.Val), 10, 64); err == nil {
if limit64 > int64(math.MaxInt) || limit64 < 0 {
return nil, fmt.Errorf("LIMIT value %d is out of range", limit64)
}
// Safe conversion after bounds check
limit = int(limit64)
}
}
}
if stmt.Limit != nil && stmt.Limit.Offset != nil {
if offsetExpr, ok := stmt.Limit.Offset.(*SQLVal); ok && offsetExpr.Type == IntVal {
if offset64, err := strconv.ParseInt(string(offsetExpr.Val), 10, 64); err == nil {
if offset64 > int64(math.MaxInt) || offset64 < 0 {
return nil, fmt.Errorf("OFFSET value %d is out of range", offset64)
}
// Safe conversion after bounds check
offset = int(offset64)
}
}
}
// Parse WHERE clause for filtering
var predicate func(*schema_pb.RecordValue) bool
var err error
if stmt.Where != nil {
predicate, err = e.buildPredicate(stmt.Where.Expr)
if err != nil {
return &QueryResult{Error: err}, err
}
}
// Extract time filters for optimization
startTimeNs, stopTimeNs := int64(0), int64(0)
if stmt.Where != nil {
startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
}
	// FAST PATH: when there is no WHERE clause, try to answer the aggregations
	// from parquet statistics and partition metadata instead of scanning rows.
	// The debug logging below helps diagnose any count mismatch between the
	// fast and slow paths.
	if stmt.Where == nil {
		if isDebugMode(ctx) {
			fmt.Printf("\nFast path optimization attempt...\n")
		}
		fastResult, canOptimize := e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, plan)
		if canOptimize {
			if isDebugMode(ctx) {
				fmt.Printf("Fast path optimization succeeded!\n")
			}
			return fastResult, nil
		}
		if isDebugMode(ctx) {
			fmt.Printf("Fast path optimization failed, falling back to slow path\n")
		}
	} else if isDebugMode(ctx) {
		fmt.Printf("Fast path not applicable due to WHERE clause\n")
	}
// SLOW PATH: Fall back to full table scan
if isDebugMode(ctx) {
fmt.Printf("Using full table scan for aggregation (parquet optimization not applicable)\n")
}
// Extract columns needed for aggregations
columnsNeeded := make(map[string]bool)
for _, spec := range aggregations {
if spec.Column != "*" {
columnsNeeded[spec.Column] = true
}
}
// Convert to slice
var scanColumns []string
if len(columnsNeeded) > 0 {
scanColumns = make([]string, 0, len(columnsNeeded))
for col := range columnsNeeded {
scanColumns = append(scanColumns, col)
}
}
// If no specific columns needed (COUNT(*) only), don't specify columns (scan all)
// Build scan options for full table scan (aggregations need all data during scanning)
hybridScanOptions := HybridScanOptions{
StartTimeNs: startTimeNs,
StopTimeNs: stopTimeNs,
Limit: -1, // Use -1 to mean "no limit" - need all data for aggregation
Offset: 0, // No offset during scanning - OFFSET applies to final results
Predicate: predicate,
Columns: scanColumns, // Include columns needed for aggregation functions
}
// DEBUG: Log scan options for aggregation
debugHybridScanOptions(ctx, hybridScanOptions, "AGGREGATION")
// Execute the hybrid scan to get all matching records
var results []HybridScanResult
if plan != nil {
// EXPLAIN mode - capture broker buffer stats
var stats *HybridScanStats
results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
if err != nil {
return &QueryResult{Error: err}, err
}
// Populate plan with broker buffer information
if stats != nil {
plan.BrokerBufferQueried = stats.BrokerBufferQueried
plan.BrokerBufferMessages = stats.BrokerBufferMessages
plan.BufferStartIndex = stats.BufferStartIndex
// Add broker_buffer to data sources if buffer was queried
if stats.BrokerBufferQueried {
// Check if broker_buffer is already in data sources
hasBrokerBuffer := false
for _, source := range plan.DataSources {
if source == "broker_buffer" {
hasBrokerBuffer = true
break
}
}
if !hasBrokerBuffer {
plan.DataSources = append(plan.DataSources, "broker_buffer")
}
}
}
} else {
// Normal mode - just get results
results, err = hybridScanner.Scan(ctx, hybridScanOptions)
if err != nil {
return &QueryResult{Error: err}, err
}
}
// DEBUG: Log scan results
if isDebugMode(ctx) {
fmt.Printf("AGGREGATION SCAN RESULTS: %d rows returned\n", len(results))
}
// Compute aggregations
aggResults := e.computeAggregations(results, aggregations)
// Build result set
columns := make([]string, len(aggregations))
row := make([]sqltypes.Value, len(aggregations))
for i, spec := range aggregations {
columns[i] = spec.Alias
row[i] = e.formatAggregationResult(spec, aggResults[i])
}
// Apply OFFSET and LIMIT to aggregation results
// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
rows := [][]sqltypes.Value{row}
if offset > 0 || limit >= 0 {
// Handle LIMIT 0 first
if limit == 0 {
rows = [][]sqltypes.Value{}
} else {
// Apply OFFSET first
if offset > 0 {
if offset >= len(rows) {
rows = [][]sqltypes.Value{}
} else {
rows = rows[offset:]
}
}
// Apply LIMIT after OFFSET (only if limit > 0)
if limit > 0 && len(rows) > limit {
rows = rows[:limit]
}
}
}
result := &QueryResult{
Columns: columns,
Rows: rows,
}
// Build execution tree for aggregation queries if plan is provided
if plan != nil {
plan.RootNode = e.buildExecutionTree(plan, stmt)
}
return result, nil
}
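
// An aggregation without GROUP BY always yields exactly one row, so the
// LIMIT/OFFSET handling above reduces to three cases (illustrative): LIMIT 0
// or any OFFSET >= 1 returns an empty result; everything else returns the
// single row.
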
// tryFastParquetAggregation attempts to compute aggregations using hybrid approach:
// - Use parquet metadata for parquet files
// - Count live log files for live data
// - Combine both for accurate results per partition
// Returns (result, canOptimize) where canOptimize=true means the hybrid fast path was used
func (e *SQLEngine) tryFastParquetAggregation(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec) (*QueryResult, bool) {
return e.tryFastParquetAggregationWithPlan(ctx, hybridScanner, aggregations, nil)
}
// tryFastParquetAggregationWithPlan is the same as tryFastParquetAggregation but also populates execution plan if provided
func (e *SQLEngine) tryFastParquetAggregationWithPlan(ctx context.Context, hybridScanner *HybridMessageScanner, aggregations []AggregationSpec, plan *QueryExecutionPlan) (*QueryResult, bool) {
// Use the new modular components
optimizer := NewFastPathOptimizer(e)
computer := NewAggregationComputer(e)
// Step 1: Determine strategy
strategy := optimizer.DetermineStrategy(aggregations)
if !strategy.CanUseFastPath {
return nil, false
}
// Step 2: Collect data sources
dataSources, err := optimizer.CollectDataSources(ctx, hybridScanner)
if err != nil {
return nil, false
}
// Build partition list for aggregation computer
// Note: discoverTopicPartitions always returns absolute paths
partitions, err := e.discoverTopicPartitions(hybridScanner.topic.Namespace, hybridScanner.topic.Name)
if err != nil {
return nil, false
}
// Debug: Show the hybrid optimization results (only in explain mode)
if isDebugMode(ctx) && (dataSources.ParquetRowCount > 0 || dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0) {
partitionsWithLiveLogs := 0
if dataSources.LiveLogRowCount > 0 || dataSources.BrokerUnflushedCount > 0 {
partitionsWithLiveLogs = 1 // Simplified for now
}
fmt.Printf("Hybrid fast aggregation with deduplication: %d parquet rows + %d deduplicated live log rows + %d broker buffer rows from %d partitions\n",
dataSources.ParquetRowCount, dataSources.LiveLogRowCount, dataSources.BrokerUnflushedCount, partitionsWithLiveLogs)
}
// Step 3: Compute aggregations using fast path
aggResults, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
if err != nil {
return nil, false
}
// Step 3.5: Validate fast path results (safety check)
// For simple COUNT(*) queries, ensure we got a reasonable result
if len(aggregations) == 1 && aggregations[0].Function == FuncCOUNT && aggregations[0].Column == "*" {
totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount + dataSources.BrokerUnflushedCount
countResult := aggResults[0].Count
if isDebugMode(ctx) {
fmt.Printf("Validating fast path: COUNT=%d, Sources=%d\n", countResult, totalRows)
}
if totalRows == 0 && countResult > 0 {
// Fast path found data but data sources show 0 - this suggests a bug
if isDebugMode(ctx) {
fmt.Printf("Fast path validation failed: COUNT=%d but sources=0\n", countResult)
}
return nil, false
}
if totalRows > 0 && countResult == 0 {
// Data sources show data but COUNT is 0 - this also suggests a bug
if isDebugMode(ctx) {
fmt.Printf("Fast path validation failed: sources=%d but COUNT=0\n", totalRows)
}
return nil, false
}
if countResult != totalRows {
// Counts don't match - this suggests inconsistent logic
if isDebugMode(ctx) {
fmt.Printf("Fast path validation failed: COUNT=%d != sources=%d\n", countResult, totalRows)
}
return nil, false
}
if isDebugMode(ctx) {
fmt.Printf("Fast path validation passed: COUNT=%d\n", countResult)
}
}
// Step 4: Populate execution plan if provided (for EXPLAIN queries)
if plan != nil {
		// Reuse the strategy computed in Step 1 (DetermineStrategy is deterministic).
builder := &ExecutionPlanBuilder{}
// Create a minimal SELECT statement for the plan builder (avoid nil pointer)
stmt := &SelectStatement{}
// Build aggregation plan with fast path strategy
aggPlan := builder.BuildAggregationPlan(stmt, aggregations, strategy, dataSources)
// Copy relevant fields to the main plan
plan.ExecutionStrategy = aggPlan.ExecutionStrategy
plan.DataSources = aggPlan.DataSources
plan.OptimizationsUsed = aggPlan.OptimizationsUsed
plan.PartitionsScanned = aggPlan.PartitionsScanned
plan.ParquetFilesScanned = aggPlan.ParquetFilesScanned
plan.LiveLogFilesScanned = aggPlan.LiveLogFilesScanned
plan.TotalRowsProcessed = aggPlan.TotalRowsProcessed
plan.Aggregations = aggPlan.Aggregations
// Indicate broker buffer participation for EXPLAIN tree rendering
if dataSources.BrokerUnflushedCount > 0 {
plan.BrokerBufferQueried = true
plan.BrokerBufferMessages = int(dataSources.BrokerUnflushedCount)
}
// Merge details while preserving existing ones
if plan.Details == nil {
plan.Details = make(map[string]interface{})
}
for key, value := range aggPlan.Details {
plan.Details[key] = value
}
// Add file path information from the data collection
plan.Details["partition_paths"] = partitions
// Collect actual file information for each partition
var parquetFiles []string
var liveLogFiles []string
parquetSources := make(map[string]bool)
for _, partitionPath := range partitions {
// Get parquet files for this partition
if parquetStats, err := hybridScanner.ReadParquetStatistics(partitionPath); err == nil {
for _, stats := range parquetStats {
parquetFiles = append(parquetFiles, fmt.Sprintf("%s/%s", partitionPath, stats.FileName))
}
}
// Merge accurate parquet sources from metadata (preferred over filename fallback)
if sources, err := e.getParquetSourceFilesFromMetadata(partitionPath); err == nil {
for src := range sources {
parquetSources[src] = true
}
}
// Get live log files for this partition
			if liveFiles, err := collectLiveLogFileNames(hybridScanner.filerClient, partitionPath); err == nil {
for _, fileName := range liveFiles {
// Exclude live log files that have been converted to parquet (deduplicated)
if parquetSources[fileName] {
continue
}
liveLogFiles = append(liveLogFiles, fmt.Sprintf("%s/%s", partitionPath, fileName))
}
}
}
if len(parquetFiles) > 0 {
plan.Details["parquet_files"] = parquetFiles
}
if len(liveLogFiles) > 0 {
plan.Details["live_log_files"] = liveLogFiles
}
// Update the dataSources.LiveLogFilesCount to match the actual files found
dataSources.LiveLogFilesCount = len(liveLogFiles)
// Also update the plan's LiveLogFilesScanned to match
plan.LiveLogFilesScanned = len(liveLogFiles)
// Ensure PartitionsScanned is set so Statistics section appears
if plan.PartitionsScanned == 0 && len(partitions) > 0 {
plan.PartitionsScanned = len(partitions)
}
if isDebugMode(ctx) {
fmt.Printf("Populated execution plan with fast path strategy\n")
}
}
// Step 5: Build final query result
columns := make([]string, len(aggregations))
row := make([]sqltypes.Value, len(aggregations))
for i, spec := range aggregations {
columns[i] = spec.Alias
row[i] = e.formatAggregationResult(spec, aggResults[i])
}
result := &QueryResult{
Columns: columns,
Rows: [][]sqltypes.Value{row},
}
return result, true
}
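
// Worked example of the Step 3.5 validation (hypothetical numbers): with
// 1000 parquet rows, 800 deduplicated live log rows, and 3 unflushed broker
// messages, the fast path result is accepted only if COUNT(*) equals 1803.
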
// computeAggregations computes aggregation results from a full table scan
func (e *SQLEngine) computeAggregations(results []HybridScanResult, aggregations []AggregationSpec) []AggregationResult {
aggResults := make([]AggregationResult, len(aggregations))
for i, spec := range aggregations {
switch spec.Function {
case FuncCOUNT:
if spec.Column == "*" {
aggResults[i].Count = int64(len(results))
} else {
count := int64(0)
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil && !e.isNullValue(value) {
count++
}
}
aggResults[i].Count = count
}
case FuncSUM:
sum := float64(0)
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil {
if numValue := e.convertToNumber(value); numValue != nil {
sum += *numValue
}
}
}
aggResults[i].Sum = sum
case FuncAVG:
sum := float64(0)
count := int64(0)
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil {
if numValue := e.convertToNumber(value); numValue != nil {
sum += *numValue
count++
}
}
}
if count > 0 {
aggResults[i].Sum = sum / float64(count) // Store average in Sum field
aggResults[i].Count = count
}
case FuncMIN:
var min interface{}
var minValue *schema_pb.Value
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil {
if minValue == nil || e.compareValues(value, minValue) < 0 {
minValue = value
min = e.extractRawValue(value)
}
}
}
aggResults[i].Min = min
case FuncMAX:
var max interface{}
var maxValue *schema_pb.Value
for _, result := range results {
if value := e.findColumnValue(result, spec.Column); value != nil {
if maxValue == nil || e.compareValues(value, maxValue) > 0 {
maxValue = value
max = e.extractRawValue(value)
}
}
}
aggResults[i].Max = max
}
}
return aggResults
}
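
// Note the AVG convention above: the average is stored in the Sum field with
// the contributing row count in Count, e.g. values 2, 4, 6 (illustrative)
// produce AggregationResult{Sum: 4, Count: 3}.
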
// canUseParquetStatsForAggregation determines if an aggregation can be optimized with parquet stats
func (e *SQLEngine) canUseParquetStatsForAggregation(spec AggregationSpec) bool {
switch spec.Function {
case FuncCOUNT:
return spec.Column == "*" || e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
case FuncMIN, FuncMAX:
return e.isSystemColumn(spec.Column) || e.isRegularColumn(spec.Column)
case FuncSUM, FuncAVG:
// These require scanning actual values, not just min/max
return false
default:
return false
}
}
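
// For instance, COUNT(*), MIN(_ts), and MAX(id) can be answered from parquet
// column statistics, while SUM(amount) and AVG(amount) cannot: the statistics
// carry row counts and per-column min/max, but not value sums. (Column names
// here are illustrative.)
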
// debugHybridScanOptions logs the exact scan options being used
func debugHybridScanOptions(ctx context.Context, options HybridScanOptions, queryType string) {
if isDebugMode(ctx) {
fmt.Printf("\n=== HYBRID SCAN OPTIONS DEBUG (%s) ===\n", queryType)
fmt.Printf("StartTimeNs: %d\n", options.StartTimeNs)
fmt.Printf("StopTimeNs: %d\n", options.StopTimeNs)
fmt.Printf("Limit: %d\n", options.Limit)
fmt.Printf("Offset: %d\n", options.Offset)
fmt.Printf("Predicate: %v\n", options.Predicate != nil)
fmt.Printf("Columns: %v\n", options.Columns)
fmt.Printf("==========================================\n")
}
}
// collectLiveLogFileNames collects the names of live log files in a partition
func collectLiveLogFileNames(filerClient filer_pb.FilerClient, partitionPath string) ([]string, error) {
var fileNames []string
err := filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
		// Skip directories, parquet data files, and offset bookkeeping files
if entry.IsDirectory || strings.HasSuffix(entry.Name, ".parquet") || strings.HasSuffix(entry.Name, ".offset") {
return nil
}
// Only include files with actual content
if len(entry.Chunks) > 0 {
fileNames = append(fileNames, entry.Name)
}
return nil
})
return fileNames, err
}
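
// Usage sketch (the partition path is hypothetical; the helper expects an
// absolute path as produced by discoverTopicPartitions):
//
//	names, err := collectLiveLogFileNames(filerClient, "/topics/ns/t/v2025/0000-2520")
//	if err != nil {
//		// handle filer listing failure
//	}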