Skip to content

Commit

Permalink
Reintroduce the advisory lock on RunNode()
Browse files Browse the repository at this point in the history
  • Loading branch information
archseer committed Oct 15, 2021
1 parent c8a946f commit e8103fd
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
58 changes: 58 additions & 0 deletions core/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"bytes"
"context"
"crypto/tls"
"database/sql"
"encoding/json"
Expand Down Expand Up @@ -87,6 +88,53 @@ type AppFactory interface {
// ChainlinkAppFactory is used to create a new Application.
type ChainlinkAppFactory struct{}

func logRetry(count int) {
if count == 1 {
logger.Infow("Could not get lock, retrying...", "failCount", count)
} else if count%1000 == 0 || count&(count-1) == 0 {
logger.Infow("Still waiting for lock...", "failCount", count)
}
}

// Try to immediately acquire an advisory lock. The lock will be released on application stop.
func AdvisoryLock(ctx context.Context, db *sql.DB, timeout time.Duration) (postgres.Locker, error) {
lockID := int64(1027321974924625846)
lockRetryInterval := time.Second

initCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
lock, err := postgres.NewLock(initCtx, lockID, db)
if err != nil {
return nil, err
}

ticker := time.NewTicker(lockRetryInterval)
defer ticker.Stop()
retryCount := 0
for {
lockCtx, cancel := context.WithTimeout(ctx, timeout)
gotLock, err := lock.Lock(lockCtx)
cancel()
if err != nil {
return nil, err
}
if gotLock {
break
}

select {
case <-ticker.C:
retryCount++
logRetry(retryCount)
continue
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "timeout expired while waiting for lock")
}
}

return &lock, nil
}

// NewApplication returns a new instance of the node with the given config.
func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink.Application, error) {
globalLogger := logger.ProductionLogger(cfg)
Expand All @@ -102,6 +150,15 @@ func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink
if err != nil {
return nil, err
}

// Try to acquire an advisory lock to prevent multiple nodes starting at the same time
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
advisoryLock, err := AdvisoryLock(ctx, db.DB, time.Second)
if err != nil {
return nil, errors.Wrap(err, "error acquiring lock")
}

keyStore := keystore.New(gormDB, utils.GetScryptParams(cfg), globalLogger)
cfg.SetDB(gormDB)

Expand Down Expand Up @@ -180,6 +237,7 @@ func (n ChainlinkAppFactory) NewApplication(cfg config.GeneralConfig) (chainlink
EventBroadcaster: eventBroadcaster,
Logger: globalLogger,
ExternalInitiatorManager: externalInitiatorManager,
AdvisoryLock: advisoryLock,
})
}

Expand Down
11 changes: 11 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type ChainlinkApplication struct {
logger logger.Logger
sqlxDB *sqlx.DB
gormDB *gorm.DB
advisoryLock postgres.Locker

started bool
startStopMu sync.Mutex
Expand All @@ -140,6 +141,7 @@ type ApplicationOpts struct {
ChainSet evm.ChainSet
Logger logger.Logger
ExternalInitiatorManager webhook.ExternalInitiatorManager
AdvisoryLock postgres.Locker
}

// NewApplication initializes a new store if one is not already
Expand Down Expand Up @@ -311,6 +313,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
sqlxDB: opts.SqlxDB,
gormDB: opts.GormDB,

advisoryLock: opts.AdvisoryLock,

// NOTE: Can keep things clean by putting more things in subservices
// instead of manually start/closing
subservices: subservices,
Expand Down Expand Up @@ -448,6 +452,13 @@ func (app *ChainlinkApplication) stop() (err error) {
merr = multierr.Append(merr, app.FeedsService.Close())
}

// Clean up the advisory lock if present
if app.advisoryLock != nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
merr = multierr.Append(merr, app.advisoryLock.Unlock(ctx))
}

// DB should pretty much always be closed last
app.logger.Debug("Closing DB...")
merr = multierr.Append(merr, app.sqlxDB.Close())
Expand Down
2 changes: 1 addition & 1 deletion core/services/postgres/advisory_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ func NewLock(ctx context.Context, id int64, db *sql.DB) (Lock, error) {
return Lock{}, err
}
return Lock{id: id, conn: conn}, nil
}
}

0 comments on commit e8103fd

Please sign in to comment.