Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wiring context for EVM Keys API #11800

Merged
merged 20 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
84a89df
feature/sql-tracing: adding tracing to our db
patrickhuie19 Jan 17, 2024
b98862f
XSAM otelsql library
patrickhuie19 Jan 24, 2024
cfc5df9
WIP: understanding context prop
patrickhuie19 Jan 25, 2024
b9a2f7c
WIP, wiring ctx through from keys eth create works
patrickhuie19 Jan 29, 2024
31a9659
WIP, wiring CTX through all keys eth APIs
patrickhuie19 Feb 16, 2024
aa064a5
WIP: borked a rebase
patrickhuie19 Feb 16, 2024
3d11fc0
linting + cleanup
patrickhuie19 Feb 20, 2024
19afec9
disabling context-check from linters
patrickhuie19 Feb 20, 2024
7f3b851
further cleanup
patrickhuie19 Feb 20, 2024
a721cca
updating tests with hardcoded mock arguments
patrickhuie19 Feb 20, 2024
db20b5a
updating mocks
patrickhuie19 Feb 20, 2024
e584c84
fixing test from rebase
patrickhuie19 Feb 22, 2024
4334123
runLoop now uses context from stop channel
patrickhuie19 Feb 22, 2024
460958c
Merge branch 'develop' into feature/sql-tracing
patrickhuie19 Feb 22, 2024
2dfda6b
updating runLoop in resender to use internal context
patrickhuie19 Feb 22, 2024
cc71107
wiring context through ServiceForSpec in the job.Delegate interface
patrickhuie19 Feb 23, 2024
60006c8
updating common/txmgr/tracker to use background context internally an…
patrickhuie19 Feb 23, 2024
572e45c
cleans up some dangling contexts, consistency with use of c.Request.C…
patrickhuie19 Feb 26, 2024
7703bd7
cleaning up context.Background's in tests
patrickhuie19 Feb 26, 2024
04e6c87
cleaning up use of *cli.Context instead of Request.Context()
patrickhuie19 Feb 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
return errors.New("Broadcaster is already started")
}
var err error
eb.enabledAddresses, err = eb.ks.EnabledAddressesForChain(eb.chainID)
eb.enabledAddresses, err = eb.ks.EnabledAddressesForChain(ctx, eb.chainID)
if err != nil {
return fmt.Errorf("Broadcaster: failed to load EnabledAddressesForChain: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,26 +183,26 @@ func NewConfirmer[
}

// Start is a comment to appease the linter
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(_ context.Context) error {
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx context.Context) error {
return ec.StartOnce("Confirmer", func() error {
if ec.feeConfig.BumpThreshold() == 0 {
ec.lggr.Infow("Gas bumping is disabled (FeeEstimator.BumpThreshold set to 0)", "feeBumpThreshold", 0)
} else {
ec.lggr.Infow(fmt.Sprintf("Fee bumping is enabled, unconfirmed transactions will have their fee bumped every %d blocks", ec.feeConfig.BumpThreshold()), "feeBumpThreshold", ec.feeConfig.BumpThreshold())
}

return ec.startInternal()
return ec.startInternal(ctx)
})
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal() error {
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal(ctx context.Context) error {
ec.initSync.Lock()
defer ec.initSync.Unlock()
if ec.isStarted {
return errors.New("Confirmer is already started")
}
var err error
ec.enabledAddresses, err = ec.ks.EnabledAddressesForChain(ec.chainID)
ec.enabledAddresses, err = ec.ks.EnabledAddressesForChain(ctx, ec.chainID)
if err != nil {
return fmt.Errorf("Confirmer: failed to load EnabledAddressesForChain: %w", err)
}
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) For
if overrideGasLimit != 0 {
etx.FeeLimit = overrideGasLimit
}
attempt, _, err := ec.NewCustomTxAttempt(*etx, fee, etx.FeeLimit, 0x0, ec.lggr)
attempt, _, err := ec.NewCustomTxAttempt(ctx, *etx, fee, etx.FeeLimit, 0x0, ec.lggr)
if err != nil {
ec.lggr.Errorw("ForceRebroadcast: failed to create new attempt", "txID", etx.ID, "err", err)
continue
Expand Down
10 changes: 5 additions & 5 deletions common/txmgr/resender.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func NewResender[
}

// Start is a comment which satisfies the linter
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start() {
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx context.Context) {
er.logger.Debugf("Enabled with poll interval of %s and age threshold of %s", er.interval, er.txConfig.ResendAfterThreshold())
go er.runLoop()
}
Expand All @@ -116,7 +116,7 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Stop() {
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() {
defer close(er.chDone)

if err := er.resendUnconfirmed(); err != nil {
if err := er.resendUnconfirmed(er.ctx); err != nil {
er.logger.Warnw("Failed to resend unconfirmed transactions", "err", err)
}

Expand All @@ -127,15 +127,15 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
case <-er.ctx.Done():
return
case <-ticker.C:
if err := er.resendUnconfirmed(); err != nil {
if err := er.resendUnconfirmed(er.ctx); err != nil {
er.logger.Warnw("Failed to resend unconfirmed transactions", "err", err)
}
}
}
}

func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) resendUnconfirmed() error {
resendAddresses, err := er.ks.EnabledAddressesForChain(er.chainID)
func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) resendUnconfirmed(ctx context.Context) error {
resendAddresses, err := er.ks.EnabledAddressesForChain(ctx, er.chainID)
if err != nil {
return fmt.Errorf("Resender failed getting enabled keys for chain %s: %w", er.chainID.String(), err)
}
Expand Down
4 changes: 2 additions & 2 deletions common/txmgr/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) XXXT
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestStartInternal() error {
return ec.startInternal()
return ec.startInternal(ec.ctx)
}

func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestCloseInternal() error {
return ec.closeInternal()
}

func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestResendUnconfirmed() error {
return er.resendUnconfirmed()
return er.resendUnconfirmed(er.ctx)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) XXXTestAbandon(addr ADDR) (err error) {
Expand Down
14 changes: 7 additions & 7 deletions common/txmgr/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,25 @@ func NewTracker[
}
}

func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(_ context.Context) (err error) {
func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx context.Context) (err error) {
tr.lggr.Info("Abandoned transaction tracking enabled")
return tr.StartOnce("Tracker", func() error {
return tr.startInternal()
return tr.startInternal(ctx)
})
}

func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal() (err error) {
func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal(ctx context.Context) (err error) {
tr.lock.Lock()
defer tr.lock.Unlock()

tr.ctx, tr.ctxCancel = context.WithCancel(context.Background())

if err := tr.setEnabledAddresses(); err != nil {
if err := tr.setEnabledAddresses(ctx); err != nil {
return fmt.Errorf("failed to set enabled addresses: %w", err)
}
tr.lggr.Info("Enabled addresses set")

if err := tr.trackAbandonedTxes(tr.ctx); err != nil {
if err := tr.trackAbandonedTxes(ctx); err != nil {
return fmt.Errorf("failed to track abandoned txes: %w", err)
}

Expand Down Expand Up @@ -194,8 +194,8 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsStarted()
return tr.isStarted
}

func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) setEnabledAddresses() error {
enabledAddrs, err := tr.keyStore.EnabledAddressesForChain(tr.chainID)
func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) setEnabledAddresses(ctx context.Context) error {
enabledAddrs, err := tr.keyStore.EnabledAddressesForChain(ctx, tr.chainID)
if err != nil {
return fmt.Errorf("failed to get enabled addresses for chain: %w", err)
}
Expand Down
27 changes: 14 additions & 13 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx
}

if b.resender != nil {
b.resender.Start()
b.resender.Start(ctx)
}

if b.fwdMgr != nil {
Expand Down Expand Up @@ -308,10 +308,13 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HealthRepo
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() {
ctx, cancel := b.chStop.NewCtx()
defer cancel()

// eb, ec and keyStates can all be modified by the runloop.
// This is concurrent-safe because the runloop ensures serial access.
defer b.wg.Done()
keysChanged, unsub := b.keyStore.SubscribeToKeyChanges()
keysChanged, unsub := b.keyStore.SubscribeToKeyChanges(ctx)
defer unsub()

close(b.chSubbed)
Expand All @@ -321,7 +324,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()

// execReset is defined as an inline function here because it closes over
// eb, ec and stopped
execReset := func(r *reset) {
execReset := func(ctx context.Context, r *reset) {
// These should always close successfully, since it should be logically
// impossible to enter this code path with ec/eb in a state other than
// "Started"
Expand All @@ -348,8 +351,6 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
wg.Add(2)
go func() {
defer wg.Done()
ctx, cancel := b.chStop.NewCtx()
defer cancel()
// Retry indefinitely on failure
backoff := iutils.NewRedialBackoff()
for {
Expand All @@ -361,7 +362,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
continue
}
return
case <-ctx.Done():
case <-b.chStop:
stopOnce.Do(func() { stopped = true })
return
}
Expand All @@ -374,7 +375,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
for {
select {
case <-time.After(backoff.Duration()):
if err := b.confirmer.startInternal(); err != nil {
if err := b.confirmer.startInternal(ctx); err != nil {
b.logger.Criticalw("Failed to start Confirmer", "err", err)
b.SvcErrBuffer.Append(err)
continue
Expand Down Expand Up @@ -408,7 +409,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
reset.done <- errors.New("Txm was stopped")
continue
}
execReset(&reset)
execReset(ctx, &reset)
case <-b.chStop:
// close and exit
//
Expand Down Expand Up @@ -441,15 +442,15 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
if stopped {
continue
}
enabledAddresses, err := b.keyStore.EnabledAddressesForChain(b.chainID)
enabledAddresses, err := b.keyStore.EnabledAddressesForChain(ctx, b.chainID)
if err != nil {
b.logger.Critical("Failed to reload key states after key change")
b.SvcErrBuffer.Append(err)
continue
}
b.logger.Debugw("Keys changed, reloading", "enabledAddresses", enabledAddresses)

execReset(nil)
execReset(ctx, nil)
}
}
}
Expand Down Expand Up @@ -496,7 +497,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
}
}

if err = b.checkEnabled(txRequest.FromAddress); err != nil {
if err = b.checkEnabled(ctx, txRequest.FromAddress); err != nil {
return tx, err
}

Expand Down Expand Up @@ -543,8 +544,8 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetForward
return
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) checkEnabled(addr ADDR) error {
if err := b.keyStore.CheckEnabled(addr, b.chainID); err != nil {
func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) checkEnabled(ctx context.Context, addr ADDR) error {
if err := b.keyStore.CheckEnabled(ctx, addr, b.chainID); err != nil {
return fmt.Errorf("cannot send transaction from %s on chain ID %s: %w", addr, b.chainID.String(), err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/types/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type TransactionClient[
) (client.SendTxReturnCode, error)
SendEmptyTransaction(
ctx context.Context,
newTxAttempt func(seq SEQ, feeLimit uint32, fee FEE, fromAddress ADDR) (attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error),
newTxAttempt func(ctx context.Context, seq SEQ, feeLimit uint32, fee FEE, fromAddress ADDR) (attempt TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error),
seq SEQ,
gasLimit uint32,
fee FEE,
Expand Down
8 changes: 5 additions & 3 deletions common/txmgr/types/keystore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package types

import (
"context"

"github.com/smartcontractkit/chainlink/v2/common/types"
)

Expand All @@ -15,7 +17,7 @@ type KeyStore[
// Chain's sequence type. For example, EVM chains use nonce, bitcoin uses UTXO.
SEQ types.Sequence,
] interface {
CheckEnabled(address ADDR, chainID CHAIN_ID) error
EnabledAddressesForChain(chainId CHAIN_ID) ([]ADDR, error)
SubscribeToKeyChanges() (ch chan struct{}, unsub func())
CheckEnabled(ctx context.Context, address ADDR, chainID CHAIN_ID) error
EnabledAddressesForChain(ctx context.Context, chainId CHAIN_ID) ([]ADDR, error)
SubscribeToKeyChanges(ctx context.Context) (ch chan struct{}, unsub func())
}
48 changes: 25 additions & 23 deletions common/txmgr/types/mocks/key_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading