From 49f1bf3ba296f0e3dfc01d5a3d371f82f159dc4a Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Thu, 16 May 2024 06:49:12 -0500 Subject: [PATCH] pg connection cleanup (#13144) --- .changeset/big-windows-lie.md | 5 ++ .golangci.yml | 4 ++ core/internal/testutils/pgtest/txdb.go | 2 +- core/services/pg/connection.go | 74 ++++++++++++++++---------- go.mod | 4 +- 5 files changed, 58 insertions(+), 31 deletions(-) create mode 100644 .changeset/big-windows-lie.md diff --git a/.changeset/big-windows-lie.md b/.changeset/big-windows-lie.md new file mode 100644 index 00000000000..aa81a75c6ee --- /dev/null +++ b/.changeset/big-windows-lie.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +improve handling of postgres connection settings and driver versions #db diff --git a/.golangci.yml b/.golangci.yml index 2902503ed20..96a7de282e0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -89,6 +89,10 @@ linters-settings: desc: Use the standard library instead - pkg: github.com/gofrs/uuid desc: Use github.com/google/uuid instead + - pkg: github.com/jackc/pgx3 + desc: Use github.com/jackc/pgx4 instead + - pkg: github.com/jackc/pgx5 + desc: Use github.com/jackc/pgx4 instead - pkg: github.com/satori/go.uuid desc: Use github.com/google/uuid instead - pkg: github.com/test-go/testify/assert diff --git a/core/internal/testutils/pgtest/txdb.go b/core/internal/testutils/pgtest/txdb.go index f4640946ad4..7591054305c 100644 --- a/core/internal/testutils/pgtest/txdb.go +++ b/core/internal/testutils/pgtest/txdb.go @@ -89,7 +89,7 @@ func (d *txDriver) Open(dsn string) (driver.Conn, error) { defer d.Unlock() // Open real db connection if its the first call if d.db == nil { - db, err := sql.Open("pgx", d.dbURL) + db, err := sql.Open(string(dialects.Postgres), d.dbURL) if err != nil { return nil, err } diff --git a/core/services/pg/connection.go b/core/services/pg/connection.go index e8b6f3af429..b76c9bc10ed 100644 --- a/core/services/pg/connection.go +++ b/core/services/pg/connection.go @@ -7,16 +7,17 @@ import ( "os" "time" + "github.com/XSAM/otelsql" "github.com/google/uuid" - _ "github.com/jackc/pgx/v4/stdlib" // need to make sure pgx driver is registered before opening connection + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/stdlib" "github.com/jmoiron/sqlx" "github.com/scylladb/go-reflectx" "go.opentelemetry.io/otel" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "golang.org/x/net/context" "github.com/smartcontractkit/chainlink/v2/core/store/dialects" - - "github.com/XSAM/otelsql" ) // NOTE: This is the default level in Postgres anyway, we just make it @@ -48,21 +49,8 @@ type ConnectionConfig interface { MaxIdleConns() int } -func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error) { - if dialect == dialects.TransactionWrappedPostgres { - // Dbtx uses the uri as a unique identifier for each transaction. Each ORM - // should be encapsulated in it's own transaction, and thus needs its own - // unique id. - // - // We can happily throw away the original uri here because if we are using - // txdb it should have already been set at the point where we called - // txdb.Register - uri = uuid.New().String() - } - - // Initialize sql/sqlx - sqldb, err := otelsql.Open(string(dialect), uri, - otelsql.WithAttributes(semconv.DBSystemPostgreSQL), +func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (*sqlx.DB, error) { + opts := []otelsql.Option{otelsql.WithAttributes(semconv.DBSystemPostgreSQL), otelsql.WithTracerProvider(otel.GetTracerProvider()), otelsql.WithSQLCommenter(true), otelsql.WithSpanOptions(otelsql.SpanOptions{ @@ -71,22 +59,52 @@ func NewConnection(uri string, dialect dialects.DialectName, config ConnectionCo OmitRows: true, OmitConnectorConnect: true, OmitConnQuery: false, - }), - ) - if err != nil { - return nil, err - } - db = sqlx.NewDb(sqldb, string(dialect)) - db.MapperFunc(reflectx.CamelToSnakeASCII) + })} // Set default connection options lockTimeout := config.DefaultLockTimeout().Milliseconds() idleInTxSessionTimeout := config.DefaultIdleInTxSessionTimeout().Milliseconds() - stmt := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`, + connParams := fmt.Sprintf(`SET TIME ZONE 'UTC'; SET lock_timeout = %d; SET idle_in_transaction_session_timeout = %d; SET default_transaction_isolation = %q`, lockTimeout, idleInTxSessionTimeout, defaultIsolation.String()) - if _, err = db.Exec(stmt); err != nil { - return nil, err + + var sqldb *sql.DB + if dialect == dialects.TransactionWrappedPostgres { + // Dbtx uses the uri as a unique identifier for each transaction. Each ORM + // should be encapsulated in it's own transaction, and thus needs its own + // unique id. + // + // We can happily throw away the original uri here because if we are using + // txdb it should have already been set at the point where we called + // txdb.Register + var err error + sqldb, err = otelsql.Open(string(dialect), uuid.New().String(), opts...) + if err != nil { + return nil, fmt.Errorf("failed to open txdb: %w", err) + } + _, err = sqldb.Exec(connParams) + if err != nil { + return nil, fmt.Errorf("failed to set options: %w", err) + } + } else { + // Set sane defaults for every new database connection. + // Those can be overridden with Txn options or SET statements in individual connections. + // The default values are the same for Txns. + connConfig, err := pgx.ParseConfig(uri) + if err != nil { + return nil, fmt.Errorf("database: failed to parse config: %w", err) + } + + connector := stdlib.GetConnector(*connConfig, stdlib.OptionAfterConnect(func(ctx context.Context, c *pgx.Conn) (err error) { + _, err = c.Exec(ctx, connParams) + return + })) + + // Initialize sql/sqlx + sqldb = otelsql.OpenDB(connector, opts...) } + db := sqlx.NewDb(sqldb, string(dialect)) + db.MapperFunc(reflectx.CamelToSnakeASCII) + setMaxConns(db, config) if os.Getenv("SKIP_PG_VERSION_CHECK") != "true" { diff --git a/go.mod b/go.mod index 4250c0fef43..f46e4b78333 100644 --- a/go.mod +++ b/go.mod @@ -101,6 +101,7 @@ require ( golang.org/x/crypto v0.22.0 golang.org/x/exp v0.0.0-20240213143201-ec583247a57a golang.org/x/mod v0.15.0 + golang.org/x/net v0.24.0 golang.org/x/sync v0.6.0 golang.org/x/term v0.19.0 golang.org/x/text v0.14.0 @@ -111,6 +112,7 @@ require ( google.golang.org/protobuf v1.33.0 gopkg.in/guregu/null.v4 v4.0.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 + sigs.k8s.io/yaml v1.4.0 ) require ( @@ -324,7 +326,6 @@ require ( go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/ratelimit v0.3.0 // indirect golang.org/x/arch v0.7.0 // indirect - golang.org/x/net v0.24.0 // indirect golang.org/x/sys v0.19.0 // indirect google.golang.org/api v0.149.0 // indirect google.golang.org/genproto v0.0.0-20231030173426-d783a09b4405 // indirect @@ -336,7 +337,6 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect pgregory.net/rapid v0.5.5 // indirect rsc.io/tmplfunc v0.0.3 // indirect - sigs.k8s.io/yaml v1.4.0 ) replace (