diff --git a/bridges/bin/millau/runtime/src/lib.rs b/bridges/bin/millau/runtime/src/lib.rs index faad5153d8223..42251b94b01b3 100644 --- a/bridges/bin/millau/runtime/src/lib.rs +++ b/bridges/bin/millau/runtime/src/lib.rs @@ -321,7 +321,7 @@ impl pallet_shift_session_manager::Trait for Runtime {} parameter_types! { pub const MaxMessagesToPruneAtOnce: bp_message_lane::MessageNonce = 8; - pub const MaxUnconfirmedMessagesAtInboundLane: bp_message_lane::MessageNonce = 128; + pub const MaxUnconfirmedMessagesAtInboundLane: bp_message_lane::MessageNonce = bp_millau::MAX_UNCONFIRMED_MESSAGES_AT_INBOUND_LANE; } impl pallet_message_lane::Trait for Runtime { diff --git a/bridges/bin/millau/runtime/src/rialto_messages.rs b/bridges/bin/millau/runtime/src/rialto_messages.rs index 0aed3d4bae89b..450d38b6d00fb 100644 --- a/bridges/bin/millau/runtime/src/rialto_messages.rs +++ b/bridges/bin/millau/runtime/src/rialto_messages.rs @@ -23,7 +23,7 @@ use bp_message_lane::{ target_chain::{ProvedMessages, SourceHeaderChain}, InboundLaneData, LaneId, Message, MessageNonce, }; -use bp_runtime::InstanceId; +use bp_runtime::{InstanceId, RIALTO_BRIDGE_INSTANCE}; use bridge_runtime_common::messages::{self, ChainWithMessageLanes, MessageBridge}; use frame_support::{ weights::{Weight, WeightToFeePolynomial}, @@ -75,7 +75,7 @@ pub type FromRialtoMessageDispatch = messages::target::FromBridgedChainMessageDi pub struct WithRialtoMessageBridge; impl MessageBridge for WithRialtoMessageBridge { - const INSTANCE: InstanceId = *b"rlto"; + const INSTANCE: InstanceId = RIALTO_BRIDGE_INSTANCE; const RELAYER_FEE_PERCENT: u32 = 10; diff --git a/bridges/bin/rialto/runtime/src/lib.rs b/bridges/bin/rialto/runtime/src/lib.rs index 1ab9984bec722..ea73fd2d1f520 100644 --- a/bridges/bin/rialto/runtime/src/lib.rs +++ b/bridges/bin/rialto/runtime/src/lib.rs @@ -65,6 +65,7 @@ pub use frame_system::Call as SystemCall; pub use pallet_balances::Call as BalancesCall; pub use pallet_bridge_currency_exchange::Call as BridgeCurrencyExchangeCall; pub use pallet_bridge_eth_poa::Call as BridgeEthPoACall; +pub use pallet_message_lane::Call as MessageLaneCall; pub use pallet_substrate_bridge::Call as BridgeMillauCall; pub use pallet_timestamp::Call as TimestampCall; @@ -428,7 +429,7 @@ impl pallet_shift_session_manager::Trait for Runtime {} parameter_types! { pub const MaxMessagesToPruneAtOnce: bp_message_lane::MessageNonce = 8; - pub const MaxUnconfirmedMessagesAtInboundLane: bp_message_lane::MessageNonce = 128; + pub const MaxUnconfirmedMessagesAtInboundLane: bp_message_lane::MessageNonce = bp_rialto::MAX_UNCONFIRMED_MESSAGES_AT_INBOUND_LANE; } impl pallet_message_lane::Trait for Runtime { diff --git a/bridges/bin/rialto/runtime/src/millau_messages.rs b/bridges/bin/rialto/runtime/src/millau_messages.rs index 45e3984a24a51..97067078e34b5 100644 --- a/bridges/bin/rialto/runtime/src/millau_messages.rs +++ b/bridges/bin/rialto/runtime/src/millau_messages.rs @@ -23,7 +23,7 @@ use bp_message_lane::{ target_chain::{ProvedMessages, SourceHeaderChain}, InboundLaneData, LaneId, Message, MessageNonce, }; -use bp_runtime::InstanceId; +use bp_runtime::{InstanceId, MILLAU_BRIDGE_INSTANCE}; use bridge_runtime_common::messages::{self, ChainWithMessageLanes, MessageBridge}; use frame_support::{ weights::{Weight, WeightToFeePolynomial}, @@ -78,7 +78,7 @@ type FromMillauMessagesProof = messages::target::FromBridgedChainMessagesProof + Into + Ord + + CheckedSub + std::ops::Add + One + Zero; /// Messages proof. - type MessagesProof: Clone; + type MessagesProof: Clone + Send + Sync; /// Messages receiving proof. - type MessagesReceivingProof: Clone; + type MessagesReceivingProof: Clone + Send + Sync; /// Number of the source header. - type SourceHeaderNumber: Clone + Debug + Default + Ord + PartialEq + Into; + type SourceHeaderNumber: Clone + Debug + Ord + PartialEq + Into + Send + Sync; /// Hash of the source header. - type SourceHeaderHash: Clone + Debug + Default + PartialEq; + type SourceHeaderHash: Clone + Debug + Default + PartialEq + Send + Sync; /// Number of the target header. - type TargetHeaderNumber: Clone + Debug + Default + Ord + PartialEq + Into; + type TargetHeaderNumber: Clone + Debug + Ord + PartialEq + Into + Send + Sync; /// Hash of the target header. - type TargetHeaderHash: Clone + Debug + Default + PartialEq; + type TargetHeaderHash: Clone + Debug + Default + PartialEq + Send + Sync; } /// Source header id within given one-way message lane. diff --git a/bridges/relays/messages-relay/src/message_lane_loop.rs b/bridges/relays/messages-relay/src/message_lane_loop.rs index b2eb7d2820bf9..a9333509500e6 100644 --- a/bridges/relays/messages-relay/src/message_lane_loop.rs +++ b/bridges/relays/messages-relay/src/message_lane_loop.rs @@ -24,9 +24,6 @@ //! finalized header. I.e. when talking about headers in lane context, we //! only care about finalized headers. -// Until there'll be actual message-lane in the runtime. -#![allow(dead_code)] - use crate::message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}; use crate::message_race_delivery::run as run_message_delivery_race; use crate::message_race_receiving::run as run_message_receiving_race; @@ -42,14 +39,33 @@ use relay_utils::{ }; use std::{fmt::Debug, future::Future, ops::RangeInclusive, time::Duration}; +/// Message lane loop configuration params. +#[derive(Debug, Clone)] +pub struct Params { + /// Id of lane this loop is servicing. + pub lane: LaneId, + /// Interval at which we ask target node about its updates. + pub source_tick: Duration, + /// Interval at which we ask target node about its updates. + pub target_tick: Duration, + /// Delay between moments when connection error happens and our reconnect attempt. + pub reconnect_delay: Duration, + /// The loop will auto-restart if there has been no updates during this period. + pub stall_timeout: Duration, + /// Message delivery race will stop delivering messages if there are `max_unconfirmed_nonces_at_target` + /// unconfirmed nonces on the target node. The race would continue once they're confirmed by the + /// receiving race. + pub max_unconfirmed_nonces_at_target: MessageNonce, +} + /// Source client trait. -#[async_trait(?Send)] -pub trait SourceClient: Clone { +#[async_trait] +pub trait SourceClient: Clone + Send + Sync { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; /// Try to reconnect to source node. - fn reconnect(self) -> Self; + async fn reconnect(self) -> Result; /// Returns state of the client. async fn state(&self) -> Result, Self::Error>; @@ -70,6 +86,7 @@ pub trait SourceClient: Clone { &self, id: SourceHeaderIdOf

, nonces: RangeInclusive, + include_outbound_lane_state: bool, ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error>; /// Submit messages receiving proof. @@ -77,17 +94,17 @@ pub trait SourceClient: Clone { &self, generated_at_block: TargetHeaderIdOf

, proof: P::MessagesReceivingProof, - ) -> Result, Self::Error>; + ) -> Result<(), Self::Error>; } /// Target client trait. -#[async_trait(?Send)] -pub trait TargetClient: Clone { +#[async_trait] +pub trait TargetClient: Clone + Send + Sync { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; /// Try to reconnect to source node. - fn reconnect(self) -> Self; + async fn reconnect(self) -> Result; /// Returns state of the client. async fn state(&self) -> Result, Self::Error>; @@ -98,6 +115,12 @@ pub trait TargetClient: Clone { id: TargetHeaderIdOf

, ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error>; + /// Get nonce of latest confirmed message. + async fn latest_confirmed_received_nonce( + &self, + id: TargetHeaderIdOf

, + ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error>; + /// Prove messages receiving at given block. async fn prove_messages_receiving( &self, @@ -138,15 +161,10 @@ pub struct ClientsState { } /// Run message lane service loop. -#[allow(clippy::too_many_arguments)] pub fn run( - lane: LaneId, + params: Params, mut source_client: impl SourceClient

, - source_tick: Duration, mut target_client: impl TargetClient

, - target_tick: Duration, - reconnect_delay: Duration, - stall_timeout: Duration, metrics_params: Option, exit_signal: impl Future, ) { @@ -162,7 +180,7 @@ pub fn run( "{}_to_{}_MessageLoop/{}", P::SOURCE_NAME, P::TARGET_NAME, - hex::encode(lane) + hex::encode(params.lane) ), metrics_params, &metrics_global, @@ -171,11 +189,9 @@ pub fn run( loop { let result = run_until_connection_lost( + params.clone(), source_client.clone(), - source_tick, target_client.clone(), - target_tick, - stall_timeout, if metrics_enabled { Some(&mut metrics_global) } else { @@ -192,15 +208,41 @@ pub fn run( match result { Ok(()) => break, - Err(failed_client) => { - async_std::task::sleep(reconnect_delay).await; + Err(failed_client) => loop { + async_std::task::sleep(params.reconnect_delay).await; if failed_client == FailedClient::Both || failed_client == FailedClient::Source { - source_client = source_client.reconnect(); + source_client = match source_client.clone().reconnect().await { + Ok(source_client) => source_client, + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect {}. Going to retry in {}s: {:?}", + P::SOURCE_NAME, + params.reconnect_delay.as_secs(), + error, + ); + continue; + } + } } if failed_client == FailedClient::Both || failed_client == FailedClient::Target { - target_client = target_client.reconnect(); + target_client = match target_client.clone().reconnect().await { + Ok(target_client) => target_client, + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect {}. Going to retry in {}s: {:?}", + P::TARGET_NAME, + params.reconnect_delay.as_secs(), + error, + ); + continue; + } + } } - } + + break; + }, } log::debug!( @@ -214,13 +256,10 @@ pub fn run( } /// Run one-way message delivery loop until connection with target or source node is lost, or exit signal is received. -#[allow(clippy::too_many_arguments)] async fn run_until_connection_lost, TC: TargetClient

>( + params: Params, source_client: SC, - source_tick: Duration, target_client: TC, - target_tick: Duration, - stall_timeout: Duration, mut metrics_global: Option<&mut GlobalMetrics>, metrics_msg: Option, exit_signal: impl Future, @@ -230,14 +269,14 @@ async fn run_until_connection_lost, TC: Targ let mut source_state_required = true; let source_state = source_client.state().fuse(); let source_go_offline_future = futures::future::Fuse::terminated(); - let source_tick_stream = interval(source_tick).fuse(); + let source_tick_stream = interval(params.source_tick).fuse(); let mut target_retry_backoff = retry_backoff(); let mut target_client_is_online = false; let mut target_state_required = true; let target_state = target_client.state().fuse(); let target_go_offline_future = futures::future::Fuse::terminated(); - let target_tick_stream = interval(target_tick).fuse(); + let target_tick_stream = interval(params.target_tick).fuse(); let ( (delivery_source_state_sender, delivery_source_state_receiver), @@ -248,8 +287,9 @@ async fn run_until_connection_lost, TC: Targ delivery_source_state_receiver, target_client.clone(), delivery_target_state_receiver, - stall_timeout, + params.stall_timeout, metrics_msg.clone(), + params.max_unconfirmed_nonces_at_target, ) .fuse(); @@ -262,7 +302,7 @@ async fn run_until_connection_lost, TC: Targ receiving_source_state_receiver, target_client.clone(), receiving_target_state_receiver, - stall_timeout, + params.stall_timeout, metrics_msg.clone(), ) .fuse(); @@ -395,7 +435,7 @@ pub(crate) mod tests { } pub type TestMessageNonce = u64; - pub type TestMessagesProof = RangeInclusive; + pub type TestMessagesProof = (RangeInclusive, Option); pub type TestMessagesReceivingProof = TestMessageNonce; pub type TestSourceHeaderNumber = u64; @@ -405,20 +445,15 @@ pub(crate) mod tests { pub type TestTargetHeaderHash = u64; #[derive(Debug)] - pub enum TestError { - Logic, - Connection, - } + pub struct TestError; impl MaybeConnectionError for TestError { fn is_connection_error(&self) -> bool { - match *self { - TestError::Logic => false, - TestError::Connection => true, - } + true } } + #[derive(Clone)] pub struct TestMessageLane; impl MessageLane for TestMessageLane { @@ -449,33 +484,34 @@ pub(crate) mod tests { is_target_reconnected: bool, target_state: SourceClientState, target_latest_received_nonce: TestMessageNonce, + target_latest_confirmed_received_nonce: TestMessageNonce, submitted_messages_proofs: Vec, } #[derive(Clone)] pub struct TestSourceClient { data: Arc>, - tick: Arc, + tick: Arc, } - #[async_trait(?Send)] + #[async_trait] impl SourceClient for TestSourceClient { type Error = TestError; - fn reconnect(self) -> Self { + async fn reconnect(self) -> Result { { let mut data = self.data.lock(); (self.tick)(&mut *data); data.is_source_reconnected = true; } - self + Ok(self) } async fn state(&self) -> Result, Self::Error> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_source_fails { - return Err(TestError::Connection); + return Err(TestError); } Ok(data.source_state.clone()) } @@ -487,7 +523,7 @@ pub(crate) mod tests { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_source_fails { - return Err(TestError::Connection); + return Err(TestError); } Ok((id, data.source_latest_generated_nonce)) } @@ -505,6 +541,7 @@ pub(crate) mod tests { &self, id: SourceHeaderIdOf, nonces: RangeInclusive, + include_outbound_lane_state: bool, ) -> Result< ( SourceHeaderIdOf, @@ -513,46 +550,59 @@ pub(crate) mod tests { ), Self::Error, > { - Ok((id, nonces.clone(), nonces)) + let mut data = self.data.lock(); + (self.tick)(&mut *data); + Ok(( + id, + nonces.clone(), + ( + nonces, + if include_outbound_lane_state { + Some(data.source_latest_confirmed_received_nonce) + } else { + None + }, + ), + )) } async fn submit_messages_receiving_proof( &self, _generated_at_block: TargetHeaderIdOf, proof: TestMessagesReceivingProof, - ) -> Result, Self::Error> { + ) -> Result<(), Self::Error> { let mut data = self.data.lock(); (self.tick)(&mut *data); data.submitted_messages_receiving_proofs.push(proof); data.source_latest_confirmed_received_nonce = proof; - Ok(proof..=proof) + Ok(()) } } #[derive(Clone)] pub struct TestTargetClient { data: Arc>, - tick: Arc, + tick: Arc, } - #[async_trait(?Send)] + #[async_trait] impl TargetClient for TestTargetClient { type Error = TestError; - fn reconnect(self) -> Self { + async fn reconnect(self) -> Result { { let mut data = self.data.lock(); (self.tick)(&mut *data); data.is_target_reconnected = true; } - self + Ok(self) } async fn state(&self) -> Result, Self::Error> { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { - return Err(TestError::Connection); + return Err(TestError); } Ok(data.target_state.clone()) } @@ -564,11 +614,23 @@ pub(crate) mod tests { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { - return Err(TestError::Connection); + return Err(TestError); } Ok((id, data.target_latest_received_nonce)) } + async fn latest_confirmed_received_nonce( + &self, + id: TargetHeaderIdOf, + ) -> Result<(TargetHeaderIdOf, TestMessageNonce), Self::Error> { + let mut data = self.data.lock(); + (self.tick)(&mut *data); + if data.is_target_fails { + return Err(TestError); + } + Ok((id, data.target_latest_confirmed_received_nonce)) + } + async fn prove_messages_receiving( &self, id: TargetHeaderIdOf, @@ -585,11 +647,14 @@ pub(crate) mod tests { let mut data = self.data.lock(); (self.tick)(&mut *data); if data.is_target_fails { - return Err(TestError::Connection); + return Err(TestError); } data.target_state.best_self = HeaderId(data.target_state.best_self.0 + 1, data.target_state.best_self.1 + 1); - data.target_latest_received_nonce = *proof.end(); + data.target_latest_received_nonce = *proof.0.end(); + if let Some(target_latest_confirmed_received_nonce) = proof.1 { + data.target_latest_confirmed_received_nonce = target_latest_confirmed_received_nonce; + } data.submitted_messages_proofs.push(proof); Ok(nonces) } @@ -597,8 +662,8 @@ pub(crate) mod tests { fn run_loop_test( data: TestClientData, - source_tick: Arc, - target_tick: Arc, + source_tick: Arc, + target_tick: Arc, exit_signal: impl Future, ) -> TestClientData { async_std::task::block_on(async { @@ -613,17 +678,19 @@ pub(crate) mod tests { tick: target_tick, }; run( - [0, 0, 0, 0], + Params { + lane: [0, 0, 0, 0], + source_tick: Duration::from_millis(100), + target_tick: Duration::from_millis(100), + reconnect_delay: Duration::from_millis(0), + stall_timeout: Duration::from_millis(60), + max_unconfirmed_nonces_at_target: 100, + }, source_client, - Duration::from_millis(100), target_client, - Duration::from_millis(100), - Duration::from_millis(0), - Duration::from_secs(60), None, exit_signal, ); - let result = data.lock().clone(); result }) @@ -671,7 +738,7 @@ pub(crate) mod tests { exit_receiver.into_future().map(|(_, _)| ()), ); - assert_eq!(result.submitted_messages_proofs, vec![1..=1],); + assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],); } #[test] @@ -717,7 +784,13 @@ pub(crate) mod tests { exit_receiver.into_future().map(|(_, _)| ()), ); - assert_eq!(result.submitted_messages_proofs, vec![1..=4, 5..=8, 9..=10],); + // there are no strict restrictions on when reward confirmation should come + // (because `max_unconfirmed_nonces_at_target` is `100` in tests and this confirmation + // depends on the state of both clients) + // => we do not check it here + assert_eq!(result.submitted_messages_proofs[0].0, 1..=4); + assert_eq!(result.submitted_messages_proofs[1].0, 5..=8); + assert_eq!(result.submitted_messages_proofs[2].0, 9..=10); assert!(!result.submitted_messages_receiving_proofs.is_empty()); } } diff --git a/bridges/relays/messages-relay/src/message_race_delivery.rs b/bridges/relays/messages-relay/src/message_race_delivery.rs index d725f256878f7..a5d096b7710b1 100644 --- a/bridges/relays/messages-relay/src/message_race_delivery.rs +++ b/bridges/relays/messages-relay/src/message_race_delivery.rs @@ -18,14 +18,15 @@ use crate::message_lane_loop::{ SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, TargetClientState, }; -use crate::message_race_loop::{MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient}; +use crate::message_race_loop::{ClientNonces, MessageRace, RaceState, RaceStrategy, SourceClient, TargetClient}; +use crate::message_race_strategy::BasicStrategy; use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; use futures::stream::FusedStream; -use num_traits::{One, Zero}; -use relay_utils::{FailedClient, HeaderId}; -use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration}; +use num_traits::CheckedSub; +use relay_utils::FailedClient; +use std::{marker::PhantomData, ops::RangeInclusive, time::Duration}; /// Maximal number of messages to relay in single transaction. const MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX: u32 = 4; @@ -38,6 +39,7 @@ pub async fn run( target_state_updates: impl FusedStream>, stall_timeout: Duration, metrics_msg: Option, + max_unconfirmed_nonces_at_target: P::MessageNonce, ) -> Result<(), FailedClient> { crate::message_race_loop::run( MessageDeliveryRaceSource { @@ -53,7 +55,12 @@ pub async fn run( }, target_state_updates, stall_timeout, - MessageDeliveryStrategy::

::new(MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX.into()), + MessageDeliveryStrategy::

{ + max_unconfirmed_nonces_at_target, + source_nonces: None, + target_nonces: None, + strategy: BasicStrategy::new(MAX_MESSAGES_TO_RELAY_IN_SINGLE_TX.into()), + }, ) .await } @@ -84,33 +91,46 @@ struct MessageDeliveryRaceSource { _phantom: PhantomData

, } -#[async_trait(?Send)] +#[async_trait] impl SourceClient> for MessageDeliveryRaceSource where P: MessageLane, C: MessageLaneSourceClient

, { type Error = C::Error; + type ProofParameters = bool; - async fn latest_nonce( + async fn nonces( &self, at_block: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { - let result = self.client.latest_generated_nonce(at_block).await; + ) -> Result<(SourceHeaderIdOf

, ClientNonces), Self::Error> { + let (at_block, latest_generated_nonce) = self.client.latest_generated_nonce(at_block).await?; + let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?; + if let Some(metrics_msg) = self.metrics_msg.as_ref() { - if let Ok((_, source_latest_generated_nonce)) = result.as_ref() { - metrics_msg.update_target_latest_received_nonce::

(*source_latest_generated_nonce); - } + metrics_msg.update_source_latest_generated_nonce::

(latest_generated_nonce); + metrics_msg.update_source_latest_confirmed_nonce::

(latest_confirmed_nonce); } - result + + Ok(( + at_block, + ClientNonces { + latest_nonce: latest_generated_nonce, + confirmed_nonce: Some(latest_confirmed_nonce), + }, + )) } async fn generate_proof( &self, at_block: SourceHeaderIdOf

, nonces: RangeInclusive, + proof_parameters: Self::ProofParameters, ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error> { - self.client.prove_messages(at_block, nonces).await + let outbound_state_proof_required = proof_parameters; + self.client + .prove_messages(at_block, nonces, outbound_state_proof_required) + .await } } @@ -121,7 +141,7 @@ struct MessageDeliveryRaceTarget { _phantom: PhantomData

, } -#[async_trait(?Send)] +#[async_trait] impl TargetClient> for MessageDeliveryRaceTarget where P: MessageLane, @@ -129,17 +149,25 @@ where { type Error = C::Error; - async fn latest_nonce( + async fn nonces( &self, at_block: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { - let result = self.client.latest_received_nonce(at_block).await; + ) -> Result<(TargetHeaderIdOf

, ClientNonces), Self::Error> { + let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?; + let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?; + if let Some(metrics_msg) = self.metrics_msg.as_ref() { - if let Ok((_, target_latest_received_nonce)) = result.as_ref() { - metrics_msg.update_target_latest_received_nonce::

(*target_latest_received_nonce); - } + metrics_msg.update_target_latest_received_nonce::

(latest_received_nonce); + metrics_msg.update_target_latest_confirmed_nonce::

(latest_confirmed_nonce); } - result + + Ok(( + at_block, + ClientNonces { + latest_nonce: latest_received_nonce, + confirmed_nonce: Some(latest_confirmed_nonce), + }, + )) } async fn submit_proof( @@ -155,7 +183,18 @@ where } /// Messages delivery strategy. -type MessageDeliveryStrategy

= DeliveryStrategy< +struct MessageDeliveryStrategy { + /// Maximal unconfirmed nonces at target client. + max_unconfirmed_nonces_at_target: P::MessageNonce, + /// Latest nonces from the source client. + source_nonces: Option>, + /// Target nonces from the source client. + target_nonces: Option>, + /// Basic delivery strategy. + strategy: MessageDeliveryStrategyBase

, +} + +type MessageDeliveryStrategyBase

= BasicStrategy<

::SourceHeaderNumber,

::SourceHeaderHash,

::TargetHeaderNumber, @@ -164,280 +203,90 @@ type MessageDeliveryStrategy

= DeliveryStrategy<

::MessagesProof, >; -/// Nonces delivery strategy. -#[derive(Debug)] -pub struct DeliveryStrategy { - /// All queued nonces. - source_queue: VecDeque<(HeaderId, Nonce)>, - /// Best nonce known to target node. - target_nonce: Nonce, - /// Max nonces to relay in single transaction. - max_nonces_to_relay_in_single_tx: Nonce, - /// Unused generic types dump. - _phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>, -} - -impl - DeliveryStrategy +impl RaceStrategy, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof> + for MessageDeliveryStrategy

{ - /// Create new delivery strategy. - pub fn new(max_nonces_to_relay_in_single_tx: Nonce) -> Self { - DeliveryStrategy { - source_queue: VecDeque::new(), - target_nonce: Default::default(), - max_nonces_to_relay_in_single_tx, - _phantom: Default::default(), - } - } -} + type ProofParameters = bool; -impl - RaceStrategy< - HeaderId, - HeaderId, - Nonce, - Proof, - > for DeliveryStrategy -where - SourceHeaderHash: Clone, - SourceHeaderNumber: Clone + Ord, - Nonce: Clone + Copy + From + Ord + std::ops::Add + One + Zero, -{ fn is_empty(&self) -> bool { - self.source_queue.is_empty() + self.strategy.is_empty() } - fn source_nonce_updated(&mut self, at_block: HeaderId, nonce: Nonce) { - if nonce <= self.target_nonce { - return; - } - - match self.source_queue.back() { - Some((_, prev_nonce)) if *prev_nonce < nonce => (), - Some(_) => return, - None => (), - } - - self.source_queue.push_back((at_block, nonce)) + fn source_nonces_updated(&mut self, at_block: SourceHeaderIdOf

, nonces: ClientNonces) { + self.source_nonces = Some(nonces.clone()); + self.strategy.source_nonces_updated(at_block, nonces) } - fn target_nonce_updated( + fn target_nonces_updated( &mut self, - nonce: Nonce, - race_state: &mut RaceState< - HeaderId, - HeaderId, - Nonce, - Proof, - >, + nonces: ClientNonces, + race_state: &mut RaceState, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof>, ) { - if nonce < self.target_nonce { - return; - } - - while let Some(true) = self - .source_queue - .front() - .map(|(_, source_nonce)| *source_nonce <= nonce) - { - self.source_queue.pop_front(); - } - - let need_to_select_new_nonces = race_state - .nonces_to_submit - .as_ref() - .map(|(_, nonces, _)| *nonces.end() <= nonce) - .unwrap_or(false); - if need_to_select_new_nonces { - race_state.nonces_to_submit = None; - } - - let need_new_nonces_to_submit = race_state - .nonces_submitted - .as_ref() - .map(|nonces| *nonces.end() <= nonce) - .unwrap_or(false); - if need_new_nonces_to_submit { - race_state.nonces_submitted = None; - } - - self.target_nonce = nonce; + self.target_nonces = Some(nonces.clone()); + self.strategy.target_nonces_updated(nonces, race_state) } fn select_nonces_to_deliver( &mut self, - race_state: &RaceState< - HeaderId, - HeaderId, - Nonce, - Proof, - >, - ) -> Option> { - // if we have already selected nonces that we want to submit, do nothing - if race_state.nonces_to_submit.is_some() { - return None; - } - - // if we already submitted some nonces, do nothing - if race_state.nonces_submitted.is_some() { - return None; - } - - // 1) we want to deliver all nonces, starting from `target_nonce + 1` - // 2) we want to deliver at most `self.max_nonces_to_relay_in_single_tx` nonces in this batch - // 3) we can't deliver new nonce until header, that has emitted this nonce, is finalized - // by target client - let nonces_begin = self.target_nonce + 1.into(); - let best_header_at_target = &race_state.target_state.as_ref()?.best_peer; - let mut nonces_end = None; - let mut i = Zero::zero(); - while i < self.max_nonces_to_relay_in_single_tx { - let nonce = nonces_begin + i; - - // if queue is empty, we don't need to prove anything - let (first_queued_at, first_queued_nonce) = match self.source_queue.front() { - Some((first_queued_at, first_queued_nonce)) => ((*first_queued_at).clone(), *first_queued_nonce), - None => break, - }; - - // if header that has queued the message is not yet finalized at bridged chain, - // we can't prove anything - if first_queued_at.0 > best_header_at_target.0 { - break; - } - - // ok, we may deliver this nonce - nonces_end = Some(nonce); - - // probably remove it from the queue? - if nonce == first_queued_nonce { - self.source_queue.pop_front(); + race_state: &RaceState, TargetHeaderIdOf

, P::MessageNonce, P::MessagesProof>, + ) -> Option<(RangeInclusive, Self::ProofParameters)> { + const CONFIRMED_NONCE_PROOF: &str = "\ + ClientNonces are crafted by MessageDeliveryRace(Source|Target);\ + MessageDeliveryRace(Source|Target) always fills confirmed_nonce field;\ + qed"; + + let source_nonces = self.source_nonces.as_ref()?; + let target_nonces = self.target_nonces.as_ref()?; + + // There's additional condition in the message delivery race: target would reject messages + // if there are too much unconfirmed messages at the inbound lane. + + // https://github.com/paritytech/parity-bridges-common/issues/432 + // TODO: message lane loop works with finalized blocks only, but we're submitting transactions that + // are updating best block (which may not be finalized yet). So all decisions that are made below + // may be outdated. This needs to be changed - all logic here must be built on top of best blocks. + + // The receiving race is responsible to deliver confirmations back to the source chain. So if + // there's a lot of unconfirmed messages, let's wait until it'll be able to do its job. + let latest_received_nonce_at_target = target_nonces.latest_nonce; + let latest_confirmed_nonce_at_source = source_nonces.confirmed_nonce.expect(CONFIRMED_NONCE_PROOF); + let confirmations_missing = latest_received_nonce_at_target.checked_sub(&latest_confirmed_nonce_at_source); + match confirmations_missing { + Some(confirmations_missing) if confirmations_missing > self.max_unconfirmed_nonces_at_target => { + log::debug!( + target: "bridge", + "Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \ + at target: target.latest_received={:?}, source.latest_confirmed={:?}, max={:?}", + MessageDeliveryRace::

::source_name(), + MessageDeliveryRace::

::target_name(), + latest_received_nonce_at_target, + latest_confirmed_nonce_at_source, + self.max_unconfirmed_nonces_at_target, + ); + + return None; } - - i = i + One::one(); + _ => (), } - nonces_end.map(|nonces_end| RangeInclusive::new(nonces_begin, nonces_end)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::message_lane_loop::{ - tests::{header_id, TestMessageLane, TestMessageNonce, TestMessagesProof}, - ClientState, - }; - - #[test] - fn strategy_is_empty_works() { - let mut strategy = MessageDeliveryStrategy::::new(4); - assert_eq!(strategy.is_empty(), true); - strategy.source_nonce_updated(header_id(1), 1); - assert_eq!(strategy.is_empty(), false); - } - - #[test] - fn source_nonce_is_never_lower_than_known_target_nonce() { - let mut strategy = MessageDeliveryStrategy::::new(4); - strategy.target_nonce_updated(10, &mut Default::default()); - strategy.source_nonce_updated(header_id(1), 5); - assert_eq!(strategy.source_queue, vec![]); - } - - #[test] - fn source_nonce_is_never_lower_than_latest_known_source_nonce() { - let mut strategy = MessageDeliveryStrategy::::new(4); - strategy.source_nonce_updated(header_id(1), 5); - strategy.source_nonce_updated(header_id(2), 3); - strategy.source_nonce_updated(header_id(2), 5); - assert_eq!(strategy.source_queue, vec![(header_id(1), 5)]); - } - - #[test] - fn target_nonce_is_never_lower_than_latest_known_target_nonce() { - let mut strategy = MessageDeliveryStrategy::::new(4); - strategy.target_nonce_updated(10, &mut Default::default()); - strategy.target_nonce_updated(5, &mut Default::default()); - assert_eq!(strategy.target_nonce, 10); - } - - #[test] - fn updated_target_nonce_removes_queued_entries() { - let mut strategy = MessageDeliveryStrategy::::new(4); - strategy.source_nonce_updated(header_id(1), 5); - strategy.source_nonce_updated(header_id(2), 10); - strategy.source_nonce_updated(header_id(3), 15); - strategy.source_nonce_updated(header_id(4), 20); - strategy.target_nonce_updated(15, &mut Default::default()); - assert_eq!(strategy.source_queue, vec![(header_id(4), 20)]); - } - - #[test] - fn selected_nonces_are_dropped_on_target_nonce_update() { - let mut state = RaceState::default(); - let mut strategy = MessageDeliveryStrategy::::new(4); - state.nonces_to_submit = Some((header_id(1), 5..=10, 5..=10)); - strategy.target_nonce_updated(7, &mut state); - assert!(state.nonces_to_submit.is_some()); - strategy.target_nonce_updated(10, &mut state); - assert!(state.nonces_to_submit.is_none()); - } - - #[test] - fn submitted_nonces_are_dropped_on_target_nonce_update() { - let mut state = RaceState::default(); - let mut strategy = MessageDeliveryStrategy::::new(4); - state.nonces_submitted = Some(5..=10); - strategy.target_nonce_updated(7, &mut state); - assert!(state.nonces_submitted.is_some()); - strategy.target_nonce_updated(10, &mut state); - assert!(state.nonces_submitted.is_none()); - } - - #[test] - fn nothing_is_selected_if_something_is_already_selected() { - let mut state = RaceState::default(); - let mut strategy = MessageDeliveryStrategy::::new(4); - state.nonces_to_submit = Some((header_id(1), 1..=10, 1..=10)); - strategy.source_nonce_updated(header_id(1), 10); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); - } - - #[test] - fn nothing_is_selected_if_something_is_already_submitted() { - let mut state = RaceState::default(); - let mut strategy = MessageDeliveryStrategy::::new(4); - state.nonces_submitted = Some(1..=10); - strategy.source_nonce_updated(header_id(1), 10); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); - } - - #[test] - fn select_nonces_to_deliver_works() { - let mut state = RaceState::<_, _, TestMessageNonce, TestMessagesProof>::default(); - let mut strategy = MessageDeliveryStrategy::::new(4); - strategy.source_nonce_updated(header_id(1), 1); - strategy.source_nonce_updated(header_id(2), 2); - strategy.source_nonce_updated(header_id(3), 6); - strategy.source_nonce_updated(header_id(5), 8); - - state.target_state = Some(ClientState { - best_self: header_id(0), - best_peer: header_id(4), - }); - assert_eq!(strategy.select_nonces_to_deliver(&state), Some(1..=4)); - strategy.target_nonce_updated(4, &mut state); - assert_eq!(strategy.select_nonces_to_deliver(&state), Some(5..=6)); - strategy.target_nonce_updated(6, &mut state); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); - - state.target_state = Some(ClientState { - best_self: header_id(0), - best_peer: header_id(5), - }); - assert_eq!(strategy.select_nonces_to_deliver(&state), Some(7..=8)); - strategy.target_nonce_updated(8, &mut state); - assert_eq!(strategy.select_nonces_to_deliver(&state), None); + // If we're here, then the confirmations race did it job && sending side now knows that messages + // have been delivered. Now let's select nonces that we want to deliver. + let selected_nonces = self.strategy.select_nonces_to_deliver(race_state)?.0; + + // Ok - we have new nonces to deliver. But target may still reject new messages, because we haven't + // notified it that (some) messages have been confirmed. So we may want to include updated + // `source.latest_confirmed` in the proof. + // + // Important note: we're including outbound state lane proof whenever there are unconfirmed nonces + // on the target chain. Other strategy is to include it only if it's absolutely necessary. + let latest_confirmed_nonce_at_target = target_nonces.confirmed_nonce.expect(CONFIRMED_NONCE_PROOF); + let outbound_state_proof_required = latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source; + + // https://github.com/paritytech/parity-bridges-common/issues/432 + // https://github.com/paritytech/parity-bridges-common/issues/433 + // TODO: number of messages must be no larger than: + // `max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target - latest_confirmed_nonce_at_target)` + + Some((selected_nonces, outbound_state_proof_required)) } } diff --git a/bridges/relays/messages-relay/src/message_race_loop.rs b/bridges/relays/messages-relay/src/message_race_loop.rs index 30401cd0baaa9..4b2a9eef04540 100644 --- a/bridges/relays/messages-relay/src/message_race_loop.rs +++ b/bridges/relays/messages-relay/src/message_race_loop.rs @@ -20,9 +20,6 @@ //! associated data - like messages, lane state, etc) to the target node by //! generating and submitting proof. -// Until there'll be actual message-lane in the runtime. -#![allow(dead_code)] - use crate::message_lane_loop::ClientState; use async_trait::async_trait; @@ -61,36 +58,49 @@ type SourceClientState

= ClientState<

::SourceHeaderId,

= ClientState<

::TargetHeaderId,

::SourceHeaderId>; +/// Nonces on the race client. +#[derive(Debug, Clone)] +pub struct ClientNonces { + /// Latest nonce that is known to the client. + pub latest_nonce: MessageNonce, + /// Latest nonce that is confirmed to the bridged client. This nonce only makes + /// sense in some races. In other races it is `None`. + pub confirmed_nonce: Option, +} + /// One of message lane clients, which is source client for the race. -#[async_trait(?Send)] +#[async_trait] pub trait SourceClient { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; + /// Additional proof parameters required to generate proof. + type ProofParameters; - /// Return latest nonce that is known to the source client. - async fn latest_nonce( + /// Return nonces that are known to the source client. + async fn nonces( &self, at_block: P::SourceHeaderId, - ) -> Result<(P::SourceHeaderId, P::MessageNonce), Self::Error>; + ) -> Result<(P::SourceHeaderId, ClientNonces), Self::Error>; /// Generate proof for delivering to the target client. async fn generate_proof( &self, at_block: P::SourceHeaderId, nonces: RangeInclusive, + proof_parameters: Self::ProofParameters, ) -> Result<(P::SourceHeaderId, RangeInclusive, P::Proof), Self::Error>; } /// One of message lane clients, which is target client for the race. -#[async_trait(?Send)] +#[async_trait] pub trait TargetClient { /// Type of error this clients returns. type Error: std::fmt::Debug + MaybeConnectionError; - /// Return latest nonce that is known to the target client. - async fn latest_nonce( + /// Return nonces that are known to the target client. + async fn nonces( &self, at_block: P::TargetHeaderId, - ) -> Result<(P::TargetHeaderId, P::MessageNonce), Self::Error>; + ) -> Result<(P::TargetHeaderId, ClientNonces), Self::Error>; /// Submit proof to the target client. async fn submit_proof( &self, @@ -102,22 +112,26 @@ pub trait TargetClient { /// Race strategy. pub trait RaceStrategy { + /// Additional proof parameters required to generate proof. + type ProofParameters; + /// Should return true if nothing has to be synced. fn is_empty(&self) -> bool; - /// Called when latest nonce is updated at source node of the race. - fn source_nonce_updated(&mut self, at_block: SourceHeaderId, nonce: MessageNonce); - /// Called when latest nonce is updated at target node of the race. - fn target_nonce_updated( + /// Called when nonces are updated at source node of the race. + fn source_nonces_updated(&mut self, at_block: SourceHeaderId, nonce: ClientNonces); + /// Called when nonces are updated at target node of the race. + fn target_nonces_updated( &mut self, - nonce: MessageNonce, + nonces: ClientNonces, race_state: &mut RaceState, ); /// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated /// data) from source to target node. + /// Additionally, parameters required to generate proof are returned. fn select_nonces_to_deliver( &mut self, race_state: &RaceState, - ) -> Option>; + ) -> Option<(RangeInclusive, Self::ProofParameters)>; } /// State of the race. @@ -133,38 +147,44 @@ pub struct RaceState { } /// Run race loop until connection with target or source node is lost. -pub async fn run( - race_source: impl SourceClient

, +pub async fn run>( + race_source: SC, race_source_updated: impl FusedStream>, race_target: impl TargetClient

, race_target_updated: impl FusedStream>, stall_timeout: Duration, - mut strategy: impl RaceStrategy, + mut strategy: impl RaceStrategy< + P::SourceHeaderId, + P::TargetHeaderId, + P::MessageNonce, + P::Proof, + ProofParameters = SC::ProofParameters, + >, ) -> Result<(), FailedClient> { let mut race_state = RaceState::default(); let mut stall_countdown = Instant::now(); let mut source_retry_backoff = retry_backoff(); let mut source_client_is_online = true; - let mut source_latest_nonce_required = false; - let source_latest_nonce = futures::future::Fuse::terminated(); + let mut source_nonces_required = false; + let source_nonces = futures::future::Fuse::terminated(); let source_generate_proof = futures::future::Fuse::terminated(); let source_go_offline_future = futures::future::Fuse::terminated(); let mut target_retry_backoff = retry_backoff(); let mut target_client_is_online = true; - let mut target_latest_nonce_required = false; - let target_latest_nonce = futures::future::Fuse::terminated(); + let mut target_nonces_required = false; + let target_nonces = futures::future::Fuse::terminated(); let target_submit_proof = futures::future::Fuse::terminated(); let target_go_offline_future = futures::future::Fuse::terminated(); futures::pin_mut!( race_source_updated, - source_latest_nonce, + source_nonces, source_generate_proof, source_go_offline_future, race_target_updated, - target_latest_nonce, + target_nonces, target_submit_proof, target_go_offline_future, ); @@ -175,7 +195,7 @@ pub async fn run( source_state = race_source_updated.next() => { if let Some(source_state) = source_state { if race_state.source_state.as_ref() != Some(&source_state) { - source_latest_nonce_required = true; + source_nonces_required = true; race_state.source_state = Some(source_state); } } @@ -183,53 +203,53 @@ pub async fn run( target_state = race_target_updated.next() => { if let Some(target_state) = target_state { if race_state.target_state.as_ref() != Some(&target_state) { - target_latest_nonce_required = true; + target_nonces_required = true; race_state.target_state = Some(target_state); } } }, // when nonces are updated - latest_nonce = source_latest_nonce => { - source_latest_nonce_required = false; + nonces = source_nonces => { + source_nonces_required = false; source_client_is_online = process_future_result( - latest_nonce, + nonces, &mut source_retry_backoff, - |(at_block, latest_nonce)| { + |(at_block, nonces)| { log::debug!( target: "bridge", - "Received latest nonce from {}: {:?}", + "Received nonces from {}: {:?}", P::source_name(), - latest_nonce, + nonces, ); - strategy.source_nonce_updated(at_block, latest_nonce); + strategy.source_nonces_updated(at_block, nonces); }, &mut source_go_offline_future, |delay| async_std::task::sleep(delay), - || format!("Error retrieving latest nonce from {}", P::source_name()), + || format!("Error retrieving nonces from {}", P::source_name()), ).fail_if_connection_error(FailedClient::Source)?; }, - latest_nonce = target_latest_nonce => { - target_latest_nonce_required = false; + nonces = target_nonces => { + target_nonces_required = false; target_client_is_online = process_future_result( - latest_nonce, + nonces, &mut target_retry_backoff, - |(_, latest_nonce)| { + |(_, nonces)| { log::debug!( target: "bridge", - "Received latest nonce from {}: {:?}", + "Received nonces from {}: {:?}", P::target_name(), - latest_nonce, + nonces, ); - strategy.target_nonce_updated(latest_nonce, &mut race_state); + strategy.target_nonces_updated(nonces, &mut race_state); }, &mut target_go_offline_future, |delay| async_std::task::sleep(delay), - || format!("Error retrieving latest nonce from {}", P::target_name()), + || format!("Error retrieving nonces from {}", P::target_name()), ).fail_if_connection_error(FailedClient::Target)?; }, @@ -288,26 +308,32 @@ pub async fn run( let nonces_to_deliver = race_state.source_state.as_ref().and_then(|source_state| { strategy .select_nonces_to_deliver(&race_state) - .map(|nonces_range| (source_state.best_self.clone(), nonces_range)) + .map(|(nonces_range, proof_parameters)| { + (source_state.best_self.clone(), nonces_range, proof_parameters) + }) }); - if let Some((at_block, nonces_range)) = nonces_to_deliver { + if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver { log::debug!( target: "bridge", "Asking {} to prove nonces in range {:?}", P::source_name(), nonces_range, ); - source_generate_proof.set(race_source.generate_proof(at_block, nonces_range).fuse()); - } else if source_latest_nonce_required { - log::debug!(target: "bridge", "Asking {} about latest generated message nonce", P::source_name()); + source_generate_proof.set( + race_source + .generate_proof(at_block, nonces_range, proof_parameters) + .fuse(), + ); + } else if source_nonces_required { + log::debug!(target: "bridge", "Asking {} about message nonces", P::source_name()); let at_block = race_state .source_state .as_ref() - .expect("source_latest_nonce_required is only true when source_state is Some; qed") + .expect("source_nonces_required is only true when source_state is Some; qed") .best_self .clone(); - source_latest_nonce.set(race_source.latest_nonce(at_block).fuse()); + source_nonces.set(race_source.nonces(at_block).fuse()); } else { source_client_is_online = true; } @@ -329,15 +355,15 @@ pub async fn run( .fuse(), ); } - if target_latest_nonce_required { - log::debug!(target: "bridge", "Asking {} about latest nonce", P::target_name()); + if target_nonces_required { + log::debug!(target: "bridge", "Asking {} about message nonces", P::target_name()); let at_block = race_state .target_state .as_ref() - .expect("target_latest_nonce_required is only true when target_state is Some; qed") + .expect("target_nonces_required is only true when target_state is Some; qed") .best_self .clone(); - target_latest_nonce.set(race_target.latest_nonce(at_block).fuse()); + target_nonces.set(race_target.nonces(at_block).fuse()); } else { target_client_is_online = true; } diff --git a/bridges/relays/messages-relay/src/message_race_receiving.rs b/bridges/relays/messages-relay/src/message_race_receiving.rs index d985020fddda1..bf9975717c319 100644 --- a/bridges/relays/messages-relay/src/message_race_receiving.rs +++ b/bridges/relays/messages-relay/src/message_race_receiving.rs @@ -18,8 +18,8 @@ use crate::message_lane_loop::{ SourceClient as MessageLaneSourceClient, SourceClientState, TargetClient as MessageLaneTargetClient, TargetClientState, }; -use crate::message_race_delivery::DeliveryStrategy; -use crate::message_race_loop::{MessageRace, SourceClient, TargetClient}; +use crate::message_race_loop::{ClientNonces, MessageRace, SourceClient, TargetClient}; +use crate::message_race_strategy::BasicStrategy; use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; @@ -28,7 +28,7 @@ use relay_utils::FailedClient; use std::{marker::PhantomData, ops::RangeInclusive, time::Duration}; /// Message receiving confirmations delivery strategy. -type ReceivingConfirmationsDeliveryStrategy

= DeliveryStrategy< +type ReceivingConfirmationsBasicStrategy

= BasicStrategy<

::TargetHeaderNumber,

::TargetHeaderHash,

::SourceHeaderNumber, @@ -60,7 +60,7 @@ pub async fn run( }, source_state_updates, stall_timeout, - ReceivingConfirmationsDeliveryStrategy::

::new(std::u32::MAX.into()), + ReceivingConfirmationsBasicStrategy::

::new(std::u32::MAX.into()), ) .await } @@ -91,31 +91,38 @@ struct ReceivingConfirmationsRaceSource { _phantom: PhantomData

, } -#[async_trait(?Send)] +#[async_trait] impl SourceClient> for ReceivingConfirmationsRaceSource where P: MessageLane, C: MessageLaneTargetClient

, { type Error = C::Error; + type ProofParameters = (); - async fn latest_nonce( + async fn nonces( &self, at_block: TargetHeaderIdOf

, - ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { - let result = self.client.latest_received_nonce(at_block).await; + ) -> Result<(TargetHeaderIdOf

, ClientNonces), Self::Error> { + let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?; if let Some(metrics_msg) = self.metrics_msg.as_ref() { - if let Ok((_, target_latest_received_nonce)) = result.as_ref() { - metrics_msg.update_target_latest_received_nonce::

(*target_latest_received_nonce); - } + metrics_msg.update_target_latest_received_nonce::

(latest_received_nonce); } - result + Ok(( + at_block, + ClientNonces { + latest_nonce: latest_received_nonce, + confirmed_nonce: None, + }, + )) } + #[allow(clippy::unit_arg)] async fn generate_proof( &self, at_block: TargetHeaderIdOf

, nonces: RangeInclusive, + _proof_parameters: Self::ProofParameters, ) -> Result< ( TargetHeaderIdOf

, @@ -138,7 +145,7 @@ struct ReceivingConfirmationsRaceTarget { _phantom: PhantomData

, } -#[async_trait(?Send)] +#[async_trait] impl TargetClient> for ReceivingConfirmationsRaceTarget where P: MessageLane, @@ -146,27 +153,32 @@ where { type Error = C::Error; - async fn latest_nonce( + async fn nonces( &self, at_block: SourceHeaderIdOf

, - ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { - let result = self.client.latest_confirmed_received_nonce(at_block).await; + ) -> Result<(SourceHeaderIdOf

, ClientNonces), Self::Error> { + let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?; if let Some(metrics_msg) = self.metrics_msg.as_ref() { - if let Ok((_, source_latest_confirmed_nonce)) = result.as_ref() { - metrics_msg.update_source_latest_confirmed_nonce::

(*source_latest_confirmed_nonce); - } + metrics_msg.update_source_latest_confirmed_nonce::

(latest_confirmed_nonce); } - result + Ok(( + at_block, + ClientNonces { + latest_nonce: latest_confirmed_nonce, + confirmed_nonce: None, + }, + )) } async fn submit_proof( &self, generated_at_block: TargetHeaderIdOf

, - _nonces: RangeInclusive, + nonces: RangeInclusive, proof: P::MessagesReceivingProof, ) -> Result, Self::Error> { self.client .submit_messages_receiving_proof(generated_at_block, proof) - .await + .await?; + Ok(nonces) } } diff --git a/bridges/relays/messages-relay/src/message_race_strategy.rs b/bridges/relays/messages-relay/src/message_race_strategy.rs new file mode 100644 index 0000000000000..a22331fe07d13 --- /dev/null +++ b/bridges/relays/messages-relay/src/message_race_strategy.rs @@ -0,0 +1,334 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +//! Basic delivery strategy. The strategy selects nonces if: +//! +//! 1) there are more nonces on the source side than on the target side; +//! 2) new nonces may be proved to target node (i.e. they have appeared at the +//! block, which is known to the target node). + +use crate::message_race_loop::{ClientNonces, RaceState, RaceStrategy}; + +use num_traits::{One, Zero}; +use relay_utils::HeaderId; +use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive}; + +/// Nonces delivery strategy. +#[derive(Debug)] +pub struct BasicStrategy { + /// All queued nonces. + source_queue: VecDeque<(HeaderId, Nonce)>, + /// Best nonce known to target node. + target_nonce: Nonce, + /// Max nonces to relay in single transaction. + max_nonces_to_relay_in_single_tx: Nonce, + /// Unused generic types dump. + _phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>, +} + +impl + BasicStrategy +{ + /// Create new delivery strategy. + pub fn new(max_nonces_to_relay_in_single_tx: Nonce) -> Self { + BasicStrategy { + source_queue: VecDeque::new(), + target_nonce: Default::default(), + max_nonces_to_relay_in_single_tx, + _phantom: Default::default(), + } + } +} + +impl + RaceStrategy< + HeaderId, + HeaderId, + Nonce, + Proof, + > for BasicStrategy +where + SourceHeaderHash: Clone, + SourceHeaderNumber: Clone + Ord, + Nonce: Clone + Copy + From + Ord + std::ops::Add + One + Zero, +{ + type ProofParameters = (); + + fn is_empty(&self) -> bool { + self.source_queue.is_empty() + } + + fn source_nonces_updated( + &mut self, + at_block: HeaderId, + nonces: ClientNonces, + ) { + let nonce = nonces.latest_nonce; + + if nonce <= self.target_nonce { + return; + } + + match self.source_queue.back() { + Some((_, prev_nonce)) if *prev_nonce < nonce => (), + Some(_) => return, + None => (), + } + + self.source_queue.push_back((at_block, nonce)) + } + + fn target_nonces_updated( + &mut self, + nonces: ClientNonces, + race_state: &mut RaceState< + HeaderId, + HeaderId, + Nonce, + Proof, + >, + ) { + let nonce = nonces.latest_nonce; + + if nonce < self.target_nonce { + return; + } + + while let Some(true) = self + .source_queue + .front() + .map(|(_, source_nonce)| *source_nonce <= nonce) + { + self.source_queue.pop_front(); + } + + let need_to_select_new_nonces = race_state + .nonces_to_submit + .as_ref() + .map(|(_, nonces, _)| *nonces.end() <= nonce) + .unwrap_or(false); + if need_to_select_new_nonces { + race_state.nonces_to_submit = None; + } + + let need_new_nonces_to_submit = race_state + .nonces_submitted + .as_ref() + .map(|nonces| *nonces.end() <= nonce) + .unwrap_or(false); + if need_new_nonces_to_submit { + race_state.nonces_submitted = None; + } + + self.target_nonce = nonce; + } + + fn select_nonces_to_deliver( + &mut self, + race_state: &RaceState< + HeaderId, + HeaderId, + Nonce, + Proof, + >, + ) -> Option<(RangeInclusive, Self::ProofParameters)> { + // if we have already selected nonces that we want to submit, do nothing + if race_state.nonces_to_submit.is_some() { + return None; + } + + // if we already submitted some nonces, do nothing + if race_state.nonces_submitted.is_some() { + return None; + } + + // 1) we want to deliver all nonces, starting from `target_nonce + 1` + // 2) we want to deliver at most `self.max_nonces_to_relay_in_single_tx` nonces in this batch + // 3) we can't deliver new nonce until header, that has emitted this nonce, is finalized + // by target client + let nonces_begin = self.target_nonce + 1.into(); + let best_header_at_target = &race_state.target_state.as_ref()?.best_peer; + let mut nonces_end = None; + let mut i = Zero::zero(); + + // https://github.com/paritytech/parity-bridges-common/issues/433 + // TODO: instead of limiting number of messages by number, provide custom limit callback here. + // In delivery race it'll be weight-based callback. In receiving race it'll be unlimited callback. + + while i < self.max_nonces_to_relay_in_single_tx { + let nonce = nonces_begin + i; + + // if queue is empty, we don't need to prove anything + let (first_queued_at, first_queued_nonce) = match self.source_queue.front() { + Some((first_queued_at, first_queued_nonce)) => ((*first_queued_at).clone(), *first_queued_nonce), + None => break, + }; + + // if header that has queued the message is not yet finalized at bridged chain, + // we can't prove anything + if first_queued_at.0 > best_header_at_target.0 { + break; + } + + // ok, we may deliver this nonce + nonces_end = Some(nonce); + + // probably remove it from the queue? + if nonce == first_queued_nonce { + self.source_queue.pop_front(); + } + + i = i + One::one(); + } + + nonces_end.map(|nonces_end| (RangeInclusive::new(nonces_begin, nonces_end), ())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::message_lane::MessageLane; + use crate::message_lane_loop::{ + tests::{header_id, TestMessageLane, TestMessageNonce, TestMessagesProof}, + ClientState, + }; + + type BasicStrategy

= super::BasicStrategy< +

::SourceHeaderNumber, +

::SourceHeaderHash, +

::TargetHeaderNumber, +

::TargetHeaderHash, +

::MessageNonce, +

::MessagesProof, + >; + + fn nonces(latest_nonce: TestMessageNonce) -> ClientNonces { + ClientNonces { + latest_nonce, + confirmed_nonce: None, + } + } + + #[test] + fn strategy_is_empty_works() { + let mut strategy = BasicStrategy::::new(4); + assert_eq!(strategy.is_empty(), true); + strategy.source_nonces_updated(header_id(1), nonces(1)); + assert_eq!(strategy.is_empty(), false); + } + + #[test] + fn source_nonce_is_never_lower_than_known_target_nonce() { + let mut strategy = BasicStrategy::::new(4); + strategy.target_nonces_updated(nonces(10), &mut Default::default()); + strategy.source_nonces_updated(header_id(1), nonces(5)); + assert_eq!(strategy.source_queue, vec![]); + } + + #[test] + fn source_nonce_is_never_lower_than_latest_known_source_nonce() { + let mut strategy = BasicStrategy::::new(4); + strategy.source_nonces_updated(header_id(1), nonces(5)); + strategy.source_nonces_updated(header_id(2), nonces(3)); + strategy.source_nonces_updated(header_id(2), nonces(5)); + assert_eq!(strategy.source_queue, vec![(header_id(1), 5)]); + } + + #[test] + fn target_nonce_is_never_lower_than_latest_known_target_nonce() { + let mut strategy = BasicStrategy::::new(4); + strategy.target_nonces_updated(nonces(10), &mut Default::default()); + strategy.target_nonces_updated(nonces(5), &mut Default::default()); + assert_eq!(strategy.target_nonce, 10); + } + + #[test] + fn updated_target_nonce_removes_queued_entries() { + let mut strategy = BasicStrategy::::new(4); + strategy.source_nonces_updated(header_id(1), nonces(5)); + strategy.source_nonces_updated(header_id(2), nonces(10)); + strategy.source_nonces_updated(header_id(3), nonces(15)); + strategy.source_nonces_updated(header_id(4), nonces(20)); + strategy.target_nonces_updated(nonces(15), &mut Default::default()); + assert_eq!(strategy.source_queue, vec![(header_id(4), 20)]); + } + + #[test] + fn selected_nonces_are_dropped_on_target_nonce_update() { + let mut state = RaceState::default(); + let mut strategy = BasicStrategy::::new(4); + state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None))); + strategy.target_nonces_updated(nonces(7), &mut state); + assert!(state.nonces_to_submit.is_some()); + strategy.target_nonces_updated(nonces(10), &mut state); + assert!(state.nonces_to_submit.is_none()); + } + + #[test] + fn submitted_nonces_are_dropped_on_target_nonce_update() { + let mut state = RaceState::default(); + let mut strategy = BasicStrategy::::new(4); + state.nonces_submitted = Some(5..=10); + strategy.target_nonces_updated(nonces(7), &mut state); + assert!(state.nonces_submitted.is_some()); + strategy.target_nonces_updated(nonces(10), &mut state); + assert!(state.nonces_submitted.is_none()); + } + + #[test] + fn nothing_is_selected_if_something_is_already_selected() { + let mut state = RaceState::default(); + let mut strategy = BasicStrategy::::new(4); + state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None))); + strategy.source_nonces_updated(header_id(1), nonces(10)); + assert_eq!(strategy.select_nonces_to_deliver(&state), None); + } + + #[test] + fn nothing_is_selected_if_something_is_already_submitted() { + let mut state = RaceState::default(); + let mut strategy = BasicStrategy::::new(4); + state.nonces_submitted = Some(1..=10); + strategy.source_nonces_updated(header_id(1), nonces(10)); + assert_eq!(strategy.select_nonces_to_deliver(&state), None); + } + + #[test] + fn select_nonces_to_deliver_works() { + let mut state = RaceState::<_, _, TestMessageNonce, TestMessagesProof>::default(); + let mut strategy = BasicStrategy::::new(4); + strategy.source_nonces_updated(header_id(1), nonces(1)); + strategy.source_nonces_updated(header_id(2), nonces(2)); + strategy.source_nonces_updated(header_id(3), nonces(6)); + strategy.source_nonces_updated(header_id(5), nonces(8)); + + state.target_state = Some(ClientState { + best_self: header_id(0), + best_peer: header_id(4), + }); + assert_eq!(strategy.select_nonces_to_deliver(&state), Some((1..=4, ()))); + strategy.target_nonces_updated(nonces(4), &mut state); + assert_eq!(strategy.select_nonces_to_deliver(&state), Some((5..=6, ()))); + strategy.target_nonces_updated(nonces(6), &mut state); + assert_eq!(strategy.select_nonces_to_deliver(&state), None); + + state.target_state = Some(ClientState { + best_self: header_id(0), + best_peer: header_id(5), + }); + assert_eq!(strategy.select_nonces_to_deliver(&state), Some((7..=8, ()))); + strategy.target_nonces_updated(nonces(8), &mut state); + assert_eq!(strategy.select_nonces_to_deliver(&state), None); + } +} diff --git a/bridges/relays/millau-client/Cargo.toml b/bridges/relays/millau-client/Cargo.toml index 25a27f754fd9e..487e4bb6347f6 100644 --- a/bridges/relays/millau-client/Cargo.toml +++ b/bridges/relays/millau-client/Cargo.toml @@ -21,4 +21,5 @@ frame-support = "2.0" frame-system = "2.0" pallet-transaction-payment = "2.0" sp-core = "2.0" +sp-keyring = "2.0" sp-runtime = "2.0" diff --git a/bridges/relays/substrate-client/Cargo.toml b/bridges/relays/substrate-client/Cargo.toml index a1de85ec39fbb..d8f9232b25e0f 100644 --- a/bridges/relays/substrate-client/Cargo.toml +++ b/bridges/relays/substrate-client/Cargo.toml @@ -16,6 +16,7 @@ rand = "0.7" # Bridge dependencies +bp-message-lane = { path = "../../primitives/message-lane" } bp-runtime = { path = "../../primitives/runtime" } headers-relay = { path = "../headers-relay" } relay-utils = { path = "../utils" } @@ -27,6 +28,8 @@ frame-system = "2.0" pallet-balances = "2.0" sp-core = "2.0" sp-runtime = "2.0" +sp-std = "2.0" +sp-trie = "2.0" sp-version = "2.0" #[dev-dependencies] diff --git a/bridges/relays/substrate-client/src/client.rs b/bridges/relays/substrate-client/src/client.rs index 9ff54c4b3ab2c..1e0075f228c43 100644 --- a/bridges/relays/substrate-client/src/client.rs +++ b/bridges/relays/substrate-client/src/client.rs @@ -17,11 +17,13 @@ //! Substrate node client. use crate::chain::{Chain, ChainWithBalances}; -use crate::error::Error; -use crate::rpc::Substrate; -use crate::{ConnectionParams, Result}; +use crate::rpc::{Substrate, SubstrateMessageLane}; +use crate::{ConnectionParams, Error, Result}; +use bp_message_lane::{LaneId, MessageNonce}; +use bp_runtime::InstanceId; use codec::Decode; +use frame_support::weights::Weight; use frame_system::AccountInfo; use jsonrpsee::common::DeserializeOwned; use jsonrpsee::raw::RawClient; @@ -30,7 +32,9 @@ use jsonrpsee::{client::Subscription, Client as RpcClient}; use num_traits::Zero; use pallet_balances::AccountData; use sp_core::Bytes; +use sp_trie::StorageProof; use sp_version::RuntimeVersion; +use std::ops::RangeInclusive; const SUB_API_GRANDPA_AUTHORITIES: &str = "GrandpaApi_grandpa_authorities"; @@ -42,15 +46,26 @@ pub type OpaqueGrandpaAuthoritiesSet = Vec; /// Substrate client type. /// -/// Cloning Client is a cheap operation. -#[derive(Clone)] +/// Cloning `Client` is a cheap operation. pub struct Client { + /// Client connection params. + params: ConnectionParams, /// Substrate RPC client. client: RpcClient, /// Genesis block hash. genesis_hash: C::Hash, } +impl Clone for Client { + fn clone(&self) -> Self { + Client { + params: self.params.clone(), + client: self.client.clone(), + genesis_hash: self.genesis_hash, + } + } +} + impl std::fmt::Debug for Client { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { fmt.debug_struct("Client") @@ -62,15 +77,33 @@ impl std::fmt::Debug for Client { impl Client { /// Returns client that is able to call RPCs on Substrate node over websocket connection. pub async fn new(params: ConnectionParams) -> Result { - let uri = format!("ws://{}:{}", params.host, params.port); - let transport = WsTransportClient::new(&uri).await?; - let raw_client = RawClient::new(transport); - let client: RpcClient = raw_client.into(); + let client = Self::build_client(params.clone()).await?; let number: C::BlockNumber = Zero::zero(); let genesis_hash = Substrate::::chain_get_block_hash(&client, number).await?; - Ok(Self { client, genesis_hash }) + Ok(Self { + params, + client, + genesis_hash, + }) + } + + /// Reopen client connection. + pub async fn reconnect(self) -> Result { + Ok(Self { + params: self.params.clone(), + client: Self::build_client(self.params).await?, + genesis_hash: self.genesis_hash, + }) + } + + /// Build client to use in connection. + async fn build_client(params: ConnectionParams) -> Result { + let uri = format!("ws://{}:{}", params.host, params.port); + let transport = WsTransportClient::new(&uri).await?; + let raw_client = RawClient::new(transport); + Ok(raw_client.into()) } } @@ -80,6 +113,11 @@ impl Client { &self.genesis_hash } + /// Return hash of the best finalized block. + pub async fn best_finalized_header_hash(&self) -> Result { + Ok(Substrate::::chain_get_finalized_head(&self.client).await?) + } + /// Returns the best Substrate header. pub async fn best_header(&self) -> Result where @@ -169,6 +207,47 @@ impl Client { .map_err(Into::into) } + /// Returns proof-of-message(s) in given inclusive range. + pub async fn prove_messages( + &self, + instance: InstanceId, + lane: LaneId, + range: RangeInclusive, + include_outbound_lane_state: bool, + at_block: C::Hash, + ) -> Result<(Weight, StorageProof)> { + let (dispatch_weight, encoded_trie_nodes) = SubstrateMessageLane::::prove_messages( + &self.client, + instance, + lane, + *range.start(), + *range.end(), + include_outbound_lane_state, + Some(at_block), + ) + .await + .map_err(Error::Request)?; + let decoded_trie_nodes: Vec> = + Decode::decode(&mut &encoded_trie_nodes[..]).map_err(Error::ResponseParseFailed)?; + Ok((dispatch_weight, StorageProof::new(decoded_trie_nodes))) + } + + /// Returns proof-of-message(s) delivery. + pub async fn prove_messages_delivery( + &self, + instance: InstanceId, + lane: LaneId, + at_block: C::Hash, + ) -> Result { + let encoded_trie_nodes = + SubstrateMessageLane::::prove_messages_delivery(&self.client, instance, lane, Some(at_block)) + .await + .map_err(Error::Request)?; + let decoded_trie_nodes: Vec> = + Decode::decode(&mut &encoded_trie_nodes[..]).map_err(Error::ResponseParseFailed)?; + Ok(StorageProof::new(decoded_trie_nodes)) + } + /// Return new justifications stream. pub async fn subscribe_justifications(self) -> Result { Ok(self diff --git a/bridges/relays/substrate-client/src/lib.rs b/bridges/relays/substrate-client/src/lib.rs index b92e6d02885ec..567fc4a61984e 100644 --- a/bridges/relays/substrate-client/src/lib.rs +++ b/bridges/relays/substrate-client/src/lib.rs @@ -31,6 +31,9 @@ pub use crate::client::{Client, JustificationsSubscription, OpaqueGrandpaAuthori pub use crate::error::{Error, Result}; pub use bp_runtime::{BlockNumberOf, Chain as ChainBase, HashOf, HeaderOf}; +/// Header id used by the chain. +pub type HeaderIdOf = relay_utils::HeaderId, BlockNumberOf>; + /// Substrate-over-websocket connection params. #[derive(Debug, Clone)] pub struct ConnectionParams { diff --git a/bridges/relays/substrate-client/src/rpc.rs b/bridges/relays/substrate-client/src/rpc.rs index 552b20de622b5..89ed36695abcc 100644 --- a/bridges/relays/substrate-client/src/rpc.rs +++ b/bridges/relays/substrate-client/src/rpc.rs @@ -23,6 +23,9 @@ use crate::chain::Chain; +use bp_message_lane::{LaneId, MessageNonce}; +use bp_runtime::InstanceId; +use frame_support::weights::Weight; use sp_core::{ storage::{StorageData, StorageKey}, Bytes, @@ -33,6 +36,8 @@ jsonrpsee::rpc_api! { pub(crate) Substrate { #[rpc(method = "chain_getHeader", positional_params)] fn chain_get_header(block_hash: Option) -> C::Header; + #[rpc(method = "chain_getFinalizedHead", positional_params)] + fn chain_get_finalized_head() -> C::Hash; #[rpc(method = "chain_getBlock", positional_params)] fn chain_get_block(block_hash: Option) -> C::SignedBlock; #[rpc(method = "chain_getBlockHash", positional_params)] @@ -48,4 +53,23 @@ jsonrpsee::rpc_api! { #[rpc(method = "state_getRuntimeVersion", positional_params)] fn runtime_version() -> RuntimeVersion; } + + pub(crate) SubstrateMessageLane { + #[rpc(method = "messageLane_proveMessages", positional_params)] + fn prove_messages( + instance: InstanceId, + lane: LaneId, + begin: MessageNonce, + end: MessageNonce, + include_outbound_lane_state: bool, + block: Option, + ) -> (Weight, Bytes); + + #[rpc(method = "messageLane_proveMessagesDelivery", positional_params)] + fn prove_messages_delivery( + instance: InstanceId, + lane: LaneId, + block: Option, + ) -> Bytes; + } } diff --git a/bridges/relays/substrate/Cargo.toml b/bridges/relays/substrate/Cargo.toml index c5dca17c9fab2..2489ebcf95bec 100644 --- a/bridges/relays/substrate/Cargo.toml +++ b/bridges/relays/substrate/Cargo.toml @@ -20,6 +20,7 @@ structopt = "0.3" bp-message-lane = { path = "../../primitives/message-lane" } bp-millau = { path = "../../primitives/millau" } +bp-runtime = { path = "../../primitives/runtime" } bp-rialto = { path = "../../primitives/rialto" } headers-relay = { path = "../headers-relay" } messages-relay = { path = "../messages-relay" } @@ -37,3 +38,4 @@ rialto-runtime = { path = "../../bin/rialto/runtime" } frame-support = "2.0" sp-core = "2.0" sp-runtime = "2.0" +sp-trie = "2.0" diff --git a/bridges/relays/substrate/src/cli.rs b/bridges/relays/substrate/src/cli.rs index cff2d36211108..d3ad78c3a1a0e 100644 --- a/bridges/relays/substrate/src/cli.rs +++ b/bridges/relays/substrate/src/cli.rs @@ -50,6 +50,22 @@ pub enum Command { #[structopt(flatten)] prometheus_params: PrometheusParams, }, + /// Serve given lane of Millau -> Rialto messages. + MillauMessagesToRialto { + #[structopt(flatten)] + millau: MillauConnectionParams, + #[structopt(flatten)] + millau_sign: MillauSigningParams, + #[structopt(flatten)] + rialto: RialtoConnectionParams, + #[structopt(flatten)] + rialto_sign: RialtoSigningParams, + #[structopt(flatten)] + prometheus_params: PrometheusParams, + /// Hex-encoded id of lane that should be served by relay. + #[structopt(long)] + lane: HexLaneId, + }, /// Submit message to given Rialto -> Millau lane. SubmitMillauToRialtoMessage { #[structopt(flatten)] diff --git a/bridges/relays/substrate/src/main.rs b/bridges/relays/substrate/src/main.rs index 6d36e7606159e..98c30ff0be58a 100644 --- a/bridges/relays/substrate/src/main.rs +++ b/bridges/relays/substrate/src/main.rs @@ -36,7 +36,10 @@ mod cli; mod headers_maintain; mod headers_pipeline; mod headers_target; +mod messages_source; +mod messages_target; mod millau_headers_to_rialto; +mod millau_messages_to_rialto; mod rialto_headers_to_millau; fn main() { @@ -94,8 +97,47 @@ async fn run_command(command: cli::Command) -> Result<(), String> { millau_sign.millau_signer_password.as_deref(), ) .map_err(|e| format!("Failed to parse millau-signer: {:?}", e))?; + rialto_headers_to_millau::run(rialto_client, millau_client, millau_sign, prometheus_params.into()).await; } + cli::Command::MillauMessagesToRialto { + millau, + millau_sign, + rialto, + rialto_sign, + prometheus_params, + lane, + } => { + let millau_client = MillauClient::new(ConnectionParams { + host: millau.millau_host, + port: millau.millau_port, + }) + .await?; + let millau_sign = MillauSigningParams::from_suri( + &millau_sign.millau_signer, + millau_sign.millau_signer_password.as_deref(), + ) + .map_err(|e| format!("Failed to parse millau-signer: {:?}", e))?; + let rialto_client = RialtoClient::new(ConnectionParams { + host: rialto.rialto_host, + port: rialto.rialto_port, + }) + .await?; + let rialto_sign = RialtoSigningParams::from_suri( + &rialto_sign.rialto_signer, + rialto_sign.rialto_signer_password.as_deref(), + ) + .map_err(|e| format!("Failed to parse rialto-signer: {:?}", e))?; + + millau_messages_to_rialto::run( + millau_client, + millau_sign, + rialto_client, + rialto_sign, + lane.into(), + prometheus_params.into(), + ); + } cli::Command::SubmitMillauToRialtoMessage { millau, millau_sign, diff --git a/bridges/relays/substrate/src/messages_source.rs b/bridges/relays/substrate/src/messages_source.rs new file mode 100644 index 0000000000000..46e990e101682 --- /dev/null +++ b/bridges/relays/substrate/src/messages_source.rs @@ -0,0 +1,225 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Substrate client as Substrate messages source. The chain we connect to should have +//! runtime that implements `HeaderApi` to allow bridging with +//! chain. + +use async_trait::async_trait; +use bp_message_lane::{LaneId, MessageNonce}; +use bp_runtime::InstanceId; +use codec::{Decode, Encode}; +use frame_support::weights::Weight; +use messages_relay::{ + message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, + message_lane_loop::{ClientState, SourceClient, SourceClientState}, +}; +use relay_substrate_client::{Chain, Client, Error as SubstrateError, HashOf, HeaderIdOf}; +use relay_utils::HeaderId; +use sp_core::Bytes; +use sp_runtime::{traits::Header as HeaderT, DeserializeOwned}; +use sp_trie::StorageProof; +use std::{marker::PhantomData, ops::RangeInclusive}; + +/// Intermediate message proof returned by the source Substrate node. Includes everything +/// required to submit to the target node: cumulative dispatch weight of bundled messages and +/// the proof itself. +pub type SubstrateMessagesProof = (Weight, (HashOf, StorageProof, LaneId, MessageNonce, MessageNonce)); + +/// Substrate client as Substrate messages source. +pub struct SubstrateMessagesSource { + client: Client, + tx_maker: M, + lane: LaneId, + instance: InstanceId, + _marker: PhantomData

, +} + +/// Substrate transactions maker. +#[async_trait] +pub trait SubstrateTransactionMaker: Clone + Send + Sync { + /// Signed transaction type. + type SignedTransaction: Send + Sync + Encode; + + /// Make messages receiving proof transaction. + async fn make_messages_receiving_proof_transaction( + &self, + generated_at_block: TargetHeaderIdOf

, + proof: P::MessagesReceivingProof, + ) -> Result; +} + +impl SubstrateMessagesSource { + /// Create new Substrate headers source. + pub fn new(client: Client, tx_maker: M, lane: LaneId, instance: InstanceId) -> Self { + SubstrateMessagesSource { + client, + tx_maker, + lane, + instance, + _marker: Default::default(), + } + } +} + +impl Clone for SubstrateMessagesSource { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + tx_maker: self.tx_maker.clone(), + lane: self.lane, + instance: self.instance, + _marker: Default::default(), + } + } +} + +#[async_trait] +impl SourceClient

for SubstrateMessagesSource +where + C: Chain, + C::Header: DeserializeOwned, + C::Index: DeserializeOwned, + ::Number: Into, + P: MessageLane< + MessageNonce = MessageNonce, + MessagesProof = SubstrateMessagesProof, + SourceHeaderNumber = ::Number, + SourceHeaderHash = ::Hash, + >, + P::TargetHeaderNumber: Decode, + P::TargetHeaderHash: Decode, + M: SubstrateTransactionMaker, +{ + type Error = SubstrateError; + + async fn reconnect(mut self) -> Result { + let new_client = self.client.clone().reconnect().await?; + self.client = new_client; + Ok(self) + } + + async fn state(&self) -> Result, Self::Error> { + read_client_state::<_, P::TargetHeaderHash, P::TargetHeaderNumber>(&self.client, P::TARGET_NAME).await + } + + async fn latest_generated_nonce( + &self, + id: SourceHeaderIdOf

, + ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { + let encoded_response = self + .client + .state_call( + // TODO: https://github.com/paritytech/parity-bridges-common/issues/457 + "OutboundLaneApi_latest_generated_nonce".into(), + Bytes(self.lane.encode()), + Some(id.1), + ) + .await?; + let latest_generated_nonce: P::MessageNonce = + Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + Ok((id, latest_generated_nonce)) + } + + async fn latest_confirmed_received_nonce( + &self, + id: SourceHeaderIdOf

, + ) -> Result<(SourceHeaderIdOf

, P::MessageNonce), Self::Error> { + let encoded_response = self + .client + .state_call( + // TODO: https://github.com/paritytech/parity-bridges-common/issues/457 + "OutboundLaneApi_latest_received_nonce".into(), + Bytes(self.lane.encode()), + Some(id.1), + ) + .await?; + let latest_received_nonce: P::MessageNonce = + Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + Ok((id, latest_received_nonce)) + } + + async fn prove_messages( + &self, + id: SourceHeaderIdOf

, + nonces: RangeInclusive, + include_outbound_lane_state: bool, + ) -> Result<(SourceHeaderIdOf

, RangeInclusive, P::MessagesProof), Self::Error> { + let (weight, proof) = self + .client + .prove_messages( + self.instance, + self.lane, + nonces.clone(), + include_outbound_lane_state, + id.1, + ) + .await?; + let proof = (id.1, proof, self.lane, *nonces.start(), *nonces.end()); + Ok((id, nonces, (weight, proof))) + } + + async fn submit_messages_receiving_proof( + &self, + generated_at_block: TargetHeaderIdOf

, + proof: P::MessagesReceivingProof, + ) -> Result<(), Self::Error> { + let tx = self + .tx_maker + .make_messages_receiving_proof_transaction(generated_at_block, proof) + .await?; + self.client.submit_extrinsic(Bytes(tx.encode())).await?; + Ok(()) + } +} + +pub async fn read_client_state( + self_client: &Client, + bridged_chain_name: &str, +) -> Result, HeaderId>, SubstrateError> +where + SelfChain: Chain, + SelfChain::Header: DeserializeOwned, + SelfChain::Index: DeserializeOwned, + BridgedHeaderHash: Decode, + BridgedHeaderNumber: Decode, +{ + // let's read our state first: we need best finalized header hash on **this** chain + let self_best_finalized_header_hash = self_client.best_finalized_header_hash().await?; + let self_best_finalized_header = self_client.header_by_hash(self_best_finalized_header_hash).await?; + let self_best_finalized_id = HeaderId(*self_best_finalized_header.number(), self_best_finalized_header_hash); + + // now let's read id of best finalized peer header at our best finalized block + let best_finalized_peer_on_self_method = format!("{}HeaderApi_finalized_block", bridged_chain_name); + let encoded_best_finalized_peer_on_self = self_client + .state_call( + best_finalized_peer_on_self_method, + Bytes(Vec::new()), + Some(self_best_finalized_header_hash), + ) + .await?; + let decoded_best_finalized_peer_on_self: (BridgedHeaderNumber, BridgedHeaderHash) = + Decode::decode(&mut &encoded_best_finalized_peer_on_self.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + let peer_on_self_best_finalized_id = HeaderId( + decoded_best_finalized_peer_on_self.0, + decoded_best_finalized_peer_on_self.1, + ); + + Ok(ClientState { + best_self: self_best_finalized_id, + best_peer: peer_on_self_best_finalized_id, + }) +} diff --git a/bridges/relays/substrate/src/messages_target.rs b/bridges/relays/substrate/src/messages_target.rs new file mode 100644 index 0000000000000..541b4dc0f2d85 --- /dev/null +++ b/bridges/relays/substrate/src/messages_target.rs @@ -0,0 +1,176 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Substrate client as Substrate messages target. The chain we connect to should have +//! runtime that implements `HeaderApi` to allow bridging with +//! chain. + +use crate::messages_source::read_client_state; + +use async_trait::async_trait; +use bp_message_lane::{LaneId, MessageNonce}; +use bp_runtime::InstanceId; +use codec::{Decode, Encode}; +use messages_relay::{ + message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf}, + message_lane_loop::{TargetClient, TargetClientState}, +}; +use relay_substrate_client::{Chain, Client, Error as SubstrateError, HashOf}; +use sp_core::Bytes; +use sp_runtime::{traits::Header as HeaderT, DeserializeOwned}; +use sp_trie::StorageProof; +use std::{marker::PhantomData, ops::RangeInclusive}; + +/// Substrate client as Substrate messages target. +pub struct SubstrateMessagesTarget { + client: Client, + tx_maker: M, + lane: LaneId, + instance: InstanceId, + _marker: PhantomData

, +} + +/// Substrate transactions maker. +#[async_trait] +pub trait SubstrateTransactionMaker: Clone + Send + Sync { + /// Signed transaction type. + type SignedTransaction: Send + Sync + Encode; + + /// Make messages delivery transaction. + async fn make_messages_delivery_transaction( + &self, + generated_at_header: SourceHeaderIdOf

, + nonces: RangeInclusive, + proof: P::MessagesProof, + ) -> Result; +} + +impl SubstrateMessagesTarget { + /// Create new Substrate headers target. + pub fn new(client: Client, tx_maker: M, lane: LaneId, instance: InstanceId) -> Self { + SubstrateMessagesTarget { + client, + tx_maker, + lane, + instance, + _marker: Default::default(), + } + } +} + +impl Clone for SubstrateMessagesTarget { + fn clone(&self) -> Self { + Self { + client: self.client.clone(), + tx_maker: self.tx_maker.clone(), + lane: self.lane, + instance: self.instance, + _marker: Default::default(), + } + } +} + +#[async_trait] +impl TargetClient

for SubstrateMessagesTarget +where + C: Chain, + C::Header: DeserializeOwned, + C::Index: DeserializeOwned, + ::Number: Into, + P: MessageLane< + MessageNonce = MessageNonce, + MessagesReceivingProof = (HashOf, StorageProof, LaneId), + TargetHeaderNumber = ::Number, + TargetHeaderHash = ::Hash, + >, + P::SourceHeaderNumber: Decode, + P::SourceHeaderHash: Decode, + M: SubstrateTransactionMaker, +{ + type Error = SubstrateError; + + async fn reconnect(mut self) -> Result { + let new_client = self.client.clone().reconnect().await?; + self.client = new_client; + Ok(self) + } + + async fn state(&self) -> Result, Self::Error> { + read_client_state::<_, P::SourceHeaderHash, P::SourceHeaderNumber>(&self.client, P::SOURCE_NAME).await + } + + async fn latest_received_nonce( + &self, + id: TargetHeaderIdOf

, + ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { + let encoded_response = self + .client + .state_call( + // TODO: https://github.com/paritytech/parity-bridges-common/issues/457 + "InboundLaneApi_latest_received_nonce".into(), + Bytes(self.lane.encode()), + Some(id.1), + ) + .await?; + let latest_received_nonce: P::MessageNonce = + Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + Ok((id, latest_received_nonce)) + } + + async fn latest_confirmed_received_nonce( + &self, + id: TargetHeaderIdOf

, + ) -> Result<(TargetHeaderIdOf

, P::MessageNonce), Self::Error> { + let encoded_response = self + .client + .state_call( + // TODO: https://github.com/paritytech/parity-bridges-common/issues/457 + "OutboundLaneApi_latest_received_nonce".into(), + Bytes(self.lane.encode()), + Some(id.1), + ) + .await?; + let latest_received_nonce: P::MessageNonce = + Decode::decode(&mut &encoded_response.0[..]).map_err(SubstrateError::ResponseParseFailed)?; + Ok((id, latest_received_nonce)) + } + + async fn prove_messages_receiving( + &self, + id: TargetHeaderIdOf

, + ) -> Result<(TargetHeaderIdOf

, P::MessagesReceivingProof), Self::Error> { + let proof = self + .client + .prove_messages_delivery(self.instance, self.lane, id.1) + .await?; + let proof = (id.1, proof, self.lane); + Ok((id, proof)) + } + + async fn submit_messages_proof( + &self, + generated_at_header: SourceHeaderIdOf

, + nonces: RangeInclusive, + proof: P::MessagesProof, + ) -> Result, Self::Error> { + let tx = self + .tx_maker + .make_messages_delivery_transaction(generated_at_header, nonces.clone(), proof) + .await?; + self.client.submit_extrinsic(Bytes(tx.encode())).await?; + Ok(nonces) + } +} diff --git a/bridges/relays/substrate/src/millau_messages_to_rialto.rs b/bridges/relays/substrate/src/millau_messages_to_rialto.rs new file mode 100644 index 0000000000000..47a5158b552ef --- /dev/null +++ b/bridges/relays/substrate/src/millau_messages_to_rialto.rs @@ -0,0 +1,171 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +//! Millau-to-Rialto messages sync entrypoint. + +use crate::messages_source::{SubstrateMessagesSource, SubstrateTransactionMaker as SubstrateSourceTransactionMaker}; +use crate::messages_target::{SubstrateMessagesTarget, SubstrateTransactionMaker as SubstrateTargetTransactionMaker}; +use crate::{MillauClient, RialtoClient}; + +use async_trait::async_trait; +use bp_message_lane::{LaneId, MessageNonce}; +use bp_runtime::{MILLAU_BRIDGE_INSTANCE, RIALTO_BRIDGE_INSTANCE}; +use frame_support::weights::Weight; +use messages_relay::message_lane::MessageLane; +use relay_millau_client::{HeaderId as MillauHeaderId, Millau, SigningParams as MillauSigningParams}; +use relay_rialto_client::{HeaderId as RialtoHeaderId, Rialto, SigningParams as RialtoSigningParams}; +use relay_substrate_client::{BlockNumberOf, Error as SubstrateError, HashOf, TransactionSignScheme}; +use relay_utils::metrics::MetricsParams; +use sp_core::Pair; +use sp_trie::StorageProof; +use std::{ops::RangeInclusive, time::Duration}; + +/// Millau -> Rialto messages proof: +/// +/// - cumulative dispatch-weight of messages in the batch; +/// - proof that we'll actually submit to the Rialto node. +type FromMillauMessagesProof = ( + Weight, + (HashOf, StorageProof, LaneId, MessageNonce, MessageNonce), +); +/// Rialto -> Millau messages receiving proof. +type FromRialtoMessagesReceivingProof = (HashOf, StorageProof, LaneId); + +/// Millau-to-Rialto messages pipeline. +#[derive(Debug, Clone, Copy)] +struct MillauMessagesToRialto; + +impl MessageLane for MillauMessagesToRialto { + const SOURCE_NAME: &'static str = "Millau"; + const TARGET_NAME: &'static str = "Rialto"; + + type MessageNonce = MessageNonce; + type MessagesProof = FromMillauMessagesProof; + type MessagesReceivingProof = FromRialtoMessagesReceivingProof; + + type SourceHeaderNumber = BlockNumberOf; + type SourceHeaderHash = HashOf; + + type TargetHeaderNumber = BlockNumberOf; + type TargetHeaderHash = HashOf; +} + +/// Millau node as messages source. +type MillauSourceClient = SubstrateMessagesSource; + +/// Millau transaction maker. +#[derive(Clone)] +struct MillauTransactionMaker { + client: MillauClient, + sign: MillauSigningParams, +} + +#[async_trait] +impl SubstrateSourceTransactionMaker for MillauTransactionMaker { + type SignedTransaction = ::SignedTransaction; + + async fn make_messages_receiving_proof_transaction( + &self, + _generated_at_block: RialtoHeaderId, + proof: FromRialtoMessagesReceivingProof, + ) -> Result { + let account_id = self.sign.signer.public().as_array_ref().clone().into(); + let nonce = self.client.next_account_index(account_id).await?; + let call = millau_runtime::MessageLaneCall::receive_messages_delivery_proof(proof).into(); + let transaction = Millau::sign_transaction(&self.client, &self.sign.signer, nonce, call); + Ok(transaction) + } +} + +/// Rialto node as messages target. +type RialtoTargetClient = SubstrateMessagesTarget; + +/// Rialto transaction maker. +#[derive(Clone)] +struct RialtoTransactionMaker { + client: RialtoClient, + relayer_id: bp_millau::AccountId, + sign: RialtoSigningParams, +} + +#[async_trait] +impl SubstrateTargetTransactionMaker for RialtoTransactionMaker { + type SignedTransaction = ::SignedTransaction; + + async fn make_messages_delivery_transaction( + &self, + _generated_at_header: MillauHeaderId, + _nonces: RangeInclusive, + proof: FromMillauMessagesProof, + ) -> Result { + let (dispatch_weight, proof) = proof; + let account_id = self.sign.signer.public().as_array_ref().clone().into(); + let nonce = self.client.next_account_index(account_id).await?; + let call = + rialto_runtime::MessageLaneCall::receive_messages_proof(self.relayer_id.clone(), proof, dispatch_weight) + .into(); + let transaction = Rialto::sign_transaction(&self.client, &self.sign.signer, nonce, call); + Ok(transaction) + } +} + +/// Run Millau-to-Rialto messages sync. +pub fn run( + millau_client: MillauClient, + millau_sign: MillauSigningParams, + rialto_client: RialtoClient, + rialto_sign: RialtoSigningParams, + lane: LaneId, + metrics_params: Option, +) { + let millau_tick = Duration::from_secs(5); + let rialto_tick = Duration::from_secs(5); + let reconnect_delay = Duration::from_secs(10); + let stall_timeout = Duration::from_secs(5 * 60); + let relayer_id = millau_sign.signer.public().as_array_ref().clone().into(); + + messages_relay::message_lane_loop::run( + messages_relay::message_lane_loop::Params { + lane, + source_tick: millau_tick, + target_tick: rialto_tick, + reconnect_delay, + stall_timeout, + max_unconfirmed_nonces_at_target: bp_rialto::MAX_UNCONFIRMED_MESSAGES_AT_INBOUND_LANE, + }, + MillauSourceClient::new( + millau_client.clone(), + MillauTransactionMaker { + client: millau_client, + sign: millau_sign, + }, + lane, + RIALTO_BRIDGE_INSTANCE, + ), + RialtoTargetClient::new( + rialto_client.clone(), + RialtoTransactionMaker { + client: rialto_client, + relayer_id, + sign: rialto_sign, + }, + lane, + MILLAU_BRIDGE_INSTANCE, + ), + metrics_params, + futures::future::pending(), + ); +}