Skip to content

Commit

Permalink
Instrument more functions under the chain::cosmos module
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Aug 10, 2022
1 parent 267a30c commit b87e949
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 62 deletions.
73 changes: 46 additions & 27 deletions relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tendermint_rpc::{
};
use tokio::runtime::Runtime as TokioRuntime;
use tonic::{codegen::http::Uri, metadata::AsciiMetadataValue};
use tracing::{error, span, warn, Level};
use tracing::{error, instrument, warn};

use ibc::clients::ics07_tendermint::consensus_state::ConsensusState as TMConsensusState;
use ibc::clients::ics07_tendermint::header::Header as TmHeader;
Expand Down Expand Up @@ -124,6 +124,23 @@ impl CosmosSdkChain {
&self.config
}

/// The maximum size of any transaction sent by the relayer to this chain
fn max_tx_size(&self) -> usize {
self.config.max_tx_size.into()
}

fn key(&self) -> Result<KeyEntry, Error> {
self.keybase()
.get_key(&self.config.key_name)
.map_err(Error::key_base)
}

fn trusting_period(&self, unbonding_period: Duration) -> Duration {
self.config
.trusting_period
.unwrap_or(2 * unbonding_period / 3)
}

/// Performs validation of chain-specific configuration
/// parameters against the chain's genesis configuration.
///
Expand Down Expand Up @@ -278,11 +295,6 @@ impl CosmosSdkChain {
self.rt.block_on(f)
}

/// The maximum size of any transaction sent by the relayer to this chain
fn max_tx_size(&self) -> usize {
self.config.max_tx_size.into()
}

fn query(
&self,
data: impl Into<Path>,
Expand Down Expand Up @@ -342,6 +354,7 @@ impl CosmosSdkChain {
// SAFETY: Creating a Path from a constant; this should never fail
let path = TendermintABCIPath::from_str(SDK_UPGRADE_QUERY_PATH)
.expect("Turning SDK upgrade query path constant into a Tendermint ABCI path");

let response: QueryResponse = self.block_on(abci_query(
&self.rpc_client,
&self.config.rpc_addr,
Expand All @@ -356,23 +369,14 @@ impl CosmosSdkChain {
Ok((response.value, proof))
}

fn key(&self) -> Result<KeyEntry, Error> {
self.keybase()
.get_key(&self.config.key_name)
.map_err(Error::key_base)
}

fn trusting_period(&self, unbonding_period: Duration) -> Duration {
self.config
.trusting_period
.unwrap_or(2 * unbonding_period / 3)
}

/// Query the chain status via an RPC query.
///
/// Returns an error if the node is still syncing and has not caught up,
/// ie. if `sync_info.catching_up` is `true`.
fn chain_status(&self) -> Result<status::Response, Error> {
crate::time!("chain_status");
crate::telemetry!(query, self.id(), "status");

let status = self
.block_on(self.rpc_client.status())
.map_err(|e| Error::rpc(self.config.rpc_addr.clone(), e))?;
Expand Down Expand Up @@ -401,15 +405,21 @@ impl CosmosSdkChain {
Ok(status.height)
}

#[instrument(
name = "send_messages_and_wait_commit",
level = "error",
skip_all,
fields(
chain = %self.id(),
tracking_id = %tracked_msgs.tracking_id()
),
)]
async fn do_send_messages_and_wait_commit(
&mut self,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<IbcEvent>, Error> {
crate::time!("send_messages_and_wait_commit");

let _span =
span!(Level::DEBUG, "send_tx_commit", id = %tracked_msgs.tracking_id()).entered();

let proto_msgs = tracked_msgs.msgs;

let key_entry = self.key()?;
Expand All @@ -429,15 +439,21 @@ impl CosmosSdkChain {
.await
}

#[instrument(
name = "send_messages_and_wait_check_tx",
level = "error",
skip_all,
fields(
chain = %self.id(),
tracking_id = %tracked_msgs.tracking_id()
),
)]
async fn do_send_messages_and_wait_check_tx(
&mut self,
tracked_msgs: TrackedMsgs,
) -> Result<Vec<Response>, Error> {
crate::time!("send_messages_and_wait_check_tx");

let span = span!(Level::DEBUG, "send_tx_check", id = %tracked_msgs.tracking_id());
let _enter = span.enter();

let proto_msgs = tracked_msgs.msgs;

let key_entry = self.key()?;
Expand Down Expand Up @@ -1671,10 +1687,13 @@ fn do_health_check(chain: &CosmosSdkChain) -> Result<(), Error> {
}

// Check that the chain identifier matches the network name
if !status.node_info.network.as_str().eq(chain_id.as_str()) {
if status.node_info.network.as_str() != chain_id.as_str() {
// Log the error, continue optimistically
error!("/status endpoint from chain id '{}' reports network identifier to be '{}': this is usually a sign of misconfiguration, check your config.toml",
chain_id, status.node_info.network);
error!(
"/status endpoint from chain '{}' reports network identifier to be '{}'. \
This is usually a sign of misconfiguration, please check your config.toml",
chain_id, status.node_info.network
);
}

let version_specs = chain.block_on(fetch_version_specs(&chain.config.id, &chain.grpc_addr))?;
Expand Down
92 changes: 59 additions & 33 deletions relayer/src/chain/cosmos/retry.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use core::time::Duration;
use ibc_proto::google::protobuf::Any;
use std::thread;

use tracing::{debug, error, instrument, warn};

use ibc_proto::google::protobuf::Any;
use tendermint::abci::Code;
use tendermint_rpc::endpoint::broadcast::tx_sync::Response;
use tracing::{debug, error, span, warn, Level};

use crate::chain::cosmos::query::account::refresh_account;
use crate::chain::cosmos::tx::estimate_fee_and_send_tx;
Expand All @@ -13,7 +15,7 @@ use crate::config::types::Memo;
use crate::error::Error;
use crate::keyring::KeyEntry;
use crate::sdk_error::sdk_error_from_tx_sync_error_code;
use crate::telemetry;
use crate::{telemetry, time};

// Delay in milliseconds before retrying in the case of account sequence mismatch.
const ACCOUNT_SEQUENCE_RETRY_DELAY: u64 = 300;
Expand All @@ -33,37 +35,28 @@ const INCORRECT_ACCOUNT_SEQUENCE_ERR: u32 = 32;
///
/// We treat both cases by re-fetching the account sequence number
/// from the full node and retrying once with the new account s.n.
#[instrument(
name = "send_tx_with_account_sequence_retry",
level = "error",
skip_all,
fields(
chain = %config.chain_id,
account.sequence = %account.sequence,
),
)]
pub async fn send_tx_with_account_sequence_retry(
config: &TxConfig,
key_entry: &KeyEntry,
account: &mut Account,
tx_memo: &Memo,
messages: Vec<Any>,
) -> Result<Response, Error> {
crate::time!("send_tx_with_account_sequence_retry");

let _span =
span!(Level::ERROR, "send_tx_with_account_sequence_retry", id = %config.chain_id).entered();

time!("send_tx_with_account_sequence_retry");
telemetry!(msg_num, &config.chain_id, messages.len() as u64);

do_send_tx_with_account_sequence_retry(config, key_entry, account, tx_memo, messages).await
}

async fn refresh_account_and_retry_send_tx_with_account_sequence(
config: &TxConfig,
key_entry: &KeyEntry,
account: &mut Account,
tx_memo: &Memo,
messages: Vec<Any>,
) -> Result<Response, Error> {
// Re-fetch the account s.n.
refresh_account(&config.grpc_address, &key_entry.account, account).await?;
// Retry after delay.
thread::sleep(Duration::from_millis(ACCOUNT_SEQUENCE_RETRY_DELAY));
estimate_fee_and_send_tx(config, key_entry, account, tx_memo, messages.clone()).await
}

async fn do_send_tx_with_account_sequence_retry(
config: &TxConfig,
key_entry: &KeyEntry,
Expand All @@ -77,10 +70,11 @@ async fn do_send_tx_with_account_sequence_retry(
// This can happen when the same account is used by another agent.
Err(ref e) if mismatch_account_sequence_number_error_requires_refresh(e) => {
warn!(
"failed at estimate_gas step mismatching account sequence {}. \
refresh account sequence number and retry once",
e
error = %e,
"failed to estimate gas because of a mismatched account sequence number, \
refreshing account sequence number and retrying once",
);

refresh_account_and_retry_send_tx_with_account_sequence(
config, key_entry, account, tx_memo, messages,
)
Expand All @@ -90,10 +84,11 @@ async fn do_send_tx_with_account_sequence_retry(
// Gas estimation succeeded but broadcast_tx_sync failed with a retry-able error.
Ok(ref response) if response.code == Code::Err(INCORRECT_ACCOUNT_SEQUENCE_ERR) => {
warn!(
"failed at broadcast_tx_sync step with incorrect account sequence {:?}. \
refresh account sequence number and retry once",
response
?response,
"failed to broadcast tx because of a mismatched account sequence number, \
refreshing account sequence number and retrying once"
);

refresh_account_and_retry_send_tx_with_account_sequence(
config, key_entry, account, tx_memo, messages,
)
Expand All @@ -103,12 +98,24 @@ async fn do_send_tx_with_account_sequence_retry(
// Gas estimation succeeded and broadcast_tx_sync was either successful or has failed with
// an unrecoverable error.
Ok(response) => {
debug!("gas estimation succeeded");

// Gas estimation and broadcast_tx_sync were successful.
match response.code {
Code::Ok => {
let old_account_sequence = account.sequence;

// Increase account s.n.
debug!("broadcast_tx_sync: {:?}", response);
account.sequence.increment_mut();

debug!(
?response,
account.sequence.old = %old_account_sequence,
account.sequence.new = %account.sequence,
"tx was successfully broadcasted, \
increasing account sequence number"
);

Ok(response)
}

Expand All @@ -117,20 +124,39 @@ async fn do_send_tx_with_account_sequence_retry(
// Do not increase the account s.n. since CheckTx step of broadcast_tx_sync has failed.
// Log the error.
error!(
"broadcast_tx_sync: {:?}: diagnostic: {:?}",
response,
sdk_error_from_tx_sync_error_code(code)
?response,
diagnostic = ?sdk_error_from_tx_sync_error_code(code),
"failed to broadcast tx with unrecoverable error"
);

Ok(response)
}
}
}

// Gas estimation failure or other unrecoverable error, propagate.
Err(e) => Err(e),
Err(e) => {
error!(error = %e, "gas estimation failed or encountered another unrecoverable error");

Err(e)
}
}
}

async fn refresh_account_and_retry_send_tx_with_account_sequence(
config: &TxConfig,
key_entry: &KeyEntry,
account: &mut Account,
tx_memo: &Memo,
messages: Vec<Any>,
) -> Result<Response, Error> {
// Re-fetch the account s.n.
refresh_account(&config.grpc_address, &key_entry.account, account).await?;
// Retry after delay.
thread::sleep(Duration::from_millis(ACCOUNT_SEQUENCE_RETRY_DELAY));
estimate_fee_and_send_tx(config, key_entry, account, tx_memo, messages.clone()).await
}

/// Determine whether the given error yielded by `tx_simulate`
/// indicates hat the current sequence number cached in Hermes
/// is smaller than the full node's version of the s.n. and therefore
Expand Down
29 changes: 27 additions & 2 deletions relayer/src/chain/cosmos/types/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,46 @@ use ibc_proto::cosmos::auth::v1beta1::BaseAccount;
/// More fields may be added later.
#[derive(Clone, Debug, PartialEq)]
pub struct Account {
// pub address: String,
// pub pub_key: Option<prost_types::Any>,
pub address: AccountAddress,
pub number: AccountNumber,
pub sequence: AccountSequence,
// pub pub_key: Option<prost_types::Any>,
}

impl From<BaseAccount> for Account {
fn from(value: BaseAccount) -> Self {
Self {
address: AccountAddress::new(value.address),
number: AccountNumber::new(value.account_number),
sequence: AccountSequence::new(value.sequence),
}
}
}

/// Newtype for account address
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct AccountAddress(String);

impl AccountAddress {
pub fn new(string: String) -> Self {
Self(string)
}

pub fn as_str(&self) -> &str {
&self.0
}

pub fn into_string(&self) -> String {
self.0
}
}

impl fmt::Display for AccountAddress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}

/// Newtype for account numbers
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct AccountNumber(u64);
Expand Down

0 comments on commit b87e949

Please sign in to comment.