aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/system_columns.go
blob: 12757d4ebf5c1a77b650b8cfe61ad910d42bc707 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package engine

import (
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
)

// System column constants used throughout the SQL engine.
// These are the internal column names; the helpers in this file lower-case a
// candidate name before comparing it against them.
const (
	SW_COLUMN_NAME_TIMESTAMP = "_timestamp_ns" // Message timestamp in nanoseconds (internal name; displayed to users as "_ts")
	SW_COLUMN_NAME_KEY       = "_key"          // Message key
	SW_COLUMN_NAME_SOURCE    = "_source"       // Data source (live_log, parquet_archive, etc.)
)

// System column display names (what users see).
// Only the timestamp column has a display name that differs from its
// internal name.
const (
	SW_DISPLAY_NAME_TIMESTAMP = "_ts" // User-facing name for the internal "_timestamp_ns" column
	// Note: _key and _source keep the same names, only _timestamp_ns changes to _ts
)

// isSystemColumn reports whether columnName is one of the internal system
// column names (_timestamp_ns, _key, _source).
//
// The name is lower-cased before matching, so the check is case-insensitive.
// The user-facing display name "_ts" is not an internal name and does not match.
func (e *SQLEngine) isSystemColumn(columnName string) bool {
	switch strings.ToLower(columnName) {
	case SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY, SW_COLUMN_NAME_SOURCE:
		return true
	}
	return false
}

// isRegularColumn reports whether columnName might be a regular data column.
//
// This is a placeholder: no schema lookup is performed, and any name that
// isSystemColumn does not recognise is treated as a regular column.
func (e *SQLEngine) isRegularColumn(columnName string) bool {
	isSystem := e.isSystemColumn(columnName)
	return !isSystem
}

// getSystemColumnDisplayName maps an internal system column name to the name
// shown to users.
//
// Matching is case-insensitive. "_timestamp_ns" becomes "_ts"; "_key" and
// "_source" are returned in their canonical lower-case form. Any other name
// is returned exactly as it was passed in, with its original casing.
func (e *SQLEngine) getSystemColumnDisplayName(columnName string) string {
	switch normalized := strings.ToLower(columnName); normalized {
	case SW_COLUMN_NAME_TIMESTAMP:
		return SW_DISPLAY_NAME_TIMESTAMP
	case SW_COLUMN_NAME_KEY, SW_COLUMN_NAME_SOURCE:
		// Display name equals the internal name; normalized is already
		// identical to the matched constant.
		return normalized
	}
	// Not a system column: hand back the caller's original spelling.
	return columnName
}

// isSystemColumnDisplayName reports whether columnName is a user-facing
// system column name (_ts, _key, _source).
//
// The name is lower-cased before matching. The internal timestamp name
// "_timestamp_ns" is not a display name and does not match.
func (e *SQLEngine) isSystemColumnDisplayName(columnName string) bool {
	switch strings.ToLower(columnName) {
	case SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY, SW_COLUMN_NAME_SOURCE:
		return true
	default:
		return false
	}
}

// getSystemColumnInternalName maps a user-facing system column name back to
// its internal name.
//
// Matching is case-insensitive. "_ts" becomes "_timestamp_ns"; "_key" and
// "_source" are returned in their canonical lower-case form. Any other name,
// including "_timestamp_ns" itself, is returned exactly as it was passed in.
func (e *SQLEngine) getSystemColumnInternalName(displayName string) string {
	folded := strings.ToLower(displayName)

	if folded == SW_DISPLAY_NAME_TIMESTAMP {
		return SW_COLUMN_NAME_TIMESTAMP
	}
	if folded == SW_COLUMN_NAME_KEY || folded == SW_COLUMN_NAME_SOURCE {
		// Internal name equals the display name; folded is already
		// identical to the matched constant.
		return folded
	}

	// Not a system column display name: keep the caller's original spelling.
	return displayName
}

// formatTimestampColumn renders a Unix timestamp given in nanoseconds as a
// sqltypes.Timestamp value.
//
// The instant is converted to UTC and formatted as "YYYY-MM-DD HH:MM:SS"
// (MySQL datetime layout). The layout has no fractional-second field, so
// sub-second precision is not included in the output.
func (e *SQLEngine) formatTimestampColumn(timestampNs int64) sqltypes.Value {
	const mysqlDatetimeLayout = "2006-01-02 15:04:05"

	// Split into whole seconds and the nanosecond remainder for time.Unix.
	seconds, nanos := timestampNs/1e9, timestampNs%1e9
	formatted := time.Unix(seconds, nanos).UTC().Format(mysqlDatetimeLayout)

	return sqltypes.MakeTrusted(sqltypes.Timestamp, []byte(formatted))
}

// getSystemColumnGlobalMin computes the global minimum of a system column
// from file-level information, without reading row data.
//
// columnName is matched case-insensitively against the internal system
// column names:
//   - _timestamp_ns: the smallest non-zero value returned by
//     extractTimestampFromFilename over every file in allFileStats, as an
//     int64; nil if no file yields a non-zero timestamp.
//   - _key: always nil; key bounds would need the parquet column stats,
//     so the caller is expected to fall back to scanning.
//   - _source: always the string "parquet_archive".
//
// Any other column name yields nil.
func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
	switch strings.ToLower(columnName) {
	case SW_COLUMN_NAME_SOURCE:
		// Source is always "parquet_archive" for parquet files
		return "parquet_archive"

	case SW_COLUMN_NAME_KEY:
		// Not derivable from the stats available here.
		return nil

	case SW_COLUMN_NAME_TIMESTAMP:
		var (
			earliest int64
			found    bool
		)
		for _, statsList := range allFileStats {
			for _, stat := range statsList {
				// Timestamp is derived from the file name
				// (format: YYYY-MM-DD-HH-MM-SS.parquet).
				ts := e.extractTimestampFromFilename(stat.FileName)
				if ts == 0 {
					// Zero is skipped and never counts as a candidate.
					continue
				}
				if !found || ts < earliest {
					earliest, found = ts, true
				}
			}
		}
		if found {
			return earliest
		}
	}

	return nil
}

// getSystemColumnGlobalMax computes the global maximum of a system column
// from file-level information, without reading row data.
//
// columnName is matched case-insensitively against the internal system
// column names:
//   - _timestamp_ns: the largest non-zero value returned by
//     extractTimestampFromFilename over every file in allFileStats, as an
//     int64; nil if no file yields a non-zero timestamp.
//   - _key: always nil; key bounds would need the parquet column stats,
//     so the caller is expected to fall back to scanning.
//   - _source: always the string "parquet_archive".
//
// Any other column name yields nil.
func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
	switch strings.ToLower(columnName) {
	case SW_COLUMN_NAME_SOURCE:
		// Source is always "parquet_archive" for parquet files
		return "parquet_archive"

	case SW_COLUMN_NAME_KEY:
		// Not derivable from the stats available here.
		return nil

	case SW_COLUMN_NAME_TIMESTAMP:
		var (
			latest int64
			found  bool
		)
		for _, statsList := range allFileStats {
			for _, stat := range statsList {
				// Timestamp is derived from the file name
				// (format: YYYY-MM-DD-HH-MM-SS.parquet).
				ts := e.extractTimestampFromFilename(stat.FileName)
				if ts == 0 {
					// Zero is skipped and never counts as a candidate.
					continue
				}
				if !found || ts > latest {
					latest, found = ts, true
				}
			}
		}
		if found {
			return latest
		}
	}

	return nil
}