diff --git a/relayer-cli/src/commands/health.rs b/relayer-cli/src/commands/health.rs index badca4ff95..739c6205d8 100644 --- a/relayer-cli/src/commands/health.rs +++ b/relayer-cli/src/commands/health.rs @@ -16,23 +16,21 @@ impl Runnable for HealthCheckCmd { let config = (*app_config()).clone(); for ch in &config.chains { - info!("[{}] performing health check...", ch.id); + let _span = tracing::error_span!("health_check", chain = %ch.id).entered(); + + info!("performing health check..."); let chain = spawn_chain_runtime(&config, &ch.id).unwrap_or_else(exit_with_unrecoverable_error); match chain.health_check() { - Ok(Healthy) => info!(chain = %ch.id, "chain is healthy"), + Ok(Healthy) => info!("chain is healthy"), Ok(Unhealthy(_)) => { // No need to print the error here as it's already printed in `Chain::health_check` // TODO(romac): Move the printing code here and in the supervisor/registry - warn!("[{}] chain is unhealthy", ch.id) + warn!("chain is not healthy") } - Err(e) => error!( - "[{}] failed to perform health check, reason: {}", - ch.id, - e.detail() - ), + Err(e) => error!("failed to perform health check, reason: {}", e.detail()), } } diff --git a/relayer-cli/src/commands/listen.rs b/relayer-cli/src/commands/listen.rs index cd217f7d5a..d1c2f8d5ee 100644 --- a/relayer-cli/src/commands/listen.rs +++ b/relayer-cli/src/commands/listen.rs @@ -6,7 +6,7 @@ use abscissa_core::clap::Parser; use abscissa_core::{application::fatal_error, Runnable}; use itertools::Itertools; use tokio::runtime::Runtime as TokioRuntime; -use tracing::{error, info}; +use tracing::{error, info, instrument}; use ibc::{core::ics24_host::identifier::ChainId, events::IbcEvent}; @@ -96,6 +96,7 @@ impl Runnable for ListenCmd { } /// Listen to events +#[instrument(skip_all, fields(chain = %config.id))] pub fn listen( config: &ChainConfig, filters: &[EventFilter], @@ -104,8 +105,7 @@ pub fn listen( let (event_monitor, rx) = subscribe(config, rt)?; info!( - "[{}] listening for queries {}", - config.id, + "listening for queries: {}", event_monitor.queries().iter().format(", "), ); diff --git a/relayer/src/event/monitor.rs b/relayer/src/event/monitor.rs index 70f12cfcf1..fc521dfa56 100644 --- a/relayer/src/event/monitor.rs +++ b/relayer/src/event/monitor.rs @@ -9,7 +9,7 @@ use futures::{ }; use tokio::task::JoinHandle; use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc}; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, instrument, trace}; use tendermint_rpc::{ event::Event as RpcEvent, query::Query, Error as RpcError, SubscriptionClient, Url, @@ -136,6 +136,7 @@ pub mod queries { impl EventMonitor { /// Create an event monitor, and connect to a node + #[instrument(name = "event_monitor.create", skip_all, fields(chain = %chain_id, addr = %node_addr))] pub fn new( chain_id: ChainId, node_addr: Url, @@ -178,11 +179,12 @@ impl EventMonitor { } /// Clear the current subscriptions, and subscribe again to all queries. + #[instrument(name = "event_monitor.subscribe", skip_all, fields(chain = %self.chain_id))] pub fn subscribe(&mut self) -> Result<()> { let mut subscriptions = vec![]; for query in &self.event_queries { - trace!("[{}] subscribing to query: {}", self.chain_id, query); + trace!("subscribing to query: {}", query); let subscription = self .rt @@ -194,15 +196,15 @@ impl EventMonitor { self.subscriptions = Box::new(select_all(subscriptions)); - trace!("[{}] subscribed to all queries", self.chain_id); + trace!("subscribed to all queries"); Ok(()) } + #[instrument(name = "event_monitor.try_reconnect", skip_all, fields(chain = %self.chain_id))] fn try_reconnect(&mut self) -> Result<()> { trace!( - "[{}] trying to reconnect to WebSocket endpoint {}", - self.chain_id, + "trying to reconnect to WebSocket endpoint {}", self.node_addr ); @@ -221,17 +223,10 @@ impl EventMonitor { core::mem::swap(&mut self.client, &mut client); core::mem::swap(&mut self.driver_handle, &mut driver_handle); - trace!( - "[{}] reconnected to WebSocket endpoint {}", - self.chain_id, - self.node_addr - ); + trace!("reconnected to WebSocket endpoint {}", self.node_addr); // Shut down previous client - trace!( - "[{}] gracefully shutting down previous client", - self.chain_id - ); + trace!("gracefully shutting down previous client",); let _ = client.close(); @@ -239,14 +234,15 @@ impl EventMonitor { .block_on(driver_handle) .map_err(Error::client_termination_failed)?; - trace!("[{}] previous client successfully shutdown", self.chain_id); + trace!("previous client successfully shutdown"); Ok(()) } /// Try to resubscribe to events + #[instrument(name = "event_monitor.try_resubscribe", skip_all, fields(chain = %self.chain_id))] fn try_resubscribe(&mut self) -> Result<()> { - trace!("[{}] trying to resubscribe to events", self.chain_id); + trace!("trying to resubscribe to events"); self.subscribe() } @@ -254,17 +250,18 @@ impl EventMonitor { /// /// See the [`retry`](https://docs.rs/retry) crate and the /// [`crate::util::retry`] module for more information. + #[instrument(name = "event_monitor.reconnect", skip_all, fields(chain = %self.chain_id))] fn reconnect(&mut self) { let result = retry_with_index(retry_strategy::default(), |_| { // Try to reconnect if let Err(e) = self.try_reconnect() { - trace!("[{}] error when reconnecting: {}", self.chain_id, e); + trace!("error when reconnecting: {}", e); return RetryResult::Retry(()); } // Try to resubscribe if let Err(e) = self.try_resubscribe() { - trace!("[{}] error when resubscribing: {}", self.chain_id, e); + trace!("error when resubscribing: {}", e); return RetryResult::Retry(()); } @@ -273,12 +270,11 @@ impl EventMonitor { match result { Ok(()) => info!( - "[{}] successfully reconnected to WebSocket endpoint {}", - self.chain_id, self.node_addr + "successfully reconnected to WebSocket endpoint {}", + self.node_addr ), Err(retries) => error!( - "[{}] failed to reconnect to {} after {} retries", - self.chain_id, + "failed to reconnect to {} after {} retries", self.node_addr, retry_count(&retries) ), @@ -287,8 +283,9 @@ impl EventMonitor { /// Event monitor loop #[allow(clippy::while_let_loop)] + #[instrument(name = "event_monitor.run", skip_all, fields(chain = %self.chain_id))] pub fn run(mut self) { - debug!(chain = %self.chain_id, "starting event monitor"); + debug!("starting event monitor"); // Continuously run the event loop, so that when it aborts // because of WebSocket client restart, we pick up the work again. @@ -299,7 +296,7 @@ impl EventMonitor { } } - debug!("[{}] event monitor is shutting down", self.chain_id); + debug!("event monitor is shutting down"); // Close the WebSocket connection let _ = self.client.close(); @@ -307,10 +304,7 @@ impl EventMonitor { // Wait for the WebSocket driver to finish let _ = self.rt.block_on(self.driver_handle); - trace!( - "[{}] event monitor has successfully shut down", - self.chain_id - ); + trace!("event monitor has successfully shut down"); } fn run_loop(&mut self) -> Next { @@ -347,17 +341,14 @@ impl EventMonitor { match result { Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| { - error!("[{}] {}", self.chain_id, e); + error!("error while processing batch: {}", e); }), Err(e) => { if let ErrorDetail::SubscriptionCancelled(reason) = e.detail() { - error!( - "[{}] subscription cancelled, reason: {}", - self.chain_id, reason - ); + error!("subscription cancelled, reason: {}", reason); self.propagate_error(e).unwrap_or_else(|e| { - error!("[{}] {}", self.chain_id, e); + error!("{}", e); }); telemetry!(ws_reconnect, &self.chain_id); @@ -371,7 +362,7 @@ impl EventMonitor { // thus potentially blow up the stack after many restarts. return Next::Continue; } else { - error!("[{}] failed to collect events: {}", self.chain_id, e); + error!("failed to collect events: {}", e); telemetry!(ws_reconnect, &self.chain_id); diff --git a/relayer/src/foreign_client.rs b/relayer/src/foreign_client.rs index 3550594e0d..658c6d16bf 100644 --- a/relayer/src/foreign_client.rs +++ b/relayer/src/foreign_client.rs @@ -10,7 +10,7 @@ use std::time::Instant; use ibc_proto::google::protobuf::Any; use itertools::Itertools; -use tracing::{debug, error, info, span, trace, warn, Level}; +use tracing::{debug, error, info, instrument, trace, warn}; use flex_error::define_error; use ibc::core::ics02_client::client_consensus::{ @@ -440,9 +440,13 @@ impl ForeignClient Result, ForeignClientError> { - info!("[{}] upgrade Height: {}", self, src_upgrade_height); - let mut msgs = self .build_update_client_with_trusted(src_upgrade_height, None) .map_err(|_| { @@ -472,7 +476,7 @@ impl ForeignClient ForeignClient ForeignClient Result<(), ForeignClientError> { let event = self .build_create_client_and_send(CreateOptions::default()) .map_err(|e| { - error!("[{}] failed CreateClient: {}", self, e); + error!("failed to create client: {}", e); e })?; self.id = extract_client_id(&event)?.clone(); - info!("🍭 [{}] => {:#?}\n", self, event); + + info!(id = %self.id, "🍭 client was created successfully"); + debug!(id = %self.id, ?event, "event emitted after creation"); Ok(()) } + #[instrument( + name = "foreign_client.validated_client_state", + level = "error", + skip(self), + fields(client = %self) + )] pub fn validated_client_state( &self, ) -> Result<(AnyClientState, Option), ForeignClientError> { @@ -708,8 +723,7 @@ impl ForeignClient ForeignClient Result { - let _span = span!(Level::DEBUG, "check_consensus_state_trusting_period", height = %height) - .entered(); - // Safety check if client_state.chain_id() != self.src_chain.id() { warn!("the chain id in the client state ('{}') is inconsistent with the client's source chain id ('{}')", client_state.chain_id(), self.src_chain.id()); } - let consensus_state_timestamp = self.consensus_state(*height)?.timestamp(); + let consensus_state_timestamp = self.fetch_consensus_state(*height)?.timestamp(); let current_src_network_time = self .src_chain @@ -777,6 +794,12 @@ impl ForeignClient Result>, ForeignClientError> { let (client_state, elapsed) = self.validated_client_state()?; @@ -809,6 +832,12 @@ impl ForeignClient ForeignClient ForeignClient ForeignClient ForeignClient ForeignClient ForeignClient ForeignClient ForeignClient= target_height { warn!( - "[{}] skipping update: trusted height ({}) >= chain target height ({})", - self, trusted_height, target_height + "skipping update: trusted height ({}) >= chain target height ({})", + trusted_height, target_height ); + return Ok(vec![]); } @@ -1056,8 +1119,7 @@ impl ForeignClient ForeignClient ForeignClient ForeignClient ForeignClient Result<(), ForeignClientError> { - let res = self.build_latest_update_client_and_send()?; + let events = self.build_latest_update_client_and_send()?; - debug!("[{}] client updated with return message {:?}\n", self, res); + debug!(?events, "client updated"); Ok(()) } @@ -1149,13 +1223,20 @@ impl ForeignClient Result, ForeignClientError> { let mut events = vec![]; for i in 0..MAX_RETRIES { - thread::sleep(Duration::from_millis(100)); + thread::sleep(Duration::from_millis(200)); + let result = self .dst_chain .query_txs(QueryTxRequest::Client(QueryClientEventRequest { @@ -1172,19 +1253,21 @@ impl ForeignClient { error!( - "[{}] query_tx with error {}, retry {}/{}", - self, + "query_tx failed with error {}, retry {}/{}", e, i + 1, MAX_RETRIES ); + continue; } Ok(result) => { events = result; + // Should break to prevent retrying uselessly. break; } @@ -1199,21 +1282,29 @@ impl ForeignClient IbcEvent::UpdateClient).ok_or_else(|| { + let update = downcast!(events[0].clone() => IbcEvent::UpdateClient).ok_or_else(|| { ForeignClientError::unexpected_event( self.id().clone(), self.dst_chain.id(), - event.to_json(), + events[0].to_json(), ) })?; + Ok(Some(update)) } /// Retrieves all consensus states for this client and sorts them in descending height /// order. If consensus states are not pruned on chain, then last consensus state is the one /// installed by the `CreateClient` operation. - fn consensus_states(&self) -> Result, ForeignClientError> { + #[instrument( + name = "foreign_client.fetch_consensus_states", + level = "error", + skip_all, + fields(client = %self) + )] + fn fetch_consensus_states( + &self, + ) -> Result, ForeignClientError> { let mut consensus_states = self .dst_chain .query_consensus_states(QueryConsensusStatesRequest { @@ -1223,12 +1314,23 @@ impl ForeignClient Result { + #[instrument( + name = "foreign_client.fetch_consensus_state", + level = "error", + skip_all, + fields(client = %self, %height) + )] + fn fetch_consensus_state( + &self, + height: Height, + ) -> Result { let (consensus_state, _) = self .dst_chain .query_consensus_state( @@ -1251,13 +1353,18 @@ impl ForeignClient Result, ForeignClientError> { + /// Retrieves all consensus heights for this client sorted in descending order. + #[instrument( + name = "foreign_client.fetch_consensus_state_heights", + level = "error", + skip_all, + fields(client = %self) + )] + fn fetch_consensus_state_heights(&self) -> Result, ForeignClientError> { // [TODO] Utilize query that only fetches consensus state heights // https://github.com/cosmos/ibc-go/issues/798 let consensus_state_heights: Vec = self - .consensus_states()? + .fetch_consensus_states()? .iter() .map(|cs| cs.height) .collect(); @@ -1299,18 +1406,20 @@ impl ForeignClient, ) -> Result, ForeignClientError> { - thread::sleep(Duration::from_millis(100)); - let span_guard = update.as_ref().map(|ev| ev.consensus_height()); - let _span = span!( - tracing::Level::DEBUG, - "detect_misbehaviour", - update_height = ?span_guard, - ) - .entered(); + thread::sleep(Duration::from_millis(200)); // Get the latest client state on destination. let (client_state, _) = { @@ -1338,13 +1447,13 @@ impl ForeignClient ForeignClient ForeignClient ForeignClient { - debug!(target = %target_height, - "exhausted checking trusted consensus states for this client; no evidence found"); + debug!( + target = %target_height, + "exhausted checking trusted consensus states for this client; no evidence found" + ); + // It's safe to stop checking for misbehavior past this `target_height`. break; } @@ -1451,6 +1563,12 @@ impl ForeignClient ForeignClient, @@ -1515,8 +1639,8 @@ impl ForeignClient Ok(vec![]), // no evidence found Ok(Some(detected)) => { error!( - "[{}] MISBEHAVIOUR DETECTED {}, sending evidence", - self, detected.misbehaviour + misbehaviour = %detected.misbehaviour, + "misbehaviour detected, sending evidence" ); self.submit_evidence(detected) @@ -1528,45 +1652,44 @@ impl ForeignClient { - warn!( - "[{}] misbehaviour checking is being disabled: {:?}", - self, s - ); + warn!("misbehaviour checking is being disabled, reason: {:?}", s); + MisbehaviourResults::CannotExecute } + Ok(misbehaviour_detection_result) => { if !misbehaviour_detection_result.is_empty() { info!( - "[{}] evidence submission result {:?}", - self, misbehaviour_detection_result + "evidence submission result: {:?}", + misbehaviour_detection_result ); + MisbehaviourResults::EvidenceSubmitted(misbehaviour_detection_result) } else { + info!("client is valid",); + MisbehaviourResults::ValidClient } } + Err(e) => match e.detail() { ForeignClientErrorDetail::MisbehaviourExit(s) => { - error!( - "[{}] misbehaviour checking is being disabled: {:?}", - self, s - ); + error!("misbehaviour checking is being disabled, reason: {:?}", s); + MisbehaviourResults::CannotExecute } ForeignClientErrorDetail::ExpiredOrFrozen(_) => { - error!( - "[{}] cannot check misbehavior on frozen or expired client", - self - ); + error!("cannot check misbehavior on frozen or expired client",); + MisbehaviourResults::CannotExecute } + + _ if update_event.is_some() => MisbehaviourResults::CannotExecute, + _ => { - if update_event.is_some() { - MisbehaviourResults::CannotExecute - } else { - warn!("[{}] misbehaviour checking result: {:?}", self, e); - MisbehaviourResults::ValidClient - } + warn!("misbehaviour checking result: {:?}", e); + + MisbehaviourResults::ValidClient } }, } diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 2ea8144678..12cf06d310 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -7,7 +7,7 @@ use std::sync::RwLock; use crossbeam_channel::{unbounded, Receiver, Sender}; use itertools::Itertools; -use tracing::{debug, error, error_span, info, trace, warn}; +use tracing::{debug, error, error_span, info, instrument, trace, warn}; use ibc::{ core::ics24_host::identifier::{ChainId, ChannelId, PortId}, @@ -487,17 +487,18 @@ fn health_check(config: &Config, registry: &mut Registry match chain.health_check() { - Ok(Healthy) => info!(chain = %id, "chain is healthy"), - Ok(Unhealthy(e)) => warn!(chain = %id, "chain is unhealthy: {}", e), - Err(e) => error!(chain = %id, "failed to perform health check: {}", e), + Ok(Healthy) => info!("chain is healthy"), + Ok(Unhealthy(e)) => warn!("chain is not healthy: {}", e), + Err(e) => error!("failed to perform health check: {}", e), }, Err(e) => { error!( - chain = %id, "skipping health check, reason: failed to spawn chain runtime with error: {}", e ); @@ -507,6 +508,7 @@ fn health_check(config: &Config, registry: &mut Registry( config: &Config, registry: &mut Registry, @@ -575,6 +577,7 @@ fn handle_rest_requests( } } +#[instrument(skip_all)] fn handle_rest_cmd( registry: &Registry, workers: &WorkerMap, @@ -590,6 +593,7 @@ fn handle_rest_cmd( } } +#[instrument(skip_all, fields(chain = %chain_id))] fn clear_pending_packets(workers: &mut WorkerMap, chain_id: &ChainId) -> Result<(), Error> { for worker in workers.workers_for_chain(chain_id) { worker.clear_pending_packets(); @@ -599,6 +603,7 @@ fn clear_pending_packets(workers: &mut WorkerMap, chain_id: &ChainId) -> Result< } /// Process a batch of events received from a chain. +#[instrument(skip_all, fields(chain = %src_chain.id()))] fn process_batch( config: &Config, registry: &mut Registry, @@ -694,6 +699,7 @@ fn process_batch( /// Process the given batch if it does not contain any errors, /// output the errors on the console otherwise. +#[instrument(skip_all, fields(chain = %chain.id()))] fn handle_batch( config: &Config, registry: &mut Registry, @@ -709,21 +715,17 @@ fn handle_batch( if let Err(e) = process_batch(config, registry, client_state_filter, workers, chain, batch) { - error!("[{}] error during batch processing: {}", chain_id, e); + error!("error during batch processing: {}", e); } } Err(EventError(EventErrorDetail::SubscriptionCancelled(_), _)) => { - warn!(chain.id = %chain_id, "event subscription was cancelled, clearing pending packets"); + warn!("event subscription was cancelled, clearing pending packets"); - let _ = clear_pending_packets(workers, &chain_id).map_err(|e| { - error!( - "[{}] error during clearing pending packets: {}", - chain_id, e - ) - }); + let _ = clear_pending_packets(workers, &chain_id) + .map_err(|e| error!("error during clearing pending packets: {}", e)); } Err(e) => { - error!("[{}] error in receiving event batch: {}", chain_id, e) + error!("error when receiving event batch: {}", e) } } } diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs index 946cc989eb..26c6c1456d 100644 --- a/relayer/src/supervisor/scan.rs +++ b/relayer/src/supervisor/scan.rs @@ -2,7 +2,7 @@ use core::fmt; use std::collections::BTreeMap; use itertools::Itertools; -use tracing::{debug, error, info, info_span, warn}; +use tracing::{debug, error, error_span, info, warn}; use ibc::core::{ ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, @@ -283,7 +283,7 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { } pub fn scan_chain(&mut self, chain_config: &ChainConfig) -> Result { - let span = info_span!("scan.chain", chain = %chain_config.id); + let span = error_span!("scan.chain", chain = %chain_config.id); let _guard = span.enter(); info!("scanning chain..."); @@ -382,7 +382,7 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { chain: &Chain, client: IdentifiedAnyClientState, ) -> Result, Error> { - let span = info_span!("scan.client", client = %client.client_id); + let span = error_span!("scan.client", client = %client.client_id); let _guard = span.enter(); info!("scanning client..."); @@ -431,7 +431,7 @@ impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { client: &IdentifiedAnyClientState, connection: IdentifiedConnectionEnd, ) -> Result, Error> { - let span = info_span!("scan.connection", connection = %connection.connection_id); + let span = error_span!("scan.connection", connection = %connection.connection_id); let _guard = span.enter(); info!("scanning connection..."); @@ -615,7 +615,7 @@ fn scan_allowed_channel( port_id: &PortId, channel_id: &ChannelId, ) -> Result { - let span = info_span!("scan.channel", port = %port_id, channel = %channel_id); + let span = error_span!("scan.channel", port = %port_id, channel = %channel_id); let _guard = span.enter(); info!("querying channel..."); diff --git a/relayer/src/worker/client.rs b/relayer/src/worker/client.rs index a36608cabb..d4e354a498 100644 --- a/relayer/src/worker/client.rs +++ b/relayer/src/worker/client.rs @@ -27,7 +27,7 @@ pub fn spawn_refresh_client( Some(spawn_background_task( span!( tracing::Level::ERROR, - "refresh", + "refresh_client", client = %client.id, src_chain = %client.src_chain.id(), dst_chain = %client.dst_chain.id(), @@ -69,7 +69,7 @@ pub fn detect_misbehavior_task( let handle = spawn_background_task( span!( tracing::Level::ERROR, - "DetectMisbehaviorWorker", + "detect_misbehaviour_task", client = %client.id, src_chain = %client.src_chain.id(), dst_chain = %client.dst_chain.id(), @@ -78,14 +78,6 @@ pub fn detect_misbehavior_task( move || -> Result> { if !first_check_done { first_check_done = true; - let _span = span!( - tracing::Level::DEBUG, - "DetectMisbehaviorFirstCheck", - client = %client.id, - src_chain = %client.src_chain.id(), - dst_chain = %client.dst_chain.id(), - ) - .entered(); debug!("doing first check"); let misbehavior_result = client.detect_misbehaviour_and_submit_evidence(None); trace!("detect misbehavior result: {:?}", misbehavior_result);