From edc9a2cb50ed774ade03ccb846661ea49caf5b38 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 26 Jan 2022 13:33:55 +0100 Subject: [PATCH] Fast start for chains configured with an allow list (#1705) - Split the worker spawning code into two parts: - a) scan the chains for the clients, connections and channels - b) spawn the appropriate workers based on the resulting scan - When scanning a chain with filtering enabled and an allow list, skip scanning all the clients and query the allowed channels directly, resulting in much fewer queries and a faster start. - Add a `--full-scan` option to `hermes start` to opt out of the fast start mechanism and do a full scan. --- * Introduce a `ChainScanner` to scan the chains for clients, connections and channels * Use `ChainScanner` for spawning workers * Formatting * Add `--full-scan` option to `start` command to force a full scan even when some chains are using allow lists * Remove debug statements and print scanned chains on startup * Changelog entry * Fix duplicate info message * Quote identifiers in log messages * Better error when port/channel does not exists * Add metrics for queries * Small log improvements * Rename queries metric * Use `chain` key for recording chain identifier in tracing logs * Use more structured logging in chain scanner * Fix changelog entry * Improve logs when no workers were spawned * Improve logs when spawning connection and channel workers * Remove spaces in objects names * Add changelog entry * Revert part of logs changes * Use INFO level for spawning logs * Remove redundant changelog entry --- .../improvements/1536-fast-start.md | 3 + .../src/core/ics03_connection/connection.rs | 13 +- relayer-cli/src/commands/start.rs | 25 +- relayer/src/chain/cosmos.rs | 56 +- relayer/src/chain/counterparty.rs | 2 +- relayer/src/config.rs | 25 +- relayer/src/connection.rs | 2 +- relayer/src/object.rs | 4 +- relayer/src/supervisor.rs | 141 ++-- relayer/src/supervisor/error.rs | 5 + relayer/src/supervisor/scan.rs | 779 ++++++++++++++++++ relayer/src/supervisor/spawn.rs | 432 ++-------- relayer/src/util/task.rs | 13 +- relayer/src/worker/channel.rs | 2 +- telemetry/src/state.rs | 19 + .../src/framework/overrides.rs | 12 +- .../src/tests/client_expiration.rs | 24 +- 17 files changed, 1119 insertions(+), 438 deletions(-) create mode 100644 .changelog/unreleased/improvements/1536-fast-start.md create mode 100644 relayer/src/supervisor/scan.rs diff --git a/.changelog/unreleased/improvements/1536-fast-start.md b/.changelog/unreleased/improvements/1536-fast-start.md new file mode 100644 index 0000000000..4141495364 --- /dev/null +++ b/.changelog/unreleased/improvements/1536-fast-start.md @@ -0,0 +1,3 @@ +- Improve startup time of the relayer + - When scanning a chain with filtering enabled and an allow list, skip scanning all the clients and query the allowed channels directly. This results in much fewer queries and a faster start. + - Add a `--full-scan` option to `hermes start` to opt out of the fast start mechanism and do a full scan. diff --git a/modules/src/core/ics03_connection/connection.rs b/modules/src/core/ics03_connection/connection.rs index d5208fc721..24f35e8f96 100644 --- a/modules/src/core/ics03_connection/connection.rs +++ b/modules/src/core/ics03_connection/connection.rs @@ -2,7 +2,7 @@ use crate::prelude::*; use core::str::FromStr; use core::time::Duration; -use core::u64; +use core::{fmt, u64}; use serde::{Deserialize, Serialize}; use tendermint_proto::Protobuf; @@ -325,7 +325,7 @@ pub enum State { impl State { /// Yields the State as a string. - pub fn as_string(&self) -> &'static str { + pub fn as_str(&self) -> &'static str { match self { Self::Uninitialized => "UNINITIALIZED", Self::Init => "INIT", @@ -333,7 +333,8 @@ impl State { Self::Open => "OPEN", } } - // Parses the State out from a i32. + + /// Parses the State out from a i32. pub fn from_i32(s: i32) -> Result { match s { 0 => Ok(Self::Uninitialized), @@ -363,6 +364,12 @@ impl State { } } +impl fmt::Display for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.as_str()) + } +} + impl TryFrom for State { type Error = Error; fn try_from(value: i32) -> Result { diff --git a/relayer-cli/src/commands/start.rs b/relayer-cli/src/commands/start.rs index 90a944f0e4..b41c2703dc 100644 --- a/relayer-cli/src/commands/start.rs +++ b/relayer-cli/src/commands/start.rs @@ -1,4 +1,5 @@ use alloc::sync::Arc; +use ibc_relayer::supervisor::SupervisorOptions; use std::error::Error; use std::io; use std::sync::RwLock; @@ -19,15 +20,22 @@ use crate::conclude::Output; use crate::prelude::*; #[derive(Clone, Command, Debug, Parser)] -pub struct StartCmd {} +pub struct StartCmd { + #[clap( + short = 'f', + long = "full-scan", + help = "Force a full scan of the chains for clients, connections and channels" + )] + full_scan: bool, +} impl Runnable for StartCmd { fn run(&self) { let config = (*app_config()).clone(); let config = Arc::new(RwLock::new(config)); - let supervisor_handle = - make_supervisor::(config.clone()).unwrap_or_else(|e| { + let supervisor_handle = make_supervisor::(config.clone(), self.full_scan) + .unwrap_or_else(|e| { Output::error(format!("Hermes failed to start, last error: {}", e)).exit(); unreachable!() }); @@ -176,11 +184,20 @@ fn spawn_telemetry_server( fn make_supervisor( config: Arc>, + force_full_scan: bool, ) -> Result> { let registry = SharedRegistry::::new(config.clone()); spawn_telemetry_server(&config)?; let rest = spawn_rest_server(&config); - Ok(spawn_supervisor(config, registry, rest, true)?) + Ok(spawn_supervisor( + config, + registry, + rest, + SupervisorOptions { + health_check: true, + force_full_scan, + }, + )?) } diff --git a/relayer/src/chain/cosmos.rs b/relayer/src/chain/cosmos.rs index 5161b5e438..82851f857c 100644 --- a/relayer/src/chain/cosmos.rs +++ b/relayer/src/chain/cosmos.rs @@ -285,6 +285,7 @@ impl CosmosSdkChain { /// Specific to the SDK and used only for Tendermint client create pub fn query_consensus_params(&self) -> Result { crate::time!("query_consensus_params"); + crate::telemetry!(query, self.id(), "query_consensus_params"); Ok(self .block_on(self.rpc_client().genesis()) @@ -329,7 +330,10 @@ impl CosmosSdkChain { let estimated_gas = self.estimate_gas(simulate_tx)?; if estimated_gas > self.max_gas() { - debug!(estimated = ?estimated_gas, max = ?self.max_gas(), "[{}] send_tx: estimated gas is higher than max gas", self.id()); + debug!( + id = %self.id(), estimated = ?estimated_gas, max = ?self.max_gas(), + "send_tx: estimated gas is higher than max gas" + ); return Err(Error::tx_simulate_gas_estimate_exceeded( self.id().clone(), @@ -341,7 +345,8 @@ impl CosmosSdkChain { let adjusted_fee = self.fee_with_gas(estimated_gas); debug!( - "using {} gas, fee {}", + id = %self.id(), + "send_tx: using {} gas, fee {}", estimated_gas, PrettyFee(&adjusted_fee) ); @@ -817,8 +822,8 @@ impl CosmosSdkChain { .join(", "); info!( - "[{}] waiting for commit of tx hashes(s) {}", - self.id(), + id = %self.id(), + "wait_for_block_commits: waiting for commit of tx hashes(s) {}", hashes ); @@ -831,8 +836,8 @@ impl CosmosSdkChain { |index| { if all_tx_results_found(&tx_sync_results) { trace!( - "[{}] wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)", - self.id(), + id = %self.id(), + "wait_for_block_commits: retrieved {} tx results after {} tries ({}ms)", tx_sync_results.len(), index, start.elapsed().as_millis() @@ -908,6 +913,9 @@ impl CosmosSdkChain { /// Query the chain's latest height pub fn query_latest_height(&self) -> Result { + crate::time!("query_latest_height"); + crate::telemetry!(query, self.id(), "query_latest_height"); + let status = self.status()?; Ok(ICSHeight { revision_number: ChainId::chain_version(status.node_info.network.as_str()), @@ -1190,6 +1198,7 @@ impl ChainEndpoint for CosmosSdkChain { fn query_commitment_prefix(&self) -> Result { crate::time!("query_commitment_prefix"); + crate::telemetry!(query, self.id(), "query_commitment_prefix"); // TODO - do a real chain query CommitmentPrefix::try_from(self.config().store_prefix.as_bytes().to_vec()) @@ -1199,6 +1208,8 @@ impl ChainEndpoint for CosmosSdkChain { /// Query the chain status fn query_status(&self) -> Result { crate::time!("query_status"); + crate::telemetry!(query, self.id(), "query_status"); + let status = self.status()?; let time = status.sync_info.latest_block_time; @@ -1218,6 +1229,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryClientStatesRequest, ) -> Result, Error> { crate::time!("query_clients"); + crate::telemetry!(query, self.id(), "query_clients"); let mut client = self .block_on( @@ -1252,6 +1264,7 @@ impl ChainEndpoint for CosmosSdkChain { height: ICSHeight, ) -> Result { crate::time!("query_client_state"); + crate::telemetry!(query, self.id(), "query_client_state"); let client_state = self .query(ClientStatePath(client_id.clone()), height, false) @@ -1265,6 +1278,7 @@ impl ChainEndpoint for CosmosSdkChain { height: ICSHeight, ) -> Result<(AnyClientState, MerkleProof), Error> { crate::time!("query_upgraded_client_state"); + crate::telemetry!(query, self.id(), "query_upgraded_client_state"); // Query for the value and the proof. let tm_height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?; @@ -1284,6 +1298,7 @@ impl ChainEndpoint for CosmosSdkChain { height: ICSHeight, ) -> Result<(AnyConsensusState, MerkleProof), Error> { crate::time!("query_upgraded_consensus_state"); + crate::telemetry!(query, self.id(), "query_upgraded_consensus_state"); let tm_height = Height::try_from(height.revision_height).map_err(Error::invalid_height)?; @@ -1305,6 +1320,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryConsensusStatesRequest, ) -> Result, Error> { crate::time!("query_consensus_states"); + crate::telemetry!(query, self.id(), "query_consensus_states"); let mut client = self .block_on( @@ -1337,6 +1353,7 @@ impl ChainEndpoint for CosmosSdkChain { query_height: ICSHeight, ) -> Result { crate::time!("query_consensus_state"); + crate::telemetry!(query, self.id(), "query_consensus_state"); let (consensus_state, _proof) = self.proven_client_consensus(&client_id, consensus_height, query_height)?; @@ -1348,7 +1365,8 @@ impl ChainEndpoint for CosmosSdkChain { &self, request: QueryClientConnectionsRequest, ) -> Result, Error> { - crate::time!("query_connections"); + crate::time!("query_client_connections"); + crate::telemetry!(query, self.id(), "query_client_connections"); let mut client = self .block_on( @@ -1383,6 +1401,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryConnectionsRequest, ) -> Result, Error> { crate::time!("query_connections"); + crate::telemetry!(query, self.id(), "query_connections"); let mut client = self .block_on( @@ -1416,6 +1435,9 @@ impl ChainEndpoint for CosmosSdkChain { connection_id: &ConnectionId, height: ICSHeight, ) -> Result { + crate::time!("query_connection"); + crate::telemetry!(query, self.id(), "query_connection"); + async fn do_query_connection( chain: &CosmosSdkChain, connection_id: &ConnectionId, @@ -1473,6 +1495,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryConnectionChannelsRequest, ) -> Result, Error> { crate::time!("query_connection_channels"); + crate::telemetry!(query, self.id(), "query_connection_channels"); let mut client = self .block_on( @@ -1504,7 +1527,8 @@ impl ChainEndpoint for CosmosSdkChain { &self, request: QueryChannelsRequest, ) -> Result, Error> { - crate::time!("query_connections"); + crate::time!("query_channels"); + crate::telemetry!(query, self.id(), "query_channels"); let mut client = self .block_on( @@ -1535,6 +1559,9 @@ impl ChainEndpoint for CosmosSdkChain { channel_id: &ChannelId, height: ICSHeight, ) -> Result { + crate::time!("query_channel"); + crate::telemetry!(query, self.id(), "query_channel"); + let res = self.query( ChannelEndsPath(port_id.clone(), channel_id.clone()), height, @@ -1550,6 +1577,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryChannelClientStateRequest, ) -> Result, Error> { crate::time!("query_channel_client_state"); + crate::telemetry!(query, self.id(), "query_channel_client_state"); let mut client = self .block_on( @@ -1579,6 +1607,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryPacketCommitmentsRequest, ) -> Result<(Vec, ICSHeight), Error> { crate::time!("query_packet_commitments"); + crate::telemetry!(query, self.id(), "query_packet_commitments"); let mut client = self .block_on( @@ -1612,6 +1641,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryUnreceivedPacketsRequest, ) -> Result, Error> { crate::time!("query_unreceived_packets"); + crate::telemetry!(query, self.id(), "query_unreceived_packets"); let mut client = self .block_on( @@ -1638,6 +1668,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryPacketAcknowledgementsRequest, ) -> Result<(Vec, ICSHeight), Error> { crate::time!("query_packet_acknowledgements"); + crate::telemetry!(query, self.id(), "query_packet_acknowledgements"); let mut client = self .block_on( @@ -1670,6 +1701,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryUnreceivedAcksRequest, ) -> Result, Error> { crate::time!("query_unreceived_acknowledgements"); + crate::telemetry!(query, self.id(), "query_unreceived_acknowledgements"); let mut client = self .block_on( @@ -1695,6 +1727,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryNextSequenceReceiveRequest, ) -> Result { crate::time!("query_next_sequence_receive"); + crate::telemetry!(query, self.id(), "query_next_sequence_receive"); let mut client = self .block_on( @@ -1727,6 +1760,7 @@ impl ChainEndpoint for CosmosSdkChain { /// packets ever sent. fn query_txs(&self, request: QueryTxRequest) -> Result, Error> { crate::time!("query_txs"); + crate::telemetry!(query, self.id(), "query_txs"); match request { QueryTxRequest::Packet(request) => { @@ -1828,6 +1862,7 @@ impl ChainEndpoint for CosmosSdkChain { request: QueryBlockRequest, ) -> Result<(Vec, Vec), Error> { crate::time!("query_blocks"); + crate::telemetry!(query, self.id(), "query_blocks"); match request { QueryBlockRequest::Packet(request) => { @@ -2066,6 +2101,9 @@ impl ChainEndpoint for CosmosSdkChain { } fn query_app_version(&self, request: AppVersion) -> Result { + crate::time!("query_app_version"); + crate::telemetry!(query, self.id(), "query_app_version"); + use ibc_proto::ibc::core::port::v1::query_client::QueryClient; let mut client = self @@ -2304,6 +2342,8 @@ pub async fn broadcast_tx_sync( /// Uses the GRPC client to retrieve the account sequence async fn query_account(chain: &CosmosSdkChain, address: String) -> Result { + crate::telemetry!(query, chain.id(), "query_account"); + let mut client = ibc_proto::cosmos::auth::v1beta1::query_client::QueryClient::connect( chain.grpc_addr.clone(), ) diff --git a/relayer/src/chain/counterparty.rs b/relayer/src/chain/counterparty.rs index 49a5efec5b..3a3de610aa 100644 --- a/relayer/src/chain/counterparty.rs +++ b/relayer/src/chain/counterparty.rs @@ -73,7 +73,7 @@ fn connection_on_destination( } pub fn connection_state_on_destination( - connection: IdentifiedConnectionEnd, + connection: &IdentifiedConnectionEnd, counterparty_chain: &impl ChainHandle, ) -> Result { if let Some(remote_connection_id) = connection.connection_end.counterparty().connection_id() { diff --git a/relayer/src/config.rs b/relayer/src/config.rs index d16cfbd780..b9b5eb29b8 100644 --- a/relayer/src/config.rs +++ b/relayer/src/config.rs @@ -5,9 +5,10 @@ mod proof_specs; pub mod reload; pub mod types; -use alloc::collections::BTreeMap as HashMap; -use alloc::collections::BTreeSet as HashSet; +use alloc::collections::BTreeMap; +use alloc::collections::BTreeSet; use core::{fmt, time::Duration}; +use itertools::Itertools; use std::sync::{Arc, RwLock}; use std::{fs, fs::File, io::Write, path::Path}; @@ -75,12 +76,28 @@ impl PacketFilter { #[derive(Clone, Debug, Default, Serialize, Deserialize)] #[serde(deny_unknown_fields)] -pub struct ChannelsSpec(HashSet<(PortId, ChannelId)>); +pub struct ChannelsSpec(BTreeSet<(PortId, ChannelId)>); impl ChannelsSpec { pub fn contains(&self, channel_port: &(PortId, ChannelId)) -> bool { self.0.contains(channel_port) } + + pub fn iter(&self) -> impl Iterator { + self.0.iter() + } +} + +impl fmt::Display for ChannelsSpec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}", + self.iter() + .map(|(pid, cid)| format!("{}/{}", pid, cid)) + .join(", ") + ) + } } /// Defaults for various fields @@ -165,7 +182,7 @@ impl Config { } } - pub fn chains_map(&self) -> HashMap<&ChainId, &ChainConfig> { + pub fn chains_map(&self) -> BTreeMap<&ChainId, &ChainConfig> { self.chains.iter().map(|c| (&c.id, c)).collect() } } diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 030cf27748..ad26a7289e 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -618,7 +618,7 @@ impl Connection { connection_id: connection_id.clone(), }; - connection_state_on_destination(connection, &self.dst_chain()) + connection_state_on_destination(&connection, &self.dst_chain()) .map_err(ConnectionError::supervisor) } diff --git a/relayer/src/object.rs b/relayer/src/object.rs index a240bab421..c830379402 100644 --- a/relayer/src/object.rs +++ b/relayer/src/object.rs @@ -63,7 +63,7 @@ pub struct Connection { impl Connection { pub fn short_name(&self) -> String { format!( - "connection::{}:{} -> {}", + "connection::{}:{}->{}", self.src_connection_id, self.src_chain_id, self.dst_chain_id, ) } @@ -88,7 +88,7 @@ pub struct Channel { impl Channel { pub fn short_name(&self) -> String { format!( - "channel::{}/{}:{} -> {}", + "channel::{}/{}:{}->{}", self.src_channel_id, self.src_port_id, self.src_chain_id, self.dst_chain_id, ) } diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 3189541162..1ecd3e61b0 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -15,17 +15,22 @@ use ibc::{ Height, }; -use crate::util::lock::LockExt; -use crate::util::task::{spawn_background_task, Next, TaskError, TaskHandle}; use crate::{ chain::{handle::ChainHandle, HealthCheck}, config::{ChainConfig, Config}, - event, - event::monitor::{Error as EventError, ErrorDetail as EventErrorDetail, EventBatch}, + event::{ + self, + monitor::{Error as EventError, ErrorDetail as EventErrorDetail, EventBatch}, + }, object::Object, registry::{Registry, SharedRegistry}, rest, - util::try_recv_multiple, + supervisor::scan::ScanMode, + util::{ + lock::LockExt, + task::{spawn_background_task, Next, TaskError, TaskHandle}, + try_recv_multiple, + }, worker::WorkerMap, }; @@ -38,13 +43,13 @@ pub use error::{Error, ErrorDetail}; pub mod dump_state; use dump_state::SupervisorState; +pub mod scan; pub mod spawn; -use spawn::SpawnContext; pub mod cmd; use cmd::{CmdEffect, ConfigUpdate, SupervisorCmd}; -use self::spawn::SpawnMode; +use self::{scan::ChainScanner, spawn::SpawnContext}; type ArcBatch = Arc>; type Subscription = Receiver; @@ -60,6 +65,18 @@ pub struct SupervisorHandle { tasks: Vec, } +/// Options for the supervisor +#[derive(Debug)] +pub struct SupervisorOptions { + /// Perform a health check of all chains we connect to + pub health_check: bool, + + /// Force a full scan of the chains for clients, connections, and channels, + /// even when an allow list is configured for a chain and the full scan could + /// be omitted. + pub force_full_scan: bool, +} + /** Spawn a supervisor for testing purpose using the provided [`SharedConfig`] and [`SharedRegistry`]. Returns a @@ -70,11 +87,11 @@ pub fn spawn_supervisor( config: Arc>, registry: SharedRegistry, rest_rx: Option, - do_health_check: bool, + options: SupervisorOptions, ) -> Result { let (sender, receiver) = unbounded(); - let tasks = spawn_supervisor_tasks(config, registry, rest_rx, receiver, do_health_check)?; + let tasks = spawn_supervisor_tasks(config, registry, rest_rx, receiver, options)?; Ok(SupervisorHandle { sender, tasks }) } @@ -108,23 +125,36 @@ pub fn spawn_supervisor_tasks( registry: SharedRegistry, rest_rx: Option, cmd_rx: Receiver, - do_health_check: bool, + options: SupervisorOptions, ) -> Result, Error> { - if do_health_check { + if options.health_check { health_check(&config.acquire_read(), &mut registry.write()); } let workers = Arc::new(RwLock::new(WorkerMap::new())); let client_state_filter = Arc::new(RwLock::new(FilterPolicy::default())); - spawn_context( + let scan = chain_scanner( &config.acquire_read(), &mut registry.write(), &mut client_state_filter.acquire_write(), + if options.force_full_scan { + ScanMode::Full + } else { + ScanMode::Auto + }, + ) + .scan_chains(); + + info!("Scanned chains:"); + info!("{}", scan); + + spawn_context( + &config.acquire_read(), + &mut registry.write(), &mut workers.acquire_write(), - SpawnMode::Startup, ) - .spawn_workers(); + .spawn_workers(scan); let subscriptions = Arc::new(RwLock::new(init_subscriptions( &config.acquire_read(), @@ -450,11 +480,18 @@ fn collect_events( fn spawn_context<'a, Chain: ChainHandle>( config: &'a Config, registry: &'a mut Registry, - client_state_filter: &'a mut FilterPolicy, workers: &'a mut WorkerMap, - mode: SpawnMode, ) -> SpawnContext<'a, Chain> { - SpawnContext::new(config, registry, client_state_filter, workers, mode) + SpawnContext::new(config, registry, workers) +} + +fn chain_scanner<'a, Chain: ChainHandle>( + config: &'a Config, + registry: &'a mut Registry, + client_state_filter: &'a mut FilterPolicy, + full_scan: ScanMode, +) -> ChainScanner<'a, Chain> { + ChainScanner::new(config, registry, client_state_filter, full_scan) } /// Perform a health check on all connected chains @@ -676,31 +713,21 @@ fn remove_chain( config: &mut Config, registry: &mut Registry, workers: &mut WorkerMap, - client_state_filter: &mut FilterPolicy, id: &ChainId, ) -> CmdEffect { if !config.has_chain(id) { - info!(chain.id=%id, "skipping removal of non-existing chain"); + info!(chain = %id, "skipping removal of non-existing chain"); return CmdEffect::Nothing; } - info!(chain.id=%id, "removing existing chain"); - + info!(chain = %id, "removing existing chain"); config.chains.retain(|c| &c.id != id); - debug!(chain.id=%id, "shutting down workers"); - - let mut ctx = spawn_context( - config, - registry, - client_state_filter, - workers, - SpawnMode::Reload, - ); - + debug!(chain = %id, "shutting down workers"); + let mut ctx = spawn_context(config, registry, workers); ctx.shutdown_workers_for_chain(id); - debug!(chain.id=%id, "shutting down chain runtime"); + debug!(chain = %id, "shutting down chain runtime"); registry.shutdown(id); CmdEffect::ConfigChanged @@ -721,15 +748,15 @@ fn add_chain( let id = chain_config.id.clone(); if config.has_chain(&id) { - info!(chain.id=%id, "skipping addition of already existing chain"); + info!(chain = %id, "skipping addition of already existing chain"); return CmdEffect::Nothing; } - info!(chain.id=%id, "adding new chain"); + info!(chain = %id, "adding new chain"); - config.chains.push(chain_config); + config.chains.push(chain_config.clone()); - debug!(chain.id=%id, "spawning chain runtime"); + debug!(chain = %id, "spawning chain runtime"); if let Err(e) = registry.spawn(&id) { error!( @@ -743,17 +770,27 @@ fn add_chain( return CmdEffect::Nothing; } - debug!(chain.id=%id, "spawning workers"); + debug!(chain = %id, "scanning chain"); - let mut ctx = spawn_context( - config, - registry, - client_state_filter, - workers, - SpawnMode::Reload, - ); + let scan_result = chain_scanner(config, registry, client_state_filter, ScanMode::Auto) + .scan_chain(&chain_config); + + let scan = match scan_result { + Ok(scan) => scan, + Err(e) => { + error!("failed to scan chain {}: {}", id, e); - ctx.spawn_workers_for_chain(&id); + // Remove the newly added config + config.chains.retain(|c| c.id != id); + + return CmdEffect::Nothing; + } + }; + + debug!(chain = %id, "spawning workers"); + + let mut ctx = spawn_context(config, registry, workers); + ctx.spawn_workers_for_chain(scan); CmdEffect::ConfigChanged } @@ -771,15 +808,9 @@ fn update_chain( client_state_filter: &mut FilterPolicy, chain_config: ChainConfig, ) -> CmdEffect { - info!(chain.id=%chain_config.id, "updating existing chain"); + info!(chain = %chain_config.id, "updating existing chain"); - let removed = remove_chain( - config, - registry, - workers, - client_state_filter, - &chain_config.id, - ); + let removed = remove_chain(config, registry, workers, &chain_config.id); let added = add_chain(config, registry, workers, client_state_filter, chain_config); @@ -801,9 +832,7 @@ fn update_config( ConfigUpdate::Add(chain_config) => { add_chain(config, registry, workers, client_state_filter, chain_config) } - ConfigUpdate::Remove(id) => { - remove_chain(config, registry, workers, client_state_filter, &id) - } + ConfigUpdate::Remove(id) => remove_chain(config, registry, workers, &id), ConfigUpdate::Update(chain_config) => { update_chain(config, registry, workers, client_state_filter, chain_config) } diff --git a/relayer/src/supervisor/error.rs b/relayer/src/supervisor/error.rs index b09c2aaa87..a1310979d4 100644 --- a/relayer/src/supervisor/error.rs +++ b/relayer/src/supervisor/error.rs @@ -5,6 +5,7 @@ use ibc::core::ics24_host::identifier::{ChainId, ChannelId, ConnectionId, PortId use crate::error::Error as RelayerError; use crate::registry::SpawnError; +use crate::supervisor::scan::Error as ScanError; define_error! { Error { @@ -64,5 +65,9 @@ define_error! { Spawn [ SpawnError ] |_| { "supervisor was not able to connect to any chains" }, + + Scan + [ ScanError ] + |_| { "supervisor encountered an error when scanning chains" }, } } diff --git a/relayer/src/supervisor/scan.rs b/relayer/src/supervisor/scan.rs new file mode 100644 index 0000000000..9bf0f9f3e5 --- /dev/null +++ b/relayer/src/supervisor/scan.rs @@ -0,0 +1,779 @@ +#![allow(unused_imports)] + +use core::fmt; +use std::collections::BTreeMap; + +use itertools::Itertools; +use tracing::{debug, error, info, info_span, warn}; + +use ibc::{ + core::{ + ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, + ics03_connection::connection::{IdentifiedConnectionEnd, State as ConnectionState}, + ics04_channel::channel::{IdentifiedChannelEnd, State as ChannelState}, + ics24_host::identifier::{ChainId, ChannelId, ClientId, ConnectionId, PortId}, + }, + Height, +}; + +use ibc_proto::ibc::core::{ + channel::v1::QueryConnectionChannelsRequest, client::v1::QueryClientStatesRequest, + connection::v1::QueryClientConnectionsRequest, +}; + +use crate::{ + chain::{ + counterparty::{channel_on_destination, connection_state_on_destination}, + handle::ChainHandle, + }, + config::{ChainConfig, ChannelsSpec, Config, ModeConfig, PacketFilter}, + object::{Channel, Client, Connection, Object, Packet}, + registry::{Registry, SharedRegistry}, + supervisor::client_state_filter::{FilterPolicy, Permission}, + supervisor::error::Error as SupervisorError, + worker::WorkerMap, +}; + +use crate::chain::counterparty::{unreceived_acknowledgements, unreceived_packets}; + +use crate::error::Error as RelayerError; +use crate::registry::SpawnError; + +flex_error::define_error! { + Error { + Spawn + [ SpawnError ] + |_| { "spawn" }, + + Query + [ RelayerError ] + |_| { "query" }, + + MissingConnectionHop + { + port_id: PortId, + channel_id: ChannelId, + chain_id: ChainId, + } + |e| { + format_args!( + "could not retrieve the connection hop underlying port/channel {}/{} on chain '{}'", + e.port_id, e.channel_id, e.chain_id + ) + }, + + UninitializedChannel + { + port_id: PortId, + channel_id: ChannelId, + chain_id: ChainId, + } + |e| { + format_args!( + "channel '{}/{}' on chain '{}' is uninitialized", + e.port_id, e.channel_id, e.chain_id + ) + }, + } +} + +#[derive(Debug)] +pub struct ChainsScan { + pub chains: Vec>, +} + +impl fmt::Display for ChainsScan { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for scan in self.chains.iter().flatten() { + writeln!(f, "# Chain: {}", scan.chain_id)?; + + for client in scan.clients.values() { + writeln!(f, " - Client: {}", client.client.client_id)?; + + for conn in client.connections.values() { + let counterparty = conn + .counterparty_state + .as_ref() + .map(|s| s.to_string()) + .unwrap_or_else(|| "".to_string()); + + writeln!(f, " * Connection: {}", conn.connection.connection_id)?; + writeln!(f, " | State: {}", conn.state())?; + writeln!(f, " | Counterparty state: {}", counterparty)?; + + for chan in conn.channels.values() { + let counterparty = chan + .counterparty + .as_ref() + .map(|c| c.channel_id.to_string()) + .unwrap_or_else(|| "".to_string()); + + writeln!(f, " + Channel: {}", chan.channel.channel_id)?; + writeln!(f, " | Port: {}", chan.channel.port_id)?; + writeln!(f, " | State: {}", chan.channel.channel_end.state())?; + writeln!(f, " | Counterparty: {}", counterparty)?; + } + } + } + } + + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct ChainScan { + pub chain_id: ChainId, + pub clients: BTreeMap, +} + +impl ChainScan { + fn new(chain_id: ChainId) -> ChainScan { + Self { + chain_id, + clients: BTreeMap::new(), + } + } +} + +#[derive(Clone, Debug)] +pub struct ClientScan { + pub client: IdentifiedAnyClientState, + pub connections: BTreeMap, +} + +impl ClientScan { + fn new(client: IdentifiedAnyClientState) -> ClientScan { + Self { + client, + connections: BTreeMap::new(), + } + } + + pub fn id(&self) -> &ClientId { + &self.client.client_id + } + + pub fn counterparty_chain_id(&self) -> ChainId { + self.client.client_state.chain_id() + } +} + +#[derive(Clone, Debug)] +pub struct ConnectionScan { + pub connection: IdentifiedConnectionEnd, + pub counterparty_state: Option, + pub channels: BTreeMap, +} + +impl ConnectionScan { + pub fn new( + connection: IdentifiedConnectionEnd, + counterparty_state: Option, + ) -> Self { + Self { + connection, + counterparty_state, + channels: BTreeMap::new(), + } + } + + pub fn id(&self) -> &ConnectionId { + &self.connection.connection_id + } + + pub fn state(&self) -> ConnectionState { + self.connection.connection_end.state + } + + pub fn is_open(&self) -> bool { + self.connection.connection_end.is_open() + } +} + +#[derive(Clone, Debug)] +pub struct ChannelScan { + pub channel: IdentifiedChannelEnd, + pub counterparty: Option, +} + +impl ChannelScan { + pub fn new(channel: IdentifiedChannelEnd, counterparty: Option) -> Self { + Self { + channel, + counterparty, + } + } + + pub fn id(&self) -> &ChannelId { + &self.channel.channel_id + } + + pub fn unreceived_packets_on_counterparty( + &self, + chain: &impl ChainHandle, + counterparty_chain: &impl ChainHandle, + ) -> Option> { + self.counterparty.as_ref().map(|counterparty| { + unreceived_packets(counterparty_chain, chain, counterparty).unwrap_or_default() + }) + } + + pub fn unreceived_acknowledgements_on_counterparty( + &self, + chain: &impl ChainHandle, + counterparty_chain: &impl ChainHandle, + ) -> Option> { + self.counterparty.as_ref().map(|counterparty| { + unreceived_acknowledgements(counterparty_chain, chain, counterparty).unwrap_or_default() + }) + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum ScanMode { + Auto, + Full, +} + +pub struct ChainScanner<'a, Chain: ChainHandle> { + config: &'a Config, + registry: &'a mut Registry, + client_state_filter: &'a mut FilterPolicy, + scan_mode: ScanMode, +} + +impl<'a, Chain: ChainHandle> ChainScanner<'a, Chain> { + pub fn new( + config: &'a Config, + registry: &'a mut Registry, + client_state_filter: &'a mut FilterPolicy, + scan_mode: ScanMode, + ) -> Self { + Self { + config, + registry, + client_state_filter, + scan_mode, + } + } + + pub fn scan_chains(mut self) -> ChainsScan { + let mut scans = ChainsScan { + chains: Vec::with_capacity(self.config.chains.len()), + }; + + for chain in self.config.chains.clone() { + scans.chains.push(self.scan_chain(&chain)); + } + + scans + } + + pub fn scan_chain(&mut self, chain_config: &ChainConfig) -> Result { + let span = info_span!("scan.chain", chain = %chain_config.id); + let _guard = span.enter(); + + info!("scanning chain..."); + + let chain = match self.registry.get_or_spawn(&chain_config.id) { + Ok(chain_handle) => chain_handle, + Err(e) => { + error!( + "aborting scan, reason: failed to spawn chain runtime with error: {}", + e + ); + + return Err(Error::spawn(e)); + } + }; + + let mut scan = ChainScan::new(chain_config.id.clone()); + + match self.use_allow_list(chain_config) { + Some(spec) if self.scan_mode == ScanMode::Auto => { + info!("chain uses an allow list, skipping scan for fast startup"); + info!("allowed ports/channels: {}", spec); + + self.query_allowed_channels(&chain, spec, &mut scan)?; + } + _ => { + info!("scanning chain for all clients, connections and channels"); + self.scan_all_clients(&chain, &mut scan)?; + } + }; + + Ok(scan) + } + + pub fn query_allowed_channels( + &mut self, + chain: &Chain, + spec: &ChannelsSpec, + scan: &mut ChainScan, + ) -> Result<(), Error> { + info!("querying allowed channels..."); + + for (port_id, channel_id) in spec.iter() { + let result = scan_allowed_channel(self.registry, chain, port_id, channel_id); + + match result { + Ok(ScannedChannel { + channel, + counterparty_channel, + connection, + counterparty_connection_state, + client, + }) => { + let client_scan = scan + .clients + .entry(client.client_id.clone()) + .or_insert_with(|| ClientScan::new(client)); + + let connection_scan = client_scan + .connections + .entry(connection.connection_id.clone()) + .or_insert_with(|| { + ConnectionScan::new(connection, counterparty_connection_state) + }); + + connection_scan + .channels + .entry(channel.channel_id.clone()) + .or_insert_with(|| ChannelScan::new(channel, counterparty_channel)); + } + Err(e) => error!(channel = %channel_id, "failed to scan channel, reason: {}", e), + } + } + + Ok(()) + } + + pub fn scan_all_clients(&mut self, chain: &Chain, scan: &mut ChainScan) -> Result<(), Error> { + info!("scanning all clients..."); + + let clients = query_all_clients(chain)?; + + for client in clients { + if let Some(client_scan) = self.scan_client(chain, client)? { + scan.clients.insert(client_scan.id().clone(), client_scan); + } + } + + Ok(()) + } + + fn scan_client( + &mut self, + chain: &Chain, + client: IdentifiedAnyClientState, + ) -> Result, Error> { + let span = info_span!("scan.client", client = %client.client_id); + let _guard = span.enter(); + + info!("scanning client..."); + + if !self.client_allowed(chain, &client) { + warn!( + trust_threshold = ?client.client_state.trust_threshold(), + "skipping client, reason: client is not allowed", + ); + + return Ok(None); + } + + let counterparty_chain_id = client.client_state.chain_id(); + let has_counterparty = self.config.has_chain(&counterparty_chain_id); + + if !has_counterparty { + debug!( + chain = %chain.id(), + counterparty_chain = %counterparty_chain_id, + "skipping client because its counterparty is not present in the config", + ); + + return Ok(None); + } + + let client_connections_ids = query_client_connections(chain, &client.client_id)?; + + let mut scan = ClientScan::new(client); + + for connection_end in client_connections_ids { + if let Some(connection_scan) = + self.scan_connection(chain, &scan.client, connection_end)? + { + scan.connections + .insert(connection_scan.id().clone(), connection_scan); + } + } + + Ok(Some(scan)) + } + + fn scan_connection( + &mut self, + chain: &Chain, + client: &IdentifiedAnyClientState, + connection: IdentifiedConnectionEnd, + ) -> Result, Error> { + let span = info_span!("scan.connection", connection = %connection.connection_id); + let _guard = span.enter(); + + info!("scanning connection..."); + + if !self.connection_allowed(chain, client, &connection) { + warn!("skipping connection, reason: connection is not allowed",); + return Ok(None); + } + + let mut scan = ConnectionScan::new(connection, None); + + if !scan.is_open() { + warn!("connection is not open, skipping scan of channels over this connection"); + return Ok(Some(scan)); + } + + let counterparty_state = match self.counterparty_connection_state(client, &scan.connection) + { + Ok(state) if !state.eq(&ConnectionState::Open) => { + warn!("counterparty connection is not open, skipping scan of channels over this connection"); + return Ok(Some(scan)); + } + Err(e) => { + error!("error fetching counterparty connection state: {}", e); + return Ok(None); + } + Ok(state) => state, + }; + + scan.counterparty_state = Some(counterparty_state); + + let channels = match query_connection_channels(chain, scan.connection.id()) { + Ok(channels) => channels, + Err(e) => { + error!("failed to fetch connection channels: {}", e); + Vec::new() + } + }; + + let counterparty_chain = self + .registry + .get_or_spawn(&client.client_state.chain_id()) + .map_err(Error::spawn)?; + + let channels = channels + .into_iter() + .filter(|channel| self.channel_allowed(chain, channel)) + .map(|channel| { + let counterparty = + channel_on_destination(&channel, &scan.connection, &counterparty_chain) + .unwrap_or_default(); + + let scan = ChannelScan { + channel, + counterparty, + }; + + (scan.id().clone(), scan) + }) + .collect(); + + scan.channels = channels; + + Ok(Some(scan)) + } + + fn counterparty_connection_state( + &mut self, + client: &IdentifiedAnyClientState, + connection: &IdentifiedConnectionEnd, + ) -> Result { + let counterparty_chain = self + .registry + .get_or_spawn(&client.client_state.chain_id()) + .map_err(Error::spawn)?; + + // FIXME + Ok(connection_state_on_destination(connection, &counterparty_chain).unwrap()) + } + + fn filtering_enabled(&self) -> bool { + self.config.mode.packets.filter + } + + fn use_allow_list<'b>(&self, chain_config: &'b ChainConfig) -> Option<&'b ChannelsSpec> { + if !self.filtering_enabled() { + return None; + } + + match chain_config.packet_filter { + PacketFilter::Allow(ref spec) => Some(spec), + _ => None, + } + } + + fn client_allowed(&mut self, chain: &Chain, client: &IdentifiedAnyClientState) -> bool { + if !self.filtering_enabled() { + return true; + }; + + let permission = self.client_state_filter.control_client( + &chain.id(), + &client.client_id, + &client.client_state, + ); + + permission == Permission::Allow + } + + fn connection_allowed( + &mut self, + chain: &Chain, + client: &IdentifiedAnyClientState, + connection: &IdentifiedConnectionEnd, + ) -> bool { + if !self.filtering_enabled() { + return true; + } + + let permission = self.client_state_filter.control_connection_end_and_client( + self.registry, + &chain.id(), + &client.client_state, + &connection.connection_end, + &connection.connection_id, + ); + + match permission { + Ok(Permission::Deny) => { + warn!( + "skipping workers for chain {}, client {} & conn {}, \ + reason: client or counterparty client is not allowed", + chain.id(), + client.client_id, + connection.connection_id + ); + + false + } + Err(e) => { + error!( + "skipping workers for chain {}, client {} & conn {}, reason: {}", + chain.id(), + client.client_id, + connection.connection_id, + e + ); + + false + } + _ => true, + } + } + + fn channel_allowed(&mut self, chain: &Chain, channel: &IdentifiedChannelEnd) -> bool { + self.config + .packets_on_channel_allowed(&chain.id(), &channel.port_id, &channel.channel_id) + } +} + +struct ScannedChannel { + channel: IdentifiedChannelEnd, + counterparty_channel: Option, + connection: IdentifiedConnectionEnd, + counterparty_connection_state: Option, + client: IdentifiedAnyClientState, +} + +fn scan_allowed_channel( + registry: &'_ mut Registry, + chain: &Chain, + port_id: &PortId, + channel_id: &ChannelId, +) -> Result { + let span = info_span!("scan.channel", port = %port_id, channel = %channel_id); + let _guard = span.enter(); + + info!("querying channel..."); + let channel = query_channel(chain, port_id, channel_id)?; + + if channel + .channel_end + .state_matches(&ChannelState::Uninitialized) + { + return Err(Error::uninitialized_channel( + port_id.clone(), + channel_id.clone(), + chain.id(), + )); + } + + let connection = query_connection_for_channel(chain, &channel)?; + let client_id = connection.connection_end.client_id(); + + info!( + connection = %connection.connection_id, client = %client_id, + "found connection and client", + ); + + info!(client = %client_id, "querying client..."); + let client = query_client(chain, client_id)?; + + info!( + client = %client_id, + counterparty_chain = %client.client_state.chain_id(), + "found counterparty chain for client", + ); + + let counterparty_chain = registry + .get_or_spawn(&client.client_state.chain_id()) + .map_err(Error::spawn)?; + + let counterparty_channel = + channel_on_destination(&channel, &connection, &counterparty_chain).unwrap_or_default(); + + let counterparty_channel_name = counterparty_channel + .as_ref() + .map(|c| c.channel_id.to_string()) + .unwrap_or_else(|| "".to_string()); + + info!( + counterparty_channel = %counterparty_channel_name, + "found counterparty channel" + ); + + let counterparty_connection_state = + connection_state_on_destination(&connection, &counterparty_chain) + .map(Some) + .unwrap_or_default(); + + let counterparty_connection_name = counterparty_connection_state + .as_ref() + .map(|s| s.to_string()) + .unwrap_or_else(|| "".to_string()); + + info!( + counterparty_connection_state = %counterparty_connection_name, + "found counterparty connection state" + ); + + Ok(ScannedChannel { + channel, + counterparty_channel, + connection, + counterparty_connection_state, + client, + }) +} + +fn query_client( + chain: &Chain, + client_id: &ClientId, +) -> Result { + let client = chain + .query_client_state(client_id, Height::zero()) + .map_err(Error::query)?; + + Ok(IdentifiedAnyClientState::new(client_id.clone(), client)) +} + +fn query_channel( + chain: &Chain, + port_id: &PortId, + channel_id: &ChannelId, +) -> Result { + let channel_end = chain + .query_channel(port_id, channel_id, Height::zero()) + .map_err(Error::query)?; + + Ok(IdentifiedChannelEnd::new( + port_id.clone(), + channel_id.clone(), + channel_end, + )) +} + +fn query_connection_for_channel( + chain: &Chain, + channel: &IdentifiedChannelEnd, +) -> Result { + let connection_id = channel + .channel_end + .connection_hops() + .first() + .cloned() + .ok_or_else(|| { + Error::missing_connection_hop( + channel.port_id.clone(), + channel.channel_id.clone(), + chain.id(), + ) + })?; + + query_connection(chain, &connection_id) +} + +fn query_all_clients( + chain: &Chain, +) -> Result, Error> { + let clients_req = QueryClientStatesRequest { + pagination: ibc_proto::cosmos::base::query::pagination::all(), + }; + + chain.query_clients(clients_req).map_err(Error::query) +} + +fn query_client_connections( + chain: &Chain, + client_id: &ClientId, +) -> Result, Error> { + let conns_req = QueryClientConnectionsRequest { + client_id: client_id.to_string(), + }; + + let ids = chain + .query_client_connections(conns_req) + .map_err(Error::query)?; + + let connections = ids + .into_iter() + .filter_map(|id| match query_connection(chain, &id) { + Ok(connection) => Some(connection), + Err(e) => { + error!("failed to query connection: {}", e); + None + } + }) + .collect_vec(); + + Ok(connections) +} + +fn query_connection( + chain: &Chain, + connection_id: &ConnectionId, +) -> Result { + let connection_end = chain + .query_connection(connection_id, Height::zero()) + .map_err(Error::query)?; + + Ok(IdentifiedConnectionEnd { + connection_id: connection_id.clone(), + connection_end, + }) +} + +fn query_connection_channels( + chain: &Chain, + connection_id: &ConnectionId, +) -> Result, Error> { + let chans_req = QueryConnectionChannelsRequest { + connection: connection_id.to_string(), + pagination: ibc_proto::cosmos::base::query::pagination::all(), + }; + + chain + .query_connection_channels(chans_req) + .map_err(Error::query) +} diff --git a/relayer/src/supervisor/spawn.rs b/relayer/src/supervisor/spawn.rs index 30e45bc902..d58469d719 100644 --- a/relayer/src/supervisor/spawn.rs +++ b/relayer/src/supervisor/spawn.rs @@ -1,237 +1,76 @@ -use itertools::Itertools; -use tracing::{debug, error, trace, warn}; - -use ibc::{ - core::{ - ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, - ics03_connection::connection::{IdentifiedConnectionEnd, State as ConnectionState}, - ics04_channel::channel::{IdentifiedChannelEnd, State as ChannelState}, - ics24_host::identifier::{ChainId, ConnectionId}, - }, - Height, -}; +use tracing::{error, info}; -use ibc_proto::ibc::core::{ - channel::v1::QueryConnectionChannelsRequest, client::v1::QueryClientStatesRequest, - connection::v1::QueryClientConnectionsRequest, +use ibc::core::{ + ics02_client::client_state::{ClientState, IdentifiedAnyClientState}, + ics03_connection::connection::IdentifiedConnectionEnd, + ics04_channel::channel::State as ChannelState, + ics24_host::identifier::ChainId, }; use crate::{ - chain::{ - counterparty::{channel_on_destination, connection_state_on_destination}, - handle::ChainHandle, - }, + chain::{counterparty::connection_state_on_destination, handle::ChainHandle}, config::Config, object::{Channel, Client, Connection, Object, Packet}, registry::Registry, - supervisor::client_state_filter::{FilterPolicy, Permission}, supervisor::error::Error as SupervisorError, worker::WorkerMap, }; -use super::Error; -use crate::chain::counterparty::{unreceived_acknowledgements, unreceived_packets}; - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum SpawnMode { - Startup, - Reload, -} +use super::{ + scan::{ChainScan, ChainsScan, ChannelScan, ClientScan, ConnectionScan}, + Error, +}; /// A context for spawning workers within the [`crate::supervisor::Supervisor`]. pub struct SpawnContext<'a, Chain: ChainHandle> { config: &'a Config, registry: &'a mut Registry, workers: &'a mut WorkerMap, - client_state_filter: &'a mut FilterPolicy, - mode: SpawnMode, } impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { pub fn new( config: &'a Config, registry: &'a mut Registry, - client_state_filter: &'a mut FilterPolicy, workers: &'a mut WorkerMap, - mode: SpawnMode, ) -> Self { Self { config, registry, workers, - client_state_filter, - mode, - } - } - - fn client_filter_enabled(&self) -> bool { - // Currently just a wrapper over the global filter. - self.config.mode.packets.filter - } - - pub fn spawn_workers(&mut self) { - let chain_ids = self - .config - .chains - .iter() - .map(|c| &c.id) - .cloned() - .collect_vec(); - - for chain_id in chain_ids { - self.spawn_workers_for_chain(&chain_id); } } - pub fn spawn_workers_from_chain_to_chain( - &mut self, - from_chain_id: &ChainId, - to_chain_id: &ChainId, - ) { - let clients_req = QueryClientStatesRequest { - pagination: ibc_proto::cosmos::base::query::pagination::all(), - }; - - let chain = match self.registry.get_or_spawn(from_chain_id) { - Ok(chain_handle) => chain_handle, - Err(e) => { - error!( - "skipping workers for chain {}, reason: failed to spawn chain runtime with error: {}", - from_chain_id, e - ); - - return; - } - }; - - let clients = match chain.query_clients(clients_req) { - Ok(clients) => clients, - Err(e) => { - error!( - "skipping workers for chain {}, reason: failed to query clients with error: {}", - from_chain_id, e - ); - - return; - } - }; - - for client in clients { - if &client.client_state.chain_id() == to_chain_id { - self.spawn_workers_for_client(chain.clone(), client); + pub fn spawn_workers(&mut self, scan: ChainsScan) { + for chain_scan in scan.chains { + match chain_scan { + Ok(chain_scan) => self.spawn_workers_for_chain(chain_scan), + Err(e) => error!("failed to spawn worker for a chain, reason: {}", e), // TODO: Show chain id } } } - pub fn spawn_workers_for_chain(&mut self, chain_id: &ChainId) { - let clients_req = QueryClientStatesRequest { - pagination: ibc_proto::cosmos::base::query::pagination::all(), - }; - - let chain = match self.registry.get_or_spawn(chain_id) { + pub fn spawn_workers_for_chain(&mut self, scan: ChainScan) { + let chain = match self.registry.get_or_spawn(&scan.chain_id) { Ok(chain_handle) => chain_handle, Err(e) => { error!( "skipping workers for chain {}, reason: failed to spawn chain runtime with error: {}", - chain_id, e - ); - - return; - } - }; - - let clients = match chain.query_clients(clients_req) { - Ok(clients) => clients, - Err(e) => { - error!( - "skipping workers for chain {}, reason: failed to query clients with error: {}", - chain_id, e + scan.chain_id, e ); return; } }; - for client in clients { - self.spawn_workers_for_client(chain.clone(), client); - } - - if self.mode != SpawnMode::Reload { - return; - } - - let chain_ids = self - .config - .chains - .iter() - .map(|c| &c.id) - .cloned() - .collect_vec(); - - for id in chain_ids { - if chain_id == &id { - continue; - } - self.spawn_workers_from_chain_to_chain(&id, chain_id); + for (_, client_scan) in scan.clients { + self.spawn_workers_for_client(chain.clone(), client_scan); } } - pub fn spawn_workers_for_client(&mut self, chain: Chain, client: IdentifiedAnyClientState) { - // Potentially ignore the client - if self.client_filter_enabled() - && matches!( - self.client_state_filter.control_client( - &chain.id(), - &client.client_id, - &client.client_state - ), - Permission::Deny - ) - { - warn!( - "skipping workers for chain {}, client {}. \ - reason: client is not allowed (client trust level={:?})", - chain.id(), - client.client_id, - client.client_state.trust_threshold() - ); - - return; - } - - let counterparty_chain_id = client.client_state.chain_id(); - - let has_counterparty = self.config.has_chain(&counterparty_chain_id); - - if !has_counterparty { - debug!( - "skipping client worker for client {} on chain {} has its counterparty ({}) is not present in config", - client.client_id, chain.id(), counterparty_chain_id - ); - - return; - } - - let chain_id = chain.id(); - - let conns_req = QueryClientConnectionsRequest { - client_id: client.client_id.to_string(), - }; - - let client_connections = match chain.query_client_connections(conns_req) { - Ok(connections) => connections, - Err(e) => { - error!( - "skipping workers for chain {}, reason: failed to query client connections for client {}: {}", - chain_id, client.client_id, e - ); - - return; - } - }; - - for connection_id in client_connections { - self.spawn_workers_for_connection(chain.clone(), &client, connection_id); + pub fn spawn_workers_for_client(&mut self, chain: Chain, client_scan: ClientScan) { + for (_, connection_scan) in client_scan.connections { + self.spawn_workers_for_connection(chain.clone(), &client_scan.client, connection_scan); } } @@ -239,124 +78,45 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { &mut self, chain: Chain, client: &IdentifiedAnyClientState, - connection_id: ConnectionId, + connection_scan: ConnectionScan, ) { - let chain_id = chain.id(); - - let connection_end = match chain.query_connection(&connection_id, Height::zero()) { - Ok(connection_end) => connection_end, - Err(e) => { - error!( - "skipping workers for chain {} and connection {}, reason: failed to query connection end: {}", - chain_id, connection_id, e - ); - return; - } - }; - - let connection = IdentifiedConnectionEnd { - connection_id: connection_id.clone(), - connection_end: connection_end.clone(), - }; - - // Apply the client state filter - if self.client_filter_enabled() { - match self.client_state_filter.control_connection_end_and_client( - self.registry, - &chain_id, - &client.client_state, - &connection_end, - &connection_id, - ) { - Ok(Permission::Deny) => { - warn!( - "skipping workers for chain {}, client {} & conn {}. \ - reason: client or counterparty client is not allowed", - chain_id, client.client_id, connection_id - ); - return; - } - Err(e) => { - error!( - "skipping workers for chain {}, client {} & conn {}. reason: {}", - chain_id, client.client_id, connection_id, e - ); - return; - } - _ => {} // allowed - } - } - - match self.spawn_connection_workers(chain.clone(), client.clone(), connection.clone()) { - Ok(()) => debug!( + let connection_id = connection_scan.id().clone(); + + match self.spawn_connection_workers( + chain.clone(), + client.clone(), + connection_scan.connection, + ) { + Ok(true) => info!( "done spawning workers for connection {} on chain {}", - connection.connection_id, + connection_id, + chain.id(), + ), + Ok(false) => info!( + "no workers were spawn for connection {} on chain {}", + connection_id, chain.id(), ), Err(e) => error!( "skipped workers for connection {} on chain {}, reason: {}", - connection.connection_id, + connection_id, chain.id(), e ), } - if !connection_end.is_open() { - debug!( - "connection {} not open, skip workers for channels over this connection", - connection.connection_id - ); - return; - } - - match self.counterparty_connection_state(client.clone(), connection.clone()) { - Err(e) => { - debug!("error with counterparty: reason {}", e); - return; - } - Ok(state) => { - if !state.eq(&ConnectionState::Open) { - debug!( - "connection {} not open, skip workers for channels over this connection", - connection.connection_id - ); - - debug!( - "drop connection {} because its counterparty is not open", - connection_id - ); - - return; - } - } - }; - - let chans_req = QueryConnectionChannelsRequest { - connection: connection_id.to_string(), - pagination: ibc_proto::cosmos::base::query::pagination::all(), - }; - - let connection_channels = match chain.query_connection_channels(chans_req) { - Ok(channels) => channels, - Err(e) => { - error!( - "skipping workers for chain {} and connection {}, reason: failed to query its channels: {}", - chain.id(), connection_id, e - ); - - return; - } - }; - - for channel in connection_channels { - let channel_id = channel.channel_id.clone(); - - match self.spawn_workers_for_channel(chain.clone(), client, &connection, channel) { - Ok(()) => debug!( + for (channel_id, channel_scan) in connection_scan.channels { + match self.spawn_workers_for_channel(chain.clone(), client, channel_scan) { + Ok(true) => info!( "done spawning workers for chain {} and channel {}", chain.id(), channel_id, ), + Ok(false) => info!( + "no workers spawn for chain {} and channel {}", + chain.id(), + channel_id, + ), Err(e) => error!( "skipped workers for chain {} and channel {} due to error {}", chain.id(), @@ -367,25 +127,12 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { } } - fn counterparty_connection_state( - &mut self, - client: IdentifiedAnyClientState, - connection: IdentifiedConnectionEnd, - ) -> Result { - let counterparty_chain = self - .registry - .get_or_spawn(&client.client_state.chain_id()) - .map_err(Error::spawn)?; - - connection_state_on_destination(connection, &counterparty_chain) - } - fn spawn_connection_workers( &mut self, chain: Chain, client: IdentifiedAnyClientState, connection: IdentifiedConnectionEnd, - ) -> Result<(), Error> { + ) -> Result { let config_conn_enabled = self.config.mode.connections.enabled; let counterparty_chain = self @@ -394,10 +141,9 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { .map_err(Error::spawn)?; let conn_state_src = connection.connection_end.state; - let conn_state_dst = - connection_state_on_destination(connection.clone(), &counterparty_chain)?; + let conn_state_dst = connection_state_on_destination(&connection, &counterparty_chain)?; - debug!( + info!( "connection {} on chain {} is: {:?}, state on dest. chain ({}) is: {:?}", connection.connection_id, chain.id(), @@ -407,11 +153,13 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { ); if conn_state_src.is_open() && conn_state_dst.is_open() { - trace!( + info!( "connection {} on chain {} is already open, not spawning Connection worker", connection.connection_id, chain.id() ); + + Ok(false) } else if config_conn_enabled && !conn_state_dst.is_open() && conn_state_dst.less_or_equal_progress(conn_state_src) @@ -426,14 +174,16 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { self.workers .spawn(chain, counterparty_chain, &connection_object, self.config) .then(|| { - debug!( + info!( "spawning Connection worker: {}", connection_object.short_name() ); }); - } - Ok(()) + Ok(true) + } else { + Ok(false) + } } /// Spawns all the [`Worker`](crate::worker::Worker)s that will @@ -442,9 +192,8 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { &mut self, chain: Chain, client: &IdentifiedAnyClientState, - connection: &IdentifiedConnectionEnd, - channel: IdentifiedChannelEnd, - ) -> Result<(), Error> { + channel_scan: ChannelScan, + ) -> Result { let mode = &self.config.mode; let counterparty_chain = self @@ -452,17 +201,15 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { .get_or_spawn(&client.client_state.chain_id()) .map_err(SupervisorError::spawn)?; - let counterparty_channel = - channel_on_destination(&channel, connection, &counterparty_chain)?; - - let chan_state_src = channel.channel_end.state; - let chan_state_dst = counterparty_channel + let chan_state_src = channel_scan.channel.channel_end.state; + let chan_state_dst = channel_scan + .counterparty .as_ref() .map_or(ChannelState::Uninitialized, |c| c.channel_end.state); - debug!( + info!( "channel {} on chain {} is: {}; state on dest. chain ({}) is: {}", - channel.channel_id, + channel_scan.id(), chain.id(), chan_state_src, counterparty_chain.id(), @@ -472,7 +219,6 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { if (mode.clients.enabled || mode.packets.enabled) && chan_state_src.is_open() && chan_state_dst.is_open() - && self.relay_packets_on_channel(&chain, &channel) { if mode.clients.enabled { // Spawn the client worker @@ -481,6 +227,7 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { dst_chain_id: chain.id(), src_chain_id: client.client_state.chain_id(), }); + self.workers .spawn( counterparty_chain.clone(), @@ -488,22 +235,20 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { &client_object, self.config, ) - .then(|| debug!("spawned Client worker: {}", client_object.short_name())); + .then(|| info!("spawned Client worker: {}", client_object.short_name())); } if mode.packets.enabled { - // SAFETY: Safe to unwrap because the inner channel end has state open - let counterparty_channel = - counterparty_channel.expect("inner channel end is in state OPEN"); - - let has_packets = || -> bool { - !unreceived_packets(&counterparty_chain, &chain, &counterparty_channel) + let has_packets = || { + !channel_scan + .unreceived_packets_on_counterparty(&chain, &counterparty_chain) .unwrap_or_default() .is_empty() }; - let has_acks = || -> bool { - !unreceived_acknowledgements(&counterparty_chain, &chain, &counterparty_channel) + let has_acks = || { + !channel_scan + .unreceived_acknowledgements_on_counterparty(&chain, &counterparty_chain) .unwrap_or_default() .is_empty() }; @@ -514,8 +259,8 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { let path_object = Object::Packet(Packet { dst_chain_id: counterparty_chain.id(), src_chain_id: chain.id(), - src_channel_id: channel.channel_id, - src_port_id: channel.port_id, + src_channel_id: channel_scan.id().clone(), + src_port_id: channel_scan.channel.port_id.clone(), }); self.workers @@ -525,9 +270,11 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { &path_object, self.config, ) - .then(|| debug!("spawned Packet worker: {}", path_object.short_name())); + .then(|| info!("spawned Packet worker: {}", path_object.short_name())); } } + + Ok(mode.clients.enabled) } else if mode.channels.enabled && !chan_state_dst.is_open() && chan_state_dst.less_or_equal_progress(chan_state_src) @@ -536,25 +283,18 @@ impl<'a, Chain: ChainHandle> SpawnContext<'a, Chain> { let channel_object = Object::Channel(Channel { dst_chain_id: counterparty_chain.id(), src_chain_id: chain.id(), - src_channel_id: channel.channel_id, - src_port_id: channel.port_id, + src_channel_id: channel_scan.id().clone(), + src_port_id: channel_scan.channel.port_id, }); self.workers .spawn(chain, counterparty_chain, &channel_object, self.config) - .then(|| debug!("spawned Channel worker: {}", channel_object.short_name())); - } - - Ok(()) - } + .then(|| info!("spawned Channel worker: {}", channel_object.short_name())); - fn relay_packets_on_channel( - &self, - chain: &impl ChainHandle, - channel: &IdentifiedChannelEnd, - ) -> bool { - self.config - .packets_on_channel_allowed(&chain.id(), &channel.port_id, &channel.channel_id) + Ok(true) + } else { + Ok(false) + } } pub fn shutdown_workers_for_chain(&mut self, chain_id: &ChainId) { diff --git a/relayer/src/util/task.rs b/relayer/src/util/task.rs index 919e4e3212..55c573e769 100644 --- a/relayer/src/util/task.rs +++ b/relayer/src/util/task.rs @@ -4,7 +4,7 @@ use core::time::Duration; use crossbeam_channel::{bounded, Sender}; use std::sync::{Arc, RwLock}; use std::thread; -use tracing::{error, info, warn}; +use tracing::{debug, error, warn}; use crate::util::lock::LockExt; @@ -90,7 +90,7 @@ pub fn spawn_background_task( interval_pause: Option, mut step_runner: impl FnMut() -> Result> + Send + Sync + 'static, ) -> TaskHandle { - info!(parent: &span, "spawning"); + debug!(parent: &span, "spawning task"); let stopped = Arc::new(RwLock::new(false)); let write_stopped = stopped.clone(); @@ -107,14 +107,14 @@ pub fn spawn_background_task( _ => match step_runner() { Ok(Next::Continue) => {} Ok(Next::Abort) => { - info!("aborting"); + debug!("aborting task"); break; } Err(TaskError::Ignore(e)) => { - warn!("encountered ignorable error: {}", e); + warn!("task encountered ignorable error: {}", e); } Err(TaskError::Fatal(e)) => { - error!("aborting after encountering fatal error: {}", e); + error!("task aborting after encountering fatal error: {}", e); break; } }, @@ -125,7 +125,8 @@ pub fn spawn_background_task( } *write_stopped.acquire_write() = true; - info!("terminated"); + + debug!("task terminated"); }); TaskHandle { diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs index c0b9571f38..aed739e1aa 100644 --- a/relayer/src/worker/channel.rs +++ b/relayer/src/worker/channel.rs @@ -20,7 +20,7 @@ pub fn spawn_channel_worker( cmd_rx: Receiver, ) -> TaskHandle { spawn_background_task( - error_span!("channel", channel = %channel.short_name()), + error_span!("worker.channel", channel = %channel.short_name()), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { diff --git a/telemetry/src/state.rs b/telemetry/src/state.rs index b0214d7b0c..25ddbfc240 100644 --- a/telemetry/src/state.rs +++ b/telemetry/src/state.rs @@ -50,6 +50,9 @@ pub struct TelemetryState { /// Number of timeout packets relayed, per channel timeout_packets: Counter, + + /// Number of queries emitted by the relayer, per chain and query type + queries: Counter, } impl TelemetryState { @@ -132,6 +135,15 @@ impl TelemetryState { self.timeout_packets.add(count, labels); } + + pub fn query(&self, chain_id: &ChainId, query_type: &'static str) { + let labels = &[ + KeyValue::new("chain", chain_id.to_string()), + KeyValue::new("query_type", query_type), + ]; + + self.queries.add(1, labels); + } } impl Default for TelemetryState { @@ -171,6 +183,13 @@ impl Default for TelemetryState { .u64_counter("ibc_timeout_packets") .with_description("Number of timeout packets relayed per channel") .init(), + + queries: meter + .u64_counter("queries") + .with_description( + "Number of queries emitted by the relayer, per chain and query type", + ) + .init(), } } } diff --git a/tools/integration-test/src/framework/overrides.rs b/tools/integration-test/src/framework/overrides.rs index d98ec46dc8..21d006e1e2 100644 --- a/tools/integration-test/src/framework/overrides.rs +++ b/tools/integration-test/src/framework/overrides.rs @@ -7,6 +7,7 @@ use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer::config::Config; use ibc_relayer::config::SharedConfig; use ibc_relayer::registry::SharedRegistry; +use ibc_relayer::supervisor::SupervisorOptions; use ibc_relayer::supervisor::{spawn_supervisor, SupervisorHandle}; use crate::error::Error; @@ -75,7 +76,16 @@ pub trait TestOverrides { config: &SharedConfig, registry: &SharedRegistry, ) -> Result, Error> { - let handle = spawn_supervisor(config.clone(), registry.clone(), None, false)?; + let handle = spawn_supervisor( + config.clone(), + registry.clone(), + None, + SupervisorOptions { + health_check: false, + force_full_scan: false, + }, + )?; + Ok(Some(handle)) } diff --git a/tools/integration-test/src/tests/client_expiration.rs b/tools/integration-test/src/tests/client_expiration.rs index 89fbe98791..10ae9b3d8e 100644 --- a/tools/integration-test/src/tests/client_expiration.rs +++ b/tools/integration-test/src/tests/client_expiration.rs @@ -2,7 +2,7 @@ use core::time::Duration; use ibc::core::ics03_connection::connection::State as ConnectionState; use ibc::core::ics04_channel::channel::State as ChannelState; use ibc_relayer::config::{self, Config, ModeConfig}; -use ibc_relayer::supervisor::{spawn_supervisor, SupervisorHandle}; +use ibc_relayer::supervisor::{spawn_supervisor, SupervisorHandle, SupervisorOptions}; use ibc_relayer::worker::client::spawn_refresh_client; use std::thread::sleep; @@ -151,8 +151,15 @@ impl BinaryChainTest for ChannelExpirationTest { wait_for_client_expiry(); - let _supervisor = - spawn_supervisor(chains.config.clone(), chains.registry.clone(), None, false)?; + let _supervisor = spawn_supervisor( + chains.config.clone(), + chains.registry.clone(), + None, + SupervisorOptions { + health_check: false, + force_full_scan: false, + }, + )?; let port_a = tagged_transfer_port(); let port_b = tagged_transfer_port(); @@ -297,8 +304,15 @@ impl BinaryChainTest for PacketExpirationTest { wait_for_client_expiry(); - let _supervisor = - spawn_supervisor(chains.config.clone(), chains.registry.clone(), None, false)?; + let _supervisor = spawn_supervisor( + chains.config.clone(), + chains.registry.clone(), + None, + SupervisorOptions { + health_check: false, + force_full_scan: false, + }, + )?; let denom_a = chains.node_a.denom(); let balance_a = chains