From ee26835d75220b0102aeafca49e3589ae3e4fc7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Mon, 18 Oct 2021 23:18:51 +0900 Subject: [PATCH] Reintroduce advisory locking when starting the node (#5215) * postgres: Replace old advisory_lock interface with one that reuses conn No reason to start a completely separate DB connection just to hold a lock. * Reintroduce the advisory lock on RunNode() --- core/cmd/client.go | 58 ++++++++ core/services/chainlink/application.go | 11 ++ core/services/postgres/advisory_lock.go | 177 +++++------------------- 3 files changed, 107 insertions(+), 139 deletions(-) diff --git a/core/cmd/client.go b/core/cmd/client.go index 5d7463dcc38..441b95b51e3 100644 --- a/core/cmd/client.go +++ b/core/cmd/client.go @@ -2,6 +2,7 @@ package cmd import ( "bytes" + "context" "crypto/tls" "database/sql" "encoding/json" @@ -86,6 +87,53 @@ type AppFactory interface { // ChainlinkAppFactory is used to create a new Application. type ChainlinkAppFactory struct{} +func logRetry(count int) { + if count == 1 { + logger.Infow("Could not get lock, retrying...", "failCount", count) + } else if count%1000 == 0 || count&(count-1) == 0 { + logger.Infow("Still waiting for lock...", "failCount", count) + } +} + +// Try to immediately acquire an advisory lock. The lock will be released on application stop. +func AdvisoryLock(ctx context.Context, db *sql.DB, timeout time.Duration) (postgres.Locker, error) { + lockID := int64(1027321974924625846) + lockRetryInterval := time.Second + + initCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + lock, err := postgres.NewLock(initCtx, lockID, db) + if err != nil { + return nil, err + } + + ticker := time.NewTicker(lockRetryInterval) + defer ticker.Stop() + retryCount := 0 + for { + lockCtx, cancel := context.WithTimeout(ctx, timeout) + gotLock, err := lock.Lock(lockCtx) + cancel() + if err != nil { + return nil, err + } + if gotLock { + break + } + + select { + case <-ticker.C: + retryCount++ + logRetry(retryCount) + continue + case <-ctx.Done(): + return nil, errors.Wrap(ctx.Err(), "timeout expired while waiting for lock") + } + } + + return &lock, nil +} + // NewApplication returns a new instance of the node with the given config. func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink.Application, error) { globalLogger := logger.ProductionLogger(cfg) @@ -101,6 +149,15 @@ func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink if err != nil { return nil, err } + + // Try to acquire an advisory lock to prevent multiple nodes starting at the same time + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + advisoryLock, err := AdvisoryLock(ctx, db.DB, time.Second) + if err != nil { + return nil, errors.Wrap(err, "error acquiring lock") + } + keyStore := keystore.New(gormDB, utils.GetScryptParams(cfg), globalLogger) cfg.SetDB(gormDB) @@ -186,6 +243,7 @@ func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink Logger: globalLogger, ExternalInitiatorManager: externalInitiatorManager, Version: static.Version, + AdvisoryLock: advisoryLock, }) } diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index a8a8ab5a811..8334b3b7c43 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -123,6 +123,7 @@ type ChainlinkApplication struct { logger logger.Logger sqlxDB *sqlx.DB gormDB *gorm.DB + advisoryLock postgres.Locker started bool startStopMu sync.Mutex @@ -139,6 +140,7 @@ type ApplicationOpts struct { Logger logger.Logger ExternalInitiatorManager webhook.ExternalInitiatorManager Version string + AdvisoryLock postgres.Locker } // NewApplication initializes a new store if one is not already @@ -307,6 +309,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { sqlxDB: opts.SqlxDB, gormDB: opts.GormDB, + advisoryLock: opts.AdvisoryLock, + // NOTE: Can keep things clean by putting more things in subservices // instead of manually start/closing subservices: subservices, @@ -440,6 +444,13 @@ func (app *ChainlinkApplication) stop() (err error) { merr = multierr.Append(merr, app.FeedsService.Close()) } + // Clean up the advisory lock if present + if app.advisoryLock != nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + merr = multierr.Append(merr, app.advisoryLock.Unlock(ctx)) + } + // DB should pretty much always be closed last app.logger.Debug("Closing DB...") merr = multierr.Append(merr, app.sqlxDB.Close()) diff --git a/core/services/postgres/advisory_lock.go b/core/services/postgres/advisory_lock.go index 1c19a914b82..3b81b4957b3 100644 --- a/core/services/postgres/advisory_lock.go +++ b/core/services/postgres/advisory_lock.go @@ -3,158 +3,57 @@ package postgres import ( "context" "database/sql" - "net/url" - "sync" - - "github.com/smartcontractkit/chainlink/core/static" - "github.com/smartcontractkit/chainlink/core/store/dialects" - - "github.com/pkg/errors" - "go.uber.org/multierr" - - "github.com/smartcontractkit/chainlink/core/logger" - "github.com/smartcontractkit/chainlink/core/utils" -) - -// NOTE: All advisory lock class IDs used by the Chainlink application MUST be -// kept here to avoid accidental re-use -const ( - AdvisoryLockClassID_EthBroadcaster int32 = 0 - AdvisoryLockClassID_JobSpawner int32 = 1 - AdvisoryLockClassID_EthConfirmer int32 = 2 - - // ORM takes lock on 1027321974924625846 which splits into ClassID 239192036, ObjID 2840971190 - AdvisoryLockClassID_ORM int32 = 239192036 - - AdvisoryLockObjectID_EthConfirmer int32 = 0 -) - -//go:generate mockery --name AdvisoryLocker --output ../../internal/mocks/ --case=underscore -type ( - postgresAdvisoryLock struct { - URI string - conn *sql.Conn - db *sql.DB - mu *sync.Mutex - } - - AdvisoryLocker interface { - Unlock(ctx context.Context, classID int32, objectID int32) error - WithAdvisoryLock(ctx context.Context, classID int32, objectID int32, f func() error) error - Close() error - } ) -func NewAdvisoryLock(uri url.URL) AdvisoryLocker { - static.SetConsumerName(&uri, "AdvisoryLocker") - return &postgresAdvisoryLock{ - URI: uri.String(), - mu: &sync.Mutex{}, - } +// Locker is an interface for postgresql advisory locks. +type Locker interface { + Lock(ctx context.Context) (bool, error) + WaitAndLock(ctx context.Context) error + Unlock(ctx context.Context) error } -func (lock *postgresAdvisoryLock) Close() error { - lock.mu.Lock() - defer lock.mu.Unlock() - - var connErr, dbErr error - - if lock.conn != nil { - connErr = lock.conn.Close() - if connErr == sql.ErrConnDone { - connErr = nil - } - } - if lock.db != nil { - dbErr = lock.db.Close() - if dbErr == sql.ErrConnDone { - dbErr = nil - } - } - - lock.db = nil - lock.conn = nil - - return multierr.Combine(connErr, dbErr) +// Lock implements the Locker interface. +type Lock struct { + id int64 + conn *sql.Conn } -func (lock *postgresAdvisoryLock) tryLock(ctx context.Context, classID int32, objectID int32) (err error) { - lock.mu.Lock() - defer lock.mu.Unlock() - defer utils.WrapIfError(&err, "TryAdvisoryLock failed") - - if lock.conn == nil { - db, err2 := sql.Open(string(dialects.Postgres), lock.URI) - if err2 != nil { - return err2 - } - lock.db = db - - // `database/sql`.DB does opaque connection pooling, but PG advisory locks are per-connection - conn, err2 := db.Conn(ctx) - if err2 != nil { - logger.ErrorIfCalling(lock.db.Close) - lock.db = nil - return err2 - } - lock.conn = conn - } - - gotLock := false - if err = lock.conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1, $2)", classID, objectID).Scan(&gotLock); err != nil { - return err - } - if gotLock { - return nil - } - return errors.Errorf("could not get advisory lock for classID, objectID %v, %v", classID, objectID) +// Lock obtains exclusive session level advisory lock if available. +// It’s similar to WaitAndLock, except it will not wait for the lock to become available. +// It will either obtain the lock and return true, or return false if the lock cannot be acquired immediately. +func (l *Lock) Lock(ctx context.Context) (bool, error) { + result := false + sqlQuery := "SELECT pg_try_advisory_lock($1)" + err := l.conn.QueryRowContext(ctx, sqlQuery, l.id).Scan(&result) + return result, err } -func (lock *postgresAdvisoryLock) Unlock(ctx context.Context, classID int32, objectID int32) error { - lock.mu.Lock() - defer lock.mu.Unlock() - - if lock.conn == nil { - return nil - } - _, err := lock.conn.ExecContext(ctx, "SELECT pg_advisory_unlock($1, $2)", classID, objectID) - return errors.Wrap(err, "AdvisoryUnlock failed") +// WaitAndLock obtains exclusive session level advisory lock. +// If another session already holds a lock on the same resource identifier, this function will wait until the resource becomes available. +// Multiple lock requests stack, so that if the resource is locked three times it must then be unlocked three times. +func (l *Lock) WaitAndLock(ctx context.Context) error { + sqlQuery := "SELECT pg_advisory_lock($1)" + _, err := l.conn.ExecContext(ctx, sqlQuery, l.id) + return err } -func (lock *postgresAdvisoryLock) WithAdvisoryLock(ctx context.Context, classID int32, objectID int32, f func() error) error { - err := lock.tryLock(ctx, classID, objectID) +// Unlock releases the lock and DB connection. +func (l *Lock) Unlock(ctx context.Context) error { + sqlQuery := "SELECT pg_advisory_unlock($1)" + _, err := l.conn.ExecContext(ctx, sqlQuery, l.id) if err != nil { - return errors.Wrapf(err, "could not get advisory lock for classID, objectID %v, %v", classID, objectID) + return err } - defer logger.ErrorIfCalling(func() error { return lock.Unlock(ctx, classID, objectID) }) - return f() + // Returns the connection to the connection pool + return l.conn.Close() } -var _ AdvisoryLocker = &NullAdvisoryLocker{} - -func NewNullAdvisoryLocker() *NullAdvisoryLocker { - return &NullAdvisoryLocker{} -} - -type NullAdvisoryLocker struct { - mu sync.Mutex - closed bool -} - -func (n *NullAdvisoryLocker) Close() error { - n.mu.Lock() - defer n.mu.Unlock() - if n.closed { - panic("already closed") +// NewLock returns a Lock with *sql.Conn +func NewLock(ctx context.Context, id int64, db *sql.DB) (Lock, error) { + // Obtain a connection from the DB connection pool and store it and use it for lock and unlock operations + conn, err := db.Conn(ctx) + if err != nil { + return Lock{}, err } - n.closed = true - return nil -} - -func (*NullAdvisoryLocker) Unlock(ctx context.Context, classID int32, objectID int32) error { - return nil -} - -func (*NullAdvisoryLocker) WithAdvisoryLock(ctx context.Context, classID int32, objectID int32, f func() error) error { - return f() + return Lock{id: id, conn: conn}, nil }