1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
// Package postgres2 provides PostgreSQL filer store implementation with bucket support
// Migrated from github.com/lib/pq to github.com/jackc/pgx for:
// - Active development and support
// - Better performance and PostgreSQL-specific features
// - Improved error handling (no more panics)
// - Built-in logging capabilities
// - Superior SSL certificate support
package postgres2
import (
"context"
"database/sql"
"fmt"
"strconv"
"time"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
"github.com/seaweedfs/seaweedfs/weed/filer/postgres"
"github.com/seaweedfs/seaweedfs/weed/util"
)
var _ filer.BucketAware = (*PostgresStore2)(nil)
func init() {
filer.Stores = append(filer.Stores, &PostgresStore2{})
}
type PostgresStore2 struct {
abstract_sql.AbstractSqlStore
}
func (store *PostgresStore2) GetName() string {
return "postgres2"
}
func (store *PostgresStore2) Initialize(configuration util.Configuration, prefix string) (err error) {
return store.initialize(
configuration.GetString(prefix+"createTable"),
configuration.GetString(prefix+"upsertQuery"),
configuration.GetBool(prefix+"enableUpsert"),
configuration.GetString(prefix+"username"),
configuration.GetString(prefix+"password"),
configuration.GetString(prefix+"hostname"),
configuration.GetInt(prefix+"port"),
configuration.GetString(prefix+"database"),
configuration.GetString(prefix+"schema"),
configuration.GetString(prefix+"sslmode"),
configuration.GetString(prefix+"sslcert"),
configuration.GetString(prefix+"sslkey"),
configuration.GetString(prefix+"sslrootcert"),
configuration.GetString(prefix+"sslcrl"),
configuration.GetBool(prefix+"pgbouncer_compatible"),
configuration.GetInt(prefix+"connection_max_idle"),
configuration.GetInt(prefix+"connection_max_open"),
configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
)
}
func (store *PostgresStore2) initialize(createTable, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database, schema, sslmode, sslcert, sslkey, sslrootcert, sslcrl string, pgbouncerCompatible bool, maxIdle, maxOpen, maxLifetimeSeconds int) (err error) {
store.SupportBucketTable = true
if !enableUpsert {
upsertQuery = ""
}
store.SqlGenerator = &postgres.SqlGenPostgres{
CreateTableSqlTemplate: createTable,
DropTableSqlTemplate: `drop table "%s"`,
UpsertQueryTemplate: upsertQuery,
}
// pgx-optimized connection string with better timeouts and connection handling
sqlUrl := "connect_timeout=30"
// PgBouncer compatibility: add prefer_simple_protocol=true when needed
// This avoids prepared statement issues with PgBouncer's transaction pooling mode
if pgbouncerCompatible {
sqlUrl += " prefer_simple_protocol=true"
}
if hostname != "" {
sqlUrl += " host=" + hostname
}
if port != 0 {
sqlUrl += " port=" + strconv.Itoa(port)
}
// SSL configuration - pgx provides better SSL support than lib/pq
if sslmode != "" {
sqlUrl += " sslmode=" + sslmode
}
if sslcert != "" {
sqlUrl += " sslcert=" + sslcert
}
if sslkey != "" {
sqlUrl += " sslkey=" + sslkey
}
if sslrootcert != "" {
sqlUrl += " sslrootcert=" + sslrootcert
}
if sslcrl != "" {
sqlUrl += " sslcrl=" + sslcrl
}
if user != "" {
sqlUrl += " user=" + user
}
adaptedSqlUrl := sqlUrl
if password != "" {
sqlUrl += " password=" + password
adaptedSqlUrl += " password=ADAPTED"
}
if database != "" {
sqlUrl += " dbname=" + database
adaptedSqlUrl += " dbname=" + database
}
if schema != "" && !pgbouncerCompatible {
sqlUrl += " search_path=" + schema
adaptedSqlUrl += " search_path=" + schema
}
var dbErr error
store.DB, dbErr = sql.Open("pgx", sqlUrl)
if dbErr != nil {
if store.DB != nil {
store.DB.Close()
}
store.DB = nil
return fmt.Errorf("can not connect to %s error:%v", adaptedSqlUrl, dbErr)
}
store.DB.SetMaxIdleConns(maxIdle)
store.DB.SetMaxOpenConns(maxOpen)
store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
if err = store.DB.Ping(); err != nil {
return fmt.Errorf("connect to %s error:%v", adaptedSqlUrl, err)
}
if err = store.CreateTable(context.Background(), abstract_sql.DEFAULT_TABLE); err != nil {
return fmt.Errorf("init table %s: %v", abstract_sql.DEFAULT_TABLE, err)
}
return nil
}
|