1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
// Package postgres provides PostgreSQL filer store implementation
// Migrated from github.com/lib/pq to github.com/jackc/pgx for:
// - Active development and support
// - Better performance and PostgreSQL-specific features
// - Improved error handling (no more panics)
// - Built-in logging capabilities
// - Superior SSL certificate support
package postgres
import (
"database/sql"
"fmt"
"strconv"
"time"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func init() {
filer.Stores = append(filer.Stores, &PostgresStore{})
}
type PostgresStore struct {
abstract_sql.AbstractSqlStore
}
func (store *PostgresStore) GetName() string {
return "postgres"
}
func (store *PostgresStore) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"upsertQuery"),
configuration.GetBool(prefix+"enableUpsert"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetString(prefix+"hostname"),
configuration.GetInt(prefix+"port"),
configuration.GetString(prefix+"database"),
configuration.GetString(prefix+"schema"),
configuration.GetString(prefix+"sslmode"),
configuration.GetString(prefix+"sslcert"),
configuration.GetString(prefix+"sslkey"),
configuration.GetString(prefix+"sslrootcert"),
configuration.GetString(prefix+"sslcrl"),
configuration.GetBool(prefix+"pgbouncer_compatible"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
)
}
func (store *PostgresStore) initialize(upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database, schema, sslmode, sslcert, sslkey, sslrootcert, sslcrl string, pgbouncerCompatible bool, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
store.SupportBucketTable = false
if !enableUpsert {
upsertQuery = ""
}
store.SqlGenerator = &SqlGenPostgres{
CreateTableSqlTemplate: "",
DropTableSqlTemplate: `drop table "%s"`,
UpsertQueryTemplate: upsertQuery,
}
// pgx-optimized connection string with better timeouts and connection handling
sqlUrl := "connect_timeout=30"
// PgBouncer compatibility: add prefer_simple_protocol=true when needed
// This avoids prepared statement issues with PgBouncer's transaction pooling mode
if pgbouncerCompatible {
sqlUrl += " prefer_simple_protocol=true"
}
if hostname != "" {
sqlUrl += " host=" + hostname
}
if port != 0 {
sqlUrl += " port=" + strconv.Itoa(port)
}
// SSL configuration - pgx provides better SSL support than lib/pq
if sslmode != "" {
sqlUrl += " sslmode=" + sslmode
}
if sslcert != "" {
sqlUrl += " sslcert=" + sslcert
}
if sslkey != "" {
sqlUrl += " sslkey=" + sslkey
}
if sslrootcert != "" {
sqlUrl += " sslrootcert=" + sslrootcert
}
if sslcrl != "" {
sqlUrl += " sslcrl=" + sslcrl
}
if user != "" {
sqlUrl += " user=" + user
}
adaptedSqlUrl := sqlUrl
if password != "" {
sqlUrl += " password=" + password
adaptedSqlUrl += " password=ADAPTED"
}
if database != "" {
sqlUrl += " dbname=" + database
adaptedSqlUrl += " dbname=" + database
}
if schema != "" && !pgbouncerCompatible {
sqlUrl += " search_path=" + schema
adaptedSqlUrl += " search_path=" + schema
}
var dbErr error
store.DB, dbErr = sql.Open("pgx", sqlUrl)
if dbErr != nil {
if store.DB != nil {
store.DB.Close()
}
store.DB = nil
return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, dbErr)
}
store.DB.SetMaxIdleConns(maxIdle)
store.DB.SetMaxOpenConns(maxOpen)
store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", adaptedSqlUrl, err)
}
return nil
}
|