Skip to content

Commit

Permalink
Extract common part of relay loops (paritytech#660)
Browse files Browse the repository at this point in the history
* extract common parts of relay loops: begin

* merge client impls

* backoff in exchange loop

* reconnect without clone
  • Loading branch information
svyatonik authored and serban300 committed Apr 8, 2024
1 parent 4a03b8c commit cee5300
Show file tree
Hide file tree
Showing 23 changed files with 1,015 additions and 775 deletions.
18 changes: 14 additions & 4 deletions bridges/relays/ethereum-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,32 @@ use jsonrpsee::Client as RpcClient;
/// The client used to interact with an Ethereum node through RPC.
#[derive(Clone)]
pub struct Client {
params: ConnectionParams,
client: RpcClient,
}

impl Client {
/// Create a new Ethereum RPC Client.
pub fn new(params: ConnectionParams) -> Self {
Self {
client: Self::build_client(&params),
params,
}
}

/// Build client to use in connection.
fn build_client(params: &ConnectionParams) -> RpcClient {
let uri = format!("http://{}:{}", params.host, params.port);
let transport = HttpTransportClient::new(&uri);
let raw_client = RawClient::new(transport);
let client: RpcClient = raw_client.into();
raw_client.into()
}

Self { client }
/// Reopen client connection.
pub fn reconnect(&mut self) {
self.client = Self::build_client(&self.params);
}
}

impl Client {
/// Estimate gas usage for the given call.
pub async fn estimate_gas(&self, call_request: CallRequest) -> Result<U256> {
Ok(Ethereum::estimate_gas(&self.client, call_request).await?)
Expand Down
41 changes: 29 additions & 12 deletions bridges/relays/ethereum/src/ethereum_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use relay_rialto_client::{Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{
Chain as SubstrateChain, Client as SubstrateClient, ConnectionParams as SubstrateConnectionParams,
};
use relay_utils::{metrics::MetricsParams, HeaderId};
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, HeaderId};
use rialto_runtime::exchange::EthereumTransactionInclusionProof;
use std::{sync::Arc, time::Duration};

Expand Down Expand Up @@ -120,27 +120,36 @@ impl SourceTransaction for EthereumSourceTransaction {
}

/// Ethereum node as transactions proof source.
#[derive(Clone)]
struct EthereumTransactionsSource {
client: EthereumClient,
}

#[async_trait]
impl SourceClient<EthereumToSubstrateExchange> for EthereumTransactionsSource {
impl RelayClient for EthereumTransactionsSource {
type Error = RpcError;

async fn reconnect(&mut self) -> Result<(), RpcError> {
self.client.reconnect();
Ok(())
}
}

#[async_trait]
impl SourceClient<EthereumToSubstrateExchange> for EthereumTransactionsSource {
async fn tick(&self) {
async_std::task::sleep(ETHEREUM_TICK_INTERVAL).await;
}

async fn block_by_hash(&self, hash: H256) -> Result<EthereumSourceBlock, Self::Error> {
async fn block_by_hash(&self, hash: H256) -> Result<EthereumSourceBlock, RpcError> {
self.client
.header_by_hash_with_transactions(hash)
.await
.map(EthereumSourceBlock)
.map_err(Into::into)
}

async fn block_by_number(&self, number: u64) -> Result<EthereumSourceBlock, Self::Error> {
async fn block_by_number(&self, number: u64) -> Result<EthereumSourceBlock, RpcError> {
self.client
.header_by_number_with_transactions(number)
.await
Expand All @@ -151,7 +160,7 @@ impl SourceClient<EthereumToSubstrateExchange> for EthereumTransactionsSource {
async fn transaction_block(
&self,
hash: &EthereumTransactionHash,
) -> Result<Option<(EthereumHeaderId, usize)>, Self::Error> {
) -> Result<Option<(EthereumHeaderId, usize)>, RpcError> {
let eth_tx = match self.client.transaction_by_hash(*hash).await? {
Some(eth_tx) => eth_tx,
None => return Ok(None),
Expand All @@ -173,7 +182,7 @@ impl SourceClient<EthereumToSubstrateExchange> for EthereumTransactionsSource {
&self,
block: &EthereumSourceBlock,
tx_index: usize,
) -> Result<EthereumTransactionInclusionProof, Self::Error> {
) -> Result<EthereumTransactionInclusionProof, RpcError> {
const TRANSACTION_HAS_RAW_FIELD_PROOF: &str = "RPC level checks that transactions from Ethereum\
node are having `raw` field; qed";
const BLOCK_HAS_HASH_FIELD_PROOF: &str = "RPC level checks that block has `hash` field; qed";
Expand All @@ -199,25 +208,33 @@ impl SourceClient<EthereumToSubstrateExchange> for EthereumTransactionsSource {
}

/// Substrate node as transactions proof target.
#[derive(Clone)]
struct SubstrateTransactionsTarget {
client: SubstrateClient<Rialto>,
sign_params: RialtoSigningParams,
bridge_instance: Arc<dyn BridgeInstance>,
}

#[async_trait]
impl TargetClient<EthereumToSubstrateExchange> for SubstrateTransactionsTarget {
impl RelayClient for SubstrateTransactionsTarget {
type Error = RpcError;

async fn reconnect(&mut self) -> Result<(), RpcError> {
Ok(self.client.reconnect().await?)
}
}

#[async_trait]
impl TargetClient<EthereumToSubstrateExchange> for SubstrateTransactionsTarget {
async fn tick(&self) {
async_std::task::sleep(Rialto::AVERAGE_BLOCK_INTERVAL).await;
}

async fn is_header_known(&self, id: &EthereumHeaderId) -> Result<bool, Self::Error> {
async fn is_header_known(&self, id: &EthereumHeaderId) -> Result<bool, RpcError> {
self.client.ethereum_header_known(*id).await
}

async fn is_header_finalized(&self, id: &EthereumHeaderId) -> Result<bool, Self::Error> {
async fn is_header_finalized(&self, id: &EthereumHeaderId) -> Result<bool, RpcError> {
// we check if header is finalized by simple comparison of the header number and
// number of best finalized PoA header known to Substrate node.
//
Expand All @@ -230,11 +247,11 @@ impl TargetClient<EthereumToSubstrateExchange> for SubstrateTransactionsTarget {
Ok(id.0 <= best_finalized_ethereum_block.0)
}

async fn best_finalized_header_id(&self) -> Result<EthereumHeaderId, Self::Error> {
async fn best_finalized_header_id(&self) -> Result<EthereumHeaderId, RpcError> {
self.client.best_ethereum_finalized_block().await
}

async fn filter_transaction_proof(&self, proof: &EthereumTransactionInclusionProof) -> Result<bool, Self::Error> {
async fn filter_transaction_proof(&self, proof: &EthereumTransactionInclusionProof) -> Result<bool, RpcError> {
// let's try to parse transaction locally
let (raw_tx, raw_tx_receipt) = &proof.proof[proof.index as usize];
let parse_result = rialto_runtime::exchange::EthTransaction::parse(raw_tx);
Expand All @@ -253,7 +270,7 @@ impl TargetClient<EthereumToSubstrateExchange> for SubstrateTransactionsTarget {
self.client.verify_exchange_transaction_proof(proof.clone()).await
}

async fn submit_transaction_proof(&self, proof: EthereumTransactionInclusionProof) -> Result<(), Self::Error> {
async fn submit_transaction_proof(&self, proof: EthereumTransactionInclusionProof) -> Result<(), RpcError> {
let (sign_params, bridge_instance) = (self.sign_params.clone(), self.bridge_instance.clone());
self.client
.submit_exchange_transaction_proof(sign_params, bridge_instance, proof)
Expand Down
48 changes: 31 additions & 17 deletions bridges/relays/ethereum/src/ethereum_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use relay_rialto_client::{Rialto, SigningParams as RialtoSigningParams};
use relay_substrate_client::{
Chain as SubstrateChain, Client as SubstrateClient, ConnectionParams as SubstrateConnectionParams,
};
use relay_utils::metrics::MetricsParams;
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient};

use std::fmt::Debug;
use std::{collections::HashSet, sync::Arc, time::Duration};
Expand Down Expand Up @@ -105,6 +105,7 @@ impl HeadersSyncPipeline for EthereumHeadersSyncPipeline {
pub type QueuedEthereumHeader = QueuedHeader<EthereumHeadersSyncPipeline>;

/// Ethereum client as headers source.
#[derive(Clone)]
struct EthereumHeadersSource {
/// Ethereum node client.
client: EthereumClient,
Expand All @@ -117,44 +118,53 @@ impl EthereumHeadersSource {
}

#[async_trait]
impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
impl RelayClient for EthereumHeadersSource {
type Error = RpcError;

async fn best_block_number(&self) -> Result<u64, Self::Error> {
async fn reconnect(&mut self) -> Result<(), RpcError> {
self.client.reconnect();
Ok(())
}
}

#[async_trait]
impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource {
async fn best_block_number(&self) -> Result<u64, RpcError> {
self.client.best_block_number().await.map_err(Into::into)
}

async fn header_by_hash(&self, hash: HeaderHash) -> Result<Header, Self::Error> {
async fn header_by_hash(&self, hash: HeaderHash) -> Result<Header, RpcError> {
self.client
.header_by_hash(hash)
.await
.map(Into::into)
.map_err(Into::into)
}

async fn header_by_number(&self, number: u64) -> Result<Header, Self::Error> {
async fn header_by_number(&self, number: u64) -> Result<Header, RpcError> {
self.client
.header_by_number(number)
.await
.map(Into::into)
.map_err(Into::into)
}

async fn header_completion(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, Option<()>), Self::Error> {
async fn header_completion(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, Option<()>), RpcError> {
Ok((id, None))
}

async fn header_extra(
&self,
id: EthereumHeaderId,
header: QueuedEthereumHeader,
) -> Result<(EthereumHeaderId, Vec<Receipt>), Self::Error> {
) -> Result<(EthereumHeaderId, Vec<Receipt>), RpcError> {
self.client
.transaction_receipts(id, header.header().transactions.clone())
.await
}
}

#[derive(Clone)]
struct SubstrateHeadersTarget {
/// Substrate node client.
client: SubstrateClient<Rialto>,
Expand Down Expand Up @@ -183,21 +193,25 @@ impl SubstrateHeadersTarget {
}

#[async_trait]
impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
impl RelayClient for SubstrateHeadersTarget {
type Error = RpcError;

async fn best_header_id(&self) -> Result<EthereumHeaderId, Self::Error> {
async fn reconnect(&mut self) -> Result<(), RpcError> {
Ok(self.client.reconnect().await?)
}
}

#[async_trait]
impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
async fn best_header_id(&self) -> Result<EthereumHeaderId, RpcError> {
self.client.best_ethereum_block().await
}

async fn is_known_header(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, bool), Self::Error> {
async fn is_known_header(&self, id: EthereumHeaderId) -> Result<(EthereumHeaderId, bool), RpcError> {
Ok((id, self.client.ethereum_header_known(id).await?))
}

async fn submit_headers(
&self,
headers: Vec<QueuedEthereumHeader>,
) -> SubmittedHeaders<EthereumHeaderId, Self::Error> {
async fn submit_headers(&self, headers: Vec<QueuedEthereumHeader>) -> SubmittedHeaders<EthereumHeaderId, RpcError> {
let (sign_params, bridge_instance, sign_transactions) = (
self.sign_params.clone(),
self.bridge_instance.clone(),
Expand All @@ -208,16 +222,16 @@ impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget {
.await
}

async fn incomplete_headers_ids(&self) -> Result<HashSet<EthereumHeaderId>, Self::Error> {
async fn incomplete_headers_ids(&self) -> Result<HashSet<EthereumHeaderId>, RpcError> {
Ok(HashSet::new())
}

#[allow(clippy::unit_arg)]
async fn complete_header(&self, id: EthereumHeaderId, _completion: ()) -> Result<EthereumHeaderId, Self::Error> {
async fn complete_header(&self, id: EthereumHeaderId, _completion: ()) -> Result<EthereumHeaderId, RpcError> {
Ok(id)
}

async fn requires_extra(&self, header: QueuedEthereumHeader) -> Result<(EthereumHeaderId, bool), Self::Error> {
async fn requires_extra(&self, header: QueuedEthereumHeader) -> Result<(EthereumHeaderId, bool), RpcError> {
// we can minimize number of receipts_check calls by checking header
// logs bloom here, but it may give us false positives (when authorities
// source is contract, we never need any logs)
Expand Down
29 changes: 17 additions & 12 deletions bridges/relays/ethereum/src/substrate_sync_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use relay_substrate_client::{
headers_source::HeadersSource, Chain as SubstrateChain, Client as SubstrateClient,
ConnectionParams as SubstrateConnectionParams,
};
use relay_utils::metrics::MetricsParams;
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient};
use sp_runtime::Justification;

use std::fmt::Debug;
Expand Down Expand Up @@ -98,6 +98,7 @@ pub type QueuedRialtoHeader = QueuedHeader<SubstrateHeadersSyncPipeline>;
type SubstrateHeadersSource = HeadersSource<Rialto, SubstrateHeadersSyncPipeline>;

/// Ethereum client as Substrate headers target.
#[derive(Clone)]
struct EthereumHeadersTarget {
/// Ethereum node client.
client: EthereumClient,
Expand All @@ -118,38 +119,42 @@ impl EthereumHeadersTarget {
}

#[async_trait]
impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
impl RelayClient for EthereumHeadersTarget {
type Error = RpcError;

async fn best_header_id(&self) -> Result<RialtoHeaderId, Self::Error> {
async fn reconnect(&mut self) -> Result<(), RpcError> {
self.client.reconnect();
Ok(())
}
}

#[async_trait]
impl TargetClient<SubstrateHeadersSyncPipeline> for EthereumHeadersTarget {
async fn best_header_id(&self) -> Result<RialtoHeaderId, RpcError> {
self.client.best_substrate_block(self.contract).await
}

async fn is_known_header(&self, id: RialtoHeaderId) -> Result<(RialtoHeaderId, bool), Self::Error> {
async fn is_known_header(&self, id: RialtoHeaderId) -> Result<(RialtoHeaderId, bool), RpcError> {
self.client.substrate_header_known(self.contract, id).await
}

async fn submit_headers(&self, headers: Vec<QueuedRialtoHeader>) -> SubmittedHeaders<RialtoHeaderId, Self::Error> {
async fn submit_headers(&self, headers: Vec<QueuedRialtoHeader>) -> SubmittedHeaders<RialtoHeaderId, RpcError> {
self.client
.submit_substrate_headers(self.sign_params.clone(), self.contract, headers)
.await
}

async fn incomplete_headers_ids(&self) -> Result<HashSet<RialtoHeaderId>, Self::Error> {
async fn incomplete_headers_ids(&self) -> Result<HashSet<RialtoHeaderId>, RpcError> {
self.client.incomplete_substrate_headers(self.contract).await
}

async fn complete_header(
&self,
id: RialtoHeaderId,
completion: Justification,
) -> Result<RialtoHeaderId, Self::Error> {
async fn complete_header(&self, id: RialtoHeaderId, completion: Justification) -> Result<RialtoHeaderId, RpcError> {
self.client
.complete_substrate_header(self.sign_params.clone(), self.contract, id, completion)
.await
}

async fn requires_extra(&self, header: QueuedRialtoHeader) -> Result<(RialtoHeaderId, bool), Self::Error> {
async fn requires_extra(&self, header: QueuedRialtoHeader) -> Result<(RialtoHeaderId, bool), RpcError> {
Ok((header.header().id(), false))
}
}
Expand Down
Loading

0 comments on commit cee5300

Please sign in to comment.