From 07d101386ce6b8828866dba58beb624eb27deeaf Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Wed, 29 Jun 2022 15:58:19 +0300 Subject: [PATCH] Parachain loop metrics (#1484) * parachain loop metrics * some fixes * mini refactoring * add tests --- bridges/modules/parachains/src/lib.rs | 24 ++++++ bridges/primitives/parachains/src/lib.rs | 18 ++++- bridges/primitives/runtime/src/lib.rs | 32 ++++++++ .../src/parachains/source.rs | 22 +++++- .../src/parachains/target.rs | 51 +++++++++++-- .../relays/parachains/src/parachains_loop.rs | 29 ++++++-- .../parachains/src/parachains_loop_metrics.rs | 74 +++++++++++++++++-- 7 files changed, 228 insertions(+), 22 deletions(-) diff --git a/bridges/modules/parachains/src/lib.rs b/bridges/modules/parachains/src/lib.rs index 46be3866db12e..dc5ad5074b196 100644 --- a/bridges/modules/parachains/src/lib.rs +++ b/bridges/modules/parachains/src/lib.rs @@ -405,6 +405,7 @@ mod tests { use frame_support::{ assert_noop, assert_ok, dispatch::DispatchResultWithPostInfo, + storage::generator::{StorageDoubleMap, StorageMap}, traits::{Get, OnInitialize}, weights::Weight, }; @@ -810,4 +811,27 @@ mod tests { ); }); } + + #[test] + fn storage_keys_computed_properly() { + assert_eq!( + BestParaHeads::::storage_map_final_key(ParaId(42)).to_vec(), + bp_parachains::best_parachain_head_hash_storage_key_at_target("Parachains", ParaId(42)) + .0, + ); + + assert_eq!( + ImportedParaHeads::::storage_double_map_final_key( + ParaId(42), + ParaHash::from([21u8; 32]) + ) + .to_vec(), + bp_parachains::imported_parachain_head_storage_key_at_target( + "Parachains", + ParaId(42), + ParaHash::from([21u8; 32]) + ) + .0, + ); + } } diff --git a/bridges/primitives/parachains/src/lib.rs b/bridges/primitives/parachains/src/lib.rs index 0ed4e11480dad..791d2bd47f11f 100644 --- a/bridges/primitives/parachains/src/lib.rs +++ b/bridges/primitives/parachains/src/lib.rs @@ -56,7 +56,7 @@ pub fn parachain_head_storage_key_at_source( /// Returns runtime storage key of best known parachain head at the target chain. /// /// The head is stored by the `pallet-bridge-parachains` pallet in the `BestParaHeads` map. -pub fn parachain_head_storage_key_at_target( +pub fn best_parachain_head_hash_storage_key_at_target( bridge_parachains_pallet_name: &str, para_id: ParaId, ) -> StorageKey { @@ -66,3 +66,19 @@ pub fn parachain_head_storage_key_at_target( ¶_id.encode(), ) } + +/// Returns runtime storage key of the parachain head with given hash at the target chain. +/// +/// The head is stored by the `pallet-bridge-parachains` pallet in the `ImportedParaHeads` map. +pub fn imported_parachain_head_storage_key_at_target( + bridge_parachains_pallet_name: &str, + para_id: ParaId, + head_hash: ParaHash, +) -> StorageKey { + bp_runtime::storage_double_map_final_key::( + bridge_parachains_pallet_name, + "ImportedParaHeads", + ¶_id.encode(), + &head_hash.encode(), + ) +} diff --git a/bridges/primitives/runtime/src/lib.rs b/bridges/primitives/runtime/src/lib.rs index cba89df726679..4b69bbdb68828 100644 --- a/bridges/primitives/runtime/src/lib.rs +++ b/bridges/primitives/runtime/src/lib.rs @@ -236,6 +236,38 @@ pub fn storage_map_final_key( StorageKey(final_key) } +/// This is a copy of the +/// `frame_support::storage::generator::StorageDoubleMap::storage_double_map_final_key` for maps +/// based on selected hashers. +/// +/// We're using it because to call `storage_double_map_final_key` directly, we need access to the +/// runtime and pallet instance, which (sometimes) is impossible. +pub fn storage_double_map_final_key( + pallet_prefix: &str, + map_name: &str, + key1: &[u8], + key2: &[u8], +) -> StorageKey { + let key1_hashed = H1::hash(key1); + let key2_hashed = H2::hash(key2); + let pallet_prefix_hashed = frame_support::Twox128::hash(pallet_prefix.as_bytes()); + let storage_prefix_hashed = frame_support::Twox128::hash(map_name.as_bytes()); + + let mut final_key = Vec::with_capacity( + pallet_prefix_hashed.len() + + storage_prefix_hashed.len() + + key1_hashed.as_ref().len() + + key2_hashed.as_ref().len(), + ); + + final_key.extend_from_slice(&pallet_prefix_hashed[..]); + final_key.extend_from_slice(&storage_prefix_hashed[..]); + final_key.extend_from_slice(key1_hashed.as_ref()); + final_key.extend_from_slice(key2_hashed.as_ref()); + + StorageKey(final_key) +} + /// This is how a storage key of storage parameter (`parameter_types! { storage Param: bool = false; /// }`) is computed. /// diff --git a/bridges/relays/lib-substrate-relay/src/parachains/source.rs b/bridges/relays/lib-substrate-relay/src/parachains/source.rs index dd239366719fc..c613387e7a258 100644 --- a/bridges/relays/lib-substrate-relay/src/parachains/source.rs +++ b/bridges/relays/lib-substrate-relay/src/parachains/source.rs @@ -23,7 +23,10 @@ use async_trait::async_trait; use bp_parachains::parachain_head_storage_key_at_source; use bp_polkadot_core::parachains::{ParaHash, ParaHead, ParaHeadsProof, ParaId}; use codec::Decode; -use parachains_relay::parachains_loop::{ParaHashAtSource, SourceClient}; +use parachains_relay::{ + parachains_loop::{ParaHashAtSource, SourceClient}, + parachains_loop_metrics::ParachainsLoopMetrics, +}; use relay_substrate_client::{ Chain, Client, Error as SubstrateError, HeaderIdOf, HeaderOf, RelayChain, }; @@ -100,6 +103,7 @@ where async fn parachain_head( &self, at_block: HeaderIdOf, + metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, ) -> Result { // we don't need to support many parachains now @@ -111,9 +115,11 @@ where ))) } - Ok(match self.on_chain_parachain_header(at_block, para_id).await? { + let mut para_header_number_at_source = None; + let para_hash_at_source = match self.on_chain_parachain_header(at_block, para_id).await? { Some(parachain_header) => { let mut parachain_head = ParaHashAtSource::Some(parachain_header.hash()); + para_header_number_at_source = Some(*parachain_header.number()); // never return head that is larger than requested. This way we'll never sync // headers past `maximal_header_id` if let Some(ref maximal_header_id) = self.maximal_header_id { @@ -125,11 +131,13 @@ where // we don't want this header yet => let's report previously requested // header parachain_head = ParaHashAtSource::Some(maximal_header_id.1); + para_header_number_at_source = Some(maximal_header_id.0); }, Some(_) => (), None => { // on-demand relay has not yet asked us to sync anything let's do that parachain_head = ParaHashAtSource::Unavailable; + para_header_number_at_source = None; }, } } @@ -137,7 +145,15 @@ where parachain_head }, None => ParaHashAtSource::None, - }) + }; + + if let (Some(metrics), Some(para_header_number_at_source)) = + (metrics, para_header_number_at_source) + { + metrics.update_best_parachain_block_at_source(para_id, para_header_number_at_source); + } + + Ok(para_hash_at_source) } async fn prove_parachain_heads( diff --git a/bridges/relays/lib-substrate-relay/src/parachains/target.rs b/bridges/relays/lib-substrate-relay/src/parachains/target.rs index 67d82e1f7075f..ca30629198b5b 100644 --- a/bridges/relays/lib-substrate-relay/src/parachains/target.rs +++ b/bridges/relays/lib-substrate-relay/src/parachains/target.rs @@ -24,13 +24,19 @@ use crate::{ }; use async_trait::async_trait; -use bp_parachains::{parachain_head_storage_key_at_target, BestParaHeadHash}; -use bp_polkadot_core::parachains::{ParaHeadsProof, ParaId}; +use bp_parachains::{ + best_parachain_head_hash_storage_key_at_target, imported_parachain_head_storage_key_at_target, + BestParaHeadHash, +}; +use bp_polkadot_core::parachains::{ParaHead, ParaHeadsProof, ParaId}; use codec::{Decode, Encode}; -use parachains_relay::parachains_loop::TargetClient; +use parachains_relay::{ + parachains_loop::TargetClient, parachains_loop_metrics::ParachainsLoopMetrics, +}; use relay_substrate_client::{ AccountIdOf, AccountKeyPairOf, BlockNumberOf, Chain, Client, Error as SubstrateError, HashOf, - HeaderIdOf, RelayChain, SignParam, TransactionEra, TransactionSignScheme, UnsignedTransaction, + HeaderIdOf, HeaderOf, RelayChain, SignParam, TransactionEra, TransactionSignScheme, + UnsignedTransaction, }; use relay_utils::{relay_loop::Client as RelayClient, HeaderId}; use sp_core::{Bytes, Pair}; @@ -115,15 +121,46 @@ where async fn parachain_head( &self, at_block: HeaderIdOf, + metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, ) -> Result, Self::Error> { - let storage_key = parachain_head_storage_key_at_target( + let best_para_head_hash_key = best_parachain_head_hash_storage_key_at_target( P::SourceRelayChain::PARACHAINS_FINALITY_PALLET_NAME, para_id, ); - let para_head = self.client.storage_value(storage_key, Some(at_block.1)).await?; + let best_para_head_hash: Option = + self.client.storage_value(best_para_head_hash_key, Some(at_block.1)).await?; + if let (Some(metrics), &Some(ref best_para_head_hash)) = (metrics, &best_para_head_hash) { + let imported_para_head_key = imported_parachain_head_storage_key_at_target( + P::SourceRelayChain::PARACHAINS_FINALITY_PALLET_NAME, + para_id, + best_para_head_hash.head_hash, + ); + let imported_para_header = self + .client + .storage_value::(imported_para_head_key, Some(at_block.1)) + .await? + .and_then(|h| match HeaderOf::::decode(&mut &h.0[..]) { + Ok(header) => Some(header), + Err(e) => { + log::error!( + target: "bridge-metrics", + "Failed to decode {} parachain header at {}: {:?}. Metric will have obsolete value", + P::SourceParachain::NAME, + P::TargetChain::NAME, + e, + ); + + None + }, + }); + if let Some(imported_para_header) = imported_para_header { + metrics + .update_best_parachain_block_at_target(para_id, *imported_para_header.number()); + } + } - Ok(para_head) + Ok(best_para_head_hash) } async fn submit_parachain_heads_proof( diff --git a/bridges/relays/parachains/src/parachains_loop.rs b/bridges/relays/parachains/src/parachains_loop.rs index b1dd74146281c..60627d23e0b15 100644 --- a/bridges/relays/parachains/src/parachains_loop.rs +++ b/bridges/relays/parachains/src/parachains_loop.rs @@ -76,9 +76,13 @@ pub trait SourceClient: RelayClient { async fn ensure_synced(&self) -> Result; /// Get parachain head hash at given block. + /// + /// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_source` + /// on provided `metrics` object to update corresponding metric value. async fn parachain_head( &self, at_block: HeaderIdOf, + metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, ) -> Result; @@ -103,9 +107,13 @@ pub trait TargetClient: RelayClient { ) -> Result, Self::Error>; /// Get parachain head hash at given block. + /// + /// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_target` + /// on provided `metrics` object to update corresponding metric value. async fn parachain_head( &self, at_block: HeaderIdOf, + metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, ) -> Result, Self::Error>; @@ -158,7 +166,7 @@ async fn run_until_connection_lost( source_client: impl SourceClient

, target_client: impl TargetClient

, sync_params: ParachainSyncParams, - _metrics: Option, + metrics: Option, exit_signal: impl Future + Send, ) -> Result<(), FailedClient> where @@ -213,9 +221,13 @@ where log::warn!(target: "bridge", "Failed to read best {} block: {:?}", P::SourceChain::NAME, e); FailedClient::Target })?; - let heads_at_target = - read_heads_at_target(&target_client, &best_target_block, &sync_params.parachains) - .await?; + let heads_at_target = read_heads_at_target( + &target_client, + metrics.as_ref(), + &best_target_block, + &sync_params.parachains, + ) + .await?; tx_tracker = tx_tracker.take().and_then(|tx_tracker| tx_tracker.update(&heads_at_target)); if tx_tracker.is_some() { continue @@ -238,6 +250,7 @@ where })?; let heads_at_source = read_heads_at_source( &source_client, + metrics.as_ref(), &best_finalized_relay_block, &sync_params.parachains, ) @@ -398,12 +411,13 @@ fn is_update_required(sync_params: &ParachainSyncParams, updated_ids: &[ParaId]) /// Guarantees that the returning map will have an entry for every parachain from `parachains`. async fn read_heads_at_source( source_client: &impl SourceClient

, + metrics: Option<&ParachainsLoopMetrics>, at_relay_block: &HeaderIdOf, parachains: &[ParaId], ) -> Result, FailedClient> { let mut para_head_hashes = BTreeMap::new(); for para in parachains { - let para_head = source_client.parachain_head(*at_relay_block, *para).await; + let para_head = source_client.parachain_head(*at_relay_block, metrics, *para).await; match para_head { Ok(para_head) => { para_head_hashes.insert(*para, para_head); @@ -428,12 +442,13 @@ async fn read_heads_at_source( /// Guarantees that the returning map will have an entry for every parachain from `parachains`. async fn read_heads_at_target( target_client: &impl TargetClient

, + metrics: Option<&ParachainsLoopMetrics>, at_block: &HeaderIdOf, parachains: &[ParaId], ) -> Result>, FailedClient> { let mut para_best_head_hashes = BTreeMap::new(); for para in parachains { - let para_best_head = target_client.parachain_head(*at_block, *para).await; + let para_best_head = target_client.parachain_head(*at_block, metrics, *para).await; match para_best_head { Ok(para_best_head) => { para_best_head_hashes.insert(*para, para_best_head); @@ -638,6 +653,7 @@ mod tests { async fn parachain_head( &self, _at_block: HeaderIdOf, + _metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, ) -> Result { match self.data.lock().await.source_heads.get(¶_id.0).cloned() { @@ -684,6 +700,7 @@ mod tests { async fn parachain_head( &self, _at_block: HeaderIdOf, + _metrics: Option<&ParachainsLoopMetrics>, para_id: ParaId, ) -> Result, TestError> { self.data.lock().await.target_heads.get(¶_id.0).cloned().transpose() diff --git a/bridges/relays/parachains/src/parachains_loop_metrics.rs b/bridges/relays/parachains/src/parachains_loop_metrics.rs index a0a99f5c332ab..ff8bace274439 100644 --- a/bridges/relays/parachains/src/parachains_loop_metrics.rs +++ b/bridges/relays/parachains/src/parachains_loop_metrics.rs @@ -14,21 +14,85 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use relay_utils::metrics::{Metric, PrometheusError, Registry}; +use bp_polkadot_core::parachains::ParaId; +use relay_utils::metrics::{ + metric_name, register, GaugeVec, Metric, Opts, PrometheusError, Registry, U64, +}; /// Parachains sync metrics. #[derive(Clone)] -pub struct ParachainsLoopMetrics; +pub struct ParachainsLoopMetrics { + /// Best parachains header numbers at the source. + best_source_block_numbers: GaugeVec, + /// Best parachains header numbers at the target. + best_target_block_numbers: GaugeVec, +} impl ParachainsLoopMetrics { /// Create and register parachains loop metrics. - pub fn new(_prefix: Option<&str>) -> Result { - Ok(ParachainsLoopMetrics) + pub fn new(prefix: Option<&str>) -> Result { + Ok(ParachainsLoopMetrics { + best_source_block_numbers: GaugeVec::new( + Opts::new( + metric_name(prefix, "best_parachain_block_number_at_source"), + "Best parachain block numbers at the source relay chain".to_string(), + ), + &["parachain"], + )?, + best_target_block_numbers: GaugeVec::new( + Opts::new( + metric_name(prefix, "best_parachain_block_number_at_target"), + "Best parachain block numbers at the target chain".to_string(), + ), + &["parachain"], + )?, + }) + } + + /// Update best block number at source. + pub fn update_best_parachain_block_at_source>( + &self, + parachain: ParaId, + block_number: Number, + ) { + let block_number = block_number.into(); + let label = parachain_label(¶chain); + log::trace!( + target: "bridge-metrics", + "Updated value of metric 'best_parachain_block_number_at_source[{}]': {:?}", + label, + block_number, + ); + self.best_source_block_numbers.with_label_values(&[&label]).set(block_number); + } + + /// Update best block number at target. + pub fn update_best_parachain_block_at_target>( + &self, + parachain: ParaId, + block_number: Number, + ) { + let block_number = block_number.into(); + let label = parachain_label(¶chain); + log::trace!( + target: "bridge-metrics", + "Updated value of metric 'best_parachain_block_number_at_target[{}]': {:?}", + label, + block_number, + ); + self.best_target_block_numbers.with_label_values(&[&label]).set(block_number); } } impl Metric for ParachainsLoopMetrics { - fn register(&self, _registry: &Registry) -> Result<(), PrometheusError> { + fn register(&self, registry: &Registry) -> Result<(), PrometheusError> { + register(self.best_source_block_numbers.clone(), registry)?; + register(self.best_target_block_numbers.clone(), registry)?; Ok(()) } } + +/// Return metric label for the parachain. +fn parachain_label(parachain: &ParaId) -> String { + format!("para_{}", parachain.0) +}