From edf47b85d03a23b2efda69adc9d4c999523db7cb Mon Sep 17 00:00:00 2001 From: Serban Iorga Date: Tue, 12 Sep 2023 16:38:26 +0300 Subject: [PATCH] Equivocation detection loop: Reorganize block checking logic as state machine (#2555) (#2557) * Split `reconnect_failed_client()` logic * Reorganize block checking logic as state machine This way we'll be able to save the state in case of a failure --- .../relays/equivocation/src/block_checker.rs | 252 ++++++++++++++++++ .../equivocation/src/equivocation_loop.rs | 198 ++------------ bridges/relays/equivocation/src/lib.rs | 50 +++- bridges/relays/utils/src/relay_loop.rs | 57 ++-- 4 files changed, 340 insertions(+), 217 deletions(-) create mode 100644 bridges/relays/equivocation/src/block_checker.rs diff --git a/bridges/relays/equivocation/src/block_checker.rs b/bridges/relays/equivocation/src/block_checker.rs new file mode 100644 index 0000000000000..358d61fcf8e56 --- /dev/null +++ b/bridges/relays/equivocation/src/block_checker.rs @@ -0,0 +1,252 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see . + +use crate::{ + handle_client_error, reporter::EquivocationsReporter, EquivocationDetectionPipeline, + EquivocationReportingContext, HeaderFinalityInfo, SourceClient, TargetClient, +}; + +use bp_header_chain::{FinalityProof, FindEquivocations as FindEquivocationsT}; +use finality_relay::FinalityProofsBuf; +use futures::future::{BoxFuture, FutureExt}; +use num_traits::Saturating; + +/// First step in the block checking state machine. +/// +/// Getting the finality info associated to the source headers synced with the target chain +/// at the specified block. +pub struct ReadSyncedHeaders { + pub target_block_num: P::TargetNumber, +} + +impl ReadSyncedHeaders

{ + pub async fn next>( + self, + target_client: &mut TC, + ) -> Result, Self> { + match target_client.synced_headers_finality_info(self.target_block_num).await { + Ok(synced_headers) => + Ok(ReadContext { target_block_num: self.target_block_num, synced_headers }), + Err(e) => { + log::error!( + target: "bridge", + "Could not get {} headers synced to {} at block {}: {e:?}", + P::SOURCE_NAME, + P::TARGET_NAME, + self.target_block_num + ); + + // Reconnect target client in case of a connection error. + handle_client_error(target_client, e).await; + + Err(self) + }, + } + } +} + +/// Second step in the block checking state machine. +/// +/// Reading the equivocation reporting context from the target chain. +pub struct ReadContext { + target_block_num: P::TargetNumber, + synced_headers: Vec>, +} + +impl ReadContext

{ + pub async fn next>( + self, + target_client: &mut TC, + ) -> Result>, Self> { + match EquivocationReportingContext::try_read_from_target::( + target_client, + self.target_block_num.saturating_sub(1.into()), + ) + .await + { + Ok(Some(context)) => Ok(Some(FindEquivocations { + target_block_num: self.target_block_num, + synced_headers: self.synced_headers, + context, + })), + Ok(None) => Ok(None), + Err(e) => { + log::error!( + target: "bridge", + "Could not read {} `EquivocationReportingContext` from {} at block {}: {e:?}", + P::SOURCE_NAME, + P::TARGET_NAME, + self.target_block_num.saturating_sub(1.into()), + ); + + // Reconnect target client in case of a connection error. + handle_client_error(target_client, e).await; + + Err(self) + }, + } + } +} + +/// Third step in the block checking state machine. +/// +/// Searching for equivocations in the source headers synced with the target chain. +pub struct FindEquivocations { + target_block_num: P::TargetNumber, + synced_headers: Vec>, + context: EquivocationReportingContext

, +} + +impl FindEquivocations

{ + pub fn next( + mut self, + finality_proofs_buf: &mut FinalityProofsBuf

, + ) -> Vec> { + let mut result = vec![]; + for synced_header in self.synced_headers { + match P::EquivocationsFinder::find_equivocations( + &self.context.synced_verification_context, + &synced_header.finality_proof, + finality_proofs_buf.buf().as_slice(), + ) { + Ok(equivocations) => result.push(ReportEquivocations { + source_block_hash: self.context.synced_header_hash, + equivocations, + }), + Err(e) => { + log::error!( + target: "bridge", + "Could not search for equivocations in the finality proof \ + for source header {:?} synced at target block {}: {e:?}", + synced_header.finality_proof.target_header_hash(), + self.target_block_num + ); + }, + }; + + finality_proofs_buf.prune(synced_header.finality_proof.target_header_number(), None); + self.context.update(synced_header); + } + + result + } +} + +/// Fourth step in the block checking state machine. +/// +/// Reporting the detected equivocations (if any). +pub struct ReportEquivocations { + source_block_hash: P::Hash, + equivocations: Vec, +} + +impl ReportEquivocations

{ + pub async fn next>( + mut self, + source_client: &mut SC, + reporter: &mut EquivocationsReporter, + ) -> Result<(), Self> { + let mut unprocessed_equivocations = vec![]; + for equivocation in self.equivocations { + match reporter + .submit_report(source_client, self.source_block_hash, equivocation.clone()) + .await + { + Ok(_) => {}, + Err(e) => { + log::error!( + target: "bridge", + "Could not submit equivocation report to {} for {equivocation:?}: {e:?}", + P::SOURCE_NAME, + ); + + // Mark the equivocation as unprocessed + unprocessed_equivocations.push(equivocation); + // Reconnect source client in case of a connection error. + handle_client_error(source_client, e).await; + }, + } + } + + self.equivocations = unprocessed_equivocations; + if !self.equivocations.is_empty() { + return Err(self) + } + + Ok(()) + } +} + +/// Block checking state machine. +pub enum BlockChecker { + ReadSyncedHeaders(ReadSyncedHeaders

), + ReadContext(ReadContext

), + ReportEquivocations(Vec>), +} + +impl BlockChecker

{ + pub fn new(target_block_num: P::TargetNumber) -> Self { + Self::ReadSyncedHeaders(ReadSyncedHeaders { target_block_num }) + } + + pub fn run<'a, SC: SourceClient

, TC: TargetClient

>( + self, + source_client: &'a mut SC, + target_client: &'a mut TC, + finality_proofs_buf: &'a mut FinalityProofsBuf

, + reporter: &'a mut EquivocationsReporter, + ) -> BoxFuture<'a, Result<(), Self>> { + async move { + match self { + Self::ReadSyncedHeaders(state) => { + let read_context = + state.next(target_client).await.map_err(Self::ReadSyncedHeaders)?; + Self::ReadContext(read_context) + .run(source_client, target_client, finality_proofs_buf, reporter) + .await + }, + Self::ReadContext(state) => { + let maybe_find_equivocations = + state.next(target_client).await.map_err(Self::ReadContext)?; + let find_equivocations = match maybe_find_equivocations { + Some(find_equivocations) => find_equivocations, + None => return Ok(()), + }; + Self::ReportEquivocations(find_equivocations.next(finality_proofs_buf)) + .run(source_client, target_client, finality_proofs_buf, reporter) + .await + }, + Self::ReportEquivocations(state) => { + let mut failures = vec![]; + for report_equivocations in state { + if let Err(failure) = + report_equivocations.next(source_client, reporter).await + { + failures.push(failure); + } + } + + if !failures.is_empty() { + return Err(Self::ReportEquivocations(failures)) + } + + Ok(()) + }, + } + } + .boxed() + } +} diff --git a/bridges/relays/equivocation/src/equivocation_loop.rs b/bridges/relays/equivocation/src/equivocation_loop.rs index 61ffa92c8dc46..da3f72b94660d 100644 --- a/bridges/relays/equivocation/src/equivocation_loop.rs +++ b/bridges/relays/equivocation/src/equivocation_loop.rs @@ -15,55 +15,17 @@ // along with Parity Bridges Common. If not, see . use crate::{ - reporter::EquivocationsReporter, EquivocationDetectionPipeline, HeaderFinalityInfo, + handle_client_error, reporter::EquivocationsReporter, EquivocationDetectionPipeline, SourceClient, TargetClient, }; -use bp_header_chain::{FinalityProof, FindEquivocations}; +use crate::block_checker::BlockChecker; use finality_relay::{FinalityProofsBuf, FinalityProofsStream}; use futures::{select, FutureExt}; use num_traits::Saturating; -use relay_utils::{ - metrics::MetricsParams, - relay_loop::{reconnect_failed_client, RECONNECT_DELAY}, - FailedClient, MaybeConnectionError, -}; +use relay_utils::{metrics::MetricsParams, FailedClient}; use std::{future::Future, time::Duration}; -/// The context needed for finding equivocations inside finality proofs and reporting them. -struct EquivocationReportingContext { - synced_header_hash: P::Hash, - synced_verification_context: P::FinalityVerificationContext, -} - -impl EquivocationReportingContext

{ - /// Try to get the `EquivocationReportingContext` used by the target chain - /// at the provided block. - async fn try_read_from_target>( - target_client: &TC, - at: P::TargetNumber, - ) -> Result, TC::Error> { - let maybe_best_synced_header_hash = target_client.best_synced_header_hash(at).await?; - Ok(match maybe_best_synced_header_hash { - Some(best_synced_header_hash) => Some(EquivocationReportingContext { - synced_header_hash: best_synced_header_hash, - synced_verification_context: target_client - .finality_verification_context(at) - .await?, - }), - None => None, - }) - } - - /// Update with the new context introduced by the `HeaderFinalityInfo

` if any. - fn update(&mut self, info: HeaderFinalityInfo

) { - if let Some(new_verification_context) = info.new_verification_context { - self.synced_header_hash = info.finality_proof.target_header_hash(); - self.synced_verification_context = new_verification_context; - } - } -} - /// Equivocations detection loop state. struct EquivocationDetectionLoop< P: EquivocationDetectionPipeline, @@ -85,34 +47,6 @@ struct EquivocationDetectionLoop< impl, TC: TargetClient

> EquivocationDetectionLoop { - async fn handle_source_error(&mut self, e: SC::Error) { - if e.is_connection_error() { - reconnect_failed_client( - FailedClient::Source, - RECONNECT_DELAY, - &mut self.source_client, - &mut self.target_client, - ) - .await; - } else { - async_std::task::sleep(RECONNECT_DELAY).await; - } - } - - async fn handle_target_error(&mut self, e: TC::Error) { - if e.is_connection_error() { - reconnect_failed_client( - FailedClient::Target, - RECONNECT_DELAY, - &mut self.source_client, - &mut self.target_client, - ) - .await; - } else { - async_std::task::sleep(RECONNECT_DELAY).await; - } - } - async fn ensure_finality_proofs_stream(&mut self) { match self.finality_proofs_stream.ensure_stream(&self.source_client).await { Ok(_) => {}, @@ -124,7 +58,7 @@ impl, TC: TargetClient

> ); // Reconnect to the source client if needed - self.handle_source_error(e).await + handle_client_error(&mut self.source_client, e).await; }, } } @@ -140,116 +74,13 @@ impl, TC: TargetClient

> ); // Reconnect target client and move on - self.handle_target_error(e).await; - - None - }, - } - } - - async fn build_equivocation_reporting_context( - &mut self, - block_num: P::TargetNumber, - ) -> Option> { - match EquivocationReportingContext::try_read_from_target( - &self.target_client, - block_num.saturating_sub(1.into()), - ) - .await - { - Ok(Some(context)) => Some(context), - Ok(None) => None, - Err(e) => { - log::error!( - target: "bridge", - "Could not read {} `EquivocationReportingContext` from {} at block {block_num}: {e:?}", - P::SOURCE_NAME, - P::TARGET_NAME, - ); + handle_client_error(&mut self.target_client, e).await; - // Reconnect target client if needed and move on. - self.handle_target_error(e).await; None }, } } - /// Try to get the finality info associated to the source headers synced with the target chain - /// at the specified block. - async fn synced_source_headers_at_target( - &mut self, - at: P::TargetNumber, - ) -> Vec> { - match self.target_client.synced_headers_finality_info(at).await { - Ok(synced_headers) => synced_headers, - Err(e) => { - log::error!( - target: "bridge", - "Could not get {} headers synced to {} at block {at:?}", - P::SOURCE_NAME, - P::TARGET_NAME - ); - - // Reconnect in case of a connection error. - self.handle_target_error(e).await; - // And move on to the next block. - vec![] - }, - } - } - - async fn report_equivocation(&mut self, at: P::Hash, equivocation: P::EquivocationProof) { - match self.reporter.submit_report(&self.source_client, at, equivocation.clone()).await { - Ok(_) => {}, - Err(e) => { - log::error!( - target: "bridge", - "Could not submit equivocation report to {} for {equivocation:?}: {e:?}", - P::SOURCE_NAME, - ); - - // Reconnect source client and move on - self.handle_source_error(e).await; - }, - } - } - - async fn check_block( - &mut self, - block_num: P::TargetNumber, - context: &mut EquivocationReportingContext

, - ) { - let synced_headers = self.synced_source_headers_at_target(block_num).await; - - for synced_header in synced_headers { - self.finality_proofs_buf.fill(&mut self.finality_proofs_stream); - - let equivocations = match P::EquivocationsFinder::find_equivocations( - &context.synced_verification_context, - &synced_header.finality_proof, - self.finality_proofs_buf.buf().as_slice(), - ) { - Ok(equivocations) => equivocations, - Err(e) => { - log::error!( - target: "bridge", - "Could not search for equivocations in the finality proof \ - for source header {:?} synced at target block {block_num:?}: {e:?}", - synced_header.finality_proof.target_header_hash() - ); - continue - }, - }; - for equivocation in equivocations { - self.report_equivocation(context.synced_header_hash, equivocation).await; - } - - self.finality_proofs_buf - .prune(synced_header.finality_proof.target_header_number(), None); - context.update(synced_header); - } - } - async fn do_run(&mut self, tick: Duration, exit_signal: impl Future) { let exit_signal = exit_signal.fuse(); futures::pin_mut!(exit_signal); @@ -273,15 +104,16 @@ impl, TC: TargetClient

> // Check the available blocks let mut current_block_number = from; while current_block_number <= until { - let mut context = - match self.build_equivocation_reporting_context(current_block_number).await { - Some(context) => context, - None => { - current_block_number = current_block_number.saturating_add(1.into()); - continue - }, - }; - self.check_block(current_block_number, &mut context).await; + self.finality_proofs_buf.fill(&mut self.finality_proofs_stream); + let block_checker = BlockChecker::new(current_block_number); + let _ = block_checker + .run( + &mut self.source_client, + &mut self.target_client, + &mut self.finality_proofs_buf, + &mut self.reporter, + ) + .await; current_block_number = current_block_number.saturating_add(1.into()); } self.until_block_num = Some(current_block_number); diff --git a/bridges/relays/equivocation/src/lib.rs b/bridges/relays/equivocation/src/lib.rs index 6f9337483fdab..bb1f40c13e6d4 100644 --- a/bridges/relays/equivocation/src/lib.rs +++ b/bridges/relays/equivocation/src/lib.rs @@ -14,13 +14,17 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . +mod block_checker; mod equivocation_loop; mod reporter; use async_trait::async_trait; -use bp_header_chain::FindEquivocations; +use bp_header_chain::{FinalityProof, FindEquivocations}; use finality_relay::{FinalityPipeline, SourceClientBase}; -use relay_utils::{relay_loop::Client as RelayClient, TransactionTracker}; +use relay_utils::{ + relay_loop::{Client as RelayClient, RECONNECT_DELAY}, + MaybeConnectionError, TransactionTracker, +}; use std::fmt::Debug; pub use equivocation_loop::run; @@ -85,3 +89,45 @@ pub trait TargetClient: RelayClient { at: P::TargetNumber, ) -> Result>, Self::Error>; } + +/// The context needed for finding equivocations inside finality proofs and reporting them. +struct EquivocationReportingContext { + pub synced_header_hash: P::Hash, + pub synced_verification_context: P::FinalityVerificationContext, +} + +impl EquivocationReportingContext

{ + /// Try to get the `EquivocationReportingContext` used by the target chain + /// at the provided block. + pub async fn try_read_from_target>( + target_client: &TC, + at: P::TargetNumber, + ) -> Result, TC::Error> { + let maybe_best_synced_header_hash = target_client.best_synced_header_hash(at).await?; + Ok(match maybe_best_synced_header_hash { + Some(best_synced_header_hash) => Some(EquivocationReportingContext { + synced_header_hash: best_synced_header_hash, + synced_verification_context: target_client + .finality_verification_context(at) + .await?, + }), + None => None, + }) + } + + /// Update with the new context introduced by the `HeaderFinalityInfo

` if any. + pub fn update(&mut self, info: HeaderFinalityInfo

) { + if let Some(new_verification_context) = info.new_verification_context { + self.synced_header_hash = info.finality_proof.target_header_hash(); + self.synced_verification_context = new_verification_context; + } + } +} + +async fn handle_client_error(client: &mut C, e: C::Error) { + if e.is_connection_error() { + client.reconnect_until_success(RECONNECT_DELAY).await; + } else { + async_std::task::sleep(RECONNECT_DELAY).await; + } +} diff --git a/bridges/relays/utils/src/relay_loop.rs b/bridges/relays/utils/src/relay_loop.rs index dad7293de6d28..7105190a45831 100644 --- a/bridges/relays/utils/src/relay_loop.rs +++ b/bridges/relays/utils/src/relay_loop.rs @@ -35,6 +35,25 @@ pub trait Client: 'static + Clone + Send + Sync { /// Try to reconnect to source node. async fn reconnect(&mut self) -> Result<(), Self::Error>; + + /// Try to reconnect to the source node in an infinite loop until it succeeds. + async fn reconnect_until_success(&mut self, delay: Duration) { + loop { + match self.reconnect().await { + Ok(()) => break, + Err(error) => { + log::warn!( + target: "bridge", + "Failed to reconnect to client. Going to retry in {}s: {:?}", + delay.as_secs(), + error, + ); + + async_std::task::sleep(delay).await; + }, + } + } + } } #[async_trait] @@ -226,44 +245,18 @@ impl LoopMetrics { } } -/// Deal with the client who has returned connection error. +/// Deal with the clients that have returned connection error. pub async fn reconnect_failed_client( failed_client: FailedClient, reconnect_delay: Duration, source_client: &mut impl Client, target_client: &mut impl Client, ) { - loop { - async_std::task::sleep(reconnect_delay).await; - if failed_client == FailedClient::Both || failed_client == FailedClient::Source { - match source_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to source client. Going to retry in {}s: {:?}", - reconnect_delay.as_secs(), - error, - ); - continue - }, - } - } - if failed_client == FailedClient::Both || failed_client == FailedClient::Target { - match target_client.reconnect().await { - Ok(()) => (), - Err(error) => { - log::warn!( - target: "bridge", - "Failed to reconnect to target client. Going to retry in {}s: {:?}", - reconnect_delay.as_secs(), - error, - ); - continue - }, - } - } + if failed_client == FailedClient::Source || failed_client == FailedClient::Both { + source_client.reconnect_until_success(reconnect_delay).await; + } - break + if failed_client == FailedClient::Target || failed_client == FailedClient::Both { + target_client.reconnect_until_success(reconnect_delay).await; } }