1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
|
package engine
import (
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
)
// System column constants used throughout the SQL engine. These are the
// internal names under which every message's system columns are stored.
const (
	// SW_COLUMN_NAME_TIMESTAMP holds the message timestamp in nanoseconds (internal name).
	SW_COLUMN_NAME_TIMESTAMP = "_timestamp_ns"

	// SW_COLUMN_NAME_KEY holds the message key.
	SW_COLUMN_NAME_KEY = "_key"

	// SW_COLUMN_NAME_SOURCE names where a row came from (live_log, parquet_archive, etc.).
	SW_COLUMN_NAME_SOURCE = "_source"
)

// SW_DISPLAY_NAME_TIMESTAMP is the user-facing name of SW_COLUMN_NAME_TIMESTAMP.
// Only the timestamp column is renamed for display; _key and _source are shown
// to users under their internal names.
const SW_DISPLAY_NAME_TIMESTAMP = "_ts"
// isSystemColumn reports whether columnName, compared case-insensitively, is
// one of the internal system column names: _timestamp_ns, _key or _source.
// Display aliases such as _ts are not recognized here.
func (e *SQLEngine) isSystemColumn(columnName string) bool {
	switch strings.ToLower(columnName) {
	case SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY, SW_COLUMN_NAME_SOURCE:
		return true
	default:
		return false
	}
}
// isRegularColumn reports whether columnName may be a regular data column.
// This is a placeholder heuristic: no schema is consulted, and any name that
// is not an internal system column is treated as regular.
func (e *SQLEngine) isRegularColumn(columnName string) bool {
	systemColumn := e.isSystemColumn(columnName)
	return !systemColumn
}
// getSystemColumnDisplayName maps an internal system column name (matched
// case-insensitively) to the name shown to users.
//
// _timestamp_ns becomes _ts. _key and _source keep their names and are
// returned in lower case. Any other name is returned exactly as given.
func (e *SQLEngine) getSystemColumnDisplayName(columnName string) string {
	normalized := strings.ToLower(columnName)

	if normalized == SW_COLUMN_NAME_TIMESTAMP {
		return SW_DISPLAY_NAME_TIMESTAMP
	}
	if normalized == SW_COLUMN_NAME_KEY || normalized == SW_COLUMN_NAME_SOURCE {
		// These system columns are displayed under their internal names.
		return normalized
	}

	// Not a system column: leave the caller's spelling untouched.
	return columnName
}
// isSystemColumnDisplayName reports whether columnName, compared
// case-insensitively, is a user-facing system column name: _ts, _key or
// _source. The internal name _timestamp_ns is not accepted here.
func (e *SQLEngine) isSystemColumnDisplayName(columnName string) bool {
	switch strings.ToLower(columnName) {
	case SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY, SW_COLUMN_NAME_SOURCE:
		return true
	}
	return false
}
// getSystemColumnInternalName is the inverse of the display-name mapping. It
// translates a user-facing system column name (matched case-insensitively)
// into its internal storage name.
//
// _ts becomes _timestamp_ns. _key and _source are returned in lower case.
// Any other name, including an internal name passed in directly, is returned
// exactly as given.
func (e *SQLEngine) getSystemColumnInternalName(displayName string) string {
	normalized := strings.ToLower(displayName)

	if normalized == SW_DISPLAY_NAME_TIMESTAMP {
		return SW_COLUMN_NAME_TIMESTAMP
	}
	if normalized == SW_COLUMN_NAME_KEY || normalized == SW_COLUMN_NAME_SOURCE {
		return normalized
	}

	return displayName
}
// formatTimestampColumn renders a Unix timestamp given in nanoseconds as a
// sqltypes.Timestamp value. The text is in UTC with the "YYYY-MM-DD HH:MM:SS"
// layout (MySQL DATETIME form), so any sub-second part is not included.
func (e *SQLEngine) formatTimestampColumn(timestampNs int64) sqltypes.Value {
	const mysqlDatetimeLayout = "2006-01-02 15:04:05"

	// time.Unix normalizes an out-of-range nanosecond argument into whole
	// seconds, so passing the full nanosecond count here is equivalent to
	// splitting it into seconds and remainder first.
	formatted := time.Unix(0, timestampNs).UTC().Format(mysqlDatetimeLayout)
	return sqltypes.MakeTrusted(sqltypes.Timestamp, []byte(formatted))
}
// getSystemColumnGlobalMin computes global min for system columns using file metadata
func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
lowerName := strings.ToLower(columnName)
switch lowerName {
case SW_COLUMN_NAME_TIMESTAMP:
// For timestamps, find the earliest timestamp across all files
// This should match what's in the Extended["min"] metadata
var minTimestamp *int64
for _, fileStats := range allFileStats {
for _, fileStat := range fileStats {
// Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
timestamp := e.extractTimestampFromFilename(fileStat.FileName)
if timestamp != 0 {
if minTimestamp == nil || timestamp < *minTimestamp {
minTimestamp = ×tamp
}
}
}
}
if minTimestamp != nil {
return *minTimestamp
}
case SW_COLUMN_NAME_KEY:
// For keys, we'd need to read the actual parquet column stats
// Fall back to scanning if not available in our current stats
return nil
case SW_COLUMN_NAME_SOURCE:
// Source is always "parquet_archive" for parquet files
return "parquet_archive"
}
return nil
}
// getSystemColumnGlobalMax computes global max for system columns using file metadata
func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
lowerName := strings.ToLower(columnName)
switch lowerName {
case SW_COLUMN_NAME_TIMESTAMP:
// For timestamps, find the latest timestamp across all files
// This should match what's in the Extended["max"] metadata
var maxTimestamp *int64
for _, fileStats := range allFileStats {
for _, fileStat := range fileStats {
// Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
timestamp := e.extractTimestampFromFilename(fileStat.FileName)
if timestamp != 0 {
if maxTimestamp == nil || timestamp > *maxTimestamp {
maxTimestamp = ×tamp
}
}
}
}
if maxTimestamp != nil {
return *maxTimestamp
}
case SW_COLUMN_NAME_KEY:
// For keys, we'd need to read the actual parquet column stats
// Fall back to scanning if not available in our current stats
return nil
case SW_COLUMN_NAME_SOURCE:
// Source is always "parquet_archive" for parquet files
return "parquet_archive"
}
return nil
}
|