diff --git a/pkg/user/tx_client.go b/pkg/user/tx_client.go index a403a91991..b90005f090 100644 --- a/pkg/user/tx_client.go +++ b/pkg/user/tx_client.go @@ -41,6 +41,12 @@ const ( type Option func(client *TxClient) +type TxPoolTx struct { + Timeout int64 + Nonce uint64 + Signer string +} + // TxResponse is a response from the chain after // a transaction has been submitted. type TxResponse struct { @@ -137,6 +143,7 @@ type TxClient struct { defaultGasPrice float64 defaultAccount string defaultAddress sdktypes.AccAddress + txPool map[string]TxPoolTx } // NewTxClient returns a new signer using the provided keyring @@ -356,6 +363,11 @@ func (client *TxClient) BroadcastTx(ctx context.Context, msgs []sdktypes.Msg, op } func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer string) (*sdktypes.TxResponse, error) { + // save this in local mempool + txHash := string(txBytes) + client.txPool[txHash] = TxPoolTx{ + Nonce: client.signer.accounts[signer].sequence, + } txClient := sdktx.NewServiceClient(client.grpc) resp, err := txClient.BroadcastTx( ctx, @@ -369,8 +381,8 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer } if resp.TxResponse.Code != abci.CodeTypeOK { if apperrors.IsNonceMismatchCode(resp.TxResponse.Code) { - // query the account to update the sequence number on-chain for the account _, seqNum, err := QueryAccount(ctx, client.grpc, client.registry, client.signer.accounts[signer].address) + // query the account to update the sequence number on-chain for the account if err != nil { return nil, fmt.Errorf("querying account for new sequence number: %w\noriginal tx response: %s", err, resp.TxResponse.RawLog) } @@ -384,7 +396,22 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer Code: resp.TxResponse.Code, ErrorLog: resp.TxResponse.RawLog, } - return resp.TxResponse, broadcastTxErr + // transaction failed + // check if the signer has other txs in the pool + for txHash, tx := range client.txPool { + if tx.Signer == signer { + // set the time for when the tx failed + 1 minute (they can be resubmitted after this time) + tx.Timeout = time.Now().Add(time.Minute).Unix() + // update the nonce of other txs in the pool + tx.Nonce = seqNum + client.txPool[txHash] = tx + + } + } + // if yes, we need to adjust the nonce of the txs in the pool + // wait for them to be invalidated by their max heights + // and then resubmit the tx + return nil, broadcastTxErr } // after the transaction has been submitted, we can increment the @@ -395,6 +422,20 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer return resp.TxResponse, nil } +func (client *TxClient) updateNonces(signer string, newSeqNum uint64) { + client.mtx.Lock() + defer client.mtx.Unlock() + + latestNonce := newSeqNum + for txHash, tx := range client.txPool { + if tx.Signer == signer { + tx.Nonce = latestNonce + client.txPool[txHash] = tx + latestNonce++ + } + } +} + // retryBroadcastingTx creates a new transaction by copying over an existing transaction but creates a new signature with the // new sequence number. It then calls `broadcastTx` and attempts to submit the transaction func (client *TxClient) retryBroadcastingTx(ctx context.Context, txBytes []byte) (*sdktypes.TxResponse, error) {