diff --git a/.changelog/unreleased/improvements/ibc-relayer/1491-structured-logs.md b/.changelog/unreleased/improvements/ibc-relayer/1491-structured-logs.md new file mode 100644 index 0000000000..1f1ae711e9 --- /dev/null +++ b/.changelog/unreleased/improvements/ibc-relayer/1491-structured-logs.md @@ -0,0 +1,2 @@ +- More structural logging in relayer, using tracing spans and key-value pairs. + ([#1491](https://github.com/informalsystems/ibc-rs/pull/1491)) diff --git a/Cargo.lock b/Cargo.lock index b666923d45..6290b9e699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1384,6 +1384,7 @@ dependencies = [ "ibc-telemetry", "itertools", "k256", + "nanoid", "num-bigint", "num-rational", "prost", @@ -1778,6 +1779,15 @@ dependencies = [ "twoway", ] +[[package]] +name = "nanoid" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8" +dependencies = [ + "rand 0.8.4", +] + [[package]] name = "nom" version = "7.1.0" diff --git a/config.toml b/config.toml index 655aec6dd3..681ed838f7 100644 --- a/config.toml +++ b/config.toml @@ -121,6 +121,8 @@ websocket_addr = 'ws://127.0.0.1:26657/websocket' # Specify the maximum amount of time (duration) that the RPC requests should # take before timing out. Default: 10s (10 seconds) +# Note: Hermes uses this parameter _only_ in `start` mode; for all other CLIs, +# Hermes uses a large preconfigured timeout (on the order of minutes). rpc_timeout = '10s' # Specify the prefix used by the chain. Required diff --git a/relayer-cli/src/commands.rs b/relayer-cli/src/commands.rs index ad91af9534..b695788486 100644 --- a/relayer-cli/src/commands.rs +++ b/relayer-cli/src/commands.rs @@ -5,6 +5,7 @@ //! See the `impl Configurable` below for how to specify the path to the //! application's configuration file. +use core::time::Duration; use std::path::PathBuf; use abscissa_core::{config::Override, Clap, Command, Configurable, FrameworkError, Runnable}; @@ -138,6 +139,14 @@ impl Configurable for CliCmd { ccfg.memo_prefix.apply_suffix(&suffix); } + // For all commands except for `start` Hermes retries + // for a prolonged period of time. 
+ if !matches!(self, CliCmd::Start(_)) { + for c in config.chains.iter_mut() { + c.rpc_timeout = Duration::from_secs(120); + } + } + match self { CliCmd::Tx(cmd) => cmd.override_config(config), // CliCmd::Help(cmd) => cmd.override_config(config), @@ -146,7 +155,6 @@ impl Configurable for CliCmd { // CliCmd::Update(cmd) => cmd.override_config(config), // CliCmd::Upgrade(cmd) => cmd.override_config(config), // CliCmd::Start(cmd) => cmd.override_config(config), - // CliCmd::StartMulti(cmd) => cmd.override_config(config), // CliCmd::Query(cmd) => cmd.override_config(config), // CliCmd::Listen(cmd) => cmd.override_config(config), // CliCmd::Misbehaviour(cmd) => cmd.override_config(config), diff --git a/relayer/Cargo.toml b/relayer/Cargo.toml index 95de574e69..dc95ec4e7e 100644 --- a/relayer/Cargo.toml +++ b/relayer/Cargo.toml @@ -60,6 +60,7 @@ anyhow = "1.0" semver = "1.0" uint = "0.9" humantime = "2.1.0" +nanoid = "0.4.0" [dependencies.num-bigint] version = "0.4" diff --git a/relayer/src/chain.rs b/relayer/src/chain.rs index 5ad9f9aa89..635a171bd7 100644 --- a/relayer/src/chain.rs +++ b/relayer/src/chain.rs @@ -1,5 +1,4 @@ use alloc::sync::Arc; -use prost_types::Any; use tendermint::block::Height; use tokio::runtime::Runtime as TokioRuntime; @@ -46,10 +45,13 @@ use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::LightClient; use crate::{config::ChainConfig, event::monitor::EventReceiver}; +use self::tx::TrackedMsgs; + pub mod cosmos; pub mod counterparty; pub mod handle; pub mod runtime; +pub mod tx; #[cfg(test)] pub mod mock; @@ -125,14 +127,14 @@ pub trait ChainEndpoint: Sized { // synchronously wait for it to be committed. fn send_messages_and_wait_commit( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error>; /// Sends one or more transactions with `msgs` to chain. /// Non-blocking alternative to `send_messages_and_wait_commit` interface. 
fn send_messages_and_wait_check_tx( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error>; fn get_signer(&mut self) -> Result; diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 85eb03fcc4..a6e87e5118 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -29,7 +29,7 @@ use tendermint_rpc::{ }; use tokio::runtime::Runtime as TokioRuntime; use tonic::codegen::http::Uri; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, span, trace, warn, Level}; use ibc::clients::ics07_tendermint::client_state::{AllowUpdate, ClientState}; use ibc::clients::ics07_tendermint::consensus_state::ConsensusState as TMConsensusState; @@ -94,7 +94,7 @@ use crate::{ sdk_error::sdk_error_from_tx_sync_error_code, }; -use super::{ChainEndpoint, HealthCheck}; +use super::{tx::TrackedMsgs, ChainEndpoint, HealthCheck}; use ibc::core::ics24_host::path::{ AcksPath, ChannelEndsPath, ClientConsensusStatePath, ClientStatePath, CommitmentsPath, ConnectionsPath, ReceiptsPath, SeqRecvsPath, @@ -303,8 +303,7 @@ impl CosmosSdkChain { account_seq: u64, ) -> Result { debug!( - "[{}] send_tx: sending {} messages using account sequence {}", - self.id(), + "sending {} messages using account sequence {}", proto_msgs.len(), account_seq, ); @@ -312,11 +311,7 @@ impl CosmosSdkChain { let signer_info = self.signer(account_seq)?; let max_fee = self.max_fee(); - debug!( - "[{}] send_tx: max fee, for use in tx simulation: {}", - self.id(), - PrettyFee(&max_fee) - ); + debug!("max fee, for use in tx simulation: {}", PrettyFee(&max_fee)); let (body, body_buf) = tx_body_and_bytes(proto_msgs, self.tx_memo())?; @@ -345,8 +340,7 @@ impl CosmosSdkChain { let adjusted_fee = self.fee_with_gas(estimated_gas); debug!( - "[{}] send_tx: using {} gas, fee {}", - self.id(), + "using {} gas, fee {}", estimated_gas, PrettyFee(&adjusted_fee) ); @@ -404,7 +398,7 @@ impl CosmosSdkChain { // and refresh the s.n., to allow proceeding to the other transactions. A separate // retry at the worker-level will handle retrying. Err(e) if mismatching_account_sequence_number(&e) => { - warn!("send_tx failed at estimate_gas step mismatching account sequence: dropping the tx & refreshing account sequence number"); + warn!("failed at estimate_gas step mismatching account sequence: dropping the tx & refreshing account sequence number"); self.refresh_account()?; // Note: propagating error here can lead to bug & dropped packets: // https://github.com/informalsystems/ibc-rs/issues/1153 @@ -416,7 +410,7 @@ impl CosmosSdkChain { Ok(response) if response.code == Code::Err(INCORRECT_ACCOUNT_SEQUENCE_ERR) => { if retry_counter < retry_strategy::MAX_ACCOUNT_SEQUENCE_RETRY { let retry_counter = retry_counter + 1; - warn!("send_tx failed at broadcast step with incorrect account sequence. retrying ({}/{})", + warn!("failed at broadcast step with incorrect account sequence. retrying ({}/{})", retry_counter, retry_strategy::MAX_ACCOUNT_SEQUENCE_RETRY); // Backoff & re-fetch the account s.n. let backoff = (retry_counter as u64) @@ -431,7 +425,7 @@ impl CosmosSdkChain { // we ignore the error and return the original response to downstream. // We do not return an error here, because the current convention // let the caller handle error responses separately. - error!("failed to send_tx due to account sequence errors. the relayer wallet may be used elsewhere concurrently."); + error!("failed due to account sequence errors. 
the relayer wallet may be used elsewhere concurrently."); Ok(response) } } @@ -442,7 +436,7 @@ impl CosmosSdkChain { // Complete success. match response.code { tendermint::abci::Code::Ok => { - debug!("[{}] send_tx: broadcast_tx_sync: {:?}", self.id(), response); + debug!("broadcast_tx_sync: {:?}", response); self.incr_account_sequence(); Ok(response) @@ -452,8 +446,7 @@ impl CosmosSdkChain { // Avoid increasing the account s.n. if CheckTx failed // Log the error error!( - "[{}] send_tx: broadcast_tx_sync: {:?}: diagnostic: {:?}", - self.id(), + "broadcast_tx_sync: {:?}: diagnostic: {:?}", response, sdk_error_from_tx_sync_error_code(code) ); @@ -470,6 +463,8 @@ impl CosmosSdkChain { fn send_tx(&mut self, proto_msgs: Vec) -> Result { crate::time!("send_tx"); + let _span = span!(Level::ERROR, "send_tx", id = %self.id()).entered(); + self.send_tx_with_account_sequence_retry(proto_msgs, 0) } @@ -483,12 +478,12 @@ impl CosmosSdkChain { /// In this case we use the `default_gas` param. fn estimate_gas(&mut self, tx: Tx) -> Result { let simulated_gas = self.send_tx_simulate(tx).map(|sr| sr.gas_info); + let _span = span!(Level::ERROR, "estimate_gas").entered(); match simulated_gas { Ok(Some(gas_info)) => { debug!( - "[{}] estimate_gas: tx simulation successful, gas amount used: {:?}", - self.id(), + "tx simulation successful, gas amount used: {:?}", gas_info.gas_used ); @@ -497,8 +492,7 @@ impl CosmosSdkChain { Ok(None) => { warn!( - "[{}] estimate_gas: tx simulation successful but no gas amount used was returned, falling back on default gas: {}", - self.id(), + "tx simulation successful but no gas amount used was returned, falling back on default gas: {}", self.default_gas() ); @@ -510,8 +504,7 @@ impl CosmosSdkChain { // See `can_recover_from_simulation_failure` for more info. Err(e) if can_recover_from_simulation_failure(&e) => { warn!( - "[{}] estimate_gas: failed to simulate tx, falling back on default gas because the error is potentially recoverable: {}", - self.id(), + "failed to simulate tx, falling back on default gas because the error is potentially recoverable: {}", e.detail() ); @@ -520,8 +513,7 @@ impl CosmosSdkChain { Err(e) => { error!( - "[{}] estimate_gas: failed to simulate tx. propagating error to caller: {}", - self.id(), + "failed to simulate tx. propagating error to caller: {}", e.detail() ); // Propagate the error, the retrying mechanism at caller may catch & retry. @@ -705,8 +697,7 @@ impl CosmosSdkChain { info!( sequence = %account.sequence, number = %account.account_number, - "[{}] refresh: retrieved account", - self.id() + "refresh: retrieved account", ); self.account = Some(account); @@ -1059,13 +1050,14 @@ impl ChainEndpoint for CosmosSdkChain { /// msgs in a Tx until any of the max size, max num msgs, max fee are exceeded. 
fn send_messages_and_wait_commit( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { crate::time!("send_messages_and_wait_commit"); - debug!( - "send_messages_and_wait_commit with {} messages", - proto_msgs.len() - ); + + let _span = + span!(Level::DEBUG, "send_tx_commit", id = %tracked_msgs.tracking_id()).entered(); + + let proto_msgs = tracked_msgs.messages(); if proto_msgs.is_empty() { return Ok(vec![]); @@ -1116,13 +1108,14 @@ impl ChainEndpoint for CosmosSdkChain { fn send_messages_and_wait_check_tx( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { crate::time!("send_messages_and_wait_check_tx"); - debug!( - "send_messages_and_wait_check_tx with {} messages", - proto_msgs.len() - ); + + let span = span!(Level::DEBUG, "send_tx_check", id = %tracked_msgs.tracking_id()); + let _enter = span.enter(); + + let proto_msgs = tracked_msgs.messages(); if proto_msgs.is_empty() { return Ok(vec![]); diff --git a/relayer/src/chain/handle.rs b/relayer/src/chain/handle.rs index 8a6e8d4064..5a125018f3 100644 --- a/relayer/src/chain/handle.rs +++ b/relayer/src/chain/handle.rs @@ -54,7 +54,7 @@ use crate::{ keyring::KeyEntry, }; -use super::HealthCheck; +use super::{tx::TrackedMsgs, HealthCheck}; mod prod; pub mod requests; @@ -111,12 +111,12 @@ pub enum ChainRequest { }, SendMessagesAndWaitCommit { - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, reply_to: ReplyTo>, }, SendMessagesAndWaitCheckTx { - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, reply_to: ReplyTo>, }, @@ -350,7 +350,7 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { /// and return the list of events emitted by the chain after the transaction was committed. fn send_messages_and_wait_commit( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error>; /// Submit messages asynchronously. @@ -359,7 +359,7 @@ pub trait ChainHandle: Clone + Send + Sync + Serialize + Debug + 'static { /// returns a set of transaction hashes. 
fn send_messages_and_wait_check_tx( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error>; fn get_signer(&self) -> Result; diff --git a/relayer/src/chain/handle/prod.rs b/relayer/src/chain/handle/prod.rs index 9ed354042a..15c8149770 100644 --- a/relayer/src/chain/handle/prod.rs +++ b/relayer/src/chain/handle/prod.rs @@ -37,6 +37,7 @@ use ibc_proto::ibc::core::commitment::v1::MerkleProof; use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest; use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest; +use crate::chain::tx::TrackedMsgs; use crate::{ chain::handle::requests::AppVersion, chain::StatusResponse, config::ChainConfig, connection::ConnectionMsgType, error::Error, keyring::KeyEntry, @@ -98,20 +99,20 @@ impl ChainHandle for ProdChainHandle { fn send_messages_and_wait_commit( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { self.send(|reply_to| ChainRequest::SendMessagesAndWaitCommit { - proto_msgs, + tracked_msgs, reply_to, }) } fn send_messages_and_wait_check_tx( &self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { self.send(|reply_to| ChainRequest::SendMessagesAndWaitCheckTx { - proto_msgs, + tracked_msgs, reply_to, }) } diff --git a/relayer/src/chain/mock.rs b/relayer/src/chain/mock.rs index a6702e0db2..8183e49b98 100644 --- a/relayer/src/chain/mock.rs +++ b/relayer/src/chain/mock.rs @@ -3,7 +3,6 @@ use core::ops::Add; use core::time::Duration; use crossbeam_channel as channel; -use prost_types::Any; use tendermint_testgen::light_block::TmLightBlock; use tokio::runtime::Runtime; @@ -50,6 +49,7 @@ use crate::keyring::{KeyEntry, KeyRing}; use crate::light_client::Verified; use crate::light_client::{mock::LightClient as MockLightClient, LightClient}; +use super::tx::TrackedMsgs; use super::HealthCheck; /// The representation of a mocked chain as the relayer sees it. @@ -129,17 +129,20 @@ impl ChainEndpoint for MockChain { fn send_messages_and_wait_commit( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, ) -> Result, Error> { // Use the ICS18Context interface to submit the set of messages. - let events = self.context.send(proto_msgs).map_err(Error::ics18)?; + let events = self + .context + .send(tracked_msgs.into()) + .map_err(Error::ics18)?; Ok(events) } fn send_messages_and_wait_check_tx( &mut self, - _proto_msgs: Vec, + _tracked_msgs: TrackedMsgs, ) -> Result, Error> { todo!() } diff --git a/relayer/src/chain/runtime.rs b/relayer/src/chain/runtime.rs index 2ef8a3a89b..828ebee891 100644 --- a/relayer/src/chain/runtime.rs +++ b/relayer/src/chain/runtime.rs @@ -60,6 +60,7 @@ use crate::{ use super::{ handle::{ChainHandle, ChainRequest, ReplyTo, Subscription}, + tx::TrackedMsgs, ChainEndpoint, HealthCheck, }; @@ -251,12 +252,12 @@ where self.subscribe(reply_to)? }, - Ok(ChainRequest::SendMessagesAndWaitCommit { proto_msgs, reply_to }) => { - self.send_messages_and_wait_commit(proto_msgs, reply_to)? + Ok(ChainRequest::SendMessagesAndWaitCommit { tracked_msgs, reply_to }) => { + self.send_messages_and_wait_commit(tracked_msgs, reply_to)? }, - Ok(ChainRequest::SendMessagesAndWaitCheckTx { proto_msgs, reply_to }) => { - self.send_messages_and_wait_check_tx(proto_msgs, reply_to)? + Ok(ChainRequest::SendMessagesAndWaitCheckTx { tracked_msgs, reply_to }) => { + self.send_messages_and_wait_check_tx(tracked_msgs, reply_to)? 
}, Ok(ChainRequest::Signer { reply_to }) => { @@ -446,19 +447,19 @@ where fn send_messages_and_wait_commit( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, reply_to: ReplyTo>, ) -> Result<(), Error> { - let result = self.chain.send_messages_and_wait_commit(proto_msgs); + let result = self.chain.send_messages_and_wait_commit(tracked_msgs); reply_to.send(result).map_err(Error::send) } fn send_messages_and_wait_check_tx( &mut self, - proto_msgs: Vec, + tracked_msgs: TrackedMsgs, reply_to: ReplyTo>, ) -> Result<(), Error> { - let result = self.chain.send_messages_and_wait_check_tx(proto_msgs); + let result = self.chain.send_messages_and_wait_check_tx(tracked_msgs); reply_to.send(result).map_err(Error::send) } diff --git a/relayer/src/chain/tx.rs b/relayer/src/chain/tx.rs new file mode 100644 index 0000000000..3947a20019 --- /dev/null +++ b/relayer/src/chain/tx.rs @@ -0,0 +1,43 @@ +use prost_types::Any; + +/// A wrapper over a vector of proto-encoded messages +/// (`Vec`), which has an associated tracking +/// number. +/// +/// A [`TrackedMsgs`] correlates with a [`TrackedEvents`] +/// by sharing the same `tracking_id`. +#[derive(Debug, Clone)] +pub struct TrackedMsgs { + msgs: Vec, + tracking_id: String, +} + +impl TrackedMsgs { + pub fn new(msgs: Vec, tid: impl Into) -> Self { + Self { + msgs, + tracking_id: tid.into(), + } + } + + pub fn new_single(msg: Any, tid: impl Into) -> Self { + Self { + msgs: vec![msg], + tracking_id: tid.into(), + } + } + + pub fn messages(&self) -> &Vec { + &self.msgs + } + + pub fn tracking_id(&self) -> &str { + &self.tracking_id + } +} + +impl From for Vec { + fn from(tm: TrackedMsgs) -> Vec { + tm.msgs + } +} diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index fb101f51f9..a29052a28c 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -23,6 +23,7 @@ use ibc_proto::ibc::core::channel::v1::QueryConnectionChannelsRequest; use crate::chain::counterparty::{channel_connection_client, channel_state_on_destination}; use crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::channel::version::ResolveContext; use crate::connection::Connection; use crate::foreign_client::{ForeignClient, HasExpiredOrFrozenError}; @@ -739,9 +740,11 @@ impl Channel { pub fn build_chan_open_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_init()?; + let tm = TrackedMsgs::new(dst_msgs, "ChannelOpenInit"); + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?; // Find the relevant event for channel open init @@ -903,9 +906,11 @@ impl Channel { pub fn build_chan_open_try_and_send(&self) -> Result { let dst_msgs = self.build_chan_open_try()?; + let tm = TrackedMsgs::new(dst_msgs, "ChannelOpenTry"); + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?; // Find the relevant event for channel open try @@ -988,9 +993,11 @@ impl Channel { ) -> Result { let dst_msgs = channel.build_chan_open_ack()?; + let tm = TrackedMsgs::new(dst_msgs, "ChannelOpenAck"); + let events = channel .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(channel.dst_chain().id(), e))?; // Find the relevant event for channel open ack @@ -1084,9 +1091,10 @@ impl Channel { ) -> Result { let dst_msgs = 
channel.build_chan_open_confirm()?; + let tm = TrackedMsgs::new(dst_msgs, "ChannelOpenConfirm"); let events = channel .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(channel.dst_chain().id(), e))?; // Find the relevant event for channel open confirm @@ -1147,9 +1155,11 @@ impl Channel { pub fn build_chan_close_init_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_init()?; + let tm = TrackedMsgs::new(dst_msgs, "ChannelCloseInit"); + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?; // Find the relevant event for channel close init @@ -1226,9 +1236,11 @@ impl Channel { pub fn build_chan_close_confirm_and_send(&self) -> Result { let dst_msgs = self.build_chan_close_confirm()?; + let tm = TrackedMsgs::new(dst_msgs, "ChannelCloseConfirm"); + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ChannelError::submit(self.dst_chain().id(), e))?; // Find the relevant event for channel close confirm diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 1f05f79d86..030cf27748 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -1,6 +1,7 @@ use core::time::Duration; use crate::chain::counterparty::connection_state_on_destination; +use crate::chain::tx::TrackedMsgs; use crate::util::retry::RetryResult; use flex_error::define_error; use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest; @@ -803,9 +804,11 @@ impl Connection { pub fn build_conn_init_and_send(&self) -> Result { let dst_msgs = self.build_conn_init()?; + let tm = TrackedMsgs::new(dst_msgs, "ConnectionOpenInit"); + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?; // Find the relevant event for connection init @@ -860,8 +863,10 @@ impl Connection { .query_latest_height() .map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?; let client_msgs = self.build_update_client_on_src(src_client_target_height)?; + + let tm = TrackedMsgs::new(client_msgs, "update client on source for ConnectionOpenTry"); self.src_chain() - .send_messages_and_wait_commit(client_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?; let query_height = self @@ -930,9 +935,11 @@ impl Connection { pub fn build_conn_try_and_send(&self) -> Result { let dst_msgs = self.build_conn_try()?; + let tm = TrackedMsgs::new(dst_msgs, "ConnectionOpenTry"); + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?; // Find the relevant event for connection try transaction @@ -977,8 +984,11 @@ impl Connection { .query_latest_height() .map_err(|e| ConnectionError::chain_query(self.dst_chain().id(), e))?; let client_msgs = self.build_update_client_on_src(src_client_target_height)?; + + let tm = TrackedMsgs::new(client_msgs, "update client on source for ConnectionOpenAck"); + self.src_chain() - .send_messages_and_wait_commit(client_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.src_chain().id(), e))?; let query_height = self @@ -1021,9 +1031,11 @@ impl Connection { pub fn build_conn_ack_and_send(&self) -> 
Result { let dst_msgs = self.build_conn_ack()?; + let tm = TrackedMsgs::new(dst_msgs, "ConnectionOpenAck"); + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?; // Find the relevant event for connection ack @@ -1098,9 +1110,11 @@ impl Connection { pub fn build_conn_confirm_and_send(&self) -> Result { let dst_msgs = self.build_conn_confirm()?; + let tm = TrackedMsgs::new(dst_msgs, "ConnectionOpenConfirm"); + let events = self .dst_chain() - .send_messages_and_wait_commit(dst_msgs) + .send_messages_and_wait_commit(tm) .map_err(|e| ConnectionError::submit(self.dst_chain().id(), e))?; // Find the relevant event for connection confirm diff --git a/relayer/src/event/rpc.rs b/relayer/src/event/rpc.rs index 9e1dc5bcd9..e3f3ac7640 100644 --- a/relayer/src/event/rpc.rs +++ b/relayer/src/event/rpc.rs @@ -58,7 +58,19 @@ pub fn get_all_events( if *query == queries::ibc_channel().to_string() { if let Some(mut chan_event) = ChannelEvents::try_from_tx(abci_event) { chan_event.set_height(height); - tracing::trace!("extracted ibc_channel event {:?}", chan_event); + let _span = tracing::trace_span!("ibc_channel event").entered(); + tracing::trace!("extracted {:?}", chan_event); + if matches!(chan_event, IbcEvent::SendPacket(_)) { + // Should be the same as the hash of tx_result.tx? + if let Some(hash) = result + .events + .as_ref() + .and_then(|events| events.get("tx.hash")) + .and_then(|values| values.get(0)) + { + tracing::trace!(event = "SendPacket", "tx hash: {}", hash); + } + } vals.push((height, chan_event)); } } diff --git a/relayer/src/foreign_client.rs b/relayer/src/foreign_client.rs index 12d61bc7d2..3c4696c27b 100644 --- a/relayer/src/foreign_client.rs +++ b/relayer/src/foreign_client.rs @@ -6,6 +6,7 @@ use itertools::Itertools; use prost_types::Any; use tracing::{debug, error, info, trace, warn}; +use crate::chain::tx::TrackedMsgs; use crate::error::Error as RelayerError; use flex_error::define_error; use ibc::core::ics02_client::client_consensus::{ @@ -433,9 +434,11 @@ impl ForeignClient ForeignClient ForeignClient ForeignClient Link { /// Implements the `packet-recv` CLI pub fn build_and_send_recv_packet_messages(&mut self) -> Result, LinkError> { + let _span = error_span!( + "PacketRecvCmd", + src_chain = %self.a_to_b.src_chain().id(), + src_port = %self.a_to_b.src_port_id(), + src_channel = %self.a_to_b.src_channel_id(), + dst_chain = %self.a_to_b.dst_chain().id(), + ) + .entered(); + self.a_to_b.build_recv_packet_and_timeout_msgs(None)?; let mut results = vec![]; @@ -180,6 +190,15 @@ impl Link { /// Implements the `packet-ack` CLI pub fn build_and_send_ack_packet_messages(&mut self) -> Result, LinkError> { + let _span = error_span!( + "PacketAckCmd", + src_chain = %self.a_to_b.src_chain().id(), + src_port = %self.a_to_b.src_port_id(), + src_channel = %self.a_to_b.src_channel_id(), + dst_chain = %self.a_to_b.dst_chain().id(), + ) + .entered(); + self.a_to_b.build_packet_ack_msgs(None)?; let mut results = vec![]; diff --git a/relayer/src/link/operational_data.rs b/relayer/src/link/operational_data.rs index ce0587e464..8682433c66 100644 --- a/relayer/src/link/operational_data.rs +++ b/relayer/src/link/operational_data.rs @@ -1,14 +1,17 @@ +use alloc::borrow::Cow; use core::fmt; use core::iter; use std::time::Instant; +use nanoid::nanoid; use prost_types::Any; -use tracing::{info, warn}; +use tracing::{debug, info}; use ibc::events::IbcEvent; use ibc::Height; use 
crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::link::error::LinkError; use crate::link::RelayPath; @@ -27,8 +30,47 @@ impl fmt::Display for OperationalDataTarget { } } -/// A packet messages that is prepared for sending to a chain, but has not been sent yet. -/// Comprises both the proto-encoded packet message, alongside the event which generated it. +/// A set of [`IbcEvent`]s that have an associated +/// tracking number to ensure better observability. +pub struct TrackedEvents { + list: Vec, + tracking_id: String, +} + +impl TrackedEvents { + pub fn is_empty(&self) -> bool { + self.list.is_empty() + } + + pub fn events(&self) -> &Vec { + &self.list + } + + pub fn tracking_id(&self) -> &str { + &self.tracking_id + } + + pub fn set_height(&mut self, height: Height) { + for event in self.list.iter_mut() { + event.set_height(height); + } + } +} + +impl From> for TrackedEvents { + fn from(list: Vec) -> Self { + Self { + list, + tracking_id: nanoid!(10), + } + } +} + +/// A packet message that is prepared for sending +/// to a chain, but has not been sent yet. +/// +/// Comprises the proto-encoded packet message, +/// alongside the event which generated it. #[derive(Clone)] pub struct TransitMessage { pub event: IbcEvent, @@ -50,40 +92,61 @@ pub struct OperationalData { /// Stores the time when the clients on the target chain has been updated, i.e., when this data /// was scheduled. Necessary for packet delays. pub scheduled_time: Instant, + pub tracking_id: String, } impl OperationalData { - pub fn new(proofs_height: Height, target: OperationalDataTarget) -> Self { + pub fn new( + proofs_height: Height, + target: OperationalDataTarget, + tracking_id: impl Into, + ) -> Self { OperationalData { proofs_height, batch: vec![], target, scheduled_time: Instant::now(), + tracking_id: tracking_id.into(), + } + } + + pub fn push(&mut self, msg: TransitMessage) { + self.batch.push(msg) + } + + /// Returns displayable information on the operation's data. + pub fn info(&self) -> OperationalInfo<'_> { + OperationalInfo { + tracking_id: Cow::Borrowed(&self.tracking_id), + target: self.target, + proofs_height: self.proofs_height, + batch_len: self.batch.len(), } } - pub fn events(&self) -> Vec { - self.batch.iter().map(|gm| gm.event.clone()).collect() + /// Transforms `self` into the list of events accompanied with the tracking ID. + pub fn into_events(self) -> TrackedEvents { + let list = self.batch.into_iter().map(|gm| gm.event).collect(); + TrackedEvents { + list, + tracking_id: self.tracking_id, + } } - /// Returns all the messages in this operational data, plus prepending the client update message + /// Returns all the messages in this operational + /// data, plus prepending the client update message /// if necessary. pub fn assemble_msgs( &self, relay_path: &RelayPath, - ) -> Result, LinkError> { - if self.batch.is_empty() { - warn!("assemble_msgs() method call on an empty OperationalData!"); - return Ok(vec![]); - } - + ) -> Result { // For zero delay we prepend the client update msgs. let client_update_msg = if relay_path.zero_delay() { let update_height = self.proofs_height.increment(); - info!( - "[{}] prepending {} client update @ height {}", - relay_path, self.target, update_height + debug!( + "prepending {} client update at height {}", + self.target, update_height ); // Fetch the client update message. 
Vector may be empty if the client already has the header @@ -109,24 +172,55 @@ impl OperationalData { None => self.batch.iter().map(|gm| gm.msg.clone()).collect(), }; - info!( - "[{}] assembled batch of {} message(s)", - relay_path, - msgs.len() - ); + let tm = TrackedMsgs::new(msgs, &self.tracking_id); + + info!("assembled batch of {} message(s)", tm.messages().len()); + + Ok(tm) + } +} + +/// A lightweight informational data structure that can be extracted +/// out of [`OperationalData`] for e.g. logging purposes. +pub struct OperationalInfo<'a> { + tracking_id: Cow<'a, str>, + target: OperationalDataTarget, + proofs_height: Height, + batch_len: usize, +} + +impl<'a> OperationalInfo<'a> { + pub fn target(&self) -> OperationalDataTarget { + self.target + } + + /// Returns the length of the assembled batch of in-transit messages. + pub fn batch_len(&self) -> usize { + self.batch_len + } - Ok(msgs) + pub fn into_owned(self) -> OperationalInfo<'static> { + let Self { + tracking_id, + target, + proofs_height, + batch_len, + } = self; + OperationalInfo { + tracking_id: tracking_id.into_owned().into(), + target, + proofs_height, + batch_len, + } } } -impl fmt::Display for OperationalData { +impl<'a> fmt::Display for OperationalInfo<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "Op.Data [->{} @{}; {} event(s) & msg(s) in batch]", - self.target, - self.proofs_height, - self.batch.len(), + "{} ->{} @{}; len={}", + self.tracking_id, self.target, self.proofs_height, self.batch_len, ) } } diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index 324270190c..cb6df819dd 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,13 +1,12 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; -use core::fmt; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Instant; use itertools::Itertools; use prost_types::Any; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, span, trace, warn, Level}; use ibc::{ core::{ @@ -38,13 +37,16 @@ use crate::chain::counterparty::{ unreceived_acknowledgements_sequences, unreceived_packets_sequences, }; use crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::chain::StatusResponse; use crate::channel::error::ChannelError; use crate::channel::Channel; use crate::event::monitor::EventBatch; use crate::foreign_client::{ForeignClient, ForeignClientError}; use crate::link::error::{self, LinkError}; -use crate::link::operational_data::{OperationalData, OperationalDataTarget, TransitMessage}; +use crate::link::operational_data::{ + OperationalData, OperationalDataTarget, TrackedEvents, TransitMessage, +}; use crate::link::pending::PendingTxs; use crate::link::relay_sender::{AsyncReply, SubmitReply}; use crate::link::relay_summary::RelaySummary; @@ -246,7 +248,7 @@ impl RelayPath { // Determines if the events received are relevant and should be processed. // Only events for a port/channel matching one of the channel ends should be processed. 
- fn filter_relaying_events(&self, events: Vec) -> Vec { + fn filter_relaying_events(&self, events: Vec) -> TrackedEvents { let src_channel_id = self.src_channel_id(); let mut result = vec![]; @@ -284,7 +286,9 @@ impl RelayPath { _ => {} } } - result + + // Transform into `TrackedEvents` + result.into() } fn relay_pending_packets(&self, height: Option) -> Result<(), LinkError> { @@ -296,8 +300,8 @@ impl RelayPath { match cleared { Ok(()) => return Ok(()), Err(e) => error!( - "[{}] failed to clear packets, retry {}/{}: {}", - self, i, MAX_RETRIES, e + "failed to clear packets, retry {}/{}: {}", + i, MAX_RETRIES, e ), } } @@ -317,6 +321,9 @@ impl RelayPath { force: bool, ) -> Result<(), LinkError> { if self.should_clear_packets() || force { + let span = span!(Level::DEBUG, "clear", f = ?force); + let _enter = span.enter(); + // Disable further clearing of old packets by default. // Clearing may still happen: upon new blocks, when `force = true`. *self.clear_packets.acquire_write() = false; @@ -325,11 +332,9 @@ impl RelayPath { .map(|h| h.decrement().map_err(|e| LinkError::decrement_height(h, e))) .transpose()?; - info!(height = ?clear_height, "[{}] clearing pending packets", self); - self.relay_pending_packets(clear_height)?; - info!(height = ?clear_height, "[{}] finished scheduling pending packets clearing", self); + debug!(height = ?clear_height, "done scheduling"); } Ok(()) @@ -345,7 +350,7 @@ impl RelayPath { } /// Produces and schedules operational data for this relaying path based on the input events. - fn events_to_operational_data(&self, events: Vec) -> Result<(), LinkError> { + fn events_to_operational_data(&self, events: TrackedEvents) -> Result<(), LinkError> { // Obtain the operational data for the source chain (mostly timeout packets) and for the // destination chain (e.g., receive packet messages). let (src_opt, dst_opt) = self.generate_operational_data(events)?; @@ -370,15 +375,12 @@ impl RelayPath { /// or `MsgTimeout`). 
fn generate_operational_data( &self, - input: Vec, + events: TrackedEvents, ) -> Result<(Option, Option), LinkError> { - if !input.is_empty() { - info!( - "[{}] generate messages from batch with {} events", - self, - input.len() - ); - } + let span = span!(Level::DEBUG, "generate", id = %events.tracking_id()); + let _enter = span.enter(); + + let input = events.events(); let src_height = match input.get(0) { None => return Ok((None, None)), Some(ev) => ev.height(), @@ -390,17 +392,24 @@ impl RelayPath { .map_err(|e| LinkError::query(self.src_chain().id(), e))?; let dst_latest_height = dst_latest_info.height; // Operational data targeting the source chain (e.g., Timeout packets) - let mut src_od = OperationalData::new(dst_latest_height, OperationalDataTarget::Source); + let mut src_od = OperationalData::new( + dst_latest_height, + OperationalDataTarget::Source, + events.tracking_id(), + ); // Operational data targeting the destination chain (e.g., SendPacket messages) - let mut dst_od = OperationalData::new(src_height, OperationalDataTarget::Destination); + let mut dst_od = OperationalData::new( + src_height, + OperationalDataTarget::Destination, + events.tracking_id(), + ); for event in input { - debug!("[{}] {} => {}", self, self.src_chain().id(), event); + trace!("processing event: {}", event); let (dst_msg, src_msg) = match event { - IbcEvent::CloseInitChannel(_) => ( - Some(self.build_chan_close_confirm_from_event(&event)?), - None, - ), + IbcEvent::CloseInitChannel(_) => { + (Some(self.build_chan_close_confirm_from_event(event)?), None) + } IbcEvent::TimeoutPacket(ref timeout_ev) => { // When a timeout packet for an ordered channel is processed on-chain (src here) // the chain closes the channel but no close init event is emitted, instead @@ -412,17 +421,14 @@ impl RelayPath { .src_channel(timeout_ev.height)? .state_matches(&ChannelState::Closed) { - ( - Some(self.build_chan_close_confirm_from_event(&event)?), - None, - ) + (Some(self.build_chan_close_confirm_from_event(event)?), None) } else { (None, None) } } IbcEvent::SendPacket(ref send_packet_ev) => { if self.send_packet_event_handled(send_packet_ev)? { - debug!("[{}] {} already handled", self, send_packet_ev); + debug!("{} already handled", send_packet_ev); (None, None) } else { self.build_recv_or_timeout_from_send_packet_event( @@ -438,7 +444,7 @@ impl RelayPath { { (None, None) } else if self.write_ack_event_handled(write_ack_ev)? { - debug!("[{}] {} already handled", self, write_ack_ev); + debug!("{} already handled", write_ack_ev); (None, None) } else { (self.build_ack_from_recv_event(write_ack_ev)?, None) @@ -449,13 +455,7 @@ impl RelayPath { // Collect messages to be sent to the destination chain (e.g., RecvPacket) if let Some(msg) = dst_msg { - debug!( - "[{}] {} <= {} from {}", - self, - self.dst_chain().id(), - msg.type_url, - event - ); + debug!("{} from {}", msg.type_url, event); dst_od.batch.push(TransitMessage { event: event.clone(), msg, @@ -467,14 +467,11 @@ impl RelayPath { // For Ordered channels a single timeout event should be sent as this closes the channel. // Otherwise a multi message transaction will fail. 
if self.unordered_channel() || src_od.batch.is_empty() { - debug!( - "[{}] {} <= {} from {}", - self, - self.src_chain().id(), - msg.type_url, - event - ); - src_od.batch.push(TransitMessage { event, msg }); + debug!("{} from {}", msg.type_url, event); + src_od.batch.push(TransitMessage { + event: event.clone(), + msg, + }); } } } @@ -501,15 +498,13 @@ impl RelayPath { initial_od: OperationalData, ) -> Result { // We will operate on potentially different operational data if the initial one fails. + let _span = span!(Level::INFO, "relay", odata = %initial_od.info()).entered(); + let mut odata = initial_od; for i in 0..MAX_RETRIES { - info!( - "[{}] relay op. data of {} msgs(s) to {} (height {}), delayed by: {:?} [try {}/{}]", - self, - odata.batch.len(), - odata.target, - odata.proofs_height.increment(), + debug!( + "delayed by: {:?} [try {}/{}]", odata.scheduled_time.elapsed(), i + 1, MAX_RETRIES @@ -519,20 +514,15 @@ impl RelayPath { match self.send_from_operational_data::(odata.clone()) { Ok(reply) => { // Done with this op. data - info!("[{}] success", self); + info!("success"); return Ok(reply); } Err(LinkError(error::LinkErrorDetail::Send(e), _)) => { // This error means we could retry - error!("[{}] error {}", self, e.event); + error!("error {}", e.event); if i + 1 == MAX_RETRIES { - error!( - "[{}] {}/{} retries exhausted. giving up", - self, - i + 1, - MAX_RETRIES - ) + error!("{}/{} retries exhausted. giving up", i + 1, MAX_RETRIES) } else { // If we haven't exhausted all retries, regenerate the op. data & retry match self.regenerate_operational_data(odata.clone()) { @@ -565,38 +555,39 @@ impl RelayPath { &self, initial_odata: OperationalData, ) -> Option { - info!( - "[{}] failed. Regenerate operational data from {} events", - self, - initial_odata.events().len() + let op_info = initial_odata.info().into_owned(); + + warn!( + "failed. Regenerate operational data from {} events", + op_info.batch_len() ); // Retry by re-generating the operational data using the initial events - let (src_opt, dst_opt) = match self.generate_operational_data(initial_odata.events()) { + let (src_opt, dst_opt) = match self.generate_operational_data(initial_odata.into_events()) { Ok(new_operational_data) => new_operational_data, Err(e) => { error!( - "[{}] failed to regenerate operational data from initial data: {} \ + "failed to regenerate operational data from initial data: {} \ with error {}, discarding this op. data", - self, initial_odata, e + op_info, e ); return None; } // Cannot retry, contain the error by reporting a None }; if let Some(src_od) = src_opt { - if src_od.target == initial_odata.target { + if src_od.target == op_info.target() { // Our target is the _source_ chain, retry these messages - info!("[{}] will retry with op data {}", self, src_od); + info!(odata = %src_od.info(), "will retry"); return Some(src_od); } else { // Our target is the _destination_ chain, the data in `src_od` contains // potentially new timeout messages that have to be handled separately. if let Err(e) = self.schedule_operational_data(src_od) { error!( - "[{}] failed to schedule newly-generated operational data from \ - initial data: {} with error {}, discarding this op. data", - self, initial_odata, e + "failed to schedule newly-generated operational data from \ + initial data: {} with error {}, discarding this op. 
data", + op_info, e ); return None; } @@ -604,23 +595,22 @@ impl RelayPath { } if let Some(dst_od) = dst_opt { - if dst_od.target == initial_odata.target { + if dst_od.target == op_info.target() { // Our target is the _destination_ chain, retry these messages - info!("[{}] will retry with op data {}", self, dst_od); + info!(odata = %dst_od.info(), "will retry"); return Some(dst_od); } else { // Our target is the _source_ chain, but `dst_od` has new messages // intended for the destination chain, this should never be the case error!( - "[{}] generated new messages for destination chain while handling \ + "generated new messages for destination chain while handling \ failed events targeting the source chain!", - self ); } } else { // There is no message intended for the destination chain - if initial_odata.target == OperationalDataTarget::Destination { - info!("[{}] exhausted all events from this operational data", self); + if op_info.target() == OperationalDataTarget::Destination { + info!("exhausted all events from this operational data"); return None; } } @@ -642,7 +632,7 @@ impl RelayPath { odata: OperationalData, ) -> Result { if odata.batch.is_empty() { - error!("[{}] ignoring empty operational data!", self); + error!("ignoring empty operational data!"); return Ok(S::Reply::empty()); } @@ -734,7 +724,11 @@ impl RelayPath { } /// Handles updating the client on the destination chain - fn update_client_dst(&self, src_chain_height: Height) -> Result<(), LinkError> { + fn update_client_dst( + &self, + src_chain_height: Height, + tracking_id: &str, + ) -> Result<(), LinkError> { // Handle the update on the destination chain // Check if a consensus state at update_height exists on destination chain already if self @@ -749,18 +743,18 @@ impl RelayPath { for i in 0..MAX_RETRIES { let dst_update = self.build_update_client_on_dst(src_chain_height)?; info!( - "[{}] sending updateClient to client hosted on dest. chain {} for height {} [try {}/{}]", - self, - self.dst_chain().id(), + "sending updateClient to client hosted on destination chain for height {} [try {}/{}]", src_chain_height, i + 1, MAX_RETRIES, ); + let tm = TrackedMsgs::new(dst_update, tracking_id); + let dst_tx_events = self .dst_chain() - .send_messages_and_wait_commit(dst_update) + .send_messages_and_wait_commit(tm) .map_err(LinkError::relayer)?; - info!("[{}] result {}\n", self, PrettyEvents(&dst_tx_events)); + info!("result: {}", PrettyEvents(&dst_tx_events)); dst_err_ev = dst_tx_events .into_iter() @@ -778,7 +772,11 @@ impl RelayPath { } /// Handles updating the client on the source chain - fn update_client_src(&self, dst_chain_height: Height) -> Result<(), LinkError> { + fn update_client_src( + &self, + dst_chain_height: Height, + tracking_id: &str, + ) -> Result<(), LinkError> { if self .src_chain() .proven_client_consensus(self.src_client_id(), dst_chain_height, Height::zero()) @@ -791,17 +789,17 @@ impl RelayPath { for _ in 0..MAX_RETRIES { let src_update = self.build_update_client_on_src(dst_chain_height)?; info!( - "[{}] sending updateClient to client hosted on src. 
chain {} for height {}", - self, - self.src_chain().id(), + "sending updateClient to client hosted on source chain for height {}", dst_chain_height, ); + let tm = TrackedMsgs::new(src_update, tracking_id); + let src_tx_events = self .src_chain() - .send_messages_and_wait_commit(src_update) + .send_messages_and_wait_commit(tm) .map_err(LinkError::relayer)?; - info!("[{}] result {}\n", self, PrettyEvents(&src_tx_events)); + info!("result: {}", PrettyEvents(&src_tx_events)); src_err_ev = src_tx_events .into_iter() @@ -823,7 +821,7 @@ impl RelayPath { fn target_height_and_send_packet_events( &self, opt_query_height: Option, - ) -> Result<(Vec, Height), LinkError> { + ) -> Result<(TrackedEvents, Height), LinkError> { let mut events_result = vec![]; let src_channel_id = self.src_channel_id(); @@ -843,20 +841,18 @@ impl RelayPath { let sequences: Vec = sequences.into_iter().map(From::from).collect(); if sequences.is_empty() { - return Ok((events_result, query_height)); + return Ok((events_result.into(), query_height)); } debug!( - "[{}] packet seq. that still have commitments on {}: {} (first 10 shown here; total={})", - self, + "packet seq. that still have commitments on {}: {} (first 10 shown here; total={})", self.src_chain().id(), commit_sequences.iter().take(10).join(", "), commit_sequences.len() ); debug!( - "[{}] recv packets to send out to {} of the ones with commitments on source {}: {} (first 10 shown here; total={})", - self, + "recv packets to send out to {} of the ones with commitments on {}: {} (first 10 shown here; total={})", self.dst_chain().id(), self.src_chain().id(), sequences.iter().take(10).join(", "), sequences.len() @@ -895,9 +891,9 @@ impl RelayPath { Default::default() }; - trace!("[{}] start_block_events {:?}", self, start_block_events); - trace!("[{}] tx_events {:?}", self, tx_events); - trace!("[{}] end_block_events {:?}", self, end_block_events); + trace!("start_block_events {:?}", start_block_events); + trace!("tx_events {:?}", tx_events); + trace!("end_block_events {:?}", end_block_events); // events must be ordered in the following fashion - // start-block events followed by tx-events followed by end-block events @@ -906,10 +902,7 @@ impl RelayPath { events_result.extend(end_block_events); if events_result.is_empty() { - info!( - "[{}] found zero unprocessed SendPacket events on source chain, nothing to do", - self - ); + info!("found zero unprocessed SendPacket events on source chain, nothing to do"); } else { let mut packet_sequences = vec![]; for event in events_result.iter() { @@ -925,14 +918,13 @@ impl RelayPath { } } info!( - "[{}] found unprocessed SendPacket events for {:?} (first 10 shown here; total={})", - self, + "found unprocessed SendPacket events for {:?} (first 10 shown here; total={})", packet_sequences, events_result.len() ); } - Ok((events_result, query_height)) + Ok((events_result.into(), query_height)) } /// Returns relevant packet events for building ack messages. 
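The `events_result.into()` conversions above are where a plain `Vec<IbcEvent>` becomes a `TrackedEvents` and is minted its nanoid-based tracking id. The following is a minimal, self-contained sketch of that correlation flow — it is not the relayer's actual code; `Event` and `Msg` are placeholder types standing in for `ibc::events::IbcEvent` and `prost_types::Any`, and only the id-propagation mechanism is illustrated (assumes the `nanoid` crate added in this patch):

// Simplified sketch of the tracking-id flow introduced by this PR.
// `Event` and `Msg` are placeholders; only the correlation mechanism is shown.
use nanoid::nanoid;

#[derive(Debug, Clone)]
struct Event(String);

#[derive(Debug, Clone)]
struct Msg(String);

struct TrackedEvents {
    list: Vec<Event>,
    tracking_id: String,
}

impl From<Vec<Event>> for TrackedEvents {
    // A fresh batch of events gets a short random id; everything derived
    // from it (operational data, TrackedMsgs, log spans) carries it forward.
    fn from(list: Vec<Event>) -> Self {
        Self {
            list,
            tracking_id: nanoid!(10),
        }
    }
}

struct TrackedMsgs {
    msgs: Vec<Msg>,
    tracking_id: String,
}

fn main() {
    let events: TrackedEvents = vec![Event("SendPacket".to_string())].into();

    // Messages built from these events reuse the same tracking id, so a
    // `send_tx_commit{id=...}` span on the submitting chain can be matched
    // back to the event batch that produced the messages.
    let msgs = TrackedMsgs {
        msgs: events
            .list
            .iter()
            .map(|e| Msg(format!("proto msg for {}", e.0)))
            .collect(),
        tracking_id: events.tracking_id.clone(),
    };

    println!(
        "tracking id {} covers {} message(s)",
        msgs.tracking_id,
        msgs.msgs.len()
    );
}

In the patch itself the same idea is carried by `TrackedEvents::from(Vec<IbcEvent>)`, `OperationalData::tracking_id`, and `TrackedMsgs::new(msgs, &self.tracking_id)`.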
@@ -940,7 +932,7 @@ impl RelayPath { fn target_height_and_write_ack_events( &self, opt_query_height: Option, - ) -> Result<(Vec, Height), LinkError> { + ) -> Result<(TrackedEvents, Height), LinkError> { let mut events_result = vec![]; let src_channel_id = self.src_channel_id(); @@ -961,12 +953,11 @@ impl RelayPath { let sequences: Vec = unreceived_acks_by_dst.into_iter().map(From::from).collect(); if sequences.is_empty() { - return Ok((events_result, query_height)); + return Ok((events_result.into(), query_height)); } debug!( - "[{}] packets that have acknowledgments on {}: [{:?}..{:?}] (total={})", - self, + "packets that have acknowledgments on {}: [{:?}..{:?}] (total={})", self.src_chain().id(), acks_on_src.first(), acks_on_src.last(), @@ -974,8 +965,7 @@ impl RelayPath { ); debug!( - "[{}] ack packets to send out to {} of the ones with acknowledgments on {}: {} (first 10 shown here; total={})", - self, + "ack packets to send out to {} of the ones with acknowledgments on {}: {} (first 10 shown here; total={})", self.dst_chain().id(), self.src_chain().id(), sequences.iter().take(10).join(", "), sequences.len() @@ -996,8 +986,7 @@ impl RelayPath { if events_result.is_empty() { info!( - "[{}] found zero unprocessed WriteAcknowledgement events on source chain, nothing to do", - self + "found zero unprocessed WriteAcknowledgement events on source chain, nothing to do", ); } else { let mut packet_sequences = vec![]; @@ -1015,10 +1004,14 @@ impl RelayPath { } } } - info!("[{}] found unprocessed WriteAcknowledgement events for {:?} (first 10 shown here; total={})", self, packet_sequences, events_result.len()); + info!( + "found unprocessed WriteAcknowledgement events for {:?} (first 10 shown here; total={})", + packet_sequences, + events_result.len(), + ); } - Ok((events_result, query_height)) + Ok((events_result.into(), query_height)) } /// Schedules the relaying of RecvPacket and Timeout messages. @@ -1038,9 +1031,7 @@ impl RelayPath { return Ok(()); } - for event in events.iter_mut() { - event.set_height(height); - } + events.set_height(height); self.events_to_operational_data(events)?; @@ -1061,9 +1052,7 @@ impl RelayPath { return Ok(()); } - for event in events.iter_mut() { - event.set_height(height); - } + events.set_height(height); self.events_to_operational_data(events)?; Ok(()) @@ -1084,8 +1073,7 @@ impl RelayPath { let msg = MsgRecvPacket::new(packet.clone(), proofs.clone(), self.dst_signer()?); trace!( - "[{}] built recv_packet msg {}, proofs at height {}", - self, + "built recv_packet msg {}, proofs at height {}", msg.packet, proofs.height() ); @@ -1118,8 +1106,7 @@ impl RelayPath { ); trace!( - "[{}] built acknowledgment msg {}, proofs at height {}", - self, + "built acknowledgment msg {}, proofs at height {}", msg.packet, proofs.height() ); @@ -1166,8 +1153,7 @@ impl RelayPath { ); trace!( - "[{}] built timeout msg {}, proofs at height {}", - self, + "built timeout msg {}, proofs at height {}", msg.packet, proofs.height() ); @@ -1199,8 +1185,7 @@ impl RelayPath { ); trace!( - "[{}] built timeout on close msg {}, proofs at height {}", - self, + "built timeout on close msg {}, proofs at height {}", msg.packet, proofs.height() ); @@ -1318,6 +1303,9 @@ impl RelayPath { return Ok(()); } + let span = span!(Level::INFO, "refresh"); + let _enter = span.enter(); + let dst_status = self .dst_chain() .query_status() @@ -1329,7 +1317,7 @@ impl RelayPath { // to source operational data. 
let mut all_dst_odata = self.dst_operational_data.clone_vec(); - let mut timed_out: HashMap> = HashMap::default(); + let mut timed_out: HashMap = HashMap::default(); // For each operational data targeting the destination chain... for (odata_pos, odata) in all_dst_odata.iter_mut().enumerate() { @@ -1343,23 +1331,24 @@ impl RelayPath { IbcEvent::SendPacket(e) => { // Catch any SendPacket event that timed-out if self.send_packet_event_handled(e)? { - debug!( - "[{}] refreshing schedule: already handled send packet {}", - self, e - ); + debug!("already handled send packet {}", e); } else if let Some(new_msg) = self.build_timeout_from_send_packet_event(e, &dst_status)? { - debug!( - "[{}] refreshing schedule: found a timed-out msg in the op data {}", - self, odata - ); - timed_out.entry(odata_pos).or_insert_with(Vec::new).push( - TransitMessage { + debug!("found a timed-out msg in the op data {}", odata.info(),); + timed_out + .entry(odata_pos) + .or_insert_with(|| { + OperationalData::new( + dst_current_height, + OperationalDataTarget::Source, + &odata.tracking_id, + ) + }) + .push(TransitMessage { event: event.clone(), msg: new_msg, - }, - ); + }); } else { // A SendPacket event, but did not time-out yet, retain retain_batch.push(gm.clone()); @@ -1367,10 +1356,7 @@ impl RelayPath { } IbcEvent::WriteAcknowledgement(e) => { if self.write_ack_event_handled(e)? { - debug!( - "[{}] refreshing schedule: already handled {} write ack ", - self, e - ); + debug!("already handled {} write ack ", e); } else { retain_batch.push(gm.clone()); } @@ -1397,15 +1383,9 @@ impl RelayPath { } // Schedule new operational data targeting the source chain - for (_, batch) in timed_out.into_iter() { - let mut new_od = - OperationalData::new(dst_current_height, OperationalDataTarget::Source); - - new_od.batch = batch; - + for (_, new_od) in timed_out.into_iter() { info!( - "[{}] refreshing schedule: re-scheduling from new timed-out batch of size {}", - self, + "re-scheduling from new timed-out batch of size {}", new_od.batch.len() ); @@ -1419,29 +1399,30 @@ impl RelayPath { /// If the relaying path has non-zero packet delays, this method also updates the client on the /// target chain with the appropriate headers. fn schedule_operational_data(&self, mut od: OperationalData) -> Result<(), LinkError> { + let _span = span!(Level::INFO, "schedule", odata = %od.info()).entered(); + if od.batch.is_empty() { info!( - "[{}] ignoring operational data for {} because it has no messages", - self, od.target + "ignoring operational data for {} because it has no messages", + od.target ); return Ok(()); } - info!( - "[{}] scheduling op. data with {} msg(s) for {} (height {})", - self, - od.batch.len(), - od.target, - od.proofs_height.increment(), // increment for easier correlation with the client logs - ); - // Update clients ahead of scheduling the operational data, if the delays are non-zero. if !self.zero_delay() { + debug!("connection delay is non-zero: updating client"); let target_height = od.proofs_height.increment(); match od.target { - OperationalDataTarget::Source => self.update_client_src(target_height)?, - OperationalDataTarget::Destination => self.update_client_dst(target_height)?, + OperationalDataTarget::Source => { + self.update_client_src(target_height, &od.tracking_id)? + } + OperationalDataTarget::Destination => { + self.update_client_dst(target_height, &od.tracking_id)? 
+ } }; + } else { + debug!("connection delay is zero: client update message will be prepended later"); } od.scheduled_time = Instant::now(); @@ -1516,15 +1497,13 @@ impl RelayPath { match delay_left { None => info!( - "[{}] ready to fetch a scheduled op. data with batch of size {} targeting {}", - self, + "ready to fetch a scheduled op. data with batch of size {} targeting {}", odata.batch.len(), odata.target, ), Some(delay_left) => { info!( - "[{}] waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", - self, + "waiting ({:?} left) for a scheduled op. data with batch of size {} targeting {}", delay_left, odata.batch.len(), odata.target, @@ -1557,16 +1536,3 @@ impl RelayPath { ) } } - -impl fmt::Display for RelayPath { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{}:{}/{} -> {}", - self.src_chain().id(), - self.src_port_id(), - self.src_channel_id(), - self.dst_chain().id() - ) - } -} diff --git a/relayer/src/link/relay_sender.rs b/relayer/src/link/relay_sender.rs index fd87e4f224..aac599c4f8 100644 --- a/relayer/src/link/relay_sender.rs +++ b/relayer/src/link/relay_sender.rs @@ -1,12 +1,12 @@ use core::fmt; -use prost_types::Any; use tendermint_rpc::endpoint::broadcast::tx_sync; use tracing::info; use ibc::events::{IbcEvent, PrettyEvents}; use crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::link::error::LinkError; use crate::link::RelaySummary; @@ -24,7 +24,7 @@ impl SubmitReply for RelaySummary { pub trait Submit { type Reply: SubmitReply; - fn submit(target: &impl ChainHandle, msgs: Vec) -> Result; + fn submit(target: &impl ChainHandle, msgs: TrackedMsgs) -> Result; } /// Synchronous sender @@ -36,7 +36,7 @@ impl Submit for SyncSender { // TODO: Switch from the `Chain::send_msgs` interface in this method // to use `Chain::submit_msgs` instead; implement waiting for block // commits directly here (instead of blocking in the chain runtime). 
- fn submit(target: &impl ChainHandle, msgs: Vec) -> Result { + fn submit(target: &impl ChainHandle, msgs: TrackedMsgs) -> Result { let tx_events = target .send_messages_and_wait_commit(msgs) .map_err(LinkError::relayer)?; @@ -76,7 +76,7 @@ pub struct AsyncSender; impl Submit for AsyncSender { type Reply = AsyncReply; - fn submit(target: &impl ChainHandle, msgs: Vec) -> Result { + fn submit(target: &impl ChainHandle, msgs: TrackedMsgs) -> Result { let a = target .send_messages_and_wait_check_tx(msgs) .map_err(LinkError::relayer)?; diff --git a/relayer/src/rest.rs b/relayer/src/rest.rs index 8ad9e17486..4f6cb31fce 100644 --- a/relayer/src/rest.rs +++ b/relayer/src/rest.rs @@ -44,48 +44,48 @@ pub fn process_incoming_requests(config: &Config, channel: &Receiver) -> Option< match channel.try_recv() { Ok(request) => match request { Request::Version { reply_to } => { - trace!("[rest] Version"); + trace!("Version"); let v = VersionInfo { name: NAME.to_string(), version: VER.to_string(), }; - reply_to.send(Ok(v)).unwrap_or_else(|e| { - error!("[rest/supervisor] error replying to a REST request {}", e) - }); + reply_to + .send(Ok(v)) + .unwrap_or_else(|e| error!("error replying to a REST request {}", e)); } Request::GetChains { reply_to } => { - trace!("[rest] GetChains"); + trace!("GetChains"); reply_to .send(Ok(config.chains.iter().map(|c| c.id.clone()).collect())) - .unwrap_or_else(|e| error!("[rest] error replying to a REST request {}", e)); + .unwrap_or_else(|e| error!("error replying to a REST request {}", e)); } Request::GetChain { chain_id, reply_to } => { - trace!("[rest] GetChain {}", chain_id); + trace!("GetChain {}", chain_id); let result = config .find_chain(&chain_id) .cloned() .ok_or(RestApiError::ChainConfigNotFound(chain_id)); - reply_to.send(result).unwrap_or_else(|e| { - error!("[rest/supervisor] error replying to a REST request {}", e) - }); + reply_to + .send(result) + .unwrap_or_else(|e| error!("error replying to a REST request {}", e)); } Request::State { reply_to } => { - trace!("[rest] State"); + trace!("State"); return Some(Command::DumpState(reply_to)); } }, Err(e) => { if !matches!(e, TryRecvError::Empty) { - error!("[rest] error while waiting for requests: {}", e); + error!("error while waiting for requests: {}", e); } } } diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 2d706bceb1..3189541162 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -7,7 +7,7 @@ use std::sync::RwLock; use crossbeam_channel::{unbounded, Receiver, Sender}; use itertools::Itertools; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, error_span, info, trace, warn}; use ibc::{ core::ics24_host::identifier::{ChainId, ChannelId, PortId}, @@ -166,7 +166,7 @@ fn spawn_batch_worker( subscriptions: Arc>>, ) -> TaskHandle { spawn_background_task( - "supervisor_batch".to_string(), + tracing::Span::none(), Some(Duration::from_millis(500)), move || -> Result> { if let Some((chain, batch)) = try_recv_multiple(&subscriptions.acquire_read()) { @@ -194,7 +194,7 @@ pub fn spawn_cmd_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - "supervisor_cmd".to_string(), + error_span!("cmd"), Some(Duration::from_millis(500)), move || { if let Ok(cmd) = cmd_rx.try_recv() { @@ -237,7 +237,7 @@ pub fn spawn_rest_worker( rest_rx: rest::Receiver, ) -> TaskHandle { spawn_background_task( - "supervisor_rest".to_string(), + error_span!("rest"), Some(Duration::from_millis(500)), move || -> Result> { handle_rest_requests( @@ -560,9 +560,9 @@ 
fn handle_rest_cmd( match m { rest::Command::DumpState(reply) => { let state = state(registry, workers); - reply.send(Ok(state)).unwrap_or_else(|e| { - error!("[rest/supervisor] error replying to a REST request {}", e) - }); + reply + .send(Ok(state)) + .unwrap_or_else(|e| error!("error replying to a REST request {}", e)); } } } diff --git a/relayer/src/supervisor/spawn.rs b/relayer/src/supervisor/spawn.rs index 49c995a3f8..30e45bc902 100644 --- a/relayer/src/supervisor/spawn.rs +++ b/relayer/src/supervisor/spawn.rs @@ -1,5 +1,5 @@ use itertools::Itertools; -use tracing::{debug, error, warn}; +use tracing::{debug, error, trace, warn}; use ibc::{ core::{ @@ -407,7 +407,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { ); if conn_state_src.is_open() && conn_state_dst.is_open() { - debug!( + trace!( "connection {} on chain {} is already open, not spawning Connection worker", connection.connection_id, chain.id() diff --git a/relayer/src/transfer.rs b/relayer/src/transfer.rs index fcc7f0ad0e..e924f9742c 100644 --- a/relayer/src/transfer.rs +++ b/relayer/src/transfer.rs @@ -12,6 +12,7 @@ use ibc::Height; use uint::FromStrRadixErr; use crate::chain::handle::ChainHandle; +use crate::chain::tx::TrackedMsgs; use crate::error::Error; use crate::util::bigint::U256; @@ -126,7 +127,7 @@ pub fn build_and_send_transfer_messages( - task_name: String, + span: tracing::Span, interval_pause: Option, mut step_runner: impl FnMut() -> Result> + Send + Sync + 'static, ) -> TaskHandle { - info!("spawning new background task {}", task_name); + info!(parent: &span, "spawning"); let stopped = Arc::new(RwLock::new(false)); let write_stopped = stopped.clone(); @@ -98,6 +98,7 @@ pub fn spawn_background_task( let (shutdown_sender, receiver) = bounded(1); let join_handle = thread::spawn(move || { + let _entered = span.enter(); loop { match receiver.try_recv() { Ok(()) => { @@ -106,17 +107,14 @@ pub fn spawn_background_task( _ => match step_runner() { Ok(Next::Continue) => {} Ok(Next::Abort) => { - info!("task is aborting: {}", task_name); + info!("aborting"); break; } Err(TaskError::Ignore(e)) => { - warn!("task {} encountered ignorable error: {}", task_name, e); + warn!("encountered ignorable error: {}", e); } Err(TaskError::Fatal(e)) => { - error!( - "aborting task {} after encountering fatal error: {}", - task_name, e - ); + error!("aborting after encountering fatal error: {}", e); break; } }, @@ -127,7 +125,7 @@ pub fn spawn_background_task( } *write_stopped.acquire_write() = true; - info!("task {} has terminated", task_name); + info!("terminated"); }); TaskHandle { diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs index 2057db2992..c0b9571f38 100644 --- a/relayer/src/worker/channel.rs +++ b/relayer/src/worker/channel.rs @@ -1,6 +1,6 @@ use core::time::Duration; use crossbeam_channel::Receiver; -use tracing::debug; +use tracing::{debug, error_span}; use crate::channel::Channel as RelayChannel; use crate::util::task::{spawn_background_task, Next, TaskError, TaskHandle}; @@ -20,7 +20,7 @@ pub fn spawn_channel_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - format!("ChannelWorker({})", channel.short_name()), + error_span!("channel", channel = %channel.short_name()), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { @@ -29,10 +29,7 @@ pub fn spawn_channel_worker( // there can be up to two event for this channel, e.g. init and try. // process the last event, the one with highest "rank". 
let last_event = batch.events.last(); - debug!( - channel = %channel.short_name(), - "channel worker starts processing {:#?}", last_event - ); + debug!("starts processing {:#?}", last_event); if let Some(event) = last_event { let mut handshake_channel = RelayChannel::restore_from_event( @@ -54,10 +51,7 @@ pub fn spawn_channel_worker( height: current_height, new_block: _, } => { - debug!( - channel = %channel.short_name(), - "Channel worker starts processing block event at {:#?}", current_height - ); + debug!("starts processing block event at {:#?}", current_height); let height = current_height .decrement() diff --git a/relayer/src/worker/client.rs b/relayer/src/worker/client.rs index 3c54135cf7..2914541e75 100644 --- a/relayer/src/worker/client.rs +++ b/relayer/src/worker/client.rs @@ -1,7 +1,7 @@ use core::convert::Infallible; use core::time::Duration; use crossbeam_channel::Receiver; -use tracing::{debug, trace, warn}; +use tracing::{debug, span, trace, warn}; use ibc::events::IbcEvent; @@ -19,13 +19,19 @@ pub fn spawn_refresh_client( ) -> Option { if client.is_expired_or_frozen() { warn!( - "skipping refresh client task on frozen client: {}", - client.id() + client = %client.id, + "skipping refresh client task on frozen client", ); None } else { Some(spawn_background_task( - format!("RefreshClientWorker({})", client), + span!( + tracing::Level::ERROR, + "refresh", + client = %client.id, + src_chain = %client.src_chain.id(), + dst_chain = %client.dst_chain.id(), + ), Some(Duration::from_secs(1)), move || { let res = client.refresh().map_err(|e| { @@ -52,40 +58,47 @@ pub fn detect_misbehavior_task( ) -> Option { if client.is_expired_or_frozen() { warn!( - "skipping detect misbehavior task on frozen client: {}", - client.id() + client = %client.id(), + "skipping detect misbehavior task on frozen client", ); return None; } { - debug!("[{}] doing first misbehavior check", client); + let _span = span!( + tracing::Level::DEBUG, + "DetectMisbehaviorFirstCheck", + client = %client.id, + src_chain = %client.src_chain.id(), + dst_chain = %client.dst_chain.id(), + ) + .entered(); + debug!("doing first check"); let misbehavior_result = client.detect_misbehaviour_and_submit_evidence(None); - debug!( - "[{}] detect misbehavior result: {:?}", - client, misbehavior_result - ); + trace!("detect misbehavior result: {:?}", misbehavior_result); } let handle = spawn_background_task( - format!("DetectMisbehaviorWorker({})", client), + span!( + tracing::Level::ERROR, + "DetectMisbehaviorWorker", + client = %client.id, + src_chain = %client.src_chain.id(), + dst_chain = %client.dst_chain.id(), + ), Some(Duration::from_millis(600)), move || -> Result> { if let Ok(cmd) = receiver.try_recv() { match cmd { WorkerCmd::IbcEvents { batch } => { - trace!("[{}] worker received batch: {:?}", client, batch); + trace!("received batch: {:?}", batch); for event in batch.events { if let IbcEvent::UpdateClient(update) = event { - debug!("[{}] checking misbehavior for updated client", client); + debug!("checking misbehavior for updated client"); let misbehavior_result = client.detect_misbehaviour_and_submit_evidence(Some(update)); - trace!( - "[{}] detect misbehavior result: {:?}", - client, - misbehavior_result - ); + trace!("detect misbehavior result: {:?}", misbehavior_result); match misbehavior_result { MisbehaviourResults::ValidClient => {} diff --git a/relayer/src/worker/connection.rs b/relayer/src/worker/connection.rs index 39f892a00e..1ddf8912f7 100644 --- a/relayer/src/worker/connection.rs +++ 
b/relayer/src/worker/connection.rs @@ -1,6 +1,6 @@ use core::time::Duration; use crossbeam_channel::Receiver; -use tracing::debug; +use tracing::{debug, error_span}; use crate::connection::Connection as RelayConnection; use crate::util::task::{spawn_background_task, Next, TaskError, TaskHandle}; @@ -20,7 +20,7 @@ pub fn spawn_connection_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - format!("ConnectionWorker({})", connection.short_name()), + error_span!("connection", connection = %connection.short_name()), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { @@ -30,10 +30,7 @@ pub fn spawn_connection_worker( // process the last event, the one with highest "rank". let last_event = batch.events.last(); - debug!( - connection = %connection.short_name(), - "connection worker starts processing {:#?}", last_event - ); + debug!("starts processing {:#?}", last_event); if let Some(event) = last_event { let mut handshake_connection = RelayConnection::restore_from_event( @@ -56,11 +53,7 @@ pub fn spawn_connection_worker( height: current_height, new_block: _, } => { - debug!( - connection = %connection.short_name(), - "connection worker starts processing block event at {}", - current_height - ); + debug!("starts processing block event at {}", current_height); let height = current_height .decrement() diff --git a/relayer/src/worker/error.rs b/relayer/src/worker/error.rs index 6b531d7aa1..18effb124e 100644 --- a/relayer/src/worker/error.rs +++ b/relayer/src/worker/error.rs @@ -10,19 +10,19 @@ define_error! { RunError { Ics02 [ Ics02Error ] - | _ | { "client errror" }, + | _ | { "client error" }, Connection [ ConnectionError ] - | _ | { "connection errror" }, + | _ | { "connection error" }, Channel [ ChannelError ] - | _ | { "channel errror" }, + | _ | { "channel error" }, Link [ LinkError ] - | _ | { "link errror" }, + | _ | { "link error" }, Retry { retries: retry::Error } diff --git a/relayer/src/worker/packet.rs b/relayer/src/worker/packet.rs index 5a8d539167..dff637422f 100644 --- a/relayer/src/worker/packet.rs +++ b/relayer/src/worker/packet.rs @@ -2,7 +2,7 @@ use core::time::Duration; use crossbeam_channel::Receiver; use ibc::Height; use std::sync::{Arc, Mutex}; -use tracing::{error, trace}; +use tracing::{error, error_span, trace}; use crate::chain::handle::ChainHandle; use crate::foreign_client::HasExpiredOrFrozenError; @@ -47,31 +47,37 @@ pub fn spawn_packet_worker( // Mutex is used to prevent race condition between the packet workers link: Arc>>, ) -> TaskHandle { - spawn_background_task( - format!("PacketWorker({})", link.lock().unwrap().a_to_b), - Some(Duration::from_millis(1000)), - move || { - let relay_path = &link.lock().unwrap().a_to_b; + let span = { + let relay_path = &link.lock().unwrap().a_to_b; + error_span!( + "packet", + src_chain = %relay_path.src_chain().id(), + src_port = %relay_path.src_port_id(), + src_channel = %relay_path.src_channel_id(), + dst_chain = %relay_path.dst_chain().id(), + ) + }; + spawn_background_task(span, Some(Duration::from_millis(1000)), move || { + let relay_path = &link.lock().unwrap().a_to_b; - relay_path - .refresh_schedule() - .map_err(handle_link_error_in_task)?; + relay_path + .refresh_schedule() + .map_err(handle_link_error_in_task)?; - relay_path - .execute_schedule() - .map_err(handle_link_error_in_task)?; + relay_path + .execute_schedule() + .map_err(handle_link_error_in_task)?; - let summary = relay_path.process_pending_txs(); + let summary = relay_path.process_pending_txs(); - if 
!summary.is_empty() { - trace!("Packet worker produced relay summary: {:?}", summary); - } + if !summary.is_empty() { + trace!("Packet worker produced relay summary: {:?}", summary); + } - telemetry!(packet_metrics(&path, &summary)); + telemetry!(packet_metrics(&path, &summary)); - Ok(Next::Continue) - }, - ) + Ok(Next::Continue) + }) } pub fn spawn_packet_cmd_worker( @@ -83,28 +89,34 @@ pub fn spawn_packet_cmd_worker( path: Packet, ) -> TaskHandle { let mut is_first_run: bool = true; - spawn_background_task( - format!("PacketCmdWorker({})", link.lock().unwrap().a_to_b), - Some(Duration::from_millis(200)), - move || { - if let Ok(cmd) = cmd_rx.try_recv() { - retry_with_index(retry_strategy::worker_stubborn_strategy(), |index| { - handle_packet_cmd( - &mut is_first_run, - &link.lock().unwrap(), - clear_on_start, - clear_interval, - &path, - cmd.clone(), - index, - ) - }) - .map_err(|e| TaskError::Fatal(RunError::retry(e)))?; - } - - Ok(Next::Continue) - }, - ) + let span = { + let relay_path = &link.lock().unwrap().a_to_b; + error_span!( + "packet_cmd", + src_chain = %relay_path.src_chain().id(), + src_port = %relay_path.src_port_id(), + src_channel = %relay_path.src_channel_id(), + dst_chain = %relay_path.dst_chain().id(), + ) + }; + spawn_background_task(span, Some(Duration::from_millis(200)), move || { + if let Ok(cmd) = cmd_rx.try_recv() { + retry_with_index(retry_strategy::worker_stubborn_strategy(), |index| { + handle_packet_cmd( + &mut is_first_run, + &link.lock().unwrap(), + clear_on_start, + clear_interval, + &path, + cmd.clone(), + index, + ) + }) + .map_err(|e| TaskError::Fatal(RunError::retry(e)))?; + } + + Ok(Next::Continue) + }) } /// Receives worker commands, which may be: @@ -124,7 +136,7 @@ fn handle_packet_cmd( cmd: WorkerCmd, index: u64, ) -> RetryResult<(), u64> { - trace!("[{}] handling packet worker command {:?}", link.a_to_b, cmd); + trace!("handling command {:?}", cmd); let result = match cmd { WorkerCmd::IbcEvents { batch } => link.a_to_b.update_schedule(batch), @@ -150,8 +162,8 @@ fn handle_packet_cmd( error!( path = %path.short_name(), retry_index = %index, - "[{}] worker will retry: handling command encountered error: {}", - link.a_to_b, e + "will retry: handling command encountered error: {}", + e ); return RetryResult::Retry(index); @@ -175,16 +187,13 @@ fn handle_packet_cmd( if let Err(e) = schedule_result { if e.is_expired_or_frozen_error() { - error!( - "[{}] worker aborting due to expired or frozen client", - link.a_to_b - ); + error!("aborting due to expired or frozen client"); return RetryResult::Err(index); } else { error!( retry_index = %index, - "[{}] worker will retry: schedule execution encountered error: {}", - link.a_to_b, e + "will retry: schedule execution encountered error: {}", + e, ); return RetryResult::Retry(index); } @@ -193,7 +202,7 @@ fn handle_packet_cmd( let summary = link.a_to_b.process_pending_txs(); if !summary.is_empty() { - trace!("Packet worker produced relay summary: {:?}", summary); + trace!("produced relay summary: {:?}", summary); } telemetry!(packet_metrics(path, &summary)); diff --git a/tools/integration-test/src/relayer/chain.rs b/tools/integration-test/src/relayer/chain.rs index 394a31a7f8..8270634350 100644 --- a/tools/integration-test/src/relayer/chain.rs +++ b/tools/integration-test/src/relayer/chain.rs @@ -57,6 +57,7 @@ use ibc_proto::ibc::core::connection::v1::QueryClientConnectionsRequest; use ibc_proto::ibc::core::connection::v1::QueryConnectionsRequest; use ibc_relayer::chain::handle::requests::AppVersion; use 
ibc_relayer::chain::handle::{ChainHandle, ChainRequest, Subscription};
+use ibc_relayer::chain::tx::TrackedMsgs;
 use ibc_relayer::chain::{HealthCheck, StatusResponse};
 use ibc_relayer::config::ChainConfig;
 use ibc_relayer::error::Error;
@@ -91,16 +92,16 @@ where
     fn send_messages_and_wait_commit(
         &self,
-        proto_msgs: Vec<Any>,
+        tracked_msgs: TrackedMsgs,
     ) -> Result<Vec<IbcEvent>, Error> {
-        self.value().send_messages_and_wait_commit(proto_msgs)
+        self.value().send_messages_and_wait_commit(tracked_msgs)
     }

     fn send_messages_and_wait_check_tx(
         &self,
-        proto_msgs: Vec<Any>,
+        tracked_msgs: TrackedMsgs,
     ) -> Result<Vec<TxResponse>, Error> {
-        self.value().send_messages_and_wait_check_tx(proto_msgs)
+        self.value().send_messages_and_wait_check_tx(tracked_msgs)
     }

     fn get_signer(&self) -> Result<Signer, Error> {
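
The recurring change in the worker files above is always the same: `spawn_background_task` now takes a `tracing::Span` (built with `error_span!`/`span!` and the worker's identifying key-value pairs) instead of a formatted task-name string, and the spawned thread enters that span so every log event emitted by the step closure inherits the context. Below is a minimal, self-contained sketch of that pattern, not the actual ibc-rs implementation: the `Next` enum and the function shape mirror the diff, the body is simplified, and the `tracing` and `tracing-subscriber` crates are assumed as dependencies.

```rust
use std::thread::{self, JoinHandle};
use std::time::Duration;

use tracing::{error_span, info, Span};

enum Next {
    Continue,
    Abort,
}

fn spawn_background_task(
    span: Span,
    interval: Duration,
    mut step: impl FnMut() -> Next + Send + 'static,
) -> JoinHandle<()> {
    // Log against the span before the thread exists, as the diff does.
    info!(parent: &span, "spawning");
    thread::spawn(move || {
        // Entering the span here means every `info!`/`warn!`/`error!` call
        // made by `step` carries the span's fields, replacing the old
        // hand-rolled "[TaskName(...)]" string prefixes.
        let _entered = span.enter();
        loop {
            match step() {
                Next::Continue => thread::sleep(interval),
                Next::Abort => {
                    info!("aborting");
                    break;
                }
            }
        }
        info!("terminated");
    })
}

fn main() {
    tracing_subscriber::fmt::init();

    let mut remaining = 3;
    let handle = spawn_background_task(
        // Identifying key-value pairs (channel/chain ids in the relayer)
        // live on the span, not in each message.
        error_span!("channel", channel = "channel-0", src_chain = "ibc-1"),
        Duration::from_millis(100),
        move || {
            info!(remaining, "step");
            remaining -= 1;
            if remaining == 0 { Next::Abort } else { Next::Continue }
        },
    );

    handle.join().unwrap();
}
```

With the default `tracing_subscriber::fmt` formatter, the span name and its fields are rendered as a prefix on every event emitted inside it (roughly `channel{channel=channel-0 src_chain=ibc-1}: step remaining=3`), which is what makes the removal of the `[rest]`, `[ChannelWorker(...)]` and similar prefixes throughout this diff safe.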
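The other thread running through the diff is the switch from bare protobuf message batches to `TrackedMsgs` (imported here as `ibc_relayer::chain::tx::TrackedMsgs`, from a module this excerpt does not show), together with the new `nanoid` dependency. The following is a hypothetical sketch of such a wrapper; the field names, the constructor, and the id format are assumptions for illustration, not the actual definition.

```rust
use prost_types::Any;

/// Hypothetical wrapper: a batch of protobuf messages plus a tracking id,
/// so related log lines on both chains can be correlated.
#[derive(Clone, Debug)]
pub struct TrackedMsgs {
    pub msgs: Vec<Any>,
    pub tracking_id: String,
}

impl TrackedMsgs {
    /// `label` names the operation (e.g. "create-client"); the suffix is a
    /// short random id from the `nanoid` crate added to Cargo.toml.
    pub fn new(msgs: Vec<Any>, label: &str) -> Self {
        Self {
            msgs,
            tracking_id: format!("{}/{}", label, nanoid::nanoid!(10)),
        }
    }

    pub fn messages(&self) -> &[Any] {
        &self.msgs
    }
}
```

A batch tagged this way can be passed through `send_messages_and_wait_commit` / `send_messages_and_wait_check_tx` and echoed in the worker spans, so one submission can be followed across retries and across both chains.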