Skip to content

Commit

Permalink
Instrument more functions using tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Aug 10, 2022
1 parent 891d3f6 commit 32cf498
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 155 deletions.
14 changes: 6 additions & 8 deletions relayer-cli/src/commands/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,21 @@ impl Runnable for HealthCheckCmd {
let config = (*app_config()).clone();

for ch in &config.chains {
info!("[{}] performing health check...", ch.id);
let _span = tracing::error_span!("health_check", chain = %ch.id).entered();

info!("performing health check...");

let chain =
spawn_chain_runtime(&config, &ch.id).unwrap_or_else(exit_with_unrecoverable_error);

match chain.health_check() {
Ok(Healthy) => info!(chain = %ch.id, "chain is healthy"),
Ok(Healthy) => info!("chain is healthy"),
Ok(Unhealthy(_)) => {
// No need to print the error here as it's already printed in `Chain::health_check`
// TODO(romac): Move the printing code here and in the supervisor/registry
warn!("[{}] chain is unhealthy", ch.id)
warn!("chain is not healthy")
}
Err(e) => error!(
"[{}] failed to perform health check, reason: {}",
ch.id,
e.detail()
),
Err(e) => error!("failed to perform health check, reason: {}", e.detail()),
}
}

Expand Down
6 changes: 3 additions & 3 deletions relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use abscissa_core::clap::Parser;
use abscissa_core::{application::fatal_error, Runnable};
use itertools::Itertools;
use tokio::runtime::Runtime as TokioRuntime;
use tracing::{error, info};
use tracing::{error, info, instrument};

use ibc::{core::ics24_host::identifier::ChainId, events::IbcEvent};

Expand Down Expand Up @@ -96,6 +96,7 @@ impl Runnable for ListenCmd {
}

/// Listen to events
#[instrument(skip_all, fields(chain = %config.id))]
pub fn listen(
config: &ChainConfig,
filters: &[EventFilter],
Expand All @@ -104,8 +105,7 @@ pub fn listen(
let (event_monitor, rx) = subscribe(config, rt)?;

info!(
"[{}] listening for queries {}",
config.id,
"listening for queries: {}",
event_monitor.queries().iter().format(", "),
);

Expand Down
61 changes: 26 additions & 35 deletions relayer/src/event/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::{
};
use tokio::task::JoinHandle;
use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc};
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info, instrument, trace};

use tendermint_rpc::{
event::Event as RpcEvent, query::Query, Error as RpcError, SubscriptionClient, Url,
Expand Down Expand Up @@ -136,6 +136,7 @@ pub mod queries {

impl EventMonitor {
/// Create an event monitor, and connect to a node
#[instrument(name = "event_monitor.create", skip_all, fields(chain = %chain_id, addr = %node_addr))]
pub fn new(
chain_id: ChainId,
node_addr: Url,
Expand Down Expand Up @@ -178,11 +179,12 @@ impl EventMonitor {
}

/// Clear the current subscriptions, and subscribe again to all queries.
#[instrument(name = "event_monitor.subscribe", skip_all, fields(chain = %self.chain_id))]
pub fn subscribe(&mut self) -> Result<()> {
let mut subscriptions = vec![];

for query in &self.event_queries {
trace!("[{}] subscribing to query: {}", self.chain_id, query);
trace!("subscribing to query: {}", query);

let subscription = self
.rt
Expand All @@ -194,15 +196,15 @@ impl EventMonitor {

self.subscriptions = Box::new(select_all(subscriptions));

trace!("[{}] subscribed to all queries", self.chain_id);
trace!("subscribed to all queries");

Ok(())
}

#[instrument(name = "event_monitor.try_reconnect", skip_all, fields(chain = %self.chain_id))]
fn try_reconnect(&mut self) -> Result<()> {
trace!(
"[{}] trying to reconnect to WebSocket endpoint {}",
self.chain_id,
"trying to reconnect to WebSocket endpoint {}",
self.node_addr
);

Expand All @@ -221,50 +223,45 @@ impl EventMonitor {
core::mem::swap(&mut self.client, &mut client);
core::mem::swap(&mut self.driver_handle, &mut driver_handle);

trace!(
"[{}] reconnected to WebSocket endpoint {}",
self.chain_id,
self.node_addr
);
trace!("reconnected to WebSocket endpoint {}", self.node_addr);

// Shut down previous client
trace!(
"[{}] gracefully shutting down previous client",
self.chain_id
);
trace!("gracefully shutting down previous client",);

let _ = client.close();

self.rt
.block_on(driver_handle)
.map_err(Error::client_termination_failed)?;

trace!("[{}] previous client successfully shutdown", self.chain_id);
trace!("previous client successfully shutdown");

Ok(())
}

/// Try to resubscribe to events.
///
/// Emits a trace-level log line and then delegates to [`Self::subscribe`];
/// the `Result` of that call is returned unchanged.
///
/// The `#[instrument]` attribute wraps the call in a span named
/// `event_monitor.try_resubscribe` carrying the chain id as the `chain` field.
#[instrument(name = "event_monitor.try_resubscribe", skip_all, fields(chain = %self.chain_id))]
fn try_resubscribe(&mut self) -> Result<()> {
// NOTE(review): the two `trace!` calls below differ only in the `[chain_id]`
// prefix, which the enclosing span already records. This looks like a rendered
// diff (old line followed by its replacement) — confirm only one of them is
// present in the actual file.
trace!("[{}] trying to resubscribe to events", self.chain_id);
trace!("trying to resubscribe to events");
self.subscribe()
}

/// Attempt to reconnect the WebSocket client using the given retry strategy.
///
/// See the [`retry`](https://docs.rs/retry) crate and the
/// [`crate::util::retry`] module for more information.
#[instrument(name = "event_monitor.reconnect", skip_all, fields(chain = %self.chain_id))]
fn reconnect(&mut self) {
let result = retry_with_index(retry_strategy::default(), |_| {
// Try to reconnect
if let Err(e) = self.try_reconnect() {
trace!("[{}] error when reconnecting: {}", self.chain_id, e);
trace!("error when reconnecting: {}", e);
return RetryResult::Retry(());
}

// Try to resubscribe
if let Err(e) = self.try_resubscribe() {
trace!("[{}] error when resubscribing: {}", self.chain_id, e);
trace!("error when resubscribing: {}", e);
return RetryResult::Retry(());
}

Expand All @@ -273,12 +270,11 @@ impl EventMonitor {

match result {
Ok(()) => info!(
"[{}] successfully reconnected to WebSocket endpoint {}",
self.chain_id, self.node_addr
"successfully reconnected to WebSocket endpoint {}",
self.node_addr
),
Err(retries) => error!(
"[{}] failed to reconnect to {} after {} retries",
self.chain_id,
"failed to reconnect to {} after {} retries",
self.node_addr,
retry_count(&retries)
),
Expand All @@ -287,8 +283,9 @@ impl EventMonitor {

/// Event monitor loop
#[allow(clippy::while_let_loop)]
#[instrument(name = "event_monitor.run", skip_all, fields(chain = %self.chain_id))]
pub fn run(mut self) {
debug!(chain = %self.chain_id, "starting event monitor");
debug!("starting event monitor");

// Continuously run the event loop, so that when it aborts
// because of WebSocket client restart, we pick up the work again.
Expand All @@ -299,18 +296,15 @@ impl EventMonitor {
}
}

debug!("[{}] event monitor is shutting down", self.chain_id);
debug!("event monitor is shutting down");

// Close the WebSocket connection
let _ = self.client.close();

// Wait for the WebSocket driver to finish
let _ = self.rt.block_on(self.driver_handle);

trace!(
"[{}] event monitor has successfully shut down",
self.chain_id
);
trace!("event monitor has successfully shut down");
}

fn run_loop(&mut self) -> Next {
Expand Down Expand Up @@ -347,17 +341,14 @@ impl EventMonitor {

match result {
Ok(batch) => self.process_batch(batch).unwrap_or_else(|e| {
error!("[{}] {}", self.chain_id, e);
error!("error while processing batch: {}", e);
}),
Err(e) => {
if let ErrorDetail::SubscriptionCancelled(reason) = e.detail() {
error!(
"[{}] subscription cancelled, reason: {}",
self.chain_id, reason
);
error!("subscription cancelled, reason: {}", reason);

self.propagate_error(e).unwrap_or_else(|e| {
error!("[{}] {}", self.chain_id, e);
error!("{}", e);
});

telemetry!(ws_reconnect, &self.chain_id);
Expand All @@ -371,7 +362,7 @@ impl EventMonitor {
// thus potentially blow up the stack after many restarts.
return Next::Continue;
} else {
error!("[{}] failed to collect events: {}", self.chain_id, e);
error!("failed to collect events: {}", e);

telemetry!(ws_reconnect, &self.chain_id);

Expand Down
Loading

0 comments on commit 32cf498

Please sign in to comment.