aboutsummaryrefslogtreecommitdiff
path: root/weed/command/db.go
blob: a521da093a1bcbb3503e04a7f035f8f102755a6e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
package command

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/server/postgres"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

var (
	dbOptions DBOptions
)

type DBOptions struct {
	host        *string
	port        *int
	masterAddr  *string
	authMethod  *string
	users       *string
	database    *string
	maxConns    *int
	idleTimeout *string
	tlsCert     *string
	tlsKey      *string
}

func init() {
	cmdDB.Run = runDB // break init cycle
	dbOptions.host = cmdDB.Flag.String("host", "localhost", "Database server host")
	dbOptions.port = cmdDB.Flag.Int("port", 5432, "Database server port")
	dbOptions.masterAddr = cmdDB.Flag.String("master", "localhost:9333", "SeaweedFS master server address")
	dbOptions.authMethod = cmdDB.Flag.String("auth", "trust", "Authentication method: trust, password, md5")
	dbOptions.users = cmdDB.Flag.String("users", "", "User credentials for auth (JSON format '{\"user1\":\"pass1\",\"user2\":\"pass2\"}' or file '@/path/to/users.json')")
	dbOptions.database = cmdDB.Flag.String("database", "default", "Default database name")
	dbOptions.maxConns = cmdDB.Flag.Int("max-connections", 100, "Maximum concurrent connections per server")
	dbOptions.idleTimeout = cmdDB.Flag.String("idle-timeout", "1h", "Connection idle timeout")
	dbOptions.tlsCert = cmdDB.Flag.String("tls-cert", "", "TLS certificate file path")
	dbOptions.tlsKey = cmdDB.Flag.String("tls-key", "", "TLS private key file path")
}

var cmdDB = &Command{
	UsageLine: "db -port=5432 -master=<master_server>",
	Short:     "start a PostgreSQL-compatible database server for SQL queries",
	Long: `Start a PostgreSQL wire protocol compatible database server that provides SQL query access to SeaweedFS.

This database server enables any PostgreSQL client, tool, or application to connect to SeaweedFS
and execute SQL queries against MQ topics. It implements the PostgreSQL wire protocol for maximum
compatibility with the existing PostgreSQL ecosystem.

Examples:

	# Start database server on default port 5432
	weed db
	
	# Start with MD5 authentication using JSON format (recommended)
	weed db -auth=md5 -users='{"admin":"secret","readonly":"view123"}'
	
	# Start with complex passwords using JSON format
	weed db -auth=md5 -users='{"admin":"pass;with;semicolons","user":"password:with:colons"}'
	
	# Start with credentials from JSON file (most secure)
	weed db -auth=md5 -users="@/etc/seaweedfs/users.json"
	
	# Start with custom port and master
	weed db -port=5433 -master=master1:9333
	
	# Allow connections from any host
	weed db -host=0.0.0.0 -port=5432
	
	# Start with TLS encryption
	weed db -tls-cert=server.crt -tls-key=server.key

Client Connection Examples:

	# psql command line client
	psql "host=localhost port=5432 dbname=default user=seaweedfs"
	psql -h localhost -p 5432 -U seaweedfs -d default
	
	# With password
	PGPASSWORD=secret psql -h localhost -p 5432 -U admin -d default
	
	# Connection string
	psql "postgresql://admin:secret@localhost:5432/default"

Programming Language Examples:

	# Python (psycopg2)
	import psycopg2
	conn = psycopg2.connect(
		host="localhost", port=5432, 
		user="seaweedfs", database="default"
	)
	
	# Java JDBC
	String url = "jdbc:postgresql://localhost:5432/default";
	Connection conn = DriverManager.getConnection(url, "seaweedfs", "");
	
	# Go (lib/pq)
	db, err := sql.Open("postgres", "host=localhost port=5432 user=seaweedfs dbname=default sslmode=disable")
	
	# Node.js (pg)
	const client = new Client({
		host: 'localhost', port: 5432,
		user: 'seaweedfs', database: 'default'
	});

Supported SQL Operations:
	- SELECT queries on MQ topics
	- DESCRIBE/DESC table_name commands
	- EXPLAIN query execution plans
	- SHOW DATABASES/TABLES commands  
	- Aggregation functions (COUNT, SUM, AVG, MIN, MAX)
	- WHERE clauses with filtering
	- System columns (_timestamp_ns, _key, _source)
	- Basic PostgreSQL system queries (version(), current_database(), current_user)

Authentication Methods:
	- trust: No authentication required (default)
	- password: Clear text password authentication
	- md5: MD5 password authentication

User Credential Formats:
	- JSON format: '{"user1":"pass1","user2":"pass2"}' (supports any special characters)
	- File format: "@/path/to/users.json" (JSON file)
	
	Note: JSON format supports passwords with semicolons, colons, and any other special characters.
	      File format is recommended for production to keep credentials secure.

Compatible Tools:
	- psql (PostgreSQL command line client)
	- Any PostgreSQL JDBC/ODBC compatible tool

Security Features:
	- Multiple authentication methods
	- TLS encryption support
	- Read-only access (no data modification)

Performance Features:
	- Fast path aggregation optimization (COUNT, MIN, MAX without WHERE clauses)
	- Hybrid data scanning (parquet files + live logs)
	- PostgreSQL wire protocol
	- Query result streaming

`,
}

func runDB(cmd *Command, args []string) bool {

	util.LoadConfiguration("security", false)

	// Validate options
	if *dbOptions.masterAddr == "" {
		fmt.Fprintf(os.Stderr, "Error: master address is required\n")
		return false
	}

	// Parse authentication method
	authMethod, err := parseAuthMethod(*dbOptions.authMethod)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		return false
	}

	// Parse user credentials
	users, err := parseUsers(*dbOptions.users, authMethod)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		return false
	}

	// Parse idle timeout
	idleTimeout, err := time.ParseDuration(*dbOptions.idleTimeout)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error parsing idle timeout: %v\n", err)
		return false
	}

	// Validate port number
	if err := validatePortNumber(*dbOptions.port); err != nil {
		fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		return false
	}

	// Setup TLS if requested
	var tlsConfig *tls.Config
	if *dbOptions.tlsCert != "" && *dbOptions.tlsKey != "" {
		cert, err := tls.LoadX509KeyPair(*dbOptions.tlsCert, *dbOptions.tlsKey)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error loading TLS certificates: %v\n", err)
			return false
		}
		tlsConfig = &tls.Config{
			Certificates: []tls.Certificate{cert},
		}
	}

	// Create server configuration
	config := &postgres.PostgreSQLServerConfig{
		Host:        *dbOptions.host,
		Port:        *dbOptions.port,
		AuthMethod:  authMethod,
		Users:       users,
		Database:    *dbOptions.database,
		MaxConns:    *dbOptions.maxConns,
		IdleTimeout: idleTimeout,
		TLSConfig:   tlsConfig,
	}

	// Create database server
	dbServer, err := postgres.NewPostgreSQLServer(config, *dbOptions.masterAddr)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error creating database server: %v\n", err)
		return false
	}

	// Print startup information
	fmt.Printf("Starting SeaweedFS Database Server...\n")
	fmt.Printf("Host: %s\n", *dbOptions.host)
	fmt.Printf("Port: %d\n", *dbOptions.port)
	fmt.Printf("Master: %s\n", *dbOptions.masterAddr)
	fmt.Printf("Database: %s\n", *dbOptions.database)
	fmt.Printf("Auth Method: %s\n", *dbOptions.authMethod)
	fmt.Printf("Max Connections: %d\n", *dbOptions.maxConns)
	fmt.Printf("Idle Timeout: %s\n", *dbOptions.idleTimeout)
	if tlsConfig != nil {
		fmt.Printf("TLS: Enabled\n")
	} else {
		fmt.Printf("TLS: Disabled\n")
	}
	if len(users) > 0 {
		fmt.Printf("Users: %d configured\n", len(users))
	}

	fmt.Printf("\nDatabase Connection Examples:\n")
	fmt.Printf("  psql -h %s -p %d -U seaweedfs -d %s\n", *dbOptions.host, *dbOptions.port, *dbOptions.database)
	if len(users) > 0 {
		// Show first user as example
		for username := range users {
			fmt.Printf("  psql -h %s -p %d -U %s -d %s\n", *dbOptions.host, *dbOptions.port, username, *dbOptions.database)
			break
		}
	}
	fmt.Printf("  postgresql://%s:%d/%s\n", *dbOptions.host, *dbOptions.port, *dbOptions.database)

	fmt.Printf("\nSupported Operations:\n")
	fmt.Printf("  - SELECT queries on MQ topics\n")
	fmt.Printf("  - DESCRIBE/DESC table_name\n")
	fmt.Printf("  - EXPLAIN query execution plans\n")
	fmt.Printf("  - SHOW DATABASES/TABLES\n")
	fmt.Printf("  - Aggregations: COUNT, SUM, AVG, MIN, MAX\n")
	fmt.Printf("  - System columns: _timestamp_ns, _key, _source\n")
	fmt.Printf("  - Basic PostgreSQL system queries\n")

	fmt.Printf("\nReady for database connections!\n\n")

	// Start the server
	err = dbServer.Start()
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error starting database server: %v\n", err)
		return false
	}

	// Set up signal handling for graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// Wait for shutdown signal
	<-sigChan
	fmt.Printf("\nReceived shutdown signal, stopping database server...\n")

	// Create context with timeout for graceful shutdown
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Stop the server with timeout
	done := make(chan error, 1)
	go func() {
		done <- dbServer.Stop()
	}()

	select {
	case err := <-done:
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error stopping database server: %v\n", err)
			return false
		}
		fmt.Printf("Database server stopped successfully\n")
	case <-ctx.Done():
		fmt.Fprintf(os.Stderr, "Timeout waiting for database server to stop\n")
		return false
	}

	return true
}

// parseAuthMethod parses the authentication method string
func parseAuthMethod(method string) (postgres.AuthMethod, error) {
	switch strings.ToLower(method) {
	case "trust":
		return postgres.AuthTrust, nil
	case "password":
		return postgres.AuthPassword, nil
	case "md5":
		return postgres.AuthMD5, nil
	default:
		return postgres.AuthTrust, fmt.Errorf("unsupported auth method '%s'. Supported: trust, password, md5", method)
	}
}

// parseUsers parses the user credentials string with support for secure formats only
// Supported formats:
// 1. JSON format: {"username":"password","username2":"password2"}
// 2. File format: /path/to/users.json or @/path/to/users.json
func parseUsers(usersStr string, authMethod postgres.AuthMethod) (map[string]string, error) {
	users := make(map[string]string)

	if usersStr == "" {
		// No users specified
		if authMethod != postgres.AuthTrust {
			return nil, fmt.Errorf("users must be specified when auth method is not 'trust'")
		}
		return users, nil
	}

	// Trim whitespace
	usersStr = strings.TrimSpace(usersStr)

	// Determine format and parse accordingly
	if strings.HasPrefix(usersStr, "{") && strings.HasSuffix(usersStr, "}") {
		// JSON format
		return parseUsersJSON(usersStr, authMethod)
	}

	// Check if it's a file path (with or without @ prefix) before declaring invalid format
	filePath := strings.TrimPrefix(usersStr, "@")
	if _, err := os.Stat(filePath); err == nil {
		// File format
		return parseUsersFile(usersStr, authMethod) // Pass original string to preserve @ handling
	}

	// Invalid format
	return nil, fmt.Errorf("invalid user credentials format. Use JSON format '{\"user\":\"pass\"}' or file format '@/path/to/users.json' or 'path/to/users.json'. Legacy semicolon-separated format is no longer supported")
}

// parseUsersJSON parses user credentials from JSON format
func parseUsersJSON(jsonStr string, authMethod postgres.AuthMethod) (map[string]string, error) {
	var users map[string]string
	if err := json.Unmarshal([]byte(jsonStr), &users); err != nil {
		return nil, fmt.Errorf("invalid JSON format for users: %v", err)
	}

	// Validate users
	for username, password := range users {
		if username == "" {
			return nil, fmt.Errorf("empty username in JSON user specification")
		}
		if authMethod != postgres.AuthTrust && password == "" {
			return nil, fmt.Errorf("empty password for user '%s' with auth method", username)
		}
	}

	return users, nil
}

// parseUsersFile parses user credentials from a JSON file
func parseUsersFile(filePath string, authMethod postgres.AuthMethod) (map[string]string, error) {
	// Remove @ prefix if present
	filePath = strings.TrimPrefix(filePath, "@")

	// Read file content
	content, err := os.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("failed to read users file '%s': %v", filePath, err)
	}

	contentStr := strings.TrimSpace(string(content))

	// File must contain JSON format
	if !strings.HasPrefix(contentStr, "{") || !strings.HasSuffix(contentStr, "}") {
		return nil, fmt.Errorf("users file '%s' must contain JSON format: {\"user\":\"pass\"}. Legacy formats are no longer supported", filePath)
	}

	// Parse as JSON
	return parseUsersJSON(contentStr, authMethod)
}

// validatePortNumber validates that the port number is reasonable
func validatePortNumber(port int) error {
	if port < 1 || port > 65535 {
		return fmt.Errorf("port number must be between 1 and 65535, got %d", port)
	}
	if port < 1024 {
		fmt.Fprintf(os.Stderr, "Warning: port number %d may require root privileges\n", port)
	}
	return nil
}