1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
package engine
import (
"context"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/stretchr/testify/assert"
)
// TestFastPathCountFixRealistic tests the specific scenario mentioned in the bug report:
// Fast path returning 0 for COUNT(*) when slow path returns 1803
func TestFastPathCountFixRealistic(t *testing.T) {
engine := NewMockSQLEngine()
// Set up debug mode to see our new logging
ctx := context.WithValue(context.Background(), "debug", true)
// Create realistic data sources that mimic a scenario with 1803 rows
dataSources := &TopicDataSources{
ParquetFiles: map[string][]*ParquetFileStats{
"/topics/test/large-topic/0000-1023": {
{
RowCount: 800,
ColumnStats: map[string]*ParquetColumnStats{
"id": {
ColumnName: "id",
MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1}},
MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 800}},
NullCount: 0,
RowCount: 800,
},
},
},
{
RowCount: 500,
ColumnStats: map[string]*ParquetColumnStats{
"id": {
ColumnName: "id",
MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 801}},
MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1300}},
NullCount: 0,
RowCount: 500,
},
},
},
},
"/topics/test/large-topic/1024-2047": {
{
RowCount: 300,
ColumnStats: map[string]*ParquetColumnStats{
"id": {
ColumnName: "id",
MinValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1301}},
MaxValue: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 1600}},
NullCount: 0,
RowCount: 300,
},
},
},
},
},
ParquetRowCount: 1600, // 800 + 500 + 300
LiveLogRowCount: 203, // Additional live log data
PartitionsCount: 2,
LiveLogFilesCount: 15,
}
partitions := []string{
"/topics/test/large-topic/0000-1023",
"/topics/test/large-topic/1024-2047",
}
t.Run("COUNT(*) should return correct total (1803)", func(t *testing.T) {
computer := NewAggregationComputer(engine.SQLEngine)
aggregations := []AggregationSpec{
{Function: FuncCOUNT, Column: "*", Alias: "COUNT(*)"},
}
results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
assert.NoError(t, err, "Fast path aggregation should not error")
assert.Len(t, results, 1, "Should return one result")
// This is the key test - before our fix, this was returning 0
expectedCount := int64(1803) // 1600 (parquet) + 203 (live log)
actualCount := results[0].Count
assert.Equal(t, expectedCount, actualCount,
"COUNT(*) should return %d (1600 parquet + 203 live log), but got %d",
expectedCount, actualCount)
})
t.Run("MIN/MAX should work with multiple partitions", func(t *testing.T) {
computer := NewAggregationComputer(engine.SQLEngine)
aggregations := []AggregationSpec{
{Function: FuncMIN, Column: "id", Alias: "MIN(id)"},
{Function: FuncMAX, Column: "id", Alias: "MAX(id)"},
}
results, err := computer.ComputeFastPathAggregations(ctx, aggregations, dataSources, partitions)
assert.NoError(t, err, "Fast path aggregation should not error")
assert.Len(t, results, 2, "Should return two results")
// MIN should be the lowest across all parquet files
assert.Equal(t, int64(1), results[0].Min, "MIN should be 1")
// MAX should be the highest across all parquet files
assert.Equal(t, int64(1600), results[1].Max, "MAX should be 1600")
})
}
// TestFastPathDataSourceDiscoveryLogging tests that our debug logging works correctly
func TestFastPathDataSourceDiscoveryLogging(t *testing.T) {
// This test verifies that our enhanced data source collection structure is correct
t.Run("DataSources structure validation", func(t *testing.T) {
// Test the TopicDataSources structure initialization
dataSources := &TopicDataSources{
ParquetFiles: make(map[string][]*ParquetFileStats),
ParquetRowCount: 0,
LiveLogRowCount: 0,
LiveLogFilesCount: 0,
PartitionsCount: 0,
}
assert.NotNil(t, dataSources, "Data sources should not be nil")
assert.NotNil(t, dataSources.ParquetFiles, "ParquetFiles map should be initialized")
assert.GreaterOrEqual(t, dataSources.PartitionsCount, 0, "PartitionsCount should be non-negative")
assert.GreaterOrEqual(t, dataSources.ParquetRowCount, int64(0), "ParquetRowCount should be non-negative")
assert.GreaterOrEqual(t, dataSources.LiveLogRowCount, int64(0), "LiveLogRowCount should be non-negative")
})
}
// TestFastPathValidationLogic tests the enhanced validation we added
func TestFastPathValidationLogic(t *testing.T) {
t.Run("Validation catches data source vs computation mismatch", func(t *testing.T) {
// Create a scenario where data sources and computation might be inconsistent
dataSources := &TopicDataSources{
ParquetFiles: make(map[string][]*ParquetFileStats),
ParquetRowCount: 1000, // Data sources say 1000 rows
LiveLogRowCount: 0,
PartitionsCount: 1,
}
// But aggregation result says different count (simulating the original bug)
aggResults := []AggregationResult{
{Count: 0}, // Bug: returns 0 when data sources show 1000
}
// This simulates the validation logic from tryFastParquetAggregation
totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount
countResult := aggResults[0].Count
// Our validation should catch this mismatch
assert.NotEqual(t, totalRows, countResult,
"This test simulates the bug: data sources show %d but COUNT returns %d",
totalRows, countResult)
// In the real code, this would trigger a fallback to slow path
validationPassed := (countResult == totalRows)
assert.False(t, validationPassed, "Validation should fail for inconsistent data")
})
t.Run("Validation passes for consistent data", func(t *testing.T) {
// Create a scenario where everything is consistent
dataSources := &TopicDataSources{
ParquetFiles: make(map[string][]*ParquetFileStats),
ParquetRowCount: 1000,
LiveLogRowCount: 803,
PartitionsCount: 1,
}
// Aggregation result matches data sources
aggResults := []AggregationResult{
{Count: 1803}, // Correct: matches 1000 + 803
}
totalRows := dataSources.ParquetRowCount + dataSources.LiveLogRowCount
countResult := aggResults[0].Count
// Our validation should pass this
assert.Equal(t, totalRows, countResult,
"Validation should pass when data sources (%d) match COUNT result (%d)",
totalRows, countResult)
validationPassed := (countResult == totalRows)
assert.True(t, validationPassed, "Validation should pass for consistent data")
})
}
|