Skip to content

Commit

Permalink
fix(feeder): Removed incorrect use of block_on & used async instead
Browse files Browse the repository at this point in the history
  • Loading branch information
KirilMihaylov committed Mar 29, 2024
1 parent 28312c6 commit ad5c1f2
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 90 deletions.
31 changes: 18 additions & 13 deletions market-data-feeder/src/providers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, convert::identity};
use std::{collections::BTreeMap, convert::identity, future::Future};

use tokio::task::JoinSet;

Expand All @@ -19,20 +19,22 @@ mod osmosis;
pub(crate) struct Providers;

impl Providers {
pub fn visit_provider<V>(id: &str, visitor: V) -> Option<V::Return>
pub async fn visit_provider<V>(id: &str, visitor: V) -> Option<V::Return>
where
V: ProviderVisitor,
{
match id {
<Astroport as FromConfig<false>>::ID => {
Some(visitor.on::<Astroport>())
Some(visitor.on::<Astroport>().await)
},
<Osmosis as FromConfig<false>>::ID => {
Some(visitor.on::<Osmosis>().await)
},
<Osmosis as FromConfig<false>>::ID => Some(visitor.on::<Osmosis>()),
_ => None,
}
}

pub fn visit_comparison_provider<V>(
pub async fn visit_comparison_provider<V>(
id: &str,
visitor: V,
) -> Option<V::Return>
Expand All @@ -41,9 +43,12 @@ impl Providers {
{
match id {
CoinGeckoSanityCheck::ID => {
Some(visitor.on::<CoinGeckoSanityCheck>())
Some(visitor.on::<CoinGeckoSanityCheck>().await)
},
_ => {
Self::visit_provider(id, ProviderConversionVisitor(visitor))
.await
},
_ => Self::visit_provider(id, ProviderConversionVisitor(visitor)),
}
}
}
Expand All @@ -55,26 +60,26 @@ impl<V: ComparisonProviderVisitor> ProviderVisitor
{
type Return = V::Return;

fn on<P>(self) -> Self::Return
async fn on<P>(self) -> Self::Return
where
P: Provider + FromConfig<false>,
{
self.0.on::<P>()
self.0.on::<P>().await
}
}

pub(crate) trait ProviderVisitor {
pub(crate) trait ProviderVisitor: Send {
type Return;

fn on<P>(self) -> Self::Return
fn on<P>(self) -> impl Future<Output = Self::Return> + Send
where
P: Provider + FromConfig<false>;
}

pub(crate) trait ComparisonProviderVisitor {
pub(crate) trait ComparisonProviderVisitor: Send {
type Return;

fn on<P>(self) -> Self::Return
fn on<P>(self) -> impl Future<Output = Self::Return> + Send
where
P: ComparisonProvider + FromConfig<true>;
}
Expand Down
169 changes: 92 additions & 77 deletions market-data-feeder/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ use std::{

use thiserror::Error;
use tokio::{
runtime::Handle,
select,
sync::{mpsc::error::SendError, Mutex},
task::{block_in_place, JoinError, JoinSet},
task::{JoinError, JoinSet},
time::{error::Elapsed, sleep, timeout_at, Instant},
};
use tracing::{error, error_span, info, warn};
Expand Down Expand Up @@ -85,12 +84,19 @@ pub async fn spawn(
let price_comparison_providers: BTreeMap<
Arc<str>,
Arc<Mutex<dyn ComparisonProvider>>,
> = block_in_place(|| {
price_comparison_providers
.into_iter()
.map(construct_comparison_provider_f(&node_client))
.collect::<Result<_, _>>()
})?;
> = {
let mut comparison_providers = BTreeMap::new();

for (id, config) in price_comparison_providers {
let (id, comparison_provider) =
construct_comparison_provider(node_client.clone(), id, config)
.await?;

comparison_providers.insert(id, comparison_provider);
}

comparison_providers
};

let mut tx_result_senders: BTreeMap<usize, CommitResultSender> =
BTreeMap::new();
Expand All @@ -115,35 +121,44 @@ pub async fn spawn(
}
},
)
.transpose()
.and_then(
|price_comparison_provider| {
let provider_name: Arc<str> = config.provider.name().clone();

providers::Providers::visit_provider(
&provider_name,
TaskSpawningProviderVisitor {
worker_task_context: TaskContext {
tx_request_sender: tx_request_sender.clone(),
signer_address: signer_address.clone(),
hard_gas_limit,
monotonic_id,
tick_time,
poll_time,
},
node_client: &node_client,
tx_generators_set: &mut tx_generators_set,
tx_result_senders: &mut tx_result_senders,
provider_id,
provider_config: config.provider,
price_comparison_provider,
time_before_feeding,
},
)
.ok_or(error_mod::Application::UnknownProviderId(provider_name))
.and_then(|result: Result<(), error_mod::Worker>| result.map_err(From::from))
.transpose();

let price_comparison_provider = match result {
Ok(price_comparison_provider) => price_comparison_provider,
Err(error) => {
tx_generators_set.shutdown().await;

return Err(error);
},
};

let provider_name: Arc<str> = config.provider.name().clone();

let result = providers::Providers::visit_provider(
&provider_name,
TaskSpawningProviderVisitor {
worker_task_context: TaskContext {
tx_request_sender: tx_request_sender.clone(),
signer_address: signer_address.clone(),
hard_gas_limit,
monotonic_id,
tick_time,
poll_time,
},
);
node_client: &node_client,
tx_generators_set: &mut tx_generators_set,
tx_result_senders: &mut tx_result_senders,
provider_id,
provider_config: config.provider,
price_comparison_provider,
time_before_feeding,
},
)
.await
.ok_or(error_mod::Application::UnknownProviderId(provider_name))
.and_then(|result: Result<(), error_mod::Worker>| {
result.map_err(From::from)
});

match result {
Ok(()) => {},
Expand All @@ -158,28 +173,26 @@ pub async fn spawn(
Ok(SpawnResult::new(tx_generators_set, tx_result_senders))
}

fn construct_comparison_provider_f(
node_client: &NodeClient,
) -> impl Fn(
(Arc<str>, ComparisonProviderConfig),
async fn construct_comparison_provider(
node_client: NodeClient,
id: Arc<str>,
config: ComparisonProviderConfig,
) -> AppResult<(Arc<str>, Arc<Mutex<dyn ComparisonProvider>>)> {
let node_client: NodeClient = node_client.clone();

move |(id, config): (Arc<str>, ComparisonProviderConfig)| {
if let Some(result) = providers::Providers::visit_comparison_provider(
&config.provider.name().clone(),
PriceComparisonProviderVisitor {
provider_id: id.clone(),
provider_config: config,
node_client: &node_client,
},
) {
result
.map(|comparison_provider| (id, comparison_provider))
.map_err(error_mod::Application::Worker)
} else {
Err(error_mod::Application::UnknownPriceComparisonProviderId(id))
}
if let Some(result) = providers::Providers::visit_comparison_provider(
&config.provider.name().clone(),
PriceComparisonProviderVisitor {
provider_id: id.clone(),
provider_config: config,
node_client: &node_client,
},
)
.await
{
result
.map(|comparison_provider| (id, comparison_provider))
.map_err(error_mod::Application::Worker)
} else {
Err(error_mod::Application::UnknownPriceComparisonProviderId(id))
}
}

Expand All @@ -201,26 +214,25 @@ struct PriceComparisonProviderVisitor<'r> {
impl<'r> ComparisonProviderVisitor for PriceComparisonProviderVisitor<'r> {
type Return = Result<Arc<Mutex<dyn ComparisonProvider>>, error_mod::Worker>;

fn on<P>(self) -> Self::Return
async fn on<P>(self) -> Self::Return
where
P: ComparisonProvider + FromConfig<true>,
{
Handle::current()
.block_on(FromConfig::<true>::from_config(
&self.provider_id,
self.provider_config.provider,
self.node_client,
))
.map(|provider: P| {
Arc::new(Mutex::new(provider))
as Arc<Mutex<dyn ComparisonProvider>>
})
.map_err(|error: P::ConstructError| {
error_mod::Worker::InstantiatePriceComparisonProvider(
self.provider_id,
Box::new(error),
)
})
FromConfig::<true>::from_config(
&self.provider_id,
self.provider_config.provider,
self.node_client,
)
.await
.map(|provider: P| {
Arc::new(Mutex::new(provider)) as Arc<Mutex<dyn ComparisonProvider>>
})
.map_err(|error: P::ConstructError| {
error_mod::Worker::InstantiatePriceComparisonProvider(
self.provider_id,
Box::new(error),
)
})
}
}

Expand All @@ -241,18 +253,21 @@ struct TaskSpawningProviderVisitor<'r> {
impl<'r> ProviderVisitor for TaskSpawningProviderVisitor<'r> {
type Return = Result<(), error_mod::Worker>;

fn on<P>(self) -> Self::Return
async fn on<P>(self) -> Self::Return
where
P: Provider + FromConfig<false>,
{
let oracle_address: Arc<str> =
self.provider_config.oracle_addr().clone();

match Handle::current().block_on(<P as FromConfig<false>>::from_config(
let result = <P as FromConfig<false>>::from_config(
&self.provider_id,
self.provider_config,
self.node_client,
)) {
)
.await;

match result {
Ok(provider) => {
let (commit_result_sender, commit_result_receiver): (
CommitResultSender,
Expand Down

0 comments on commit ad5c1f2

Please sign in to comment.