From 1f498e3cd40b6e3cc5602f4aa5ffd2560581edf3 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 25 May 2021 10:05:37 +0200 Subject: [PATCH] Promote `start-multi` to `start` (#971) * Promote `start-multi` command to `start` * Cleanup ForeignClient::restore * Remove unused `relayer::relay` module * Remove unused `Link::relay` method * Update e2e test suite * Update changelog --- CHANGELOG.md | 9 + e2e/e2e/relayer.py | 2 +- relayer-cli/src/commands.rs | 10 +- relayer-cli/src/commands/misbehaviour.rs | 2 +- relayer-cli/src/commands/start.rs | 70 +------- relayer-cli/src/commands/start_multi.rs | 20 --- relayer-cli/src/commands/tx/client.rs | 2 +- relayer/src/channel.rs | 39 +++-- relayer/src/connection.rs | 35 ++-- relayer/src/foreign_client.rs | 25 +-- relayer/src/lib.rs | 1 - relayer/src/link.rs | 209 +++++++++-------------- relayer/src/relay.rs | 71 -------- relayer/src/worker/client.rs | 2 +- 14 files changed, 163 insertions(+), 334 deletions(-) delete mode 100644 relayer-cli/src/commands/start_multi.rs delete mode 100644 relayer/src/relay.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a8a9e23db..2a9773383c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## Unreleased + +### BREAKING CHANGES + +- [ibc-relayer-cli] + - Promote `start-multi` command to `start` ([#911]) + +[#911]: https://github.com/informalsystems/ibc-rs/issues/911 + ## v0.3.2 *May 21st, 2021* diff --git a/e2e/e2e/relayer.py b/e2e/e2e/relayer.py index cadfcb0d4d..1f72d09c11 100644 --- a/e2e/e2e/relayer.py +++ b/e2e/e2e/relayer.py @@ -6,6 +6,6 @@ def start(c: Config) -> Popen: - full_cmd = f'{c.relayer_cmd} -c {c.config_file} -j start-multi'.split(' ') + full_cmd = f'{c.relayer_cmd} -c {c.config_file} -j start'.split(' ') l.debug(' '.join(full_cmd)) return Popen(full_cmd) diff --git a/relayer-cli/src/commands.rs b/relayer-cli/src/commands.rs index e1eed0af26..b8a3b72f08 100644 --- a/relayer-cli/src/commands.rs +++ b/relayer-cli/src/commands.rs @@ -17,8 +17,7 @@ use crate::DEFAULT_CONFIG_PATH; use self::{ create::CreateCmds, keys::KeysCmd, listen::ListenCmd, query::QueryCmd, start::StartCmd, - start_multi::StartMultiCmd, tx::TxCmd, update::UpdateCmds, upgrade::UpgradeCmds, - version::VersionCmd, + tx::TxCmd, update::UpdateCmds, upgrade::UpgradeCmds, version::VersionCmd, }; use crate::commands::misbehaviour::MisbehaviourCmd; @@ -29,7 +28,6 @@ mod listen; mod misbehaviour; mod query; mod start; -mod start_multi; mod tx; mod update; mod upgrade; @@ -70,13 +68,9 @@ pub enum CliCmd { Upgrade(UpgradeCmds), /// The `start` subcommand - #[options(help = "Start the relayer")] - Start(StartCmd), - - /// The `start-multi` subcommand #[options(help = "Start the relayer in multi-chain mode. \ Handles packet relaying across all open channels between all chains in the config.")] - StartMulti(StartMultiCmd), + Start(StartCmd), /// The `query` subcommand #[options(help = "Query objects from the chain")] diff --git a/relayer-cli/src/commands/misbehaviour.rs b/relayer-cli/src/commands/misbehaviour.rs index e29217e00f..c1573514bc 100644 --- a/relayer-cli/src/commands/misbehaviour.rs +++ b/relayer-cli/src/commands/misbehaviour.rs @@ -117,7 +117,7 @@ fn misbehaviour_handling( ) })?; - let client = ForeignClient::restore(&client_id, chain.clone(), counterparty_chain.clone()); + let client = ForeignClient::restore(client_id, chain, counterparty_chain); let result = client.detect_misbehaviour_and_submit_evidence(update); if let MisbehaviourResults::EvidenceSubmitted(events) = result { info!("evidence submission result {:?}", events); diff --git a/relayer-cli/src/commands/start.rs b/relayer-cli/src/commands/start.rs index e07de29e25..8b2a0bdca8 100644 --- a/relayer-cli/src/commands/start.rs +++ b/relayer-cli/src/commands/start.rs @@ -1,78 +1,20 @@ use abscissa_core::{Command, Options, Runnable}; -use ibc::ics24_host::identifier::{ChainId, ChannelId, PortId}; -use ibc_relayer::link::LinkParameters; -use ibc_relayer::relay::{channel_relay, relay_on_new_link}; +use ibc_relayer::supervisor::Supervisor; -use crate::cli_utils::ChainHandlePair; use crate::conclude::Output; use crate::prelude::*; #[derive(Clone, Command, Debug, Options)] -pub struct StartCmd { - #[options(free, required, help = "identifier of the source chain")] - src_chain_id: ChainId, - - #[options(free, required, help = "identifier of the destination chain")] - dst_chain_id: ChainId, - - #[options(help = "identifier of the source port", short = "p")] - src_port_id: Option, - - #[options(help = "identifier of the source channel", short = "c")] - src_channel_id: Option, -} +pub struct StartCmd {} impl Runnable for StartCmd { fn run(&self) { let config = app_config(); - - let chains = match ChainHandlePair::spawn(&config, &self.src_chain_id, &self.dst_chain_id) { - Ok(chains) => chains, - Err(e) => return Output::error(format!("{}", e)).exit(), - }; - - match (&self.src_port_id, &self.src_channel_id) { - (Some(src_port_id), Some(src_channel_id)) => { - match channel_relay( - chains.src, - chains.dst, - LinkParameters { - src_port_id: src_port_id.clone(), - src_channel_id: src_channel_id.clone(), - }, - ) { - Ok(()) => Output::success(()).exit(), - Err(e) => Output::error(e.to_string()).exit(), - } - } - (None, None) => { - // Relay for a single channel, first on the first connection between the two chains - let relay_path = config.first_matching_path(&self.src_chain_id, &self.dst_chain_id); - - match relay_path { - Some((connection, path)) => { - info!("Start relayer on {:?}", self); - - match relay_on_new_link( - chains.src, - chains.dst, - connection.delay, - path.ordering, - path.clone(), - ) { - Ok(()) => Output::success(()).exit(), - Err(e) => Output::error(e.to_string()).exit(), - } - } - None => Output::error(format!("No paths configured for {:?}", self)).exit(), - } - } - _ => Output::error(format!( - "Invalid parameters, either both port and channel must be specified or none: {:?}", - self - )) - .exit(), + let supervisor = Supervisor::spawn(config.clone()).expect("failed to spawn supervisor"); + match supervisor.run() { + Ok(()) => Output::success_msg("done").exit(), + Err(e) => Output::error(e).exit(), } } } diff --git a/relayer-cli/src/commands/start_multi.rs b/relayer-cli/src/commands/start_multi.rs deleted file mode 100644 index b7259eec9b..0000000000 --- a/relayer-cli/src/commands/start_multi.rs +++ /dev/null @@ -1,20 +0,0 @@ -use abscissa_core::{Command, Options, Runnable}; - -use ibc_relayer::supervisor::Supervisor; - -use crate::conclude::Output; -use crate::prelude::*; - -#[derive(Clone, Command, Debug, Options)] -pub struct StartMultiCmd {} - -impl Runnable for StartMultiCmd { - fn run(&self) { - let config = app_config(); - let supervisor = Supervisor::spawn(config.clone()).expect("failed to spawn supervisor"); - match supervisor.run() { - Ok(()) => Output::success_msg("done").exit(), - Err(e) => Output::error(e).exit(), - } - } -} diff --git a/relayer-cli/src/commands/tx/client.rs b/relayer-cli/src/commands/tx/client.rs index 8d7b5b996c..a3f82fdeb5 100644 --- a/relayer-cli/src/commands/tx/client.rs +++ b/relayer-cli/src/commands/tx/client.rs @@ -34,7 +34,7 @@ impl Runnable for TxCreateClientCmd { Err(e) => return Output::error(format!("{}", e)).exit(), }; - let client = ForeignClient::restore(&ClientId::default(), chains.dst, chains.src); + let client = ForeignClient::restore(ClientId::default(), chains.dst, chains.src); // Trigger client creation via the "build" interface, so that we obtain the resulting event let res: Result = client diff --git a/relayer/src/channel.rs b/relayer/src/channel.rs index 93b1b3d259..3646e41b94 100644 --- a/relayer/src/channel.rs +++ b/relayer/src/channel.rs @@ -1,3 +1,7 @@ +#![allow(clippy::borrowed_box)] + +use std::time::Duration; + use prost_types::Any; use serde::Serialize; use thiserror::Error; @@ -19,8 +23,8 @@ use crate::chain::handle::ChainHandle; use crate::connection::Connection; use crate::error::Error; use crate::foreign_client::{ForeignClient, ForeignClientError}; -use crate::relay::MAX_ITER; -use std::time::Duration; + +const MAX_RETRIES: usize = 5; #[derive(Debug, Error)] pub enum ChannelError { @@ -137,12 +141,12 @@ impl Channel { Ok(channel) } - pub fn src_chain(&self) -> Box { - self.a_side.chain.clone() + pub fn src_chain(&self) -> &Box { + &self.a_side.chain } - pub fn dst_chain(&self) -> Box { - self.b_side.chain.clone() + pub fn dst_chain(&self) -> &Box { + &self.b_side.chain } pub fn src_client_id(&self) -> &ClientId { @@ -191,13 +195,13 @@ impl Channel { fn handshake(&mut self) -> Result<(), ChannelError> { let done = '🥳'; - let a_chain = self.src_chain(); - let b_chain = self.dst_chain(); + let a_chain = self.src_chain().clone(); + let b_chain = self.dst_chain().clone(); // Try chanOpenInit on a_chain let mut counter = 0; let mut init_success = false; - while counter < MAX_ITER { + while counter < MAX_RETRIES { counter += 1; match self.flipped().build_chan_open_init_and_send() { Err(e) => { @@ -217,14 +221,14 @@ impl Channel { if !init_success { return Err(ChannelError::Failed(format!( "Failed to finish channel open init in {} iterations for {:?}", - MAX_ITER, self + MAX_RETRIES, self ))); }; // Try chanOpenTry on b_chain counter = 0; let mut try_success = false; - while counter < MAX_ITER { + while counter < MAX_RETRIES { counter += 1; match self.build_chan_open_try_and_send() { Err(e) => { @@ -243,12 +247,12 @@ impl Channel { if !try_success { return Err(ChannelError::Failed(format!( "Failed to finish channel open try in {} iterations for {:?}", - MAX_ITER, self + MAX_RETRIES, self ))); }; counter = 0; - while counter < MAX_ITER { + while counter < MAX_RETRIES { counter += 1; // Continue loop if query error @@ -298,13 +302,16 @@ impl Channel { Err(ChannelError::Failed(format!( "Failed to finish channel handshake in {} iterations for {:?}", - MAX_ITER, self + MAX_RETRIES, self ))) } pub fn build_update_client_on_dst(&self, height: Height) -> Result, ChannelError> { - let client = - ForeignClient::restore(self.dst_client_id(), self.dst_chain(), self.src_chain()); + let client = ForeignClient::restore( + self.dst_client_id().clone(), + self.dst_chain().clone(), + self.src_chain().clone(), + ); client.build_update_client(height).map_err(|e| { ChannelError::ClientOperation(self.dst_client_id().clone(), self.dst_chain().id(), e) diff --git a/relayer/src/connection.rs b/relayer/src/connection.rs index 716916ab86..b9fb549418 100644 --- a/relayer/src/connection.rs +++ b/relayer/src/connection.rs @@ -22,11 +22,12 @@ use ibc::Height as ICSHeight; use crate::chain::handle::ChainHandle; use crate::error::Error; use crate::foreign_client::{ForeignClient, ForeignClientError}; -use crate::relay::MAX_ITER; /// Maximum value allowed for packet delay on any new connection that the relayer establishes. pub const MAX_PACKET_DELAY: Duration = Duration::from_secs(120); +const MAX_RETRIES: usize = 5; + #[derive(Debug, Error)] pub enum ConnectionError { #[error("failed with underlying cause: {0}")] @@ -252,7 +253,7 @@ impl Connection { // Try connOpenInit on a_chain let mut counter = 0; - while counter < MAX_ITER { + while counter < MAX_RETRIES { counter += 1; match self.flipped().build_conn_init_and_send() { Err(e) => { @@ -269,7 +270,7 @@ impl Connection { // Try connOpenTry on b_chain counter = 0; - while counter < MAX_ITER { + while counter < MAX_RETRIES { counter += 1; match self.build_conn_try_and_send() { Err(e) => { @@ -285,7 +286,7 @@ impl Connection { } counter = 0; - while counter < MAX_ITER { + while counter < MAX_RETRIES { counter += 1; // Continue loop if query error @@ -342,7 +343,7 @@ impl Connection { Err(ConnectionError::Failed(format!( "Failed to finish connection handshake in {:?} iterations", - MAX_ITER + MAX_RETRIES ))) } @@ -411,18 +412,14 @@ impl Connection { } pub fn build_update_client_on_src(&self, height: Height) -> Result, ConnectionError> { - let client = - ForeignClient::restore(self.src_client_id(), self.src_chain(), self.dst_chain()); - + let client = self.restore_src_client(); client.build_update_client(height).map_err(|e| { ConnectionError::ClientOperation(self.src_client_id().clone(), self.src_chain().id(), e) }) } pub fn build_update_client_on_dst(&self, height: Height) -> Result, ConnectionError> { - let client = - ForeignClient::restore(self.dst_client_id(), self.dst_chain(), self.src_chain()); - + let client = self.restore_dst_client(); client.build_update_client(height).map_err(|e| { ConnectionError::ClientOperation(self.dst_client_id().clone(), self.dst_chain().id(), e) }) @@ -800,6 +797,22 @@ impl Connection { _ => panic!("internal error"), } } + + fn restore_src_client(&self) -> ForeignClient { + ForeignClient::restore( + self.src_client_id().clone(), + self.src_chain().clone(), + self.dst_chain().clone(), + ) + } + + fn restore_dst_client(&self) -> ForeignClient { + ForeignClient::restore( + self.dst_client_id().clone(), + self.dst_chain().clone(), + self.src_chain().clone(), + ) + } } fn extract_connection_id(event: &IbcEvent) -> Result<&ConnectionId, ConnectionError> { diff --git a/relayer/src/foreign_client.rs b/relayer/src/foreign_client.rs index edd4fdf3e3..6954b90f46 100644 --- a/relayer/src/foreign_client.rs +++ b/relayer/src/foreign_client.rs @@ -26,10 +26,11 @@ use ibc::Height; use ibc_proto::ibc::core::client::v1::QueryConsensusStatesRequest; use crate::chain::handle::ChainHandle; -use crate::relay::MAX_ITER; const MAX_MISBEHAVIOUR_CHECK_DURATION: Duration = Duration::from_secs(120); +const MAX_RETRIES: usize = 5; + #[derive(Debug, Error)] pub enum ForeignClientError { #[error("error raised while creating client: {0}")] @@ -117,14 +118,14 @@ impl ForeignClient { } pub fn restore( - client_id: &ClientId, + id: ClientId, dst_chain: Box, src_chain: Box, ) -> ForeignClient { ForeignClient { - id: client_id.clone(), - dst_chain: dst_chain.clone(), - src_chain: src_chain.clone(), + id, + dst_chain, + src_chain, } } @@ -151,7 +152,7 @@ impl ForeignClient { } else { // TODO: Any additional checks? Ok(ForeignClient::restore( - client_id, + client_id.clone(), host_chain.clone(), expected_target_chain, )) @@ -564,7 +565,7 @@ impl ForeignClient { }; let mut events = vec![]; - for i in 0..MAX_ITER { + for i in 0..MAX_RETRIES { thread::sleep(Duration::from_millis(100)); let result = self .dst_chain @@ -583,7 +584,7 @@ impl ForeignClient { self, e, i + 1, - MAX_ITER + MAX_RETRIES ); continue; } @@ -948,10 +949,10 @@ mod test { let (a_chain, _) = ChainRuntime::::spawn(a_cfg, rt.clone()).unwrap(); let (b_chain, _) = ChainRuntime::::spawn(b_cfg, rt).unwrap(); let a_client = - ForeignClient::restore(&Default::default(), a_chain.clone(), b_chain.clone()); + ForeignClient::restore(ClientId::default(), a_chain.clone(), b_chain.clone()); let b_client = - ForeignClient::restore(&Default::default(), b_chain.clone(), a_chain.clone()); + ForeignClient::restore(ClientId::default(), b_chain.clone(), a_chain.clone()); // Create the client on chain a let res = a_client.build_create_client_and_send(); @@ -985,10 +986,10 @@ mod test { let rt = Arc::new(TokioRuntime::new().unwrap()); let (a_chain, _) = ChainRuntime::::spawn(a_cfg, rt.clone()).unwrap(); let (b_chain, _) = ChainRuntime::::spawn(b_cfg, rt).unwrap(); - let mut a_client = ForeignClient::restore(&a_client_id, a_chain.clone(), b_chain.clone()); + let mut a_client = ForeignClient::restore(a_client_id, a_chain.clone(), b_chain.clone()); let mut b_client = - ForeignClient::restore(&Default::default(), b_chain.clone(), a_chain.clone()); + ForeignClient::restore(ClientId::default(), b_chain.clone(), a_chain.clone()); // This action should fail because no client exists (yet) let res = a_client.build_latest_update_client_and_send(); diff --git a/relayer/src/lib.rs b/relayer/src/lib.rs index 687695c5f5..34466c1f24 100644 --- a/relayer/src/lib.rs +++ b/relayer/src/lib.rs @@ -26,7 +26,6 @@ pub mod link; pub mod macros; pub mod object; pub mod registry; -pub mod relay; pub mod supervisor; pub mod transfer; pub mod upgrade_chain; diff --git a/relayer/src/link.rs b/relayer/src/link.rs index 456b829be4..020387b0d9 100644 --- a/relayer/src/link.rs +++ b/relayer/src/link.rs @@ -1,7 +1,9 @@ +#![allow(clippy::borrowed_box)] + use std::collections::HashMap; use std::fmt; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Instant; use prost_types::Any; use thiserror::Error; @@ -29,21 +31,21 @@ use ibc::{ tx_msg::Msg, Height, }; + use ibc_proto::ibc::core::channel::v1::{ QueryNextSequenceReceiveRequest, QueryPacketAcknowledgementsRequest, QueryPacketCommitmentsRequest, QueryUnreceivedAcksRequest, QueryUnreceivedPacketsRequest, }; +use crate::chain::handle::ChainHandle; +use crate::channel::{Channel, ChannelError, ChannelSide}; use crate::connection::ConnectionError; use crate::error::Error; use crate::event::monitor::EventBatch; use crate::foreign_client::{ForeignClient, ForeignClientError}; -use crate::relay::MAX_ITER; -use crate::{chain::handle::ChainHandle, transfer::PacketError}; -use crate::{ - channel::{Channel, ChannelError, ChannelSide}, - event::monitor::UnwrapOrClone, -}; +use crate::transfer::PacketError; + +const MAX_RETRIES: usize = 5; #[derive(Debug, Error)] pub enum LinkError { @@ -190,8 +192,6 @@ impl fmt::Display for OperationalData { } pub struct RelayPath { - src_chain: Box, - dst_chain: Box, channel: Channel, clear_packets: bool, @@ -205,27 +205,21 @@ pub struct RelayPath { } impl RelayPath { - pub fn new( - src_chain: Box, - dst_chain: Box, - channel: Channel, - ) -> Self { + pub fn new(channel: Channel) -> Self { Self { - src_chain, - dst_chain, channel, clear_packets: true, - src_operational_data: Default::default(), - dst_operational_data: Default::default(), + src_operational_data: vec![], + dst_operational_data: vec![], } } - pub fn src_chain(&self) -> Box { - self.src_chain.clone() + pub fn src_chain(&self) -> &Box { + &self.channel.src_chain() } - pub fn dst_chain(&self) -> Box { - self.dst_chain.clone() + pub fn dst_chain(&self) -> &Box { + &self.channel.dst_chain() } pub fn src_client_id(&self) -> &ClientId { @@ -279,29 +273,29 @@ impl RelayPath { } fn src_signer(&self) -> Result { - self.src_chain.get_signer().map_err(|e| { + self.src_chain().get_signer().map_err(|e| { LinkError::Failed(format!( "could not retrieve signer from src chain {} with error: {}", - self.src_chain.id(), + self.src_chain().id(), e )) }) } fn dst_signer(&self) -> Result { - self.dst_chain.get_signer().map_err(|e| { + self.dst_chain().get_signer().map_err(|e| { LinkError::Failed(format!( "could not retrieve signer from dst chain {} with error: {}", - self.dst_chain.id(), + self.dst_chain().id(), e )) }) } pub fn dst_latest_height(&self) -> Result { - self.dst_chain + self.dst_chain() .query_latest_height() - .map_err(|e| LinkError::QueryError(self.dst_chain.id(), e)) + .map_err(|e| LinkError::QueryError(self.dst_chain().id(), e)) } fn unordered_channel(&self) -> bool { @@ -313,16 +307,14 @@ impl RelayPath { } pub fn build_update_client_on_dst(&self, height: Height) -> Result, LinkError> { - let client = - ForeignClient::restore(self.dst_client_id(), self.dst_chain(), self.src_chain()); + let client = self.restore_dst_client(); client .build_update_client(height) .map_err(LinkError::ClientError) } pub fn build_update_client_on_src(&self, height: Height) -> Result, LinkError> { - let client = - ForeignClient::restore(self.src_client_id(), self.src_chain(), self.dst_chain()); + let client = self.restore_src_client(); client .build_update_client(height) .map_err(LinkError::ClientError) @@ -388,7 +380,7 @@ impl RelayPath { fn relay_pending_packets(&mut self, height: Height) -> Result<(), LinkError> { info!("[{}] clearing old packets", self); - for _i in 0..MAX_ITER { + for _ in 0..MAX_RETRIES { if self .build_recv_packet_and_timeout_msgs(Some(height)) .is_ok() @@ -479,7 +471,7 @@ impl RelayPath { let mut dst_od = OperationalData::new(src_height, OperationalDataTarget::Destination); for event in input { - debug!("[{}] {} => {}", self, self.src_chain.id(), event); + debug!("[{}] {} => {}", self, self.src_chain().id(), event); let (dst_msg, src_msg) = match event { IbcEvent::CloseInitChannel(_) => ( Some(self.build_chan_close_confirm_from_event(&event)?), @@ -527,7 +519,7 @@ impl RelayPath { debug!( "[{}] {} <= {} from {}", self, - self.dst_chain.id(), + self.dst_chain().id(), msg.type_url, event ); @@ -545,7 +537,7 @@ impl RelayPath { debug!( "[{}] {} <= {} from {}", self, - self.src_chain.id(), + self.src_chain().id(), msg.type_url, event ); @@ -577,7 +569,7 @@ impl RelayPath { // We will operate on potentially different operational data if the initial one fails. let mut odata = initial_od; - for i in 0..MAX_ITER { + for i in 0..MAX_RETRIES { info!( "[{}] relay op. data to {}, proofs height {}, (delayed by: {:?}) [try {}/{}]", self, @@ -585,7 +577,7 @@ impl RelayPath { odata.proofs_height, odata.scheduled_time.elapsed(), i + 1, - MAX_ITER + MAX_RETRIES ); // Consume the operational data by attempting to send its messages @@ -702,8 +694,8 @@ impl RelayPath { } let target = match odata.target { - OperationalDataTarget::Source => &self.src_chain, - OperationalDataTarget::Destination => &self.dst_chain, + OperationalDataTarget::Source => self.src_chain(), + OperationalDataTarget::Destination => self.dst_chain(), }; let msgs = odata.assemble_msgs(self)?; @@ -741,17 +733,17 @@ impl RelayPath { } let mut dst_err_ev = None; - for i in 0..MAX_ITER { + for i in 0..MAX_RETRIES { let dst_update = self.build_update_client_on_dst(src_chain_height)?; info!( "[{}] sending updateClient to client hosted on dest. chain {} for height {} [try {}/{}]", self, self.dst_chain().id(), src_chain_height, - i + 1, MAX_ITER, + i + 1, MAX_RETRIES, ); - let dst_tx_events = self.dst_chain.send_msgs(dst_update)?; + let dst_tx_events = self.dst_chain().send_msgs(dst_update)?; info!( "[{}] result {}\n", self, @@ -770,7 +762,7 @@ impl RelayPath { Err(LinkError::ClientError(ForeignClientError::ClientUpdate( format!( "Failed to update client on destination {} with err: {}", - self.dst_chain.id(), + self.dst_chain().id(), dst_err_ev.unwrap() ), ))) @@ -787,16 +779,16 @@ impl RelayPath { } let mut src_err_ev = None; - for _i in 0..MAX_ITER { + for _ in 0..MAX_RETRIES { let src_update = self.build_update_client_on_src(dst_chain_height)?; info!( "[{}] sending updateClient to client hosted on src. chain {} for height {}", self, - self.src_chain.id(), + self.src_chain().id(), dst_chain_height, ); - let src_tx_events = self.src_chain.send_msgs(src_update)?; + let src_tx_events = self.src_chain().send_msgs(src_update)?; info!( "[{}] result {}\n", self, @@ -815,7 +807,7 @@ impl RelayPath { Err(LinkError::ClientError(ForeignClientError::ClientUpdate( format!( "Failed to update client on source {} with err: {}", - self.src_chain.id(), + self.src_chain().id(), src_err_ev.unwrap() ), ))) @@ -836,7 +828,7 @@ impl RelayPath { pagination: ibc_proto::cosmos::base::query::pagination::all(), }; let (packet_commitments, src_response_height) = - self.src_chain.query_packet_commitments(pc_request)?; + self.src_chain().query_packet_commitments(pc_request)?; let query_height = opt_query_height.unwrap_or(src_response_height); @@ -847,7 +839,7 @@ impl RelayPath { debug!( "[{}] packets that still have commitments on {}: {:?}", self, - self.src_chain.id(), + self.src_chain().id(), commit_sequences ); @@ -859,7 +851,7 @@ impl RelayPath { }; let sequences: Vec = self - .dst_chain + .dst_chain() .query_unreceived_packets(request)? .into_iter() .map(From::from) @@ -868,8 +860,8 @@ impl RelayPath { debug!( "[{}] recv packets to send out to {} of the ones with commitments on source {}: {:?}", self, - self.dst_chain.id(), - self.src_chain.id(), + self.dst_chain().id(), + self.src_chain().id(), sequences ); @@ -887,7 +879,7 @@ impl RelayPath { height: query_height, }); - events_result = self.src_chain.query_txs(query)?; + events_result = self.src_chain().query_txs(query)?; let mut packet_sequences = vec![]; for event in events_result.iter() { @@ -915,9 +907,9 @@ impl RelayPath { pagination: ibc_proto::cosmos::base::query::pagination::all(), }; let (acks_on_source, src_response_height) = self - .src_chain + .src_chain() .query_packet_acknowledgements(pc_request) - .map_err(|e| LinkError::QueryError(self.src_chain.id(), e))?; + .map_err(|e| LinkError::QueryError(self.src_chain().id(), e))?; let query_height = opt_query_height.unwrap_or(src_response_height); @@ -929,7 +921,7 @@ impl RelayPath { debug!( "[{}] packets that have acknowledgments on {} {:?}", self, - self.src_chain.id(), + self.src_chain().id(), acked_sequences ); @@ -940,17 +932,17 @@ impl RelayPath { }; let sequences: Vec = self - .dst_chain + .dst_chain() .query_unreceived_acknowledgement(request) - .map_err(|e| LinkError::QueryError(self.dst_chain.id(), e))? + .map_err(|e| LinkError::QueryError(self.dst_chain().id(), e))? .into_iter() .map(From::from) .collect(); debug!( "[{}] ack packets to send out to {} of the ones with acknowledgments on {}: {:?}", self, - self.dst_chain.id(), - self.src_chain.id(), + self.dst_chain().id(), + self.src_chain().id(), sequences ); @@ -959,7 +951,7 @@ impl RelayPath { } events_result = self - .src_chain + .src_chain() .query_txs(QueryTxRequest::Packet(QueryPacketEventDataRequest { event_id: IbcEventType::WriteAck, source_port_id: self.dst_port_id().clone(), @@ -969,7 +961,7 @@ impl RelayPath { sequences, height: query_height, })) - .map_err(|e| LinkError::QueryError(self.src_chain.id(), e))?; + .map_err(|e| LinkError::QueryError(self.src_chain().id(), e))?; let mut packet_sequences = vec![]; for event in events_result.iter() { @@ -1035,7 +1027,7 @@ impl RelayPath { fn build_recv_packet(&self, packet: &Packet, height: Height) -> Result { let (_, proofs) = self - .src_chain + .src_chain() .build_packet_proofs( PacketMsgType::Recv, &packet.source_port, @@ -1043,7 +1035,7 @@ impl RelayPath { packet.sequence, height, ) - .map_err(|e| LinkError::PacketProofsConstructor(self.src_chain.id(), e))?; + .map_err(|e| LinkError::PacketProofsConstructor(self.src_chain().id(), e))?; let msg = MsgRecvPacket::new(packet.clone(), proofs.clone(), self.dst_signer()?); @@ -1074,7 +1066,7 @@ impl RelayPath { } let (_, proofs) = self - .src_chain + .src_chain() .build_packet_proofs( PacketMsgType::Ack, &packet.destination_port, @@ -1082,7 +1074,7 @@ impl RelayPath { packet.sequence, event.height, ) - .map_err(|e| LinkError::PacketProofsConstructor(self.src_chain.id(), e))?; + .map_err(|e| LinkError::PacketProofsConstructor(self.src_chain().id(), e))?; let msg = MsgAcknowledgement::new( packet, @@ -1130,7 +1122,7 @@ impl RelayPath { }; let (_, proofs) = self - .dst_chain + .dst_chain() .build_packet_proofs( packet_type, &packet.destination_port, @@ -1138,7 +1130,7 @@ impl RelayPath { next_sequence_received, height, ) - .map_err(|e| LinkError::PacketProofsConstructor(self.dst_chain.id(), e))?; + .map_err(|e| LinkError::PacketProofsConstructor(self.dst_chain().id(), e))?; let msg = MsgTimeout::new( packet.clone(), @@ -1163,7 +1155,7 @@ impl RelayPath { height: Height, ) -> Result { let (_, proofs) = self - .dst_chain + .dst_chain() .build_packet_proofs( PacketMsgType::TimeoutOnClose, &packet.destination_port, @@ -1171,7 +1163,7 @@ impl RelayPath { packet.sequence, height, ) - .map_err(|e| LinkError::PacketProofsConstructor(self.dst_chain.id(), e))?; + .map_err(|e| LinkError::PacketProofsConstructor(self.dst_chain().id(), e))?; let msg = MsgTimeoutOnClose::new( packet.clone(), @@ -1438,11 +1430,27 @@ impl RelayPath { pub fn set_clear_packets(&mut self, clear_packets: bool) { self.clear_packets = clear_packets; } + + fn restore_src_client(&self) -> ForeignClient { + ForeignClient::restore( + self.src_client_id().clone(), + self.src_chain().clone(), + self.dst_chain().clone(), + ) + } + + fn restore_dst_client(&self) -> ForeignClient { + ForeignClient::restore( + self.dst_client_id().clone(), + self.dst_chain().clone(), + self.src_chain().clone(), + ) + } } impl fmt::Display for RelayPath { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{} -> {}", self.src_chain.id(), self.dst_chain.id()) + write!(f, "{} -> {}", self.src_chain().id(), self.dst_chain().id()) } } @@ -1459,63 +1467,10 @@ pub struct Link { impl Link { pub fn new(channel: Channel) -> Self { - let a_chain = channel.src_chain(); - let b_chain = channel.dst_chain(); let flipped = channel.flipped(); - Self { - a_to_b: RelayPath::new(a_chain.clone(), b_chain.clone(), channel), - b_to_a: RelayPath::new(b_chain, a_chain, flipped), - } - } - - pub fn relay(&mut self) -> Result<(), LinkError> { - info!( - "relaying packets on path {} <-> {} with delay of {:?}", - self.a_to_b.src_chain().id(), - self.a_to_b.dst_chain().id(), - self.a_to_b.channel.connection_delay - ); - - let events_a = self.a_to_b.src_chain().subscribe()?; - let events_b = self.b_to_a.src_chain().subscribe()?; - - loop { - if self.is_closed()? { - warn!("channel is closed, exiting"); - return Ok(()); - } - - // Input new events to the relay path, and schedule any batch associated with them - if let Ok(batch) = events_a.try_recv() { - let batch = batch.unwrap_or_clone(); - match batch { - Ok(batch) => self.a_to_b.update_schedule(batch)?, - Err(e) => { - dbg!(e); - } - } - } - - // Refresh the scheduled batches and execute any outstanding ones. - self.a_to_b.refresh_schedule()?; - self.a_to_b.execute_schedule()?; - - if let Ok(batch) = events_b.try_recv() { - let batch = batch.unwrap_or_clone(); - match batch { - Ok(batch) => self.b_to_a.update_schedule(batch)?, - Err(e) => { - dbg!(e); - } - } - } - - self.b_to_a.refresh_schedule()?; - self.b_to_a.execute_schedule()?; - - // TODO - select over the two subscriptions - thread::sleep(Duration::from_millis(100)) + a_to_b: RelayPath::new(channel), + b_to_a: RelayPath::new(flipped), } } diff --git a/relayer/src/relay.rs b/relayer/src/relay.rs deleted file mode 100644 index ac2ca17c75..0000000000 --- a/relayer/src/relay.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::time::Duration; - -use anomaly::BoxError; -use tracing::info; - -use ibc::ics04_channel::channel::Order; - -use crate::chain::handle::ChainHandle; -use crate::channel::Channel; -use crate::config::RelayPath; -use crate::connection::Connection; -use crate::foreign_client::ForeignClient; -use crate::link::{Link, LinkParameters}; - -pub(crate) const MAX_ITER: usize = 10; - -/// Used by the `hermes start ibc-0 ibc-1` -pub fn relay_on_new_link( - a_chain_handle: Box, - b_chain_handle: Box, - delay: Duration, - ordering: Order, - path: RelayPath, -) -> Result<(), BoxError> { - // Setup the clients, connection and channel - let channel = connect_with_new_channel(a_chain_handle, b_chain_handle, delay, ordering, path)?; - - let mut link = Link::new(channel); - link.relay()?; - - Ok(()) -} - -/// Relays packets over a specified channel -/// Used by the `hermes start ibc-0 ibc-1 --src-port-id transfer --src-channel-id channel-0` -pub fn channel_relay( - a_chain: Box, - b_chain: Box, - opts: LinkParameters, -) -> Result<(), BoxError> { - let mut link = Link::new_from_opts(a_chain, b_chain, opts)?; - Ok(link.relay()?) -} - -/// Connects two ports of two chains creating new clients, connection and channel -/// Used by the `hermes channel handshake ibc-0 ibc-1` -pub fn connect_with_new_channel( - a_chain_handle: Box, - b_chain_handle: Box, - delay: Duration, - ordering: Order, - path: RelayPath, -) -> Result { - info!("\nChannel Relay Loop\n"); - - // Instantiate the foreign client on the two chains - let client_on_a = ForeignClient::new(a_chain_handle.clone(), b_chain_handle.clone())?; - let client_on_b = ForeignClient::new(b_chain_handle.clone(), a_chain_handle.clone())?; - - // Setup the connection between the two chains - let connection = Connection::new(client_on_a, client_on_b, delay)?; - - // Setup the channel over the connection - Ok(Channel::new( - connection, - ordering, - path.a_port, - path.b_port, - None, - )?) -} diff --git a/relayer/src/worker/client.rs b/relayer/src/worker/client.rs index 9e5c848ac6..41f52046b9 100644 --- a/relayer/src/worker/client.rs +++ b/relayer/src/worker/client.rs @@ -32,7 +32,7 @@ impl ClientWorker { /// Run the event loop for events associated with a [`Client`]. pub fn run(self) -> Result<(), BoxError> { let mut client = ForeignClient::restore( - &self.client.dst_client_id, + self.client.dst_client_id.clone(), self.chains.a.clone(), self.chains.b.clone(), );