aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/describe.go
blob: 3a26bb2a6d1e52866ba90a5c6c9c1064c149925d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package engine

import (
	"context"
	"fmt"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
)

// executeDescribeStatement handles DESCRIBE table commands
// Shows table schema in PostgreSQL-compatible format
func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName string, database string) (*QueryResult, error) {
	if database == "" {
		database = e.catalog.GetCurrentDatabase()
		if database == "" {
			database = "default"
		}
	}

	// Auto-discover and register topic if not already in catalog (same logic as SELECT)
	if _, err := e.catalog.GetTableInfo(database, tableName); err != nil {
		// Topic not in catalog, try to discover and register it
		if regErr := e.discoverAndRegisterTopic(ctx, database, tableName); regErr != nil {
			fmt.Printf("Warning: Failed to discover topic %s.%s: %v\n", database, tableName, regErr)
			return &QueryResult{Error: fmt.Errorf("topic %s.%s not found and auto-discovery failed: %v", database, tableName, regErr)}, regErr
		}
	}

	// Get topic schema from broker
	recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
	if err != nil {
		return &QueryResult{Error: err}, err
	}

	// System columns to include in DESCRIBE output
	systemColumns := []struct {
		Name  string
		Type  string
		Extra string
	}{
		{"_ts", "TIMESTAMP", "System column: Message timestamp"},
		{"_key", "VARBINARY", "System column: Message key"},
		{"_source", "VARCHAR(255)", "System column: Data source (parquet/log)"},
	}

	// Format schema as DESCRIBE output (regular fields + system columns)
	totalRows := len(recordType.Fields) + len(systemColumns)
	result := &QueryResult{
		Columns: []string{"Field", "Type", "Null", "Key", "Default", "Extra"},
		Rows:    make([][]sqltypes.Value, totalRows),
	}

	// Add regular fields
	for i, field := range recordType.Fields {
		sqlType := e.convertMQTypeToSQL(field.Type)

		result.Rows[i] = []sqltypes.Value{
			sqltypes.NewVarChar(field.Name), // Field
			sqltypes.NewVarChar(sqlType),    // Type
			sqltypes.NewVarChar("YES"),      // Null (assume nullable)
			sqltypes.NewVarChar(""),         // Key (no keys for now)
			sqltypes.NewVarChar("NULL"),     // Default
			sqltypes.NewVarChar(""),         // Extra
		}
	}

	// Add system columns
	for i, sysCol := range systemColumns {
		rowIndex := len(recordType.Fields) + i
		result.Rows[rowIndex] = []sqltypes.Value{
			sqltypes.NewVarChar(sysCol.Name),  // Field
			sqltypes.NewVarChar(sysCol.Type),  // Type
			sqltypes.NewVarChar("YES"),        // Null
			sqltypes.NewVarChar(""),           // Key
			sqltypes.NewVarChar("NULL"),       // Default
			sqltypes.NewVarChar(sysCol.Extra), // Extra - description
		}
	}

	return result, nil
}

// Enhanced executeShowStatementWithDescribe handles SHOW statements including DESCRIBE
func (e *SQLEngine) executeShowStatementWithDescribe(ctx context.Context, stmt *ShowStatement) (*QueryResult, error) {
	switch strings.ToUpper(stmt.Type) {
	case "DATABASES":
		return e.showDatabases(ctx)
	case "TABLES":
		// Parse FROM clause for database specification, or use current database context
		database := ""
		// Check if there's a database specified in SHOW TABLES FROM database
		if stmt.Schema != "" {
			// Use schema field if set by parser
			database = stmt.Schema
		} else {
			// Try to get from OnTable.Name with proper nil checks
			if stmt.OnTable.Name != nil {
				if nameStr := stmt.OnTable.Name.String(); nameStr != "" {
					database = nameStr
				} else {
					database = e.catalog.GetCurrentDatabase()
				}
			} else {
				database = e.catalog.GetCurrentDatabase()
			}
		}
		if database == "" {
			// Use current database context
			database = e.catalog.GetCurrentDatabase()
		}
		return e.showTables(ctx, database)
	case "COLUMNS":
		// SHOW COLUMNS FROM table is equivalent to DESCRIBE
		var tableName, database string

		// Safely extract table name and database with proper nil checks
		if stmt.OnTable.Name != nil {
			tableName = stmt.OnTable.Name.String()
			if stmt.OnTable.Qualifier != nil {
				database = stmt.OnTable.Qualifier.String()
			}
		}

		if tableName != "" {
			return e.executeDescribeStatement(ctx, tableName, database)
		}
		fallthrough
	default:
		err := fmt.Errorf("unsupported SHOW statement: %s", stmt.Type)
		return &QueryResult{Error: err}, err
	}
}