From 28312c6906c1ce2913045d45bb862d192db01cf4 Mon Sep 17 00:00:00 2001 From: Kiril Mihaylov <80464733+KirilMihaylov@users.noreply.github.com> Date: Fri, 29 Mar 2024 15:26:19 +0200 Subject: [PATCH] refactor: Fail-fast during provider spawning on out-of-sync nodes --- alarms-dispatcher/src/main.rs | 29 +++--- broadcast/src/lib.rs | 7 +- market-data-feeder/src/main.rs | 26 +++--- market-data-feeder/src/workers/mod.rs | 125 +++++++++----------------- 4 files changed, 76 insertions(+), 111 deletions(-) diff --git a/alarms-dispatcher/src/main.rs b/alarms-dispatcher/src/main.rs index 0fdf53c..196ce1f 100644 --- a/alarms-dispatcher/src/main.rs +++ b/alarms-dispatcher/src/main.rs @@ -12,6 +12,7 @@ use semver::{ Prerelease as SemVerPrerelease, Version, }; use serde::Deserialize; +use tokio::task::block_in_place; use tracing::{error, info}; use tracing_appender::{ non_blocking::{self, NonBlocking}, @@ -215,19 +216,21 @@ where let poll_time = config.broadcast.poll_time; - move |tx_sender| { - generators::spawn( - &node_client, - &{ signer_address }, - &{ tx_sender }, - &TasksConfig { - time_alarms_config: config.time_alarms, - oracle_alarms_config: config.market_price_oracle, - tick_time, - poll_time, - }, - contracts, - ) + move |tx_sender| async move { + block_in_place(|| { + generators::spawn( + &node_client, + &{ signer_address }, + &{ tx_sender }, + &TasksConfig { + time_alarms_config: config.time_alarms, + oracle_alarms_config: config.market_price_oracle, + tick_time, + poll_time, + }, + contracts, + ) + }) } }; diff --git a/broadcast/src/lib.rs b/broadcast/src/lib.rs index a85add6..a71f4c8 100644 --- a/broadcast/src/lib.rs +++ b/broadcast/src/lib.rs @@ -56,7 +56,7 @@ pub mod mode; mod preprocess; #[allow(clippy::future_not_send)] -pub async fn broadcast( +pub async fn broadcast( signer: Signer, config: Config, node_client: NodeClient, @@ -66,7 +66,8 @@ pub async fn broadcast( where Impl: mode::Impl, SpawnGeneratorsF: - FnOnce(TxRequestSender) -> Result + Send, + FnOnce(TxRequestSender) -> SpawnGeneratorsFuture + Send, + SpawnGeneratorsFuture: Future> + Send, { let (tx_sender, tx_receiver): ( UnboundedSender>, @@ -76,7 +77,7 @@ where let SpawnResult { mut tx_generators_set, tx_result_senders, - }: SpawnResult = spawn_generators(tx_sender)?; + }: SpawnResult = spawn_generators(tx_sender).await?; let mut signal = pin!(tokio::signal::ctrl_c()); diff --git a/market-data-feeder/src/main.rs b/market-data-feeder/src/main.rs index b0764ec..28efd49 100644 --- a/market-data-feeder/src/main.rs +++ b/market-data-feeder/src/main.rs @@ -12,7 +12,6 @@ use semver::{ Prerelease as SemVerPrerelease, Version, }; use serde::Deserialize; -use tokio::task::block_in_place; use tracing::{error, info}; use tracing_appender::{ non_blocking::{self, NonBlocking}, @@ -95,22 +94,21 @@ async fn app_main() -> Result<()> { let signer_address: Arc = Arc::from(signer.signer_address()); - move |tx_request_sender| { + move |tx_request_sender| async move { info!("Starting workers..."); - block_in_place(move || { - workers::spawn(SpawnContext { - node_client: node_client.clone(), - providers: config.providers, - price_comparison_providers: config.comparison_providers, - tx_request_sender, - signer_address, - hard_gas_limit: config.hard_gas_limit, - time_before_feeding: config.time_before_feeding, - tick_time: config.broadcast.tick_time, - poll_time: config.broadcast.poll_time, - }) + workers::spawn(SpawnContext { + node_client: node_client.clone(), + providers: config.providers, + price_comparison_providers: config.comparison_providers, + tx_request_sender, + signer_address, + hard_gas_limit: config.hard_gas_limit, + time_before_feeding: config.time_before_feeding, + tick_time: config.broadcast.tick_time, + poll_time: config.broadcast.poll_time, }) + .await .map(|spawn_result| { info!("Workers started successfully."); diff --git a/market-data-feeder/src/workers/mod.rs b/market-data-feeder/src/workers/mod.rs index 38eec7d..0a4f2d6 100644 --- a/market-data-feeder/src/workers/mod.rs +++ b/market-data-feeder/src/workers/mod.rs @@ -67,7 +67,7 @@ pub(crate) struct SpawnContext { pub(crate) poll_time: Duration, } -pub fn spawn( +pub async fn spawn( SpawnContext { node_client, providers, @@ -95,83 +95,10 @@ pub fn spawn( let mut tx_result_senders: BTreeMap = BTreeMap::new(); - providers - .into_iter() - .enumerate() - .try_for_each(try_for_each_provider_f(TryForEachProviderContext { - node_client, - tx_generators_set: &mut tx_generators_set, - tx_result_senders: &mut tx_result_senders, - tx_request_sender, - signer_address, - price_comparison_providers, - hard_gas_limit, - time_before_feeding, - tick_time, - poll_time, - })) - .map(|()| SpawnResult::new(tx_generators_set, tx_result_senders)) -} - -fn construct_comparison_provider_f( - node_client: &NodeClient, -) -> impl Fn( - (Arc, ComparisonProviderConfig), -) -> AppResult<(Arc, Arc>)> { - let node_client: NodeClient = node_client.clone(); - - move |(id, config): (Arc, ComparisonProviderConfig)| { - if let Some(result) = providers::Providers::visit_comparison_provider( - &config.provider.name().clone(), - PriceComparisonProviderVisitor { - provider_id: id.clone(), - provider_config: config, - node_client: &node_client, - }, - ) { - result - .map(|comparison_provider| (id, comparison_provider)) - .map_err(error_mod::Application::Worker) - } else { - Err(error_mod::Application::UnknownPriceComparisonProviderId(id)) - } - } -} - -struct TryForEachProviderContext<'r> { - node_client: NodeClient, - tx_generators_set: &'r mut JoinSet, - tx_result_senders: &'r mut BTreeMap, - tx_request_sender: TxRequestSender, - signer_address: Arc, - price_comparison_providers: - BTreeMap, Arc>>, - hard_gas_limit: NonZeroU64, - time_before_feeding: Duration, - tick_time: Duration, - poll_time: Duration, -} - -fn try_for_each_provider_f( - TryForEachProviderContext { - node_client, - tx_generators_set, - tx_result_senders, - tx_request_sender, - signer_address, - price_comparison_providers, - hard_gas_limit, - time_before_feeding, - tick_time, - poll_time, - }: TryForEachProviderContext<'_>, -) -> impl FnMut((usize, (Box, ProviderWithComparisonConfig))) -> AppResult<()> - + '_ { - move |(monotonic_id, (provider_id, config)): ( - usize, - (Box, ProviderWithComparisonConfig), - )| { - config + for (monotonic_id, (provider_id, config)) in + providers.into_iter().enumerate() + { + let result = config .comparison .map( |ComparisonProviderIdAndMaxDeviation { @@ -205,8 +132,8 @@ fn try_for_each_provider_f( poll_time, }, node_client: &node_client, - tx_generators_set, - tx_result_senders, + tx_generators_set: &mut tx_generators_set, + tx_result_senders: &mut tx_result_senders, provider_id, provider_config: config.provider, price_comparison_provider, @@ -216,7 +143,43 @@ fn try_for_each_provider_f( .ok_or(error_mod::Application::UnknownProviderId(provider_name)) .and_then(|result: Result<(), error_mod::Worker>| result.map_err(From::from)) }, - ) + ); + + match result { + Ok(()) => {}, + Err(error) => { + tx_generators_set.shutdown().await; + + return Err(error); + }, + } + } + + Ok(SpawnResult::new(tx_generators_set, tx_result_senders)) +} + +fn construct_comparison_provider_f( + node_client: &NodeClient, +) -> impl Fn( + (Arc, ComparisonProviderConfig), +) -> AppResult<(Arc, Arc>)> { + let node_client: NodeClient = node_client.clone(); + + move |(id, config): (Arc, ComparisonProviderConfig)| { + if let Some(result) = providers::Providers::visit_comparison_provider( + &config.provider.name().clone(), + PriceComparisonProviderVisitor { + provider_id: id.clone(), + provider_config: config, + node_client: &node_client, + }, + ) { + result + .map(|comparison_provider| (id, comparison_provider)) + .map_err(error_mod::Application::Worker) + } else { + Err(error_mod::Application::UnknownPriceComparisonProviderId(id)) + } } }