Skip to content

Commit

Permalink
Use mutex for tx account sequence query through tx broadcast, track a…
Browse files Browse the repository at this point in the history
…ccount sequence number on chain provider
  • Loading branch information
agouin committed Sep 28, 2022
1 parent 5ac3f63 commit 48399e3
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 22 deletions.
11 changes: 10 additions & 1 deletion relayer/chains/cosmos/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ func ChainClientConfig(pcfg *CosmosProviderConfig) *lens.ChainClientConfig {
type CosmosProvider struct {
log *zap.Logger

lens.ChainClient
PCfg CosmosProviderConfig

lens.ChainClient
nextAccountSeq uint64
txMu sync.Mutex

// metrics to monitor the provider
TotalFees sdk.Coins
totalFeesMu sync.Mutex
Expand Down Expand Up @@ -147,6 +150,12 @@ func (cc *CosmosProvider) Timeout() string {
return cc.PCfg.Timeout
}

func (cc *CosmosProvider) UpdateNextAccountSequence(seq uint64) {
if seq > cc.nextAccountSeq {
cc.nextAccountSeq = seq
}
}

func (cc *CosmosProvider) AddKey(name string, coinType uint32) (*provider.KeyOutput, error) {
// The lens client returns an equivalent KeyOutput type,
// but that type is declared in the lens module,
Expand Down
67 changes: 46 additions & 21 deletions relayer/chains/cosmos/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"math/big"
"regexp"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -34,6 +36,7 @@ var (
rtyAtt = retry.Attempts(rtyAttNum)
rtyDel = retry.Delay(time.Millisecond * 400)
rtyErr = retry.LastErrorOnly(true)
numRegex = regexp.MustCompile("[0-9]+")
)

// Default IBC settings
Expand Down Expand Up @@ -74,12 +77,23 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela
var resp *sdk.TxResponse
var fees sdk.Coins

// Guard against account sequence number mismatch errors by locking for the specific wallet for
// the account sequence query all the way through the transaction broadcast success/fail.
cc.txMu.Lock()
defer cc.txMu.Unlock()

if err := retry.Do(func() error {
txBytes, f, err := cc.buildMessages(ctx, msgs, memo)
txBytes, sequence, f, err := cc.buildMessages(ctx, msgs, memo)
fees = f
if err != nil {
errMsg := err.Error()

// Account sequence mismatch errors can happen on the simulated transaction also.
if strings.Contains(errMsg, sdkerrors.ErrWrongSequence.Error()) {
cc.handleAccountSequenceMismatchError(err)
return err
}

// Occasionally the client will be out of date,
// and we will receive an RPC error like:
// rpc error: code = InvalidArgument desc = failed to execute message; message index: 1: channel handshake open try failed: failed channel state verification for client (07-tendermint-0): client state height < proof height ({0 58} < {0 59}), please ensure the client has been updated: invalid height: invalid request
Expand Down Expand Up @@ -146,16 +160,17 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela

resp, err = cc.BroadcastTx(ctx, txBytes)
if err != nil {
if err == sdkerrors.ErrWrongSequence {
// Allow retrying if we got an invalid sequence error when attempting to broadcast this tx.
return err
if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) {
cc.handleAccountSequenceMismatchError(err)
}

// Don't retry if BroadcastTx resulted in any other error.
// (This was the previous behavior. Unclear if that is still desired.)
return retry.Unrecoverable(err)
}

// we had a successful tx with this sequence, so update it to the next
cc.UpdateNextAccountSequence(sequence + 1)

return nil
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
cc.log.Info(
Expand Down Expand Up @@ -214,11 +229,11 @@ func parseEventsFromTxResponse(resp *sdk.TxResponse) []provider.RelayerEvent {
return events
}

func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.RelayerMessage, memo string) ([]byte, sdk.Coins, error) {
func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.RelayerMessage, memo string) ([]byte, uint64, sdk.Coins, error) {
// Query account details
txf, err := cc.PrepareFactory(cc.TxFactory())
if err != nil {
return nil, sdk.Coins{}, err
return nil, 0, sdk.Coins{}, err
}

if memo != "" {
Expand All @@ -231,7 +246,7 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel
// If users pass gas adjustment, then calculate gas
_, adjusted, err := cc.CalculateGas(ctx, txf, CosmosMsgs(msgs...)...)
if err != nil {
return nil, sdk.Coins{}, err
return nil, 0, sdk.Coins{}, err
}

// Set the gas amount on the transaction factory
Expand All @@ -246,13 +261,7 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel
}
return nil
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil {
return nil, sdk.Coins{}, err
}

// Attach the signature to the transaction
// Force encoding in the chain specific address
for _, msg := range msgs {
cc.Codec.Marshaler.MustMarshalJSON(CosmosMsg(msg))
return nil, 0, sdk.Coins{}, err
}

done := cc.SetSDKContext()
Expand All @@ -263,27 +272,43 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel
}
return nil
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil {
return nil, sdk.Coins{}, err
return nil, 0, sdk.Coins{}, err
}

done()

fees := txb.GetTx().GetFee()
tx := txb.GetTx()
fees := tx.GetFee()

var txBytes []byte
// Generate the transaction bytes
if err := retry.Do(func() error {
var err error
txBytes, err = cc.Codec.TxConfig.TxEncoder()(txb.GetTx())
txBytes, err = cc.Codec.TxConfig.TxEncoder()(tx)
if err != nil {
return err
}
return nil
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil {
return nil, sdk.Coins{}, err
return nil, 0, sdk.Coins{}, err
}

return txBytes, fees, nil
return txBytes, txf.Sequence(), fees, nil
}

// handleAccountSequenceMismatchError will parse the error string, e.g.:
// "account sequence mismatch, expected 10, got 9: incorrect account sequence"
// and update the next account sequence with the expected value.
func (cc *CosmosProvider) handleAccountSequenceMismatchError(err error) {
sequences := numRegex.FindAllString(err.Error(), -1)
if len(sequences) != 2 {
return
}
nextSeq, err := strconv.ParseUint(sequences[0], 10, 64)
if err != nil {
return
}
cc.UpdateNextAccountSequence(nextSeq)
}

// MsgCreateClient creates an sdk.Msg to update the client on src with consensus state from dst
Expand Down Expand Up @@ -1051,7 +1076,7 @@ func castClientStateToTMType(cs *codectypes.Any) (*tmclient.ClientState, error)
return clientState, nil
}

//DefaultUpgradePath is the default IBC upgrade path set for an on-chain light client
// DefaultUpgradePath is the default IBC upgrade path set for an on-chain light client
var defaultUpgradePath = []string{"upgrade", "upgradedIBCState"}

// NewClientState creates a new tendermint client state tracking the dst chain.
Expand Down

0 comments on commit 48399e3

Please sign in to comment.