Skip to content

Commit

Permalink
Reintroduce advisory locking when starting the node (#5215)
Browse files Browse the repository at this point in the history
* postgres: Replace old advisory_lock interface with one that reuses conn

No reason to start a completely separate DB connection just to hold
a lock.

* Reintroduce the advisory lock on RunNode()
  • Loading branch information
archseer committed Oct 18, 2021
1 parent 2ae2215 commit ee26835
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 139 deletions.
58 changes: 58 additions & 0 deletions core/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"bytes"
"context"
"crypto/tls"
"database/sql"
"encoding/json"
Expand Down Expand Up @@ -86,6 +87,53 @@ type AppFactory interface {
// ChainlinkAppFactory is used to create a new Application.
type ChainlinkAppFactory struct{}

func logRetry(count int) {
if count == 1 {
logger.Infow("Could not get lock, retrying...", "failCount", count)
} else if count%1000 == 0 || count&(count-1) == 0 {
logger.Infow("Still waiting for lock...", "failCount", count)
}
}

// Try to immediately acquire an advisory lock. The lock will be released on application stop.
func AdvisoryLock(ctx context.Context, db *sql.DB, timeout time.Duration) (postgres.Locker, error) {
lockID := int64(1027321974924625846)
lockRetryInterval := time.Second

initCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
lock, err := postgres.NewLock(initCtx, lockID, db)
if err != nil {
return nil, err
}

ticker := time.NewTicker(lockRetryInterval)
defer ticker.Stop()
retryCount := 0
for {
lockCtx, cancel := context.WithTimeout(ctx, timeout)
gotLock, err := lock.Lock(lockCtx)
cancel()
if err != nil {
return nil, err
}
if gotLock {
break
}

select {
case <-ticker.C:
retryCount++
logRetry(retryCount)
continue
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "timeout expired while waiting for lock")
}
}

return &lock, nil
}

// NewApplication returns a new instance of the node with the given config.
func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink.Application, error) {
globalLogger := logger.ProductionLogger(cfg)
Expand All @@ -101,6 +149,15 @@ func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink
if err != nil {
return nil, err
}

// Try to acquire an advisory lock to prevent multiple nodes starting at the same time
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
advisoryLock, err := AdvisoryLock(ctx, db.DB, time.Second)
if err != nil {
return nil, errors.Wrap(err, "error acquiring lock")
}

keyStore := keystore.New(gormDB, utils.GetScryptParams(cfg), globalLogger)
cfg.SetDB(gormDB)

Expand Down Expand Up @@ -186,6 +243,7 @@ func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink
Logger: globalLogger,
ExternalInitiatorManager: externalInitiatorManager,
Version: static.Version,
AdvisoryLock: advisoryLock,
})
}

Expand Down
11 changes: 11 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type ChainlinkApplication struct {
logger logger.Logger
sqlxDB *sqlx.DB
gormDB *gorm.DB
advisoryLock postgres.Locker

started bool
startStopMu sync.Mutex
Expand All @@ -139,6 +140,7 @@ type ApplicationOpts struct {
Logger logger.Logger
ExternalInitiatorManager webhook.ExternalInitiatorManager
Version string
AdvisoryLock postgres.Locker
}

// NewApplication initializes a new store if one is not already
Expand Down Expand Up @@ -307,6 +309,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
sqlxDB: opts.SqlxDB,
gormDB: opts.GormDB,

advisoryLock: opts.AdvisoryLock,

// NOTE: Can keep things clean by putting more things in subservices
// instead of manually start/closing
subservices: subservices,
Expand Down Expand Up @@ -440,6 +444,13 @@ func (app *ChainlinkApplication) stop() (err error) {
merr = multierr.Append(merr, app.FeedsService.Close())
}

// Clean up the advisory lock if present
if app.advisoryLock != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
merr = multierr.Append(merr, app.advisoryLock.Unlock(ctx))
}

// DB should pretty much always be closed last
app.logger.Debug("Closing DB...")
merr = multierr.Append(merr, app.sqlxDB.Close())
Expand Down
177 changes: 38 additions & 139 deletions core/services/postgres/advisory_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,158 +3,57 @@ package postgres
import (
"context"
"database/sql"
"net/url"
"sync"

"github.com/smartcontractkit/chainlink/core/static"
"github.com/smartcontractkit/chainlink/core/store/dialects"

"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/utils"
)

// NOTE: All advisory lock class IDs used by the Chainlink application MUST be
// kept here to avoid accidental re-use
const (
AdvisoryLockClassID_EthBroadcaster int32 = 0
AdvisoryLockClassID_JobSpawner int32 = 1
AdvisoryLockClassID_EthConfirmer int32 = 2

// ORM takes lock on 1027321974924625846 which splits into ClassID 239192036, ObjID 2840971190
AdvisoryLockClassID_ORM int32 = 239192036

AdvisoryLockObjectID_EthConfirmer int32 = 0
)

//go:generate mockery --name AdvisoryLocker --output ../../internal/mocks/ --case=underscore
type (
postgresAdvisoryLock struct {
URI string
conn *sql.Conn
db *sql.DB
mu *sync.Mutex
}

AdvisoryLocker interface {
Unlock(ctx context.Context, classID int32, objectID int32) error
WithAdvisoryLock(ctx context.Context, classID int32, objectID int32, f func() error) error
Close() error
}
)

func NewAdvisoryLock(uri url.URL) AdvisoryLocker {
static.SetConsumerName(&uri, "AdvisoryLocker")
return &postgresAdvisoryLock{
URI: uri.String(),
mu: &sync.Mutex{},
}
// Locker is an interface for postgresql advisory locks.
type Locker interface {
Lock(ctx context.Context) (bool, error)
WaitAndLock(ctx context.Context) error
Unlock(ctx context.Context) error
}

func (lock *postgresAdvisoryLock) Close() error {
lock.mu.Lock()
defer lock.mu.Unlock()

var connErr, dbErr error

if lock.conn != nil {
connErr = lock.conn.Close()
if connErr == sql.ErrConnDone {
connErr = nil
}
}
if lock.db != nil {
dbErr = lock.db.Close()
if dbErr == sql.ErrConnDone {
dbErr = nil
}
}

lock.db = nil
lock.conn = nil

return multierr.Combine(connErr, dbErr)
// Lock implements the Locker interface.
type Lock struct {
id int64
conn *sql.Conn
}

func (lock *postgresAdvisoryLock) tryLock(ctx context.Context, classID int32, objectID int32) (err error) {
lock.mu.Lock()
defer lock.mu.Unlock()
defer utils.WrapIfError(&err, "TryAdvisoryLock failed")

if lock.conn == nil {
db, err2 := sql.Open(string(dialects.Postgres), lock.URI)
if err2 != nil {
return err2
}
lock.db = db

// `database/sql`.DB does opaque connection pooling, but PG advisory locks are per-connection
conn, err2 := db.Conn(ctx)
if err2 != nil {
logger.ErrorIfCalling(lock.db.Close)
lock.db = nil
return err2
}
lock.conn = conn
}

gotLock := false
if err = lock.conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1, $2)", classID, objectID).Scan(&gotLock); err != nil {
return err
}
if gotLock {
return nil
}
return errors.Errorf("could not get advisory lock for classID, objectID %v, %v", classID, objectID)
// Lock obtains exclusive session level advisory lock if available.
// It’s similar to WaitAndLock, except it will not wait for the lock to become available.
// It will either obtain the lock and return true, or return false if the lock cannot be acquired immediately.
func (l *Lock) Lock(ctx context.Context) (bool, error) {
result := false
sqlQuery := "SELECT pg_try_advisory_lock($1)"
err := l.conn.QueryRowContext(ctx, sqlQuery, l.id).Scan(&result)
return result, err
}

func (lock *postgresAdvisoryLock) Unlock(ctx context.Context, classID int32, objectID int32) error {
lock.mu.Lock()
defer lock.mu.Unlock()

if lock.conn == nil {
return nil
}
_, err := lock.conn.ExecContext(ctx, "SELECT pg_advisory_unlock($1, $2)", classID, objectID)
return errors.Wrap(err, "AdvisoryUnlock failed")
// WaitAndLock obtains exclusive session level advisory lock.
// If another session already holds a lock on the same resource identifier, this function will wait until the resource becomes available.
// Multiple lock requests stack, so that if the resource is locked three times it must then be unlocked three times.
func (l *Lock) WaitAndLock(ctx context.Context) error {
sqlQuery := "SELECT pg_advisory_lock($1)"
_, err := l.conn.ExecContext(ctx, sqlQuery, l.id)
return err
}

func (lock *postgresAdvisoryLock) WithAdvisoryLock(ctx context.Context, classID int32, objectID int32, f func() error) error {
err := lock.tryLock(ctx, classID, objectID)
// Unlock releases the lock and DB connection.
func (l *Lock) Unlock(ctx context.Context) error {
sqlQuery := "SELECT pg_advisory_unlock($1)"
_, err := l.conn.ExecContext(ctx, sqlQuery, l.id)
if err != nil {
return errors.Wrapf(err, "could not get advisory lock for classID, objectID %v, %v", classID, objectID)
return err
}
defer logger.ErrorIfCalling(func() error { return lock.Unlock(ctx, classID, objectID) })
return f()
// Returns the connection to the connection pool
return l.conn.Close()
}

var _ AdvisoryLocker = &NullAdvisoryLocker{}

func NewNullAdvisoryLocker() *NullAdvisoryLocker {
return &NullAdvisoryLocker{}
}

type NullAdvisoryLocker struct {
mu sync.Mutex
closed bool
}

func (n *NullAdvisoryLocker) Close() error {
n.mu.Lock()
defer n.mu.Unlock()
if n.closed {
panic("already closed")
// NewLock returns a Lock with *sql.Conn
func NewLock(ctx context.Context, id int64, db *sql.DB) (Lock, error) {
// Obtain a connection from the DB connection pool and store it and use it for lock and unlock operations
conn, err := db.Conn(ctx)
if err != nil {
return Lock{}, err
}
n.closed = true
return nil
}

func (*NullAdvisoryLocker) Unlock(ctx context.Context, classID int32, objectID int32) error {
return nil
}

func (*NullAdvisoryLocker) WithAdvisoryLock(ctx context.Context, classID int32, objectID int32, f func() error) error {
return f()
return Lock{id: id, conn: conn}, nil
}

0 comments on commit ee26835

Please sign in to comment.