From e8103fd020fb620f18ca0973e07b9cfd15572b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Hrastnik?= Date: Tue, 12 Oct 2021 17:10:21 +0900 Subject: [PATCH] Reintroduce the advisory lock on RunNode() --- core/cmd/client.go | 58 +++++++++++++++++++++++++ core/services/chainlink/application.go | 11 +++++ core/services/postgres/advisory_lock.go | 2 +- 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/core/cmd/client.go b/core/cmd/client.go index 7e4257ebe8f..acddc9e8593 100644 --- a/core/cmd/client.go +++ b/core/cmd/client.go @@ -2,6 +2,7 @@ package cmd import ( "bytes" + "context" "crypto/tls" "database/sql" "encoding/json" @@ -87,6 +88,53 @@ type AppFactory interface { // ChainlinkAppFactory is used to create a new Application. type ChainlinkAppFactory struct{} +func logRetry(count int) { + if count == 1 { + logger.Infow("Could not get lock, retrying...", "failCount", count) + } else if count%1000 == 0 || count&(count-1) == 0 { + logger.Infow("Still waiting for lock...", "failCount", count) + } +} + +// Try to immediately acquire an advisory lock. The lock will be released on application stop. +func AdvisoryLock(ctx context.Context, db *sql.DB, timeout time.Duration) (postgres.Locker, error) { + lockID := int64(1027321974924625846) + lockRetryInterval := time.Second + + initCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + lock, err := postgres.NewLock(initCtx, lockID, db) + if err != nil { + return nil, err + } + + ticker := time.NewTicker(lockRetryInterval) + defer ticker.Stop() + retryCount := 0 + for { + lockCtx, cancel := context.WithTimeout(ctx, timeout) + gotLock, err := lock.Lock(lockCtx) + cancel() + if err != nil { + return nil, err + } + if gotLock { + break + } + + select { + case <-ticker.C: + retryCount++ + logRetry(retryCount) + continue + case <-ctx.Done(): + return nil, errors.Wrap(ctx.Err(), "timeout expired while waiting for lock") + } + } + + return &lock, nil +} + // NewApplication returns a new instance of the node with the given config. func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink.Application, error) { globalLogger := logger.ProductionLogger(cfg) @@ -102,6 +150,15 @@ func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink if err != nil { return nil, err } + + // Try to acquire an advisory lock to prevent multiple nodes starting at the same time + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + advisoryLock, err := AdvisoryLock(ctx, db.DB, time.Second) + if err != nil { + return nil, errors.Wrap(err, "error acquiring lock") + } + keyStore := keystore.New(gormDB, utils.GetScryptParams(cfg), globalLogger) cfg.SetDB(gormDB) @@ -180,6 +237,7 @@ func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink EventBroadcaster: eventBroadcaster, Logger: globalLogger, ExternalInitiatorManager: externalInitiatorManager, + AdvisoryLock: advisoryLock, }) } diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 461f3b9c14a..ad92729be0f 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -125,6 +125,7 @@ type ChainlinkApplication struct { logger logger.Logger sqlxDB *sqlx.DB gormDB *gorm.DB + advisoryLock postgres.Locker started bool startStopMu sync.Mutex @@ -140,6 +141,7 @@ type ApplicationOpts struct { ChainSet evm.ChainSet Logger logger.Logger ExternalInitiatorManager webhook.ExternalInitiatorManager + AdvisoryLock postgres.Locker } // NewApplication initializes a new store if one is not already @@ -311,6 +313,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { sqlxDB: opts.SqlxDB, gormDB: opts.GormDB, + advisoryLock: opts.AdvisoryLock, + // NOTE: Can keep things clean by putting more things in subservices // instead of manually start/closing subservices: subservices, @@ -448,6 +452,13 @@ func (app *ChainlinkApplication) stop() (err error) { merr = multierr.Append(merr, app.FeedsService.Close()) } + // Clean up the advisory lock if present + if app.advisoryLock != nil { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + merr = multierr.Append(merr, app.advisoryLock.Unlock(ctx)) + } + // DB should pretty much always be closed last app.logger.Debug("Closing DB...") merr = multierr.Append(merr, app.sqlxDB.Close()) diff --git a/core/services/postgres/advisory_lock.go b/core/services/postgres/advisory_lock.go index 0d073fc05fa..3b81b4957b3 100644 --- a/core/services/postgres/advisory_lock.go +++ b/core/services/postgres/advisory_lock.go @@ -56,4 +56,4 @@ func NewLock(ctx context.Context, id int64, db *sql.DB) (Lock, error) { return Lock{}, err } return Lock{id: id, conn: conn}, nil -} \ No newline at end of file +}