From 4e4e9530800d28fb2c984f1cfc7b03f05eec618c Mon Sep 17 00:00:00 2001 From: Reece Williams <31943163+Reecepbcups@users.noreply.github.com> Date: Mon, 16 Sep 2024 12:51:05 -0500 Subject: [PATCH] refactor: alternate consensus engines (#1490) * feat: `ConsensusClient` interface & comet integration * chore: cometbft bytes lib * refactor: migrate to ConsensusClient * rename * simplify * refactor: just use cometbft bytes. helper lib for now * refactor: rename rclient -> cclient (consensus client) * remove WithClient --- .../cmbft_client_wrapper.go | 45 +++--- cclient/cmbft_consensus.go | 153 ++++++++++++++++++ cclient/consensus.go | 94 +++++++++++ interchaintest/feegrant_test.go | 10 +- .../chains/cosmos/cosmos_chain_processor.go | 12 +- relayer/chains/cosmos/fee_market.go | 10 +- relayer/chains/cosmos/feegrant.go | 2 +- relayer/chains/cosmos/provider.go | 40 ++--- relayer/chains/cosmos/query.go | 27 ++-- relayer/chains/cosmos/tx.go | 15 +- .../penumbra/penumbra_chain_processor.go | 2 +- relayer/chains/penumbra/provider.go | 32 ++-- relayer/chains/penumbra/query.go | 18 +-- relayer/chains/penumbra/tx.go | 14 +- 14 files changed, 359 insertions(+), 115 deletions(-) rename client/client_wrapper.go => cclient/cmbft_client_wrapper.go (90%) create mode 100644 cclient/cmbft_consensus.go create mode 100644 cclient/consensus.go diff --git a/client/client_wrapper.go b/cclient/cmbft_client_wrapper.go similarity index 90% rename from client/client_wrapper.go rename to cclient/cmbft_client_wrapper.go index 9825dffae..9338060f8 100644 --- a/client/client_wrapper.go +++ b/cclient/cmbft_client_wrapper.go @@ -1,4 +1,4 @@ -package client +package cclient import ( "context" @@ -28,17 +28,17 @@ import ( types2 "github.com/strangelove-ventures/cometbft-client/types" ) -// RPCClient wraps our slimmed down CometBFT client and converts the returned types to the upstream CometBFT types. +// CometRPCClient wraps our slimmed down CometBFT client and converts the returned types to the upstream CometBFT types. // This is useful so that it can be used in any function calls that expect the upstream types. -type RPCClient struct { +type CometRPCClient struct { c *client.Client } -func NewRPCClient(c *client.Client) RPCClient { - return RPCClient{c: c} +func NewCometRPCClient(c *client.Client) CometRPCClient { + return CometRPCClient{c: c} } -func (r RPCClient) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, error) { +func (r CometRPCClient) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, error) { res, err := r.c.ABCIInfo(ctx) if err != nil { return nil, err @@ -55,7 +55,7 @@ func (r RPCClient) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, err }, nil } -func (r RPCClient) ABCIQuery( +func (r CometRPCClient) ABCIQuery( ctx context.Context, path string, data bytes.HexBytes, @@ -68,7 +68,7 @@ func (r RPCClient) ABCIQuery( return convertResultABCIQuery(res), nil } -func (r RPCClient) ABCIQueryWithOptions( +func (r CometRPCClient) ABCIQueryWithOptions( ctx context.Context, path string, data bytes.HexBytes, @@ -87,7 +87,7 @@ func (r RPCClient) ABCIQueryWithOptions( return convertResultABCIQuery(res), nil } -func (r RPCClient) BroadcastTxCommit(ctx context.Context, tx tmtypes.Tx) (*coretypes.ResultBroadcastTxCommit, error) { +func (r CometRPCClient) BroadcastTxCommit(ctx context.Context, tx tmtypes.Tx) (*coretypes.ResultBroadcastTxCommit, error) { res, err := r.c.BroadcastTxCommit(ctx, types2.Tx(tx)) if err != nil { return nil, err @@ -119,7 +119,7 @@ func (r RPCClient) BroadcastTxCommit(ctx context.Context, tx tmtypes.Tx) (*coret }, nil } -func (r RPCClient) BroadcastTxAsync(ctx context.Context, tx tmtypes.Tx) (*coretypes.ResultBroadcastTx, error) { +func (r CometRPCClient) BroadcastTxAsync(ctx context.Context, tx tmtypes.Tx) (*coretypes.ResultBroadcastTx, error) { res, err := r.c.BroadcastTxAsync(ctx, types2.Tx(tx)) if err != nil { return nil, err @@ -134,7 +134,7 @@ func (r RPCClient) BroadcastTxAsync(ctx context.Context, tx tmtypes.Tx) (*corety }, nil } -func (r RPCClient) BroadcastTxSync(ctx context.Context, tx tmtypes.Tx) (*coretypes.ResultBroadcastTx, error) { +func (r CometRPCClient) BroadcastTxSync(ctx context.Context, tx tmtypes.Tx) (*coretypes.ResultBroadcastTx, error) { res, err := r.c.BroadcastTxSync(ctx, types2.Tx(tx)) if err != nil { return nil, err @@ -149,7 +149,7 @@ func (r RPCClient) BroadcastTxSync(ctx context.Context, tx tmtypes.Tx) (*coretyp }, nil } -func (r RPCClient) Validators( +func (r CometRPCClient) Validators( ctx context.Context, height *int64, page, perPage *int, @@ -177,7 +177,7 @@ func (r RPCClient) Validators( }, nil } -func (r RPCClient) Status(ctx context.Context) (*coretypes.ResultStatus, error) { +func (r CometRPCClient) Status(ctx context.Context) (*coretypes.ResultStatus, error) { res, err := r.c.Status(ctx) if err != nil { return nil, err @@ -220,7 +220,7 @@ func (r RPCClient) Status(ctx context.Context) (*coretypes.ResultStatus, error) }, nil } -func (r RPCClient) Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error) { +func (r CometRPCClient) Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error) { res, err := r.c.Block(ctx, height) if err != nil { return nil, err @@ -232,7 +232,7 @@ func (r RPCClient) Block(ctx context.Context, height *int64) (*coretypes.ResultB }, nil } -func (r RPCClient) BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error) { +func (r CometRPCClient) BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error) { res, err := r.c.BlockByHash(ctx, hash) if err != nil { return nil, err @@ -244,7 +244,7 @@ func (r RPCClient) BlockByHash(ctx context.Context, hash []byte) (*coretypes.Res }, nil } -func (r RPCClient) BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error) { +func (r CometRPCClient) BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error) { res, err := r.c.BlockResults(ctx, height) if err != nil { return nil, err @@ -274,7 +274,7 @@ func (r RPCClient) BlockResults(ctx context.Context, height *int64) (*coretypes. }, nil } -func (r RPCClient) BlockchainInfo( +func (r CometRPCClient) BlockchainInfo( ctx context.Context, minHeight, maxHeight int64, ) (*coretypes.ResultBlockchainInfo, error) { @@ -305,7 +305,7 @@ func (r RPCClient) BlockchainInfo( }, nil } -func (r RPCClient) Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error) { +func (r CometRPCClient) Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error) { res, err := r.c.Commit(ctx, height) if err != nil { return nil, err @@ -336,7 +336,7 @@ func (r RPCClient) Commit(ctx context.Context, height *int64) (*coretypes.Result }, nil } -func (r RPCClient) Tx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error) { +func (r CometRPCClient) Tx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error) { res, err := r.c.Tx(ctx, hash, prove) if err != nil { return nil, err @@ -345,7 +345,7 @@ func (r RPCClient) Tx(ctx context.Context, hash []byte, prove bool) (*coretypes. return convertResultTx(res), nil } -func (r RPCClient) TxSearch( +func (r CometRPCClient) TxSearch( ctx context.Context, query string, prove bool, @@ -368,7 +368,7 @@ func (r RPCClient) TxSearch( }, nil } -func (r RPCClient) BlockSearch( +func (r CometRPCClient) BlockSearch( ctx context.Context, query string, page, perPage *int, @@ -388,8 +388,7 @@ func (r RPCClient) BlockSearch( } return &coretypes.ResultBlockSearch{ - Blocks: blocks, - TotalCount: res.TotalCount, + Blocks: blocks, }, nil } diff --git a/cclient/cmbft_consensus.go b/cclient/cmbft_consensus.go new file mode 100644 index 000000000..7a0a40e99 --- /dev/null +++ b/cclient/cmbft_consensus.go @@ -0,0 +1,153 @@ +package cclient + +import ( + "context" + "fmt" + "time" + + "github.com/cometbft/cometbft/libs/bytes" + rpcclient "github.com/cometbft/cometbft/rpc/client" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + tmtypes "github.com/cometbft/cometbft/types" +) + +var _ ConsensusClient = (*CometRPCClient)(nil) + +// GetBlock implements ConsensusClient. +func (r CometRPCClient) GetBlockTime(ctx context.Context, height uint64) (time.Time, error) { + h := int64(height) + + b, err := r.Block(ctx, &h) + if err != nil { + return time.Time{}, fmt.Errorf("failed to get block: %w", err) + } + + return b.Block.Header.Time, nil +} + +// GetBlockResults implements ConsensusClient. +func (r CometRPCClient) GetBlockResults(ctx context.Context, height uint64) (*BlockResults, error) { + h := int64(height) + br, err := r.BlockResults(ctx, &h) + if err != nil { + return nil, fmt.Errorf("failed to get block results: %w", err) + } + return &BlockResults{ + TxsResults: br.TxsResults, + FinalizeBlockEvents: br.FinalizeBlockEvents, + }, nil +} + +// GetABCIQuery implements ConsensusClient. +func (r CometRPCClient) GetABCIQuery(ctx context.Context, queryPath string, data bytes.HexBytes) (*ABCIQueryResponse, error) { + resp, err := r.ABCIQuery(ctx, queryPath, data) + if err != nil { + return nil, fmt.Errorf("failed to get ABCI query: %w", err) + } + return &ABCIQueryResponse{ + Code: resp.Response.Code, + Value: resp.Response.Value, + }, nil +} + +// GetTx implements ConsensusClient. +func (r CometRPCClient) GetTx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error) { + resp, err := r.Tx(ctx, hash, prove) + if err != nil { + return nil, fmt.Errorf("failed to get tx: %w", err) + } + return resp, nil +} + +// GetTxSearch implements ConsensusClient. +func (r CometRPCClient) GetTxSearch(ctx context.Context, query string, prove bool, page *int, perPage *int, orderBy string) (*ResultTxSearch, error) { + resp, err := r.TxSearch(ctx, query, prove, page, perPage, orderBy) + if err != nil { + return nil, fmt.Errorf("failed to get tx search: %w", err) + } + return &ResultTxSearch{ + Txs: resp.Txs, + }, nil +} + +// GetBlockSearch implements ConsensusClient. +func (r CometRPCClient) GetBlockSearch(ctx context.Context, query string, page *int, perPage *int, orderBy string) (*coretypes.ResultBlockSearch, error) { + resp, err := r.BlockSearch(ctx, query, page, perPage, orderBy) + if err != nil { + return nil, fmt.Errorf("failed to get block search: %w", err) + } + return resp, nil +} + +// GetCommit implements ConsensusClient. +func (r CometRPCClient) GetCommit(ctx context.Context, height uint64) (*coretypes.ResultCommit, error) { + h := int64(height) + c, err := r.Commit(ctx, &h) + if err != nil { + return nil, fmt.Errorf("failed to get commit: %w", err) + } + return c, nil +} + +// GetValidators implements ConsensusClient. +func (r CometRPCClient) GetValidators(ctx context.Context, height *int64, page *int, perPage *int) (*ResultValidators, error) { + v, err := r.Validators(ctx, height, page, perPage) + if err != nil { + return nil, fmt.Errorf("failed to get validators: %w", err) + } + + return &ResultValidators{ + Validators: v.Validators, + }, nil +} + +// DoBroadcastTxAsync implements ConsensusClient. +func (r CometRPCClient) DoBroadcastTxAsync(ctx context.Context, tx tmtypes.Tx) (*ResultBroadcastTx, error) { + b, err := r.BroadcastTxAsync(ctx, tx) + if err != nil { + return nil, fmt.Errorf("failed to broadcast tx async: %w", err) + } + return &ResultBroadcastTx{ + Code: b.Code, + Data: b.Data, + Log: b.Log, + Codespace: b.Codespace, + Hash: b.Hash, + }, nil +} + +// DoBroadcastTxSync implements ConsensusClient. +func (r CometRPCClient) DoBroadcastTxSync(ctx context.Context, tx tmtypes.Tx) (*ResultBroadcastTx, error) { + b, err := r.BroadcastTxSync(ctx, tx) + if err != nil { + return nil, fmt.Errorf("failed to broadcast tx sync: %w", err) + } + return &ResultBroadcastTx{ + Code: b.Code, + Data: b.Data, + Log: b.Log, + Codespace: b.Codespace, + Hash: b.Hash, + }, nil +} + +// GetABCIQueryWithOptions implements ConsensusClient. +func (r CometRPCClient) GetABCIQueryWithOptions(ctx context.Context, path string, data bytes.HexBytes, opts rpcclient.ABCIQueryOptions) (*coretypes.ResultABCIQuery, error) { + q, err := r.ABCIQueryWithOptions(ctx, path, data, opts) + if err != nil { + return nil, fmt.Errorf("failed to get ABCI query with options: %w", err) + } + return q, nil +} + +// GetStatus implements ConsensusClient. +func (r CometRPCClient) GetStatus(ctx context.Context) (*Status, error) { + s, err := r.Status(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get status: %w", err) + } + return &Status{ + CatchingUp: s.SyncInfo.CatchingUp, + LatestBlockHeight: uint64(s.SyncInfo.LatestBlockHeight), + }, nil +} diff --git a/cclient/consensus.go b/cclient/consensus.go new file mode 100644 index 000000000..b185d34af --- /dev/null +++ b/cclient/consensus.go @@ -0,0 +1,94 @@ +package cclient + +import ( + "context" + "strings" + "time" + + abci "github.com/cometbft/cometbft/abci/types" + "github.com/cometbft/cometbft/crypto" + bytes "github.com/cometbft/cometbft/libs/bytes" + rpcclient "github.com/cometbft/cometbft/rpc/client" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + tmtypes "github.com/cometbft/cometbft/types" +) + +// TODO(reece): get off cometbft types into internal relayer. +type ConsensusClient interface { + GetBlockTime(ctx context.Context, height uint64) (time.Time, error) + GetStatus(ctx context.Context) (*Status, error) + GetBlockResults(ctx context.Context, height uint64) (*BlockResults, error) + GetABCIQuery(ctx context.Context, queryPath string, data bytes.HexBytes) (*ABCIQueryResponse, error) + GetValidators( + ctx context.Context, + height *int64, + page, perPage *int, + ) (*ResultValidators, error) + GetTxSearch( + ctx context.Context, + query string, + prove bool, + page, perPage *int, + orderBy string, + ) (*ResultTxSearch, error) + DoBroadcastTxSync(ctx context.Context, tx tmtypes.Tx) (*ResultBroadcastTx, error) + DoBroadcastTxAsync(ctx context.Context, tx tmtypes.Tx) (*ResultBroadcastTx, error) + GetTx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error) + GetBlockSearch( + ctx context.Context, + query string, + page, perPage *int, + orderBy string, + ) (*coretypes.ResultBlockSearch, error) + GetCommit(ctx context.Context, height uint64) (*coretypes.ResultCommit, error) + GetABCIQueryWithOptions( + ctx context.Context, + path string, + data bytes.HexBytes, + opts rpcclient.ABCIQueryOptions, + ) (*coretypes.ResultABCIQuery, error) +} + +type Status struct { + CatchingUp bool + LatestBlockHeight uint64 +} + +type BlockResults struct { + FinalizeBlockEvents []abci.Event `json:"finalize_block_events"` + TxsResults []*abci.ExecTxResult `json:"txs_results"` +} + +type ABCIQueryResponse struct { + Code uint32 `json:"code,omitempty"` + Value []byte `json:"value,omitempty"` +} + +// The response value contains the data link escape control character which must be removed before parsing. +func (q ABCIQueryResponse) ValueCleaned() string { + return strings.ReplaceAll(strings.TrimSpace(string(q.Value)), "\u0010", "") +} + +// coretypes.ResultTxSearch +type ResultTxSearch struct { + Txs []*coretypes.ResultTx `json:"txs"` +} + +type ResultValidators struct { + Validators []*tmtypes.Validator `json:"validators"` +} + +type Validator struct { + Address crypto.Address `json:"address"` + PubKey crypto.PubKey `json:"pub_key"` + VotingPower int64 `json:"voting_power"` + ProposerPriority int64 `json:"proposer_priority"` +} + +type ResultBroadcastTx struct { + Code uint32 `json:"code"` + Data bytes.HexBytes `json:"data"` + Log string `json:"log"` + Codespace string `json:"codespace"` + Hash bytes.HexBytes `json:"hash"` +} diff --git a/interchaintest/feegrant_test.go b/interchaintest/feegrant_test.go index 9b9cbf951..977bd792e 100644 --- a/interchaintest/feegrant_test.go +++ b/interchaintest/feegrant_test.go @@ -18,7 +18,7 @@ import ( "github.com/cosmos/go-bip39" transfertypes "github.com/cosmos/ibc-go/v8/modules/apps/transfer/types" chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types" - "github.com/cosmos/relayer/v2/client" + "github.com/cosmos/relayer/v2/cclient" "github.com/cosmos/relayer/v2/relayer" "github.com/cosmos/relayer/v2/relayer/chains/cosmos" "github.com/cosmos/relayer/v2/relayer/processor" @@ -396,7 +396,7 @@ func TestRelayerFeeGrant(t *testing.T) { hash, err := hex.DecodeString(curr.Response.TxHash) require.Nil(t, err) - txResp, err := TxWithRetry(ctx, cProv.RPCClient, hash) + txResp, err := TxWithRetry(ctx, cProv.ConsensusClient, hash) require.Nil(t, err) require.Nil(t, err) @@ -538,11 +538,11 @@ func TestRelayerFeeGrant(t *testing.T) { } } -func TxWithRetry(ctx context.Context, client client.RPCClient, hash []byte) (*coretypes.ResultTx, error) { +func TxWithRetry(ctx context.Context, client cclient.ConsensusClient, hash []byte) (*coretypes.ResultTx, error) { var err error var res *coretypes.ResultTx if err = retry.Do(func() error { - res, err = client.Tx(ctx, hash, true) + res, err = client.GetTx(ctx, hash, true) return err }, retry.Context(ctx), relayer.RtyAtt, relayer.RtyDel, relayer.RtyErr); err != nil { return res, err @@ -870,7 +870,7 @@ func TestRelayerFeeGrantExternal(t *testing.T) { hash, err := hex.DecodeString(curr.Response.TxHash) require.Nil(t, err) - txResp, err := TxWithRetry(ctx, cProv.RPCClient, hash) + txResp, err := TxWithRetry(ctx, cProv.ConsensusClient, hash) require.Nil(t, err) require.Nil(t, err) diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index 13746ef81..dea00a435 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -8,12 +8,12 @@ import ( "time" "github.com/avast/retry-go/v4" - coretypes "github.com/cometbft/cometbft/rpc/core/types" sdk "github.com/cosmos/cosmos-sdk/types" clienttypes "github.com/cosmos/ibc-go/v8/modules/core/02-client/types" conntypes "github.com/cosmos/ibc-go/v8/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types" ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported" + "github.com/cosmos/relayer/v2/cclient" "github.com/cosmos/relayer/v2/relayer/chains" "github.com/cosmos/relayer/v2/relayer/processor" "github.com/cosmos/relayer/v2/relayer/provider" @@ -149,7 +149,7 @@ func (ccp *CosmosChainProcessor) latestHeightWithRetry(ctx context.Context) (lat // nodeStatusWithRetry will query for the latest node status, retrying in case of failure. // It will delay by latestHeightQueryRetryDelay between attempts, up to latestHeightQueryRetries. -func (ccp *CosmosChainProcessor) nodeStatusWithRetry(ctx context.Context) (status *coretypes.ResultStatus, err error) { +func (ccp *CosmosChainProcessor) nodeStatusWithRetry(ctx context.Context) (status *cclient.Status, err error) { return status, retry.Do(func() error { latestHeightQueryCtx, cancelLatestHeightQueryCtx := context.WithTimeout(ctx, queryTimeout) defer cancelLatestHeightQueryCtx() @@ -239,7 +239,7 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui } continue } - persistence.latestHeight = status.SyncInfo.LatestBlockHeight + persistence.latestHeight = int64(status.LatestBlockHeight) break } @@ -351,7 +351,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu return nil } - persistence.latestHeight = status.SyncInfo.LatestBlockHeight + persistence.latestHeight = int64(status.LatestBlockHeight) // This debug log is very noisy, but is helpful when debugging new chains. // ccp.log.Debug("Queried latest height", @@ -393,7 +393,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ { var ( eg errgroup.Group - blockRes *coretypes.ResultBlockResults + blockRes *cclient.BlockResults ibcHeader provider.IBCHeader ) @@ -403,7 +403,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout) defer cancelQueryCtx() - blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &sI) + blockRes, err = ccp.chainProvider.ConsensusClient.GetBlockResults(queryCtx, uint64(i)) if err != nil && ccp.metrics != nil { ccp.metrics.IncBlockQueryFailure(chainID, "RPC Client") } diff --git a/relayer/chains/cosmos/fee_market.go b/relayer/chains/cosmos/fee_market.go index bc4cb85f1..d2e96bb50 100644 --- a/relayer/chains/cosmos/fee_market.go +++ b/relayer/chains/cosmos/fee_market.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "regexp" - "strings" sdkmath "cosmossdk.io/math" "go.uber.org/zap" @@ -32,15 +31,12 @@ func (cc *CosmosProvider) DynamicFee(ctx context.Context) string { // QueryBaseFee attempts to make an ABCI query to retrieve the base fee on chains using the Osmosis EIP-1559 implementation. // This is currently hardcoded to only work on Osmosis. func (cc *CosmosProvider) QueryBaseFee(ctx context.Context) (string, error) { - resp, err := cc.RPCClient.ABCIQuery(ctx, queryPath, nil) - if err != nil || resp.Response.Code != 0 { + resp, err := cc.ConsensusClient.GetABCIQuery(ctx, queryPath, nil) + if err != nil || resp.Code != 0 { return "", err } - // The response value contains the data link escape control character which must be removed before parsing. - cleanedString := strings.ReplaceAll(strings.TrimSpace(string(resp.Response.Value)), "\u0010", "") - - decFee, err := sdkmath.LegacyNewDecFromStr(cleanedString) + decFee, err := sdkmath.LegacyNewDecFromStr(resp.ValueCleaned()) if err != nil { return "", err } diff --git a/relayer/chains/cosmos/feegrant.go b/relayer/chains/cosmos/feegrant.go index f74027290..7045bd1fd 100644 --- a/relayer/chains/cosmos/feegrant.go +++ b/relayer/chains/cosmos/feegrant.go @@ -332,7 +332,7 @@ func (cc *CosmosProvider) EnsureBasicGrants(ctx context.Context, memo string, ga } if len(msgs) > 0 { - cliCtx := client.Context{}.WithClient(cc.RPCClient). + cliCtx := client.Context{}. WithInterfaceRegistry(cc.Cdc.InterfaceRegistry). WithChainID(cc.PCfg.ChainID). WithCodec(cc.Cdc.Marshaler). diff --git a/relayer/chains/cosmos/provider.go b/relayer/chains/cosmos/provider.go index 61d9a7a86..11491be8c 100644 --- a/relayer/chains/cosmos/provider.go +++ b/relayer/chains/cosmos/provider.go @@ -19,7 +19,7 @@ import ( "github.com/cosmos/cosmos-sdk/types/module" "github.com/cosmos/gogoproto/proto" commitmenttypes "github.com/cosmos/ibc-go/v8/modules/core/23-commitment/types" - cwrapper "github.com/cosmos/relayer/v2/client" + "github.com/cosmos/relayer/v2/cclient" "github.com/cosmos/relayer/v2/relayer/codecs/ethermint" "github.com/cosmos/relayer/v2/relayer/processor" "github.com/cosmos/relayer/v2/relayer/provider" @@ -124,14 +124,14 @@ func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, deb type CosmosProvider struct { log *zap.Logger - PCfg CosmosProviderConfig - Keybase keyring.Keyring - KeyringOptions []keyring.Option - RPCClient cwrapper.RPCClient - LightProvider provtypes.Provider - Input io.Reader - Output io.Writer - Cdc Codec + PCfg CosmosProviderConfig + Keybase keyring.Keyring + KeyringOptions []keyring.Option + ConsensusClient cclient.ConsensusClient + LightProvider provtypes.Provider + Input io.Reader + Output io.Writer + Cdc Codec // TODO: GRPC Client type? //nextAccountSeq uint64 @@ -350,7 +350,7 @@ func (cc *CosmosProvider) startLivelinessChecks(ctx context.Context, timeout tim case <-ctx.Done(): return case <-ticker.C: - _, err := cc.RPCClient.Status(ctx) + _, err := cc.ConsensusClient.GetStatus(ctx) if err != nil { cc.log.Error("RPC client disconnected", zap.String("chain", cc.ChainName()), zap.Error(err)) @@ -401,13 +401,13 @@ func (cc *CosmosProvider) setRpcClient(onStartup bool, rpcAddr string, timeout t return err } - cc.RPCClient = cwrapper.NewRPCClient(c) + cc.ConsensusClient = cclient.NewCometRPCClient(c) // Only check status if not on startup, to ensure the relayer will not block on startup. // All subsequent calls will perform the status check to ensure RPC endpoints are rotated // as necessary. if !onStartup { - if _, err = cc.RPCClient.Status(context.Background()); err != nil { + if _, err = cc.ConsensusClient.GetStatus(context.Background()); err != nil { return err } } @@ -428,21 +428,21 @@ func (cc *CosmosProvider) setLightProvider(rpcAddr string) error { // WaitForNBlocks blocks until the next block on a given chain func (cc *CosmosProvider) WaitForNBlocks(ctx context.Context, n int64) error { - var initial int64 - h, err := cc.RPCClient.Status(ctx) + var initial uint64 + h, err := cc.ConsensusClient.GetStatus(ctx) if err != nil { return err } - if h.SyncInfo.CatchingUp { + if h.CatchingUp { return errors.New("chain catching up") } - initial = h.SyncInfo.LatestBlockHeight + initial = h.LatestBlockHeight for { - h, err = cc.RPCClient.Status(ctx) + h, err = cc.ConsensusClient.GetStatus(ctx) if err != nil { return err } - if h.SyncInfo.LatestBlockHeight > initial+n { + if h.LatestBlockHeight > initial+uint64(n) { return nil } select { @@ -455,11 +455,11 @@ func (cc *CosmosProvider) WaitForNBlocks(ctx context.Context, n int64) error { } func (cc *CosmosProvider) BlockTime(ctx context.Context, height int64) (time.Time, error) { - resultBlock, err := cc.RPCClient.Block(ctx, &height) + bt, err := cc.ConsensusClient.GetBlockTime(ctx, uint64(height)) if err != nil { return time.Time{}, err } - return resultBlock.Block.Time, nil + return bt, nil } func (cc *CosmosProvider) SetMetrics(m *processor.PrometheusMetrics) { diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index 188635934..0bce53f42 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -16,7 +16,6 @@ import ( "cosmossdk.io/x/feegrant" upgradetypes "cosmossdk.io/x/upgrade/types" abci "github.com/cometbft/cometbft/abci/types" - coretypes "github.com/cometbft/cometbft/rpc/core/types" tmtypes "github.com/cometbft/cometbft/types" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" @@ -33,6 +32,7 @@ import ( host "github.com/cosmos/ibc-go/v8/modules/core/24-host" ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v8/modules/light-clients/07-tendermint" + "github.com/cosmos/relayer/v2/cclient" "github.com/cosmos/relayer/v2/relayer/chains" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" @@ -67,7 +67,7 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, ) eg.Go(func() error { - res, err := cc.RPCClient.BlockSearch(ctx, query, &page, &limit, "") + res, err := cc.ConsensusClient.GetBlockSearch(ctx, query, &page, &limit, "") if err != nil { return err } @@ -77,7 +77,7 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, for _, b := range res.Blocks { b := b nestedEg.Go(func() error { - block, err := cc.RPCClient.BlockResults(ctx, &b.Block.Height) + block, err := cc.ConsensusClient.GetBlockResults(ctx, uint64(b.Block.Height)) if err != nil { return err } @@ -93,7 +93,7 @@ func (cc *CosmosProvider) queryIBCMessages(ctx context.Context, log *zap.Logger, }) eg.Go(func() error { - res, err := cc.RPCClient.TxSearch(ctx, query, true, &page, &limit, "") + res, err := cc.ConsensusClient.GetTxSearch(ctx, query, true, &page, &limit, "") if err != nil { return err } @@ -121,7 +121,8 @@ func (cc *CosmosProvider) QueryTx(ctx context.Context, hashHex string) (*provide return nil, err } - resp, err := cc.RPCClient.Tx(ctx, hash, true) + // TODO(reece): Why is this true when we do not use the proof? + resp, err := cc.ConsensusClient.GetTx(ctx, hash, true) if err != nil { return nil, err } @@ -151,7 +152,7 @@ func (cc *CosmosProvider) QueryTxs(ctx context.Context, page, limit int, events return nil, errors.New("limit must greater than 0") } - res, err := cc.RPCClient.TxSearch(ctx, strings.Join(events, " AND "), true, &page, &limit, "") + res, err := cc.ConsensusClient.GetTxSearch(ctx, strings.Join(events, " AND "), true, &page, &limit, "") if err != nil { return nil, err } @@ -604,7 +605,7 @@ func (cc *CosmosProvider) QueryUpgradedConsState(ctx context.Context, height int // QueryConsensusState returns a consensus state for a given chain to be used as a // client in another chain, fetches latest height when passed 0 as arg func (cc *CosmosProvider) QueryConsensusState(ctx context.Context, height int64) (ibcexported.ConsensusState, int64, error) { - commit, err := cc.RPCClient.Commit(ctx, &height) + commit, err := cc.ConsensusClient.GetCommit(ctx, uint64(height)) if err != nil { return &tmclient.ConsensusState{}, 0, err } @@ -613,7 +614,7 @@ func (cc *CosmosProvider) QueryConsensusState(ctx context.Context, height int64) count := 10_000 nextHeight := height + 1 - nextVals, err := cc.RPCClient.Validators(ctx, &nextHeight, &page, &count) + nextVals, err := cc.ConsensusClient.GetValidators(ctx, &nextHeight, &page, &count) if err != nil { return &tmclient.ConsensusState{}, 0, err } @@ -1201,18 +1202,18 @@ func (cc *CosmosProvider) QueryPacketReceipt(ctx context.Context, height int64, } func (cc *CosmosProvider) QueryLatestHeight(ctx context.Context) (int64, error) { - stat, err := cc.RPCClient.Status(ctx) + stat, err := cc.ConsensusClient.GetStatus(ctx) if err != nil { return -1, err - } else if stat.SyncInfo.CatchingUp { + } else if stat.CatchingUp { return -1, fmt.Errorf("node at %s running chain %s not caught up", cc.PCfg.RPCAddr, cc.PCfg.ChainID) } - return stat.SyncInfo.LatestBlockHeight, nil + return int64(stat.LatestBlockHeight), nil } // Query current node status -func (cc *CosmosProvider) QueryStatus(ctx context.Context) (*coretypes.ResultStatus, error) { - status, err := cc.RPCClient.Status(ctx) +func (cc *CosmosProvider) QueryStatus(ctx context.Context) (*cclient.Status, error) { + status, err := cc.ConsensusClient.GetStatus(ctx) if err != nil { return nil, fmt.Errorf("failed to query node status: %w", err) } diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index bf5750969..2afc0d995 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -43,6 +43,7 @@ import ( ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v8/modules/light-clients/07-tendermint" localhost "github.com/cosmos/ibc-go/v8/modules/light-clients/09-localhost" + "github.com/cosmos/relayer/v2/cclient" strideicqtypes "github.com/cosmos/relayer/v2/relayer/chains/cosmos/stride" "github.com/cosmos/relayer/v2/relayer/ethermint" "github.com/cosmos/relayer/v2/relayer/provider" @@ -265,7 +266,7 @@ func (cc *CosmosProvider) AwaitTx(txHash bytes.HexBytes, timeout time.Duration) // sent and executed successfully is returned. // // feegranterKey - key name of the address set as the feegranter, empty string will not feegrant -func (cc *CosmosProvider) SendMsgsWith(ctx context.Context, msgs []sdk.Msg, memo string, gas uint64, signingKey string, feegranterKey string) (*coretypes.ResultBroadcastTx, error) { +func (cc *CosmosProvider) SendMsgsWith(ctx context.Context, msgs []sdk.Msg, memo string, gas uint64, signingKey string, feegranterKey string) (*cclient.ResultBroadcastTx, error) { sdkConfigMutex.Lock() sdkConf := sdk.GetConfig() sdkConf.SetBech32PrefixForAccount(cc.PCfg.AccountPrefix, cc.PCfg.AccountPrefix+"pub") @@ -343,7 +344,7 @@ func (cc *CosmosProvider) SendMsgsWith(ctx context.Context, msgs []sdk.Msg, memo return nil, err } - res, err := cc.RPCClient.BroadcastTxAsync(ctx, txBytes) + res, err := cc.ConsensusClient.DoBroadcastTxAsync(ctx, txBytes) if res != nil { fmt.Printf("TX hash: %s\n", res.Hash) } @@ -386,7 +387,7 @@ func (cc *CosmosProvider) broadcastTx( asyncCallbacks []func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion dynamicFee string, ) error { - res, err := cc.RPCClient.BroadcastTxSync(ctx, tx) + res, err := cc.ConsensusClient.DoBroadcastTxSync(ctx, tx) isErr := err != nil isFailed := res != nil && res.Code != 0 if isErr || isFailed { @@ -497,7 +498,7 @@ func (cc *CosmosProvider) waitForBlockInclusion( return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, ErrTimeoutAfterWaitingForTxBroadcast) // This fixed poll is fine because it's only for logging and updating prometheus metrics currently. case <-time.After(time.Millisecond * 100): - res, err := cc.RPCClient.Tx(ctx, txHash, false) + res, err := cc.ConsensusClient.GetTx(ctx, txHash, false) if err == nil { return cc.mkTxResult(res) } @@ -1668,7 +1669,7 @@ func (cc *CosmosProvider) PrepareFactory(txf tx.Factory, signingKey string) (tx. return tx.Factory{}, err } - cliCtx := client.Context{}.WithClient(cc.RPCClient). + cliCtx := client.Context{}. WithInterfaceRegistry(cc.Cdc.InterfaceRegistry). WithChainID(cc.PCfg.ChainID). WithCodec(cc.Cdc.Marshaler). @@ -1746,7 +1747,7 @@ func (cc *CosmosProvider) SetWithExtensionOptions(txf tx.Factory) (tx.Factory, e for _, opt := range cc.PCfg.ExtensionOptions { max, ok := sdkmath.NewIntFromString(opt.Value) if !ok { - return txf,errors.New("invalid opt value") + return txf, errors.New("invalid opt value") } extensionOption := ethermint.ExtensionOptionDynamicFeeTx{ MaxPriorityPrice: max, @@ -1845,7 +1846,7 @@ func (cc *CosmosProvider) QueryABCI(ctx context.Context, req abci.RequestQuery) Prove: req.Prove, } - result, err := cc.RPCClient.ABCIQueryWithOptions(ctx, req.Path, req.Data, opts) + result, err := cc.ConsensusClient.GetABCIQueryWithOptions(ctx, req.Path, req.Data, opts) if err != nil { return abci.ResponseQuery{}, err } diff --git a/relayer/chains/penumbra/penumbra_chain_processor.go b/relayer/chains/penumbra/penumbra_chain_processor.go index 2944a4570..7bea56b33 100644 --- a/relayer/chains/penumbra/penumbra_chain_processor.go +++ b/relayer/chains/penumbra/penumbra_chain_processor.go @@ -337,7 +337,7 @@ func (pcp *PenumbraChainProcessor) queryCycle(ctx context.Context, persistence * queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout) defer cancelQueryCtx() - blockRes, err = pcp.chainProvider.RPCClient.BlockResults(queryCtx, &i) + blockRes, err = pcp.chainProvider.ConsensusClient.BlockResults(queryCtx, &i) return err }) diff --git a/relayer/chains/penumbra/provider.go b/relayer/chains/penumbra/provider.go index 9145302af..c4ef95f96 100644 --- a/relayer/chains/penumbra/provider.go +++ b/relayer/chains/penumbra/provider.go @@ -22,7 +22,7 @@ import ( commitmenttypes "github.com/cosmos/ibc-go/v8/modules/core/23-commitment/types" ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v8/modules/light-clients/07-tendermint" - cwrapper "github.com/cosmos/relayer/v2/client" + "github.com/cosmos/relayer/v2/cclient" "github.com/cosmos/relayer/v2/relayer/codecs/ethermint" "github.com/cosmos/relayer/v2/relayer/provider" "github.com/strangelove-ventures/cometbft-client/client" @@ -139,15 +139,15 @@ func (h PenumbraIBCHeader) NextValidatorsHash() []byte { type PenumbraProvider struct { log *zap.Logger - PCfg PenumbraProviderConfig - Keybase keyring.Keyring - KeyringOptions []keyring.Option - RPCClient cwrapper.RPCClient - LightProvider provtypes.Provider - Input io.Reader - Output io.Writer - Codec Codec - RPCCaller jsonrpcclient.Caller + PCfg PenumbraProviderConfig + Keybase keyring.Keyring + KeyringOptions []keyring.Option + ConsensusClient cclient.CometRPCClient + LightProvider provtypes.Provider + Input io.Reader + Output io.Writer + Codec Codec + RPCCaller jsonrpcclient.Caller } func (cc *PenumbraProvider) ProviderConfig() provider.ProviderConfig { @@ -303,7 +303,7 @@ func (cc *PenumbraProvider) startLivelinessChecks(ctx context.Context, timeout t case <-ctx.Done(): return case <-ticker.C: - _, err := cc.RPCClient.Status(ctx) + _, err := cc.ConsensusClient.Status(ctx) if err != nil { cc.log.Error("RPC client disconnected", zap.String("chain", cc.ChainName()), zap.Error(err)) @@ -354,13 +354,13 @@ func (cc *PenumbraProvider) setRpcClient(onStartup bool, rpcAddr string, timeout return err } - cc.RPCClient = cwrapper.NewRPCClient(c) + cc.ConsensusClient = cclient.NewCometRPCClient(c) // Only check status if not on startup, to ensure the relayer will not block on startup. // All subsequent calls will perform the status check to ensure RPC endpoints are rotated // as necessary. if !onStartup { - if _, err = cc.RPCClient.Status(context.Background()); err != nil { + if _, err = cc.ConsensusClient.Status(context.Background()); err != nil { return err } } @@ -382,7 +382,7 @@ func (cc *PenumbraProvider) setLightProvider(rpcAddr string) error { // WaitForNBlocks blocks until the next block on a given chain func (cc *PenumbraProvider) WaitForNBlocks(ctx context.Context, n int64) error { var initial int64 - h, err := cc.RPCClient.Status(ctx) + h, err := cc.ConsensusClient.Status(ctx) if err != nil { return err } @@ -391,7 +391,7 @@ func (cc *PenumbraProvider) WaitForNBlocks(ctx context.Context, n int64) error { } initial = h.SyncInfo.LatestBlockHeight for { - h, err = cc.RPCClient.Status(ctx) + h, err = cc.ConsensusClient.Status(ctx) if err != nil { return err } @@ -408,7 +408,7 @@ func (cc *PenumbraProvider) WaitForNBlocks(ctx context.Context, n int64) error { } func (cc *PenumbraProvider) BlockTime(ctx context.Context, height int64) (time.Time, error) { - resultBlock, err := cc.RPCClient.Block(ctx, &height) + resultBlock, err := cc.ConsensusClient.Block(ctx, &height) if err != nil { return time.Time{}, err } diff --git a/relayer/chains/penumbra/query.go b/relayer/chains/penumbra/query.go index 615428799..518aa0764 100644 --- a/relayer/chains/penumbra/query.go +++ b/relayer/chains/penumbra/query.go @@ -42,7 +42,7 @@ func (cc *PenumbraProvider) QueryTx(ctx context.Context, hashHex string) (*provi return nil, err } - resp, err := cc.RPCClient.Tx(ctx, hash, true) + resp, err := cc.ConsensusClient.Tx(ctx, hash, true) if err != nil { return nil, err } @@ -72,7 +72,7 @@ func (cc *PenumbraProvider) QueryTxs(ctx context.Context, page, limit int, event return nil, errors.New("limit must greater than 0") } - res, err := cc.RPCClient.TxSearch(ctx, strings.Join(events, " AND "), true, &page, &limit, "") + res, err := cc.ConsensusClient.TxSearch(ctx, strings.Join(events, " AND "), true, &page, &limit, "") if err != nil { return nil, err } @@ -380,7 +380,7 @@ func (cc *PenumbraProvider) QueryUpgradedConsState(ctx context.Context, height i // QueryConsensusState returns a consensus state for a given chain to be used as a // client in another chain, fetches latest height when passed 0 as arg func (cc *PenumbraProvider) QueryConsensusState(ctx context.Context, height int64) (ibcexported.ConsensusState, int64, error) { - commit, err := cc.RPCClient.Commit(ctx, &height) + commit, err := cc.ConsensusClient.Commit(ctx, &height) if err != nil { return &tmclient.ConsensusState{}, 0, err } @@ -389,7 +389,7 @@ func (cc *PenumbraProvider) QueryConsensusState(ctx context.Context, height int6 count := 10_000 nextHeight := height + 1 - nextVals, err := cc.RPCClient.Validators(ctx, &nextHeight, &page, &count) + nextVals, err := cc.ConsensusClient.Validators(ctx, &nextHeight, &page, &count) if err != nil { return &tmclient.ConsensusState{}, 0, err } @@ -787,7 +787,7 @@ func (cc *PenumbraProvider) QueryPacketReceipt(ctx context.Context, height int64 } func (cc *PenumbraProvider) QueryLatestHeight(ctx context.Context) (int64, error) { - stat, err := cc.RPCClient.Status(ctx) + stat, err := cc.ConsensusClient.Status(ctx) if err != nil { return -1, err } else if stat.SyncInfo.CatchingUp { @@ -806,12 +806,12 @@ func (cc *PenumbraProvider) QueryHeaderAtHeight(ctx context.Context, height int6 return nil, fmt.Errorf("must pass in valid height, %d not valid", height) } - res, err := cc.RPCClient.Commit(ctx, &height) + res, err := cc.ConsensusClient.Commit(ctx, &height) if err != nil { return nil, err } - val, err := cc.RPCClient.Validators(ctx, &height, &page, &perPage) + val, err := cc.ConsensusClient.Validators(ctx, &height, &page, &perPage) if err != nil { return nil, err } @@ -922,7 +922,7 @@ func (cc *PenumbraProvider) queryIBCMessages(ctx context.Context, log *zap.Logge return nil, errors.New("limit must greater than 0") } - res, err := cc.RPCClient.TxSearch(ctx, query, true, &page, &limit, "") + res, err := cc.ConsensusClient.TxSearch(ctx, query, true, &page, &limit, "") if err != nil { return nil, err } @@ -1004,7 +1004,7 @@ func (cc *PenumbraProvider) QueryRecvPacket( // QueryStatus queries the current node status. func (cc *PenumbraProvider) QueryStatus(ctx context.Context) (*coretypes.ResultStatus, error) { - status, err := cc.RPCClient.Status(ctx) + status, err := cc.ConsensusClient.Status(ctx) if err != nil { return nil, fmt.Errorf("failed to query node status: %w", err) } diff --git a/relayer/chains/penumbra/tx.go b/relayer/chains/penumbra/tx.go index dbf8a2547..5ce21696a 100644 --- a/relayer/chains/penumbra/tx.go +++ b/relayer/chains/penumbra/tx.go @@ -248,7 +248,7 @@ type ValidatorUpdate struct { } func (cc *PenumbraProvider) getAnchor(ctx context.Context) (*penumbracrypto.MerkleRoot, error) { - status, err := cc.RPCClient.Status(ctx) + status, err := cc.ConsensusClient.Status(ctx) if err != nil { return nil, err } @@ -345,7 +345,7 @@ func (cc *PenumbraProvider) sendMessagesInner(ctx context.Context, msgs []provid return nil, err } - return cc.RPCClient.BroadcastTxSync(ctx, txBytes) + return cc.ConsensusClient.BroadcastTxSync(ctx, txBytes) } // SendMessages attempts to sign, encode, & send a slice of RelayerMessages @@ -372,7 +372,7 @@ func (cc *PenumbraProvider) SendMessages(ctx context.Context, msgs []provider.Re ctx, cancel := context.WithTimeout(ctx, 40*time.Second) defer cancel() - res, err := cc.RPCClient.Tx(ctx, syncRes.Hash, false) + res, err := cc.ConsensusClient.Tx(ctx, syncRes.Hash, false) if err != nil { return err } @@ -2078,7 +2078,7 @@ func (cc *PenumbraProvider) QueryABCI(ctx context.Context, req abci.RequestQuery Prove: req.Prove, } - result, err := cc.RPCClient.ABCIQueryWithOptions(ctx, req.Path, req.Data, opts) + result, err := cc.ConsensusClient.ABCIQueryWithOptions(ctx, req.Path, req.Data, opts) if err != nil { return abci.ResponseQuery{}, err } @@ -2153,7 +2153,7 @@ func (cc *PenumbraProvider) broadcastTx( asyncTimeout time.Duration, // timeout for waiting for block inclusion asyncCallback func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion ) error { - res, err := cc.RPCClient.BroadcastTxSync(ctx, tx) + res, err := cc.ConsensusClient.BroadcastTxSync(ctx, tx) isErr := err != nil isFailed := res != nil && res.Code != 0 if isErr || isFailed { @@ -2249,12 +2249,12 @@ func (cc *PenumbraProvider) waitForBlockInclusion( return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, ErrTimeoutAfterWaitingForTxBroadcast) // This fixed poll is fine because it's only for logging and updating prometheus metrics currently. case <-time.After(time.Millisecond * 100): - res, err := cc.RPCClient.Tx(ctx, txHash, false) + res, err := cc.ConsensusClient.Tx(ctx, txHash, false) if err == nil { return cc.mkTxResult(res) } if strings.Contains(err.Error(), "transaction indexing is disabled") { - return nil,errors.New("cannot determine success/failure of tx because transaction indexing is disabled on rpc url") + return nil, errors.New("cannot determine success/failure of tx because transaction indexing is disabled on rpc url") } case <-ctx.Done(): return nil, ctx.Err()