aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/describe.go
blob: 415fc8e17bd5da3b2430c1a8b0aa317627ed700a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package engine

import (
	"context"
	"fmt"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
)

// executeDescribeStatement handles DESCRIBE table commands.
//
// The database is resolved from the explicit argument, then the catalog's
// current database, then the literal "default". If the table is not yet in
// the catalog, topic auto-discovery is attempted (same logic as SELECT)
// before the flat schema and key columns are fetched from the broker.
//
// The result has the columns Field, Type, Null, Key, Default and Extra (the
// MySQL-style DESCRIBE layout): one row per schema field, followed by one row
// per system column. Key fields carry "PRI" in the Key column and system
// columns carry "SYS". When the topic has no schema, a raw value column is
// listed together with the system columns.
//
// On failure the returned error is also stored in QueryResult.Error; a
// discovery failure wraps the underlying error so errors.Is / errors.As keep
// working for callers.
func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName string, database string) (*QueryResult, error) {
	if database == "" {
		database = e.catalog.GetCurrentDatabase()
		if database == "" {
			database = "default"
		}
	}

	// Auto-discover and register the topic if it is not already in the catalog.
	if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
		if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
			// Report the failure once, through the error, instead of also
			// printing it; the result and the return value carry the same error.
			wrapped := fmt.Errorf("topic %s.%s not found and auto-discovery failed: %w", database, tableName, regErr)
			return &QueryResult{Error: wrapped}, wrapped
		}
	}

	// Get flat schema and key columns from broker.
	flatSchema, keyColumns, _, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
	if err != nil {
		return &QueryResult{Error: err}, err
	}

	// describeColumn is one fixed (non-schema) row of the DESCRIBE output.
	type describeColumn struct {
		name    string
		sqlType string
		extra   string
	}

	// System columns to include in DESCRIBE output.
	systemColumns := []describeColumn{
		{"_ts", "TIMESTAMP", "System column: Message timestamp"},
		{"_key", "VARBINARY", "System column: Message key"},
		{"_source", "VARCHAR(255)", "System column: Data source (parquet/log)"},
	}

	schemaFieldCount := 0
	if flatSchema == nil {
		// No schema is defined, so expose the raw value column instead.
		systemColumns = append(systemColumns, describeColumn{SW_COLUMN_NAME_VALUE, "VARBINARY", "Raw message value (no schema defined)"})
	} else {
		schemaFieldCount = len(flatSchema.Fields)
	}

	// Set of key column names for constant-time membership tests.
	keySet := make(map[string]struct{}, len(keyColumns))
	for _, keyCol := range keyColumns {
		keySet[keyCol] = struct{}{}
	}

	// newRow builds one output row; Null is always "YES" and Default always "NULL".
	newRow := func(name, sqlType, key, extra string) []sqltypes.Value {
		return []sqltypes.Value{
			sqltypes.NewVarChar(name),    // Field
			sqltypes.NewVarChar(sqlType), // Type
			sqltypes.NewVarChar("YES"),   // Null
			sqltypes.NewVarChar(key),     // Key
			sqltypes.NewVarChar("NULL"),  // Default
			sqltypes.NewVarChar(extra),   // Extra
		}
	}

	rows := make([][]sqltypes.Value, 0, schemaFieldCount+len(systemColumns))

	// Schema fields come first; key columns are marked as primary key.
	if flatSchema != nil {
		for _, field := range flatSchema.Fields {
			keyType, extra := "", "Data field"
			if _, isKey := keySet[field.Name]; isKey {
				keyType, extra = "PRI", "Key field"
			}
			rows = append(rows, newRow(field.Name, e.convertMQTypeToSQL(field.Type), keyType, extra))
		}
	}

	// System columns follow, marked "SYS" in the Key column.
	for _, sysCol := range systemColumns {
		rows = append(rows, newRow(sysCol.name, sysCol.sqlType, "SYS", sysCol.extra))
	}

	return &QueryResult{
		Columns: []string{"Field", "Type", "Null", "Key", "Default", "Extra"},
		Rows:    rows,
	}, nil
}

// executeShowStatementWithDescribe dispatches a SHOW statement on its type,
// which is matched case-insensitively:
//
//   - DATABASES lists databases via showDatabases.
//   - TABLES lists the tables of the database named by stmt.Schema, else by
//     stmt.OnTable.Name, else the catalog's current database.
//   - COLUMNS is equivalent to DESCRIBE on stmt.OnTable.Name, with
//     stmt.OnTable.Qualifier (when set) as the database.
//
// Any other type, and SHOW COLUMNS without a table name, produces an error
// that is both returned and stored in QueryResult.Error.
func (e *SQLEngine) executeShowStatementWithDescribe(ctx context.Context, stmt *ShowStatement) (*QueryResult, error) {
	switch strings.ToUpper(stmt.Type) {
	case "DATABASES":
		return e.showDatabases(ctx)

	case "TABLES":
		// Precedence: explicit schema, then the name given in
		// SHOW TABLES FROM <database>, then the current database context.
		database := stmt.Schema
		if database == "" && stmt.OnTable.Name != nil {
			database = stmt.OnTable.Name.String()
		}
		if database == "" {
			database = e.catalog.GetCurrentDatabase()
		}
		return e.showTables(ctx, database)

	case "COLUMNS":
		// SHOW COLUMNS FROM table is equivalent to DESCRIBE.
		var tableName, database string
		if stmt.OnTable.Name != nil {
			tableName = stmt.OnTable.Name.String()
			// The qualifier is only consulted when a table name is present.
			if stmt.OnTable.Qualifier != nil {
				database = stmt.OnTable.Qualifier.String()
			}
		}
		if tableName == "" {
			// Keep the historical message as a prefix, but state the real
			// cause: the statement type is supported, the table is missing.
			err := fmt.Errorf("unsupported SHOW statement: %s (table name is required)", stmt.Type)
			return &QueryResult{Error: err}, err
		}
		return e.executeDescribeStatement(ctx, tableName, database)

	default:
		err := fmt.Errorf("unsupported SHOW statement: %s", stmt.Type)
		return &QueryResult{Error: err}, err
	}
}