From 0ed7526ccdd892a061660b0afc844f8edf76becd Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Mon, 6 Dec 2021 18:09:42 +0100 Subject: [PATCH 1/5] Improve spawning of supervisor worker tasks --- relayer-cli/src/commands/start.rs | 23 +- relayer/src/link/relay_path.rs | 11 +- relayer/src/registry.rs | 2 +- relayer/src/supervisor.rs | 1218 +++++++++-------- relayer/src/supervisor/cmd.rs | 1 - relayer/src/supervisor/spawn.rs | 65 +- relayer/src/util.rs | 2 + relayer/src/util/lock.rs | 36 + relayer/src/util/queue.rs | 34 +- relayer/src/util/task.rs | 195 +++ relayer/src/worker.rs | 173 +-- relayer/src/worker/channel.rs | 88 +- relayer/src/worker/client.rs | 230 +--- relayer/src/worker/cmd.rs | 3 - relayer/src/worker/connection.rs | 90 +- relayer/src/worker/error.rs | 9 +- relayer/src/worker/handle.rs | 63 +- relayer/src/worker/map.rs | 55 +- relayer/src/worker/packet.rs | 409 +++--- .../src/framework/binary/chain.rs | 18 +- .../src/framework/binary/node.rs | 2 +- .../src/framework/overrides.rs | 10 +- tools/integration-test/src/relayer/mod.rs | 1 - .../src/relayer/supervisor.rs | 83 -- 24 files changed, 1421 insertions(+), 1400 deletions(-) create mode 100644 relayer/src/util/lock.rs create mode 100644 relayer/src/util/task.rs delete mode 100644 tools/integration-test/src/relayer/supervisor.rs diff --git a/relayer-cli/src/commands/start.rs b/relayer-cli/src/commands/start.rs index 75c7a1c521..c751ef6e42 100644 --- a/relayer-cli/src/commands/start.rs +++ b/relayer-cli/src/commands/start.rs @@ -9,8 +9,9 @@ use crossbeam_channel::Sender; use ibc_relayer::chain::handle::{ChainHandle, ProdChainHandle}; use ibc_relayer::config::reload::ConfigReload; use ibc_relayer::config::Config; +use ibc_relayer::registry::SharedRegistry; use ibc_relayer::rest; -use ibc_relayer::supervisor::{cmd::SupervisorCmd, Supervisor}; +use ibc_relayer::supervisor::{cmd::SupervisorCmd, spawn_supervisor, SupervisorHandle}; use crate::conclude::json; use crate::conclude::Output; @@ -24,16 +25,17 @@ 
impl Runnable for StartCmd { let config = (*app_config()).clone(); let config = Arc::new(RwLock::new(config)); - let (mut supervisor, tx_cmd) = make_supervisor::(config.clone()) - .unwrap_or_else(|e| { + let supervisor_handle = + make_supervisor::(config.clone()).unwrap_or_else(|e| { Output::error(format!("Hermes failed to start, last error: {}", e)).exit(); unreachable!() }); match crate::config::config_path() { Some(config_path) => { - let reload = ConfigReload::new(config_path, config, tx_cmd.clone()); - register_signals(reload, tx_cmd).unwrap_or_else(|e| { + let reload = + ConfigReload::new(config_path, config, supervisor_handle.sender.clone()); + register_signals(reload, supervisor_handle.sender.clone()).unwrap_or_else(|e| { warn!("failed to install signal handler: {}", e); }); } @@ -43,10 +45,8 @@ impl Runnable for StartCmd { }; info!("Hermes has started"); - match supervisor.run() { - Ok(()) => Output::success_msg("done").exit(), - Err(e) => Output::error(format!("Hermes failed to start, last error: {}", e)).exit(), - } + + supervisor_handle.wait(); } } @@ -175,10 +175,11 @@ fn spawn_telemetry_server( fn make_supervisor( config: Arc>, -) -> Result<(Supervisor, Sender), Box> { +) -> Result> { + let registry = SharedRegistry::::new(config.clone()); spawn_telemetry_server(&config)?; let rest = spawn_rest_server(&config); - Ok(Supervisor::new(config, rest)) + Ok(spawn_supervisor(config, registry, rest, true)?) 
} diff --git a/relayer/src/link/relay_path.rs b/relayer/src/link/relay_path.rs index a2c8d7d184..324270190c 100644 --- a/relayer/src/link/relay_path.rs +++ b/relayer/src/link/relay_path.rs @@ -1,7 +1,7 @@ use alloc::collections::BTreeMap as HashMap; use alloc::collections::VecDeque; -use core::cell::RefCell; use core::fmt; +use std::sync::{Arc, RwLock}; use std::thread; use std::time::Instant; @@ -49,6 +49,7 @@ use crate::link::pending::PendingTxs; use crate::link::relay_sender::{AsyncReply, SubmitReply}; use crate::link::relay_summary::RelaySummary; use crate::link::{pending, relay_sender}; +use crate::util::lock::LockExt; use crate::util::queue::Queue; const MAX_RETRIES: usize = 5; @@ -65,7 +66,7 @@ pub struct RelayPath { // Marks whether this path has already cleared pending packets. // Packets should be cleared once (at startup), then this // flag turns to `false`. - clear_packets: RefCell, + clear_packets: Arc>, // Operational data, targeting both the source and destination chain. // These vectors of operational data are ordered decreasingly by @@ -119,7 +120,7 @@ impl RelayPath { dst_channel_id: dst_channel_id.clone(), dst_port_id: dst_port_id.clone(), - clear_packets: RefCell::new(true), + clear_packets: Arc::new(RwLock::new(true)), src_operational_data: Queue::new(), dst_operational_data: Queue::new(), @@ -305,7 +306,7 @@ impl RelayPath { } fn should_clear_packets(&self) -> bool { - *self.clear_packets.borrow() + *self.clear_packets.acquire_read() } /// Clears any packets that were sent before `height`, either if the `clear_packets` flag @@ -318,7 +319,7 @@ impl RelayPath { if self.should_clear_packets() || force { // Disable further clearing of old packets by default. // Clearing may still happen: upon new blocks, when `force = true`. 
- self.clear_packets.replace(false); + *self.clear_packets.acquire_write() = false; let clear_height = height .map(|h| h.decrement().map_err(|e| LinkError::decrement_height(h, e))) diff --git a/relayer/src/registry.rs b/relayer/src/registry.rs index bb3046a37b..31a335ac66 100644 --- a/relayer/src/registry.rs +++ b/relayer/src/registry.rs @@ -10,11 +10,11 @@ use tracing::{trace, warn}; use ibc::core::ics24_host::identifier::ChainId; +use crate::util::lock::RwArc; use crate::{ chain::{handle::ChainHandle, runtime::ChainRuntime, CosmosSdkChain}, config::Config, error::Error as RelayerError, - supervisor::RwArc, }; define_error! { diff --git a/relayer/src/supervisor.rs b/relayer/src/supervisor.rs index 05d6ab2aff..6fbcb1c631 100644 --- a/relayer/src/supervisor.rs +++ b/relayer/src/supervisor.rs @@ -4,7 +4,7 @@ use core::ops::Deref; use core::time::Duration; use std::sync::RwLock; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::{unbounded, Receiver, Sender}; use itertools::Itertools; use tracing::{debug, error, info, trace, warn}; @@ -14,16 +14,18 @@ use ibc::{ Height, }; +use crate::util::lock::LockExt; +use crate::util::task::{spawn_background_task, TaskError, TaskHandle}; use crate::{ chain::{handle::ChainHandle, HealthCheck}, config::{ChainConfig, Config}, event, event::monitor::{Error as EventError, ErrorDetail as EventErrorDetail, EventBatch}, object::Object, - registry::SharedRegistry, + registry::{Registry, SharedRegistry}, rest, util::try_recv_multiple, - worker::{WorkerMap, WorkerMsg}, + worker::WorkerMap, }; pub mod client_state_filter; @@ -46,660 +48,770 @@ use self::spawn::SpawnMode; type ArcBatch = Arc>; type Subscription = Receiver; -pub type RwArc = Arc>; - -/// The supervisor listens for events on multiple pairs of chains, -/// and dispatches the events it receives to the appropriate -/// worker, based on the [`Object`] associated with each event. 
-pub struct Supervisor { - config: RwArc, - registry: SharedRegistry, - workers: WorkerMap, +/** + A wrapper around the SupervisorCmd sender so that we can + send stop signal to the supervisor before stopping the + chain drivers to prevent the supervisor from raising + errors caused by closed connections. +*/ +pub struct SupervisorHandle { + pub sender: Sender, + tasks: Vec, +} - cmd_rx: Receiver, - worker_msg_rx: Receiver, +/** + Spawn a supervisor for testing purpose using the provided + [`SharedConfig`] and [`SharedRegistry`]. Returns a + [`SupervisorHandle`] that stops the supervisor when the + value is dropped. +*/ +pub fn spawn_supervisor( + config: Arc>, + registry: SharedRegistry, rest_rx: Option, - client_state_filter: FilterPolicy, -} + do_health_check: bool, +) -> Result { + let (sender, receiver) = unbounded(); + + let tasks = spawn_supervisor_tasks(config, registry, rest_rx, receiver, do_health_check)?; -#[derive(Eq, PartialEq)] -enum StepResult { - Break, - Continue, + Ok(SupervisorHandle { sender, tasks }) } -impl Supervisor { - /// Create a [`Supervisor`] which will listen for events on all the chains in the [`Config`]. - pub fn new( - config: RwArc, - rest_rx: Option, - ) -> (Self, Sender) { - let registry = SharedRegistry::new(config.clone()); - Self::new_with_registry(config, registry, rest_rx) +impl SupervisorHandle { + /** + Explicitly stop the running supervisor. This is useful in tests where + the supervisor has to be stopped and restarted explicitly. + + Note that after stopping the supervisor, the only way to restart it + is by respawning a new supervisor using [`spawn_supervisor`]. + */ + pub fn shutdown(self) { + for task in self.tasks { + // Send the shutdown signals in parallel + task.shutdown(); + } + // Dropping the tasks will cause this to block until all tasks + // are terminated. 
} - pub fn new_with_registry( - config: RwArc, - registry: SharedRegistry, - rest_rx: Option, - ) -> (Self, Sender) { - let (worker_msg_tx, worker_msg_rx) = crossbeam_channel::unbounded(); - let workers = WorkerMap::new(worker_msg_tx); - let client_state_filter = FilterPolicy::default(); - - let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded(); - - let supervisor = Self { - config, - registry, - workers, - cmd_rx, - worker_msg_rx, - rest_rx, - client_state_filter, - }; - - (supervisor, cmd_tx) + pub fn wait(self) { + for task in self.tasks { + task.join(); + } } +} - /// Returns `true` if the relayer should filter based on - /// client state attributes, e.g., trust threshold. - /// Returns `false` otherwise. - fn client_filter_enabled(&self) -> bool { - // Currently just a wrapper over the global filter. - self.config - .read() - .expect("poisoned lock") - .mode - .packets - .filter +pub fn spawn_supervisor_tasks( + config: Arc>, + registry: SharedRegistry, + rest_rx: Option, + cmd_rx: Receiver, + do_health_check: bool, +) -> Result, Error> { + if do_health_check { + health_check(&config.acquire_read(), &mut registry.write()); } - /// Returns `true` if the relayer should filter based on - /// channel identifiers. - /// Returns `false` otherwise. 
- fn channel_filter_enabled(&self) -> bool { - self.config - .read() - .expect("poisoned lock") - .mode - .packets - .filter + let workers = Arc::new(RwLock::new(WorkerMap::new())); + let client_state_filter = Arc::new(RwLock::new(FilterPolicy::default())); + + spawn_context( + &config.acquire_read(), + &mut registry.write(), + &mut client_state_filter.acquire_write(), + &mut workers.acquire_write(), + SpawnMode::Startup, + ) + .spawn_workers(); + + let subscriptions = Arc::new(RwLock::new(init_subscriptions( + &config.acquire_read(), + &mut registry.write(), + )?)); + + let batch_task = spawn_batch_worker( + config.clone(), + registry.clone(), + client_state_filter.clone(), + workers.clone(), + subscriptions.clone(), + ); + + let cmd_task = spawn_cmd_worker( + config.clone(), + registry.clone(), + client_state_filter, + workers.clone(), + subscriptions, + cmd_rx, + ); + + let mut tasks = vec![batch_task, cmd_task]; + + if let Some(rest_rx) = rest_rx { + let rest_task = spawn_rest_worker(config, registry, workers, rest_rx); + tasks.push(rest_task); } - fn relay_packets_on_channel( - &self, - chain_id: &ChainId, - port_id: &PortId, - channel_id: &ChannelId, - ) -> bool { - // If filtering is disabled, then relay all channels - if !self.channel_filter_enabled() { - return true; - } + Ok(tasks) +} - self.config - .read() - .expect("poisoned lock") - .packets_on_channel_allowed(chain_id, port_id, channel_id) - } +fn spawn_batch_worker( + config: Arc>, + registry: SharedRegistry, + client_state_filter: Arc>, + workers: Arc>, + subscriptions: Arc>>, +) -> TaskHandle { + spawn_background_task( + "supervisor_batch".to_string(), + Some(Duration::from_millis(500)), + move || -> Result<(), TaskError> { + if let Some((chain, batch)) = try_recv_multiple(&subscriptions.acquire_read()) { + handle_batch( + &config.acquire_read(), + &mut registry.write(), + &mut client_state_filter.acquire_write(), + &mut workers.acquire_write(), + chain.clone(), + batch, + ); + } - fn 
relay_on_object(&mut self, chain_id: &ChainId, object: &Object) -> bool { - // No filter is enabled, bail fast. - if !self.channel_filter_enabled() && !self.client_filter_enabled() { - return true; - } + Ok(()) + }, + ) +} - // First, apply the channel filter - if let Object::Packet(u) = object { - if !self.relay_packets_on_channel(chain_id, u.src_port_id(), u.src_channel_id()) { - return false; +pub fn spawn_cmd_worker( + config: Arc>, + registry: SharedRegistry, + client_state_filter: Arc>, + workers: Arc>, + subscriptions: Arc>>, + cmd_rx: Receiver, +) -> TaskHandle { + spawn_background_task( + "supervisor_cmd".to_string(), + Some(Duration::from_millis(500)), + move || -> Result<(), TaskError> { + if let Ok(cmd) = cmd_rx.try_recv() { + match cmd { + SupervisorCmd::UpdateConfig(update) => { + let effect = update_config( + &mut config.acquire_write(), + &mut registry.write(), + &mut workers.acquire_write(), + &mut client_state_filter.acquire_write(), + *update, + ); + + if let CmdEffect::ConfigChanged = effect { + let new_subscriptions = + init_subscriptions(&config.acquire_read(), &mut registry.write()); + match new_subscriptions { + Ok(subs) => { + *subscriptions.acquire_write() = subs; + } + Err(Error(ErrorDetail::NoChainsAvailable(_), _)) => (), + Err(e) => return Err(TaskError::Fatal(e)), + } + } + } + SupervisorCmd::DumpState(reply_to) => { + dump_state(&registry.read(), &workers.acquire_read(), reply_to); + } + } } - } + Ok(()) + }, + ) +} - let mut registry = self.registry.write(); - - // Second, apply the client filter - let client_filter_outcome = match object { - Object::Client(client) => self - .client_state_filter - .control_client_object(&mut registry, client), - Object::Connection(conn) => self - .client_state_filter - .control_conn_object(&mut registry, conn), - Object::Channel(chan) => self - .client_state_filter - .control_chan_object(&mut registry, chan), - Object::Packet(u) => self - .client_state_filter - .control_packet_object(&mut registry, u), 
- }; +pub fn spawn_rest_worker( + config: Arc>, + registry: SharedRegistry, + workers: Arc>, + rest_rx: rest::Receiver, +) -> TaskHandle { + spawn_background_task( + "supervisor_rest".to_string(), + Some(Duration::from_millis(500)), + move || -> Result<(), TaskError> { + handle_rest_requests( + &config.acquire_read(), + &registry.read(), + &workers.acquire_read(), + &rest_rx, + ); - match client_filter_outcome { - Ok(Permission::Allow) => true, - Ok(Permission::Deny) => { - warn!( - "client filter denies relaying on object {}", - object.short_name() - ); + Ok(()) + }, + ) +} - false - } - Err(e) => { - warn!( - "denying relaying on object {}, caused by: {}", - object.short_name(), - e - ); +/// Returns `true` if the relayer should filter based on +/// client state attributes, e.g., trust threshold. +/// Returns `false` otherwise. +fn client_filter_enabled(config: &Config) -> bool { + // Currently just a wrapper over the global filter. + config.mode.packets.filter +} - false - } - } - } +/// Returns `true` if the relayer should filter based on +/// channel identifiers. +/// Returns `false` otherwise. +fn channel_filter_enabled(config: &Config) -> bool { + config.mode.packets.filter +} - /// If `enabled`, build an `Object` using the provided `object_ctor` - /// and add the given `event` to the `collected` events for this `object`. 
- fn collect_event( - &self, - collected: &mut CollectedEvents, - event: &IbcEvent, - enabled: bool, - object_ctor: F, - ) where - F: FnOnce() -> Option, - { - if enabled { - if let Some(object) = object_ctor() { - collected - .per_object - .entry(object) - .or_default() - .push(event.clone()); - } - } +fn relay_packets_on_channel( + config: &Config, + chain_id: &ChainId, + port_id: &PortId, + channel_id: &ChannelId, +) -> bool { + // If filtering is disabled, then relay all channels + if !channel_filter_enabled(config) { + return true; } - /// Collect the events we are interested in from an [`EventBatch`], - /// and maps each [`IbcEvent`] to their corresponding [`Object`]. - pub fn collect_events( - &self, - src_chain: &impl ChainHandle, - batch: &EventBatch, - ) -> CollectedEvents { - let mut collected = CollectedEvents::new(batch.height, batch.chain_id.clone()); - - let mode = self.config.read().expect("poisoned lock").mode; - - for event in &batch.events { - match event { - IbcEvent::NewBlock(_) => { - collected.new_block = Some(event.clone()); - } - IbcEvent::UpdateClient(ref update) => { - self.collect_event(&mut collected, event, mode.clients.enabled, || { - // Collect update client events only if the worker exists - if let Ok(object) = Object::for_update_client(update, src_chain) { - self.workers.contains(&object).then(|| object) - } else { - None - } - }); - } - IbcEvent::OpenInitConnection(..) - | IbcEvent::OpenTryConnection(..) - | IbcEvent::OpenAckConnection(..) => { - self.collect_event(&mut collected, event, mode.connections.enabled, || { - event - .connection_attributes() - .map(|attr| { - Object::connection_from_conn_open_events(attr, src_chain).ok() - }) - .flatten() - }); - } - IbcEvent::OpenInitChannel(..) | IbcEvent::OpenTryChannel(..) 
=> { - self.collect_event(&mut collected, event, mode.channels.enabled, || { - event - .channel_attributes() - .map(|attr| Object::channel_from_chan_open_events(attr, src_chain).ok()) - .flatten() - }); - } - IbcEvent::OpenAckChannel(ref open_ack) => { - // Create client and packet workers here as channel end must be opened - self.collect_event(&mut collected, event, mode.clients.enabled, || { - Object::client_from_chan_open_events(open_ack.attributes(), src_chain).ok() - }); - - self.collect_event(&mut collected, event, mode.packets.enabled, || { - Object::packet_from_chan_open_events(open_ack.attributes(), src_chain).ok() - }); - - // If handshake message relaying is enabled create worker to send the MsgChannelOpenConfirm message - self.collect_event(&mut collected, event, mode.channels.enabled, || { - Object::channel_from_chan_open_events(open_ack.attributes(), src_chain).ok() - }); - } - IbcEvent::OpenConfirmChannel(ref open_confirm) => { - // Create client worker here as channel end must be opened - self.collect_event(&mut collected, event, mode.clients.enabled, || { - Object::client_from_chan_open_events(open_confirm.attributes(), src_chain) - .ok() - }); - - self.collect_event(&mut collected, event, mode.packets.enabled, || { - Object::packet_from_chan_open_events(open_confirm.attributes(), src_chain) - .ok() - }); - } - IbcEvent::SendPacket(ref packet) => { - self.collect_event(&mut collected, event, mode.packets.enabled, || { - Object::for_send_packet(packet, src_chain).ok() - }); - } - IbcEvent::TimeoutPacket(ref packet) => { - self.collect_event(&mut collected, event, mode.packets.enabled, || { - Object::for_timeout_packet(packet, src_chain).ok() - }); - } - IbcEvent::WriteAcknowledgement(ref packet) => { - self.collect_event(&mut collected, event, mode.packets.enabled, || { - Object::for_write_ack(packet, src_chain).ok() - }); - } - IbcEvent::CloseInitChannel(ref packet) => { - self.collect_event(&mut collected, event, mode.packets.enabled, || { - 
Object::for_close_init_channel(packet, src_chain).ok() - }); - } - _ => (), - } - } + config.packets_on_channel_allowed(chain_id, port_id, channel_id) +} - collected +fn relay_on_object( + config: &Config, + registry: &mut Registry, + client_state_filter: &mut FilterPolicy, + chain_id: &ChainId, + object: &Object, +) -> bool { + // No filter is enabled, bail fast. + if !channel_filter_enabled(config) && !client_filter_enabled(config) { + return true; } - /// Create a new `SpawnContext` for spawning workers. - fn spawn_context(&mut self, mode: SpawnMode) -> SpawnContext<'_, Chain> { - SpawnContext::new( - self.config.clone(), - self.registry.clone(), - &mut self.client_state_filter, - &mut self.workers, - mode, - ) + // First, apply the channel filter + if let Object::Packet(u) = object { + if !relay_packets_on_channel(config, chain_id, u.src_port_id(), u.src_channel_id()) { + return false; + } } - /// Spawn all the workers necessary for the relayer to connect - /// and relay between all the chains in the configurations. 
- fn spawn_workers(&mut self, mode: SpawnMode) { - self.spawn_context(mode).spawn_workers(); - } + // Second, apply the client filter + let client_filter_outcome = match object { + Object::Client(client) => client_state_filter.control_client_object(registry, client), + Object::Connection(conn) => client_state_filter.control_conn_object(registry, conn), + Object::Channel(chan) => client_state_filter.control_chan_object(registry, chan), + Object::Packet(u) => client_state_filter.control_packet_object(registry, u), + }; + + match client_filter_outcome { + Ok(Permission::Allow) => true, + Ok(Permission::Deny) => { + warn!( + "client filter denies relaying on object {}", + object.short_name() + ); - /// Perform a health check on all connected chains - fn health_check(&mut self) { - use HealthCheck::*; - - let chains = &self.config.read().expect("poisoned lock").chains; - - for config in chains { - let id = &config.id; - let chain = self.registry.get_or_spawn(id); - - match chain { - Ok(chain) => match chain.health_check() { - Ok(Healthy) => info!("[{}] chain is healthy", id), - Ok(Unhealthy(e)) => warn!("[{}] chain is unhealthy: {}", id, e), - Err(e) => error!("[{}] failed to perform health check: {}", id, e), - }, - Err(e) => { - error!( - "skipping health check for chain {}, reason: failed to spawn chain runtime with error: {}", - config.id, e - ); - } - } + false } - } + Err(e) => { + warn!( + "denying relaying on object {}, caused by: {}", + object.short_name(), + e + ); - fn run_step( - &mut self, - subscriptions: &mut Vec<(Chain, Subscription)>, - ) -> Result { - if let Some((chain, batch)) = try_recv_multiple(subscriptions) { - self.handle_batch(chain.clone(), batch); + false } + } +} - if let Ok(msg) = self.worker_msg_rx.try_recv() { - self.handle_worker_msg(msg); +/// If `enabled`, build an `Object` using the provided `object_ctor` +/// and add the given `event` to the `collected` events for this `object`. 
+fn collect_event( + collected: &mut CollectedEvents, + event: &IbcEvent, + enabled: bool, + object_ctor: F, +) where + F: FnOnce() -> Option, +{ + if enabled { + if let Some(object) = object_ctor() { + collected + .per_object + .entry(object) + .or_default() + .push(event.clone()); } + } +} - if let Ok(cmd) = self.cmd_rx.try_recv() { - match cmd { - SupervisorCmd::UpdateConfig(update) => { - let effect = self.update_config(*update); +fn collect_events( + config: &Config, + workers: &WorkerMap, + src_chain: &impl ChainHandle, + batch: &EventBatch, +) -> CollectedEvents { + let mut collected = CollectedEvents::new(batch.height, batch.chain_id.clone()); - if let CmdEffect::ConfigChanged = effect { - match self.init_subscriptions() { - Ok(subs) => { - *subscriptions = subs; - } - Err(Error(ErrorDetail::NoChainsAvailable(_), _)) => (), - Err(e) => return Err(e), - } + let mode = config.mode; + + for event in &batch.events { + match event { + IbcEvent::NewBlock(_) => { + collected.new_block = Some(event.clone()); + } + IbcEvent::UpdateClient(ref update) => { + collect_event(&mut collected, event, mode.clients.enabled, || { + // Collect update client events only if the worker exists + if let Ok(object) = Object::for_update_client(update, src_chain) { + workers.contains(&object).then(|| object) + } else { + None } - } - SupervisorCmd::DumpState(reply_to) => { - self.dump_state(reply_to); - } - SupervisorCmd::Stop(reply_to) => { - let _ = reply_to.send(()); - return Ok(StepResult::Break); - } + }); } - } + IbcEvent::OpenInitConnection(..) + | IbcEvent::OpenTryConnection(..) + | IbcEvent::OpenAckConnection(..) => { + collect_event(&mut collected, event, mode.connections.enabled, || { + event + .connection_attributes() + .map(|attr| Object::connection_from_conn_open_events(attr, src_chain).ok()) + .flatten() + }); + } + IbcEvent::OpenInitChannel(..) | IbcEvent::OpenTryChannel(..) 
=> { + collect_event(&mut collected, event, mode.channels.enabled, || { + event + .channel_attributes() + .map(|attr| Object::channel_from_chan_open_events(attr, src_chain).ok()) + .flatten() + }); + } + IbcEvent::OpenAckChannel(ref open_ack) => { + // Create client and packet workers here as channel end must be opened + collect_event(&mut collected, event, mode.clients.enabled, || { + Object::client_from_chan_open_events(open_ack.attributes(), src_chain).ok() + }); - // Process incoming requests from the REST server - self.handle_rest_requests(); + collect_event(&mut collected, event, mode.packets.enabled, || { + Object::packet_from_chan_open_events(open_ack.attributes(), src_chain).ok() + }); - Ok(StepResult::Continue) + // If handshake message relaying is enabled create worker to send the MsgChannelOpenConfirm message + collect_event(&mut collected, event, mode.channels.enabled, || { + Object::channel_from_chan_open_events(open_ack.attributes(), src_chain).ok() + }); + } + IbcEvent::OpenConfirmChannel(ref open_confirm) => { + // Create client worker here as channel end must be opened + collect_event(&mut collected, event, mode.clients.enabled, || { + Object::client_from_chan_open_events(open_confirm.attributes(), src_chain).ok() + }); + + collect_event(&mut collected, event, mode.packets.enabled, || { + Object::packet_from_chan_open_events(open_confirm.attributes(), src_chain).ok() + }); + } + IbcEvent::SendPacket(ref packet) => { + collect_event(&mut collected, event, mode.packets.enabled, || { + Object::for_send_packet(packet, src_chain).ok() + }); + } + IbcEvent::TimeoutPacket(ref packet) => { + collect_event(&mut collected, event, mode.packets.enabled, || { + Object::for_timeout_packet(packet, src_chain).ok() + }); + } + IbcEvent::WriteAcknowledgement(ref packet) => { + collect_event(&mut collected, event, mode.packets.enabled, || { + Object::for_write_ack(packet, src_chain).ok() + }); + } + IbcEvent::CloseInitChannel(ref packet) => { + collect_event(&mut 
collected, event, mode.packets.enabled, || { + Object::for_close_init_channel(packet, src_chain).ok() + }); + } + _ => (), + } } - /// Run the supervisor event loop. - pub fn run(&mut self) -> Result<(), Error> { - self.health_check(); + collected +} - self.run_without_health_check() - } +/// Create a new `SpawnContext` for spawning workers. +fn spawn_context<'a, Chain: ChainHandle + 'static>( + config: &'a Config, + registry: &'a mut Registry, + client_state_filter: &'a mut FilterPolicy, + workers: &'a mut WorkerMap, + mode: SpawnMode, +) -> SpawnContext<'a, Chain> { + SpawnContext::new(config, registry, client_state_filter, workers, mode) +} - pub fn run_without_health_check(&mut self) -> Result<(), Error> { - self.spawn_workers(SpawnMode::Startup); +/// Perform a health check on all connected chains +fn health_check(config: &Config, registry: &mut Registry) { + use HealthCheck::*; - let mut subscriptions = self.init_subscriptions()?; + let chains = &config.chains; - loop { - let step_res = self.run_step(&mut subscriptions)?; + for config in chains { + let id = &config.id; + let chain = registry.get_or_spawn(id); - if step_res == StepResult::Break { - info!("stopping supervisor"); - return Ok(()); + match chain { + Ok(chain) => match chain.health_check() { + Ok(Healthy) => info!("[{}] chain is healthy", id), + Ok(Unhealthy(e)) => warn!("[{}] chain is unhealthy: {}", id, e), + Err(e) => error!("[{}] failed to perform health check: {}", id, e), + }, + Err(e) => { + error!( + "skipping health check for chain {}, reason: failed to spawn chain runtime with error: {}", + config.id, e + ); } - - std::thread::sleep(Duration::from_millis(50)); } } +} - /// Subscribe to the events emitted by the chains the supervisor is connected to. 
- fn init_subscriptions(&mut self) -> Result, Error> { - let chains = &self.config.read().expect("poisoned lock").chains; - - let mut subscriptions = Vec::with_capacity(chains.len()); - - for chain_config in chains { - let chain = match self.registry.get_or_spawn(&chain_config.id) { - Ok(chain) => chain, - Err(e) => { - error!( - "failed to spawn chain runtime for {}: {}", - chain_config.id, e - ); +/// Subscribe to the events emitted by the chains the supervisor is connected to. +fn init_subscriptions( + config: &Config, + registry: &mut Registry, +) -> Result, Error> { + let chains = &config.chains; - continue; - } - }; + let mut subscriptions = Vec::with_capacity(chains.len()); - match chain.subscribe() { - Ok(subscription) => subscriptions.push((chain, subscription)), - Err(e) => error!( - "failed to subscribe to events of {}: {}", + for chain_config in chains { + let chain = match registry.get_or_spawn(&chain_config.id) { + Ok(chain) => chain, + Err(e) => { + error!( + "failed to spawn chain runtime for {}: {}", chain_config.id, e - ), + ); + + continue; } - } + }; - // At least one chain runtime should be available, otherwise the supervisor - // cannot do anything and will hang indefinitely. - if self.registry.read().size() == 0 { - return Err(Error::no_chains_available()); + match chain.subscribe() { + Ok(subscription) => subscriptions.push((chain, subscription)), + Err(e) => error!( + "failed to subscribe to events of {}: {}", + chain_config.id, e + ), } - - Ok(subscriptions) } - /// Dump the state of the supervisor into a [`SupervisorState`] value, - /// and send it back through the given channel. - fn dump_state(&self, reply_to: Sender) { - let state = self.state(); - let _ = reply_to.try_send(state); + // At least one chain runtime should be available, otherwise the supervisor + // cannot do anything and will hang indefinitely. 
+ if registry.size() == 0 { + return Err(Error::no_chains_available()); } - /// Returns a representation of the supervisor's internal state - /// as a [`SupervisorState`]. - fn state(&self) -> SupervisorState { - let chains = self.registry.read().chains().map(|c| c.id()).collect_vec(); - SupervisorState::new(chains, self.workers.objects()) - } + Ok(subscriptions) +} - /// Apply the given configuration update. - /// - /// Returns an [`CmdEffect`] which instructs the caller as to - /// whether or not the event subscriptions needs to be reset or not. - fn update_config(&mut self, update: ConfigUpdate) -> CmdEffect { - match update { - ConfigUpdate::Add(config) => self.add_chain(config), - ConfigUpdate::Remove(id) => self.remove_chain(&id), - ConfigUpdate::Update(config) => self.update_chain(config), - } +/// Dump the state of the supervisor into a [`SupervisorState`] value, +/// and send it back through the given channel. +fn dump_state( + registry: &Registry, + workers: &WorkerMap, + reply_to: Sender, +) { + let state = state(registry, workers); + let _ = reply_to.try_send(state); +} + +/// Returns a representation of the supervisor's internal state +/// as a [`SupervisorState`]. +fn state(registry: &Registry, workers: &WorkerMap) -> SupervisorState { + let chains = registry.chains().map(|c| c.id()).collect_vec(); + SupervisorState::new(chains, workers.objects()) +} + +fn handle_rest_requests( + config: &Config, + registry: &Registry, + workers: &WorkerMap, + rest_rx: &rest::Receiver, +) { + if let Some(cmd) = rest::process_incoming_requests(config, rest_rx) { + handle_rest_cmd(registry, workers, cmd); } +} - /// Add the given chain to the configuration and spawn the associated workers. - /// Will not have any effect if the chain is already present in the config. - /// - /// If the addition had any effect, returns [`CmdEffect::ConfigChanged`] as - /// subscriptions need to be reset to take into account the newly added chain. 
- fn add_chain(&mut self, config: ChainConfig) -> CmdEffect { - let id = config.id.clone(); - - if self.config.read().expect("poisoned lock").has_chain(&id) { - info!(chain.id=%id, "skipping addition of already existing chain"); - return CmdEffect::Nothing; +fn handle_rest_cmd( + registry: &Registry, + workers: &WorkerMap, + m: rest::Command, +) { + match m { + rest::Command::DumpState(reply) => { + let state = state(registry, workers); + reply.send(Ok(state)).unwrap_or_else(|e| { + error!("[rest/supervisor] error replying to a REST request {}", e) + }); } + } +} - info!(chain.id=%id, "adding new chain"); +fn clear_pending_packets(workers: &mut WorkerMap, chain_id: &ChainId) -> Result<(), Error> { + for worker in workers.workers_for_chain(chain_id) { + worker.clear_pending_packets().map_err(Error::worker)?; + } - self.config - .write() - .expect("poisoned lock") - .chains - .push(config); + Ok(()) +} - debug!(chain.id=%id, "spawning chain runtime"); +/// Process a batch of events received from a chain. +fn process_batch( + config: &Config, + registry: &mut Registry, + client_state_filter: &mut FilterPolicy, + workers: &mut WorkerMap, + src_chain: Chain, + batch: &EventBatch, +) -> Result<(), Error> { + assert_eq!(src_chain.id(), batch.chain_id); + + let height = batch.height; + let chain_id = batch.chain_id.clone(); + + let collected = collect_events(config, workers, &src_chain, batch); + + // If there is a NewBlock event, forward this event first to any workers affected by it. + if let Some(IbcEvent::NewBlock(new_block)) = collected.new_block { + for worker in workers.to_notify(&src_chain.id()) { + worker + .send_new_block(height, new_block) + .map_err(Error::worker)? + } + } - if let Err(e) = self.registry.spawn(&id) { - error!( - "failed to add chain {} because of failure to spawn the chain runtime: {}", - id, e + // Forward the IBC events. 
+ for (object, events) in collected.per_object.into_iter() { + if !relay_on_object( + config, + registry, + client_state_filter, + &src_chain.id(), + &object, + ) { + trace!( + "skipping events for '{}'. \ + reason: filtering is enabled and channel does not match any allowed channels", + object.short_name() ); - // Remove the newly added config - self.config - .write() - .expect("poisoned lock") - .chains - .retain(|c| c.id != id); - - return CmdEffect::Nothing; + continue; } - debug!(chain.id=%id, "spawning workers"); - let mut ctx = self.spawn_context(SpawnMode::Reload); - ctx.spawn_workers_for_chain(&id); - - CmdEffect::ConfigChanged - } - - /// Remove the given chain to the configuration and spawn the associated workers. - /// Will not have any effect if the chain was not already present in the config. - /// - /// If the removal had any effect, returns [`CmdEffect::ConfigChanged`] as - /// subscriptions need to be reset to take into account the newly added chain. - fn remove_chain(&mut self, id: &ChainId) -> CmdEffect { - if !self.config.read().expect("poisoned lock").has_chain(id) { - info!(chain.id=%id, "skipping removal of non-existing chain"); - return CmdEffect::Nothing; + if events.is_empty() { + continue; } - info!(chain.id=%id, "removing existing chain"); - - self.config - .write() - .expect("poisoned lock") - .chains - .retain(|c| &c.id != id); + let src = registry + .get_or_spawn(object.src_chain_id()) + .map_err(Error::spawn)?; - debug!(chain.id=%id, "shutting down workers"); - let mut ctx = self.spawn_context(SpawnMode::Reload); - ctx.shutdown_workers_for_chain(id); + let dst = registry + .get_or_spawn(object.dst_chain_id()) + .map_err(Error::spawn)?; - debug!(chain.id=%id, "shutting down chain runtime"); - self.registry.shutdown(id); + let worker = { workers.get_or_spawn(object, src, dst, config) }; - CmdEffect::ConfigChanged + worker + .send_events(height, events, chain_id.clone()) + .map_err(Error::worker)? 
} - /// Update the given chain configuration, by removing it with - /// [`Supervisor::remove_chain`] and adding the updated - /// chain config with [`Supervisor::remove_chain`]. - /// - /// If the update had any effect, returns [`CmdEffect::ConfigChanged`] as - /// subscriptions need to be reset to take into account the newly added chain. - fn update_chain(&mut self, config: ChainConfig) -> CmdEffect { - info!(chain.id=%config.id, "updating existing chain"); - - let removed = self.remove_chain(&config.id); - let added = self.add_chain(config); - removed.or(added) - } + Ok(()) +} - /// Process the given [`WorkerMsg`] sent by a worker. - fn handle_worker_msg(&mut self, msg: WorkerMsg) { - match msg { - WorkerMsg::Stopped(id, object) => { - self.workers.remove_stopped(id, object); - } +/// Process the given batch if it does not contain any errors, +/// output the errors on the console otherwise. +fn handle_batch( + config: &Config, + registry: &mut Registry, + client_state_filter: &mut FilterPolicy, + workers: &mut WorkerMap, + chain: Chain, + batch: ArcBatch, +) { + let chain_id = chain.id(); + + match batch.deref() { + Ok(batch) => { + let _ = process_batch(config, registry, client_state_filter, workers, chain, batch) + .map_err(|e| error!("[{}] error during batch processing: {}", chain_id, e)); } - } - - fn handle_rest_requests(&self) { - if let Some(rest_rx) = &self.rest_rx { - let config = self.config.read().expect("poisoned lock"); - if let Some(cmd) = rest::process_incoming_requests(&config, rest_rx) { - self.handle_rest_cmd(cmd); - } + Err(EventError(EventErrorDetail::SubscriptionCancelled(_), _)) => { + warn!(chain.id = %chain_id, "event subscription was cancelled, clearing pending packets"); + + let _ = clear_pending_packets(workers, &chain_id).map_err(|e| { + error!( + "[{}] error during clearing pending packets: {}", + chain_id, e + ) + }); + } + Err(e) => { + error!("[{}] error in receiving event batch: {}", chain_id, e) } } +} - fn handle_rest_cmd(&self, 
m: rest::Command) { - match m { - rest::Command::DumpState(reply) => { - let state = self.state(); - reply.send(Ok(state)).unwrap_or_else(|e| { - error!("[rest/supervisor] error replying to a REST request {}", e) - }); - } - } +/// Remove the given chain from the configuration and shut down the associated workers. +/// Will not have any effect if the chain was not already present in the config. +/// +/// If the removal had any effect, returns [`CmdEffect::ConfigChanged`] as +/// subscriptions need to be reset to take into account the removed chain. +fn remove_chain( + config: &mut Config, + registry: &mut Registry, + workers: &mut WorkerMap, + client_state_filter: &mut FilterPolicy, + id: &ChainId, +) -> CmdEffect { + if !config.has_chain(id) { + info!(chain.id=%id, "skipping removal of non-existing chain"); + return CmdEffect::Nothing; } - /// Process the given batch if it does not contain any errors, - /// output the errors on the console otherwise. - fn handle_batch(&mut self, chain: Chain, batch: ArcBatch) { - let chain_id = chain.id(); + info!(chain.id=%id, "removing existing chain"); - match batch.deref() { - Ok(batch) => { - let _ = self - .process_batch(chain, batch) - .map_err(|e| error!("[{}] error during batch processing: {}", chain_id, e)); - } - Err(EventError(EventErrorDetail::SubscriptionCancelled(_), _)) => { - warn!(chain.id = %chain_id, "event subscription was cancelled, clearing pending packets"); - - let _ = self.clear_pending_packets(&chain_id).map_err(|e| { - error!( - "[{}] error during clearing pending packets: {}", - chain_id, e - ) - }); - } - Err(e) => { - error!("[{}] error in receiving event batch: {}", chain_id, e) - } - } - } + config.chains.retain(|c| &c.id != id); - /// Process a batch of events received from a chain. 
- fn process_batch(&mut self, src_chain: Chain, batch: &EventBatch) -> Result<(), Error> { - assert_eq!(src_chain.id(), batch.chain_id); + debug!(chain.id=%id, "shutting down workers"); - let height = batch.height; - let chain_id = batch.chain_id.clone(); + let mut ctx = spawn_context( + config, + registry, + client_state_filter, + workers, + SpawnMode::Reload, + ); - let collected = self.collect_events(&src_chain, batch); + ctx.shutdown_workers_for_chain(id); - // If there is a NewBlock event, forward this event first to any workers affected by it. - if let Some(IbcEvent::NewBlock(new_block)) = collected.new_block { - for worker in self.workers.to_notify(&src_chain.id()) { - worker - .send_new_block(height, new_block) - .map_err(Error::worker)? - } - } + debug!(chain.id=%id, "shutting down chain runtime"); + registry.shutdown(id); - // Forward the IBC events. - for (object, events) in collected.per_object.into_iter() { - if !self.relay_on_object(&src_chain.id(), &object) { - trace!( - "skipping events for '{}'. \ - reason: filtering is enabled and channel does not match any allowed channels", - object.short_name() - ); + CmdEffect::ConfigChanged +} - continue; - } +/// Add the given chain to the configuration and spawn the associated workers. +/// Will not have any effect if the chain is already present in the config. +/// +/// If the addition had any effect, returns [`CmdEffect::ConfigChanged`] as +/// subscriptions need to be reset to take into account the newly added chain. 
+fn add_chain( + config: &mut Config, + registry: &mut Registry, + workers: &mut WorkerMap, + client_state_filter: &mut FilterPolicy, + chain_config: ChainConfig, +) -> CmdEffect { + let id = chain_config.id.clone(); + + if config.has_chain(&id) { + info!(chain.id=%id, "skipping addition of already existing chain"); + return CmdEffect::Nothing; + } - if events.is_empty() { - continue; - } + info!(chain.id=%id, "adding new chain"); - let src = self - .registry - .get_or_spawn(object.src_chain_id()) - .map_err(Error::spawn)?; + config.chains.push(chain_config); - let dst = self - .registry - .get_or_spawn(object.dst_chain_id()) - .map_err(Error::spawn)?; + debug!(chain.id=%id, "spawning chain runtime"); - let worker = { - let config = self.config.read().expect("poisoned lock"); - self.workers.get_or_spawn(object, src, dst, &config) - }; + if let Err(e) = registry.spawn(&id) { + error!( + "failed to add chain {} because of failure to spawn the chain runtime: {}", + id, e + ); - worker - .send_events(height, events, chain_id.clone()) - .map_err(Error::worker)? - } + // Remove the newly added config + config.chains.retain(|c| c.id != id); - Ok(()) + return CmdEffect::Nothing; } - fn clear_pending_packets(&mut self, chain_id: &ChainId) -> Result<(), Error> { - for worker in self.workers.workers_for_chain(chain_id) { - worker.clear_pending_packets().map_err(Error::worker)?; - } + debug!(chain.id=%id, "spawning workers"); + + let mut ctx = spawn_context( + config, + registry, + client_state_filter, + workers, + SpawnMode::Reload, + ); + + ctx.spawn_workers_for_chain(&id); + + CmdEffect::ConfigChanged +} - Ok(()) +/// Update the given chain configuration, by removing it with +/// [`remove_chain`] and adding the updated +/// chain config with [`add_chain`]. +/// +/// If the update had any effect, returns [`CmdEffect::ConfigChanged`] as +/// subscriptions need to be reset to take into account the updated chain. 
+fn update_chain( + config: &mut Config, + registry: &mut Registry, + workers: &mut WorkerMap, + client_state_filter: &mut FilterPolicy, + chain_config: ChainConfig, +) -> CmdEffect { + info!(chain.id=%chain_config.id, "updating existing chain"); + + let removed = remove_chain( + config, + registry, + workers, + client_state_filter, + &chain_config.id, + ); + + let added = add_chain(config, registry, workers, client_state_filter, chain_config); + + removed.or(added) +} + +/// Apply the given configuration update. +/// +/// Returns an [`CmdEffect`] which instructs the caller as to +/// whether or not the event subscriptions needs to be reset or not. +fn update_config( + config: &mut Config, + registry: &mut Registry, + workers: &mut WorkerMap, + client_state_filter: &mut FilterPolicy, + update: ConfigUpdate, +) -> CmdEffect { + match update { + ConfigUpdate::Add(chain_config) => { + add_chain(config, registry, workers, client_state_filter, chain_config) + } + ConfigUpdate::Remove(id) => { + remove_chain(config, registry, workers, client_state_filter, &id) + } + ConfigUpdate::Update(chain_config) => { + update_chain(config, registry, workers, client_state_filter, chain_config) + } } } diff --git a/relayer/src/supervisor/cmd.rs b/relayer/src/supervisor/cmd.rs index 408230763d..03c83546a4 100644 --- a/relayer/src/supervisor/cmd.rs +++ b/relayer/src/supervisor/cmd.rs @@ -16,7 +16,6 @@ pub enum ConfigUpdate { pub enum SupervisorCmd { UpdateConfig(Box), DumpState(Sender), - Stop(Sender<()>), } #[derive(Copy, Clone, Debug, PartialEq, Eq)] diff --git a/relayer/src/supervisor/spawn.rs b/relayer/src/supervisor/spawn.rs index 7936e3d5f8..5e8e95a6f2 100644 --- a/relayer/src/supervisor/spawn.rs +++ b/relayer/src/supervisor/spawn.rs @@ -23,13 +23,13 @@ use crate::{ }, config::Config, object::{Channel, Client, Connection, Object, Packet}, - registry::SharedRegistry, + registry::Registry, supervisor::client_state_filter::{FilterPolicy, Permission}, supervisor::error::Error as 
SupervisorError, worker::WorkerMap, }; -use super::{Error, RwArc}; +use super::Error; use crate::chain::counterparty::{unreceived_acknowledgements, unreceived_packets}; #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -40,8 +40,8 @@ pub enum SpawnMode { /// A context for spawning workers within the [`crate::supervisor::Supervisor`]. pub struct SpawnContext<'a, Chain: ChainHandle> { - config: RwArc, - registry: SharedRegistry, + config: &'a Config, + registry: &'a mut Registry, workers: &'a mut WorkerMap, client_state_filter: &'a mut FilterPolicy, mode: SpawnMode, @@ -49,8 +49,8 @@ pub struct SpawnContext<'a, Chain: ChainHandle> { impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { pub fn new( - config: RwArc, - registry: SharedRegistry, + config: &'a Config, + registry: &'a mut Registry, client_state_filter: &'a mut FilterPolicy, workers: &'a mut WorkerMap, mode: SpawnMode, @@ -66,19 +66,12 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { fn client_filter_enabled(&self) -> bool { // Currently just a wrapper over the global filter. 
- self.config - .read() - .expect("poisoned lock") - .mode - .packets - .filter + self.config.mode.packets.filter } pub fn spawn_workers(&mut self) { let chain_ids = self .config - .read() - .expect("poisoned lock") .chains .iter() .map(|c| &c.id) @@ -169,8 +162,6 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { let chain_ids = self .config - .read() - .expect("poisoned lock") .chains .iter() .map(|c| &c.id) @@ -209,11 +200,8 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { } let counterparty_chain_id = client.client_state.chain_id(); - let has_counterparty = self - .config - .read() - .expect("poisoned lock") - .has_chain(&counterparty_chain_id); + + let has_counterparty = self.config.has_chain(&counterparty_chain_id); if !has_counterparty { debug!( @@ -274,7 +262,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { // Apply the client state filter if self.client_filter_enabled() { match self.client_state_filter.control_connection_end_and_client( - &mut self.registry.write(), + self.registry, &chain_id, &client.client_state, &connection_end, @@ -398,13 +386,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { client: IdentifiedAnyClientState, connection: IdentifiedConnectionEnd, ) -> Result<(), Error> { - let config_conn_enabled = self - .config - .read() - .expect("poisoned lock") - .mode - .connections - .enabled; + let config_conn_enabled = self.config.mode.connections.enabled; let counterparty_chain = self .registry @@ -442,12 +424,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { }); self.workers - .spawn( - chain, - counterparty_chain, - &connection_object, - &self.config.read().expect("poisoned lock"), - ) + .spawn(chain, counterparty_chain, &connection_object, self.config) .then(|| { debug!( "spawning Connection worker: {}", @@ -468,7 +445,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { connection: &IdentifiedConnectionEnd, channel: 
IdentifiedChannelEnd, ) -> Result<(), Error> { - let mode = &self.config.read().expect("poisoned lock").mode; + let mode = &self.config.mode; let counterparty_chain = self .registry @@ -504,13 +481,12 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { dst_chain_id: chain.id(), src_chain_id: client.client_state.chain_id(), }); - self.workers .spawn( counterparty_chain.clone(), chain.clone(), &client_object, - &self.config.read().expect("poisoned lock"), + self.config, ) .then(|| debug!("spawned Client worker: {}", client_object.short_name())); } @@ -547,7 +523,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { chain.clone(), counterparty_chain.clone(), &path_object, - &self.config.read().expect("poisoned lock"), + self.config, ) .then(|| debug!("spawned Packet worker: {}", path_object.short_name())); } @@ -565,12 +541,7 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { }); self.workers - .spawn( - chain, - counterparty_chain, - &channel_object, - &self.config.read().expect("poisoned lock"), - ) + .spawn(chain, counterparty_chain, &channel_object, self.config) .then(|| debug!("spawned Channel worker: {}", channel_object.short_name())); } @@ -582,8 +553,8 @@ impl<'a, Chain: ChainHandle + 'static> SpawnContext<'a, Chain> { chain: &impl ChainHandle, channel: &IdentifiedChannelEnd, ) -> bool { - let config = self.config.read().expect("poisoned lock"); - config.packets_on_channel_allowed(&chain.id(), &channel.port_id, &channel.channel_id) + self.config + .packets_on_channel_allowed(&chain.id(), &channel.port_id, &channel.channel_id) } pub fn shutdown_workers_for_chain(&mut self, chain_id: &ChainId) { diff --git a/relayer/src/util.rs b/relayer/src/util.rs index 3b8c1ab74a..00c47d98c3 100644 --- a/relayer/src/util.rs +++ b/relayer/src/util.rs @@ -7,6 +7,8 @@ pub use recv_multiple::try_recv_multiple; pub mod bigint; pub mod diff; pub mod iter; +pub mod lock; pub mod queue; pub mod retry; pub mod stream; +pub mod task; 
diff --git a/relayer/src/util/lock.rs b/relayer/src/util/lock.rs new file mode 100644 index 0000000000..5fa139b9fd --- /dev/null +++ b/relayer/src/util/lock.rs @@ -0,0 +1,36 @@ +use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +pub type RwArc = Arc>; + +/** + Utility methods for acquiring an `Arc<RwLock<T>>` lock without having + to assert that the acquisition succeeded every time. + + The current code base panics if acquiring the lock fails due to a + [poisoned lock](https://doc.rust-lang.org/std/sync/struct.PoisonError.html), + as the error is non-recoverable anyway. + + Using the `LockExt` methods, we can avoid having to write `.unwrap()` + or `.expect("poisoned lock")` everywhere in the code base. +*/ +pub trait LockExt { + fn new_lock(val: T) -> Self; + + fn acquire_read(&self) -> RwLockReadGuard<'_, T>; + + fn acquire_write(&self) -> RwLockWriteGuard<'_, T>; +} + +impl LockExt for Arc> { + fn new_lock(val: T) -> Self { + Arc::new(RwLock::new(val)) + } + + fn acquire_read(&self) -> RwLockReadGuard<'_, T> { + self.read().expect("poisoned lock") + } + + fn acquire_write(&self) -> RwLockWriteGuard<'_, T> { + self.write().expect("poisoned lock") + } +} diff --git a/relayer/src/util/queue.rs b/relayer/src/util/queue.rs index 311eb1f83b..2fe5ab144a 100644 --- a/relayer/src/util/queue.rs +++ b/relayer/src/util/queue.rs @@ -1,64 +1,60 @@ use alloc::collections::VecDeque; -use core::cell::RefCell; +use std::sync::{Arc, RwLock}; + +use crate::util::lock::LockExt; // A lightweight wrapper type to RefCell> so that // we can safely mutate it with regular reference instead of // mutable reference. We only expose subset of VecDeque methods // that does not return any inner reference, so that the RefCell // can never panic caused by simultaneous borrow and borrow_mut. 
-pub struct Queue(RefCell>); +pub struct Queue(Arc>>); impl Queue { pub fn new() -> Self { - Queue(RefCell::new(VecDeque::new())) + Queue(Arc::new(RwLock::new(VecDeque::new()))) } pub fn pop_front(&self) -> Option { - self.0.borrow_mut().pop_front() + self.0.acquire_write().pop_front() } pub fn pop_back(&self) -> Option { - self.0.borrow_mut().pop_back() + self.0.acquire_write().pop_back() } pub fn push_back(&self, val: T) { - self.0.borrow_mut().push_front(val) + self.0.acquire_write().push_front(val) } pub fn push_front(&self, val: T) { - self.0.borrow_mut().push_front(val) + self.0.acquire_write().push_front(val) } pub fn len(&self) -> usize { - self.0.borrow().len() + self.0.acquire_read().len() } pub fn is_empty(&self) -> bool { - self.0.borrow().is_empty() + self.0.acquire_read().is_empty() } pub fn into_vec(self) -> VecDeque { - self.0.into_inner() + self.0.acquire_write().drain(..).collect() } pub fn replace(&self, queue: VecDeque) { - self.0.replace(queue); + *self.0.acquire_write() = queue; } pub fn take(&self) -> VecDeque { - self.0.take() + self.0.acquire_write().drain(..).collect() } } impl Queue { pub fn clone_vec(&self) -> VecDeque { - self.0.borrow().clone() - } -} - -impl Clone for Queue { - fn clone(&self) -> Queue { - Queue(RefCell::new(self.0.borrow().clone())) + self.0.acquire_read().clone() } } diff --git a/relayer/src/util/task.rs b/relayer/src/util/task.rs new file mode 100644 index 0000000000..ecb21d206a --- /dev/null +++ b/relayer/src/util/task.rs @@ -0,0 +1,195 @@ +use core::fmt::Display; +use core::mem; +use core::time::Duration; +use crossbeam_channel::{bounded, Sender}; +use std::sync::{Arc, RwLock}; +use std::thread; +use tracing::{debug, error, info, warn}; + +use crate::util::lock::LockExt; + +/** + A task handle holds the endpoints for stopping or waiting for a + background task to terminate. 
+ + A holder of `TaskHandle` can explicitly stop the background task by + calling [`shutdown`](TaskHandle::shutdown) or + [`shutdown_and_wait`](TaskHandle::shutdown_and_wait). + + Otherwise, when the `TaskHandle` is dropped, it will stop the background + task and wait for the background task to terminate before returning. +*/ +pub struct TaskHandle { + shutdown_sender: Sender<()>, + stopped: Arc>, + join_handle: DropJoinHandle, +} + +/** + A wrapper to [`std::thread::JoinHandle`] so that the handle is joined + when it is dropped. +*/ +struct DropJoinHandle(Option>); + +/** + A wrapper around the error type returned by a background task step + function to indicate whether the background task should be terminated + because of the error. +*/ +pub enum TaskError { + /** + Inform the background task runner to terminate without error. + */ + Abort, + + /** + Inform the background task runner that an ignorable error has occurred, + and the background task runner should log the error and then continue + execution. + */ + Ignore(E), + + /** + Inform the background task runner that a fatal error has occurred, + and the background task runner should log the error and then abort + execution. + */ + Fatal(E), +} + +/** + Spawn a long-running background task with the given step runner. + + The step runner is a `FnMut` closure that is called repeatedly and + returns a `Result<(), TaskError<E>>`. If the step is executed successfully, + the step runner should return `Ok(())` so that it will be called again. + + Otherwise if errors occurred or if the task needs to be aborted, + the step runner should return a [`TaskError`] that instructs the + task runner of whether the background task should be aborted. + + The function is also given a task name string, which is used for logging + information about the execution of the task. An optional [`Duration`] + argument is also given for the task runner to sleep for the given + duration before calling the step runner again. 
+ + The function returns a [`TaskHandle`] that can be used to shut down the + background task. If the [`TaskHandle`] is dropped or if an explicit shutdown + instruction is sent, the task runner will stop calling the step runner + and abort the background task. + + If the step runner is receiving commands from other + [channels](crossbeam_channel::Receiver), it should use the + [`try_recv`](crossbeam_channel::Receiver::try_recv) function + so that the step runner does not get stuck indefinitely even + when a shutdown instruction has been sent through the + [`TaskHandle`]. +*/ +pub fn spawn_background_task( + task_name: String, + interval_pause: Option, + mut step_runner: impl FnMut() -> Result<(), TaskError> + Send + Sync + 'static, +) -> TaskHandle { + let stopped = Arc::new(RwLock::new(false)); + let write_stopped = stopped.clone(); + + let (shutdown_sender, receiver) = bounded(1); + + let join_handle = thread::spawn(move || { + loop { + match receiver.try_recv() { + Ok(()) => { + break; + } + _ => match step_runner() { + Ok(()) => {} + Err(TaskError::Abort) => { + info!("task is aborting: {}", task_name); + break; + } + Err(TaskError::Ignore(e)) => { + warn!("task {} encountered ignorable error: {}", task_name, e); + } + Err(TaskError::Fatal(e)) => { + error!( + "aborting task {} after encountering fatal error: {}", + task_name, e + ); + break; + } + }, + } + if let Some(interval) = interval_pause { + thread::sleep(interval); + } + } + + *write_stopped.acquire_write() = true; + debug!("task {} has terminated", task_name); + }); + + TaskHandle { + shutdown_sender, + stopped, + join_handle: DropJoinHandle(Some(join_handle)), + } +} + +impl TaskHandle { + /** + Wait for the background task to terminate. + + Note that because the background tasks are meant to run forever, + this would likely never return unless errors occurred or if + the step runner returns [`TaskError::Abort`] to abort prematurely. 
+ */ + pub fn join(mut self) { + if let Some(handle) = mem::take(&mut self.join_handle.0) { + let _ = handle.join(); + } + } + + /** + Send the shutdown signal to the background task without waiting + for it to terminate. + + Note that the waiting will still happen when the [`TaskHandle`] is + dropped. + + This can be used to shutdown multiple tasks in parallel, and then + wait for them to all terminate concurrently. + */ + pub fn shutdown(&self) { + let _ = self.shutdown_sender.send(()); + } + + /** + Send the shutdown signal and wait for the task to terminate. + + This is done implicitly by the [`TaskHandle`] when it is dropped. + */ + pub fn shutdown_and_wait(self) { + let _ = self.shutdown_sender.send(()); + } + + /** + Check whether a background task has been stopped prematurely. + */ + pub fn is_stopped(&self) -> bool { + *self.stopped.acquire_read() + } +} + +impl Drop for DropJoinHandle { + fn drop(&mut self) { + if let Some(handle) = mem::take(&mut self.0) { + let _ = handle.join(); + } + } +} + +impl Drop for TaskHandle { + fn drop(&mut self) { + let _ = self.shutdown_sender.send(()); + } +} diff --git a/relayer/src/worker.rs b/relayer/src/worker.rs index fd3f80980c..78c2930e9b 100644 --- a/relayer/src/worker.rs +++ b/relayer/src/worker.rs @@ -1,9 +1,9 @@ +use alloc::sync::Arc; use core::fmt; - -use crossbeam_channel::Sender; use serde::{Deserialize, Serialize}; -use tracing::{debug, error, info}; +use crate::foreign_client::ForeignClient; +use crate::link::{Link, LinkParameters}; use crate::{ chain::handle::{ChainHandle, ChainHandlePair}, config::Config, @@ -13,7 +13,7 @@ use crate::{ pub mod retry_strategy; mod error; -pub use error::WorkerError; +pub use error::{RunError, WorkerError}; mod handle; pub use handle::WorkerHandle; @@ -24,17 +24,13 @@ pub use cmd::WorkerCmd; mod map; pub use map::WorkerMap; -mod client; -pub use client::ClientWorker; +pub mod client; -mod connection; -pub use connection::ConnectionWorker; +pub mod connection; -mod channel; 
-pub use channel::ChannelWorker; +pub mod channel; -mod packet; -pub use packet::PacketWorker; +pub mod packet; #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[serde(transparent)] @@ -56,113 +52,64 @@ impl fmt::Display for WorkerId { } } -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum WorkerMsg { - Stopped(WorkerId, Object), -} - -/// A worker processes batches of events associated with a given [`Object`]. -pub enum Worker { - Client(WorkerId, ClientWorker), - Connection(WorkerId, ConnectionWorker), - Channel(WorkerId, ChannelWorker), - Packet(WorkerId, PacketWorker), -} - -impl fmt::Display - for Worker -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "[{} <-> {}]", self.chains().a.id(), self.chains().b.id(),) - } -} - -impl Worker { - /// Spawn a worker which relays events pertaining to an [`Object`] between two `chains`. - pub fn spawn( - chains: ChainHandlePair, - id: WorkerId, - object: Object, - msg_tx: Sender, - config: &Config, - ) -> WorkerHandle { - let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded(); - - debug!("spawning worker for object {}", object.short_name(),); - - let worker = match &object { - Object::Client(client) => Self::Client( - id, - ClientWorker::new(client.clone(), chains, cmd_rx, config.mode.clients), - ), - Object::Connection(connection) => Self::Connection( - id, - ConnectionWorker::new(connection.clone(), chains, cmd_rx), - ), - Object::Channel(channel) => { - Self::Channel(id, ChannelWorker::new(channel.clone(), chains, cmd_rx)) +pub fn spawn_worker_tasks( + chains: ChainHandlePair, + id: WorkerId, + object: Object, + config: &Config, +) -> WorkerHandle { + let mut task_handles = Vec::new(); + let (cmd_tx, cmd_rx) = crossbeam_channel::unbounded(); + + match &object { + Object::Client(client) => { + let client = ForeignClient::restore(client.dst_client_id.clone(), chains.b, chains.a); + + let refresh_task = client::spawn_refresh_client(client.clone()); + 
task_handles.push(refresh_task); + + let misbehavior_task = client::detect_misbehavior_task(cmd_rx, client); + if let Some(task) = misbehavior_task { + task_handles.push(task); } - Object::Packet(path) => Self::Packet( - id, - PacketWorker::new(path.clone(), chains, cmd_rx, config.mode.packets), - ), - }; - - let thread_handle = std::thread::spawn(move || worker.run(msg_tx)); - WorkerHandle::new(id, object, cmd_tx, thread_handle) - } - - /// Run the worker event loop. - fn run(self, msg_tx: Sender) { - let id = self.id(); - let object = self.object(); - let name = format!("{}#{}", object.short_name(), id); - - let result = match self { - Self::Client(_, w) => w.run(), - Self::Connection(_, w) => w.run(), - Self::Channel(_, w) => w.run(), - Self::Packet(_, w) => w.run(), - }; - - if let Err(e) = result { - error!("[{}] worker aborted with error: {}", name, e); } - - if let Err(e) = msg_tx.send(WorkerMsg::Stopped(id, object)) { - error!( - "[{}] failed to notify supervisor that worker stopped: {}", - name, e - ); + Object::Connection(connection) => { + let connection_task = + connection::spawn_connection_worker(connection.clone(), chains, cmd_rx); + task_handles.push(connection_task); } - - info!("[{}] worker stopped", name); - } - - fn id(&self) -> WorkerId { - match self { - Self::Client(id, _) => *id, - Self::Connection(id, _) => *id, - Self::Channel(id, _) => *id, - Self::Packet(id, _) => *id, + Object::Channel(channel) => { + let channel_task = channel::spawn_channel_worker(channel.clone(), chains, cmd_rx); + task_handles.push(channel_task); } - } + Object::Packet(path) => { + let packets_config = config.mode.packets; + let link = Link::new_from_opts( + chains.a.clone(), + chains.b, + LinkParameters { + src_port_id: path.src_port_id.clone(), + src_channel_id: path.src_channel_id.clone(), + }, + packets_config.tx_confirmation, + ); - fn chains(&self) -> &ChainHandlePair { - match self { - Self::Client(_, w) => w.chains(), - Self::Connection(_, w) => w.chains(), - 
Self::Channel(_, w) => w.chains(), - Self::Packet(_, w) => w.chains(), + if let Ok(link) = link { + let link = Arc::new(link); + let packet_task = packet::spawn_packet_cmd_worker( + cmd_rx, + link.clone(), + packets_config.clear_on_start, + packets_config.clear_interval, + path.clone(), + ); + task_handles.push(packet_task); + + let link_task = packet::spawn_link_worker(path.clone(), link); + task_handles.push(link_task); + } } } - fn object(&self) -> Object { - match self { - Worker::Client(_, w) => w.object().clone().into(), - Worker::Connection(_, w) => w.object().clone().into(), - Worker::Channel(_, w) => w.object().clone().into(), - Worker::Packet(_, w) => w.object().clone().into(), - } - } + WorkerHandle::new(id, object, cmd_tx, task_handles) } diff --git a/relayer/src/worker/channel.rs b/relayer/src/worker/channel.rs index 90e6689b01..478a3e0d8d 100644 --- a/relayer/src/worker/channel.rs +++ b/relayer/src/worker/channel.rs @@ -1,10 +1,9 @@ use core::time::Duration; -use std::thread; - use crossbeam_channel::Receiver; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use crate::channel::Channel as RelayChannel; +use crate::util::task::{spawn_background_task, TaskError, TaskHandle}; use crate::{ chain::handle::{ChainHandle, ChainHandlePair}, object::Channel, @@ -15,56 +14,36 @@ use crate::{ use super::error::RunError; use super::WorkerCmd; -pub struct ChannelWorker { +pub fn spawn_channel_worker( channel: Channel, chains: ChainHandlePair, cmd_rx: Receiver, -} - -impl ChannelWorker { - pub fn new( - channel: Channel, - chains: ChainHandlePair, - cmd_rx: Receiver, - ) -> Self { - Self { - channel, - chains, - cmd_rx, - } - } - - /// Run the event loop for events associated with a [`Channel`]. - pub(crate) fn run(self) -> Result<(), RunError> { - let a_chain = self.chains.a.clone(); - let b_chain = self.chains.b.clone(); - - // Flag that indicates if the worker should actively resume handshake. - // Set on start or when event based handshake fails. 
- let mut resume_handshake = true; - - loop { - thread::sleep(Duration::from_millis(200)); - - if let Ok(cmd) = self.cmd_rx.try_recv() { +) -> TaskHandle { + let mut resume_handshake = true; + + spawn_background_task( + "channel_worker".to_string(), + Some(Duration::from_millis(200)), + move || { + if let Ok(cmd) = cmd_rx.try_recv() { let result = match cmd { WorkerCmd::IbcEvents { batch } => { // there can be up to two event for this channel, e.g. init and try. // process the last event, the one with highest "rank". let last_event = batch.events.last(); debug!( - channel = %self.channel.short_name(), + channel = %channel.short_name(), "channel worker starts processing {:#?}", last_event ); match last_event { Some(event) => { let mut handshake_channel = RelayChannel::restore_from_event( - a_chain.clone(), - b_chain.clone(), + chains.a.clone(), + chains.b.clone(), event.clone(), ) - .map_err(RunError::channel)?; + .map_err(|e| TaskError::Fatal(RunError::channel(e)))?; retry_with_index( retry_strategy::worker_default_strategy(), @@ -79,39 +58,36 @@ impl ChannelWorker { new_block: _, } => { if !resume_handshake { - continue; + return Ok(()); } debug!( - channel = %self.channel.short_name(), + channel = %channel.short_name(), "Channel worker starts processing block event at {:#?}", current_height ); - let height = current_height.decrement().map_err(RunError::ics02)?; + let height = current_height + .decrement() + .map_err(|e| TaskError::Fatal(RunError::ics02(e)))?; let (mut handshake_channel, state) = RelayChannel::restore_from_state( - a_chain.clone(), - b_chain.clone(), - self.channel.clone(), + chains.a.clone(), + chains.b.clone(), + channel.clone(), height, ) - .map_err(RunError::channel)?; + .map_err(|e| TaskError::Fatal(RunError::channel(e)))?; retry_with_index(retry_strategy::worker_default_strategy(), |index| { handshake_channel.step_state(state, index) }) } - WorkerCmd::Shutdown => { - info!(channel = %self.channel.short_name(), "shutting down Channel 
worker"); - return Ok(()); - } - WorkerCmd::ClearPendingPackets => Ok(()), // nothing to do }; if let Err(retries) = result { - warn!(channel = %self.channel.short_name(), "Channel worker failed after {} retries", retries); + warn!(channel = %channel.short_name(), "Channel worker failed after {} retries", retries); // Resume handshake on next iteration. resume_handshake = true; @@ -119,16 +95,8 @@ impl ChannelWorker { resume_handshake = false; } } - } - } - - /// Get a reference to the uni chan path worker's chains. - pub fn chains(&self) -> &ChainHandlePair { - &self.chains - } - /// Get a reference to the client worker's object. - pub fn object(&self) -> &Channel { - &self.channel - } + Ok(()) + }, + ) } diff --git a/relayer/src/worker/client.rs b/relayer/src/worker/client.rs index 1a5b9799aa..55458ec474 100644 --- a/relayer/src/worker/client.rs +++ b/relayer/src/worker/client.rs @@ -1,178 +1,100 @@ use core::time::Duration; -use std::{thread, time::Instant}; -use crossbeam_channel::Receiver; -use tracing::{debug, info, trace, warn}; +use crossbeam_channel::{Receiver, TryRecvError}; +use tracing::{debug, trace}; -use ibc::{core::ics02_client::events::UpdateClient, events::IbcEvent}; +use ibc::events::IbcEvent; +use crate::util::task::{spawn_background_task, TaskError, TaskHandle}; use crate::{ - chain::handle::{ChainHandle, ChainHandlePair}, - config::Clients as ClientsConfig, - foreign_client::{ForeignClient, ForeignClientErrorDetail, MisbehaviourResults}, - object::Client, + chain::handle::ChainHandle, + foreign_client::{ + ForeignClient, ForeignClientError, ForeignClientErrorDetail, MisbehaviourResults, + }, telemetry, }; -use super::error::RunError; use super::WorkerCmd; -pub struct ClientWorker { - client: Client, - chains: ChainHandlePair, - cmd_rx: Receiver, - clients_cfg: ClientsConfig, -} - -impl ClientWorker { - pub fn new( - client: Client, - chains: ChainHandlePair, - cmd_rx: Receiver, - clients_cfg: ClientsConfig, - ) -> Self { - Self { - client, - 
chains, - cmd_rx, - clients_cfg, - } - } - - /// Run the event loop for events associated with a [`Client`]. - pub fn run(self) -> Result<(), RunError> { - let mut client = ForeignClient::restore( - self.client.dst_client_id.clone(), - self.chains.b.clone(), - self.chains.a.clone(), - ); - - info!( - "[{}] running client worker with misbehaviour={} and refresh={}", - client, self.clients_cfg.misbehaviour, self.clients_cfg.refresh - ); - - // initial check for evidence of misbehaviour for all updates - let skip_misbehaviour = - !self.clients_cfg.misbehaviour || self.detect_misbehaviour(&client, None); - - // remember the time of the last refresh so we backoff - let mut last_refresh = Instant::now() - Duration::from_secs(61); - - loop { - thread::sleep(Duration::from_millis(600)); - - // Clients typically need refresh every 2/3 of their - // trusting period (which can e.g., two weeks). - // Backoff refresh checking to attempt it every minute. - if self.clients_cfg.refresh && last_refresh.elapsed() > Duration::from_secs(60) { - // Run client refresh, exit only if expired or frozen - match client.refresh() { - Ok(Some(_)) => { - telemetry!( - ibc_client_updates, - &self.client.dst_chain_id, - &self.client.dst_client_id, - 1 - ); - } - Err(e) => { - if let ForeignClientErrorDetail::ExpiredOrFrozen(_) = e.detail() { - warn!("failed to refresh client '{}': {}", client, e); - - // This worker has completed its job as the client cannot be refreshed any - // further, and can therefore exit without an error. 
- return Ok(()); - } - } - _ => (), - }; +pub fn spawn_refresh_client( + mut client: ForeignClient, +) -> TaskHandle { + spawn_background_task( + "refresh_client".to_string(), + Some(Duration::from_secs(1)), + move || -> Result<(), TaskError> { + let res = client.refresh().map_err(|e| { + if let ForeignClientErrorDetail::ExpiredOrFrozen(_) = e.detail() { + TaskError::Fatal(e) + } else { + TaskError::Ignore(e) + } + })?; - last_refresh = Instant::now(); + if res.is_some() { + telemetry!(ibc_client_updates, &client.dst_chain.id(), &client.id, 1); } - if skip_misbehaviour { - continue; - } + Ok(()) + }, + ) +} - if let Ok(cmd) = self.cmd_rx.try_recv() { - match self.process_cmd(cmd, &client) { - Next::Continue => continue, - Next::Abort => break, - }; - } +pub fn detect_misbehavior_task( + receiver: Receiver, + client: ForeignClient, +) -> Option { + match client.detect_misbehaviour_and_submit_evidence(None) { + MisbehaviourResults::ValidClient => {} + MisbehaviourResults::VerificationError => {} + MisbehaviourResults::EvidenceSubmitted(_) => { + return None; + } + MisbehaviourResults::CannotExecute => { + return None; } - - Ok(()) } - fn process_cmd(&self, cmd: WorkerCmd, client: &ForeignClient) -> Next { - match cmd { - WorkerCmd::IbcEvents { batch } => { - trace!("[{}] worker received batch: {:?}", client, batch); - - for event in batch.events { - if let IbcEvent::UpdateClient(update) = event { - debug!("[{}] client was updated", client); - - // Run misbehaviour. 
If evidence submitted the loop will exit in next - // iteration with frozen client - if self.detect_misbehaviour(client, Some(update)) { - telemetry!( - ibc_client_misbehaviour, - &self.client.dst_chain_id, - &self.client.dst_client_id, - 1 - ); + let handle = spawn_background_task( + "detect_misbehavior".to_string(), + Some(Duration::from_millis(600)), + move || -> Result<(), TaskError> { + if let Ok(cmd) = receiver.try_recv() { + match cmd { + WorkerCmd::IbcEvents { batch } => { + trace!("[{}] worker received batch: {:?}", client, batch); + + for event in batch.events { + if let IbcEvent::UpdateClient(update) = event { + debug!("[{}] client was updated", client); + + match client.detect_misbehaviour_and_submit_evidence(Some(update)) { + MisbehaviourResults::ValidClient => {} + MisbehaviourResults::VerificationError => { + // can retry in next call + } + MisbehaviourResults::EvidenceSubmitted(_) => { + // if evidence was submitted successfully then exit + return Err(TaskError::Abort); + } + MisbehaviourResults::CannotExecute => { + // skip misbehaviour checking if chain does not have support for it (i.e. client + // update event does not include the header) + return Err(TaskError::Abort); + } + } + } } } - } - - Next::Continue - } - - WorkerCmd::NewBlock { .. } => Next::Continue, - WorkerCmd::ClearPendingPackets => Next::Continue, - - WorkerCmd::Shutdown => Next::Abort, - } - } - fn detect_misbehaviour( - &self, - client: &ForeignClient, - update: Option, - ) -> bool { - match client.detect_misbehaviour_and_submit_evidence(update) { - MisbehaviourResults::ValidClient => false, - MisbehaviourResults::VerificationError => { - // can retry in next call - false - } - MisbehaviourResults::EvidenceSubmitted(_) => { - // if evidence was submitted successfully then exit - true - } - MisbehaviourResults::CannotExecute => { - // skip misbehaviour checking if chain does not have support for it (i.e. 
client - // update event does not include the header) - true + WorkerCmd::NewBlock { .. } => {} + WorkerCmd::ClearPendingPackets => {} + } } - } - } - /// Get a reference to the client worker's chains. - pub fn chains(&self) -> &ChainHandlePair { - &self.chains - } - - /// Get a reference to the client worker's object. - pub fn object(&self) -> &Client { - &self.client - } -} + Ok(()) + }, + ); -pub enum Next { - Abort, - Continue, + Some(handle) } diff --git a/relayer/src/worker/cmd.rs b/relayer/src/worker/cmd.rs index d9ddde001e..ef8ce932f3 100644 --- a/relayer/src/worker/cmd.rs +++ b/relayer/src/worker/cmd.rs @@ -13,7 +13,4 @@ pub enum WorkerCmd { /// Trigger a pending packets clear ClearPendingPackets, - - /// Shutdown the worker - Shutdown, } diff --git a/relayer/src/worker/connection.rs b/relayer/src/worker/connection.rs index 7a573c1eee..384959c53d 100644 --- a/relayer/src/worker/connection.rs +++ b/relayer/src/worker/connection.rs @@ -1,10 +1,9 @@ use core::time::Duration; -use std::thread; - use crossbeam_channel::Receiver; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use crate::connection::Connection as RelayConnection; +use crate::util::task::{spawn_background_task, TaskError, TaskHandle}; use crate::{ chain::handle::{ChainHandle, ChainHandlePair}, object::Connection, @@ -15,38 +14,18 @@ use crate::{ use super::error::RunError; use super::WorkerCmd; -pub struct ConnectionWorker { +pub fn spawn_connection_worker( connection: Connection, chains: ChainHandlePair, cmd_rx: Receiver, -} - -impl ConnectionWorker { - pub fn new( - connection: Connection, - chains: ChainHandlePair, - cmd_rx: Receiver, - ) -> Self { - Self { - connection, - chains, - cmd_rx, - } - } - - /// Run the event loop for events associated with a [`Connection`]. - pub(crate) fn run(self) -> Result<(), RunError> { - let a_chain = self.chains.a.clone(); - let b_chain = self.chains.b.clone(); - - // Flag that indicates if the worker should actively resume handshake. 
- // Set on start or when event based handshake fails. - let mut resume_handshake = true; - - loop { - thread::sleep(Duration::from_millis(200)); - - if let Ok(cmd) = self.cmd_rx.try_recv() { +) -> TaskHandle { + let mut resume_handshake = true; + + spawn_background_task( + "connection_worker".to_string(), + Some(Duration::from_millis(200)), + move || { + if let Ok(cmd) = cmd_rx.try_recv() { let result = match cmd { WorkerCmd::IbcEvents { batch } => { // there can be up to two event for this connection, e.g. init and try. @@ -54,18 +33,18 @@ impl ConnectionWorker let last_event = batch.events.last(); debug!( - connection = %self.connection.short_name(), + connection = %connection.short_name(), "connection worker starts processing {:#?}", last_event ); match last_event { Some(event) => { let mut handshake_connection = RelayConnection::restore_from_event( - a_chain.clone(), - b_chain.clone(), + chains.a.clone(), + chains.b.clone(), event.clone(), ) - .map_err(RunError::connection)?; + .map_err(|e| TaskError::Fatal(RunError::connection(e)))?; retry_with_index( retry_strategy::worker_default_strategy(), @@ -81,42 +60,39 @@ impl ConnectionWorker new_block: _, } => { if !resume_handshake { - continue; + return Ok(()); } debug!( - connection = %self.connection.short_name(), + connection = %connection.short_name(), "connection worker starts processing block event at {}", current_height ); - let height = current_height.decrement().map_err(RunError::ics02)?; + let height = current_height + .decrement() + .map_err(|e| TaskError::Fatal(RunError::ics02(e)))?; let (mut handshake_connection, state) = RelayConnection::restore_from_state( - a_chain.clone(), - b_chain.clone(), - self.connection.clone(), + chains.a.clone(), + chains.b.clone(), + connection.clone(), height, ) - .map_err(RunError::connection)?; + .map_err(|e| TaskError::Fatal(RunError::connection(e)))?; retry_with_index(retry_strategy::worker_default_strategy(), |index| { handshake_connection.step_state(state, index) 
}) } - WorkerCmd::Shutdown => { - info!(connection = %self.connection.short_name(), "shutting down Connection worker"); - return Ok(()); - } - WorkerCmd::ClearPendingPackets => Ok(()), // nothing to do }; if let Err(retries) = result { warn!( - connection = %self.connection.short_name(), + connection = %connection.short_name(), "connection worker failed after {} retries", retries ); @@ -126,16 +102,8 @@ impl ConnectionWorker resume_handshake = false; } } - } - } - - /// Get a reference to the uni chan path worker's chains. - pub fn chains(&self) -> &ChainHandlePair { - &self.chains - } - - /// Get a reference to the client worker's object. - pub fn object(&self) -> &Connection { - &self.connection - } + + Ok(()) + }, + ) } diff --git a/relayer/src/worker/error.rs b/relayer/src/worker/error.rs index 96a08f33a3..65b9e00e5d 100644 --- a/relayer/src/worker/error.rs +++ b/relayer/src/worker/error.rs @@ -1,4 +1,5 @@ -use flex_error::define_error; +use crossbeam_channel::RecvError; +use flex_error::{define_error, DisplayOnly}; use crate::channel::ChannelError; use crate::connection::ConnectionError; @@ -28,7 +29,11 @@ define_error! { | e | { format_args!("Packet worker failed after {} retries", e.retries) - } + }, + + Recv + [ DisplayOnly ] + | _ | { "error receiving from channel: sender end has been closed" } } } diff --git a/relayer/src/worker/handle.rs b/relayer/src/worker/handle.rs index 36ee099802..f945faa838 100644 --- a/relayer/src/worker/handle.rs +++ b/relayer/src/worker/handle.rs @@ -1,6 +1,5 @@ use core::fmt; -use std::thread::{self, JoinHandle}; - +use core::mem; use crossbeam_channel::Sender; use tracing::trace; @@ -10,27 +9,17 @@ use ibc::{ Height, }; +use crate::util::task::TaskHandle; use crate::{event::monitor::EventBatch, object::Object}; use super::error::WorkerError; use super::{WorkerCmd, WorkerId}; -/// Handle to a [`Worker`](crate::worker::Worker), -/// for sending [`WorkerCmd`]s to it. 
pub struct WorkerHandle { id: WorkerId, object: Object, tx: Sender, - thread_handle: JoinHandle<()>, -} - -impl fmt::Debug for WorkerHandle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("WorkerHandle") - .field("id", &self.id) - .field("object", &self.object) - .finish_non_exhaustive() - } + task_handles: Vec, } impl WorkerHandle { @@ -38,13 +27,13 @@ impl WorkerHandle { id: WorkerId, object: Object, tx: Sender, - thread_handle: JoinHandle<()>, + task_handles: Vec, ) -> Self { Self { id, object, tx, - thread_handle, + task_handles, } } @@ -80,17 +69,30 @@ impl WorkerHandle { .map_err(WorkerError::send) } - /// Shutdown the worker. - pub fn shutdown(&self) -> Result<(), WorkerError> { - self.tx.send(WorkerCmd::Shutdown).map_err(WorkerError::send) + /// Shutdown all worker tasks without waiting for them to terminate. + pub fn shutdown(&self) { + for task in self.task_handles.iter() { + task.shutdown() + } + } + + /// Shutdown all worker tasks and wait for them to terminate + pub fn shutdown_and_wait(self) { + for task in self.task_handles.iter() { + // Send shutdown signal to all tasks in parallel. + task.shutdown() + } + // Drop handle automatically handles the waiting for tasks to terminate. } /// Wait for the worker thread to finish. - pub fn join(self) -> thread::Result<()> { + pub fn join(mut self) { + let task_handles = mem::take(&mut self.task_handles); trace!(worker = %self.object.short_name(), "worker::handle: waiting for worker loop to end"); - let res = self.thread_handle.join(); + for task in task_handles.into_iter() { + task.join() + } trace!(worker = %self.object.short_name(), "worker::handle: waiting for worker loop to end: done"); - res } /// Get the worker's id. @@ -103,3 +105,20 @@ impl WorkerHandle { &self.object } } + +// Drop handle to send shutdown signals to background tasks in parallel +// before waiting for all of them to terminate. 
+impl Drop for WorkerHandle { + fn drop(&mut self) { + self.shutdown() + } +} + +impl fmt::Debug for WorkerHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WorkerHandle") + .field("id", &self.id) + .field("object", &self.object) + .finish_non_exhaustive() + } +} diff --git a/relayer/src/worker/map.rs b/relayer/src/worker/map.rs index ccc126ba0c..6b5bc8049b 100644 --- a/relayer/src/worker/map.rs +++ b/relayer/src/worker/map.rs @@ -1,9 +1,8 @@ use alloc::collections::btree_map::BTreeMap as HashMap; - -use crossbeam_channel::Sender; +use core::mem; use ibc::core::ics24_host::identifier::ChainId; -use tracing::{debug, trace, warn}; +use tracing::{debug, trace}; use crate::{ chain::handle::{ChainHandle, ChainHandlePair}, @@ -12,27 +11,31 @@ use crate::{ telemetry, }; -use super::{Worker, WorkerHandle, WorkerId, WorkerMsg}; +use super::{spawn_worker_tasks, WorkerHandle, WorkerId}; /// Manage the lifecycle of [`Worker`]s associated with [`Object`]s. #[derive(Debug)] pub struct WorkerMap { workers: HashMap, latest_worker_id: WorkerId, - msg_tx: Sender, } -impl WorkerMap { - /// Create a new worker map, which will spawn workers with - /// the given channel for sending messages back to the - /// [`Supervisor`](crate::supervisor::Supervisor). - pub fn new(msg_tx: Sender) -> Self { +impl Default for WorkerMap { + fn default() -> Self { Self { workers: HashMap::new(), latest_worker_id: WorkerId::new(0), - msg_tx, } } +} + +impl WorkerMap { + /// Create a new worker map, which will spawn workers with + /// the given channel for sending messages back to the + /// [`Supervisor`](crate::supervisor::Supervisor). + pub fn new() -> Self { + Self::default() + } /// Returns `true` if there is a spawned [`Worker`] associated with the given [`Object`]. 
pub fn contains(&self, object: &Object) -> bool { @@ -148,11 +151,10 @@ impl WorkerMap { ) -> WorkerHandle { telemetry!(worker, metric_type(object), 1); - Worker::spawn( + spawn_worker_tasks( ChainHandlePair { a: src, b: dst }, self.next_worker_id(), object.clone(), - self.msg_tx.clone(), config, ) } @@ -186,16 +188,9 @@ impl WorkerMap { if let Some(handle) = self.workers.remove(object) { telemetry!(worker, metric_type(object), -1); - match handle.shutdown() { - Ok(()) => { - trace!(object = %object.short_name(), "waiting for worker to exit"); - let _ = handle.join(); - } - Err(e) => { - warn!(object = %object.short_name(), "a worker may have failed to shutdown properly: {}", e); - } - } + handle.shutdown_and_wait(); } + // Drop handle automatically handles the waiting for tasks to terminate. } /// Get an iterator over the worker map's objects. @@ -204,6 +199,22 @@ impl WorkerMap { .iter() .map(|(object, handle)| (handle.id(), object)) } + + pub fn shutdown(&mut self) { + let workers = mem::take(&mut self.workers); + for worker in workers.values() { + // Send shutdown signal to all tasks in parallel. + worker.shutdown(); + } + } +} + +// Drop handle to send shutdown signals to background tasks in parallel +// before waiting for all of them to terminate. 
+impl Drop for WorkerMap { + fn drop(&mut self) { + self.shutdown() + } } #[cfg(feature = "telemetry")] diff --git a/relayer/src/worker/packet.rs b/relayer/src/worker/packet.rs index 1215d54c4f..495a441a14 100644 --- a/relayer/src/worker/packet.rs +++ b/relayer/src/worker/packet.rs @@ -1,256 +1,209 @@ use core::time::Duration; - use crossbeam_channel::Receiver; use ibc::Height; -use tracing::{error, info, trace, warn}; +use std::sync::Arc; +use tracing::{error, trace}; -use crate::{ - chain::handle::{ChainHandle, ChainHandlePair}, - config::Packets as PacketsConfig, - link::{Link, LinkParameters, RelaySummary}, - object::Packet, - telemetry, - util::retry::{retry_with_index, RetryResult}, - worker::retry_strategy, -}; +use crate::chain::handle::ChainHandle; +use crate::link::{Link, RelaySummary}; +use crate::object::Packet; +use crate::telemetry; +use crate::util::retry::{retry_with_index, RetryResult}; +use crate::util::task::{spawn_background_task, TaskError, TaskHandle}; +use crate::worker::retry_strategy; use super::error::RunError; use super::WorkerCmd; -enum Step { - Success(RelaySummary), - Shutdown, +/// Whether or not to clear pending packets at this `step` for the given height. +/// Packets are cleared on the first iteration if `clear_on_start` is true. +/// Subsequently, packets are cleared only if `clear_interval` is not `0` and +/// if we have reached the interval. 
+fn should_clear_packets( + is_first_run: &mut bool, + clear_on_start: bool, + clear_interval: u64, + height: Height, +) -> bool { + if *is_first_run { + *is_first_run = false; + clear_on_start + } else { + clear_interval != 0 && height.revision_height % clear_interval == 0 + } } -#[derive(Debug)] -pub struct PacketWorker { - path: Packet, - chains: ChainHandlePair, +pub fn spawn_packet_cmd_worker( cmd_rx: Receiver, - packets_cfg: PacketsConfig, - first_run: bool, -} - -impl PacketWorker { - pub fn new( - path: Packet, - chains: ChainHandlePair, - cmd_rx: Receiver, - packets_cfg: PacketsConfig, - ) -> Self { - Self { - path, - chains, - cmd_rx, - packets_cfg, - first_run: true, - } - } - - /// Whether or not to clear pending packets at this `step` for the given height. - /// Packets are cleared on the first iteration if `clear_on_start` is true. - /// Subsequently, packets are cleared only if `clear_interval` is not `0` and - /// if we have reached the interval. - fn clear_packets(&mut self, height: Height) -> bool { - if self.first_run { - self.first_run = false; - self.packets_cfg.clear_on_start - } else { - self.packets_cfg.clear_interval != 0 - && height.revision_height % self.packets_cfg.clear_interval == 0 - } - } - - /// Run the event loop for events associated with a [`Packet`]. - pub fn run(mut self) -> Result<(), RunError> { - let mut link = Link::new_from_opts( - self.chains.a.clone(), - self.chains.b.clone(), - LinkParameters { - src_port_id: self.path.src_port_id.clone(), - src_channel_id: self.path.src_channel_id.clone(), - }, - self.packets_cfg.tx_confirmation, - ) - .map_err(RunError::link)?; - - let is_closed = link.is_closed().map_err(RunError::link)?; - - // TODO: Do periodical checks that the link is closed (upon every retry in the loop). 
- if is_closed { - warn!("channel is closed, exiting"); - return Ok(()); - } - - loop { - const BACKOFF: Duration = Duration::from_millis(200); - - // Pop-out any unprocessed commands - // If there are no incoming commands, it's safe to backoff. - let maybe_cmd = crossbeam_channel::select! { - recv(self.cmd_rx) -> cmd => cmd.ok(), - recv(crossbeam_channel::after(BACKOFF)) -> _ => None, - }; - - let result = retry_with_index(retry_strategy::worker_stubborn_strategy(), |index| { - self.step(maybe_cmd.clone(), &mut link, index) - }); - - match result { - Ok(Step::Success(summary)) => { - if !summary.is_empty() { - trace!("Packet worker produced relay summary: {:?}", summary); - } - telemetry!(self.packet_metrics(&summary)); - } - - Ok(Step::Shutdown) => { - info!(path = %self.path.short_name(), "shutting down Packet worker"); - return Ok(()); - } - - Err(retries) => { - return Err(RunError::retry(retries)); - } + link: Arc>, + clear_on_start: bool, + clear_interval: u64, + path: Packet, +) -> TaskHandle { + let mut is_first_run: bool = true; + spawn_background_task( + "packet_worker".to_string(), + Some(Duration::from_millis(200)), + move || { + if let Ok(cmd) = cmd_rx.try_recv() { + retry_with_index(retry_strategy::worker_stubborn_strategy(), |index| { + handle_packet_cmd( + &mut is_first_run, + &link, + clear_on_start, + clear_interval, + &path, + cmd.clone(), + index, + ) + }) + .map_err(|e| TaskError::Fatal(RunError::retry(e)))?; } - } - } - - /// Receives worker commands, which may be: - /// - IbcEvent => then it updates schedule - /// - NewBlock => schedules packet clearing - /// - Shutdown => exits - /// - /// Regardless of the incoming command, this method - /// also refreshes and executes any scheduled operational - /// data that is ready. 
- fn step( - &mut self, - cmd: Option, - link: &mut Link, - index: u64, - ) -> RetryResult { - if let Some(cmd) = cmd { - let result = match cmd { - WorkerCmd::IbcEvents { batch } => link.a_to_b.update_schedule(batch), - - // Handle the arrival of an event signaling that the - // source chain has advanced to a new block. - WorkerCmd::NewBlock { - height, - new_block: _, - } => { - // Schedule the clearing of pending packets. This may happen once at start, - // and may be _forced_ at predefined block intervals. - link.a_to_b - .schedule_packet_clearing(Some(height), self.clear_packets(height)) - } - WorkerCmd::ClearPendingPackets => link.a_to_b.schedule_packet_clearing(None, true), - - WorkerCmd::Shutdown => { - return RetryResult::Ok(Step::Shutdown); - } - }; - - if let Err(e) = result { - error!( - path = %self.path.short_name(), - "[{}] worker: handling command encountered error: {}", - link.a_to_b, e - ); + Ok(()) + }, + ) +} - return RetryResult::Retry(index); +pub fn spawn_link_worker( + path: Packet, + link: Arc>, +) -> TaskHandle { + spawn_background_task( + "link_worker".to_string(), + Some(Duration::from_millis(500)), + move || { + link.a_to_b + .refresh_schedule() + .map_err(|e| TaskError::Ignore(RunError::link(e)))?; + + link.a_to_b + .execute_schedule() + .map_err(|e| TaskError::Ignore(RunError::link(e)))?; + + let summary = link.a_to_b.process_pending_txs(); + + if !summary.is_empty() { + trace!("Packet worker produced relay summary: {:?}", summary); } - } - - if let Err(e) = link - .a_to_b - .refresh_schedule() - .and_then(|_| link.a_to_b.execute_schedule()) - { - error!( - "[{}] worker: schedule execution encountered error: {}", - link.a_to_b, e - ); - return RetryResult::Retry(index); - } - let confirmation_result = link.a_to_b.process_pending_txs(); - - RetryResult::Ok(Step::Success(confirmation_result)) - } - - /// Get a reference to the uni chan path worker's chains. 
- pub fn chains(&self) -> &ChainHandlePair { - &self.chains - } - - /// Get a reference to the client worker's object. - pub fn object(&self) -> &Packet { - &self.path - } + telemetry!(packet_metrics(&path, &summary)); - #[cfg(feature = "telemetry")] - fn packet_metrics(&self, summary: &RelaySummary) { - self.receive_packet_metrics(summary); - self.acknowledgment_metrics(summary); - self.timeout_metrics(summary); - } + Ok(()) + }, + ) +} - #[cfg(feature = "telemetry")] - fn receive_packet_metrics(&self, summary: &RelaySummary) { - use ibc::events::IbcEvent::WriteAcknowledgement; +/// Receives worker commands, which may be: +/// - IbcEvent => then it updates schedule +/// - NewBlock => schedules packet clearing +/// - Shutdown => exits +/// +/// Regardless of the incoming command, this method +/// also refreshes and executes any scheduled operational +/// data that is ready. +fn handle_packet_cmd( + is_first_run: &mut bool, + link: &Link, + clear_on_start: bool, + clear_interval: u64, + path: &Packet, + cmd: WorkerCmd, + index: u64, +) -> RetryResult<(), u64> { + let result = match cmd { + WorkerCmd::IbcEvents { batch } => link.a_to_b.update_schedule(batch), + + // Handle the arrival of an event signaling that the + // source chain has advanced to a new block. + WorkerCmd::NewBlock { + height, + new_block: _, + } => { + let do_clear_packet = + should_clear_packets(is_first_run, clear_on_start, clear_interval, height); + + // Schedule the clearing of pending packets. This may happen once at start, + // and may be _forced_ at predefined block intervals. 
+ link.a_to_b + .schedule_packet_clearing(Some(height), do_clear_packet) + } - let count = summary - .events - .iter() - .filter(|e| matches!(e, WriteAcknowledgement(_))) - .count(); + WorkerCmd::ClearPendingPackets => link.a_to_b.schedule_packet_clearing(None, true), + }; - telemetry!( - ibc_receive_packets, - &self.path.src_chain_id, - &self.path.src_channel_id, - &self.path.src_port_id, - count as u64, + if let Err(e) = result { + error!( + path = %path.short_name(), + "[{}] worker: handling command encountered error: {}", + link.a_to_b, e ); + + return RetryResult::Retry(index); } - #[cfg(feature = "telemetry")] - fn acknowledgment_metrics(&self, summary: &RelaySummary) { - use ibc::events::IbcEvent::AcknowledgePacket; + RetryResult::Ok(()) +} - let count = summary - .events - .iter() - .filter(|e| matches!(e, AcknowledgePacket(_))) - .count(); +#[cfg(feature = "telemetry")] +fn packet_metrics(path: &Packet, summary: &RelaySummary) { + receive_packet_metrics(path, summary); + acknowledgment_metrics(path, summary); + timeout_metrics(path, summary); +} - telemetry!( - ibc_acknowledgment_packets, - &self.path.src_chain_id, - &self.path.src_channel_id, - &self.path.src_port_id, - count as u64, - ); - } +#[cfg(feature = "telemetry")] +fn receive_packet_metrics(path: &Packet, summary: &RelaySummary) { + use ibc::events::IbcEvent::WriteAcknowledgement; + + let count = summary + .events + .iter() + .filter(|e| matches!(e, WriteAcknowledgement(_))) + .count(); + + telemetry!( + ibc_receive_packets, + &path.src_chain_id, + &path.src_channel_id, + &path.src_port_id, + count as u64, + ); +} - #[cfg(feature = "telemetry")] - fn timeout_metrics(&self, summary: &RelaySummary) { - use ibc::events::IbcEvent::TimeoutPacket; - let count = summary - .events - .iter() - .filter(|e| matches!(e, TimeoutPacket(_))) - .count(); +#[cfg(feature = "telemetry")] +fn acknowledgment_metrics(path: &Packet, summary: &RelaySummary) { + use ibc::events::IbcEvent::AcknowledgePacket; + + let count 
= summary + .events + .iter() + .filter(|e| matches!(e, AcknowledgePacket(_))) + .count(); + + telemetry!( + ibc_acknowledgment_packets, + &path.src_chain_id, + &path.src_channel_id, + &path.src_port_id, + count as u64, + ); +} - telemetry!( - ibc_timeout_packets, - &self.path.src_chain_id, - &self.path.src_channel_id, - &self.path.src_port_id, - count as u64, - ); - } +#[cfg(feature = "telemetry")] +fn timeout_metrics(path: &Packet, summary: &RelaySummary) { + use ibc::events::IbcEvent::TimeoutPacket; + let count = summary + .events + .iter() + .filter(|e| matches!(e, TimeoutPacket(_))) + .count(); + + telemetry!( + ibc_timeout_packets, + &path.src_chain_id, + &path.src_channel_id, + &path.src_port_id, + count as u64, + ); } diff --git a/tools/integration-test/src/framework/binary/chain.rs b/tools/integration-test/src/framework/binary/chain.rs index 92bfba7cf9..7ba0a426ea 100644 --- a/tools/integration-test/src/framework/binary/chain.rs +++ b/tools/integration-test/src/framework/binary/chain.rs @@ -7,6 +7,7 @@ use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer::config::Config; use ibc_relayer::config::SharedConfig; use ibc_relayer::registry::SharedRegistry; +use ibc_relayer::supervisor::SupervisorHandle; use tracing::info; use super::node::{run_binary_node_test, BinaryNodeTest, NodeConfigOverride}; @@ -17,7 +18,6 @@ use crate::chain::builder::ChainBuilder; use crate::error::Error; use crate::framework::base::HasOverrides; use crate::framework::base::{run_basic_test, BasicTest}; -use crate::relayer::supervisor::SupervisorHandle; use crate::types::binary::chains::{ConnectedChains, DropChainHandle}; use crate::types::config::TestConfig; use crate::types::env::write_env; @@ -116,7 +116,7 @@ pub trait SupervisorOverride { &self, config: &SharedConfig, registry: &SharedRegistry, - ) -> Option; + ) -> Result, Error>; } /** @@ -199,15 +199,17 @@ where info!("written chains environment to {}", env_path.display()); - let _supervisor = self - .test - 
.get_overrides() - .spawn_supervisor(&chains.config, &chains.registry); - let _drop_handle_a = DropChainHandle(chains.handle_a.clone()); let _drop_handle_b = DropChainHandle(chains.handle_b.clone()); - self.test.run(config, chains)?; + { + let _supervisor = self + .test + .get_overrides() + .spawn_supervisor(&chains.config, &chains.registry); + + self.test.run(config, chains)?; + } Ok(()) } diff --git a/tools/integration-test/src/framework/binary/node.rs b/tools/integration-test/src/framework/binary/node.rs index 014540ebcb..be2467fc43 100644 --- a/tools/integration-test/src/framework/binary/node.rs +++ b/tools/integration-test/src/framework/binary/node.rs @@ -86,7 +86,7 @@ where })?; let _node_process_a = node_a.process.clone(); - let _node_process_b = node_a.process.clone(); + let _node_process_b = node_b.process.clone(); self.test.run(config, node_a, node_b)?; diff --git a/tools/integration-test/src/framework/overrides.rs b/tools/integration-test/src/framework/overrides.rs index 887571157b..7d5bed80f1 100644 --- a/tools/integration-test/src/framework/overrides.rs +++ b/tools/integration-test/src/framework/overrides.rs @@ -7,13 +7,13 @@ use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer::config::Config; use ibc_relayer::config::SharedConfig; use ibc_relayer::registry::SharedRegistry; +use ibc_relayer::supervisor::{spawn_supervisor, SupervisorHandle}; use crate::error::Error; use crate::framework::base::HasOverrides; use crate::framework::binary::chain::{RelayerConfigOverride, SupervisorOverride}; use crate::framework::binary::channel::PortsOverride; use crate::framework::binary::node::NodeConfigOverride; -use crate::relayer::supervisor::{spawn_supervisor, SupervisorHandle}; /** This trait should be implemented for all test cases to allow overriding @@ -70,9 +70,9 @@ pub trait TestOverrides { &self, config: &SharedConfig, registry: &SharedRegistry, - ) -> Option { - let handle = spawn_supervisor(config.clone(), registry.clone()); - Some(handle) + ) -> 
Result, Error> { + let handle = spawn_supervisor(config.clone(), registry.clone(), None, false)?; + Ok(Some(handle)) } /** @@ -121,7 +121,7 @@ impl SupervisorOverride for Test { &self, config: &SharedConfig, registry: &SharedRegistry, - ) -> Option { + ) -> Result, Error> { TestOverrides::spawn_supervisor(self, config, registry) } } diff --git a/tools/integration-test/src/relayer/mod.rs b/tools/integration-test/src/relayer/mod.rs index 4aef157afe..92bd121309 100644 --- a/tools/integration-test/src/relayer/mod.rs +++ b/tools/integration-test/src/relayer/mod.rs @@ -21,5 +21,4 @@ pub mod chain; pub mod connection; pub mod foreign_client; -pub mod supervisor; pub mod transfer; diff --git a/tools/integration-test/src/relayer/supervisor.rs b/tools/integration-test/src/relayer/supervisor.rs deleted file mode 100644 index 5114438754..0000000000 --- a/tools/integration-test/src/relayer/supervisor.rs +++ /dev/null @@ -1,83 +0,0 @@ -/*! - Extension to the [`Supervisor`] data type. -*/ - -use core::time::Duration; -use crossbeam_channel::{bounded, Sender}; -use ibc_relayer::chain::handle::ChainHandle; -use ibc_relayer::config::SharedConfig; -use ibc_relayer::registry::SharedRegistry; -use ibc_relayer::supervisor::cmd::SupervisorCmd; -use ibc_relayer::supervisor::Supervisor; -use std::cell::Cell; -use tracing::info; - -/** - A wrapper around the SupervisorCmd sender so that we can - send stop signal to the supervisor before stopping the - chain drivers to prevent the supervisor from raising - errors caused by closed connections. -*/ -pub struct SupervisorHandle { - sender: Sender, - stopped: Cell, -} - -/** - Spawn a supervisor for testing purpose using the provided - [`SharedConfig`] and [`SharedRegistry`]. Returns a - [`SupervisorHandle`] that stops the supervisor when the - value is dropped. 
-*/ -pub fn spawn_supervisor( - config: SharedConfig, - registry: SharedRegistry, -) -> SupervisorHandle { - let (mut supervisor, sender) = Supervisor::new_with_registry(config, registry, None); - - std::thread::spawn(move || { - // We run the supervisor without health check, as `gaiad` running - // from Cosmos.nix do not contain version information and would fail - // the health check. - // https://github.com/informalsystems/cosmos.nix/issues/53 - supervisor.run_without_health_check().unwrap(); - }); - - SupervisorHandle::new(sender) -} - -impl SupervisorHandle { - /** - Create a new [`SupervisorHandle`] based on the underlying [`SupervisorCmd`] - sender. - */ - pub fn new(sender: Sender) -> Self { - Self { - sender, - stopped: Cell::new(false), - } - } - - /** - Explicitly stop the running supervisor. This is useful in tests where - the supervisor has to be stopped and restarted explicitly. - - Note that after stopping the supervisor, the only way to restart it - is by respawning a new supervisor using [`spawn_supervisor`]. 
- */ - pub fn stop(&self) { - if !self.stopped.get() { - info!("stopping supervisor"); - self.stopped.set(true); - let (sender, receiver) = bounded(1); - let _ = self.sender.send(SupervisorCmd::Stop(sender)); - let _ = receiver.recv_timeout(Duration::from_secs(5)); - } - } -} - -impl Drop for SupervisorHandle { - fn drop(&mut self) { - self.stop(); - } -} From 490b8dfeeb76804591555c9837e51a8b2e79045f Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Tue, 14 Dec 2021 12:02:47 +0100 Subject: [PATCH 2/5] Use better names for worker tasks --- relayer/src/worker.rs | 2 +- relayer/src/worker/channel.rs | 2 +- relayer/src/worker/client.rs | 4 ++-- relayer/src/worker/connection.rs | 2 +- relayer/src/worker/error.rs | 2 +- relayer/src/worker/packet.rs | 6 +++--- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/relayer/src/worker.rs b/relayer/src/worker.rs index 78c2930e9b..db9bb5e412 100644 --- a/relayer/src/worker.rs +++ b/relayer/src/worker.rs @@ -105,7 +105,7 @@ pub fn spawn_worker_tasks, ) -> TaskHandle { spawn_background_task( - "refresh_client".to_string(), + format!("RefreshClientWorker({})", client), Some(Duration::from_secs(1)), move || -> Result<(), TaskError> { let res = client.refresh().map_err(|e| { @@ -56,7 +56,7 @@ pub fn detect_misbehavior_task Result<(), TaskError> { if let Ok(cmd) = receiver.try_recv() { diff --git a/relayer/src/worker/connection.rs b/relayer/src/worker/connection.rs index 384959c53d..9b3d4f87e0 100644 --- a/relayer/src/worker/connection.rs +++ b/relayer/src/worker/connection.rs @@ -22,7 +22,7 @@ pub fn spawn_connection_worker } | e | { - format_args!("Packet worker failed after {} retries", + format_args!("Worker failed after {} retries", e.retries) }, diff --git a/relayer/src/worker/packet.rs b/relayer/src/worker/packet.rs index 495a441a14..74ee0a1e84 100644 --- a/relayer/src/worker/packet.rs +++ b/relayer/src/worker/packet.rs @@ -42,7 +42,7 @@ pub fn spawn_packet_cmd_worker TaskHandle { let mut is_first_run: bool = true; 
spawn_background_task( - "packet_worker".to_string(), + format!("PacketCmdWorker({})", link.a_to_b), Some(Duration::from_millis(200)), move || { if let Ok(cmd) = cmd_rx.try_recv() { @@ -65,12 +65,12 @@ pub fn spawn_packet_cmd_worker( +pub fn spawn_packet_worker( path: Packet, link: Arc>, ) -> TaskHandle { spawn_background_task( - "link_worker".to_string(), + format!("PacketWorker({})", link.a_to_b), Some(Duration::from_millis(500)), move || { link.a_to_b From aad0e0019fb0c4f44cf1150376c8a7db5663d99f Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Wed, 15 Dec 2021 21:25:59 +0100 Subject: [PATCH 3/5] Add integration tests for connection and channel workers --- modules/src/core/ics24_host/identifier.rs | 5 +- relayer/src/connection.rs | 2 +- .../src/framework/overrides.rs | 4 +- tools/integration-test/src/prelude.rs | 25 ++-- tools/integration-test/src/relayer/channel.rs | 71 ++++++++++ .../src/relayer/connection.rs | 51 ++++++- tools/integration-test/src/relayer/mod.rs | 1 + tools/integration-test/src/tests/mod.rs | 1 + .../integration-test/src/tests/supervisor.rs | 132 ++++++++++++++++++ .../src/types/binary/chains.rs | 11 +- tools/integration-test/src/types/id.rs | 4 + .../integration-test/src/types/tagged/dual.rs | 4 + .../integration-test/src/types/tagged/mono.rs | 4 + 13 files changed, 296 insertions(+), 19 deletions(-) create mode 100644 tools/integration-test/src/relayer/channel.rs create mode 100644 tools/integration-test/src/tests/supervisor.rs diff --git a/modules/src/core/ics24_host/identifier.rs b/modules/src/core/ics24_host/identifier.rs index 929a5e5e59..087b958ccb 100644 --- a/modules/src/core/ics24_host/identifier.rs +++ b/modules/src/core/ics24_host/identifier.rs @@ -293,8 +293,9 @@ impl PartialEq for ConnectionId { pub struct PortId(String); impl PortId { - pub fn unsafe_new(id: &str) -> Self { - Self(id.to_string()) + /// Infallible creation of the well-known transfer port + pub fn transfer() -> Self { + Self("transfer".to_string()) } /// Get this 
identifier as a borrowed `&str` diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 733f710a8c..a24fe6841e 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -1112,7 +1112,7 @@ impl Connection { } } -fn extract_connection_id(event: &IbcEvent) -> Result<&ConnectionId, ConnectionError> { +pub fn extract_connection_id(event: &IbcEvent) -> Result<&ConnectionId, ConnectionError> { match event { IbcEvent::OpenInitConnection(ev) => ev.connection_id().as_ref(), IbcEvent::OpenTryConnection(ev) => ev.connection_id().as_ref(), diff --git a/tools/integration-test/src/framework/overrides.rs b/tools/integration-test/src/framework/overrides.rs index 7d5bed80f1..221ea6c730 100644 --- a/tools/integration-test/src/framework/overrides.rs +++ b/tools/integration-test/src/framework/overrides.rs @@ -82,7 +82,7 @@ pub trait TestOverrides { Implemented for [`PortsOverride`]. */ fn channel_port_a(&self) -> PortId { - PortId::unsafe_new("transfer") + PortId::transfer() } /** @@ -92,7 +92,7 @@ pub trait TestOverrides { Implemented for [`PortsOverride`]. 
*/ fn channel_port_b(&self) -> PortId { - PortId::unsafe_new("transfer") + PortId::transfer() } } diff --git a/tools/integration-test/src/prelude.rs b/tools/integration-test/src/prelude.rs index 6ba45409db..b8d409b9c3 100644 --- a/tools/integration-test/src/prelude.rs +++ b/tools/integration-test/src/prelude.rs @@ -3,6 +3,7 @@ */ pub use eyre::eyre; +pub use ibc::core::ics24_host::identifier::{ChainId, ChannelId, ClientId, ConnectionId, PortId}; pub use ibc_relayer::chain::handle::ChainHandle; pub use ibc_relayer::config::Config; pub use ibc_relayer::config::SharedConfig; @@ -12,22 +13,26 @@ pub use tracing::{debug, error, info, warn}; pub use crate::chain::driver::{tagged::TaggedChainDriverExt, ChainDriver}; pub use crate::error::Error; +pub use crate::framework::base::HasOverrides; +pub use crate::framework::binary::chain::{ + run_binary_chain_test, run_two_way_binary_chain_test, BinaryChainTest, +}; +pub use crate::framework::binary::channel::{ + run_binary_channel_test, run_two_way_binary_channel_test, BinaryChannelTest, +}; +pub use crate::framework::binary::node::{run_binary_node_test, BinaryNodeTest}; pub use crate::framework::overrides::TestOverrides; +pub use crate::relayer::channel::TaggedChannelEndExt; +pub use crate::relayer::connection::{TaggedConnectionEndExt, TaggedConnectionExt}; +pub use crate::relayer::foreign_client::TaggedForeignClientExt; pub use crate::types::binary::chains::ConnectedChains; pub use crate::types::binary::channel::ConnectedChannel; pub use crate::types::config::TestConfig; +pub use crate::types::id::*; pub use crate::types::single::node::{FullNode, TaggedFullNodeExt}; +pub use crate::types::tagged::{DualTagged, MonoTagged}; pub use crate::types::wallet::{ TaggedTestWalletsExt, TaggedWallet, TestWallets, Wallet, WalletAddress, WalletId, }; +pub use crate::util::retry::assert_eventually_succeed; pub use crate::util::suspend::suspend; - -pub use crate::framework::binary::channel::{ - run_binary_channel_test, 
run_two_way_binary_channel_test, BinaryChannelTest, -}; - -pub use crate::framework::binary::chain::{ - run_binary_chain_test, run_two_way_binary_chain_test, BinaryChainTest, -}; - -pub use crate::framework::binary::node::{run_binary_node_test, BinaryNodeTest}; diff --git a/tools/integration-test/src/relayer/channel.rs b/tools/integration-test/src/relayer/channel.rs new file mode 100644 index 0000000000..f4ecf3a15a --- /dev/null +++ b/tools/integration-test/src/relayer/channel.rs @@ -0,0 +1,71 @@ +use ibc::core::ics04_channel::channel::{ChannelEnd, Order}; +use ibc::Height; +use ibc_relayer::chain::handle::ChainHandle; +use ibc_relayer::channel::{extract_channel_id, Channel, ChannelSide}; + +use crate::error::Error; +use crate::types::id::{ + TaggedChannelId, TaggedChannelIdRef, TaggedClientIdRef, TaggedConnectionIdRef, TaggedPortIdRef, +}; +use crate::types::tagged::DualTagged; + +pub trait TaggedChannelEndExt { + fn tagged_counterparty_channel_id(&self) -> Option>; +} + +impl TaggedChannelEndExt + for DualTagged +{ + fn tagged_counterparty_channel_id(&self) -> Option> { + self.contra_map(|c| c.counterparty().channel_id.clone()) + .transpose() + } +} + +pub fn init_channel( + handle_a: &ChainA, + handle_b: &ChainB, + client_id_a: &TaggedClientIdRef, + client_id_b: &TaggedClientIdRef, + connection_id_a: &TaggedConnectionIdRef, + connection_id_b: &TaggedConnectionIdRef, + src_port_id: &TaggedPortIdRef, + dst_port_id: &TaggedPortIdRef, +) -> Result, Error> { + let channel = Channel { + connection_delay: Default::default(), + ordering: Order::Unordered, + a_side: ChannelSide::new( + handle_a.clone(), + client_id_a.cloned_value(), + connection_id_a.cloned_value(), + src_port_id.cloned_value(), + None, + None, + ), + b_side: ChannelSide::new( + handle_b.clone(), + client_id_b.cloned_value(), + connection_id_b.cloned_value(), + dst_port_id.cloned_value(), + None, + None, + ), + }; + + let event = channel.build_chan_open_init_and_send()?; + + let channel_id = 
extract_channel_id(&event)?; + + Ok(DualTagged::new(channel_id.clone())) +} + +pub fn query_channel_end( + handle: &ChainA, + channel_id: &TaggedChannelIdRef, + port_id: &TaggedPortIdRef, +) -> Result, Error> { + let channel_end = handle.query_channel(port_id.value(), channel_id.value(), Height::zero())?; + + Ok(DualTagged::new(channel_end)) +} diff --git a/tools/integration-test/src/relayer/connection.rs b/tools/integration-test/src/relayer/connection.rs index 1dde67137a..3922c2beb4 100644 --- a/tools/integration-test/src/relayer/connection.rs +++ b/tools/integration-test/src/relayer/connection.rs @@ -2,11 +2,15 @@ Definition for extension trait methods for [`Connection`] */ +use ibc::core::ics03_connection::connection::ConnectionEnd; +use ibc::timestamp::ZERO_DURATION; +use ibc::Height; use ibc_relayer::chain::handle::ChainHandle; -use ibc_relayer::connection::Connection; +use ibc_relayer::connection::{extract_connection_id, Connection, ConnectionSide}; -use crate::types::id::TaggedConnectionIdRef; -use crate::types::tagged::*; +use crate::error::Error; +use crate::types::id::{TaggedClientIdRef, TaggedConnectionId, TaggedConnectionIdRef}; +use crate::types::tagged::DualTagged; /** An extension trait that provide helper methods to get tagged identifiers @@ -24,6 +28,10 @@ pub trait TaggedConnectionExt { fn tagged_connection_id_b(&self) -> Option>; } +pub trait TaggedConnectionEndExt { + fn tagged_counterparty_connection_id(&self) -> Option>; +} + impl TaggedConnectionExt for Connection { @@ -35,3 +43,40 @@ impl TaggedConnectionExt TaggedConnectionEndExt + for DualTagged +{ + fn tagged_counterparty_connection_id(&self) -> Option> { + self.contra_map(|c| c.counterparty().connection_id.clone()) + .transpose() + } +} + +pub fn init_connection( + handle_a: &ChainA, + handle_b: &ChainB, + client_id_a: &TaggedClientIdRef, + client_id_b: &TaggedClientIdRef, +) -> Result, Error> { + let connection = Connection { + delay_period: ZERO_DURATION, + a_side: 
ConnectionSide::new(handle_a.clone(), (*client_id_a.value()).clone(), None), + b_side: ConnectionSide::new(handle_b.clone(), (*client_id_b.value()).clone(), None), + }; + + let event = connection.build_conn_init_and_send()?; + + let connection_id = extract_connection_id(&event)?; + + Ok(DualTagged::new(connection_id.clone())) +} + +pub fn query_connection_end( + handle: &ChainA, + connection_id: &TaggedConnectionIdRef, +) -> Result, Error> { + let connection_end = handle.query_connection(connection_id.value(), Height::zero())?; + + Ok(DualTagged::new(connection_end)) +} diff --git a/tools/integration-test/src/relayer/mod.rs b/tools/integration-test/src/relayer/mod.rs index 92bd121309..b25c21f2bb 100644 --- a/tools/integration-test/src/relayer/mod.rs +++ b/tools/integration-test/src/relayer/mod.rs @@ -19,6 +19,7 @@ */ pub mod chain; +pub mod channel; pub mod connection; pub mod foreign_client; pub mod transfer; diff --git a/tools/integration-test/src/tests/mod.rs b/tools/integration-test/src/tests/mod.rs index d76f266765..d5996571d0 100644 --- a/tools/integration-test/src/tests/mod.rs +++ b/tools/integration-test/src/tests/mod.rs @@ -6,6 +6,7 @@ */ pub mod memo; +pub mod supervisor; pub mod transfer; #[cfg(any(doc, feature = "manual"))] diff --git a/tools/integration-test/src/tests/supervisor.rs b/tools/integration-test/src/tests/supervisor.rs new file mode 100644 index 0000000000..8e50162f20 --- /dev/null +++ b/tools/integration-test/src/tests/supervisor.rs @@ -0,0 +1,132 @@ +use core::time::Duration; +use ibc::core::ics03_connection::connection::State as ConnectionState; +use ibc::core::ics04_channel::channel::State as ChannelState; +use ibc_relayer::config::{self, Config, ModeConfig}; + +use crate::prelude::*; +pub use crate::relayer::channel::{init_channel, query_channel_end}; +pub use crate::relayer::connection::{init_connection, query_connection_end}; + +#[test] +fn test_supervisor() -> Result<(), Error> { + run_binary_chain_test(&SupervisorTest) +} + +struct 
SupervisorTest; + +impl TestOverrides for SupervisorTest { + fn modify_relayer_config(&self, config: &mut Config) { + config.mode = ModeConfig { + clients: config::Clients { + enabled: true, + refresh: true, + misbehaviour: true, + }, + connections: config::Connections { enabled: true }, + channels: config::Channels { enabled: true }, + packets: config::Packets { + enabled: true, + clear_interval: 10, + clear_on_start: true, + filter: false, + tx_confirmation: true, + }, + }; + } +} + +impl BinaryChainTest for SupervisorTest { + fn run( + &self, + _config: &TestConfig, + chains: ConnectedChains, + ) -> Result<(), Error> { + let connection_id_b = init_connection( + &chains.handle_a, + &chains.handle_b, + &chains.client_b_to_a.tagged_client_id(), + &chains.client_a_to_b.tagged_client_id(), + )?; + + let connection_id_a = assert_eventually_succeed( + "connection should eventually open", + || { + let connection_end_b = + query_connection_end(&chains.handle_b, &connection_id_b.as_ref())?; + + if !connection_end_b + .value() + .state_matches(&ConnectionState::Open) + { + return Err(eyre!("expeted connection end A to be in open state")); + } + + let connection_id_a = connection_end_b + .tagged_counterparty_connection_id() + .ok_or_else(|| { + eyre!("expected counterparty connection id to present on open connection") + })?; + + let connection_end_a = + query_connection_end(&chains.handle_a, &connection_id_a.as_ref())?; + + if !connection_end_a + .value() + .state_matches(&ConnectionState::Open) + { + return Err(eyre!("expeted connection end B to be in open state")); + } + + Ok(connection_id_a) + }, + 20, + Duration::from_secs(1), + )?; + + let port_a = tagged_transfer_port(); + let port_b = tagged_transfer_port(); + + let channel_id_b = init_channel( + &chains.handle_a, + &chains.handle_b, + &chains.client_id_a(), + &chains.client_id_b(), + &connection_id_a.as_ref(), + &connection_id_b.as_ref(), + &port_a.as_ref(), + &port_b.as_ref(), + )?; + + assert_eventually_succeed( + 
"channel should eventually open", + || { + let channel_end_b = + query_channel_end(&chains.handle_b, &channel_id_b.as_ref(), &port_b.as_ref())?; + + if !channel_end_b.value().state_matches(&ChannelState::Open) { + return Err(eyre!("expeted channel end A to be in open state")); + } + + let channel_id_a = + channel_end_b + .tagged_counterparty_channel_id() + .ok_or_else(|| { + eyre!("expected counterparty channel id to present on open channel") + })?; + + let channel_end_a = + query_channel_end(&chains.handle_a, &channel_id_a.as_ref(), &port_a.as_ref())?; + + if !channel_end_a.value().state_matches(&ChannelState::Open) { + return Err(eyre!("expeted channel end B to be in open state")); + } + + Ok(channel_id_a) + }, + 20, + Duration::from_secs(1), + )?; + + Ok(()) + } +} diff --git a/tools/integration-test/src/types/binary/chains.rs b/tools/integration-test/src/types/binary/chains.rs index d5f33d7236..a0595c433a 100644 --- a/tools/integration-test/src/types/binary/chains.rs +++ b/tools/integration-test/src/types/binary/chains.rs @@ -9,8 +9,9 @@ use ibc_relayer::registry::SharedRegistry; use std::path::PathBuf; use tracing::info; +use crate::relayer::foreign_client::TaggedForeignClientExt; use crate::types::env::{prefix_writer, EnvWriter, ExportEnv}; -use crate::types::id::TaggedChainIdRef; +use crate::types::id::{TaggedChainIdRef, TaggedClientIdRef}; use crate::types::single::node::{FullNode, TaggedFullNodeExt}; use crate::types::tagged::*; @@ -139,6 +140,14 @@ impl ConnectedChains { self.node_a.chain_id() } + pub fn client_id_a(&self) -> TaggedClientIdRef { + self.client_b_to_a.tagged_client_id() + } + + pub fn client_id_b(&self) -> TaggedClientIdRef { + self.client_a_to_b.tagged_client_id() + } + /** The chain ID of chain B. 
*/ diff --git a/tools/integration-test/src/types/id.rs b/tools/integration-test/src/types/id.rs index 19053e5176..07481784aa 100644 --- a/tools/integration-test/src/types/id.rs +++ b/tools/integration-test/src/types/id.rs @@ -70,3 +70,7 @@ pub type TaggedConnectionId = DualTagged = DualTagged; + +pub fn tagged_transfer_port() -> TaggedPortId { + DualTagged::new(PortId::transfer()) +} diff --git a/tools/integration-test/src/types/tagged/dual.rs b/tools/integration-test/src/types/tagged/dual.rs index bd6d7ae371..c967268ff2 100644 --- a/tools/integration-test/src/types/tagged/dual.rs +++ b/tools/integration-test/src/types/tagged/dual.rs @@ -330,6 +330,10 @@ impl<'a, TagA, TagB, Value: Clone> Tagged { pub fn cloned(&self) -> Tagged { Tagged::new(self.0.clone()) } + + pub fn cloned_value(&self) -> Value { + self.0.clone() + } } impl Tagged> { diff --git a/tools/integration-test/src/types/tagged/mono.rs b/tools/integration-test/src/types/tagged/mono.rs index fdbca12767..13e4d04768 100644 --- a/tools/integration-test/src/types/tagged/mono.rs +++ b/tools/integration-test/src/types/tagged/mono.rs @@ -239,6 +239,10 @@ impl<'a, Tag, Value: Clone> Tagged { pub fn cloned(&self) -> Tagged { Tagged::new(self.0.clone()) } + + pub fn cloned_value(&self) -> Value { + self.0.clone() + } } impl Tagged> { From af16f5b4b706655db5d0878241668a90c503ad3d Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Thu, 16 Dec 2021 12:02:26 +0100 Subject: [PATCH 4/5] Fix typo --- tools/integration-test/src/tests/supervisor.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/integration-test/src/tests/supervisor.rs b/tools/integration-test/src/tests/supervisor.rs index 8e50162f20..a1a3a5183b 100644 --- a/tools/integration-test/src/tests/supervisor.rs +++ b/tools/integration-test/src/tests/supervisor.rs @@ -58,7 +58,7 @@ impl BinaryChainTest for SupervisorTest { .value() .state_matches(&ConnectionState::Open) { - return Err(eyre!("expeted connection end A to be in open 
state")); + return Err(eyre!("expected connection end A to be in open state")); } let connection_id_a = connection_end_b @@ -74,7 +74,7 @@ impl BinaryChainTest for SupervisorTest { .value() .state_matches(&ConnectionState::Open) { - return Err(eyre!("expeted connection end B to be in open state")); + return Err(eyre!("expected connection end B to be in open state")); } Ok(connection_id_a) @@ -104,7 +104,7 @@ impl BinaryChainTest for SupervisorTest { query_channel_end(&chains.handle_b, &channel_id_b.as_ref(), &port_b.as_ref())?; if !channel_end_b.value().state_matches(&ChannelState::Open) { - return Err(eyre!("expeted channel end A to be in open state")); + return Err(eyre!("expected channel end A to be in open state")); } let channel_id_a = @@ -118,7 +118,7 @@ impl BinaryChainTest for SupervisorTest { query_channel_end(&chains.handle_a, &channel_id_a.as_ref(), &port_a.as_ref())?; if !channel_end_a.value().state_matches(&ChannelState::Open) { - return Err(eyre!("expeted channel end B to be in open state")); + return Err(eyre!("expected channel end B to be in open state")); } Ok(channel_id_a) From e060b4dd60898ad7cc3a718d6dc8c037eef53b6e Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Thu, 16 Dec 2021 12:03:39 +0100 Subject: [PATCH 5/5] Reorder arguments in assert_eventually_succeed --- tools/integration-test/src/chain/driver.rs | 4 ++-- tools/integration-test/src/tests/supervisor.rs | 8 ++++---- tools/integration-test/src/util/retry.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/tools/integration-test/src/chain/driver.rs b/tools/integration-test/src/chain/driver.rs index ff9b208e51..8a312938b3 100644 --- a/tools/integration-test/src/chain/driver.rs +++ b/tools/integration-test/src/chain/driver.rs @@ -435,6 +435,8 @@ impl ChainDriver { ) -> Result<(), Error> { assert_eventually_succeed( "wallet reach expected amount", + 20, + Duration::from_secs(1), || { let amount = self.query_balance(&user.address, denom)?; @@ -448,8 +450,6 @@ impl 
ChainDriver { )) } }, - 20, - Duration::from_secs(1), )?; Ok(()) diff --git a/tools/integration-test/src/tests/supervisor.rs b/tools/integration-test/src/tests/supervisor.rs index a1a3a5183b..026c4f4816 100644 --- a/tools/integration-test/src/tests/supervisor.rs +++ b/tools/integration-test/src/tests/supervisor.rs @@ -50,6 +50,8 @@ impl BinaryChainTest for SupervisorTest { let connection_id_a = assert_eventually_succeed( "connection should eventually open", + 20, + Duration::from_secs(1), || { let connection_end_b = query_connection_end(&chains.handle_b, &connection_id_b.as_ref())?; @@ -79,8 +81,6 @@ impl BinaryChainTest for SupervisorTest { Ok(connection_id_a) }, - 20, - Duration::from_secs(1), )?; let port_a = tagged_transfer_port(); @@ -99,6 +99,8 @@ impl BinaryChainTest for SupervisorTest { assert_eventually_succeed( "channel should eventually open", + 20, + Duration::from_secs(1), || { let channel_end_b = query_channel_end(&chains.handle_b, &channel_id_b.as_ref(), &port_b.as_ref())?; @@ -123,8 +125,6 @@ impl BinaryChainTest for SupervisorTest { Ok(channel_id_a) }, - 20, - Duration::from_secs(1), )?; Ok(()) diff --git a/tools/integration-test/src/util/retry.rs b/tools/integration-test/src/util/retry.rs index 6a3d3cf6e1..d52569d813 100644 --- a/tools/integration-test/src/util/retry.rs +++ b/tools/integration-test/src/util/retry.rs @@ -17,9 +17,9 @@ use crate::error::Error; */ pub fn assert_eventually_succeed( task_name: &str, - task: impl Fn() -> Result, attempts: u16, interval: Duration, + task: impl Fn() -> Result, ) -> Result { sleep(interval); for _ in 0..attempts {