aboutsummaryrefslogtreecommitdiff
path: root/postgres-examples
diff options
context:
space:
mode:
Diffstat (limited to 'postgres-examples')
-rw-r--r--postgres-examples/README.md414
-rw-r--r--postgres-examples/test_client.py374
2 files changed, 788 insertions, 0 deletions
diff --git a/postgres-examples/README.md b/postgres-examples/README.md
new file mode 100644
index 000000000..fcf853745
--- /dev/null
+++ b/postgres-examples/README.md
@@ -0,0 +1,414 @@
+# SeaweedFS PostgreSQL Protocol Examples
+
+This directory contains examples demonstrating how to connect to SeaweedFS using the PostgreSQL wire protocol.
+
+## Starting the PostgreSQL Server
+
+```bash
+# Start with trust authentication (no password required)
+weed postgres -port=5432 -master=localhost:9333
+
+# Start with password authentication
+weed postgres -port=5432 -auth=password -users="admin:secret;readonly:view123"
+
+# Start with MD5 authentication (more secure)
+weed postgres -port=5432 -auth=md5 -users="user1:pass1;user2:pass2"
+
+# Start with TLS encryption
+weed postgres -port=5432 -tls-cert=server.crt -tls-key=server.key
+
+# Allow connections from any host
+weed postgres -host=0.0.0.0 -port=5432
+```
+
+## Client Connections
+
+### psql Command Line
+
+```bash
+# Basic connection (trust auth)
+psql -h localhost -p 5432 -U seaweedfs -d default
+
+# With password
+PGPASSWORD=secret psql -h localhost -p 5432 -U admin -d default
+
+# Connection string format
+psql "postgresql://admin:secret@localhost:5432/default"
+
+# Connection string with parameters
+psql "host=localhost port=5432 dbname=default user=admin password=secret"
+```
+
+### Programming Languages
+
+#### Python (psycopg2)
+```python
+import psycopg2
+
+# Connect to SeaweedFS
+conn = psycopg2.connect(
+ host="localhost",
+ port=5432,
+ user="seaweedfs",
+ database="default"
+)
+
+# Execute queries
+cursor = conn.cursor()
+cursor.execute("SELECT * FROM my_topic LIMIT 10")
+
+for row in cursor.fetchall():
+ print(row)
+
+cursor.close()
+conn.close()
+```
+
+#### Java JDBC
+```java
+import java.sql.*;
+
+public class SeaweedFSExample {
+ public static void main(String[] args) throws SQLException {
+ String url = "jdbc:postgresql://localhost:5432/default";
+
+ Connection conn = DriverManager.getConnection(url, "seaweedfs", "");
+ Statement stmt = conn.createStatement();
+
+ ResultSet rs = stmt.executeQuery("SELECT * FROM my_topic LIMIT 10");
+ while (rs.next()) {
+ System.out.println("ID: " + rs.getLong("id"));
+ System.out.println("Message: " + rs.getString("message"));
+ }
+
+ rs.close();
+ stmt.close();
+ conn.close();
+ }
+}
+```
+
+#### Go (lib/pq)
+```go
+package main
+
+import (
+ "database/sql"
+ "fmt"
+ _ "github.com/lib/pq"
+)
+
+func main() {
+ db, err := sql.Open("postgres",
+ "host=localhost port=5432 user=seaweedfs dbname=default sslmode=disable")
+ if err != nil {
+ panic(err)
+ }
+ defer db.Close()
+
+ rows, err := db.Query("SELECT * FROM my_topic LIMIT 10")
+ if err != nil {
+ panic(err)
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var id int64
+ var message string
+ err := rows.Scan(&id, &message)
+ if err != nil {
+ panic(err)
+ }
+ fmt.Printf("ID: %d, Message: %s\n", id, message)
+ }
+}
+```
+
+#### Node.js (pg)
+```javascript
+const { Client } = require('pg');
+
+const client = new Client({
+ host: 'localhost',
+ port: 5432,
+ user: 'seaweedfs',
+ database: 'default',
+});
+
+async function query() {
+ await client.connect();
+
+ const result = await client.query('SELECT * FROM my_topic LIMIT 10');
+ console.log(result.rows);
+
+ await client.end();
+}
+
+query().catch(console.error);
+```
+
+## SQL Operations
+
+### Basic Queries
+```sql
+-- List databases
+SHOW DATABASES;
+
+-- List tables (topics)
+SHOW TABLES;
+
+-- Describe table structure
+DESCRIBE my_topic;
+-- or use the shorthand: DESC my_topic;
+
+-- Basic select
+SELECT * FROM my_topic;
+
+-- With WHERE clause
+SELECT id, message FROM my_topic WHERE id > 1000;
+
+-- With LIMIT
+SELECT * FROM my_topic LIMIT 100;
+```
+
+### Aggregations
+```sql
+-- Count records
+SELECT COUNT(*) FROM my_topic;
+
+-- Multiple aggregations
+SELECT
+ COUNT(*) as total_messages,
+ MIN(id) as min_id,
+ MAX(id) as max_id,
+ AVG(amount) as avg_amount
+FROM my_topic;
+
+-- Aggregations with WHERE
+SELECT COUNT(*) FROM my_topic WHERE status = 'active';
+```
+
+### System Columns
+```sql
+-- Access system columns
+SELECT
+ id,
+ message,
+ _timestamp_ns as timestamp,
+ _key as partition_key,
+ _source as data_source
+FROM my_topic;
+
+-- Filter by timestamp
+SELECT * FROM my_topic
+WHERE _timestamp_ns > 1640995200000000000
+LIMIT 10;
+```
+
+### PostgreSQL System Queries
+```sql
+-- Version information
+SELECT version();
+
+-- Current database
+SELECT current_database();
+
+-- Current user
+SELECT current_user;
+
+-- Server settings
+SELECT current_setting('server_version');
+SELECT current_setting('server_encoding');
+```
+
+## psql Meta-Commands
+
+```sql
+-- List tables
+\d
+\dt
+
+-- List databases
+\l
+
+-- Describe specific table
+\d my_topic
+\dt my_topic
+
+-- List schemas
+\dn
+
+-- Help
+\h
+\?
+
+-- Quit
+\q
+```
+
+## Database Tools Integration
+
+### DBeaver
+1. Create New Connection → PostgreSQL
+2. Settings:
+ - **Host**: localhost
+ - **Port**: 5432
+ - **Database**: default
+ - **Username**: seaweedfs (or configured user)
+ - **Password**: (if using password auth)
+
+### pgAdmin
+1. Add New Server
+2. Connection tab:
+ - **Host**: localhost
+ - **Port**: 5432
+ - **Username**: seaweedfs
+ - **Database**: default
+
+### DataGrip
+1. New Data Source → PostgreSQL
+2. Configure:
+ - **Host**: localhost
+ - **Port**: 5432
+ - **User**: seaweedfs
+ - **Database**: default
+
+### Grafana
+1. Add Data Source → PostgreSQL
+2. Configuration:
+ - **Host**: localhost:5432
+ - **Database**: default
+ - **User**: seaweedfs
+ - **SSL Mode**: disable
+
+## BI Tools
+
+### Tableau
+1. Connect to Data → PostgreSQL
+2. Server: localhost
+3. Port: 5432
+4. Database: default
+5. Username: seaweedfs
+
+### Power BI
+1. Get Data → Database → PostgreSQL
+2. Server: localhost
+3. Database: default
+4. Username: seaweedfs
+
+## Connection Pooling
+
+### Java (HikariCP)
+```java
+HikariConfig config = new HikariConfig();
+config.setJdbcUrl("jdbc:postgresql://localhost:5432/default");
+config.setUsername("seaweedfs");
+config.setMaximumPoolSize(10);
+
+HikariDataSource dataSource = new HikariDataSource(config);
+```
+
+### Python (connection pooling)
+```python
+from psycopg2 import pool
+
+connection_pool = psycopg2.pool.SimpleConnectionPool(
+ 1, 20,
+ host="localhost",
+ port=5432,
+ user="seaweedfs",
+ database="default"
+)
+
+conn = connection_pool.getconn()
+# Use connection
+connection_pool.putconn(conn)
+```
+
+## Security Best Practices
+
+### Use TLS Encryption
+```bash
+# Generate self-signed certificate for testing
+openssl req -x509 -newkey rsa:4096 -keyout server.key -out server.crt -days 365 -nodes
+
+# Start with TLS
+weed postgres -tls-cert=server.crt -tls-key=server.key
+```
+
+### Use MD5 Authentication
+```bash
+# More secure than password auth
+weed postgres -auth=md5 -users="admin:secret123;readonly:view456"
+```
+
+### Limit Connections
+```bash
+# Limit concurrent connections
+weed postgres -max-connections=50 -idle-timeout=30m
+```
+
+## Troubleshooting
+
+### Connection Issues
+```bash
+# Test connectivity
+telnet localhost 5432
+
+# Check if server is running
+ps aux | grep "weed postgres"
+
+# Check logs for errors
+tail -f /var/log/seaweedfs/postgres.log
+```
+
+### Common Errors
+
+**"Connection refused"**
+- Ensure PostgreSQL server is running
+- Check host/port configuration
+- Verify firewall settings
+
+**"Authentication failed"**
+- Check username/password
+- Verify auth method configuration
+- Ensure user is configured in server
+
+**"Database does not exist"**
+- Use correct database name (default: 'default')
+- Check available databases: `SHOW DATABASES`
+
+**"Permission denied"**
+- Check user permissions
+- Verify authentication method
+- Use correct credentials
+
+## Performance Tips
+
+1. **Use LIMIT clauses** for large result sets
+2. **Filter with WHERE clauses** to reduce data transfer
+3. **Use connection pooling** for multi-threaded applications
+4. **Close resources properly** (connections, statements, result sets)
+5. **Use prepared statements** for repeated queries
+
+## Monitoring
+
+### Connection Statistics
+```sql
+-- Current connections (if supported)
+SELECT COUNT(*) FROM pg_stat_activity;
+
+-- Server version
+SELECT version();
+
+-- Current settings
+SELECT name, setting FROM pg_settings WHERE name LIKE '%connection%';
+```
+
+### Query Performance
+```sql
+-- Use EXPLAIN for query plans (if supported)
+EXPLAIN SELECT * FROM my_topic WHERE id > 1000;
+```
+
+This PostgreSQL protocol support makes SeaweedFS accessible to the entire PostgreSQL ecosystem, enabling seamless integration with existing tools, applications, and workflows.
diff --git a/postgres-examples/test_client.py b/postgres-examples/test_client.py
new file mode 100644
index 000000000..e293d53cc
--- /dev/null
+++ b/postgres-examples/test_client.py
@@ -0,0 +1,374 @@
+#!/usr/bin/env python3
+"""
+Test client for SeaweedFS PostgreSQL protocol support.
+
+This script demonstrates how to connect to SeaweedFS using standard PostgreSQL
+libraries and execute various types of queries.
+
+Requirements:
+ pip install psycopg2-binary
+
+Usage:
+ python test_client.py
+ python test_client.py --host localhost --port 5432 --user seaweedfs --database default
+"""
+
+import sys
+import argparse
+import time
+import traceback
+
+try:
+ import psycopg2
+ import psycopg2.extras
+except ImportError:
+ print("Error: psycopg2 not found. Install with: pip install psycopg2-binary")
+ sys.exit(1)
+
+
+def test_connection(host, port, user, database, password=None):
+ """Test basic connection to SeaweedFS PostgreSQL server."""
+ print(f"🔗 Testing connection to {host}:{port}/{database} as user '{user}'")
+
+ try:
+ conn_params = {
+ 'host': host,
+ 'port': port,
+ 'user': user,
+ 'database': database,
+ 'connect_timeout': 10
+ }
+
+ if password:
+ conn_params['password'] = password
+
+ conn = psycopg2.connect(**conn_params)
+ print("✅ Connection successful!")
+
+ # Test basic query
+ cursor = conn.cursor()
+ cursor.execute("SELECT 1 as test")
+ result = cursor.fetchone()
+ print(f"✅ Basic query successful: {result}")
+
+ cursor.close()
+ conn.close()
+ return True
+
+ except Exception as e:
+ print(f"❌ Connection failed: {e}")
+ return False
+
+
+def test_system_queries(host, port, user, database, password=None):
+ """Test PostgreSQL system queries."""
+ print("\n🔧 Testing PostgreSQL system queries...")
+
+ try:
+ conn_params = {
+ 'host': host,
+ 'port': port,
+ 'user': user,
+ 'database': database
+ }
+ if password:
+ conn_params['password'] = password
+
+ conn = psycopg2.connect(**conn_params)
+ cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+ system_queries = [
+ ("Version", "SELECT version()"),
+ ("Current Database", "SELECT current_database()"),
+ ("Current User", "SELECT current_user"),
+ ("Server Encoding", "SELECT current_setting('server_encoding')"),
+ ("Client Encoding", "SELECT current_setting('client_encoding')"),
+ ]
+
+ for name, query in system_queries:
+ try:
+ cursor.execute(query)
+ result = cursor.fetchone()
+ print(f" ✅ {name}: {result[0]}")
+ except Exception as e:
+ print(f" ❌ {name}: {e}")
+
+ cursor.close()
+ conn.close()
+
+ except Exception as e:
+ print(f"❌ System queries failed: {e}")
+
+
+def test_schema_queries(host, port, user, database, password=None):
+ """Test schema and metadata queries."""
+ print("\n📊 Testing schema queries...")
+
+ try:
+ conn_params = {
+ 'host': host,
+ 'port': port,
+ 'user': user,
+ 'database': database
+ }
+ if password:
+ conn_params['password'] = password
+
+ conn = psycopg2.connect(**conn_params)
+ cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+ schema_queries = [
+ ("Show Databases", "SHOW DATABASES"),
+ ("Show Tables", "SHOW TABLES"),
+ ("List Schemas", "SELECT 'public' as schema_name"),
+ ]
+
+ for name, query in schema_queries:
+ try:
+ cursor.execute(query)
+ results = cursor.fetchall()
+ print(f" ✅ {name}: Found {len(results)} items")
+ for row in results[:3]: # Show first 3 results
+ print(f" - {dict(row)}")
+ if len(results) > 3:
+ print(f" ... and {len(results) - 3} more")
+ except Exception as e:
+ print(f" ❌ {name}: {e}")
+
+ cursor.close()
+ conn.close()
+
+ except Exception as e:
+ print(f"❌ Schema queries failed: {e}")
+
+
+def test_data_queries(host, port, user, database, password=None):
+ """Test data queries on actual topics."""
+ print("\n📝 Testing data queries...")
+
+ try:
+ conn_params = {
+ 'host': host,
+ 'port': port,
+ 'user': user,
+ 'database': database
+ }
+ if password:
+ conn_params['password'] = password
+
+ conn = psycopg2.connect(**conn_params)
+ cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
+
+ # First, try to get available tables/topics
+ cursor.execute("SHOW TABLES")
+ tables = cursor.fetchall()
+
+ if not tables:
+ print(" ℹ️ No tables/topics found for data testing")
+ cursor.close()
+ conn.close()
+ return
+
+ # Test with first available table
+ table_name = tables[0][0] if tables[0] else 'test_topic'
+ print(f" 📋 Testing with table: {table_name}")
+
+ test_queries = [
+ (f"Count records in {table_name}", f"SELECT COUNT(*) FROM \"{table_name}\""),
+ (f"Sample data from {table_name}", f"SELECT * FROM \"{table_name}\" LIMIT 3"),
+ (f"System columns from {table_name}", f"SELECT _timestamp_ns, _key, _source FROM \"{table_name}\" LIMIT 3"),
+ (f"Describe {table_name}", f"DESCRIBE \"{table_name}\""),
+ ]
+
+ for name, query in test_queries:
+ try:
+ cursor.execute(query)
+ results = cursor.fetchall()
+
+ if "COUNT" in query.upper():
+ count = results[0][0] if results else 0
+ print(f" ✅ {name}: {count} records")
+ elif "DESCRIBE" in query.upper():
+ print(f" ✅ {name}: {len(results)} columns")
+ for row in results[:5]: # Show first 5 columns
+ print(f" - {dict(row)}")
+ else:
+ print(f" ✅ {name}: {len(results)} rows")
+ for row in results:
+ print(f" - {dict(row)}")
+
+ except Exception as e:
+ print(f" ❌ {name}: {e}")
+
+ cursor.close()
+ conn.close()
+
+ except Exception as e:
+ print(f"❌ Data queries failed: {e}")
+
+
+def test_prepared_statements(host, port, user, database, password=None):
+ """Test prepared statements."""
+ print("\n📝 Testing prepared statements...")
+
+ try:
+ conn_params = {
+ 'host': host,
+ 'port': port,
+ 'user': user,
+ 'database': database
+ }
+ if password:
+ conn_params['password'] = password
+
+ conn = psycopg2.connect(**conn_params)
+ cursor = conn.cursor()
+
+ # Test parameterized query
+ try:
+ cursor.execute("SELECT %s as param1, %s as param2", ("hello", 42))
+ result = cursor.fetchone()
+ print(f" ✅ Prepared statement: {result}")
+ except Exception as e:
+ print(f" ❌ Prepared statement: {e}")
+
+ cursor.close()
+ conn.close()
+
+ except Exception as e:
+ print(f"❌ Prepared statements test failed: {e}")
+
+
+def test_transaction_support(host, port, user, database, password=None):
+ """Test transaction support (should be no-op for read-only)."""
+ print("\n🔄 Testing transaction support...")
+
+ try:
+ conn_params = {
+ 'host': host,
+ 'port': port,
+ 'user': user,
+ 'database': database
+ }
+ if password:
+ conn_params['password'] = password
+
+ conn = psycopg2.connect(**conn_params)
+ cursor = conn.cursor()
+
+ transaction_commands = [
+ "BEGIN",
+ "SELECT 1 as in_transaction",
+ "COMMIT",
+ "SELECT 1 as after_commit",
+ ]
+
+ for cmd in transaction_commands:
+ try:
+ cursor.execute(cmd)
+ if "SELECT" in cmd:
+ result = cursor.fetchone()
+ print(f" ✅ {cmd}: {result}")
+ else:
+ print(f" ✅ {cmd}: OK")
+ except Exception as e:
+ print(f" ❌ {cmd}: {e}")
+
+ cursor.close()
+ conn.close()
+
+ except Exception as e:
+ print(f"❌ Transaction test failed: {e}")
+
+
+def test_performance(host, port, user, database, password=None, iterations=10):
+ """Test query performance."""
+ print(f"\n⚡ Testing performance ({iterations} iterations)...")
+
+ try:
+ conn_params = {
+ 'host': host,
+ 'port': port,
+ 'user': user,
+ 'database': database
+ }
+ if password:
+ conn_params['password'] = password
+
+ times = []
+
+ for i in range(iterations):
+ start_time = time.time()
+
+ conn = psycopg2.connect(**conn_params)
+ cursor = conn.cursor()
+ cursor.execute("SELECT 1")
+ result = cursor.fetchone()
+ cursor.close()
+ conn.close()
+
+ elapsed = time.time() - start_time
+ times.append(elapsed)
+
+ if i < 3: # Show first 3 iterations
+ print(f" Iteration {i+1}: {elapsed:.3f}s")
+
+ avg_time = sum(times) / len(times)
+ min_time = min(times)
+ max_time = max(times)
+
+ print(f" ✅ Performance results:")
+ print(f" - Average: {avg_time:.3f}s")
+ print(f" - Min: {min_time:.3f}s")
+ print(f" - Max: {max_time:.3f}s")
+
+ except Exception as e:
+ print(f"❌ Performance test failed: {e}")
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Test SeaweedFS PostgreSQL Protocol")
+ parser.add_argument("--host", default="localhost", help="PostgreSQL server host")
+ parser.add_argument("--port", type=int, default=5432, help="PostgreSQL server port")
+ parser.add_argument("--user", default="seaweedfs", help="PostgreSQL username")
+ parser.add_argument("--password", help="PostgreSQL password")
+ parser.add_argument("--database", default="default", help="PostgreSQL database")
+ parser.add_argument("--skip-performance", action="store_true", help="Skip performance tests")
+
+ args = parser.parse_args()
+
+ print("🧪 SeaweedFS PostgreSQL Protocol Test Client")
+ print("=" * 50)
+
+ # Test basic connection first
+ if not test_connection(args.host, args.port, args.user, args.database, args.password):
+ print("\n❌ Basic connection failed. Cannot continue with other tests.")
+ sys.exit(1)
+
+ # Run all tests
+ try:
+ test_system_queries(args.host, args.port, args.user, args.database, args.password)
+ test_schema_queries(args.host, args.port, args.user, args.database, args.password)
+ test_data_queries(args.host, args.port, args.user, args.database, args.password)
+ test_prepared_statements(args.host, args.port, args.user, args.database, args.password)
+ test_transaction_support(args.host, args.port, args.user, args.database, args.password)
+
+ if not args.skip_performance:
+ test_performance(args.host, args.port, args.user, args.database, args.password)
+
+ except KeyboardInterrupt:
+ print("\n\n⚠️ Tests interrupted by user")
+ sys.exit(0)
+ except Exception as e:
+ print(f"\n❌ Unexpected error during testing: {e}")
+ traceback.print_exc()
+ sys.exit(1)
+
+ print("\n🎉 All tests completed!")
+ print("\nTo use SeaweedFS with PostgreSQL tools:")
+ print(f" psql -h {args.host} -p {args.port} -U {args.user} -d {args.database}")
+ print(f" Connection string: postgresql://{args.user}@{args.host}:{args.port}/{args.database}")
+
+
+if __name__ == "__main__":
+ main()