Skip to content

Commit

Permalink
Wiring context for EVM Keys API (#11800)
Browse files Browse the repository at this point in the history
* feature/sql-tracing: adding tracing to our db

* XSAM otelsql library

* WIP: understanding context prop

* WIP, wiring ctx through from keys eth create works

* WIP, wiring CTX through all keys eth APIs

* WIP: borked a rebase

* linting + cleanup

* disabling context-check from linters

* further cleanup

* updating tests with hardcoded mock arguments

* updating mocks

* fixing test from rebase

* runLoop now uses context from stop channel

* updating runLoop in resender to use internal context

* wiring context through ServiceForSpec in the job.Delegate interface

* updating common/txmgr/tracker to use background context internally and parent context for tracker state

* cleans up some dangling contexts, consistency with use of c.Request.Context() in web routes, and marks TODOs where needed

* cleaning up context.Background's in tests

* cleaning up use of *cli.Context instead of Request.Context()
  • Loading branch information
patrickhuie19 authored Feb 26, 2024
1 parent b9f577d commit 64df78e
Show file tree
Hide file tree
Showing 91 changed files with 934 additions and 816 deletions.
2 changes: 1 addition & 1 deletion common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
return errors.New("Broadcaster is already started")
}
var err error
eb.enabledAddresses, err = eb.ks.EnabledAddressesForChain(eb.chainID)
eb.enabledAddresses, err = eb.ks.EnabledAddressesForChain(ctx, eb.chainID)
if err != nil {
return fmt.Errorf("Broadcaster: failed to load EnabledAddressesForChain: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,26 +183,26 @@ func NewConfirmer[
}

// Start is a comment to appease the linter
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(_ context.Context) error {
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx context.Context) error {
return ec.StartOnce("Confirmer", func() error {
if ec.feeConfig.BumpThreshold() == 0 {
ec.lggr.Infow("Gas bumping is disabled (FeeEstimator.BumpThreshold set to 0)", "feeBumpThreshold", 0)
} else {
ec.lggr.Infow(fmt.Sprintf("Fee bumping is enabled, unconfirmed transactions will have their fee bumped every %d blocks", ec.feeConfig.BumpThreshold()), "feeBumpThreshold", ec.feeConfig.BumpThreshold())
}

return ec.startInternal()
return ec.startInternal(ctx)
})
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal() error {
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal(ctx context.Context) error {
ec.initSync.Lock()
defer ec.initSync.Unlock()
if ec.isStarted {
return errors.New("Confirmer is already started")
}
var err error
ec.enabledAddresses, err = ec.ks.EnabledAddressesForChain(ec.chainID)
ec.enabledAddresses, err = ec.ks.EnabledAddressesForChain(ctx, ec.chainID)
if err != nil {
return fmt.Errorf("Confirmer: failed to load EnabledAddressesForChain: %w", err)
}
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) For
if overrideGasLimit != 0 {
etx.FeeLimit = overrideGasLimit
}
attempt, _, err := ec.NewCustomTxAttempt(*etx, fee, etx.FeeLimit, 0x0, ec.lggr)
attempt, _, err := ec.NewCustomTxAttempt(ctx, *etx, fee, etx.FeeLimit, 0x0, ec.lggr)
if err != nil {
ec.lggr.Errorw("ForceRebroadcast: failed to create new attempt", "txID", etx.ID, "err", err)
continue
Expand Down
10 changes: 5 additions & 5 deletions common/txmgr/resender.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func NewResender[
}

// Start is a comment which satisfies the linter
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start() {
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx context.Context) {
er.logger.Debugf("Enabled with poll interval of %s and age threshold of %s", er.interval, er.txConfig.ResendAfterThreshold())
go er.runLoop()
}
Expand All @@ -116,7 +116,7 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Stop() {
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() {
defer close(er.chDone)

if err := er.resendUnconfirmed(); err != nil {
if err := er.resendUnconfirmed(er.ctx); err != nil {
er.logger.Warnw("Failed to resend unconfirmed transactions", "err", err)
}

Expand All @@ -127,15 +127,15 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
case <-er.ctx.Done():
return
case <-ticker.C:
if err := er.resendUnconfirmed(); err != nil {
if err := er.resendUnconfirmed(er.ctx); err != nil {
er.logger.Warnw("Failed to resend unconfirmed transactions", "err", err)
}
}
}
}

func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) resendUnconfirmed() error {
resendAddresses, err := er.ks.EnabledAddressesForChain(er.chainID)
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) resendUnconfirmed(ctx context.Context) error {
resendAddresses, err := er.ks.EnabledAddressesForChain(ctx, er.chainID)
if err != nil {
return fmt.Errorf("Resender failed getting enabled keys for chain %s: %w", er.chainID.String(), err)
}
Expand Down
4 changes: 2 additions & 2 deletions common/txmgr/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXT
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestStartInternal() error {
return ec.startInternal()
return ec.startInternal(ec.ctx)
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestCloseInternal() error {
return ec.closeInternal()
}

func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestResendUnconfirmed() error {
return er.resendUnconfirmed()
return er.resendUnconfirmed(er.ctx)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestAbandon(addr ADDR) (err error) {
Expand Down
14 changes: 7 additions & 7 deletions common/txmgr/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,25 @@ func NewTracker[
}
}

func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(_ context.Context) (err error) {
func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx context.Context) (err error) {
tr.lggr.Info("Abandoned transaction tracking enabled")
return tr.StartOnce("Tracker", func() error {
return tr.startInternal()
return tr.startInternal(ctx)
})
}

func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal() (err error) {
func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal(ctx context.Context) (err error) {
tr.lock.Lock()
defer tr.lock.Unlock()

tr.ctx, tr.ctxCancel = context.WithCancel(context.Background())

if err := tr.setEnabledAddresses(); err != nil {
if err := tr.setEnabledAddresses(ctx); err != nil {
return fmt.Errorf("failed to set enabled addresses: %w", err)
}
tr.lggr.Info("Enabled addresses set")

if err := tr.trackAbandonedTxes(tr.ctx); err != nil {
if err := tr.trackAbandonedTxes(ctx); err != nil {
return fmt.Errorf("failed to track abandoned txes: %w", err)
}

Expand Down Expand Up @@ -194,8 +194,8 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsStarted()
return tr.isStarted
}

func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) setEnabledAddresses() error {
enabledAddrs, err := tr.keyStore.EnabledAddressesForChain(tr.chainID)
func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) setEnabledAddresses(ctx context.Context) error {
enabledAddrs, err := tr.keyStore.EnabledAddressesForChain(ctx, tr.chainID)
if err != nil {
return fmt.Errorf("failed to get enabled addresses for chain: %w", err)
}
Expand Down
27 changes: 14 additions & 13 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
}

if b.resender != nil {
b.resender.Start()
b.resender.Start(ctx)
}

if b.fwdMgr != nil {
Expand Down Expand Up @@ -308,10 +308,13 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthRepo
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() {
ctx, cancel := b.chStop.NewCtx()
defer cancel()

// eb, ec and keyStates can all be modified by the runloop.
// This is concurrent-safe because the runloop ensures serial access.
defer b.wg.Done()
keysChanged, unsub := b.keyStore.SubscribeToKeyChanges()
keysChanged, unsub := b.keyStore.SubscribeToKeyChanges(ctx)
defer unsub()

close(b.chSubbed)
Expand All @@ -321,7 +324,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()

// execReset is defined as an inline function here because it closes over
// eb, ec and stopped
execReset := func(r *reset) {
execReset := func(ctx context.Context, r *reset) {
// These should always close successfully, since it should be logically
// impossible to enter this code path with ec/eb in a state other than
// "Started"
Expand All @@ -348,8 +351,6 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
wg.Add(2)
go func() {
defer wg.Done()
ctx, cancel := b.chStop.NewCtx()
defer cancel()
// Retry indefinitely on failure
backoff := iutils.NewRedialBackoff()
for {
Expand All @@ -361,7 +362,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
continue
}
return
case <-ctx.Done():
case <-b.chStop:
stopOnce.Do(func() { stopped = true })
return
}
Expand All @@ -374,7 +375,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
for {
select {
case <-time.After(backoff.Duration()):
if err := b.confirmer.startInternal(); err != nil {
if err := b.confirmer.startInternal(ctx); err != nil {
b.logger.Criticalw("Failed to start Confirmer", "err", err)
b.SvcErrBuffer.Append(err)
continue
Expand Down Expand Up @@ -408,7 +409,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
reset.done <- errors.New("Txm was stopped")
continue
}
execReset(&reset)
execReset(ctx, &reset)
case <-b.chStop:
// close and exit
//
Expand Down Expand Up @@ -441,15 +442,15 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if stopped {
continue
}
enabledAddresses, err := b.keyStore.EnabledAddressesForChain(b.chainID)
enabledAddresses, err := b.keyStore.EnabledAddressesForChain(ctx, b.chainID)
if err != nil {
b.logger.Critical("Failed to reload key states after key change")
b.SvcErrBuffer.Append(err)
continue
}
b.logger.Debugw("Keys changed, reloading", "enabledAddresses", enabledAddresses)

execReset(nil)
execReset(ctx, nil)
}
}
}
Expand Down Expand Up @@ -496,7 +497,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
}
}

if err = b.checkEnabled(txRequest.FromAddress); err != nil {
if err = b.checkEnabled(ctx, txRequest.FromAddress); err != nil {
return tx, err
}

Expand Down Expand Up @@ -543,8 +544,8 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForward
return
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) checkEnabled(addr ADDR) error {
if err := b.keyStore.CheckEnabled(addr, b.chainID); err != nil {
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) checkEnabled(ctx context.Context, addr ADDR) error {
if err := b.keyStore.CheckEnabled(ctx, addr, b.chainID); err != nil {
return fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", addr, b.chainID.String(), err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type TransactionClient[
) (client.SendTxReturnCode, error)
SendEmptyTransaction(
ctx context.Context,
newTxAttempt func(seq SEQ, feeLimit uint32, fee FEE, fromAddress ADDR) (attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error),
newTxAttempt func(ctx context.Context, seq SEQ, feeLimit uint32, fee FEE, fromAddress ADDR) (attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error),
seq SEQ,
gasLimit uint32,
fee FEE,
Expand Down
8 changes: 5 additions & 3 deletions common/txmgr/types/keystore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package types

import (
"context"

"github.com/smartcontractkit/chainlink/v2/common/types"
)

Expand All @@ -15,7 +17,7 @@ type KeyStore[
// Chain's sequence type. For example, EVM chains use nonce, bitcoin uses UTXO.
SEQ types.Sequence,
] interface {
CheckEnabled(address ADDR, chainID CHAIN_ID) error
EnabledAddressesForChain(chainId CHAIN_ID) ([]ADDR, error)
SubscribeToKeyChanges() (ch chan struct{}, unsub func())
CheckEnabled(ctx context.Context, address ADDR, chainID CHAIN_ID) error
EnabledAddressesForChain(ctx context.Context, chainId CHAIN_ID) ([]ADDR, error)
SubscribeToKeyChanges(ctx context.Context) (ch chan struct{}, unsub func())
}
48 changes: 25 additions & 23 deletions common/txmgr/types/mocks/key_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 64df78e

Please sign in to comment.