diff --git a/bridges/relays/ethereum-client/src/client.rs b/bridges/relays/ethereum-client/src/client.rs index 0042b13c6ef18..b5fe3ab6bf179 100644 --- a/bridges/relays/ethereum-client/src/client.rs +++ b/bridges/relays/ethereum-client/src/client.rs @@ -28,22 +28,32 @@ use jsonrpsee::Client as RpcClient; /// The client used to interact with an Ethereum node through RPC. #[derive(Clone)] pub struct Client { + params: ConnectionParams, client: RpcClient, } impl Client { /// Create a new Ethereum RPC Client. pub fn new(params: ConnectionParams) -> Self { + Self { + client: Self::build_client(¶ms), + params, + } + } + + /// Build client to use in connection. + fn build_client(params: &ConnectionParams) -> RpcClient { let uri = format!("http://{}:{}", params.host, params.port); let transport = HttpTransportClient::new(&uri); let raw_client = RawClient::new(transport); - let client: RpcClient = raw_client.into(); + raw_client.into() + } - Self { client } + /// Reopen client connection. + pub fn reconnect(&mut self) { + self.client = Self::build_client(&self.params); } -} -impl Client { /// Estimate gas usage for the given call. pub async fn estimate_gas(&self, call_request: CallRequest) -> Result { Ok(Ethereum::estimate_gas(&self.client, call_request).await?) diff --git a/bridges/relays/ethereum/src/ethereum_exchange.rs b/bridges/relays/ethereum/src/ethereum_exchange.rs index 870de74f7835a..19d9f44cef0e9 100644 --- a/bridges/relays/ethereum/src/ethereum_exchange.rs +++ b/bridges/relays/ethereum/src/ethereum_exchange.rs @@ -39,7 +39,7 @@ use relay_rialto_client::{Rialto, SigningParams as RialtoSigningParams}; use relay_substrate_client::{ Chain as SubstrateChain, Client as SubstrateClient, ConnectionParams as SubstrateConnectionParams, }; -use relay_utils::{metrics::MetricsParams, HeaderId}; +use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, HeaderId}; use rialto_runtime::exchange::EthereumTransactionInclusionProof; use std::{sync::Arc, time::Duration}; @@ -120,19 +120,28 @@ impl SourceTransaction for EthereumSourceTransaction { } /// Ethereum node as transactions proof source. +#[derive(Clone)] struct EthereumTransactionsSource { client: EthereumClient, } #[async_trait] -impl SourceClient for EthereumTransactionsSource { +impl RelayClient for EthereumTransactionsSource { type Error = RpcError; + async fn reconnect(&mut self) -> Result<(), RpcError> { + self.client.reconnect(); + Ok(()) + } +} + +#[async_trait] +impl SourceClient for EthereumTransactionsSource { async fn tick(&self) { async_std::task::sleep(ETHEREUM_TICK_INTERVAL).await; } - async fn block_by_hash(&self, hash: H256) -> Result { + async fn block_by_hash(&self, hash: H256) -> Result { self.client .header_by_hash_with_transactions(hash) .await @@ -140,7 +149,7 @@ impl SourceClient for EthereumTransactionsSource { .map_err(Into::into) } - async fn block_by_number(&self, number: u64) -> Result { + async fn block_by_number(&self, number: u64) -> Result { self.client .header_by_number_with_transactions(number) .await @@ -151,7 +160,7 @@ impl SourceClient for EthereumTransactionsSource { async fn transaction_block( &self, hash: &EthereumTransactionHash, - ) -> Result, Self::Error> { + ) -> Result, RpcError> { let eth_tx = match self.client.transaction_by_hash(*hash).await? { Some(eth_tx) => eth_tx, None => return Ok(None), @@ -173,7 +182,7 @@ impl SourceClient for EthereumTransactionsSource { &self, block: &EthereumSourceBlock, tx_index: usize, - ) -> Result { + ) -> Result { const TRANSACTION_HAS_RAW_FIELD_PROOF: &str = "RPC level checks that transactions from Ethereum\ node are having `raw` field; qed"; const BLOCK_HAS_HASH_FIELD_PROOF: &str = "RPC level checks that block has `hash` field; qed"; @@ -199,6 +208,7 @@ impl SourceClient for EthereumTransactionsSource { } /// Substrate node as transactions proof target. +#[derive(Clone)] struct SubstrateTransactionsTarget { client: SubstrateClient, sign_params: RialtoSigningParams, @@ -206,18 +216,25 @@ struct SubstrateTransactionsTarget { } #[async_trait] -impl TargetClient for SubstrateTransactionsTarget { +impl RelayClient for SubstrateTransactionsTarget { type Error = RpcError; + async fn reconnect(&mut self) -> Result<(), RpcError> { + Ok(self.client.reconnect().await?) + } +} + +#[async_trait] +impl TargetClient for SubstrateTransactionsTarget { async fn tick(&self) { async_std::task::sleep(Rialto::AVERAGE_BLOCK_INTERVAL).await; } - async fn is_header_known(&self, id: &EthereumHeaderId) -> Result { + async fn is_header_known(&self, id: &EthereumHeaderId) -> Result { self.client.ethereum_header_known(*id).await } - async fn is_header_finalized(&self, id: &EthereumHeaderId) -> Result { + async fn is_header_finalized(&self, id: &EthereumHeaderId) -> Result { // we check if header is finalized by simple comparison of the header number and // number of best finalized PoA header known to Substrate node. // @@ -230,11 +247,11 @@ impl TargetClient for SubstrateTransactionsTarget { Ok(id.0 <= best_finalized_ethereum_block.0) } - async fn best_finalized_header_id(&self) -> Result { + async fn best_finalized_header_id(&self) -> Result { self.client.best_ethereum_finalized_block().await } - async fn filter_transaction_proof(&self, proof: &EthereumTransactionInclusionProof) -> Result { + async fn filter_transaction_proof(&self, proof: &EthereumTransactionInclusionProof) -> Result { // let's try to parse transaction locally let (raw_tx, raw_tx_receipt) = &proof.proof[proof.index as usize]; let parse_result = rialto_runtime::exchange::EthTransaction::parse(raw_tx); @@ -253,7 +270,7 @@ impl TargetClient for SubstrateTransactionsTarget { self.client.verify_exchange_transaction_proof(proof.clone()).await } - async fn submit_transaction_proof(&self, proof: EthereumTransactionInclusionProof) -> Result<(), Self::Error> { + async fn submit_transaction_proof(&self, proof: EthereumTransactionInclusionProof) -> Result<(), RpcError> { let (sign_params, bridge_instance) = (self.sign_params.clone(), self.bridge_instance.clone()); self.client .submit_exchange_transaction_proof(sign_params, bridge_instance, proof) diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index ec0c95c70ca72..b84626d230c6d 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -37,7 +37,7 @@ use relay_rialto_client::{Rialto, SigningParams as RialtoSigningParams}; use relay_substrate_client::{ Chain as SubstrateChain, Client as SubstrateClient, ConnectionParams as SubstrateConnectionParams, }; -use relay_utils::metrics::MetricsParams; +use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient}; use std::fmt::Debug; use std::{collections::HashSet, sync::Arc, time::Duration}; @@ -105,6 +105,7 @@ impl HeadersSyncPipeline for EthereumHeadersSyncPipeline { pub type QueuedEthereumHeader = QueuedHeader; /// Ethereum client as headers source. +#[derive(Clone)] struct EthereumHeadersSource { /// Ethereum node client. client: EthereumClient, @@ -117,14 +118,22 @@ impl EthereumHeadersSource { } #[async_trait] -impl SourceClient for EthereumHeadersSource { +impl RelayClient for EthereumHeadersSource { type Error = RpcError; - async fn best_block_number(&self) -> Result { + async fn reconnect(&mut self) -> Result<(), RpcError> { + self.client.reconnect(); + Ok(()) + } +} + +#[async_trait] +impl SourceClient for EthereumHeadersSource { + async fn best_block_number(&self) -> Result { self.client.best_block_number().await.map_err(Into::into) } - async fn header_by_hash(&self, hash: HeaderHash) -> Result { + async fn header_by_hash(&self, hash: HeaderHash) -> Result { self.client .header_by_hash(hash) .await @@ -132,7 +141,7 @@ impl SourceClient for EthereumHeadersSource { .map_err(Into::into) } - async fn header_by_number(&self, number: u64) -> Result { + async fn header_by_number(&self, number: u64) -> Result { self.client .header_by_number(number) .await @@ -140,7 +149,7 @@ impl SourceClient for EthereumHeadersSource { .map_err(Into::into) } - async fn header_completion(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, Option<()>), Self::Error> { + async fn header_completion(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, Option<()>), RpcError> { Ok((id, None)) } @@ -148,13 +157,14 @@ impl SourceClient for EthereumHeadersSource { &self, id: EthereumHeaderId, header: QueuedEthereumHeader, - ) -> Result<(EthereumHeaderId, Vec), Self::Error> { + ) -> Result<(EthereumHeaderId, Vec), RpcError> { self.client .transaction_receipts(id, header.header().transactions.clone()) .await } } +#[derive(Clone)] struct SubstrateHeadersTarget { /// Substrate node client. client: SubstrateClient, @@ -183,21 +193,25 @@ impl SubstrateHeadersTarget { } #[async_trait] -impl TargetClient for SubstrateHeadersTarget { +impl RelayClient for SubstrateHeadersTarget { type Error = RpcError; - async fn best_header_id(&self) -> Result { + async fn reconnect(&mut self) -> Result<(), RpcError> { + Ok(self.client.reconnect().await?) + } +} + +#[async_trait] +impl TargetClient for SubstrateHeadersTarget { + async fn best_header_id(&self) -> Result { self.client.best_ethereum_block().await } - async fn is_known_header(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, bool), Self::Error> { + async fn is_known_header(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, bool), RpcError> { Ok((id, self.client.ethereum_header_known(id).await?)) } - async fn submit_headers( - &self, - headers: Vec, - ) -> SubmittedHeaders { + async fn submit_headers(&self, headers: Vec) -> SubmittedHeaders { let (sign_params, bridge_instance, sign_transactions) = ( self.sign_params.clone(), self.bridge_instance.clone(), @@ -208,16 +222,16 @@ impl TargetClient for SubstrateHeadersTarget { .await } - async fn incomplete_headers_ids(&self) -> Result, Self::Error> { + async fn incomplete_headers_ids(&self) -> Result, RpcError> { Ok(HashSet::new()) } #[allow(clippy::unit_arg)] - async fn complete_header(&self, id: EthereumHeaderId, _completion: ()) -> Result { + async fn complete_header(&self, id: EthereumHeaderId, _completion: ()) -> Result { Ok(id) } - async fn requires_extra(&self, header: QueuedEthereumHeader) -> Result<(EthereumHeaderId, bool), Self::Error> { + async fn requires_extra(&self, header: QueuedEthereumHeader) -> Result<(EthereumHeaderId, bool), RpcError> { // we can minimize number of receipts_check calls by checking header // logs bloom here, but it may give us false positives (when authorities // source is contract, we never need any logs) diff --git a/bridges/relays/ethereum/src/substrate_sync_loop.rs b/bridges/relays/ethereum/src/substrate_sync_loop.rs index 7ca9d797dfe55..298fe0e592545 100644 --- a/bridges/relays/ethereum/src/substrate_sync_loop.rs +++ b/bridges/relays/ethereum/src/substrate_sync_loop.rs @@ -35,7 +35,7 @@ use relay_substrate_client::{ headers_source::HeadersSource, Chain as SubstrateChain, Client as SubstrateClient, ConnectionParams as SubstrateConnectionParams, }; -use relay_utils::metrics::MetricsParams; +use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient}; use sp_runtime::Justification; use std::fmt::Debug; @@ -98,6 +98,7 @@ pub type QueuedRialtoHeader = QueuedHeader; type SubstrateHeadersSource = HeadersSource; /// Ethereum client as Substrate headers target. +#[derive(Clone)] struct EthereumHeadersTarget { /// Ethereum node client. client: EthereumClient, @@ -118,38 +119,42 @@ impl EthereumHeadersTarget { } #[async_trait] -impl TargetClient for EthereumHeadersTarget { +impl RelayClient for EthereumHeadersTarget { type Error = RpcError; - async fn best_header_id(&self) -> Result { + async fn reconnect(&mut self) -> Result<(), RpcError> { + self.client.reconnect(); + Ok(()) + } +} + +#[async_trait] +impl TargetClient for EthereumHeadersTarget { + async fn best_header_id(&self) -> Result { self.client.best_substrate_block(self.contract).await } - async fn is_known_header(&self, id: RialtoHeaderId) -> Result<(RialtoHeaderId, bool), Self::Error> { + async fn is_known_header(&self, id: RialtoHeaderId) -> Result<(RialtoHeaderId, bool), RpcError> { self.client.substrate_header_known(self.contract, id).await } - async fn submit_headers(&self, headers: Vec) -> SubmittedHeaders { + async fn submit_headers(&self, headers: Vec) -> SubmittedHeaders { self.client .submit_substrate_headers(self.sign_params.clone(), self.contract, headers) .await } - async fn incomplete_headers_ids(&self) -> Result, Self::Error> { + async fn incomplete_headers_ids(&self) -> Result, RpcError> { self.client.incomplete_substrate_headers(self.contract).await } - async fn complete_header( - &self, - id: RialtoHeaderId, - completion: Justification, - ) -> Result { + async fn complete_header(&self, id: RialtoHeaderId, completion: Justification) -> Result { self.client .complete_substrate_header(self.sign_params.clone(), self.contract, id, completion) .await } - async fn requires_extra(&self, header: QueuedRialtoHeader) -> Result<(RialtoHeaderId, bool), Self::Error> { + async fn requires_extra(&self, header: QueuedRialtoHeader) -> Result<(RialtoHeaderId, bool), RpcError> { Ok((header.header().id(), false)) } } diff --git a/bridges/relays/exchange-relay/src/exchange.rs b/bridges/relays/exchange-relay/src/exchange.rs index 0df2927d56c6e..cdf9c1a9f3588 100644 --- a/bridges/relays/exchange-relay/src/exchange.rs +++ b/bridges/relays/exchange-relay/src/exchange.rs @@ -17,7 +17,9 @@ //! Relaying proofs of exchange transaction. use async_trait::async_trait; -use relay_utils::{MaybeConnectionError, StringifiedMaybeConnectionError}; +use relay_utils::{ + relay_loop::Client as RelayClient, FailedClient, MaybeConnectionError, StringifiedMaybeConnectionError, +}; use std::{ fmt::{Debug, Display}, string::ToString, @@ -84,10 +86,7 @@ pub type HeaderId

= relay_utils::HeaderId, BlockNumberOf

>; /// Source client API. #[async_trait] -pub trait SourceClient { - /// Error type. - type Error: Debug + MaybeConnectionError; - +pub trait SourceClient: RelayClient { /// Sleep until exchange-related data is (probably) updated. async fn tick(&self); /// Get block by hash. @@ -104,10 +103,7 @@ pub trait SourceClient { /// Target client API. #[async_trait] -pub trait TargetClient { - /// Error type. - type Error: Debug + MaybeConnectionError; - +pub trait TargetClient: RelayClient { /// Sleep until exchange-related data is (probably) updated. async fn tick(&self); /// Returns `Ok(true)` if header is known to the target node. @@ -146,7 +142,7 @@ pub async fn relay_block_transactions( target_client: &impl TargetClient

, source_block: &P::Block, mut relayed_transactions: RelayedBlockTransactions, -) -> Result { +) -> Result { let transactions_to_process = source_block .transactions() .into_iter() @@ -156,16 +152,21 @@ pub async fn relay_block_transactions( let result = async { let source_tx_id = format!("{}/{}", source_block.id().1, source_tx_index); let source_tx_proof = - prepare_transaction_proof(source_client, &source_tx_id, source_block, source_tx_index).await?; + prepare_transaction_proof(source_client, &source_tx_id, source_block, source_tx_index) + .await + .map_err(|e| (FailedClient::Source, e))?; let needs_to_be_relayed = target_client .filter_transaction_proof(&source_tx_proof) .await .map_err(|err| { - StringifiedMaybeConnectionError::new( - err.is_connection_error(), - format!("Transaction filtering has failed with {:?}", err), + ( + FailedClient::Target, + StringifiedMaybeConnectionError::new( + err.is_connection_error(), + format!("Transaction filtering has failed with {:?}", err), + ), ) })?; @@ -176,6 +177,7 @@ pub async fn relay_block_transactions( relay_ready_transaction_proof(target_client, &source_tx_id, source_tx_proof) .await .map(|_| true) + .map_err(|e| (FailedClient::Target, e)) } .await; @@ -205,7 +207,7 @@ pub async fn relay_block_transactions( relayed_transactions.processed += 1; relayed_transactions.relayed += 1; } - Err(err) => { + Err((failed_client, err)) => { log::error!( target: "bridge", "Error relaying {} transaction {} proof to {} node: {}. {}", @@ -221,7 +223,7 @@ pub async fn relay_block_transactions( ); if err.is_connection_error() { - return Err(relayed_transactions); + return Err((failed_client, relayed_transactions)); } relayed_transactions.processed += 1; @@ -529,8 +531,9 @@ pub(crate) mod tests { #[derive(Debug, Clone, PartialEq)] pub struct TestTransactionProof(pub TestTransactionHash); + #[derive(Clone)] pub struct TestTransactionsSource { - pub on_tick: Box, + pub on_tick: Arc, pub data: Arc>, } @@ -543,7 +546,7 @@ pub(crate) mod tests { impl TestTransactionsSource { pub fn new(on_tick: Box) -> Self { Self { - on_tick, + on_tick: Arc::new(on_tick), data: Arc::new(Mutex::new(TestTransactionsSourceData { block: Ok(test_block()), transaction_block: Ok(Some((test_block_id(), 0))), @@ -554,9 +557,16 @@ pub(crate) mod tests { } #[async_trait] - impl SourceClient for TestTransactionsSource { + impl RelayClient for TestTransactionsSource { type Error = TestError; + async fn reconnect(&mut self) -> Result<(), TestError> { + Ok(()) + } + } + + #[async_trait] + impl SourceClient for TestTransactionsSource { async fn tick(&self) { (self.on_tick)(&mut *self.data.lock()) } @@ -584,8 +594,9 @@ pub(crate) mod tests { } } + #[derive(Clone)] pub struct TestTransactionsTarget { - pub on_tick: Box, + pub on_tick: Arc, pub data: Arc>, } @@ -600,7 +611,7 @@ pub(crate) mod tests { impl TestTransactionsTarget { pub fn new(on_tick: Box) -> Self { Self { - on_tick, + on_tick: Arc::new(on_tick), data: Arc::new(Mutex::new(TestTransactionsTargetData { is_header_known: Ok(true), is_header_finalized: Ok(true), @@ -613,9 +624,16 @@ pub(crate) mod tests { } #[async_trait] - impl TargetClient for TestTransactionsTarget { + impl RelayClient for TestTransactionsTarget { type Error = TestError; + async fn reconnect(&mut self) -> Result<(), TestError> { + Ok(()) + } + } + + #[async_trait] + impl TargetClient for TestTransactionsTarget { async fn tick(&self) { (self.on_tick)(&mut *self.data.lock()) } @@ -784,6 +802,7 @@ pub(crate) mod tests { ), pre_relayed, )) + .map_err(|(_, transactions)| transactions) } #[test] diff --git a/bridges/relays/exchange-relay/src/exchange_loop.rs b/bridges/relays/exchange-relay/src/exchange_loop.rs index 645672ba3f403..06f4d3f40ab01 100644 --- a/bridges/relays/exchange-relay/src/exchange_loop.rs +++ b/bridges/relays/exchange-relay/src/exchange_loop.rs @@ -27,13 +27,9 @@ use futures::{future::FutureExt, select}; use num_traits::One; use relay_utils::{ metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, - retry_backoff, + retry_backoff, FailedClient, MaybeConnectionError, }; -use std::{future::Future, time::Duration}; - -/// Delay after connection-related error happened before we'll try -/// reconnection again. -const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); +use std::future::Future; /// Transactions proofs relay state. #[derive(Debug)] @@ -43,7 +39,7 @@ pub struct TransactionProofsRelayState { } /// Transactions proofs relay storage. -pub trait TransactionProofsRelayStorage { +pub trait TransactionProofsRelayStorage: Clone { /// Associated block number. type BlockNumber; @@ -54,7 +50,7 @@ pub trait TransactionProofsRelayStorage { } /// In-memory storage for auto-relay loop. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InMemoryStorage { best_processed_header_number: BlockNumber, } @@ -84,68 +80,101 @@ impl TransactionProofsRelayStorage for InMemoryStorag /// Run proofs synchronization. pub fn run( - mut storage: impl TransactionProofsRelayStorage>, + storage: impl TransactionProofsRelayStorage>, source_client: impl SourceClient

, target_client: impl TargetClient

, metrics_params: Option, exit_signal: impl Future, ) { - let mut local_pool = futures::executor::LocalPool::new(); - - local_pool.run_until(async move { - let mut retry_backoff = retry_backoff(); - let mut state = storage.state(); - let mut current_finalized_block = None; - - let mut metrics_global = GlobalMetrics::default(); - let mut metrics_exch = ExchangeLoopMetrics::default(); - let metrics_enabled = metrics_params.is_some(); - metrics_start( - format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME), - metrics_params, - &metrics_global, - &metrics_exch, - ); + let exit_signal = exit_signal.shared(); + let metrics_global = GlobalMetrics::default(); + let metrics_exch = ExchangeLoopMetrics::default(); + let metrics_enabled = metrics_params.is_some(); + metrics_start( + format!("{}_to_{}_Exchange", P::SOURCE_NAME, P::TARGET_NAME), + metrics_params, + &metrics_global, + &metrics_exch, + ); + + relay_utils::relay_loop::run( + relay_utils::relay_loop::RECONNECT_DELAY, + source_client, + target_client, + |source_client, target_client| { + run_until_connection_lost( + storage.clone(), + source_client, + target_client, + if metrics_enabled { + Some(metrics_global.clone()) + } else { + None + }, + if metrics_enabled { + Some(metrics_exch.clone()) + } else { + None + }, + exit_signal.clone(), + ) + }, + ); +} + +/// Run proofs synchronization. +async fn run_until_connection_lost( + mut storage: impl TransactionProofsRelayStorage>, + source_client: impl SourceClient

, + target_client: impl TargetClient

, + metrics_global: Option, + metrics_exch: Option, + exit_signal: impl Future, +) -> Result<(), FailedClient> { + let mut retry_backoff = retry_backoff(); + let mut state = storage.state(); + let mut current_finalized_block = None; - let exit_signal = exit_signal.fuse(); + let exit_signal = exit_signal.fuse(); - futures::pin_mut!(exit_signal); + futures::pin_mut!(exit_signal); - loop { - let iteration_result = run_loop_iteration( - &mut storage, - &source_client, - &target_client, - &mut state, - &mut current_finalized_block, - if metrics_enabled { Some(&mut metrics_exch) } else { None }, - ) - .await; + loop { + let iteration_result = run_loop_iteration( + &mut storage, + &source_client, + &target_client, + &mut state, + &mut current_finalized_block, + metrics_exch.as_ref(), + ) + .await; + + if let Some(ref metrics_global) = metrics_global { + metrics_global.update().await; + } - if metrics_enabled { - metrics_global.update(); + if let Err((is_connection_error, failed_client)) = iteration_result { + if is_connection_error { + return Err(failed_client); } - match iteration_result { - Ok(_) => { - retry_backoff.reset(); - - select! { - _ = source_client.tick().fuse() => {}, - _ = exit_signal => return, - } - } - Err(_) => { - let retry_timeout = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY); + let retry_timeout = retry_backoff + .next_backoff() + .unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY); + select! { + _ = async_std::task::sleep(retry_timeout).fuse() => {}, + _ = exit_signal => return Ok(()), + } + } else { + retry_backoff.reset(); - select! { - _ = async_std::task::sleep(retry_timeout).fuse() => {}, - _ = exit_signal => return, - } - } + select! { + _ = source_client.tick().fuse() => {}, + _ = exit_signal => return Ok(()), } } - }); + } } /// Run exchange loop until we need to break. @@ -155,8 +184,8 @@ async fn run_loop_iteration( target_client: &impl TargetClient

, state: &mut TransactionProofsRelayState>, current_finalized_block: &mut Option<(P::Block, RelayedBlockTransactions)>, - mut exchange_loop_metrics: Option<&mut ExchangeLoopMetrics>, -) -> Result<(), ()> { + exchange_loop_metrics: Option<&ExchangeLoopMetrics>, +) -> Result<(), (bool, FailedClient)> { let best_finalized_header_id = match target_client.best_finalized_header_id().await { Ok(best_finalized_header_id) => { log::debug!( @@ -178,7 +207,7 @@ async fn run_loop_iteration( err, ); - return Err(()); + return Err((err.is_connection_error(), FailedClient::Target)); } }; @@ -202,7 +231,7 @@ async fn run_loop_iteration( state.best_processed_header_number = state.best_processed_header_number + One::one(); storage.set_state(state); - if let Some(exchange_loop_metrics) = exchange_loop_metrics.as_mut() { + if let Some(ref exchange_loop_metrics) = exchange_loop_metrics { exchange_loop_metrics.update::

( state.best_processed_header_number, best_finalized_header_id.0, @@ -212,9 +241,9 @@ async fn run_loop_iteration( // we have just updated state => proceed to next block retrieval } - Err(relayed_transactions) => { + Err((failed_client, relayed_transactions)) => { *current_finalized_block = Some((block, relayed_transactions)); - return Err(()); + return Err((true, failed_client)); } } } @@ -240,7 +269,7 @@ async fn run_loop_iteration( err, ); - return Err(()); + return Err((err.is_connection_error(), FailedClient::Source)); } } } diff --git a/bridges/relays/exchange-relay/src/exchange_loop_metrics.rs b/bridges/relays/exchange-relay/src/exchange_loop_metrics.rs index 325bfd2c9dfcd..bf8f0243b693a 100644 --- a/bridges/relays/exchange-relay/src/exchange_loop_metrics.rs +++ b/bridges/relays/exchange-relay/src/exchange_loop_metrics.rs @@ -20,6 +20,7 @@ use crate::exchange::{BlockNumberOf, RelayedBlockTransactions, TransactionProofP use relay_utils::metrics::{register, Counter, CounterVec, GaugeVec, Metrics, Opts, Registry, U64}; /// Exchange transactions relay metrics. +#[derive(Clone)] pub struct ExchangeLoopMetrics { /// Best finalized block numbers - "processed" and "known". best_block_numbers: GaugeVec, @@ -60,7 +61,7 @@ impl Default for ExchangeLoopMetrics { impl ExchangeLoopMetrics { /// Update metrics when single block is relayed. pub fn update( - &mut self, + &self, best_processed_block_number: BlockNumberOf

, best_known_block_number: BlockNumberOf

, relayed_transactions: RelayedBlockTransactions, diff --git a/bridges/relays/headers-relay/src/sync_loop.rs b/bridges/relays/headers-relay/src/sync_loop.rs index 15f982af768ad..d2584f2ccb2c8 100644 --- a/bridges/relays/headers-relay/src/sync_loop.rs +++ b/bridges/relays/headers-relay/src/sync_loop.rs @@ -26,7 +26,9 @@ use num_traits::{Saturating, Zero}; use relay_utils::{ format_ids, interval, metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, - process_future_result, retry_backoff, MaybeConnectionError, StringifiedMaybeConnectionError, + process_future_result, + relay_loop::Client as RelayClient, + retry_backoff, FailedClient, MaybeConnectionError, StringifiedMaybeConnectionError, }; use std::{ collections::HashSet, @@ -53,10 +55,7 @@ const MAINTAIN_INTERVAL: Duration = Duration::from_secs(30); /// Source client trait. #[async_trait] -pub trait SourceClient { - /// Type of error this clients returns. - type Error: std::fmt::Debug + MaybeConnectionError; - +pub trait SourceClient: RelayClient { /// Get best block number. async fn best_block_number(&self) -> Result; @@ -80,10 +79,7 @@ pub trait SourceClient { /// Target client trait. #[async_trait] -pub trait TargetClient { - /// Type of error this clients returns. - type Error: std::fmt::Debug + MaybeConnectionError; - +pub trait TargetClient: RelayClient { /// Returns ID of best header known to the target node. async fn best_header_id(&self) -> Result, Self::Error>; @@ -106,7 +102,7 @@ pub trait TargetClient { /// Synchronization maintain procedure. #[async_trait] -pub trait SyncMaintain: Send + Sync { +pub trait SyncMaintain: Clone + Send + Sync { /// Run custom maintain procedures. This is guaranteed to be called when both source and target /// clients are unoccupied. async fn maintain(&self, _sync: &mut HeadersSync

) {} @@ -126,465 +122,506 @@ pub fn run>( metrics_params: Option, exit_signal: impl Future, ) { - #![allow(unused_variables)] // this is to suppress weird errors from clippy - let mut local_pool = futures::executor::LocalPool::new(); - let mut progress_context = (Instant::now(), None, None); + let exit_signal = exit_signal.shared(); + + let metrics_global = GlobalMetrics::default(); + let metrics_sync = SyncLoopMetrics::default(); + let metrics_enabled = metrics_params.is_some(); + metrics_start( + format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME), + metrics_params, + &metrics_global, + &metrics_sync, + ); - local_pool.run_until(async move { - let mut sync = HeadersSync::

::new(sync_params); - let mut stall_countdown = None; - let mut last_update_time = Instant::now(); - - let mut metrics_global = GlobalMetrics::default(); - let mut metrics_sync = SyncLoopMetrics::default(); - let metrics_enabled = metrics_params.is_some(); - metrics_start( - format!("{}_to_{}_Sync", P::SOURCE_NAME, P::TARGET_NAME), - metrics_params, - &metrics_global, - &metrics_sync, - ); - - let mut source_retry_backoff = retry_backoff(); - let mut source_client_is_online = false; - let mut source_best_block_number_required = false; - let source_best_block_number_future = source_client.best_block_number().fuse(); - let source_new_header_future = futures::future::Fuse::terminated(); - let source_orphan_header_future = futures::future::Fuse::terminated(); - let source_extra_future = futures::future::Fuse::terminated(); - let source_completion_future = futures::future::Fuse::terminated(); - let source_go_offline_future = futures::future::Fuse::terminated(); - let source_tick_stream = interval(source_tick).fuse(); - - let mut target_retry_backoff = retry_backoff(); - let mut target_client_is_online = false; - let mut target_best_block_required = false; - let mut target_incomplete_headers_required = true; - let target_best_block_future = target_client.best_header_id().fuse(); - let target_incomplete_headers_future = futures::future::Fuse::terminated(); - let target_extra_check_future = futures::future::Fuse::terminated(); - let target_existence_status_future = futures::future::Fuse::terminated(); - let target_submit_header_future = futures::future::Fuse::terminated(); - let target_complete_header_future = futures::future::Fuse::terminated(); - let target_go_offline_future = futures::future::Fuse::terminated(); - let target_tick_stream = interval(target_tick).fuse(); - - let mut maintain_required = false; - let maintain_stream = interval(MAINTAIN_INTERVAL).fuse(); - - let exit_signal = exit_signal.fuse(); - - futures::pin_mut!( - source_best_block_number_future, - source_new_header_future, - source_orphan_header_future, - source_extra_future, - source_completion_future, - source_go_offline_future, - source_tick_stream, - target_best_block_future, - target_incomplete_headers_future, - target_extra_check_future, - target_existence_status_future, - target_submit_header_future, - target_complete_header_future, - target_go_offline_future, - target_tick_stream, - maintain_stream, - exit_signal - ); - - loop { - futures::select! { - source_best_block_number = source_best_block_number_future => { - source_best_block_number_required = false; - - source_client_is_online = process_future_result( - source_best_block_number, - &mut source_retry_backoff, - |source_best_block_number| sync.source_best_header_number_response(source_best_block_number), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving best header number from {}", P::SOURCE_NAME), - ).is_ok(); - }, - source_new_header = source_new_header_future => { - source_client_is_online = process_future_result( - source_new_header, - &mut source_retry_backoff, - |source_new_header| sync.headers_mut().header_response(source_new_header), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving header from {} node", P::SOURCE_NAME), - ).is_ok(); - }, - source_orphan_header = source_orphan_header_future => { - source_client_is_online = process_future_result( - source_orphan_header, - &mut source_retry_backoff, - |source_orphan_header| sync.headers_mut().header_response(source_orphan_header), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving orphan header from {} node", P::SOURCE_NAME), - ).is_ok(); - }, - source_extra = source_extra_future => { - source_client_is_online = process_future_result( - source_extra, - &mut source_retry_backoff, - |(header, extra)| sync.headers_mut().extra_response(&header, extra), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving extra data from {} node", P::SOURCE_NAME), - ).is_ok(); - }, - source_completion = source_completion_future => { - source_client_is_online = process_future_result( - source_completion, - &mut source_retry_backoff, - |(header, completion)| sync.headers_mut().completion_response(&header, completion), - &mut source_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving completion data from {} node", P::SOURCE_NAME), - ).is_ok(); - }, - source_client = source_go_offline_future => { - source_client_is_online = true; - }, - _ = source_tick_stream.next() => { - if sync.is_almost_synced() { - source_best_block_number_required = true; - } - }, - target_best_block = target_best_block_future => { - target_best_block_required = false; - - target_client_is_online = process_future_result( - target_best_block, - &mut target_retry_backoff, - |target_best_block| { - let head_updated = sync.target_best_header_response(target_best_block); - if head_updated { - last_update_time = Instant::now(); - } - match head_updated { - // IF head is updated AND there are still our transactions: - // => restart stall countdown timer - true if sync.headers().headers_in_status(HeaderStatus::Submitted) != 0 => - stall_countdown = Some(Instant::now()), - // IF head is updated AND there are no our transactions: - // => stop stall countdown timer - true => stall_countdown = None, - // IF head is not updated AND stall countdown is not yet completed - // => do nothing - false if stall_countdown - .map(|stall_countdown| stall_countdown.elapsed() < STALL_SYNC_TIMEOUT) - .unwrap_or(true) - => (), - // IF head is not updated AND stall countdown has completed - // => restart sync - false => { - log::info!( - target: "bridge", - "Sync has stalled. Restarting {} headers synchronization.", - P::SOURCE_NAME, - ); - stall_countdown = None; - sync.restart(); - }, - } - }, - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving best known {} header from {} node", P::SOURCE_NAME, P::TARGET_NAME), - ).is_ok(); - }, - incomplete_headers_ids = target_incomplete_headers_future => { - target_incomplete_headers_required = false; - - target_client_is_online = process_future_result( - incomplete_headers_ids, - &mut target_retry_backoff, - |incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids), - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME), - ).is_ok(); - }, - target_existence_status = target_existence_status_future => { - target_client_is_online = process_future_result( - target_existence_status, - &mut target_retry_backoff, - |(target_header, target_existence_status)| sync - .headers_mut() - .maybe_orphan_response(&target_header, target_existence_status), - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving existence status from {} node", P::TARGET_NAME), - ).is_ok(); - }, - submitted_headers = target_submit_header_future => { - // following line helps Rust understand the type of `submitted_headers` :/ - let submitted_headers: SubmittedHeaders, TC::Error> = submitted_headers; - let submitted_headers_str = format!("{}", submitted_headers); - let all_headers_rejected = submitted_headers.submitted.is_empty() - && submitted_headers.incomplete.is_empty(); - let has_submitted_headers = sync.headers().headers_in_status(HeaderStatus::Submitted) != 0; - - let maybe_fatal_error = match submitted_headers.fatal_error { - Some(fatal_error) => Err(StringifiedMaybeConnectionError::new( - fatal_error.is_connection_error(), - format!("{:?}", fatal_error), - )), - None if all_headers_rejected && !has_submitted_headers => - Err(StringifiedMaybeConnectionError::new(false, "All headers were rejected".into())), - None => Ok(()), - }; - - let no_fatal_error = maybe_fatal_error.is_ok(); - target_client_is_online = process_future_result( - maybe_fatal_error, - &mut target_retry_backoff, - |_| {}, - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error submitting headers to {} node", P::TARGET_NAME), - ).is_ok(); - - log::debug!(target: "bridge", "Header submit result: {}", submitted_headers_str); - - sync.headers_mut().headers_submitted(submitted_headers.submitted); - sync.headers_mut().add_incomplete_headers(false, submitted_headers.incomplete); - - // when there's no fatal error, but node has rejected all our headers we may - // want to pause until our submitted headers will be accepted - if no_fatal_error && all_headers_rejected && has_submitted_headers { - sync.pause_submit(); - } - }, - target_complete_header_result = target_complete_header_future => { - target_client_is_online = process_future_result( - target_complete_header_result, - &mut target_retry_backoff, - |completed_header| sync.headers_mut().header_completed(&completed_header), - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error completing headers at {}", P::TARGET_NAME), - ).is_ok(); - }, - target_extra_check_result = target_extra_check_future => { - target_client_is_online = process_future_result( - target_extra_check_result, - &mut target_retry_backoff, - |(header, extra_check_result)| sync - .headers_mut() - .maybe_extra_response(&header, extra_check_result), - &mut target_go_offline_future, - async_std::task::sleep, - || format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME), - ).is_ok(); - }, - target_client = target_go_offline_future => { - target_client_is_online = true; - }, - _ = target_tick_stream.next() => { - target_best_block_required = true; - target_incomplete_headers_required = true; + relay_utils::relay_loop::run( + relay_utils::relay_loop::RECONNECT_DELAY, + source_client, + target_client, + |source_client, target_client| { + run_until_connection_lost( + source_client, + source_tick, + target_client, + target_tick, + sync_maintain.clone(), + sync_params.clone(), + if metrics_enabled { + Some(metrics_global.clone()) + } else { + None }, - - _ = maintain_stream.next() => { - maintain_required = true; + if metrics_enabled { + Some(metrics_sync.clone()) + } else { + None }, - _ = exit_signal => { - return; - } - } + exit_signal.clone(), + ) + }, + ); +} - // update metrics - if metrics_enabled { - metrics_global.update(); - metrics_sync.update(&sync); - } +/// Run headers synchronization. +#[allow(clippy::too_many_arguments)] +async fn run_until_connection_lost>( + source_client: impl SourceClient

, + source_tick: Duration, + target_client: TC, + target_tick: Duration, + sync_maintain: impl SyncMaintain

, + sync_params: HeadersSyncParams, + metrics_global: Option, + metrics_sync: Option, + exit_signal: impl Future, +) -> Result<(), FailedClient> { + let mut progress_context = (Instant::now(), None, None); - // print progress - progress_context = print_sync_progress(progress_context, &sync); + let mut sync = HeadersSync::

::new(sync_params); + let mut stall_countdown = None; + let mut last_update_time = Instant::now(); + + let mut source_retry_backoff = retry_backoff(); + let mut source_client_is_online = false; + let mut source_best_block_number_required = false; + let source_best_block_number_future = source_client.best_block_number().fuse(); + let source_new_header_future = futures::future::Fuse::terminated(); + let source_orphan_header_future = futures::future::Fuse::terminated(); + let source_extra_future = futures::future::Fuse::terminated(); + let source_completion_future = futures::future::Fuse::terminated(); + let source_go_offline_future = futures::future::Fuse::terminated(); + let source_tick_stream = interval(source_tick).fuse(); + + let mut target_retry_backoff = retry_backoff(); + let mut target_client_is_online = false; + let mut target_best_block_required = false; + let mut target_incomplete_headers_required = true; + let target_best_block_future = target_client.best_header_id().fuse(); + let target_incomplete_headers_future = futures::future::Fuse::terminated(); + let target_extra_check_future = futures::future::Fuse::terminated(); + let target_existence_status_future = futures::future::Fuse::terminated(); + let target_submit_header_future = futures::future::Fuse::terminated(); + let target_complete_header_future = futures::future::Fuse::terminated(); + let target_go_offline_future = futures::future::Fuse::terminated(); + let target_tick_stream = interval(target_tick).fuse(); + + let mut maintain_required = false; + let maintain_stream = interval(MAINTAIN_INTERVAL).fuse(); + + let exit_signal = exit_signal.fuse(); + + futures::pin_mut!( + source_best_block_number_future, + source_new_header_future, + source_orphan_header_future, + source_extra_future, + source_completion_future, + source_go_offline_future, + source_tick_stream, + target_best_block_future, + target_incomplete_headers_future, + target_extra_check_future, + target_existence_status_future, + target_submit_header_future, + target_complete_header_future, + target_go_offline_future, + target_tick_stream, + maintain_stream, + exit_signal + ); - // run maintain procedures - if maintain_required && source_client_is_online && target_client_is_online { - log::debug!(target: "bridge", "Maintaining headers sync loop"); - maintain_required = false; - sync_maintain.maintain(&mut sync).await; + loop { + futures::select! { + source_best_block_number = source_best_block_number_future => { + source_best_block_number_required = false; + + source_client_is_online = process_future_result( + source_best_block_number, + &mut source_retry_backoff, + |source_best_block_number| sync.source_best_header_number_response(source_best_block_number), + &mut source_go_offline_future, + async_std::task::sleep, + || format!("Error retrieving best header number from {}", P::SOURCE_NAME), + ).fail_if_connection_error(FailedClient::Source)?; + }, + source_new_header = source_new_header_future => { + source_client_is_online = process_future_result( + source_new_header, + &mut source_retry_backoff, + |source_new_header| sync.headers_mut().header_response(source_new_header), + &mut source_go_offline_future, + async_std::task::sleep, + || format!("Error retrieving header from {} node", P::SOURCE_NAME), + ).fail_if_connection_error(FailedClient::Source)?; + }, + source_orphan_header = source_orphan_header_future => { + source_client_is_online = process_future_result( + source_orphan_header, + &mut source_retry_backoff, + |source_orphan_header| sync.headers_mut().header_response(source_orphan_header), + &mut source_go_offline_future, + async_std::task::sleep, + || format!("Error retrieving orphan header from {} node", P::SOURCE_NAME), + ).fail_if_connection_error(FailedClient::Source)?; + }, + source_extra = source_extra_future => { + source_client_is_online = process_future_result( + source_extra, + &mut source_retry_backoff, + |(header, extra)| sync.headers_mut().extra_response(&header, extra), + &mut source_go_offline_future, + async_std::task::sleep, + || format!("Error retrieving extra data from {} node", P::SOURCE_NAME), + ).fail_if_connection_error(FailedClient::Source)?; + }, + source_completion = source_completion_future => { + source_client_is_online = process_future_result( + source_completion, + &mut source_retry_backoff, + |(header, completion)| sync.headers_mut().completion_response(&header, completion), + &mut source_go_offline_future, + async_std::task::sleep, + || format!("Error retrieving completion data from {} node", P::SOURCE_NAME), + ).fail_if_connection_error(FailedClient::Source)?; + }, + _ = source_go_offline_future => { + source_client_is_online = true; + }, + _ = source_tick_stream.next() => { + if sync.is_almost_synced() { + source_best_block_number_required = true; + } + }, + target_best_block = target_best_block_future => { + target_best_block_required = false; + + target_client_is_online = process_future_result( + target_best_block, + &mut target_retry_backoff, + |target_best_block| { + let head_updated = sync.target_best_header_response(target_best_block); + if head_updated { + last_update_time = Instant::now(); + } + match head_updated { + // IF head is updated AND there are still our transactions: + // => restart stall countdown timer + true if sync.headers().headers_in_status(HeaderStatus::Submitted) != 0 => + stall_countdown = Some(Instant::now()), + // IF head is updated AND there are no our transactions: + // => stop stall countdown timer + true => stall_countdown = None, + // IF head is not updated AND stall countdown is not yet completed + // => do nothing + false if stall_countdown + .map(|stall_countdown| stall_countdown.elapsed() < STALL_SYNC_TIMEOUT) + .unwrap_or(true) + => (), + // IF head is not updated AND stall countdown has completed + // => restart sync + false => { + log::info!( + target: "bridge", + "Sync has stalled. Restarting {} headers synchronization.", + P::SOURCE_NAME, + ); + stall_countdown = None; + sync.restart(); + }, + } + }, + &mut target_go_offline_future, + async_std::task::sleep, + || format!("Error retrieving best known {} header from {} node", P::SOURCE_NAME, P::TARGET_NAME), + ).fail_if_connection_error(FailedClient::Target)?; + }, + incomplete_headers_ids = target_incomplete_headers_future => { + target_incomplete_headers_required = false; + + target_client_is_online = process_future_result( + incomplete_headers_ids, + &mut target_retry_backoff, + |incomplete_headers_ids| sync.headers_mut().incomplete_headers_response(incomplete_headers_ids), + &mut target_go_offline_future, + async_std::task::sleep, + || format!("Error retrieving incomplete headers from {} node", P::TARGET_NAME), + ).fail_if_connection_error(FailedClient::Target)?; + }, + target_existence_status = target_existence_status_future => { + target_client_is_online = process_future_result( + target_existence_status, + &mut target_retry_backoff, + |(target_header, target_existence_status)| sync + .headers_mut() + .maybe_orphan_response(&target_header, target_existence_status), + &mut target_go_offline_future, + async_std::task::sleep, + || format!("Error retrieving existence status from {} node", P::TARGET_NAME), + ).fail_if_connection_error(FailedClient::Target)?; + }, + submitted_headers = target_submit_header_future => { + // following line helps Rust understand the type of `submitted_headers` :/ + let submitted_headers: SubmittedHeaders, TC::Error> = submitted_headers; + let submitted_headers_str = format!("{}", submitted_headers); + let all_headers_rejected = submitted_headers.submitted.is_empty() + && submitted_headers.incomplete.is_empty(); + let has_submitted_headers = sync.headers().headers_in_status(HeaderStatus::Submitted) != 0; + + let maybe_fatal_error = match submitted_headers.fatal_error { + Some(fatal_error) => Err(StringifiedMaybeConnectionError::new( + fatal_error.is_connection_error(), + format!("{:?}", fatal_error), + )), + None if all_headers_rejected && !has_submitted_headers => + Err(StringifiedMaybeConnectionError::new(false, "All headers were rejected".into())), + None => Ok(()), + }; + + let no_fatal_error = maybe_fatal_error.is_ok(); + target_client_is_online = process_future_result( + maybe_fatal_error, + &mut target_retry_backoff, + |_| {}, + &mut target_go_offline_future, + async_std::task::sleep, + || format!("Error submitting headers to {} node", P::TARGET_NAME), + ).fail_if_connection_error(FailedClient::Target)?; + + log::debug!(target: "bridge", "Header submit result: {}", submitted_headers_str); + + sync.headers_mut().headers_submitted(submitted_headers.submitted); + sync.headers_mut().add_incomplete_headers(false, submitted_headers.incomplete); + + // when there's no fatal error, but node has rejected all our headers we may + // want to pause until our submitted headers will be accepted + if no_fatal_error && all_headers_rejected && has_submitted_headers { + sync.pause_submit(); + } + }, + target_complete_header_result = target_complete_header_future => { + target_client_is_online = process_future_result( + target_complete_header_result, + &mut target_retry_backoff, + |completed_header| sync.headers_mut().header_completed(&completed_header), + &mut target_go_offline_future, + async_std::task::sleep, + || format!("Error completing headers at {}", P::TARGET_NAME), + ).fail_if_connection_error(FailedClient::Target)?; + }, + target_extra_check_result = target_extra_check_future => { + target_client_is_online = process_future_result( + target_extra_check_result, + &mut target_retry_backoff, + |(header, extra_check_result)| sync + .headers_mut() + .maybe_extra_response(&header, extra_check_result), + &mut target_go_offline_future, + async_std::task::sleep, + || format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME), + ).fail_if_connection_error(FailedClient::Target)?; + }, + _ = target_go_offline_future => { + target_client_is_online = true; + }, + _ = target_tick_stream.next() => { + target_best_block_required = true; + target_incomplete_headers_required = true; + }, + + _ = maintain_stream.next() => { + maintain_required = true; + }, + _ = exit_signal => { + return Ok(()); } + } - // If the target client is accepting requests we update the requests that - // we want it to run - if !maintain_required && target_client_is_online { - // NOTE: Is is important to reset this so that we only have one - // request being processed by the client at a time. This prevents - // race conditions like receiving two transactions with the same - // nonce from the client. - target_client_is_online = false; - - // The following is how we prioritize requests: - // - // 1. Get best block - // - Stops us from downloading or submitting new blocks - // - Only called rarely - // - // 2. Get incomplete headers - // - Stops us from submitting new blocks - // - Only called rarely - // - // 3. Get complete headers - // - Stops us from submitting new blocks - // - // 4. Check if we need extra data from source - // - Stops us from downloading or submitting new blocks - // - // 5. Check existence of header - // - Stops us from submitting new blocks - // - // 6. Submit header - - if target_best_block_required { - log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME); - target_best_block_future.set(target_client.best_header_id().fuse()); - } else if target_incomplete_headers_required { - log::debug!(target: "bridge", "Asking {} about incomplete headers", P::TARGET_NAME); - target_incomplete_headers_future.set(target_client.incomplete_headers_ids().fuse()); - } else if let Some((id, completion)) = sync.headers_mut().header_to_complete() { - log::debug!( - target: "bridge", - "Going to complete header: {:?}", - id, - ); - - target_complete_header_future.set(target_client.complete_header(id, completion.clone()).fuse()); - } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeExtra) { - log::debug!( - target: "bridge", - "Checking if header submission requires extra: {:?}", - header.id(), - ); - - target_extra_check_future.set(target_client.requires_extra(header.clone()).fuse()); - } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeOrphan) { - // for MaybeOrphan we actually ask for parent' header existence - let parent_id = header.parent_id(); + // update metrics + if let Some(ref metrics_global) = metrics_global { + metrics_global.update().await; + } + if let Some(ref metrics_sync) = metrics_sync { + metrics_sync.update(&sync); + } - log::debug!( - target: "bridge", - "Asking {} node for existence of: {:?}", - P::TARGET_NAME, - parent_id, - ); + // print progress + progress_context = print_sync_progress(progress_context, &sync); - target_existence_status_future.set(target_client.is_known_header(parent_id).fuse()); - } else if let Some(headers) = - sync.select_headers_to_submit(last_update_time.elapsed() > BACKUP_STALL_SYNC_TIMEOUT) - { - log::debug!( - target: "bridge", - "Submitting {} header(s) to {} node: {:?}", - headers.len(), - P::TARGET_NAME, - format_ids(headers.iter().map(|header| header.id())), - ); - - let headers = headers.into_iter().cloned().collect(); - target_submit_header_future.set(target_client.submit_headers(headers).fuse()); + // run maintain procedures + if maintain_required && source_client_is_online && target_client_is_online { + log::debug!(target: "bridge", "Maintaining headers sync loop"); + maintain_required = false; + sync_maintain.maintain(&mut sync).await; + } - // remember that we have submitted some headers - if stall_countdown.is_none() { - stall_countdown = Some(Instant::now()); - } - } else { - target_client_is_online = true; + // If the target client is accepting requests we update the requests that + // we want it to run + if !maintain_required && target_client_is_online { + // NOTE: Is is important to reset this so that we only have one + // request being processed by the client at a time. This prevents + // race conditions like receiving two transactions with the same + // nonce from the client. + target_client_is_online = false; + + // The following is how we prioritize requests: + // + // 1. Get best block + // - Stops us from downloading or submitting new blocks + // - Only called rarely + // + // 2. Get incomplete headers + // - Stops us from submitting new blocks + // - Only called rarely + // + // 3. Get complete headers + // - Stops us from submitting new blocks + // + // 4. Check if we need extra data from source + // - Stops us from downloading or submitting new blocks + // + // 5. Check existence of header + // - Stops us from submitting new blocks + // + // 6. Submit header + + if target_best_block_required { + log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME); + target_best_block_future.set(target_client.best_header_id().fuse()); + } else if target_incomplete_headers_required { + log::debug!(target: "bridge", "Asking {} about incomplete headers", P::TARGET_NAME); + target_incomplete_headers_future.set(target_client.incomplete_headers_ids().fuse()); + } else if let Some((id, completion)) = sync.headers_mut().header_to_complete() { + log::debug!( + target: "bridge", + "Going to complete header: {:?}", + id, + ); + + target_complete_header_future.set(target_client.complete_header(id, completion.clone()).fuse()); + } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeExtra) { + log::debug!( + target: "bridge", + "Checking if header submission requires extra: {:?}", + header.id(), + ); + + target_extra_check_future.set(target_client.requires_extra(header.clone()).fuse()); + } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeOrphan) { + // for MaybeOrphan we actually ask for parent' header existence + let parent_id = header.parent_id(); + + log::debug!( + target: "bridge", + "Asking {} node for existence of: {:?}", + P::TARGET_NAME, + parent_id, + ); + + target_existence_status_future.set(target_client.is_known_header(parent_id).fuse()); + } else if let Some(headers) = + sync.select_headers_to_submit(last_update_time.elapsed() > BACKUP_STALL_SYNC_TIMEOUT) + { + log::debug!( + target: "bridge", + "Submitting {} header(s) to {} node: {:?}", + headers.len(), + P::TARGET_NAME, + format_ids(headers.iter().map(|header| header.id())), + ); + + let headers = headers.into_iter().cloned().collect(); + target_submit_header_future.set(target_client.submit_headers(headers).fuse()); + + // remember that we have submitted some headers + if stall_countdown.is_none() { + stall_countdown = Some(Instant::now()); } + } else { + target_client_is_online = true; } + } - // If the source client is accepting requests we update the requests that - // we want it to run - if !maintain_required && source_client_is_online { - // NOTE: Is is important to reset this so that we only have one - // request being processed by the client at a time. This prevents - // race conditions like receiving two transactions with the same - // nonce from the client. - source_client_is_online = false; - - // The following is how we prioritize requests: - // - // 1. Get best block - // - Stops us from downloading or submitting new blocks - // - Only called rarely - // - // 2. Download completion data - // - Stops us from submitting new blocks - // - // 3. Download extra data - // - Stops us from submitting new blocks - // - // 4. Download missing headers - // - Stops us from downloading or submitting new blocks - // - // 5. Downloading new headers - - if source_best_block_number_required { - log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME); - source_best_block_number_future.set(source_client.best_block_number().fuse()); - } else if let Some(id) = sync.headers_mut().incomplete_header() { - log::debug!( - target: "bridge", - "Retrieving completion data for header: {:?}", - id, - ); - source_completion_future.set(source_client.header_completion(id).fuse()); - } else if let Some(header) = sync.headers().header(HeaderStatus::Extra) { - let id = header.id(); - log::debug!( - target: "bridge", - "Retrieving extra data for header: {:?}", - id, - ); - source_extra_future.set(source_client.header_extra(id, header.clone()).fuse()); - } else if let Some(header) = sync.select_orphan_header_to_download() { - // for Orphan we actually ask for parent' header - let parent_id = header.parent_id(); - - // if we have end up with orphan header#0, then we are misconfigured - if parent_id.0.is_zero() { - log::error!( - target: "bridge", - "Misconfiguration. Genesis {} header is considered orphan by {} node", - P::SOURCE_NAME, - P::TARGET_NAME, - ); - return; - } - - log::debug!( - target: "bridge", - "Going to download orphan header from {} node: {:?}", - P::SOURCE_NAME, - parent_id, - ); - - source_orphan_header_future.set(source_client.header_by_hash(parent_id.1).fuse()); - } else if let Some(id) = sync.select_new_header_to_download() { - log::debug!( + // If the source client is accepting requests we update the requests that + // we want it to run + if !maintain_required && source_client_is_online { + // NOTE: Is is important to reset this so that we only have one + // request being processed by the client at a time. This prevents + // race conditions like receiving two transactions with the same + // nonce from the client. + source_client_is_online = false; + + // The following is how we prioritize requests: + // + // 1. Get best block + // - Stops us from downloading or submitting new blocks + // - Only called rarely + // + // 2. Download completion data + // - Stops us from submitting new blocks + // + // 3. Download extra data + // - Stops us from submitting new blocks + // + // 4. Download missing headers + // - Stops us from downloading or submitting new blocks + // + // 5. Downloading new headers + + if source_best_block_number_required { + log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME); + source_best_block_number_future.set(source_client.best_block_number().fuse()); + } else if let Some(id) = sync.headers_mut().incomplete_header() { + log::debug!( + target: "bridge", + "Retrieving completion data for header: {:?}", + id, + ); + source_completion_future.set(source_client.header_completion(id).fuse()); + } else if let Some(header) = sync.headers().header(HeaderStatus::Extra) { + let id = header.id(); + log::debug!( + target: "bridge", + "Retrieving extra data for header: {:?}", + id, + ); + source_extra_future.set(source_client.header_extra(id, header.clone()).fuse()); + } else if let Some(header) = sync.select_orphan_header_to_download() { + // for Orphan we actually ask for parent' header + let parent_id = header.parent_id(); + + // if we have end up with orphan header#0, then we are misconfigured + if parent_id.0.is_zero() { + log::error!( target: "bridge", - "Going to download new header from {} node: {:?}", + "Misconfiguration. Genesis {} header is considered orphan by {} node", P::SOURCE_NAME, - id, + P::TARGET_NAME, ); - - source_new_header_future.set(source_client.header_by_number(id).fuse()); - } else { - source_client_is_online = true; + return Ok(()); } + + log::debug!( + target: "bridge", + "Going to download orphan header from {} node: {:?}", + P::SOURCE_NAME, + parent_id, + ); + + source_orphan_header_future.set(source_client.header_by_hash(parent_id.1).fuse()); + } else if let Some(id) = sync.select_new_header_to_download() { + log::debug!( + target: "bridge", + "Going to download new header from {} node: {:?}", + P::SOURCE_NAME, + id, + ); + + source_new_header_future.set(source_client.header_by_number(id).fuse()); + } else { + source_client_is_online = true; } } - }); + } } /// Print synchronization progress. diff --git a/bridges/relays/headers-relay/src/sync_loop_metrics.rs b/bridges/relays/headers-relay/src/sync_loop_metrics.rs index 26939ee372cb7..456aa0a6b0522 100644 --- a/bridges/relays/headers-relay/src/sync_loop_metrics.rs +++ b/bridges/relays/headers-relay/src/sync_loop_metrics.rs @@ -23,6 +23,7 @@ use num_traits::Zero; use relay_utils::metrics::{register, GaugeVec, Metrics, Opts, Registry, U64}; /// Headers sync metrics. +#[derive(Clone)] pub struct SyncLoopMetrics { /// Best syncing headers at "source" and "target" nodes. best_block_numbers: GaugeVec, @@ -57,7 +58,7 @@ impl Default for SyncLoopMetrics { impl SyncLoopMetrics { /// Update metrics. - pub fn update(&mut self, sync: &HeadersSync

) { + pub fn update(&self, sync: &HeadersSync

) { let headers = sync.headers(); let source_best_number = sync.source_best_number().unwrap_or_else(Zero::zero); let target_best_number = sync.target_best_header().map(|id| id.0).unwrap_or_else(Zero::zero); diff --git a/bridges/relays/headers-relay/src/sync_loop_tests.rs b/bridges/relays/headers-relay/src/sync_loop_tests.rs index 27e5daa7da4f4..5cfd5e4f57b66 100644 --- a/bridges/relays/headers-relay/src/sync_loop_tests.rs +++ b/bridges/relays/headers-relay/src/sync_loop_tests.rs @@ -23,7 +23,9 @@ use async_trait::async_trait; use backoff::backoff::Backoff; use futures::{future::FutureExt, stream::StreamExt}; use parking_lot::Mutex; -use relay_utils::{process_future_result, retry_backoff, HeaderId, MaybeConnectionError}; +use relay_utils::{ + process_future_result, relay_loop::Client as RelayClient, retry_backoff, HeaderId, MaybeConnectionError, +}; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -89,8 +91,9 @@ enum SourceMethod { HeaderExtra(TestHeaderId, TestQueuedHeader), } +#[derive(Clone)] struct Source { - data: Mutex, + data: Arc>, on_method_call: Arc, } @@ -109,7 +112,7 @@ impl Source { on_method_call: impl Fn(SourceMethod, &mut SourceData) + Send + Sync + 'static, ) -> Self { Source { - data: Mutex::new(SourceData { + data: Arc::new(Mutex::new(SourceData { best_block_number: Ok(best_block_id.0), header_by_hash: headers .iter() @@ -127,35 +130,42 @@ impl Source { .collect(), provides_completion: true, provides_extra: true, - }), + })), on_method_call: Arc::new(on_method_call), } } } #[async_trait] -impl SourceClient for Source { +impl RelayClient for Source { type Error = TestError; - async fn best_block_number(&self) -> Result { + async fn reconnect(&mut self) -> Result<(), TestError> { + unimplemented!() + } +} + +#[async_trait] +impl SourceClient for Source { + async fn best_block_number(&self) -> Result { let mut data = self.data.lock(); (self.on_method_call)(SourceMethod::BestBlockNumber, &mut *data); data.best_block_number.clone() } - async fn header_by_hash(&self, hash: TestHash) -> Result { + async fn header_by_hash(&self, hash: TestHash) -> Result { let mut data = self.data.lock(); (self.on_method_call)(SourceMethod::HeaderByHash(hash), &mut *data); data.header_by_hash.get(&hash).cloned().ok_or(TestError(false)) } - async fn header_by_number(&self, number: TestNumber) -> Result { + async fn header_by_number(&self, number: TestNumber) -> Result { let mut data = self.data.lock(); (self.on_method_call)(SourceMethod::HeaderByNumber(number), &mut *data); data.header_by_number.get(&number).cloned().ok_or(TestError(false)) } - async fn header_completion(&self, id: TestHeaderId) -> Result<(TestHeaderId, Option), Self::Error> { + async fn header_completion(&self, id: TestHeaderId) -> Result<(TestHeaderId, Option), TestError> { let mut data = self.data.lock(); (self.on_method_call)(SourceMethod::HeaderCompletion(id), &mut *data); if data.provides_completion { @@ -169,7 +179,7 @@ impl SourceClient for Source { &self, id: TestHeaderId, header: TestQueuedHeader, - ) -> Result<(TestHeaderId, TestExtra), Self::Error> { + ) -> Result<(TestHeaderId, TestExtra), TestError> { let mut data = self.data.lock(); (self.on_method_call)(SourceMethod::HeaderExtra(id, header), &mut *data); if data.provides_extra { @@ -189,8 +199,9 @@ enum TargetMethod { RequiresExtra(TestQueuedHeader), } +#[derive(Clone)] struct Target { - data: Mutex, + data: Arc>, on_method_call: Arc, } @@ -211,7 +222,7 @@ impl Target { on_method_call: impl Fn(TargetMethod, &mut TargetData) + Send + Sync + 'static, ) -> Self { Target { - data: Mutex::new(TargetData { + data: Arc::new(Mutex::new(TargetData { best_header_id: Ok(best_header_id), is_known_header_by_hash: headers.iter().map(|header| (header.1, true)).collect(), submitted_headers: HashMap::new(), @@ -219,16 +230,23 @@ impl Target { completed_headers: HashMap::new(), requires_completion: false, requires_extra: false, - }), + })), on_method_call: Arc::new(on_method_call), } } } #[async_trait] -impl TargetClient for Target { +impl RelayClient for Target { type Error = TestError; + async fn reconnect(&mut self) -> Result<(), TestError> { + unimplemented!() + } +} + +#[async_trait] +impl TargetClient for Target { async fn best_header_id(&self) -> Result { let mut data = self.data.lock(); (self.on_method_call)(TargetMethod::BestHeaderId, &mut *data); diff --git a/bridges/relays/messages-relay/src/message_lane_loop.rs b/bridges/relays/messages-relay/src/message_lane_loop.rs index b60cbf35137e3..28b55dba47cc7 100644 --- a/bridges/relays/messages-relay/src/message_lane_loop.rs +++ b/bridges/relays/messages-relay/src/message_lane_loop.rs @@ -35,7 +35,9 @@ use futures::{channel::mpsc::unbounded, future::FutureExt, stream::StreamExt}; use relay_utils::{ interval, metrics::{start as metrics_start, GlobalMetrics, MetricsParams}, - process_future_result, retry_backoff, FailedClient, MaybeConnectionError, + process_future_result, + relay_loop::Client as RelayClient, + retry_backoff, FailedClient, }; use std::{collections::BTreeMap, fmt::Debug, future::Future, ops::RangeInclusive, time::Duration}; @@ -98,13 +100,7 @@ pub struct MessageProofParameters { /// Source client trait. #[async_trait] -pub trait SourceClient: Clone + Send + Sync { - /// Type of error this clients returns. - type Error: std::fmt::Debug + MaybeConnectionError; - - /// Try to reconnect to source node. - async fn reconnect(self) -> Result; - +pub trait SourceClient: RelayClient { /// Returns state of the client. async fn state(&self) -> Result, Self::Error>; @@ -147,13 +143,7 @@ pub trait SourceClient: Clone + Send + Sync { /// Target client trait. #[async_trait] -pub trait TargetClient: Clone + Send + Sync { - /// Type of error this clients returns. - type Error: std::fmt::Debug + MaybeConnectionError; - - /// Try to reconnect to source node. - async fn reconnect(self) -> Result; - +pub trait TargetClient: RelayClient { /// Returns state of the client. async fn state(&self) -> Result, Self::Error>; @@ -218,37 +208,38 @@ pub struct ClientsState { /// Run message lane service loop. pub fn run( params: Params, - mut source_client: impl SourceClient

, - mut target_client: impl TargetClient

, + source_client: impl SourceClient

, + target_client: impl TargetClient

, metrics_params: Option, exit_signal: impl Future, ) { - let mut local_pool = futures::executor::LocalPool::new(); let exit_signal = exit_signal.shared(); + let metrics_global = GlobalMetrics::default(); + let metrics_msg = MessageLaneLoopMetrics::default(); + let metrics_enabled = metrics_params.is_some(); + metrics_start( + format!( + "{}_to_{}_MessageLane_{}", + P::SOURCE_NAME, + P::TARGET_NAME, + hex::encode(params.lane) + ), + metrics_params, + &metrics_global, + &metrics_msg, + ); - local_pool.run_until(async move { - let mut metrics_global = GlobalMetrics::default(); - let metrics_msg = MessageLaneLoopMetrics::default(); - let metrics_enabled = metrics_params.is_some(); - metrics_start( - format!( - "{}_to_{}_MessageLane_{}", - P::SOURCE_NAME, - P::TARGET_NAME, - hex::encode(params.lane) - ), - metrics_params, - &metrics_global, - &metrics_msg, - ); - - loop { - let result = run_until_connection_lost( + relay_utils::relay_loop::run( + params.reconnect_delay, + source_client, + target_client, + |source_client, target_client| { + run_until_connection_lost( params.clone(), - source_client.clone(), - target_client.clone(), + source_client, + target_client, if metrics_enabled { - Some(&mut metrics_global) + Some(metrics_global.clone()) } else { None }, @@ -259,55 +250,8 @@ pub fn run( }, exit_signal.clone(), ) - .await; - - match result { - Ok(()) => break, - Err(failed_client) => loop { - async_std::task::sleep(params.reconnect_delay).await; - if failed_client == FailedClient::Both || failed_client == FailedClient::Source { - source_client = match source_client.clone().reconnect().await { - Ok(source_client) => source_client, - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect {}. Going to retry in {}s: {:?}", - P::SOURCE_NAME, - params.reconnect_delay.as_secs(), - error, - ); - continue; - } - } - } - if failed_client == FailedClient::Both || failed_client == FailedClient::Target { - target_client = match target_client.clone().reconnect().await { - Ok(target_client) => target_client, - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect {}. Going to retry in {}s: {:?}", - P::TARGET_NAME, - params.reconnect_delay.as_secs(), - error, - ); - continue; - } - } - } - - break; - }, - } - - log::debug!( - target: "bridge", - "Restarting lane {} -> {}", - P::SOURCE_NAME, - P::TARGET_NAME, - ); - } - }); + }, + ); } /// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received. @@ -315,7 +259,7 @@ async fn run_until_connection_lost, TC: Targ params: Params, source_client: SC, target_client: TC, - mut metrics_global: Option<&mut GlobalMetrics>, + metrics_global: Option, metrics_msg: Option, exit_signal: impl Future, ) -> Result<(), FailedClient> { @@ -459,8 +403,8 @@ async fn run_until_connection_lost, TC: Targ } } - if let Some(metrics_global) = metrics_global.as_mut() { - metrics_global.update(); + if let Some(ref metrics_global) = metrics_global { + metrics_global.update().await; } if source_client_is_online && source_state_required { @@ -482,7 +426,7 @@ pub(crate) mod tests { use super::*; use futures::stream::StreamExt; use parking_lot::Mutex; - use relay_utils::HeaderId; + use relay_utils::{HeaderId, MaybeConnectionError}; use std::sync::Arc; pub fn header_id(number: TestSourceHeaderNumber) -> TestSourceHeaderId { @@ -550,19 +494,22 @@ pub(crate) mod tests { } #[async_trait] - impl SourceClient for TestSourceClient { + impl RelayClient for TestSourceClient { type Error = TestError; - async fn reconnect(self) -> Result { + async fn reconnect(&mut self) -> Result<(), TestError> { { let mut data = self.data.lock(); (self.tick)(&mut *data); data.is_source_reconnected = true; } - Ok(self) + Ok(()) } + } - async fn state(&self) -> Result, Self::Error> { + #[async_trait] + impl SourceClient for TestSourceClient { + async fn state(&self) -> Result, TestError> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_source_fails { @@ -574,7 +521,7 @@ pub(crate) mod tests { async fn latest_generated_nonce( &self, id: SourceHeaderIdOf, - ) -> Result<(SourceHeaderIdOf, MessageNonce), Self::Error> { + ) -> Result<(SourceHeaderIdOf, MessageNonce), TestError> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_source_fails { @@ -586,7 +533,7 @@ pub(crate) mod tests { async fn latest_confirmed_received_nonce( &self, id: SourceHeaderIdOf, - ) -> Result<(SourceHeaderIdOf, MessageNonce), Self::Error> { + ) -> Result<(SourceHeaderIdOf, MessageNonce), TestError> { let mut data = self.data.lock(); (self.tick)(&mut *data); Ok((id, data.source_latest_confirmed_received_nonce)) @@ -596,7 +543,7 @@ pub(crate) mod tests { &self, _id: SourceHeaderIdOf, nonces: RangeInclusive, - ) -> Result { + ) -> Result { Ok(nonces .map(|nonce| (nonce, MessageWeights { weight: 1, size: 1 })) .collect()) @@ -613,7 +560,7 @@ pub(crate) mod tests { RangeInclusive, TestMessagesProof, ), - Self::Error, + TestError, > { let mut data = self.data.lock(); (self.tick)(&mut *data); @@ -635,7 +582,7 @@ pub(crate) mod tests { &self, _generated_at_block: TargetHeaderIdOf, proof: TestMessagesReceivingProof, - ) -> Result<(), Self::Error> { + ) -> Result<(), TestError> { let mut data = self.data.lock(); (self.tick)(&mut *data); data.submitted_messages_receiving_proofs.push(proof); @@ -651,19 +598,22 @@ pub(crate) mod tests { } #[async_trait] - impl TargetClient for TestTargetClient { + impl RelayClient for TestTargetClient { type Error = TestError; - async fn reconnect(self) -> Result { + async fn reconnect(&mut self) -> Result<(), TestError> { { let mut data = self.data.lock(); (self.tick)(&mut *data); data.is_target_reconnected = true; } - Ok(self) + Ok(()) } + } - async fn state(&self) -> Result, Self::Error> { + #[async_trait] + impl TargetClient for TestTargetClient { + async fn state(&self) -> Result, TestError> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { @@ -675,7 +625,7 @@ pub(crate) mod tests { async fn latest_received_nonce( &self, id: TargetHeaderIdOf, - ) -> Result<(TargetHeaderIdOf, MessageNonce), Self::Error> { + ) -> Result<(TargetHeaderIdOf, MessageNonce), TestError> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { @@ -687,7 +637,7 @@ pub(crate) mod tests { async fn unrewarded_relayers_state( &self, id: TargetHeaderIdOf, - ) -> Result<(TargetHeaderIdOf, UnrewardedRelayersState), Self::Error> { + ) -> Result<(TargetHeaderIdOf, UnrewardedRelayersState), TestError> { Ok(( id, UnrewardedRelayersState { @@ -701,7 +651,7 @@ pub(crate) mod tests { async fn latest_confirmed_received_nonce( &self, id: TargetHeaderIdOf, - ) -> Result<(TargetHeaderIdOf, MessageNonce), Self::Error> { + ) -> Result<(TargetHeaderIdOf, MessageNonce), TestError> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { @@ -713,7 +663,7 @@ pub(crate) mod tests { async fn prove_messages_receiving( &self, id: TargetHeaderIdOf, - ) -> Result<(TargetHeaderIdOf, TestMessagesReceivingProof), Self::Error> { + ) -> Result<(TargetHeaderIdOf, TestMessagesReceivingProof), TestError> { Ok((id, self.data.lock().target_latest_received_nonce)) } @@ -722,7 +672,7 @@ pub(crate) mod tests { _generated_at_header: SourceHeaderIdOf, nonces: RangeInclusive, proof: TestMessagesProof, - ) -> Result, Self::Error> { + ) -> Result, TestError> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { diff --git a/bridges/relays/substrate-client/src/client.rs b/bridges/relays/substrate-client/src/client.rs index e52af57cdd59e..cfa644bc8c71e 100644 --- a/bridges/relays/substrate-client/src/client.rs +++ b/bridges/relays/substrate-client/src/client.rs @@ -89,12 +89,9 @@ impl Client { } /// Reopen client connection. - pub async fn reconnect(self) -> Result { - Ok(Self { - params: self.params.clone(), - client: Self::build_client(self.params).await?, - genesis_hash: self.genesis_hash, - }) + pub async fn reconnect(&mut self) -> Result<()> { + self.client = Self::build_client(self.params.clone()).await?; + Ok(()) } /// Build client to use in connection. diff --git a/bridges/relays/substrate-client/src/headers_source.rs b/bridges/relays/substrate-client/src/headers_source.rs index 33f194322ddea..040cc08e99766 100644 --- a/bridges/relays/substrate-client/src/headers_source.rs +++ b/bridges/relays/substrate-client/src/headers_source.rs @@ -25,6 +25,7 @@ use headers_relay::{ sync_loop::SourceClient, sync_types::{HeaderIdOf, HeadersSyncPipeline, QueuedHeader, SourceHeader}, }; +use relay_utils::relay_loop::Client as RelayClient; use sp_runtime::{traits::Header as HeaderT, Justification}; use std::marker::PhantomData; @@ -44,6 +45,24 @@ impl HeadersSource { } } +impl Clone for HeadersSource { + fn clone(&self) -> Self { + HeadersSource { + client: self.client.clone(), + _phantom: Default::default(), + } + } +} + +#[async_trait] +impl RelayClient for HeadersSource { + type Error = Error; + + async fn reconnect(&mut self) -> Result<(), Error> { + self.client.reconnect().await + } +} + #[async_trait] impl SourceClient

for HeadersSource where @@ -53,13 +72,11 @@ where P: HeadersSyncPipeline, P::Header: SourceHeader, { - type Error = Error; - - async fn best_block_number(&self) -> Result { + async fn best_block_number(&self) -> Result { Ok(*self.client.best_header().await?.number()) } - async fn header_by_hash(&self, hash: P::Hash) -> Result { + async fn header_by_hash(&self, hash: P::Hash) -> Result { self.client .header_by_hash(hash) .await @@ -67,7 +84,7 @@ where .map_err(Into::into) } - async fn header_by_number(&self, number: P::Number) -> Result { + async fn header_by_number(&self, number: P::Number) -> Result { self.client .header_by_number(number) .await @@ -75,10 +92,7 @@ where .map_err(Into::into) } - async fn header_completion( - &self, - id: HeaderIdOf

, - ) -> Result<(HeaderIdOf

, Option), Self::Error> { + async fn header_completion(&self, id: HeaderIdOf

) -> Result<(HeaderIdOf

, Option), Error> { let hash = id.1; let signed_block = self.client.get_block(Some(hash)).await?; let grandpa_justification = signed_block.justification().cloned(); @@ -86,11 +100,7 @@ where Ok((id, grandpa_justification)) } - async fn header_extra( - &self, - id: HeaderIdOf

, - _header: QueuedHeader

, - ) -> Result<(HeaderIdOf

, ()), Self::Error> { + async fn header_extra(&self, id: HeaderIdOf

, _header: QueuedHeader

) -> Result<(HeaderIdOf

, ()), Error> { Ok((id, ())) } } diff --git a/bridges/relays/substrate/src/headers_maintain.rs b/bridges/relays/substrate/src/headers_maintain.rs index d2996083b95ac..14432487ea308 100644 --- a/bridges/relays/substrate/src/headers_maintain.rs +++ b/bridges/relays/substrate/src/headers_maintain.rs @@ -80,6 +80,20 @@ impl } } +#[async_trait] +impl Clone + for SubstrateHeadersToSubstrateMaintain +{ + fn clone(&self) -> Self { + SubstrateHeadersToSubstrateMaintain { + pipeline: self.pipeline.clone(), + target_client: self.target_client.clone(), + justifications: self.justifications.clone(), + _marker: Default::default(), + } + } +} + #[async_trait] impl SyncMaintain

for SubstrateHeadersToSubstrateMaintain where diff --git a/bridges/relays/substrate/src/headers_target.rs b/bridges/relays/substrate/src/headers_target.rs index e7fe68a5c0112..c962511270805 100644 --- a/bridges/relays/substrate/src/headers_target.rs +++ b/bridges/relays/substrate/src/headers_target.rs @@ -28,7 +28,7 @@ use headers_relay::{ sync_types::{HeaderIdOf, QueuedHeader, SubmittedHeaders}, }; use relay_substrate_client::{Chain, Client, Error as SubstrateError}; -use relay_utils::HeaderId; +use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; use sp_core::Bytes; use sp_runtime::Justification; use std::collections::HashSet; @@ -46,6 +46,24 @@ impl SubstrateHeadersTarget { } } +impl Clone for SubstrateHeadersTarget { + fn clone(&self) -> Self { + SubstrateHeadersTarget { + client: self.client.clone(), + pipeline: self.pipeline.clone(), + } + } +} + +#[async_trait] +impl RelayClient for SubstrateHeadersTarget { + type Error = SubstrateError; + + async fn reconnect(&mut self) -> Result<(), SubstrateError> { + self.client.reconnect().await + } +} + #[async_trait] impl TargetClient

for SubstrateHeadersTarget where @@ -54,9 +72,7 @@ where P::Hash: Decode + Encode, P: SubstrateHeadersSyncPipeline, { - type Error = SubstrateError; - - async fn best_header_id(&self) -> Result, Self::Error> { + async fn best_header_id(&self) -> Result, SubstrateError> { let call = P::BEST_BLOCK_METHOD.into(); let data = Bytes(Vec::new()); @@ -72,7 +88,7 @@ where .map(|(num, hash)| HeaderId(*num, *hash)) } - async fn is_known_header(&self, id: HeaderIdOf

) -> Result<(HeaderIdOf

, bool), Self::Error> { + async fn is_known_header(&self, id: HeaderIdOf

) -> Result<(HeaderIdOf

, bool), SubstrateError> { let call = P::IS_KNOWN_BLOCK_METHOD.into(); let data = Bytes(id.1.encode()); @@ -83,7 +99,10 @@ where Ok((id, is_known_block)) } - async fn submit_headers(&self, mut headers: Vec>) -> SubmittedHeaders, Self::Error> { + async fn submit_headers( + &self, + mut headers: Vec>, + ) -> SubmittedHeaders, SubstrateError> { debug_assert_eq!( headers.len(), 1, @@ -114,7 +133,7 @@ where } } - async fn incomplete_headers_ids(&self) -> Result>, Self::Error> { + async fn incomplete_headers_ids(&self) -> Result>, SubstrateError> { let call = P::INCOMPLETE_HEADERS_METHOD.into(); let data = Bytes(Vec::new()); @@ -133,13 +152,13 @@ where &self, id: HeaderIdOf

, completion: Justification, - ) -> Result, Self::Error> { + ) -> Result, SubstrateError> { let tx = self.pipeline.make_complete_header_transaction(id, completion).await?; self.client.submit_extrinsic(Bytes(tx.encode())).await?; Ok(id) } - async fn requires_extra(&self, header: QueuedHeader

) -> Result<(HeaderIdOf

, bool), Self::Error> { + async fn requires_extra(&self, header: QueuedHeader

) -> Result<(HeaderIdOf

, bool), SubstrateError> { Ok((header.id(), false)) } } diff --git a/bridges/relays/substrate/src/messages_source.rs b/bridges/relays/substrate/src/messages_source.rs index 9376a14ce6f67..b9a8d01a7c258 100644 --- a/bridges/relays/substrate/src/messages_source.rs +++ b/bridges/relays/substrate/src/messages_source.rs @@ -32,7 +32,7 @@ use messages_relay::{ }, }; use relay_substrate_client::{Chain, Client, Error as SubstrateError, HashOf, HeaderIdOf}; -use relay_utils::{BlockNumberBase, HeaderId}; +use relay_utils::{relay_loop::Client as RelayClient, BlockNumberBase, HeaderId}; use sp_core::Bytes; use sp_runtime::{traits::Header as HeaderT, DeserializeOwned}; use sp_trie::StorageProof; @@ -74,6 +74,15 @@ impl Clone for SubstrateMessagesSource } } +#[async_trait] +impl RelayClient for SubstrateMessagesSource { + type Error = SubstrateError; + + async fn reconnect(&mut self) -> Result<(), SubstrateError> { + self.client.reconnect().await + } +} + #[async_trait] impl SourceClient

for SubstrateMessagesSource where @@ -89,15 +98,7 @@ where P::TargetHeaderNumber: Decode, P::TargetHeaderHash: Decode, { - type Error = SubstrateError; - - async fn reconnect(mut self) -> Result { - let new_client = self.client.clone().reconnect().await?; - self.client = new_client; - Ok(self) - } - - async fn state(&self) -> Result, Self::Error> { + async fn state(&self) -> Result, SubstrateError> { read_client_state::<_, P::TargetHeaderHash, P::TargetHeaderNumber>( &self.client, P::BEST_FINALIZED_TARGET_HEADER_ID_AT_SOURCE, @@ -108,7 +109,7 @@ where async fn latest_generated_nonce( &self, id: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, MessageNonce), Self::Error> { + ) -> Result<(SourceHeaderIdOf

, MessageNonce), SubstrateError> { let encoded_response = self .client .state_call( @@ -125,7 +126,7 @@ where async fn latest_confirmed_received_nonce( &self, id: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, MessageNonce), Self::Error> { + ) -> Result<(SourceHeaderIdOf

, MessageNonce), SubstrateError> { let encoded_response = self .client .state_call( @@ -143,7 +144,7 @@ where &self, id: SourceHeaderIdOf

, nonces: RangeInclusive, - ) -> Result { + ) -> Result { let encoded_response = self .client .state_call( @@ -164,7 +165,7 @@ where id: SourceHeaderIdOf

, nonces: RangeInclusive, proof_parameters: MessageProofParameters, - ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error> { + ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), SubstrateError> { let proof = self .client .prove_messages( @@ -183,7 +184,7 @@ where &self, generated_at_block: TargetHeaderIdOf

, proof: P::MessagesReceivingProof, - ) -> Result<(), Self::Error> { + ) -> Result<(), SubstrateError> { let tx = self .lane .make_messages_receiving_proof_transaction(generated_at_block, proof) diff --git a/bridges/relays/substrate/src/messages_target.rs b/bridges/relays/substrate/src/messages_target.rs index 8a16ecdd6edc0..0bc81d9be05a3 100644 --- a/bridges/relays/substrate/src/messages_target.rs +++ b/bridges/relays/substrate/src/messages_target.rs @@ -30,7 +30,7 @@ use messages_relay::{ message_lane_loop::{TargetClient, TargetClientState}, }; use relay_substrate_client::{Chain, Client, Error as SubstrateError, HashOf}; -use relay_utils::BlockNumberBase; +use relay_utils::{relay_loop::Client as RelayClient, BlockNumberBase}; use sp_core::Bytes; use sp_runtime::{traits::Header as HeaderT, DeserializeOwned}; use sp_trie::StorageProof; @@ -70,6 +70,15 @@ impl Clone for SubstrateMessagesTarget } } +#[async_trait] +impl RelayClient for SubstrateMessagesTarget { + type Error = SubstrateError; + + async fn reconnect(&mut self) -> Result<(), SubstrateError> { + self.client.reconnect().await + } +} + #[async_trait] impl TargetClient

for SubstrateMessagesTarget where @@ -85,15 +94,7 @@ where P::SourceHeaderNumber: Decode, P::SourceHeaderHash: Decode, { - type Error = SubstrateError; - - async fn reconnect(mut self) -> Result { - let new_client = self.client.clone().reconnect().await?; - self.client = new_client; - Ok(self) - } - - async fn state(&self) -> Result, Self::Error> { + async fn state(&self) -> Result, SubstrateError> { read_client_state::<_, P::SourceHeaderHash, P::SourceHeaderNumber>( &self.client, P::BEST_FINALIZED_SOURCE_HEADER_ID_AT_TARGET, @@ -104,7 +105,7 @@ where async fn latest_received_nonce( &self, id: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, MessageNonce), Self::Error> { + ) -> Result<(TargetHeaderIdOf

, MessageNonce), SubstrateError> { let encoded_response = self .client .state_call( @@ -121,7 +122,7 @@ where async fn latest_confirmed_received_nonce( &self, id: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, MessageNonce), Self::Error> { + ) -> Result<(TargetHeaderIdOf

, MessageNonce), SubstrateError> { let encoded_response = self .client .state_call( @@ -138,7 +139,7 @@ where async fn unrewarded_relayers_state( &self, id: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, UnrewardedRelayersState), Self::Error> { + ) -> Result<(TargetHeaderIdOf

, UnrewardedRelayersState), SubstrateError> { let encoded_response = self .client .state_call( @@ -155,7 +156,7 @@ where async fn prove_messages_receiving( &self, id: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, P::MessagesReceivingProof), Self::Error> { + ) -> Result<(TargetHeaderIdOf

, P::MessagesReceivingProof), SubstrateError> { let (id, relayers_state) = self.unrewarded_relayers_state(id).await?; let proof = self .client @@ -170,7 +171,7 @@ where generated_at_header: SourceHeaderIdOf

, nonces: RangeInclusive, proof: P::MessagesProof, - ) -> Result, Self::Error> { + ) -> Result, SubstrateError> { let tx = self .lane .make_messages_delivery_transaction(generated_at_header, nonces.clone(), proof) diff --git a/bridges/relays/substrate/src/millau_messages_to_rialto.rs b/bridges/relays/substrate/src/millau_messages_to_rialto.rs index 7fa948b8c4731..1d9239ff37156 100644 --- a/bridges/relays/substrate/src/millau_messages_to_rialto.rs +++ b/bridges/relays/substrate/src/millau_messages_to_rialto.rs @@ -105,7 +105,6 @@ pub fn run( lane_id: LaneId, metrics_params: Option, ) { - let reconnect_delay = Duration::from_secs(10); let stall_timeout = Duration::from_secs(5 * 60); let relayer_id_at_millau = millau_sign.signer.public().as_array_ref().clone().into(); @@ -135,7 +134,7 @@ pub fn run( lane: lane_id, source_tick: Millau::AVERAGE_BLOCK_INTERVAL, target_tick: Rialto::AVERAGE_BLOCK_INTERVAL, - reconnect_delay, + reconnect_delay: relay_utils::relay_loop::RECONNECT_DELAY, stall_timeout, delivery_params: messages_relay::message_lane_loop::MessageDeliveryParams { max_unrewarded_relayer_entries_at_target: bp_rialto::MAX_UNREWARDED_RELAYER_ENTRIES_AT_INBOUND_LANE, diff --git a/bridges/relays/substrate/src/rialto_messages_to_millau.rs b/bridges/relays/substrate/src/rialto_messages_to_millau.rs index ef11f6cecf783..a336637e4daf2 100644 --- a/bridges/relays/substrate/src/rialto_messages_to_millau.rs +++ b/bridges/relays/substrate/src/rialto_messages_to_millau.rs @@ -105,7 +105,6 @@ pub fn run( lane_id: LaneId, metrics_params: Option, ) { - let reconnect_delay = Duration::from_secs(10); let stall_timeout = Duration::from_secs(5 * 60); let relayer_id_at_rialto = rialto_sign.signer.public().as_array_ref().clone().into(); @@ -134,7 +133,7 @@ pub fn run( lane: lane_id, source_tick: Rialto::AVERAGE_BLOCK_INTERVAL, target_tick: Millau::AVERAGE_BLOCK_INTERVAL, - reconnect_delay, + reconnect_delay: relay_utils::relay_loop::RECONNECT_DELAY, stall_timeout, delivery_params: messages_relay::message_lane_loop::MessageDeliveryParams { max_unrewarded_relayer_entries_at_target: bp_millau::MAX_UNREWARDED_RELAYER_ENTRIES_AT_INBOUND_LANE, diff --git a/bridges/relays/utils/Cargo.toml b/bridges/relays/utils/Cargo.toml index 5320e91fd558b..01aa866ee43d1 100644 --- a/bridges/relays/utils/Cargo.toml +++ b/bridges/relays/utils/Cargo.toml @@ -8,6 +8,7 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0" [dependencies] ansi_term = "0.12" async-std = "1.6.5" +async-trait = "0.1.40" backoff = "0.2" env_logger = "0.8.2" futures = "0.3.5" diff --git a/bridges/relays/utils/src/lib.rs b/bridges/relays/utils/src/lib.rs index 2968320ff5c87..3ece57d0a3201 100644 --- a/bridges/relays/utils/src/lib.rs +++ b/bridges/relays/utils/src/lib.rs @@ -29,6 +29,7 @@ pub const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10); pub mod initialize; pub mod metrics; +pub mod relay_loop; /// Block number traits shared by all chains that relay is able to serve. pub trait BlockNumberBase: diff --git a/bridges/relays/utils/src/metrics.rs b/bridges/relays/utils/src/metrics.rs index 0667bdb922a00..f38d1bda3a5d8 100644 --- a/bridges/relays/utils/src/metrics.rs +++ b/bridges/relays/utils/src/metrics.rs @@ -16,6 +16,7 @@ pub use substrate_prometheus_endpoint::{register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64}; +use async_std::sync::{Arc, Mutex}; use std::net::SocketAddr; use substrate_prometheus_endpoint::init_prometheus; use sysinfo::{ProcessExt, RefreshKind, System, SystemExt}; @@ -36,9 +37,9 @@ pub trait Metrics { } /// Global Prometheus metrics. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct GlobalMetrics { - system: System, + system: Arc>, system_average_load: GaugeVec, process_cpu_usage_percentage: Gauge, process_memory_usage_bytes: Gauge, @@ -110,7 +111,7 @@ impl Metrics for GlobalMetrics { impl Default for GlobalMetrics { fn default() -> Self { GlobalMetrics { - system: System::new_with_specifics(RefreshKind::everything()), + system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))), system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"]) .expect("metric is static and thus valid; qed"), process_cpu_usage_percentage: Gauge::new("process_cpu_usage_percentage", "Process CPU usage") @@ -126,9 +127,10 @@ impl Default for GlobalMetrics { impl GlobalMetrics { /// Update metrics. - pub fn update(&mut self) { + pub async fn update(&self) { // update system-wide metrics - let load = self.system.get_load_average(); + let mut system = self.system.lock().await; + let load = system.get_load_average(); self.system_average_load.with_label_values(&["1min"]).set(load.one); self.system_average_load.with_label_values(&["5min"]).set(load.five); self.system_average_load.with_label_values(&["15min"]).set(load.fifteen); @@ -139,8 +141,8 @@ impl GlobalMetrics { relay is not supposed to run in such MetricsParamss;\ qed", ); - let is_process_refreshed = self.system.refresh_process(pid); - match (is_process_refreshed, self.system.get_process(pid)) { + let is_process_refreshed = system.refresh_process(pid); + match (is_process_refreshed, system.get_process(pid)) { (true, Some(process_info)) => { let cpu_usage = process_info.cpu_usage() as f64; let memory_usage = process_info.memory() * 1024; diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs new file mode 100644 index 0000000000000..d750358edaa02 --- /dev/null +++ b/bridges/relays/utils/src/relay_loop.rs @@ -0,0 +1,95 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use crate::{FailedClient, MaybeConnectionError}; + +use async_trait::async_trait; +use std::{fmt::Debug, future::Future, time::Duration}; + +/// Default pause between reconnect attempts. +pub const RECONNECT_DELAY: Duration = Duration::from_secs(10); + +/// Basic blockchain client from relay perspective. +#[async_trait] +pub trait Client: Clone + Send + Sync { + /// Type of error this clients returns. + type Error: Debug + MaybeConnectionError; + + /// Try to reconnect to source node. + async fn reconnect(&mut self) -> Result<(), Self::Error>; +} + +/// Run relay loop. +/// +/// This function represents an outer loop, which in turn calls provided `loop_run` function to do +/// actual job. When `loop_run` returns, this outer loop reconnects to failed client (source, +/// target or both) and calls `loop_run` again. +pub fn run( + reconnect_delay: Duration, + mut source_client: SC, + mut target_client: TC, + loop_run: R, +) where + R: Fn(SC, TC) -> F, + F: Future>, +{ + let mut local_pool = futures::executor::LocalPool::new(); + + local_pool.run_until(async move { + loop { + let result = loop_run(source_client.clone(), target_client.clone()).await; + + match result { + Ok(()) => break, + Err(failed_client) => loop { + async_std::task::sleep(reconnect_delay).await; + if failed_client == FailedClient::Both || failed_client == FailedClient::Source { + match source_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to source client. Going to retry in {}s: {:?}", + reconnect_delay.as_secs(), + error, + ); + continue; + } + } + } + if failed_client == FailedClient::Both || failed_client == FailedClient::Target { + match target_client.reconnect().await { + Ok(()) => (), + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to target client. Going to retry in {}s: {:?}", + reconnect_delay.as_secs(), + error, + ); + continue; + } + } + } + + break; + }, + } + + log::debug!(target: "bridge", "Restarting relay loop"); + } + }); +}