From e4b43dba748eaeab25904914b5203ad6312fa469 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 1 Mar 2023 13:26:13 +0300 Subject: [PATCH] Fix multiple parachain headers submission for single message delivery (#1916) * switch off parachains relay when we don't need to relay parachains (temp solution) * parachains relay now only works with single parachain * fix usages of parachains relay * revert hacky fix * fixes * fixed Westmint parachain ID * fixed metrics * fixed compilation * fmt * clippy * call -> typed_state_call --- bridges/primitives/chain-westend/src/lib.rs | 2 +- .../src/chains/rialto_parachains_to_millau.rs | 6 - .../rococo_parachains_to_bridge_hub_wococo.rs | 6 - .../chains/westend_parachains_to_millau.rs | 6 - .../wococo_parachains_to_bridge_hub_rococo.rs | 6 - .../relays/bin-substrate/src/cli/bridge.rs | 9 +- .../bin-substrate/src/cli/relay_parachains.rs | 19 +- bridges/relays/client-substrate/src/chain.rs | 2 +- .../relays/client-substrate/src/test_chain.rs | 48 + .../src/on_demand/parachains.rs | 56 +- .../lib-substrate-relay/src/parachains/mod.rs | 3 +- .../src/parachains/source.rs | 42 +- .../src/parachains/target.rs | 94 +- bridges/relays/parachains/Cargo.toml | 1 - bridges/relays/parachains/src/lib.rs | 6 +- .../relays/parachains/src/parachains_loop.rs | 870 ++++++------------ .../parachains/src/parachains_loop_metrics.rs | 43 +- 17 files changed, 418 insertions(+), 801 deletions(-) diff --git a/bridges/primitives/chain-westend/src/lib.rs b/bridges/primitives/chain-westend/src/lib.rs index b6d41ece2868d..74e8895aff906 100644 --- a/bridges/primitives/chain-westend/src/lib.rs +++ b/bridges/primitives/chain-westend/src/lib.rs @@ -101,7 +101,7 @@ pub const WITH_WESTEND_BRIDGE_PARAS_PALLET_NAME: &str = "BridgeWestendParachains pub const MAX_NESTED_PARACHAIN_HEAD_DATA_SIZE: u32 = 128; /// Identifier of Westmint parachain at the Westend relay chain. -pub const WESTMINT_PARACHAIN_ID: u32 = 2000; +pub const WESTMINT_PARACHAIN_ID: u32 = 1000; decl_bridge_finality_runtime_apis!(westend); diff --git a/bridges/relays/bin-substrate/src/chains/rialto_parachains_to_millau.rs b/bridges/relays/bin-substrate/src/chains/rialto_parachains_to_millau.rs index b3471ad1a382f..37eb848fe5f8e 100644 --- a/bridges/relays/bin-substrate/src/chains/rialto_parachains_to_millau.rs +++ b/bridges/relays/bin-substrate/src/chains/rialto_parachains_to_millau.rs @@ -17,7 +17,6 @@ //! Rialto-to-Millau parachains sync entrypoint. use crate::cli::bridge::{CliBridgeBase, MessagesCliBridge, ParachainToRelayHeadersCliBridge}; -use parachains_relay::ParachainsPipeline; use relay_millau_client::Millau; use relay_rialto_client::Rialto; use relay_rialto_parachain_client::RialtoParachain; @@ -29,11 +28,6 @@ use substrate_relay_helper::parachains::{ #[derive(Clone, Debug)] pub struct RialtoParachainsToMillau; -impl ParachainsPipeline for RialtoParachainsToMillau { - type SourceChain = Rialto; - type TargetChain = Millau; -} - impl SubstrateParachainsPipeline for RialtoParachainsToMillau { type SourceParachain = RialtoParachain; type SourceRelayChain = Rialto; diff --git a/bridges/relays/bin-substrate/src/chains/rococo_parachains_to_bridge_hub_wococo.rs b/bridges/relays/bin-substrate/src/chains/rococo_parachains_to_bridge_hub_wococo.rs index 098aed417c662..ba0c10beae424 100644 --- a/bridges/relays/bin-substrate/src/chains/rococo_parachains_to_bridge_hub_wococo.rs +++ b/bridges/relays/bin-substrate/src/chains/rococo_parachains_to_bridge_hub_wococo.rs @@ -18,7 +18,6 @@ use crate::cli::bridge::{CliBridgeBase, MessagesCliBridge, ParachainToRelayHeadersCliBridge}; use bp_polkadot_core::parachains::{ParaHash, ParaHeadsProof, ParaId}; -use parachains_relay::ParachainsPipeline; use relay_substrate_client::{CallOf, HeaderIdOf}; use substrate_relay_helper::parachains::{ SubmitParachainHeadsCallBuilder, SubstrateParachainsPipeline, @@ -28,11 +27,6 @@ use substrate_relay_helper::parachains::{ #[derive(Clone, Debug)] pub struct BridgeHubRococoToBridgeHubWococo; -impl ParachainsPipeline for BridgeHubRococoToBridgeHubWococo { - type SourceChain = relay_rococo_client::Rococo; - type TargetChain = relay_bridge_hub_wococo_client::BridgeHubWococo; -} - impl SubstrateParachainsPipeline for BridgeHubRococoToBridgeHubWococo { type SourceParachain = relay_bridge_hub_rococo_client::BridgeHubRococo; type SourceRelayChain = relay_rococo_client::Rococo; diff --git a/bridges/relays/bin-substrate/src/chains/westend_parachains_to_millau.rs b/bridges/relays/bin-substrate/src/chains/westend_parachains_to_millau.rs index f025f48dcb60a..abd9e6137bb32 100644 --- a/bridges/relays/bin-substrate/src/chains/westend_parachains_to_millau.rs +++ b/bridges/relays/bin-substrate/src/chains/westend_parachains_to_millau.rs @@ -17,7 +17,6 @@ //! Westend-to-Millau parachains sync entrypoint. use crate::cli::bridge::{CliBridgeBase, ParachainToRelayHeadersCliBridge}; -use parachains_relay::ParachainsPipeline; use relay_millau_client::Millau; use relay_westend_client::{Westend, Westmint}; use substrate_relay_helper::parachains::{ @@ -28,11 +27,6 @@ use substrate_relay_helper::parachains::{ #[derive(Clone, Debug)] pub struct WestendParachainsToMillau; -impl ParachainsPipeline for WestendParachainsToMillau { - type SourceChain = Westend; - type TargetChain = Millau; -} - impl SubstrateParachainsPipeline for WestendParachainsToMillau { type SourceParachain = Westmint; type SourceRelayChain = Westend; diff --git a/bridges/relays/bin-substrate/src/chains/wococo_parachains_to_bridge_hub_rococo.rs b/bridges/relays/bin-substrate/src/chains/wococo_parachains_to_bridge_hub_rococo.rs index 683e7705dd6ac..028d8c9e17dba 100644 --- a/bridges/relays/bin-substrate/src/chains/wococo_parachains_to_bridge_hub_rococo.rs +++ b/bridges/relays/bin-substrate/src/chains/wococo_parachains_to_bridge_hub_rococo.rs @@ -18,7 +18,6 @@ use crate::cli::bridge::{CliBridgeBase, MessagesCliBridge, ParachainToRelayHeadersCliBridge}; use bp_polkadot_core::parachains::{ParaHash, ParaHeadsProof, ParaId}; -use parachains_relay::ParachainsPipeline; use relay_substrate_client::{CallOf, HeaderIdOf}; use substrate_relay_helper::parachains::{ SubmitParachainHeadsCallBuilder, SubstrateParachainsPipeline, @@ -28,11 +27,6 @@ use substrate_relay_helper::parachains::{ #[derive(Clone, Debug)] pub struct BridgeHubWococoToBridgeHubRococo; -impl ParachainsPipeline for BridgeHubWococoToBridgeHubRococo { - type SourceChain = relay_wococo_client::Wococo; - type TargetChain = relay_bridge_hub_rococo_client::BridgeHubRococo; -} - impl SubstrateParachainsPipeline for BridgeHubWococoToBridgeHubRococo { type SourceParachain = relay_bridge_hub_wococo_client::BridgeHubWococo; type SourceRelayChain = relay_wococo_client::Wococo; diff --git a/bridges/relays/bin-substrate/src/cli/bridge.rs b/bridges/relays/bin-substrate/src/cli/bridge.rs index 330b498f19c03..208f67c527be6 100644 --- a/bridges/relays/bin-substrate/src/cli/bridge.rs +++ b/bridges/relays/bin-substrate/src/cli/bridge.rs @@ -16,7 +16,6 @@ use crate::cli::CliChain; use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; -use parachains_relay::ParachainsPipeline; use relay_substrate_client::{Chain, ChainWithTransactions, Parachain, RelayChain}; use strum::{EnumString, EnumVariantNames}; use substrate_relay_helper::{ @@ -87,10 +86,10 @@ where + RelayChain; /// Finality proofs synchronization pipeline (source parachain -> target). type ParachainFinality: SubstrateParachainsPipeline< - SourceRelayChain = Self::SourceRelay, - SourceParachain = Self::Source, - TargetChain = Self::Target, - > + ParachainsPipeline; + SourceRelayChain = Self::SourceRelay, + SourceParachain = Self::Source, + TargetChain = Self::Target, + >; /// Finality proofs synchronization pipeline (source relay chain -> target). type RelayFinality: SubstrateFinalitySyncPipeline< SourceChain = Self::SourceRelay, diff --git a/bridges/relays/bin-substrate/src/cli/relay_parachains.rs b/bridges/relays/bin-substrate/src/cli/relay_parachains.rs index 1234b3a3309df..4e59dc14733fc 100644 --- a/bridges/relays/bin-substrate/src/cli/relay_parachains.rs +++ b/bridges/relays/bin-substrate/src/cli/relay_parachains.rs @@ -22,20 +22,14 @@ use crate::chains::{ }; use async_std::sync::Mutex; use async_trait::async_trait; -use bp_polkadot_core::parachains::ParaId; -use parachains_relay::parachains_loop::{ - AvailableHeader, ParachainSyncParams, SourceClient, TargetClient, -}; -use relay_substrate_client::{Parachain, ParachainBase}; +use parachains_relay::parachains_loop::{AvailableHeader, SourceClient, TargetClient}; +use relay_substrate_client::Parachain; use relay_utils::metrics::{GlobalMetrics, StandaloneMetric}; use std::sync::Arc; use structopt::StructOpt; use strum::{EnumString, EnumVariantNames, VariantNames}; use substrate_relay_helper::{ - parachains::{ - source::ParachainsSource, target::ParachainsTarget, ParachainsPipelineAdapter, - SubstrateParachainsPipeline, - }, + parachains::{source::ParachainsSource, target::ParachainsTarget, ParachainsPipelineAdapter}, TransactionParams, }; @@ -105,13 +99,6 @@ where parachains_relay::parachains_loop::run( source_client, target_client, - ParachainSyncParams { - parachains: vec![ - ParaId(::SourceParachain::PARACHAIN_ID) - ], - stall_timeout: std::time::Duration::from_secs(60), - strategy: parachains_relay::parachains_loop::ParachainSyncStrategy::Any, - }, metrics_params, futures::future::pending(), ) diff --git a/bridges/relays/client-substrate/src/chain.rs b/bridges/relays/client-substrate/src/chain.rs index bc38d1ec9323e..6ab353921efca 100644 --- a/bridges/relays/client-substrate/src/chain.rs +++ b/bridges/relays/client-substrate/src/chain.rs @@ -72,7 +72,7 @@ pub trait RelayChain: Chain { /// Name of the bridge parachains pallet (used in `construct_runtime` macro call) that is /// deployed at the **bridged** chain. /// - /// We assume that all chains that are bridging with this `ChainWithGrandpa` are using + /// We assume that all chains that are bridging with this `RelayChain` are using /// the same name. const PARACHAINS_FINALITY_PALLET_NAME: &'static str; } diff --git a/bridges/relays/client-substrate/src/test_chain.rs b/bridges/relays/client-substrate/src/test_chain.rs index 9bc6c5ae15cc4..10c31804992a8 100644 --- a/bridges/relays/client-substrate/src/test_chain.rs +++ b/bridges/relays/client-substrate/src/test_chain.rs @@ -68,3 +68,51 @@ impl ChainWithBalances for TestChain { unreachable!() } } + +/// Primitives-level parachain that may be used in tests. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct TestParachainBase; + +impl bp_runtime::Chain for TestParachainBase { + type BlockNumber = u32; + type Hash = sp_core::H256; + type Hasher = sp_runtime::traits::BlakeTwo256; + type Header = sp_runtime::generic::Header; + + type AccountId = u32; + type Balance = u32; + type Index = u32; + type Signature = sp_runtime::testing::TestSignature; + + fn max_extrinsic_size() -> u32 { + unreachable!() + } + + fn max_extrinsic_weight() -> Weight { + unreachable!() + } +} + +impl bp_runtime::Parachain for TestParachainBase { + const PARACHAIN_ID: u32 = 1000; +} + +/// Parachain that may be used in tests. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct TestParachain; + +impl bp_runtime::UnderlyingChainProvider for TestParachain { + type Chain = TestParachainBase; +} + +impl Chain for TestParachain { + const NAME: &'static str = "TestParachain"; + const TOKEN_ID: Option<&'static str> = None; + const BEST_FINALIZED_HEADER_ID_METHOD: &'static str = "TestParachainMethod"; + const AVERAGE_BLOCK_INTERVAL: Duration = Duration::from_millis(0); + + type SignedBlock = sp_runtime::generic::SignedBlock< + sp_runtime::generic::Block, + >; + type Call = (); +} diff --git a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs index b1270108c8038..672530de3a58c 100644 --- a/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs +++ b/bridges/relays/lib-substrate-relay/src/on_demand/parachains.rs @@ -36,9 +36,7 @@ use bp_runtime::HeaderIdProvider; use futures::{select, FutureExt}; use num_traits::Zero; use pallet_bridge_parachains::{RelayBlockHash, RelayBlockHasher, RelayBlockNumber}; -use parachains_relay::parachains_loop::{ - AvailableHeader, ParachainSyncParams, SourceClient, TargetClient, -}; +use parachains_relay::parachains_loop::{AvailableHeader, SourceClient, TargetClient}; use relay_substrate_client::{ is_ancient_block, AccountIdOf, AccountKeyPairOf, BlockNumberOf, CallOf, Chain, Client, Error as SubstrateError, HashOf, HeaderIdOf, ParachainBase, @@ -183,7 +181,7 @@ where let mut proved_parachain_block = selected_parachain_block; if proved_relay_block != selected_relay_block { proved_parachain_block = parachains_source - .on_chain_para_head_id(proved_relay_block, para_id) + .on_chain_para_head_id(proved_relay_block) .await? // this could happen e.g. if parachain has been offboarded? .ok_or_else(|| { @@ -209,11 +207,11 @@ where } // and finally - prove parachain head - let (para_proof, para_hashes) = - parachains_source.prove_parachain_heads(proved_relay_block, &[para_id]).await?; + let (para_proof, para_hash) = + parachains_source.prove_parachain_head(proved_relay_block).await?; calls.push(P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call( proved_relay_block, - para_hashes.into_iter().map(|h| (para_id, h)).collect(), + vec![(para_id, para_hash)], para_proof, )); @@ -241,16 +239,14 @@ async fn background_task( let mut relay_state = RelayState::Idle; let mut required_parachain_header_number = Zero::zero(); - let required_para_header_number_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable)); + let required_para_header_ref = Arc::new(Mutex::new(AvailableHeader::Unavailable)); let mut restart_relay = true; let parachains_relay_task = futures::future::Fuse::terminated(); futures::pin_mut!(parachains_relay_task); - let mut parachains_source = ParachainsSource::

::new( - source_relay_client.clone(), - required_para_header_number_ref.clone(), - ); + let mut parachains_source = + ParachainsSource::

::new(source_relay_client.clone(), required_para_header_ref.clone()); let mut parachains_target = ParachainsTarget::

::new(target_client.clone(), target_transaction_params.clone()); @@ -271,13 +267,20 @@ async fn background_task( }, }; - // keep in mind that we are not updating `required_para_header_number_ref` here, because + // keep in mind that we are not updating `required_para_header_ref` here, because // then we'll be submitting all previous headers as well (while required relay headers are // delivered) and we want to avoid that (to reduce cost) - required_parachain_header_number = std::cmp::max( - required_parachain_header_number, - new_required_parachain_header_number, - ); + if new_required_parachain_header_number > required_parachain_header_number { + log::trace!( + target: "bridge", + "[{}] More {} headers required. Going to sync up to the {}", + relay_task_name, + P::SourceParachain::NAME, + new_required_parachain_header_number, + ); + + required_parachain_header_number = new_required_parachain_header_number; + } }, _ = async_std::task::sleep(P::TargetChain::AVERAGE_BLOCK_INTERVAL).fuse() => {}, _ = parachains_relay_task => { @@ -351,7 +354,7 @@ async fn background_task( .await; }, RelayState::RelayingParaHeader(required_para_header) => { - *required_para_header_number_ref.lock().await = + *required_para_header_ref.lock().await = AvailableHeader::Available(required_para_header); }, } @@ -379,11 +382,6 @@ async fn background_task( parachains_relay::parachains_loop::run( parachains_source.clone(), parachains_target.clone(), - ParachainSyncParams { - parachains: vec![P::SourceParachain::PARACHAIN_ID.into()], - stall_timeout: std::time::Duration::from_secs(60), - strategy: parachains_relay::parachains_loop::ParachainSyncStrategy::Any, - }, MetricsParams::disabled(), futures::future::pending(), ) @@ -489,10 +487,7 @@ where source.client().best_finalized_header().await.map_err(map_source_err)?; let best_finalized_relay_block_id = best_finalized_relay_header.id(); let para_header_at_source = source - .on_chain_para_head_id( - best_finalized_relay_block_id, - P::SourceParachain::PARACHAIN_ID.into(), - ) + .on_chain_para_head_id(best_finalized_relay_block_id) .await .map_err(map_source_err)?; @@ -515,10 +510,7 @@ where let para_header_at_relay_header_at_target = if let Some(available_relay_header_at_target) = available_relay_header_at_target { source - .on_chain_para_head_id( - available_relay_header_at_target, - P::SourceParachain::PARACHAIN_ID.into(), - ) + .on_chain_para_head_id(available_relay_header_at_target) .await .map_err(map_source_err)? } else { @@ -669,7 +661,7 @@ impl<'a, P: SubstrateParachainsPipeline> &self, at_relay_block: HeaderIdOf, ) -> Result>, SubstrateError> { - self.1.on_chain_para_head_id(at_relay_block, self.parachain_id()).await + self.1.on_chain_para_head_id(at_relay_block).await } } diff --git a/bridges/relays/lib-substrate-relay/src/parachains/mod.rs b/bridges/relays/lib-substrate-relay/src/parachains/mod.rs index 9852e512f7e9c..722f9b61f9f08 100644 --- a/bridges/relays/lib-substrate-relay/src/parachains/mod.rs +++ b/bridges/relays/lib-substrate-relay/src/parachains/mod.rs @@ -56,7 +56,8 @@ pub struct ParachainsPipelineAdapter { } impl ParachainsPipeline for ParachainsPipelineAdapter

{ - type SourceChain = P::SourceRelayChain; + type SourceParachain = P::SourceParachain; + type SourceRelayChain = P::SourceRelayChain; type TargetChain = P::TargetChain; } diff --git a/bridges/relays/lib-substrate-relay/src/parachains/source.rs b/bridges/relays/lib-substrate-relay/src/parachains/source.rs index 90776902dd6f9..146c5840cd51f 100644 --- a/bridges/relays/lib-substrate-relay/src/parachains/source.rs +++ b/bridges/relays/lib-substrate-relay/src/parachains/source.rs @@ -24,10 +24,7 @@ use bp_parachains::parachain_head_storage_key_at_source; use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId}; use bp_runtime::HeaderIdProvider; use codec::Decode; -use parachains_relay::{ - parachains_loop::{AvailableHeader, SourceClient}, - parachains_loop_metrics::ParachainsLoopMetrics, -}; +use parachains_relay::parachains_loop::{AvailableHeader, SourceClient}; use relay_substrate_client::{ is_ancient_block, Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, ParachainBase, RelayChain, @@ -63,8 +60,8 @@ impl ParachainsSource

{ pub async fn on_chain_para_head_id( &self, at_block: HeaderIdOf, - para_id: ParaId, ) -> Result>, SubstrateError> { + let para_id = ParaId(P::SourceParachain::PARACHAIN_ID); let storage_key = parachain_head_storage_key_at_source(P::SourceRelayChain::PARAS_PALLET_NAME, para_id); let para_head = self.client.raw_storage_value(storage_key, Some(at_block.1)).await?; @@ -104,18 +101,7 @@ where async fn parachain_head( &self, at_block: HeaderIdOf, - metrics: Option<&ParachainsLoopMetrics>, - para_id: ParaId, - ) -> Result, Self::Error> { - // we don't need to support many parachains now - if para_id.0 != P::SourceParachain::PARACHAIN_ID { - return Err(SubstrateError::Custom(format!( - "Parachain id {} is not matching expected {}", - para_id.0, - P::SourceParachain::PARACHAIN_ID, - ))) - } - + ) -> Result>, Self::Error> { // if requested relay header is ancient, then we don't even want to try to read the // parachain head - we simply return `Unavailable` let best_block_number = self.client.best_finalized_header_number().await?; @@ -125,7 +111,7 @@ where // else - try to read head from the source client let mut para_head_id = AvailableHeader::Missing; - if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block, para_id).await? { + if let Some(on_chain_para_head_id) = self.on_chain_para_head_id(at_block).await? { // Never return head that is larger than requested. This way we'll never sync // headers past `max_header_id`. para_head_id = match *self.max_head_id.lock().await { @@ -141,26 +127,14 @@ where } } - if let (Some(metrics), AvailableHeader::Available(para_head_id)) = (metrics, para_head_id) { - metrics.update_best_parachain_block_at_source(para_id, para_head_id.0); - } - - Ok(para_head_id.map(|para_head_id| para_head_id.1)) + Ok(para_head_id) } - async fn prove_parachain_heads( + async fn prove_parachain_head( &self, at_block: HeaderIdOf, - parachains: &[ParaId], - ) -> Result<(ParaHeadsProof, Vec), Self::Error> { + ) -> Result<(ParaHeadsProof, ParaHash), Self::Error> { let parachain = ParaId(P::SourceParachain::PARACHAIN_ID); - if parachains != [parachain] { - return Err(SubstrateError::Custom(format!( - "Trying to prove unexpected parachains {parachains:?}. Expected {parachain:?}", - ))) - } - - let parachain = parachains[0]; let storage_key = parachain_head_storage_key_at_source(P::SourceRelayChain::PARAS_PALLET_NAME, parachain); let parachain_heads_proof = self @@ -190,6 +164,6 @@ where })?; let parachain_head_hash = parachain_head.hash(); - Ok((ParaHeadsProof(parachain_heads_proof), vec![parachain_head_hash])) + Ok((ParaHeadsProof(parachain_heads_proof), parachain_head_hash)) } } diff --git a/bridges/relays/lib-substrate-relay/src/parachains/target.rs b/bridges/relays/lib-substrate-relay/src/parachains/target.rs index 7f6cdf8f9251f..6df7bc0a742a9 100644 --- a/bridges/relays/lib-substrate-relay/src/parachains/target.rs +++ b/bridges/relays/lib-substrate-relay/src/parachains/target.rs @@ -24,18 +24,15 @@ use crate::{ }; use async_trait::async_trait; -use bp_parachains::{BestParaHeadHash, ImportedParaHeadsKeyProvider, ParasInfoKeyProvider}; use bp_polkadot_core::parachains::{ParaHash, ParaHeadsProof, ParaId}; use bp_runtime::HeaderIdProvider; use codec::Decode; -use parachains_relay::{ - parachains_loop::TargetClient, parachains_loop_metrics::ParachainsLoopMetrics, -}; +use parachains_relay::parachains_loop::TargetClient; use relay_substrate_client::{ - AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, - HeaderIdOf, ParachainBase, RelayChain, TransactionEra, TransactionTracker, UnsignedTransaction, + AccountIdOf, AccountKeyPairOf, Chain, Client, Error as SubstrateError, HeaderIdOf, + ParachainBase, TransactionEra, TransactionTracker, UnsignedTransaction, }; -use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; +use relay_utils::relay_loop::Client as RelayClient; use sp_core::{Bytes, Pair}; /// Substrate client as parachain heads source. @@ -92,93 +89,50 @@ where Ok(best_id) } - async fn best_finalized_source_block( + async fn best_finalized_source_relay_chain_block( &self, at_block: &HeaderIdOf, ) -> Result, Self::Error> { - let encoded_best_finalized_source_block = self - .client - .state_call( + self.client + .typed_state_call::<_, Option>>( P::SourceRelayChain::BEST_FINALIZED_HEADER_ID_METHOD.into(), - Bytes(Vec::new()), + (), Some(at_block.1), ) - .await?; - - Option::, BlockNumberOf>>::decode( - &mut &encoded_best_finalized_source_block.0[..], - ) - .map_err(SubstrateError::ResponseParseFailed)? - .map(Ok) - .unwrap_or(Err(SubstrateError::NoParachainHeadAtTarget( - P::SourceParachain::PARACHAIN_ID, - P::TargetChain::NAME.into(), - ))) + .await? + .map(Ok) + .unwrap_or(Err(SubstrateError::BridgePalletIsNotInitialized)) } async fn parachain_head( &self, at_block: HeaderIdOf, - metrics: Option<&ParachainsLoopMetrics>, - para_id: ParaId, - ) -> Result, Self::Error> { - let best_para_head_hash: Option = self + ) -> Result>, Self::Error> { + let encoded_best_finalized_source_para_block = self .client - .storage_map_value::( - P::SourceRelayChain::PARACHAINS_FINALITY_PALLET_NAME, - ¶_id, + .state_call( + P::SourceParachain::BEST_FINALIZED_HEADER_ID_METHOD.into(), + Bytes(Vec::new()), Some(at_block.1), ) - .await? - .map(|para_info| para_info.best_head_hash); - - if let (Some(metrics), Some(best_para_head_hash)) = (metrics, &best_para_head_hash) { - let imported_para_head_number = self - .client - .storage_double_map_value::( - P::SourceRelayChain::PARACHAINS_FINALITY_PALLET_NAME, - ¶_id, - &best_para_head_hash.head_hash, - Some(at_block.1), - ) - .await - .and_then(|maybe_encoded_head| match maybe_encoded_head { - Some(encoded_head) => encoded_head - .decode_parachain_head_data::() - .map(|head| head.number) - .map(Some) - .map_err(Self::Error::ResponseParseFailed), - None => Ok(None), - }) - .map_err(|e| { - log::error!( - target: "bridge-metrics", - "Failed to read or decode {} parachain header at {}: {:?}. Metric will have obsolete value", - P::SourceParachain::NAME, - P::TargetChain::NAME, - e, - ); - e - }) - .unwrap_or(None); - if let Some(imported_para_head_number) = imported_para_head_number { - metrics.update_best_parachain_block_at_target(para_id, imported_para_head_number); - } - } + .await?; - Ok(best_para_head_hash) + Ok(Option::>::decode( + &mut &encoded_best_finalized_source_para_block.0[..], + ) + .map_err(SubstrateError::ResponseParseFailed)?) } - async fn submit_parachain_heads_proof( + async fn submit_parachain_head_proof( &self, at_relay_block: HeaderIdOf, - updated_parachains: Vec<(ParaId, ParaHash)>, + updated_head_hash: ParaHash, proof: ParaHeadsProof, ) -> Result { let transaction_params = self.transaction_params.clone(); let call = P::SubmitParachainHeadsCallBuilder::build_submit_parachain_heads_call( at_relay_block, - updated_parachains, + vec![(ParaId(P::SourceParachain::PARACHAIN_ID), updated_head_hash)], proof, ); self.client diff --git a/bridges/relays/parachains/Cargo.toml b/bridges/relays/parachains/Cargo.toml index 093fc0b94e3fa..d2938389ca77e 100644 --- a/bridges/relays/parachains/Cargo.toml +++ b/bridges/relays/parachains/Cargo.toml @@ -14,7 +14,6 @@ relay-utils = { path = "../utils" } # Bridge dependencies -bp-parachains = { path = "../../primitives/parachains" } bp-polkadot-core = { path = "../../primitives/polkadot-core" } relay-substrate-client = { path = "../client-substrate" } diff --git a/bridges/relays/parachains/src/lib.rs b/bridges/relays/parachains/src/lib.rs index 94b3ce3ec7669..81ea983a6f76a 100644 --- a/bridges/relays/parachains/src/lib.rs +++ b/bridges/relays/parachains/src/lib.rs @@ -16,7 +16,7 @@ use std::fmt::Debug; -use relay_substrate_client::Chain; +use relay_substrate_client::{Chain, Parachain}; pub mod parachains_loop; pub mod parachains_loop_metrics; @@ -24,7 +24,9 @@ pub mod parachains_loop_metrics; /// Finality proofs synchronization pipeline. pub trait ParachainsPipeline: 'static + Clone + Debug + Send + Sync { /// Relay chain which is storing parachain heads in its `paras` module. - type SourceChain: Chain; + type SourceRelayChain: Chain; + /// Parachain which headers we are syncing here. + type SourceParachain: Parachain; /// Target chain (either relay or para) which wants to know about new parachain heads. type TargetChain: Chain; } diff --git a/bridges/relays/parachains/src/parachains_loop.rs b/bridges/relays/parachains/src/parachains_loop.rs index 7b62e72ca2930..9b9038fd9761a 100644 --- a/bridges/relays/parachains/src/parachains_loop.rs +++ b/bridges/relays/parachains/src/parachains_loop.rs @@ -17,7 +17,6 @@ use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline}; use async_trait::async_trait; -use bp_parachains::BestParaHeadHash; use bp_polkadot_core::{ parachains::{ParaHash, ParaHeadsProof, ParaId}, BlockNumber as RelayBlockNumber, @@ -26,44 +25,17 @@ use futures::{ future::{FutureExt, Shared}, poll, select_biased, }; -use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf}; +use relay_substrate_client::{Chain, HeaderIdOf, ParachainBase}; use relay_utils::{ metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient, TrackedTransactionStatus, TransactionTracker, }; -use std::{ - collections::{BTreeMap, BTreeSet}, - future::Future, - pin::Pin, - task::Poll, - time::Duration, -}; - -/// Parachain heads synchronization params. -#[derive(Clone, Debug)] -pub struct ParachainSyncParams { - /// Parachains that we're relaying here. - pub parachains: Vec, - /// Parachain heads update strategy. - pub strategy: ParachainSyncStrategy, - /// Stall timeout. If we have submitted transaction and we see no state updates for this - /// period, we consider our transaction lost. - pub stall_timeout: Duration, -} - -/// Parachain heads update strategy. -#[derive(Clone, Copy, Debug)] -pub enum ParachainSyncStrategy { - /// Update whenever any parachain head is updated. - Any, - /// Wait till all parachain heads are updated. - All, -} +use std::{future::Future, pin::Pin, task::Poll}; /// Parachain header availability at a certain chain. #[derive(Clone, Copy, Debug)] pub enum AvailableHeader { - /// The client refuses to report parachain head at this moment. + /// The client can not report actual parachain head at this moment. /// /// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used. /// This variant must be treated as "we don't want to update parachain head value at the @@ -78,15 +50,20 @@ pub enum AvailableHeader { } impl AvailableHeader { - /// Transform contained value. - pub fn map(self, f: F) -> AvailableHeader - where - F: FnOnce(T) -> U, - { - match self { - AvailableHeader::Unavailable => AvailableHeader::Unavailable, - AvailableHeader::Missing => AvailableHeader::Missing, - AvailableHeader::Available(val) => AvailableHeader::Available(f(val)), + /// Return available header. + pub fn as_available(&self) -> Option<&T> { + match *self { + AvailableHeader::Available(ref header) => Some(header), + _ => None, + } + } +} + +impl From> for AvailableHeader { + fn from(maybe_header: Option) -> AvailableHeader { + match maybe_header { + Some(header) => AvailableHeader::Available(header), + None => AvailableHeader::Missing, } } } @@ -97,27 +74,17 @@ pub trait SourceClient: RelayClient { /// Returns `Ok(true)` if client is in synced state. async fn ensure_synced(&self) -> Result; - /// Get parachain head hash at given block. - /// - /// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_source` - /// on provided `metrics` object to update corresponding metric value. + /// Get parachain head id at given block. async fn parachain_head( &self, - at_block: HeaderIdOf, - metrics: Option<&ParachainsLoopMetrics>, - para_id: ParaId, - ) -> Result, Self::Error>; + at_block: HeaderIdOf, + ) -> Result>, Self::Error>; - /// Get parachain heads proof. - /// - /// The number and order of entries in the resulting parachain head hashes vector must match the - /// number and order of parachains in the `parachains` vector. The incorrect implementation will - /// result in panic. - async fn prove_parachain_heads( + /// Get parachain head proof at given block. + async fn prove_parachain_head( &self, - at_block: HeaderIdOf, - parachains: &[ParaId], - ) -> Result<(ParaHeadsProof, Vec), Self::Error>; + at_block: HeaderIdOf, + ) -> Result<(ParaHeadsProof, ParaHash), Self::Error>; } /// Target client used in parachain heads synchronization loop. @@ -129,28 +96,23 @@ pub trait TargetClient: RelayClient { /// Get best block id. async fn best_block(&self) -> Result, Self::Error>; - /// Get best finalized source block id. - async fn best_finalized_source_block( + /// Get best finalized source relay chain block id. + async fn best_finalized_source_relay_chain_block( &self, at_block: &HeaderIdOf, - ) -> Result, Self::Error>; + ) -> Result, Self::Error>; - /// Get parachain head hash at given block. - /// - /// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_target` - /// on provided `metrics` object to update corresponding metric value. + /// Get parachain head id at given block. async fn parachain_head( &self, at_block: HeaderIdOf, - metrics: Option<&ParachainsLoopMetrics>, - para_id: ParaId, - ) -> Result, Self::Error>; + ) -> Result>, Self::Error>; /// Submit parachain heads proof. - async fn submit_parachain_heads_proof( + async fn submit_parachain_head_proof( &self, - at_source_block: HeaderIdOf, - updated_parachains: Vec<(ParaId, ParaHash)>, + at_source_block: HeaderIdOf, + para_head_hash: ParaHash, proof: ParaHeadsProof, ) -> Result; } @@ -158,19 +120,23 @@ pub trait TargetClient: RelayClient { /// Return prefix that will be used by default to expose Prometheus metrics of the parachains /// sync loop. pub fn metrics_prefix() -> String { - format!("{}_to_{}_Parachains", P::SourceChain::NAME, P::TargetChain::NAME) + format!( + "{}_to_{}_Parachains_{}", + P::SourceRelayChain::NAME, + P::TargetChain::NAME, + P::SourceParachain::PARACHAIN_ID + ) } /// Run parachain heads synchronization. pub async fn run( source_client: impl SourceClient

, target_client: impl TargetClient

, - sync_params: ParachainSyncParams, metrics_params: MetricsParams, exit_signal: impl Future + 'static + Send, ) -> Result<(), relay_utils::Error> where - P::SourceChain: Chain, + P::SourceRelayChain: Chain, { let exit_signal = exit_signal.shared(); relay_utils::relay_loop(source_client, target_client) @@ -179,13 +145,7 @@ where .expose() .await? .run(metrics_prefix::

(), move |source_client, target_client, metrics| { - run_until_connection_lost( - source_client, - target_client, - sync_params.clone(), - metrics, - exit_signal.clone(), - ) + run_until_connection_lost(source_client, target_client, metrics, exit_signal.clone()) }) .await } @@ -194,16 +154,15 @@ where async fn run_until_connection_lost( source_client: impl SourceClient

, target_client: impl TargetClient

, - sync_params: ParachainSyncParams, metrics: Option, exit_signal: impl Future + Send, ) -> Result<(), FailedClient> where - P::SourceChain: Chain, + P::SourceRelayChain: Chain, { let exit_signal = exit_signal.fuse(); let min_block_interval = std::cmp::min( - P::SourceChain::AVERAGE_BLOCK_INTERVAL, + P::SourceRelayChain::AVERAGE_BLOCK_INTERVAL, P::TargetChain::AVERAGE_BLOCK_INTERVAL, ); @@ -232,7 +191,7 @@ where log::warn!( target: "bridge", "{} client is syncing. Won't do anything until it is synced", - P::SourceChain::NAME, + P::SourceRelayChain::NAME, ); continue }, @@ -240,7 +199,7 @@ where log::warn!( target: "bridge", "{} client has failed to return its sync status: {:?}", - P::SourceChain::NAME, + P::SourceRelayChain::NAME, e, ); return Err(FailedClient::Source) @@ -249,33 +208,28 @@ where // if we have active transaction, we'll need to wait until it is mined or dropped let best_target_block = target_client.best_block().await.map_err(|e| { - log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceChain::NAME, e); + log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceRelayChain::NAME, e); FailedClient::Target })?; - let heads_at_target = read_heads_at_target( - &target_client, - metrics.as_ref(), - &best_target_block, - &sync_params.parachains, - ) - .await?; + let head_at_target = + read_head_at_target(&target_client, metrics.as_ref(), &best_target_block).await?; // check if our transaction has been mined if let Some(tracker) = submitted_heads_tracker.take() { - match tracker.update(&best_target_block, &heads_at_target).await { - SubmittedHeadsStatus::Waiting(tracker) => { + match tracker.update(&best_target_block, &head_at_target).await { + SubmittedHeadStatus::Waiting(tracker) => { // no news about our transaction and we shall keep waiting submitted_heads_tracker = Some(tracker); continue }, - SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized(_)) => { + SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)) => { // all heads have been updated, we don't need this tracker anymore }, - SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost) => { + SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) => { log::warn!( target: "bridge", "Parachains synchronization from {} to {} has stalled. Going to restart", - P::SourceChain::NAME, + P::SourceRelayChain::NAME, P::TargetChain::NAME, ); @@ -287,247 +241,201 @@ where // we have no active transaction and may need to update heads, but do we have something for // update? let best_finalized_relay_block = target_client - .best_finalized_source_block(&best_target_block) + .best_finalized_source_relay_chain_block(&best_target_block) .await .map_err(|e| { log::warn!( target: "bridge", "Failed to read best finalized {} block from {}: {:?}", - P::SourceChain::NAME, + P::SourceRelayChain::NAME, P::TargetChain::NAME, e, ); FailedClient::Target })?; - let heads_at_source = read_heads_at_source( - &source_client, - metrics.as_ref(), - &best_finalized_relay_block, - &sync_params.parachains, - ) - .await?; - let updated_ids = select_parachains_to_update::

( - heads_at_source, - heads_at_target, - best_finalized_relay_block, - ); - let is_update_required = is_update_required(&sync_params, &updated_ids); - - log::info!( - target: "bridge", - "Total {} parachains: {}. Up-to-date at {}: {}. Needs update at {}: {}.", - P::SourceChain::NAME, - sync_params.parachains.len(), - P::TargetChain::NAME, - sync_params.parachains.len() - updated_ids.len(), - P::TargetChain::NAME, - updated_ids.len(), - ); + let head_at_source = + read_head_at_source(&source_client, metrics.as_ref(), &best_finalized_relay_block) + .await?; + let is_update_required = is_update_required::

(head_at_source, head_at_target); if is_update_required { - let (heads_proofs, head_hashes) = source_client - .prove_parachain_heads(best_finalized_relay_block, &updated_ids) + let (head_proof, head_hash) = source_client + .prove_parachain_head(best_finalized_relay_block) .await .map_err(|e| { log::warn!( target: "bridge", - "Failed to prove {} parachain heads: {:?}", - P::SourceChain::NAME, + "Failed to prove {} parachain ParaId({}) heads: {:?}", + P::SourceRelayChain::NAME, + P::SourceParachain::PARACHAIN_ID, e, ); FailedClient::Source })?; log::info!( target: "bridge", - "Submitting {} parachain heads update transaction to {}", - P::SourceChain::NAME, + "Submitting {} parachain ParaId({}) head update transaction to {}", + P::SourceRelayChain::NAME, + P::SourceParachain::PARACHAIN_ID, P::TargetChain::NAME, ); - assert_eq!( - head_hashes.len(), - updated_ids.len(), - "Incorrect parachains SourceClient implementation" - ); - let transaction_tracker = target_client - .submit_parachain_heads_proof( - best_finalized_relay_block, - updated_ids.iter().cloned().zip(head_hashes).collect(), - heads_proofs, - ) + .submit_parachain_head_proof(best_finalized_relay_block, head_hash, head_proof) .await .map_err(|e| { log::warn!( target: "bridge", - "Failed to submit {} parachain heads proof to {}: {:?}", - P::SourceChain::NAME, + "Failed to submit {} parachain ParaId({}) heads proof to {}: {:?}", + P::SourceRelayChain::NAME, + P::SourceParachain::PARACHAIN_ID, P::TargetChain::NAME, e, ); FailedClient::Target })?; - submitted_heads_tracker = Some(SubmittedHeadsTracker::

::new( - updated_ids, - best_finalized_relay_block.0, - transaction_tracker, - )); + submitted_heads_tracker = + Some(SubmittedHeadsTracker::

::new(head_at_source, transaction_tracker)); } } } -/// Given heads at source and target clients, returns set of heads that are out of sync. -fn select_parachains_to_update( - heads_at_source: BTreeMap>, - heads_at_target: BTreeMap>, - best_finalized_relay_block: HeaderIdOf, -) -> Vec +/// Returns `true` if we need to submit parachain-head-update transaction. +fn is_update_required( + head_at_source: AvailableHeader>, + head_at_target: Option>, +) -> bool where - P::SourceChain: Chain, + P::SourceRelayChain: Chain, { log::trace!( target: "bridge", - "Selecting {} parachains to update at {} (relay block: {:?}):\n\t\ + "Checking if {} parachain ParaId({}) needs update at {}:\n\t\ At {}: {:?}\n\t\ At {}: {:?}", - P::SourceChain::NAME, + P::SourceRelayChain::NAME, + P::SourceParachain::PARACHAIN_ID, P::TargetChain::NAME, - best_finalized_relay_block, - P::SourceChain::NAME, - heads_at_source, + P::SourceRelayChain::NAME, + head_at_source, P::TargetChain::NAME, - heads_at_target, + head_at_target, ); - heads_at_source - .into_iter() - .zip(heads_at_target.into_iter()) - .filter(|((para, head_at_source), (_, head_at_target))| { - let needs_update = match (head_at_source, head_at_target) { - (AvailableHeader::Unavailable, _) => { - // source client has politely asked us not to update current parachain head - // at the target chain - false - }, - (AvailableHeader::Available(head_at_source), Some(head_at_target)) - if head_at_target.at_relay_block_number < best_finalized_relay_block.0 && - head_at_target.head_hash != *head_at_source => - { - // source client knows head that is better than the head known to the target - // client - true - }, - (AvailableHeader::Available(_), Some(_)) => { - // this is normal case when relay has recently updated heads, when parachain is - // not progressing, or when our source client is still syncing - false - }, - (AvailableHeader::Available(_), None) => { - // parachain is not yet known to the target client. This is true when parachain - // or bridge has been just onboarded/started - true - }, - (AvailableHeader::Missing, Some(_)) => { - // parachain/parathread has been offboarded removed from the system. It needs to - // be propageted to the target client - true - }, - (AvailableHeader::Missing, None) => { - // all's good - parachain is unknown to both clients - false - }, - }; - if needs_update { - log::trace!( - target: "bridge", - "{} parachain {:?} needs update at {}: {:?} vs {:?}", - P::SourceChain::NAME, - para, - P::TargetChain::NAME, - head_at_source, - head_at_target, - ); - } - - needs_update - }) - .map(|((para, _), _)| para) - .collect() -} + let needs_update = match (head_at_source, head_at_target) { + (AvailableHeader::Unavailable, _) => { + // source client has politely asked us not to update current parachain head + // at the target chain + false + }, + (AvailableHeader::Available(head_at_source), Some(head_at_target)) + if head_at_source.number() > head_at_target.number() => + { + // source client knows head that is better than the head known to the target + // client + true + }, + (AvailableHeader::Available(_), Some(_)) => { + // this is normal case when relay has recently updated heads, when parachain is + // not progressing, or when our source client is still syncing + false + }, + (AvailableHeader::Available(_), None) => { + // parachain is not yet known to the target client. This is true when parachain + // or bridge has been just onboarded/started + true + }, + (AvailableHeader::Missing, Some(_)) => { + // parachain/parathread has been offboarded removed from the system. It needs to + // be propageted to the target client + true + }, + (AvailableHeader::Missing, None) => { + // all's good - parachain is unknown to both clients + false + }, + }; -/// Returns true if we need to submit update transactions to the target node. -fn is_update_required(sync_params: &ParachainSyncParams, updated_ids: &[ParaId]) -> bool { - match sync_params.strategy { - ParachainSyncStrategy::All => updated_ids.len() == sync_params.parachains.len(), - ParachainSyncStrategy::Any => !updated_ids.is_empty(), + if needs_update { + log::trace!( + target: "bridge", + "{} parachain ParaId({}) needs update at {}: {:?} vs {:?}", + P::SourceRelayChain::NAME, + P::SourceParachain::PARACHAIN_ID, + P::TargetChain::NAME, + head_at_source, + head_at_target, + ); } + + needs_update } -/// Reads given parachains heads from the source client. -/// -/// Guarantees that the returning map will have an entry for every parachain from `parachains`. -async fn read_heads_at_source( +/// Reads parachain head from the source client. +async fn read_head_at_source( source_client: &impl SourceClient

, metrics: Option<&ParachainsLoopMetrics>, - at_relay_block: &HeaderIdOf, - parachains: &[ParaId], -) -> Result>, FailedClient> { - let mut para_head_hashes = BTreeMap::new(); - for para in parachains { - let para_head = source_client.parachain_head(*at_relay_block, metrics, *para).await; - match para_head { - Ok(para_head) => { - para_head_hashes.insert(*para, para_head); - }, - Err(e) => { - log::warn!( - target: "bridge", - "Failed to read head of {} parachain {:?}: {:?}", - P::SourceChain::NAME, - para, - e, + at_relay_block: &HeaderIdOf, +) -> Result>, FailedClient> { + let para_head = source_client.parachain_head(*at_relay_block).await; + match para_head { + Ok(AvailableHeader::Available(para_head)) => { + if let Some(metrics) = metrics { + metrics.update_best_parachain_block_at_source( + ParaId(P::SourceParachain::PARACHAIN_ID), + para_head.number(), ); - return Err(FailedClient::Source) - }, - } + } + Ok(AvailableHeader::Available(para_head)) + }, + Ok(r) => Ok(r), + Err(e) => { + log::warn!( + target: "bridge", + "Failed to read head of {} parachain ParaId({:?}): {:?}", + P::SourceRelayChain::NAME, + P::SourceParachain::PARACHAIN_ID, + e, + ); + Err(FailedClient::Source) + }, } - Ok(para_head_hashes) } -/// Reads given parachains heads from the source client. -/// -/// Guarantees that the returning map will have an entry for every parachain from `parachains`. -async fn read_heads_at_target( +/// Reads parachain head from the target client. +async fn read_head_at_target( target_client: &impl TargetClient

, metrics: Option<&ParachainsLoopMetrics>, at_block: &HeaderIdOf, - parachains: &[ParaId], -) -> Result>, FailedClient> { - let mut para_best_head_hashes = BTreeMap::new(); - for para in parachains { - let para_best_head = target_client.parachain_head(*at_block, metrics, *para).await; - match para_best_head { - Ok(para_best_head) => { - para_best_head_hashes.insert(*para, para_best_head); - }, - Err(e) => { - log::warn!( - target: "bridge", - "Failed to read head of {} parachain {:?} at {}: {:?}", - P::SourceChain::NAME, - para, - P::TargetChain::NAME, - e, +) -> Result>, FailedClient> { + let para_head_id = target_client.parachain_head(*at_block).await; + match para_head_id { + Ok(Some(para_head_id)) => { + if let Some(metrics) = metrics { + metrics.update_best_parachain_block_at_target( + ParaId(P::SourceParachain::PARACHAIN_ID), + para_head_id.number(), ); - return Err(FailedClient::Target) - }, - } + } + Ok(Some(para_head_id)) + }, + Ok(None) => Ok(None), + Err(e) => { + log::warn!( + target: "bridge", + "Failed to read head of {} parachain ParaId({}) at {}: {:?}", + P::SourceRelayChain::NAME, + P::SourceParachain::PARACHAIN_ID, + P::TargetChain::NAME, + e, + ); + Err(FailedClient::Target) + }, } - Ok(para_best_head_hashes) } /// Submitted heads status. -enum SubmittedHeadsStatus { +enum SubmittedHeadStatus { /// Heads are not yet updated. Waiting(SubmittedHeadsTracker

), /// Heads transaction has either been finalized or lost (i.e. received its "final" status). @@ -551,66 +459,50 @@ type SharedTransactionTracker

= Shared< /// Submitted parachain heads transaction. struct SubmittedHeadsTracker { - /// Ids of parachains which heads were updated in the tracked transaction. - awaiting_update: BTreeSet, - /// Number of relay chain block that has been used to craft parachain heads proof. - relay_block_number: BlockNumberOf, + /// Parachain header id that we have submitted. + submitted_head: AvailableHeader>, /// Future that waits for submitted transaction finality or loss. /// /// It needs to be shared because of `poll` macro and our consuming `update` method. transaction_tracker: SharedTransactionTracker

, } -impl SubmittedHeadsTracker

-where - P::SourceChain: Chain, -{ +impl SubmittedHeadsTracker

{ /// Creates new parachain heads transaction tracker. pub fn new( - awaiting_update: impl IntoIterator, - relay_block_number: BlockNumberOf, + submitted_head: AvailableHeader>, transaction_tracker: impl TransactionTracker> + 'static, ) -> Self { SubmittedHeadsTracker { - awaiting_update: awaiting_update.into_iter().collect(), - relay_block_number, + submitted_head, transaction_tracker: transaction_tracker.wait().fuse().boxed().shared(), } } /// Returns `None` if all submitted parachain heads have been updated. pub async fn update( - mut self, + self, at_target_block: &HeaderIdOf, - heads_at_target: &BTreeMap>, - ) -> SubmittedHeadsStatus

{ - // remove all pending heads that were synced - for (para, best_para_head) in heads_at_target { - if best_para_head - .as_ref() - .map(|best_para_head| { - best_para_head.at_relay_block_number >= self.relay_block_number - }) - .unwrap_or(false) - { - self.awaiting_update.remove(para); - - log::trace!( - target: "bridge", - "Head of parachain {:?} has been updated at {}: {:?}. Outdated parachains remaining: {}", - para, - P::TargetChain::NAME, - best_para_head, - self.awaiting_update.len(), - ); - } - } + head_at_target: &Option>, + ) -> SubmittedHeadStatus

{ + // check if our head has been updated + let is_head_updated = match (self.submitted_head, head_at_target) { + (AvailableHeader::Available(submitted_head), Some(head_at_target)) + if head_at_target.number() >= submitted_head.number() => + true, + (AvailableHeader::Missing, None) => true, + _ => false, + }; + if is_head_updated { + log::trace!( + target: "bridge", + "Head of parachain ParaId({}) has been updated at {}: {:?}", + P::SourceParachain::PARACHAIN_ID, + P::TargetChain::NAME, + head_at_target, + ); - // if we have synced all required heads, we are done - if self.awaiting_update.is_empty() { - return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized( - *at_target_block, - )) + return SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(*at_target_block)) } // if underlying transaction tracker has reported that the transaction is lost, we may @@ -618,16 +510,16 @@ where let transaction_tracker = self.transaction_tracker.clone(); match poll!(transaction_tracker) { Poll::Ready(TrackedTransactionStatus::Lost) => - return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost), + return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost), Poll::Ready(TrackedTransactionStatus::Finalized(_)) => { // so we are here and our transaction is mined+finalized, but some of heads were not // updated => we're considering our loop as stalled - return SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost) + return SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost) }, _ => (), } - SubmittedHeadsStatus::Waiting(self) + SubmittedHeadStatus::Waiting(self) } } @@ -637,18 +529,16 @@ mod tests { use async_std::sync::{Arc, Mutex}; use codec::Encode; use futures::{SinkExt, StreamExt}; - use relay_substrate_client::test_chain::TestChain; + use relay_substrate_client::test_chain::{TestChain, TestParachain}; use relay_utils::{HeaderId, MaybeConnectionError}; use sp_core::H256; - const PARA_ID: u32 = 0; - const PARA_0_HASH: ParaHash = H256([1u8; 32]); - const PARA_1_HASH: ParaHash = H256([2u8; 32]); + const PARA_10_HASH: ParaHash = H256([10u8; 32]); + const PARA_20_HASH: ParaHash = H256([20u8; 32]); #[derive(Clone, Debug)] enum TestError { Error, - MissingParachainHeadProof, } impl MaybeConnectionError for TestError { @@ -661,7 +551,8 @@ mod tests { struct TestParachainsPipeline; impl ParachainsPipeline for TestParachainsPipeline { - type SourceChain = TestChain; + type SourceRelayChain = TestChain; + type SourceParachain = TestParachain; type TargetChain = TestChain; } @@ -688,12 +579,12 @@ mod tests { #[derive(Clone, Debug)] struct TestClientData { source_sync_status: Result, - source_heads: BTreeMap, TestError>>, - source_proofs: BTreeMap, TestError>>, + source_head: Result>, TestError>, + source_proof: Result<(), TestError>, target_best_block: Result, TestError>, target_best_finalized_source_block: Result, TestError>, - target_heads: BTreeMap>, + target_head: Result>, TestError>, target_submit_result: Result<(), TestError>, exit_signal_sender: Option>>, @@ -703,14 +594,12 @@ mod tests { pub fn minimal() -> Self { TestClientData { source_sync_status: Ok(true), - source_heads: vec![(PARA_ID, Ok(AvailableHeader::Available(PARA_0_HASH)))] - .into_iter() - .collect(), - source_proofs: vec![(PARA_ID, Ok(PARA_0_HASH.encode()))].into_iter().collect(), + source_head: Ok(AvailableHeader::Available(HeaderId(0, PARA_20_HASH))), + source_proof: Ok(()), target_best_block: Ok(HeaderId(0, Default::default())), target_best_finalized_source_block: Ok(HeaderId(0, Default::default())), - target_heads: BTreeMap::new(), + target_head: Ok(None), target_submit_result: Ok(()), exit_signal_sender: None, @@ -750,34 +639,17 @@ mod tests { async fn parachain_head( &self, _at_block: HeaderIdOf, - _metrics: Option<&ParachainsLoopMetrics>, - para_id: ParaId, - ) -> Result, TestError> { - match self.data.lock().await.source_heads.get(¶_id.0).cloned() { - Some(result) => result, - None => Ok(AvailableHeader::Missing), - } + ) -> Result>, TestError> { + self.data.lock().await.source_head.clone() } - async fn prove_parachain_heads( + async fn prove_parachain_head( &self, _at_block: HeaderIdOf, - parachains: &[ParaId], - ) -> Result<(ParaHeadsProof, Vec), TestError> { - let mut proofs = Vec::new(); - for para_id in parachains { - proofs.push( - self.data - .lock() - .await - .source_proofs - .get(¶_id.0) - .cloned() - .transpose()? - .ok_or(TestError::MissingParachainHeadProof)?, - ); - } - Ok((ParaHeadsProof(proofs), vec![Default::default(); parachains.len()])) + ) -> Result<(ParaHeadsProof, ParaHash), TestError> { + let head = *self.data.lock().await.source_head.clone()?.as_available().unwrap(); + let proof = (ParaHeadsProof(vec![head.hash().encode()]), head.hash()); + self.data.lock().await.source_proof.clone().map(|_| proof) } } @@ -789,7 +661,7 @@ mod tests { self.data.lock().await.target_best_block.clone() } - async fn best_finalized_source_block( + async fn best_finalized_source_relay_chain_block( &self, _at_block: &HeaderIdOf, ) -> Result, TestError> { @@ -799,16 +671,14 @@ mod tests { async fn parachain_head( &self, _at_block: HeaderIdOf, - _metrics: Option<&ParachainsLoopMetrics>, - para_id: ParaId, - ) -> Result, TestError> { - self.data.lock().await.target_heads.get(¶_id.0).cloned().transpose() + ) -> Result>, TestError> { + self.data.lock().await.target_head.clone() } - async fn submit_parachain_heads_proof( + async fn submit_parachain_head_proof( &self, _at_source_block: HeaderIdOf, - _updated_parachains: Vec<(ParaId, ParaHash)>, + _updated_parachain_head: ParaHash, _proof: ParaHeadsProof, ) -> Result { let mut data = self.data.lock().await; @@ -823,14 +693,6 @@ mod tests { } } - fn default_sync_params() -> ParachainSyncParams { - ParachainSyncParams { - parachains: vec![ParaId(PARA_ID)], - strategy: ParachainSyncStrategy::Any, - stall_timeout: Duration::from_secs(60), - } - } - #[test] fn when_source_client_fails_to_return_sync_state() { let mut test_source_client = TestClientData::minimal(); @@ -840,7 +702,6 @@ mod tests { async_std::task::block_on(run_until_connection_lost( TestClient::from(test_source_client), TestClient::from(TestClientData::minimal()), - default_sync_params(), None, futures::future::pending(), )), @@ -857,7 +718,6 @@ mod tests { async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(test_target_client), - default_sync_params(), None, futures::future::pending(), )), @@ -868,13 +728,12 @@ mod tests { #[test] fn when_target_client_fails_to_read_heads() { let mut test_target_client = TestClientData::minimal(); - test_target_client.target_heads.insert(PARA_ID, Err(TestError::Error)); + test_target_client.target_head = Err(TestError::Error); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(test_target_client), - default_sync_params(), None, futures::future::pending(), )), @@ -891,7 +750,6 @@ mod tests { async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(test_target_client), - default_sync_params(), None, futures::future::pending(), )), @@ -902,13 +760,12 @@ mod tests { #[test] fn when_source_client_fails_to_read_heads() { let mut test_source_client = TestClientData::minimal(); - test_source_client.source_heads.insert(PARA_ID, Err(TestError::Error)); + test_source_client.source_head = Err(TestError::Error); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(test_source_client), TestClient::from(TestClientData::minimal()), - default_sync_params(), None, futures::future::pending(), )), @@ -919,13 +776,12 @@ mod tests { #[test] fn when_source_client_fails_to_prove_heads() { let mut test_source_client = TestClientData::minimal(); - test_source_client.source_proofs.insert(PARA_ID, Err(TestError::Error)); + test_source_client.source_proof = Err(TestError::Error); assert_eq!( async_std::task::block_on(run_until_connection_lost( TestClient::from(test_source_client), TestClient::from(TestClientData::minimal()), - default_sync_params(), None, futures::future::pending(), )), @@ -942,7 +798,6 @@ mod tests { async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(test_target_client), - default_sync_params(), None, futures::future::pending(), )), @@ -957,7 +812,6 @@ mod tests { async_std::task::block_on(run_until_connection_lost( TestClient::from(TestClientData::minimal()), TestClient::from(TestClientData::with_exit_signal_sender(exit_signal_sender)), - default_sync_params(), None, exit_signal.into_future().map(|(_, _)| ()), )), @@ -965,111 +819,62 @@ mod tests { ); } - const PARA_1_ID: u32 = PARA_ID + 1; - const SOURCE_BLOCK_NUMBER: u32 = 100; - fn test_tx_tracker() -> SubmittedHeadsTracker { SubmittedHeadsTracker::new( - vec![ParaId(PARA_ID), ParaId(PARA_1_ID)], - SOURCE_BLOCK_NUMBER, + AvailableHeader::Available(HeaderId(20, PARA_20_HASH)), TestTransactionTracker(None), ) } - fn all_expected_tracker_heads() -> BTreeMap> { - vec![ - ( - ParaId(PARA_ID), - Some(BestParaHeadHash { - at_relay_block_number: SOURCE_BLOCK_NUMBER, - head_hash: PARA_0_HASH, - }), - ), - ( - ParaId(PARA_1_ID), - Some(BestParaHeadHash { - at_relay_block_number: SOURCE_BLOCK_NUMBER, - head_hash: PARA_0_HASH, - }), - ), - ] - .into_iter() - .collect() - } - - impl From> for Option> { - fn from(status: SubmittedHeadsStatus) -> Option> { + impl From> for Option<()> { + fn from(status: SubmittedHeadStatus) -> Option<()> { match status { - SubmittedHeadsStatus::Waiting(tracker) => Some(tracker.awaiting_update), + SubmittedHeadStatus::Waiting(_) => Some(()), _ => None, } } } #[async_std::test] - async fn tx_tracker_update_when_nothing_is_updated() { + async fn tx_tracker_update_when_head_at_target_has_none_value() { assert_eq!( - Some(test_tx_tracker().awaiting_update), + Some(()), test_tx_tracker() - .update(&HeaderId(0, Default::default()), &vec![].into_iter().collect()) + .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH))) .await .into(), ); } #[async_std::test] - async fn tx_tracker_update_when_one_of_heads_is_updated_to_previous_value() { + async fn tx_tracker_update_when_head_at_target_has_old_value() { assert_eq!( - Some(test_tx_tracker().awaiting_update), + Some(()), test_tx_tracker() - .update( - &HeaderId(0, Default::default()), - &vec![( - ParaId(PARA_ID), - Some(BestParaHeadHash { - at_relay_block_number: SOURCE_BLOCK_NUMBER - 1, - head_hash: PARA_0_HASH, - }) - )] - .into_iter() - .collect() - ) + .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH))) .await .into(), ); } #[async_std::test] - async fn tx_tracker_update_when_one_of_heads_is_updated() { - assert_eq!( - Some(vec![ParaId(PARA_1_ID)].into_iter().collect::>()), + async fn tx_tracker_update_when_head_at_target_has_same_value() { + assert!(matches!( test_tx_tracker() - .update( - &HeaderId(0, Default::default()), - &vec![( - ParaId(PARA_ID), - Some(BestParaHeadHash { - at_relay_block_number: SOURCE_BLOCK_NUMBER, - head_hash: PARA_0_HASH, - }) - )] - .into_iter() - .collect() - ) - .await - .into(), - ); + .update(&HeaderId(0, Default::default()), &Some(HeaderId(20, PARA_20_HASH))) + .await, + SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)), + )); } #[async_std::test] - async fn tx_tracker_update_when_all_heads_are_updated() { - assert_eq!( - Option::>::None, + async fn tx_tracker_update_when_head_at_target_has_better_value() { + assert!(matches!( test_tx_tracker() - .update(&HeaderId(0, Default::default()), &all_expected_tracker_heads()) - .await - .into(), - ); + .update(&HeaderId(0, Default::default()), &Some(HeaderId(30, PARA_20_HASH))) + .await, + SubmittedHeadStatus::Final(TrackedTransactionStatus::Finalized(_)), + )); } #[async_std::test] @@ -1079,9 +884,9 @@ mod tests { futures::future::ready(TrackedTransactionStatus::Lost).boxed().shared(); assert!(matches!( tx_tracker - .update(&HeaderId(0, Default::default()), &vec![].into_iter().collect()) + .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH))) .await, - SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost), + SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost), )); } @@ -1094,162 +899,55 @@ mod tests { .shared(); assert!(matches!( tx_tracker - .update(&HeaderId(0, Default::default()), &vec![].into_iter().collect()) + .update(&HeaderId(0, Default::default()), &Some(HeaderId(10, PARA_10_HASH))) .await, - SubmittedHeadsStatus::Final(TrackedTransactionStatus::Lost), + SubmittedHeadStatus::Final(TrackedTransactionStatus::Lost), )); } - #[async_std::test] - async fn tx_tracker_update_when_tx_is_finalized_and_heads_are_updated() { - let mut tx_tracker = test_tx_tracker(); - tx_tracker.transaction_tracker = - futures::future::ready(TrackedTransactionStatus::Finalized(Default::default())) - .boxed() - .shared(); - assert!(matches!( - tx_tracker - .update(&HeaderId(0, Default::default()), &all_expected_tracker_heads()) - .await, - SubmittedHeadsStatus::Final(TrackedTransactionStatus::Finalized(_)), + #[test] + fn parachain_is_not_updated_if_it_is_unavailable() { + assert!(!is_update_required::(AvailableHeader::Unavailable, None)); + assert!(!is_update_required::( + AvailableHeader::Unavailable, + Some(HeaderId(10, PARA_10_HASH)) )); } #[test] fn parachain_is_not_updated_if_it_is_unknown_to_both_clients() { - assert_eq!( - select_parachains_to_update::( - vec![(ParaId(PARA_ID), AvailableHeader::Missing)].into_iter().collect(), - vec![(ParaId(PARA_ID), None)].into_iter().collect(), - HeaderId(10, Default::default()), - ), - Vec::::new(), - ); - } - - #[test] - fn parachain_is_not_updated_if_it_has_been_updated_at_better_relay_block() { - assert_eq!( - select_parachains_to_update::( - vec![(ParaId(PARA_ID), AvailableHeader::Available(PARA_0_HASH))] - .into_iter() - .collect(), - vec![( - ParaId(PARA_ID), - Some(BestParaHeadHash { at_relay_block_number: 20, head_hash: PARA_1_HASH }) - )] - .into_iter() - .collect(), - HeaderId(10, Default::default()), - ), - Vec::::new(), - ); + assert!(!is_update_required::(AvailableHeader::Missing, None),); } #[test] - fn parachain_is_not_updated_if_hash_is_the_same_at_next_relay_block() { - assert_eq!( - select_parachains_to_update::( - vec![(ParaId(PARA_ID), AvailableHeader::Available(PARA_0_HASH))] - .into_iter() - .collect(), - vec![( - ParaId(PARA_ID), - Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) - )] - .into_iter() - .collect(), - HeaderId(10, Default::default()), - ), - Vec::::new(), - ); + fn parachain_is_not_updated_if_target_has_better_head() { + assert!(!is_update_required::( + AvailableHeader::Available(HeaderId(10, Default::default())), + Some(HeaderId(20, Default::default())), + ),); } #[test] fn parachain_is_updated_after_offboarding() { - assert_eq!( - select_parachains_to_update::( - vec![(ParaId(PARA_ID), AvailableHeader::Missing)].into_iter().collect(), - vec![( - ParaId(PARA_ID), - Some(BestParaHeadHash { - at_relay_block_number: 0, - head_hash: Default::default(), - }) - )] - .into_iter() - .collect(), - HeaderId(10, Default::default()), - ), - vec![ParaId(PARA_ID)], - ); + assert!(is_update_required::( + AvailableHeader::Missing, + Some(HeaderId(20, Default::default())), + ),); } #[test] fn parachain_is_updated_after_onboarding() { - assert_eq!( - select_parachains_to_update::( - vec![(ParaId(PARA_ID), AvailableHeader::Available(PARA_0_HASH))] - .into_iter() - .collect(), - vec![(ParaId(PARA_ID), None)].into_iter().collect(), - HeaderId(10, Default::default()), - ), - vec![ParaId(PARA_ID)], - ); + assert!(is_update_required::( + AvailableHeader::Available(HeaderId(30, Default::default())), + None, + ),); } #[test] fn parachain_is_updated_if_newer_head_is_known() { - assert_eq!( - select_parachains_to_update::( - vec![(ParaId(PARA_ID), AvailableHeader::Available(PARA_1_HASH))] - .into_iter() - .collect(), - vec![( - ParaId(PARA_ID), - Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) - )] - .into_iter() - .collect(), - HeaderId(10, Default::default()), - ), - vec![ParaId(PARA_ID)], - ); - } - - #[test] - fn parachain_is_not_updated_if_source_head_is_unavailable() { - assert_eq!( - select_parachains_to_update::( - vec![(ParaId(PARA_ID), AvailableHeader::Unavailable)].into_iter().collect(), - vec![( - ParaId(PARA_ID), - Some(BestParaHeadHash { at_relay_block_number: 0, head_hash: PARA_0_HASH }) - )] - .into_iter() - .collect(), - HeaderId(10, Default::default()), - ), - vec![], - ); - } - - #[test] - fn is_update_required_works() { - let mut sync_params = ParachainSyncParams { - parachains: vec![ParaId(PARA_ID), ParaId(PARA_1_ID)], - strategy: ParachainSyncStrategy::Any, - stall_timeout: Duration::from_secs(60), - }; - - assert!(!is_update_required(&sync_params, &[])); - assert!(is_update_required(&sync_params, &[ParaId(PARA_ID)])); - assert!(is_update_required(&sync_params, &[ParaId(PARA_ID), ParaId(PARA_1_ID)])); - - sync_params.strategy = ParachainSyncStrategy::All; - assert!(!is_update_required(&sync_params, &[])); - assert!(!is_update_required(&sync_params, &[ParaId(PARA_ID)])); - assert!(is_update_required(&sync_params, &[ParaId(PARA_ID), ParaId(PARA_1_ID)])); + assert!(is_update_required::( + AvailableHeader::Available(HeaderId(40, Default::default())), + Some(HeaderId(30, Default::default())), + ),); } } diff --git a/bridges/relays/parachains/src/parachains_loop_metrics.rs b/bridges/relays/parachains/src/parachains_loop_metrics.rs index 5df996b4ddd1f..8138a43b3b3dc 100644 --- a/bridges/relays/parachains/src/parachains_loop_metrics.rs +++ b/bridges/relays/parachains/src/parachains_loop_metrics.rs @@ -16,7 +16,7 @@ use bp_polkadot_core::parachains::ParaId; use relay_utils::{ - metrics::{metric_name, register, GaugeVec, Metric, Opts, PrometheusError, Registry, U64}, + metrics::{metric_name, register, Gauge, Metric, PrometheusError, Registry, U64}, UniqueSaturatedInto, }; @@ -24,28 +24,22 @@ use relay_utils::{ #[derive(Clone)] pub struct ParachainsLoopMetrics { /// Best parachains header numbers at the source. - best_source_block_numbers: GaugeVec, + best_source_block_numbers: Gauge, /// Best parachains header numbers at the target. - best_target_block_numbers: GaugeVec, + best_target_block_numbers: Gauge, } impl ParachainsLoopMetrics { /// Create and register parachains loop metrics. pub fn new(prefix: Option<&str>) -> Result { Ok(ParachainsLoopMetrics { - best_source_block_numbers: GaugeVec::new( - Opts::new( - metric_name(prefix, "best_parachain_block_number_at_source"), - "Best parachain block numbers at the source relay chain".to_string(), - ), - &["parachain"], + best_source_block_numbers: Gauge::new( + metric_name(prefix, "best_parachain_block_number_at_source"), + "Best parachain block numbers at the source relay chain".to_string(), )?, - best_target_block_numbers: GaugeVec::new( - Opts::new( - metric_name(prefix, "best_parachain_block_number_at_target"), - "Best parachain block numbers at the target chain".to_string(), - ), - &["parachain"], + best_target_block_numbers: Gauge::new( + metric_name(prefix, "best_parachain_block_number_at_target"), + "Best parachain block numbers at the target chain".to_string(), )?, }) } @@ -57,14 +51,13 @@ impl ParachainsLoopMetrics { block_number: Number, ) { let block_number = block_number.unique_saturated_into(); - let label = parachain_label(¶chain); log::trace!( target: "bridge-metrics", - "Updated value of metric 'best_parachain_block_number_at_source[{}]': {:?}", - label, + "Updated value of metric 'best_parachain_block_number_at_source[{:?}]': {:?}", + parachain, block_number, ); - self.best_source_block_numbers.with_label_values(&[&label]).set(block_number); + self.best_source_block_numbers.set(block_number); } /// Update best block number at target. @@ -74,14 +67,13 @@ impl ParachainsLoopMetrics { block_number: Number, ) { let block_number = block_number.unique_saturated_into(); - let label = parachain_label(¶chain); log::trace!( target: "bridge-metrics", - "Updated value of metric 'best_parachain_block_number_at_target[{}]': {:?}", - label, + "Updated value of metric 'best_parachain_block_number_at_target[{:?}]': {:?}", + parachain, block_number, ); - self.best_target_block_numbers.with_label_values(&[&label]).set(block_number); + self.best_target_block_numbers.set(block_number); } } @@ -92,8 +84,3 @@ impl Metric for ParachainsLoopMetrics { Ok(()) } } - -/// Return metric label for the parachain. -fn parachain_label(parachain: &ParaId) -> String { - format!("para_{}", parachain.0) -}