diff options
Diffstat (limited to 'weed/command/sql.go')
| -rw-r--r-- | weed/command/sql.go | 595 |
1 files changed, 595 insertions, 0 deletions
diff --git a/weed/command/sql.go b/weed/command/sql.go new file mode 100644 index 000000000..adc2ad52b --- /dev/null +++ b/weed/command/sql.go @@ -0,0 +1,595 @@ +package command + +import ( + "context" + "encoding/csv" + "encoding/json" + "fmt" + "io" + "os" + "path" + "strings" + "time" + + "github.com/peterh/liner" + "github.com/seaweedfs/seaweedfs/weed/query/engine" + "github.com/seaweedfs/seaweedfs/weed/util/grace" + "github.com/seaweedfs/seaweedfs/weed/util/sqlutil" +) + +func init() { + cmdSql.Run = runSql +} + +var cmdSql = &Command{ + UsageLine: "sql [-master=localhost:9333] [-interactive] [-file=query.sql] [-output=table|json|csv] [-database=dbname] [-query=\"SQL\"]", + Short: "advanced SQL query interface for SeaweedFS MQ topics with multiple execution modes", + Long: `Enhanced SQL interface for SeaweedFS Message Queue topics with multiple execution modes. + +Execution Modes: +- Interactive shell (default): weed sql -interactive +- Single query: weed sql -query "SELECT * FROM user_events" +- Batch from file: weed sql -file queries.sql +- Context switching: weed sql -database analytics -interactive + +Output Formats: +- table: ASCII table format (default for interactive) +- json: JSON format (default for non-interactive) +- csv: Comma-separated values + +Features: +- Full WHERE clause support (=, <, >, <=, >=, !=, LIKE, IN) +- Advanced pattern matching with LIKE wildcards (%, _) +- Multi-value filtering with IN operator +- Real MQ namespace and topic discovery +- Database context switching + +Examples: + weed sql -interactive + weed sql -query "SHOW DATABASES" -output json + weed sql -file batch_queries.sql -output csv + weed sql -database analytics -query "SELECT COUNT(*) FROM metrics" + weed sql -master broker1:9333 -interactive +`, +} + +var ( + sqlMaster = cmdSql.Flag.String("master", "localhost:9333", "SeaweedFS master server HTTP address") + sqlInteractive = cmdSql.Flag.Bool("interactive", false, "start interactive shell mode") + sqlFile = cmdSql.Flag.String("file", "", "execute SQL queries from file") + sqlOutput = cmdSql.Flag.String("output", "", "output format: table, json, csv (auto-detected if not specified)") + sqlDatabase = cmdSql.Flag.String("database", "", "default database context") + sqlQuery = cmdSql.Flag.String("query", "", "execute single SQL query") +) + +// OutputFormat represents different output formatting options +type OutputFormat string + +const ( + OutputTable OutputFormat = "table" + OutputJSON OutputFormat = "json" + OutputCSV OutputFormat = "csv" +) + +// SQLContext holds the execution context for SQL operations +type SQLContext struct { + engine *engine.SQLEngine + currentDatabase string + outputFormat OutputFormat + interactive bool +} + +func runSql(command *Command, args []string) bool { + // Initialize SQL engine with master address for service discovery + sqlEngine := engine.NewSQLEngine(*sqlMaster) + + // Determine execution mode and output format + interactive := *sqlInteractive || (*sqlQuery == "" && *sqlFile == "") + outputFormat := determineOutputFormat(*sqlOutput, interactive) + + // Create SQL context + ctx := &SQLContext{ + engine: sqlEngine, + currentDatabase: *sqlDatabase, + outputFormat: outputFormat, + interactive: interactive, + } + + // Set current database in SQL engine if specified via command line + if *sqlDatabase != "" { + ctx.engine.GetCatalog().SetCurrentDatabase(*sqlDatabase) + } + + // Execute based on mode + switch { + case *sqlQuery != "": + // Single query mode + return executeSingleQuery(ctx, *sqlQuery) + case *sqlFile != "": + // Batch file mode + return executeFileQueries(ctx, *sqlFile) + default: + // Interactive mode + return runInteractiveShell(ctx) + } +} + +// determineOutputFormat selects the appropriate output format +func determineOutputFormat(specified string, interactive bool) OutputFormat { + switch strings.ToLower(specified) { + case "table": + return OutputTable + case "json": + return OutputJSON + case "csv": + return OutputCSV + default: + // Auto-detect based on mode + if interactive { + return OutputTable + } + return OutputJSON + } +} + +// executeSingleQuery executes a single query and outputs the result +func executeSingleQuery(ctx *SQLContext, query string) bool { + if ctx.outputFormat != OutputTable { + // Suppress banner for non-interactive output + return executeAndDisplay(ctx, query, false) + } + + fmt.Printf("Executing query against %s...\n", *sqlMaster) + return executeAndDisplay(ctx, query, true) +} + +// executeFileQueries processes SQL queries from a file +func executeFileQueries(ctx *SQLContext, filename string) bool { + content, err := os.ReadFile(filename) + if err != nil { + fmt.Printf("Error reading file %s: %v\n", filename, err) + return false + } + + if ctx.outputFormat == OutputTable && ctx.interactive { + fmt.Printf("Executing queries from %s against %s...\n", filename, *sqlMaster) + } + + // Split file content into individual queries (robust approach) + queries := sqlutil.SplitStatements(string(content)) + + for i, query := range queries { + query = strings.TrimSpace(query) + if query == "" { + continue + } + + if ctx.outputFormat == OutputTable && len(queries) > 1 { + fmt.Printf("\n--- Query %d ---\n", i+1) + } + + if !executeAndDisplay(ctx, query, ctx.outputFormat == OutputTable) { + return false + } + } + + return true +} + +// runInteractiveShell starts the enhanced interactive shell with readline support +func runInteractiveShell(ctx *SQLContext) bool { + fmt.Println("SeaweedFS Enhanced SQL Interface") + fmt.Println("Type 'help;' for help, 'exit;' to quit") + fmt.Printf("Connected to master: %s\n", *sqlMaster) + if ctx.currentDatabase != "" { + fmt.Printf("Current database: %s\n", ctx.currentDatabase) + } + fmt.Println("Advanced WHERE operators supported: <=, >=, !=, LIKE, IN") + fmt.Println("Use up/down arrows for command history") + fmt.Println() + + // Initialize liner for readline functionality + line := liner.NewLiner() + defer line.Close() + + // Handle Ctrl+C gracefully + line.SetCtrlCAborts(true) + grace.OnInterrupt(func() { + line.Close() + }) + + // Load command history + historyPath := path.Join(os.TempDir(), "weed-sql-history") + if f, err := os.Open(historyPath); err == nil { + line.ReadHistory(f) + f.Close() + } + + // Save history on exit + defer func() { + if f, err := os.Create(historyPath); err == nil { + line.WriteHistory(f) + f.Close() + } + }() + + var queryBuffer strings.Builder + + for { + // Show prompt with current database context + var prompt string + if queryBuffer.Len() == 0 { + if ctx.currentDatabase != "" { + prompt = fmt.Sprintf("seaweedfs:%s> ", ctx.currentDatabase) + } else { + prompt = "seaweedfs> " + } + } else { + prompt = " -> " // Continuation prompt + } + + // Read line with readline support + input, err := line.Prompt(prompt) + if err != nil { + if err == liner.ErrPromptAborted { + fmt.Println("Query cancelled") + queryBuffer.Reset() + continue + } + if err != io.EOF { + fmt.Printf("Input error: %v\n", err) + } + break + } + + lineStr := strings.TrimSpace(input) + + // Handle empty lines + if lineStr == "" { + continue + } + + // Accumulate lines in query buffer + if queryBuffer.Len() > 0 { + queryBuffer.WriteString(" ") + } + queryBuffer.WriteString(lineStr) + + // Check if we have a complete statement (ends with semicolon or special command) + fullQuery := strings.TrimSpace(queryBuffer.String()) + isComplete := strings.HasSuffix(lineStr, ";") || + isSpecialCommand(fullQuery) + + if !isComplete { + continue // Continue reading more lines + } + + // Add completed command to history + line.AppendHistory(fullQuery) + + // Handle special commands (with or without semicolon) + cleanQuery := strings.TrimSuffix(fullQuery, ";") + cleanQuery = strings.TrimSpace(cleanQuery) + + if cleanQuery == "exit" || cleanQuery == "quit" || cleanQuery == "\\q" { + fmt.Println("Goodbye!") + break + } + + if cleanQuery == "help" { + showEnhancedHelp() + queryBuffer.Reset() + continue + } + + // Handle database switching - use proper SQL parser instead of manual parsing + if strings.HasPrefix(strings.ToUpper(cleanQuery), "USE ") { + // Execute USE statement through the SQL engine for proper parsing + result, err := ctx.engine.ExecuteSQL(context.Background(), cleanQuery) + if err != nil { + fmt.Printf("Error: %v\n\n", err) + } else if result.Error != nil { + fmt.Printf("Error: %v\n\n", result.Error) + } else { + // Extract the database name from the result message for CLI context + if len(result.Rows) > 0 && len(result.Rows[0]) > 0 { + message := result.Rows[0][0].ToString() + // Extract database name from "Database changed to: dbname" + if strings.HasPrefix(message, "Database changed to: ") { + ctx.currentDatabase = strings.TrimPrefix(message, "Database changed to: ") + } + fmt.Printf("%s\n\n", message) + } + } + queryBuffer.Reset() + continue + } + + // Handle output format switching + if strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ") { + format := strings.TrimSpace(strings.TrimPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ")) + switch format { + case "TABLE": + ctx.outputFormat = OutputTable + fmt.Println("Output format set to: table") + case "JSON": + ctx.outputFormat = OutputJSON + fmt.Println("Output format set to: json") + case "CSV": + ctx.outputFormat = OutputCSV + fmt.Println("Output format set to: csv") + default: + fmt.Printf("Invalid format: %s. Supported: table, json, csv\n", format) + } + queryBuffer.Reset() + continue + } + + // Execute SQL query (without semicolon) + executeAndDisplay(ctx, cleanQuery, true) + + // Reset buffer for next query + queryBuffer.Reset() + } + + return true +} + +// isSpecialCommand checks if a command is a special command that doesn't require semicolon +func isSpecialCommand(query string) bool { + cleanQuery := strings.TrimSuffix(strings.TrimSpace(query), ";") + cleanQuery = strings.ToLower(cleanQuery) + + // Special commands that work with or without semicolon + specialCommands := []string{ + "exit", "quit", "\\q", "help", + } + + for _, cmd := range specialCommands { + if cleanQuery == cmd { + return true + } + } + + // Commands that are exactly specific commands (not just prefixes) + parts := strings.Fields(strings.ToUpper(cleanQuery)) + if len(parts) == 0 { + return false + } + return (parts[0] == "USE" && len(parts) >= 2) || + strings.HasPrefix(strings.ToUpper(cleanQuery), "\\FORMAT ") +} + +// executeAndDisplay executes a query and displays the result in the specified format +func executeAndDisplay(ctx *SQLContext, query string, showTiming bool) bool { + startTime := time.Now() + + // Execute the query + execCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + result, err := ctx.engine.ExecuteSQL(execCtx, query) + if err != nil { + if ctx.outputFormat == OutputJSON { + errorResult := map[string]interface{}{ + "error": err.Error(), + "query": query, + } + jsonBytes, _ := json.MarshalIndent(errorResult, "", " ") + fmt.Println(string(jsonBytes)) + } else { + fmt.Printf("Error: %v\n", err) + } + return false + } + + if result.Error != nil { + if ctx.outputFormat == OutputJSON { + errorResult := map[string]interface{}{ + "error": result.Error.Error(), + "query": query, + } + jsonBytes, _ := json.MarshalIndent(errorResult, "", " ") + fmt.Println(string(jsonBytes)) + } else { + fmt.Printf("Query Error: %v\n", result.Error) + } + return false + } + + // Display results in the specified format + switch ctx.outputFormat { + case OutputTable: + displayTableResult(result) + case OutputJSON: + displayJSONResult(result) + case OutputCSV: + displayCSVResult(result) + } + + // Show execution time for interactive/table mode + if showTiming && ctx.outputFormat == OutputTable { + elapsed := time.Since(startTime) + fmt.Printf("\n(%d rows in set, %.3f sec)\n\n", len(result.Rows), elapsed.Seconds()) + } + + return true +} + +// displayTableResult formats and displays query results in ASCII table format +func displayTableResult(result *engine.QueryResult) { + if len(result.Columns) == 0 { + fmt.Println("Empty result set") + return + } + + // Calculate column widths for formatting + colWidths := make([]int, len(result.Columns)) + for i, col := range result.Columns { + colWidths[i] = len(col) + } + + // Check data for wider columns + for _, row := range result.Rows { + for i, val := range row { + if i < len(colWidths) { + valStr := val.ToString() + if len(valStr) > colWidths[i] { + colWidths[i] = len(valStr) + } + } + } + } + + // Print header separator + fmt.Print("+") + for _, width := range colWidths { + fmt.Print(strings.Repeat("-", width+2) + "+") + } + fmt.Println() + + // Print column headers + fmt.Print("|") + for i, col := range result.Columns { + fmt.Printf(" %-*s |", colWidths[i], col) + } + fmt.Println() + + // Print separator + fmt.Print("+") + for _, width := range colWidths { + fmt.Print(strings.Repeat("-", width+2) + "+") + } + fmt.Println() + + // Print data rows + for _, row := range result.Rows { + fmt.Print("|") + for i, val := range row { + if i < len(colWidths) { + fmt.Printf(" %-*s |", colWidths[i], val.ToString()) + } + } + fmt.Println() + } + + // Print bottom separator + fmt.Print("+") + for _, width := range colWidths { + fmt.Print(strings.Repeat("-", width+2) + "+") + } + fmt.Println() +} + +// displayJSONResult outputs query results in JSON format +func displayJSONResult(result *engine.QueryResult) { + // Convert result to JSON-friendly format + jsonResult := map[string]interface{}{ + "columns": result.Columns, + "rows": make([]map[string]interface{}, len(result.Rows)), + "count": len(result.Rows), + } + + // Convert rows to JSON objects + for i, row := range result.Rows { + rowObj := make(map[string]interface{}) + for j, val := range row { + if j < len(result.Columns) { + rowObj[result.Columns[j]] = val.ToString() + } + } + jsonResult["rows"].([]map[string]interface{})[i] = rowObj + } + + // Marshal and print JSON + jsonBytes, err := json.MarshalIndent(jsonResult, "", " ") + if err != nil { + fmt.Printf("Error formatting JSON: %v\n", err) + return + } + + fmt.Println(string(jsonBytes)) +} + +// displayCSVResult outputs query results in CSV format +func displayCSVResult(result *engine.QueryResult) { + // Handle execution plan results specially to avoid CSV quoting issues + if len(result.Columns) == 1 && result.Columns[0] == "Query Execution Plan" { + // For execution plans, output directly without CSV encoding to avoid quotes + for _, row := range result.Rows { + if len(row) > 0 { + fmt.Println(row[0].ToString()) + } + } + return + } + + // Standard CSV output for regular query results + writer := csv.NewWriter(os.Stdout) + defer writer.Flush() + + // Write headers + if err := writer.Write(result.Columns); err != nil { + fmt.Printf("Error writing CSV headers: %v\n", err) + return + } + + // Write data rows + for _, row := range result.Rows { + csvRow := make([]string, len(row)) + for i, val := range row { + csvRow[i] = val.ToString() + } + if err := writer.Write(csvRow); err != nil { + fmt.Printf("Error writing CSV row: %v\n", err) + return + } + } +} + +func showEnhancedHelp() { + fmt.Println(`SeaweedFS Enhanced SQL Interface Help: + +METADATA OPERATIONS: + SHOW DATABASES; - List all MQ namespaces + SHOW TABLES; - List all topics in current namespace + SHOW TABLES FROM database; - List topics in specific namespace + DESCRIBE table_name; - Show table schema + +ADVANCED QUERYING: + SELECT * FROM table_name; - Query all data + SELECT col1, col2 FROM table WHERE ...; - Column projection + SELECT * FROM table WHERE id <= 100; - Range filtering + SELECT * FROM table WHERE name LIKE 'admin%'; - Pattern matching + SELECT * FROM table WHERE status IN ('active', 'pending'); - Multi-value + SELECT COUNT(*), MAX(id), MIN(id) FROM ...; - Aggregation functions + +QUERY ANALYSIS: + EXPLAIN SELECT ...; - Show hierarchical execution plan + (data sources, optimizations, timing) + +DDL OPERATIONS: + CREATE TABLE topic (field1 INT, field2 STRING); - Create topic + Note: ALTER TABLE and DROP TABLE are not supported + +SPECIAL COMMANDS: + USE database_name; - Switch database context + \format table|json|csv - Change output format + help; - Show this help + exit; or quit; or \q - Exit interface + +EXTENDED WHERE OPERATORS: + =, <, >, <=, >= - Comparison operators + !=, <> - Not equal operators + LIKE 'pattern%' - Pattern matching (% = any chars, _ = single char) + IN (value1, value2, ...) - Multi-value matching + AND, OR - Logical operators + +EXAMPLES: + SELECT * FROM user_events WHERE user_id >= 10 AND status != 'deleted'; + SELECT username FROM users WHERE email LIKE '%@company.com'; + SELECT * FROM logs WHERE level IN ('error', 'warning') AND timestamp >= '2023-01-01'; + EXPLAIN SELECT MAX(id) FROM events; -- View execution plan + +Current Status: Full WHERE clause support + Real MQ integration`) +} |
