diff options
Diffstat (limited to 'postgres-examples')
| -rw-r--r-- | postgres-examples/README.md | 414 | ||||
| -rw-r--r-- | postgres-examples/test_client.py | 374 |
2 files changed, 788 insertions, 0 deletions
diff --git a/postgres-examples/README.md b/postgres-examples/README.md new file mode 100644 index 000000000..fcf853745 --- /dev/null +++ b/postgres-examples/README.md @@ -0,0 +1,414 @@ +# SeaweedFS PostgreSQL Protocol Examples + +This directory contains examples demonstrating how to connect to SeaweedFS using the PostgreSQL wire protocol. + +## Starting the PostgreSQL Server + +```bash +# Start with trust authentication (no password required) +weed postgres -port=5432 -master=localhost:9333 + +# Start with password authentication +weed postgres -port=5432 -auth=password -users="admin:secret;readonly:view123" + +# Start with MD5 authentication (more secure) +weed postgres -port=5432 -auth=md5 -users="user1:pass1;user2:pass2" + +# Start with TLS encryption +weed postgres -port=5432 -tls-cert=server.crt -tls-key=server.key + +# Allow connections from any host +weed postgres -host=0.0.0.0 -port=5432 +``` + +## Client Connections + +### psql Command Line + +```bash +# Basic connection (trust auth) +psql -h localhost -p 5432 -U seaweedfs -d default + +# With password +PGPASSWORD=secret psql -h localhost -p 5432 -U admin -d default + +# Connection string format +psql "postgresql://admin:secret@localhost:5432/default" + +# Connection string with parameters +psql "host=localhost port=5432 dbname=default user=admin password=secret" +``` + +### Programming Languages + +#### Python (psycopg2) +```python +import psycopg2 + +# Connect to SeaweedFS +conn = psycopg2.connect( + host="localhost", + port=5432, + user="seaweedfs", + database="default" +) + +# Execute queries +cursor = conn.cursor() +cursor.execute("SELECT * FROM my_topic LIMIT 10") + +for row in cursor.fetchall(): + print(row) + +cursor.close() +conn.close() +``` + +#### Java JDBC +```java +import java.sql.*; + +public class SeaweedFSExample { + public static void main(String[] args) throws SQLException { + String url = "jdbc:postgresql://localhost:5432/default"; + + Connection conn = DriverManager.getConnection(url, "seaweedfs", ""); + Statement stmt = conn.createStatement(); + + ResultSet rs = stmt.executeQuery("SELECT * FROM my_topic LIMIT 10"); + while (rs.next()) { + System.out.println("ID: " + rs.getLong("id")); + System.out.println("Message: " + rs.getString("message")); + } + + rs.close(); + stmt.close(); + conn.close(); + } +} +``` + +#### Go (lib/pq) +```go +package main + +import ( + "database/sql" + "fmt" + _ "github.com/lib/pq" +) + +func main() { + db, err := sql.Open("postgres", + "host=localhost port=5432 user=seaweedfs dbname=default sslmode=disable") + if err != nil { + panic(err) + } + defer db.Close() + + rows, err := db.Query("SELECT * FROM my_topic LIMIT 10") + if err != nil { + panic(err) + } + defer rows.Close() + + for rows.Next() { + var id int64 + var message string + err := rows.Scan(&id, &message) + if err != nil { + panic(err) + } + fmt.Printf("ID: %d, Message: %s\n", id, message) + } +} +``` + +#### Node.js (pg) +```javascript +const { Client } = require('pg'); + +const client = new Client({ + host: 'localhost', + port: 5432, + user: 'seaweedfs', + database: 'default', +}); + +async function query() { + await client.connect(); + + const result = await client.query('SELECT * FROM my_topic LIMIT 10'); + console.log(result.rows); + + await client.end(); +} + +query().catch(console.error); +``` + +## SQL Operations + +### Basic Queries +```sql +-- List databases +SHOW DATABASES; + +-- List tables (topics) +SHOW TABLES; + +-- Describe table structure +DESCRIBE my_topic; +-- or use the shorthand: DESC my_topic; + +-- Basic select +SELECT * FROM my_topic; + +-- With WHERE clause +SELECT id, message FROM my_topic WHERE id > 1000; + +-- With LIMIT +SELECT * FROM my_topic LIMIT 100; +``` + +### Aggregations +```sql +-- Count records +SELECT COUNT(*) FROM my_topic; + +-- Multiple aggregations +SELECT + COUNT(*) as total_messages, + MIN(id) as min_id, + MAX(id) as max_id, + AVG(amount) as avg_amount +FROM my_topic; + +-- Aggregations with WHERE +SELECT COUNT(*) FROM my_topic WHERE status = 'active'; +``` + +### System Columns +```sql +-- Access system columns +SELECT + id, + message, + _timestamp_ns as timestamp, + _key as partition_key, + _source as data_source +FROM my_topic; + +-- Filter by timestamp +SELECT * FROM my_topic +WHERE _timestamp_ns > 1640995200000000000 +LIMIT 10; +``` + +### PostgreSQL System Queries +```sql +-- Version information +SELECT version(); + +-- Current database +SELECT current_database(); + +-- Current user +SELECT current_user; + +-- Server settings +SELECT current_setting('server_version'); +SELECT current_setting('server_encoding'); +``` + +## psql Meta-Commands + +```sql +-- List tables +\d +\dt + +-- List databases +\l + +-- Describe specific table +\d my_topic +\dt my_topic + +-- List schemas +\dn + +-- Help +\h +\? + +-- Quit +\q +``` + +## Database Tools Integration + +### DBeaver +1. Create New Connection → PostgreSQL +2. Settings: + - **Host**: localhost + - **Port**: 5432 + - **Database**: default + - **Username**: seaweedfs (or configured user) + - **Password**: (if using password auth) + +### pgAdmin +1. Add New Server +2. Connection tab: + - **Host**: localhost + - **Port**: 5432 + - **Username**: seaweedfs + - **Database**: default + +### DataGrip +1. New Data Source → PostgreSQL +2. Configure: + - **Host**: localhost + - **Port**: 5432 + - **User**: seaweedfs + - **Database**: default + +### Grafana +1. Add Data Source → PostgreSQL +2. Configuration: + - **Host**: localhost:5432 + - **Database**: default + - **User**: seaweedfs + - **SSL Mode**: disable + +## BI Tools + +### Tableau +1. Connect to Data → PostgreSQL +2. Server: localhost +3. Port: 5432 +4. Database: default +5. Username: seaweedfs + +### Power BI +1. Get Data → Database → PostgreSQL +2. Server: localhost +3. Database: default +4. Username: seaweedfs + +## Connection Pooling + +### Java (HikariCP) +```java +HikariConfig config = new HikariConfig(); +config.setJdbcUrl("jdbc:postgresql://localhost:5432/default"); +config.setUsername("seaweedfs"); +config.setMaximumPoolSize(10); + +HikariDataSource dataSource = new HikariDataSource(config); +``` + +### Python (connection pooling) +```python +from psycopg2 import pool + +connection_pool = psycopg2.pool.SimpleConnectionPool( + 1, 20, + host="localhost", + port=5432, + user="seaweedfs", + database="default" +) + +conn = connection_pool.getconn() +# Use connection +connection_pool.putconn(conn) +``` + +## Security Best Practices + +### Use TLS Encryption +```bash +# Generate self-signed certificate for testing +openssl req -x509 -newkey rsa:4096 -keyout server.key -out server.crt -days 365 -nodes + +# Start with TLS +weed postgres -tls-cert=server.crt -tls-key=server.key +``` + +### Use MD5 Authentication +```bash +# More secure than password auth +weed postgres -auth=md5 -users="admin:secret123;readonly:view456" +``` + +### Limit Connections +```bash +# Limit concurrent connections +weed postgres -max-connections=50 -idle-timeout=30m +``` + +## Troubleshooting + +### Connection Issues +```bash +# Test connectivity +telnet localhost 5432 + +# Check if server is running +ps aux | grep "weed postgres" + +# Check logs for errors +tail -f /var/log/seaweedfs/postgres.log +``` + +### Common Errors + +**"Connection refused"** +- Ensure PostgreSQL server is running +- Check host/port configuration +- Verify firewall settings + +**"Authentication failed"** +- Check username/password +- Verify auth method configuration +- Ensure user is configured in server + +**"Database does not exist"** +- Use correct database name (default: 'default') +- Check available databases: `SHOW DATABASES` + +**"Permission denied"** +- Check user permissions +- Verify authentication method +- Use correct credentials + +## Performance Tips + +1. **Use LIMIT clauses** for large result sets +2. **Filter with WHERE clauses** to reduce data transfer +3. **Use connection pooling** for multi-threaded applications +4. **Close resources properly** (connections, statements, result sets) +5. **Use prepared statements** for repeated queries + +## Monitoring + +### Connection Statistics +```sql +-- Current connections (if supported) +SELECT COUNT(*) FROM pg_stat_activity; + +-- Server version +SELECT version(); + +-- Current settings +SELECT name, setting FROM pg_settings WHERE name LIKE '%connection%'; +``` + +### Query Performance +```sql +-- Use EXPLAIN for query plans (if supported) +EXPLAIN SELECT * FROM my_topic WHERE id > 1000; +``` + +This PostgreSQL protocol support makes SeaweedFS accessible to the entire PostgreSQL ecosystem, enabling seamless integration with existing tools, applications, and workflows. diff --git a/postgres-examples/test_client.py b/postgres-examples/test_client.py new file mode 100644 index 000000000..e293d53cc --- /dev/null +++ b/postgres-examples/test_client.py @@ -0,0 +1,374 @@ +#!/usr/bin/env python3 +""" +Test client for SeaweedFS PostgreSQL protocol support. + +This script demonstrates how to connect to SeaweedFS using standard PostgreSQL +libraries and execute various types of queries. + +Requirements: + pip install psycopg2-binary + +Usage: + python test_client.py + python test_client.py --host localhost --port 5432 --user seaweedfs --database default +""" + +import sys +import argparse +import time +import traceback + +try: + import psycopg2 + import psycopg2.extras +except ImportError: + print("Error: psycopg2 not found. Install with: pip install psycopg2-binary") + sys.exit(1) + + +def test_connection(host, port, user, database, password=None): + """Test basic connection to SeaweedFS PostgreSQL server.""" + print(f"🔗 Testing connection to {host}:{port}/{database} as user '{user}'") + + try: + conn_params = { + 'host': host, + 'port': port, + 'user': user, + 'database': database, + 'connect_timeout': 10 + } + + if password: + conn_params['password'] = password + + conn = psycopg2.connect(**conn_params) + print("✅ Connection successful!") + + # Test basic query + cursor = conn.cursor() + cursor.execute("SELECT 1 as test") + result = cursor.fetchone() + print(f"✅ Basic query successful: {result}") + + cursor.close() + conn.close() + return True + + except Exception as e: + print(f"❌ Connection failed: {e}") + return False + + +def test_system_queries(host, port, user, database, password=None): + """Test PostgreSQL system queries.""" + print("\n🔧 Testing PostgreSQL system queries...") + + try: + conn_params = { + 'host': host, + 'port': port, + 'user': user, + 'database': database + } + if password: + conn_params['password'] = password + + conn = psycopg2.connect(**conn_params) + cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + + system_queries = [ + ("Version", "SELECT version()"), + ("Current Database", "SELECT current_database()"), + ("Current User", "SELECT current_user"), + ("Server Encoding", "SELECT current_setting('server_encoding')"), + ("Client Encoding", "SELECT current_setting('client_encoding')"), + ] + + for name, query in system_queries: + try: + cursor.execute(query) + result = cursor.fetchone() + print(f" ✅ {name}: {result[0]}") + except Exception as e: + print(f" ❌ {name}: {e}") + + cursor.close() + conn.close() + + except Exception as e: + print(f"❌ System queries failed: {e}") + + +def test_schema_queries(host, port, user, database, password=None): + """Test schema and metadata queries.""" + print("\n📊 Testing schema queries...") + + try: + conn_params = { + 'host': host, + 'port': port, + 'user': user, + 'database': database + } + if password: + conn_params['password'] = password + + conn = psycopg2.connect(**conn_params) + cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + + schema_queries = [ + ("Show Databases", "SHOW DATABASES"), + ("Show Tables", "SHOW TABLES"), + ("List Schemas", "SELECT 'public' as schema_name"), + ] + + for name, query in schema_queries: + try: + cursor.execute(query) + results = cursor.fetchall() + print(f" ✅ {name}: Found {len(results)} items") + for row in results[:3]: # Show first 3 results + print(f" - {dict(row)}") + if len(results) > 3: + print(f" ... and {len(results) - 3} more") + except Exception as e: + print(f" ❌ {name}: {e}") + + cursor.close() + conn.close() + + except Exception as e: + print(f"❌ Schema queries failed: {e}") + + +def test_data_queries(host, port, user, database, password=None): + """Test data queries on actual topics.""" + print("\n📝 Testing data queries...") + + try: + conn_params = { + 'host': host, + 'port': port, + 'user': user, + 'database': database + } + if password: + conn_params['password'] = password + + conn = psycopg2.connect(**conn_params) + cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) + + # First, try to get available tables/topics + cursor.execute("SHOW TABLES") + tables = cursor.fetchall() + + if not tables: + print(" ℹ️ No tables/topics found for data testing") + cursor.close() + conn.close() + return + + # Test with first available table + table_name = tables[0][0] if tables[0] else 'test_topic' + print(f" 📋 Testing with table: {table_name}") + + test_queries = [ + (f"Count records in {table_name}", f"SELECT COUNT(*) FROM \"{table_name}\""), + (f"Sample data from {table_name}", f"SELECT * FROM \"{table_name}\" LIMIT 3"), + (f"System columns from {table_name}", f"SELECT _timestamp_ns, _key, _source FROM \"{table_name}\" LIMIT 3"), + (f"Describe {table_name}", f"DESCRIBE \"{table_name}\""), + ] + + for name, query in test_queries: + try: + cursor.execute(query) + results = cursor.fetchall() + + if "COUNT" in query.upper(): + count = results[0][0] if results else 0 + print(f" ✅ {name}: {count} records") + elif "DESCRIBE" in query.upper(): + print(f" ✅ {name}: {len(results)} columns") + for row in results[:5]: # Show first 5 columns + print(f" - {dict(row)}") + else: + print(f" ✅ {name}: {len(results)} rows") + for row in results: + print(f" - {dict(row)}") + + except Exception as e: + print(f" ❌ {name}: {e}") + + cursor.close() + conn.close() + + except Exception as e: + print(f"❌ Data queries failed: {e}") + + +def test_prepared_statements(host, port, user, database, password=None): + """Test prepared statements.""" + print("\n📝 Testing prepared statements...") + + try: + conn_params = { + 'host': host, + 'port': port, + 'user': user, + 'database': database + } + if password: + conn_params['password'] = password + + conn = psycopg2.connect(**conn_params) + cursor = conn.cursor() + + # Test parameterized query + try: + cursor.execute("SELECT %s as param1, %s as param2", ("hello", 42)) + result = cursor.fetchone() + print(f" ✅ Prepared statement: {result}") + except Exception as e: + print(f" ❌ Prepared statement: {e}") + + cursor.close() + conn.close() + + except Exception as e: + print(f"❌ Prepared statements test failed: {e}") + + +def test_transaction_support(host, port, user, database, password=None): + """Test transaction support (should be no-op for read-only).""" + print("\n🔄 Testing transaction support...") + + try: + conn_params = { + 'host': host, + 'port': port, + 'user': user, + 'database': database + } + if password: + conn_params['password'] = password + + conn = psycopg2.connect(**conn_params) + cursor = conn.cursor() + + transaction_commands = [ + "BEGIN", + "SELECT 1 as in_transaction", + "COMMIT", + "SELECT 1 as after_commit", + ] + + for cmd in transaction_commands: + try: + cursor.execute(cmd) + if "SELECT" in cmd: + result = cursor.fetchone() + print(f" ✅ {cmd}: {result}") + else: + print(f" ✅ {cmd}: OK") + except Exception as e: + print(f" ❌ {cmd}: {e}") + + cursor.close() + conn.close() + + except Exception as e: + print(f"❌ Transaction test failed: {e}") + + +def test_performance(host, port, user, database, password=None, iterations=10): + """Test query performance.""" + print(f"\n⚡ Testing performance ({iterations} iterations)...") + + try: + conn_params = { + 'host': host, + 'port': port, + 'user': user, + 'database': database + } + if password: + conn_params['password'] = password + + times = [] + + for i in range(iterations): + start_time = time.time() + + conn = psycopg2.connect(**conn_params) + cursor = conn.cursor() + cursor.execute("SELECT 1") + result = cursor.fetchone() + cursor.close() + conn.close() + + elapsed = time.time() - start_time + times.append(elapsed) + + if i < 3: # Show first 3 iterations + print(f" Iteration {i+1}: {elapsed:.3f}s") + + avg_time = sum(times) / len(times) + min_time = min(times) + max_time = max(times) + + print(f" ✅ Performance results:") + print(f" - Average: {avg_time:.3f}s") + print(f" - Min: {min_time:.3f}s") + print(f" - Max: {max_time:.3f}s") + + except Exception as e: + print(f"❌ Performance test failed: {e}") + + +def main(): + parser = argparse.ArgumentParser(description="Test SeaweedFS PostgreSQL Protocol") + parser.add_argument("--host", default="localhost", help="PostgreSQL server host") + parser.add_argument("--port", type=int, default=5432, help="PostgreSQL server port") + parser.add_argument("--user", default="seaweedfs", help="PostgreSQL username") + parser.add_argument("--password", help="PostgreSQL password") + parser.add_argument("--database", default="default", help="PostgreSQL database") + parser.add_argument("--skip-performance", action="store_true", help="Skip performance tests") + + args = parser.parse_args() + + print("🧪 SeaweedFS PostgreSQL Protocol Test Client") + print("=" * 50) + + # Test basic connection first + if not test_connection(args.host, args.port, args.user, args.database, args.password): + print("\n❌ Basic connection failed. Cannot continue with other tests.") + sys.exit(1) + + # Run all tests + try: + test_system_queries(args.host, args.port, args.user, args.database, args.password) + test_schema_queries(args.host, args.port, args.user, args.database, args.password) + test_data_queries(args.host, args.port, args.user, args.database, args.password) + test_prepared_statements(args.host, args.port, args.user, args.database, args.password) + test_transaction_support(args.host, args.port, args.user, args.database, args.password) + + if not args.skip_performance: + test_performance(args.host, args.port, args.user, args.database, args.password) + + except KeyboardInterrupt: + print("\n\n⚠️ Tests interrupted by user") + sys.exit(0) + except Exception as e: + print(f"\n❌ Unexpected error during testing: {e}") + traceback.print_exc() + sys.exit(1) + + print("\n🎉 All tests completed!") + print("\nTo use SeaweedFS with PostgreSQL tools:") + print(f" psql -h {args.host} -p {args.port} -U {args.user} -d {args.database}") + print(f" Connection string: postgresql://{args.user}@{args.host}:{args.port}/{args.database}") + + +if __name__ == "__main__": + main() |
