diff --git a/market-data-feeder/src/providers/mod.rs b/market-data-feeder/src/providers/mod.rs index cda9c14..af8ffae 100644 --- a/market-data-feeder/src/providers/mod.rs +++ b/market-data-feeder/src/providers/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, convert::identity}; +use std::{collections::BTreeMap, convert::identity, future::Future}; use tokio::task::JoinSet; @@ -19,20 +19,22 @@ mod osmosis; pub(crate) struct Providers; impl Providers { - pub fn visit_provider(id: &str, visitor: V) -> Option + pub async fn visit_provider(id: &str, visitor: V) -> Option where V: ProviderVisitor, { match id { >::ID => { - Some(visitor.on::()) + Some(visitor.on::().await) + }, + >::ID => { + Some(visitor.on::().await) }, - >::ID => Some(visitor.on::()), _ => None, } } - pub fn visit_comparison_provider( + pub async fn visit_comparison_provider( id: &str, visitor: V, ) -> Option @@ -41,9 +43,12 @@ impl Providers { { match id { CoinGeckoSanityCheck::ID => { - Some(visitor.on::()) + Some(visitor.on::().await) + }, + _ => { + Self::visit_provider(id, ProviderConversionVisitor(visitor)) + .await }, - _ => Self::visit_provider(id, ProviderConversionVisitor(visitor)), } } } @@ -55,26 +60,26 @@ impl ProviderVisitor { type Return = V::Return; - fn on

(self) -> Self::Return + async fn on

(self) -> Self::Return where P: Provider + FromConfig, { - self.0.on::

() + self.0.on::

().await } } -pub(crate) trait ProviderVisitor { +pub(crate) trait ProviderVisitor: Send { type Return; - fn on

(self) -> Self::Return + fn on

(self) -> impl Future + Send where P: Provider + FromConfig; } -pub(crate) trait ComparisonProviderVisitor { +pub(crate) trait ComparisonProviderVisitor: Send { type Return; - fn on

(self) -> Self::Return + fn on

(self) -> impl Future + Send where P: ComparisonProvider + FromConfig; } diff --git a/market-data-feeder/src/workers/mod.rs b/market-data-feeder/src/workers/mod.rs index 0a4f2d6..7779b2d 100644 --- a/market-data-feeder/src/workers/mod.rs +++ b/market-data-feeder/src/workers/mod.rs @@ -5,10 +5,9 @@ use std::{ use thiserror::Error; use tokio::{ - runtime::Handle, select, sync::{mpsc::error::SendError, Mutex}, - task::{block_in_place, JoinError, JoinSet}, + task::{JoinError, JoinSet}, time::{error::Elapsed, sleep, timeout_at, Instant}, }; use tracing::{error, error_span, info, warn}; @@ -85,12 +84,19 @@ pub async fn spawn( let price_comparison_providers: BTreeMap< Arc, Arc>, - > = block_in_place(|| { - price_comparison_providers - .into_iter() - .map(construct_comparison_provider_f(&node_client)) - .collect::>() - })?; + > = { + let mut comparison_providers = BTreeMap::new(); + + for (id, config) in price_comparison_providers { + let (id, comparison_provider) = + construct_comparison_provider(node_client.clone(), id, config) + .await?; + + comparison_providers.insert(id, comparison_provider); + } + + comparison_providers + }; let mut tx_result_senders: BTreeMap = BTreeMap::new(); @@ -115,35 +121,44 @@ pub async fn spawn( } }, ) - .transpose() - .and_then( - |price_comparison_provider| { - let provider_name: Arc = config.provider.name().clone(); - - providers::Providers::visit_provider( - &provider_name, - TaskSpawningProviderVisitor { - worker_task_context: TaskContext { - tx_request_sender: tx_request_sender.clone(), - signer_address: signer_address.clone(), - hard_gas_limit, - monotonic_id, - tick_time, - poll_time, - }, - node_client: &node_client, - tx_generators_set: &mut tx_generators_set, - tx_result_senders: &mut tx_result_senders, - provider_id, - provider_config: config.provider, - price_comparison_provider, - time_before_feeding, - }, - ) - .ok_or(error_mod::Application::UnknownProviderId(provider_name)) - .and_then(|result: Result<(), error_mod::Worker>| result.map_err(From::from)) + .transpose(); + + let price_comparison_provider = match result { + Ok(price_comparison_provider) => price_comparison_provider, + Err(error) => { + tx_generators_set.shutdown().await; + + return Err(error); + }, + }; + + let provider_name: Arc = config.provider.name().clone(); + + let result = providers::Providers::visit_provider( + &provider_name, + TaskSpawningProviderVisitor { + worker_task_context: TaskContext { + tx_request_sender: tx_request_sender.clone(), + signer_address: signer_address.clone(), + hard_gas_limit, + monotonic_id, + tick_time, + poll_time, }, - ); + node_client: &node_client, + tx_generators_set: &mut tx_generators_set, + tx_result_senders: &mut tx_result_senders, + provider_id, + provider_config: config.provider, + price_comparison_provider, + time_before_feeding, + }, + ) + .await + .ok_or(error_mod::Application::UnknownProviderId(provider_name)) + .and_then(|result: Result<(), error_mod::Worker>| { + result.map_err(From::from) + }); match result { Ok(()) => {}, @@ -158,28 +173,26 @@ pub async fn spawn( Ok(SpawnResult::new(tx_generators_set, tx_result_senders)) } -fn construct_comparison_provider_f( - node_client: &NodeClient, -) -> impl Fn( - (Arc, ComparisonProviderConfig), +async fn construct_comparison_provider( + node_client: NodeClient, + id: Arc, + config: ComparisonProviderConfig, ) -> AppResult<(Arc, Arc>)> { - let node_client: NodeClient = node_client.clone(); - - move |(id, config): (Arc, ComparisonProviderConfig)| { - if let Some(result) = providers::Providers::visit_comparison_provider( - &config.provider.name().clone(), - PriceComparisonProviderVisitor { - provider_id: id.clone(), - provider_config: config, - node_client: &node_client, - }, - ) { - result - .map(|comparison_provider| (id, comparison_provider)) - .map_err(error_mod::Application::Worker) - } else { - Err(error_mod::Application::UnknownPriceComparisonProviderId(id)) - } + if let Some(result) = providers::Providers::visit_comparison_provider( + &config.provider.name().clone(), + PriceComparisonProviderVisitor { + provider_id: id.clone(), + provider_config: config, + node_client: &node_client, + }, + ) + .await + { + result + .map(|comparison_provider| (id, comparison_provider)) + .map_err(error_mod::Application::Worker) + } else { + Err(error_mod::Application::UnknownPriceComparisonProviderId(id)) } } @@ -201,26 +214,25 @@ struct PriceComparisonProviderVisitor<'r> { impl<'r> ComparisonProviderVisitor for PriceComparisonProviderVisitor<'r> { type Return = Result>, error_mod::Worker>; - fn on

(self) -> Self::Return + async fn on

(self) -> Self::Return where P: ComparisonProvider + FromConfig, { - Handle::current() - .block_on(FromConfig::::from_config( - &self.provider_id, - self.provider_config.provider, - self.node_client, - )) - .map(|provider: P| { - Arc::new(Mutex::new(provider)) - as Arc> - }) - .map_err(|error: P::ConstructError| { - error_mod::Worker::InstantiatePriceComparisonProvider( - self.provider_id, - Box::new(error), - ) - }) + FromConfig::::from_config( + &self.provider_id, + self.provider_config.provider, + self.node_client, + ) + .await + .map(|provider: P| { + Arc::new(Mutex::new(provider)) as Arc> + }) + .map_err(|error: P::ConstructError| { + error_mod::Worker::InstantiatePriceComparisonProvider( + self.provider_id, + Box::new(error), + ) + }) } } @@ -241,18 +253,21 @@ struct TaskSpawningProviderVisitor<'r> { impl<'r> ProviderVisitor for TaskSpawningProviderVisitor<'r> { type Return = Result<(), error_mod::Worker>; - fn on

(self) -> Self::Return + async fn on

(self) -> Self::Return where P: Provider + FromConfig, { let oracle_address: Arc = self.provider_config.oracle_addr().clone(); - match Handle::current().block_on(

>::from_config( + let result =

>::from_config( &self.provider_id, self.provider_config, self.node_client, - )) { + ) + .await; + + match result { Ok(provider) => { let (commit_result_sender, commit_result_receiver): ( CommitResultSender,