diff options
Diffstat (limited to 'weed/credential/postgres')
| -rw-r--r-- | weed/credential/postgres/postgres_store.go | 570 |
1 files changed, 570 insertions, 0 deletions
diff --git a/weed/credential/postgres/postgres_store.go b/weed/credential/postgres/postgres_store.go new file mode 100644 index 000000000..0d75ad8c0 --- /dev/null +++ b/weed/credential/postgres/postgres_store.go @@ -0,0 +1,570 @@ +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/credential" + "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + + _ "github.com/lib/pq" +) + +func init() { + credential.Stores = append(credential.Stores, &PostgresStore{}) +} + +// PostgresStore implements CredentialStore using PostgreSQL +type PostgresStore struct { + db *sql.DB + configured bool +} + +func (store *PostgresStore) GetName() credential.CredentialStoreTypeName { + return credential.StoreTypePostgres +} + +func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) error { + if store.configured { + return nil + } + + hostname := configuration.GetString(prefix + "hostname") + port := configuration.GetInt(prefix + "port") + username := configuration.GetString(prefix + "username") + password := configuration.GetString(prefix + "password") + database := configuration.GetString(prefix + "database") + schema := configuration.GetString(prefix + "schema") + sslmode := configuration.GetString(prefix + "sslmode") + + // Set defaults + if hostname == "" { + hostname = "localhost" + } + if port == 0 { + port = 5432 + } + if schema == "" { + schema = "public" + } + if sslmode == "" { + sslmode = "disable" + } + + // Build connection string + connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=%s search_path=%s", + hostname, port, username, password, database, sslmode, schema) + + db, err := sql.Open("postgres", connStr) + if err != nil { + return fmt.Errorf("failed to open database: %v", err) + } + + // Test connection + if err := db.Ping(); err != nil { + db.Close() + return fmt.Errorf("failed to ping database: %v", err) + } + + // Set connection pool settings + db.SetMaxOpenConns(25) + db.SetMaxIdleConns(5) + db.SetConnMaxLifetime(5 * time.Minute) + + store.db = db + + // Create tables if they don't exist + if err := store.createTables(); err != nil { + db.Close() + return fmt.Errorf("failed to create tables: %v", err) + } + + store.configured = true + return nil +} + +func (store *PostgresStore) createTables() error { + // Create users table + usersTable := ` + CREATE TABLE IF NOT EXISTS users ( + username VARCHAR(255) PRIMARY KEY, + email VARCHAR(255), + account_data JSONB, + actions JSONB, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_users_email ON users(email); + ` + + // Create credentials table + credentialsTable := ` + CREATE TABLE IF NOT EXISTS credentials ( + id SERIAL PRIMARY KEY, + username VARCHAR(255) REFERENCES users(username) ON DELETE CASCADE, + access_key VARCHAR(255) UNIQUE NOT NULL, + secret_key VARCHAR(255) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_credentials_username ON credentials(username); + CREATE INDEX IF NOT EXISTS idx_credentials_access_key ON credentials(access_key); + ` + + // Execute table creation + if _, err := store.db.Exec(usersTable); err != nil { + return fmt.Errorf("failed to create users table: %v", err) + } + + if _, err := store.db.Exec(credentialsTable); err != nil { + return fmt.Errorf("failed to create credentials table: %v", err) + } + + return nil +} + +func (store *PostgresStore) LoadConfiguration(ctx context.Context) (*iam_pb.S3ApiConfiguration, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + config := &iam_pb.S3ApiConfiguration{} + + // Query all users + rows, err := store.db.QueryContext(ctx, "SELECT username, email, account_data, actions FROM users") + if err != nil { + return nil, fmt.Errorf("failed to query users: %v", err) + } + defer rows.Close() + + for rows.Next() { + var username, email string + var accountDataJSON, actionsJSON []byte + + if err := rows.Scan(&username, &email, &accountDataJSON, &actionsJSON); err != nil { + return nil, fmt.Errorf("failed to scan user row: %v", err) + } + + identity := &iam_pb.Identity{ + Name: username, + } + + // Parse account data + if len(accountDataJSON) > 0 { + if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil { + return nil, fmt.Errorf("failed to unmarshal account data for user %s: %v", username, err) + } + } + + // Parse actions + if len(actionsJSON) > 0 { + if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil { + return nil, fmt.Errorf("failed to unmarshal actions for user %s: %v", username, err) + } + } + + // Query credentials for this user + credRows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username) + if err != nil { + return nil, fmt.Errorf("failed to query credentials for user %s: %v", username, err) + } + + for credRows.Next() { + var accessKey, secretKey string + if err := credRows.Scan(&accessKey, &secretKey); err != nil { + credRows.Close() + return nil, fmt.Errorf("failed to scan credential row for user %s: %v", username, err) + } + + identity.Credentials = append(identity.Credentials, &iam_pb.Credential{ + AccessKey: accessKey, + SecretKey: secretKey, + }) + } + credRows.Close() + + config.Identities = append(config.Identities, identity) + } + + return config, nil +} + +func (store *PostgresStore) SaveConfiguration(ctx context.Context, config *iam_pb.S3ApiConfiguration) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Start transaction + tx, err := store.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + // Clear existing data + if _, err := tx.ExecContext(ctx, "DELETE FROM credentials"); err != nil { + return fmt.Errorf("failed to clear credentials: %v", err) + } + if _, err := tx.ExecContext(ctx, "DELETE FROM users"); err != nil { + return fmt.Errorf("failed to clear users: %v", err) + } + + // Insert all identities + for _, identity := range config.Identities { + // Marshal account data + var accountDataJSON []byte + if identity.Account != nil { + accountDataJSON, err = json.Marshal(identity.Account) + if err != nil { + return fmt.Errorf("failed to marshal account data for user %s: %v", identity.Name, err) + } + } + + // Marshal actions + var actionsJSON []byte + if identity.Actions != nil { + actionsJSON, err = json.Marshal(identity.Actions) + if err != nil { + return fmt.Errorf("failed to marshal actions for user %s: %v", identity.Name, err) + } + } + + // Insert user + _, err := tx.ExecContext(ctx, + "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)", + identity.Name, "", accountDataJSON, actionsJSON) + if err != nil { + return fmt.Errorf("failed to insert user %s: %v", identity.Name, err) + } + + // Insert credentials + for _, cred := range identity.Credentials { + _, err := tx.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)", + identity.Name, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential for user %s: %v", identity.Name, err) + } + } + } + + return tx.Commit() +} + +func (store *PostgresStore) CreateUser(ctx context.Context, identity *iam_pb.Identity) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Check if user already exists + var count int + err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", identity.Name).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count > 0 { + return credential.ErrUserAlreadyExists + } + + // Start transaction + tx, err := store.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + // Marshal account data + var accountDataJSON []byte + if identity.Account != nil { + accountDataJSON, err = json.Marshal(identity.Account) + if err != nil { + return fmt.Errorf("failed to marshal account data: %v", err) + } + } + + // Marshal actions + var actionsJSON []byte + if identity.Actions != nil { + actionsJSON, err = json.Marshal(identity.Actions) + if err != nil { + return fmt.Errorf("failed to marshal actions: %v", err) + } + } + + // Insert user + _, err = tx.ExecContext(ctx, + "INSERT INTO users (username, email, account_data, actions) VALUES ($1, $2, $3, $4)", + identity.Name, "", accountDataJSON, actionsJSON) + if err != nil { + return fmt.Errorf("failed to insert user: %v", err) + } + + // Insert credentials + for _, cred := range identity.Credentials { + _, err = tx.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)", + identity.Name, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential: %v", err) + } + } + + return tx.Commit() +} + +func (store *PostgresStore) GetUser(ctx context.Context, username string) (*iam_pb.Identity, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + var email string + var accountDataJSON, actionsJSON []byte + + err := store.db.QueryRowContext(ctx, + "SELECT email, account_data, actions FROM users WHERE username = $1", + username).Scan(&email, &accountDataJSON, &actionsJSON) + if err != nil { + if err == sql.ErrNoRows { + return nil, credential.ErrUserNotFound + } + return nil, fmt.Errorf("failed to query user: %v", err) + } + + identity := &iam_pb.Identity{ + Name: username, + } + + // Parse account data + if len(accountDataJSON) > 0 { + if err := json.Unmarshal(accountDataJSON, &identity.Account); err != nil { + return nil, fmt.Errorf("failed to unmarshal account data: %v", err) + } + } + + // Parse actions + if len(actionsJSON) > 0 { + if err := json.Unmarshal(actionsJSON, &identity.Actions); err != nil { + return nil, fmt.Errorf("failed to unmarshal actions: %v", err) + } + } + + // Query credentials + rows, err := store.db.QueryContext(ctx, "SELECT access_key, secret_key FROM credentials WHERE username = $1", username) + if err != nil { + return nil, fmt.Errorf("failed to query credentials: %v", err) + } + defer rows.Close() + + for rows.Next() { + var accessKey, secretKey string + if err := rows.Scan(&accessKey, &secretKey); err != nil { + return nil, fmt.Errorf("failed to scan credential: %v", err) + } + + identity.Credentials = append(identity.Credentials, &iam_pb.Credential{ + AccessKey: accessKey, + SecretKey: secretKey, + }) + } + + return identity, nil +} + +func (store *PostgresStore) UpdateUser(ctx context.Context, username string, identity *iam_pb.Identity) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Start transaction + tx, err := store.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %v", err) + } + defer tx.Rollback() + + // Check if user exists + var count int + err = tx.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count == 0 { + return credential.ErrUserNotFound + } + + // Marshal account data + var accountDataJSON []byte + if identity.Account != nil { + accountDataJSON, err = json.Marshal(identity.Account) + if err != nil { + return fmt.Errorf("failed to marshal account data: %v", err) + } + } + + // Marshal actions + var actionsJSON []byte + if identity.Actions != nil { + actionsJSON, err = json.Marshal(identity.Actions) + if err != nil { + return fmt.Errorf("failed to marshal actions: %v", err) + } + } + + // Update user + _, err = tx.ExecContext(ctx, + "UPDATE users SET email = $2, account_data = $3, actions = $4, updated_at = CURRENT_TIMESTAMP WHERE username = $1", + username, "", accountDataJSON, actionsJSON) + if err != nil { + return fmt.Errorf("failed to update user: %v", err) + } + + // Delete existing credentials + _, err = tx.ExecContext(ctx, "DELETE FROM credentials WHERE username = $1", username) + if err != nil { + return fmt.Errorf("failed to delete existing credentials: %v", err) + } + + // Insert new credentials + for _, cred := range identity.Credentials { + _, err = tx.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)", + username, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential: %v", err) + } + } + + return tx.Commit() +} + +func (store *PostgresStore) DeleteUser(ctx context.Context, username string) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + result, err := store.db.ExecContext(ctx, "DELETE FROM users WHERE username = $1", username) + if err != nil { + return fmt.Errorf("failed to delete user: %v", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %v", err) + } + + if rowsAffected == 0 { + return credential.ErrUserNotFound + } + + return nil +} + +func (store *PostgresStore) ListUsers(ctx context.Context) ([]string, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + rows, err := store.db.QueryContext(ctx, "SELECT username FROM users ORDER BY username") + if err != nil { + return nil, fmt.Errorf("failed to query users: %v", err) + } + defer rows.Close() + + var usernames []string + for rows.Next() { + var username string + if err := rows.Scan(&username); err != nil { + return nil, fmt.Errorf("failed to scan username: %v", err) + } + usernames = append(usernames, username) + } + + return usernames, nil +} + +func (store *PostgresStore) GetUserByAccessKey(ctx context.Context, accessKey string) (*iam_pb.Identity, error) { + if !store.configured { + return nil, fmt.Errorf("store not configured") + } + + var username string + err := store.db.QueryRowContext(ctx, "SELECT username FROM credentials WHERE access_key = $1", accessKey).Scan(&username) + if err != nil { + if err == sql.ErrNoRows { + return nil, credential.ErrAccessKeyNotFound + } + return nil, fmt.Errorf("failed to query access key: %v", err) + } + + return store.GetUser(ctx, username) +} + +func (store *PostgresStore) CreateAccessKey(ctx context.Context, username string, cred *iam_pb.Credential) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + // Check if user exists + var count int + err := store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count == 0 { + return credential.ErrUserNotFound + } + + // Insert credential + _, err = store.db.ExecContext(ctx, + "INSERT INTO credentials (username, access_key, secret_key) VALUES ($1, $2, $3)", + username, cred.AccessKey, cred.SecretKey) + if err != nil { + return fmt.Errorf("failed to insert credential: %v", err) + } + + return nil +} + +func (store *PostgresStore) DeleteAccessKey(ctx context.Context, username string, accessKey string) error { + if !store.configured { + return fmt.Errorf("store not configured") + } + + result, err := store.db.ExecContext(ctx, + "DELETE FROM credentials WHERE username = $1 AND access_key = $2", + username, accessKey) + if err != nil { + return fmt.Errorf("failed to delete access key: %v", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %v", err) + } + + if rowsAffected == 0 { + // Check if user exists + var count int + err = store.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM users WHERE username = $1", username).Scan(&count) + if err != nil { + return fmt.Errorf("failed to check user existence: %v", err) + } + if count == 0 { + return credential.ErrUserNotFound + } + return credential.ErrAccessKeyNotFound + } + + return nil +} + +func (store *PostgresStore) Shutdown() { + if store.db != nil { + store.db.Close() + store.db = nil + } + store.configured = false +} |
