diff options
Diffstat (limited to 'weed/query/engine/system_columns.go')
| -rw-r--r-- | weed/query/engine/system_columns.go | 159 |
1 files changed, 159 insertions, 0 deletions
diff --git a/weed/query/engine/system_columns.go b/weed/query/engine/system_columns.go
new file mode 100644
index 000000000..12757d4eb
--- /dev/null
+++ b/weed/query/engine/system_columns.go
@@ -0,0 +1,159 @@
+package engine
+
+import (
+	"strings"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
+)
+
+// System column constants used throughout the SQL engine
+const (
+	SW_COLUMN_NAME_TIMESTAMP = "_timestamp_ns" // Message timestamp in nanoseconds (internal)
+	SW_COLUMN_NAME_KEY       = "_key"          // Message key
+	SW_COLUMN_NAME_SOURCE    = "_source"       // Data source (live_log, parquet_archive, etc.)
+)
+
+// System column display names (what users see)
+const (
+	SW_DISPLAY_NAME_TIMESTAMP = "_ts" // User-facing timestamp column name
+	// Note: _key and _source keep the same names, only _timestamp_ns changes to _ts
+)
+
+// isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source)
+func (e *SQLEngine) isSystemColumn(columnName string) bool {
+	lowerName := strings.ToLower(columnName)
+	return lowerName == SW_COLUMN_NAME_TIMESTAMP ||
+		lowerName == SW_COLUMN_NAME_KEY ||
+		lowerName == SW_COLUMN_NAME_SOURCE
+}
+
+// isRegularColumn checks if a column might be a regular data column (placeholder)
+func (e *SQLEngine) isRegularColumn(columnName string) bool {
+	// For now, assume any non-system column is a regular column
+	return !e.isSystemColumn(columnName)
+}
+
+// getSystemColumnDisplayName returns the user-facing display name for system columns
+func (e *SQLEngine) getSystemColumnDisplayName(columnName string) string {
+	lowerName := strings.ToLower(columnName)
+	switch lowerName {
+	case SW_COLUMN_NAME_TIMESTAMP:
+		return SW_DISPLAY_NAME_TIMESTAMP
+	case SW_COLUMN_NAME_KEY:
+		return SW_COLUMN_NAME_KEY // _key stays the same
+	case SW_COLUMN_NAME_SOURCE:
+		return SW_COLUMN_NAME_SOURCE // _source stays the same
+	default:
+		return columnName // Return original name for non-system columns
+	}
+}
+
+// isSystemColumnDisplayName checks if a column name is a system column display name
+func (e *SQLEngine) isSystemColumnDisplayName(columnName string) bool {
+	lowerName := strings.ToLower(columnName)
+	return lowerName == SW_DISPLAY_NAME_TIMESTAMP ||
+		lowerName == SW_COLUMN_NAME_KEY ||
+		lowerName == SW_COLUMN_NAME_SOURCE
+}
+
+// getSystemColumnInternalName returns the internal name for a system column display name
+func (e *SQLEngine) getSystemColumnInternalName(displayName string) string {
+	lowerName := strings.ToLower(displayName)
+	switch lowerName {
+	case SW_DISPLAY_NAME_TIMESTAMP:
+		return SW_COLUMN_NAME_TIMESTAMP
+	case SW_COLUMN_NAME_KEY:
+		return SW_COLUMN_NAME_KEY
+	case SW_COLUMN_NAME_SOURCE:
+		return SW_COLUMN_NAME_SOURCE
+	default:
+		return displayName // Return original name for non-system columns
+	}
+}
+
+// formatTimestampColumn formats a nanosecond timestamp as a proper timestamp value
+func (e *SQLEngine) formatTimestampColumn(timestampNs int64) sqltypes.Value {
+	// Convert nanoseconds to time.Time
+	timestamp := time.Unix(timestampNs/1e9, timestampNs%1e9)
+
+	// Format as timestamp string in MySQL datetime format
+	timestampStr := timestamp.UTC().Format("2006-01-02 15:04:05")
+
+	// Return as a timestamp value using the Timestamp type
+	return sqltypes.MakeTrusted(sqltypes.Timestamp, []byte(timestampStr))
+}
+
+// getSystemColumnGlobalMin computes global min for system columns using file metadata
+func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
+	lowerName := strings.ToLower(columnName)
+
+	switch lowerName {
+	case SW_COLUMN_NAME_TIMESTAMP:
+		// For timestamps, find the earliest timestamp across all files
+		// This should match what's in the Extended["min"] metadata
+		var minTimestamp *int64
+		for _, fileStats := range allFileStats {
+			for _, fileStat := range fileStats {
+				// Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
+				timestamp := e.extractTimestampFromFilename(fileStat.FileName)
+				if timestamp != 0 {
+					if minTimestamp == nil || timestamp < *minTimestamp {
+						minTimestamp = &timestamp
+					}
+				}
+			}
+		}
+		if minTimestamp != nil {
+			return *minTimestamp
+		}
+
+	case SW_COLUMN_NAME_KEY:
+		// For keys, we'd need to read the actual parquet column stats
+		// Fall back to scanning if not available in our current stats
+		return nil
+
+	case SW_COLUMN_NAME_SOURCE:
+		// Source is always "parquet_archive" for parquet files
+		return "parquet_archive"
+	}
+
+	return nil
+}
+
+// getSystemColumnGlobalMax computes global max for system columns using file metadata
+func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map[string][]*ParquetFileStats) interface{} {
+	lowerName := strings.ToLower(columnName)
+
+	switch lowerName {
+	case SW_COLUMN_NAME_TIMESTAMP:
+		// For timestamps, find the latest timestamp across all files
+		// This should match what's in the Extended["max"] metadata
+		var maxTimestamp *int64
+		for _, fileStats := range allFileStats {
+			for _, fileStat := range fileStats {
+				// Extract timestamp from filename (format: YYYY-MM-DD-HH-MM-SS.parquet)
+				timestamp := e.extractTimestampFromFilename(fileStat.FileName)
+				if timestamp != 0 {
+					if maxTimestamp == nil || timestamp > *maxTimestamp {
+						maxTimestamp = &timestamp
+					}
+				}
+			}
+		}
+		if maxTimestamp != nil {
+			return *maxTimestamp
+		}
+
+	case SW_COLUMN_NAME_KEY:
+		// For keys, we'd need to read the actual parquet column stats
+		// Fall back to scanning if not available in our current stats
+		return nil
+
+	case SW_COLUMN_NAME_SOURCE:
+		// Source is always "parquet_archive" for parquet files
+		return "parquet_archive"
+	}
+
+	return nil
+}
