Skip to content

Commit

Permalink
refactor: Fail-fast during provider spawning on out-of-sync nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
KirilMihaylov committed Mar 29, 2024
1 parent ad5c1f2 commit 18f193d
Showing 1 changed file with 112 additions and 98 deletions.
210 changes: 112 additions & 98 deletions market-data-feeder/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use thiserror::Error;
use tokio::{
select,
sync::{mpsc::error::SendError, Mutex},
sync::{mpsc::error::SendError, Mutex, Notify},
task::{JoinError, JoinSet},
time::{error::Elapsed, sleep, timeout_at, Instant},
};
Expand Down Expand Up @@ -101,6 +101,8 @@ pub async fn spawn(
let mut tx_result_senders: BTreeMap<usize, CommitResultSender> =
BTreeMap::new();

let all_checks_passed = Arc::new(Notify::new());

for (monotonic_id, (provider_id, config)) in
providers.into_iter().enumerate()
{
Expand Down Expand Up @@ -144,6 +146,7 @@ pub async fn spawn(
monotonic_id,
tick_time,
poll_time,
all_checks_passed: all_checks_passed.clone(),
},
node_client: &node_client,
tx_generators_set: &mut tx_generators_set,
Expand Down Expand Up @@ -203,6 +206,7 @@ struct TaskContext {
monotonic_id: usize,
tick_time: Duration,
poll_time: Duration,
all_checks_passed: Arc<Notify>,
}

struct PriceComparisonProviderVisitor<'r> {
Expand Down Expand Up @@ -268,7 +272,7 @@ impl<'r> ProviderVisitor for TaskSpawningProviderVisitor<'r> {
.await;

match result {
Ok(provider) => {
Ok(mut provider) => {
let (commit_result_sender, commit_result_receiver): (
CommitResultSender,
CommitResultReceiver,
Expand All @@ -279,14 +283,18 @@ impl<'r> ProviderVisitor for TaskSpawningProviderVisitor<'r> {
commit_result_sender,
);

self.tx_generators_set.spawn(perform_check_and_enter_loop(
ProviderWithIds {
provider,
provider_id: self.provider_id,
},
self.worker_task_context,
perform_price_comparison(
&mut provider,
&self.provider_id,
self.price_comparison_provider,
)
.await;

self.tx_generators_set.spawn(wrap_provider_main_loop(
provider,
self.provider_id,
self.time_before_feeding,
self.worker_task_context,
self.node_client.clone(),
oracle_address,
commit_result_receiver,
Expand All @@ -302,114 +310,71 @@ impl<'r> ProviderVisitor for TaskSpawningProviderVisitor<'r> {
}
}

struct ProviderWithIds<P> {
provider: P,
provider_id: Box<str>,
}

async fn perform_check_and_enter_loop<P>(
ProviderWithIds {
mut provider,
provider_id,
}: ProviderWithIds<P>,
worker_task_context: TaskContext,
async fn perform_price_comparison<P>(
provider: &mut P,
provider_id: &str,
comparison_provider_and_deviation: Option<
ComparisonProviderWithIdAndMaxDeviation,
>,
time_before_feeding: Duration,
node_client: NodeClient,
oracle_address: Arc<str>,
commit_result_receiver: CommitResultReceiver,
) -> Infallible
) -> Option<Result<ChannelClosed, error_mod::Worker>>
where
P: Provider,
{
let result: Result<ChannelClosed, error_mod::Worker> = 'result: {
let prices: Box<[Price<CoinWithDecimalPlaces>]> = {
if let Err(error) =
run_provider_healthcheck(&mut provider, &provider_id).await
{
break 'result Err(error_mod::Worker::ProviderHealthcheck(
error,
));
}
let prices: Box<[Price<CoinWithDecimalPlaces>]> = {
if let Err(error) =
run_provider_healthcheck(provider, &provider_id).await
{
return Some(Err(error_mod::Worker::ProviderHealthcheck(error)));
}

let result = provider.get_prices(false).await.map_err(|error| {
error_mod::Worker::PriceComparisonGuard(
PriceComparisonGuardError::FetchPrices(error),
)
});
let result = provider.get_prices(false).await.map_err(|error| {
error_mod::Worker::PriceComparisonGuard(
PriceComparisonGuardError::FetchPrices(error),
)
});

match result {
Ok(prices) => prices,
Err(error) => {
break 'result Err(error);
},
}
};
match result {
Ok(prices) => prices,
Err(error) => {
return Some(Err(error));
},
}
};

if prices.is_empty() {
error!(
r#"Price list returned for provider "{provider_id}" is empty! Exiting providing task."#
);
if prices.is_empty() {
error!(
r#"Price list returned for provider "{provider_id}" is empty! Exiting providing task."#
);

break 'result Err(error_mod::Worker::EmptyPriceList);
}
return Some(Err(error_mod::Worker::EmptyPriceList));
}

if let Some((
if let Some((
comparison_provider_id,
comparison_provider,
max_deviation_exclusive,
)) = { comparison_provider_and_deviation }
{
if let Err(error) = compare_prices(
provider.instance_id(),
&prices,
comparison_provider_id,
comparison_provider,
max_deviation_exclusive,
)) = { comparison_provider_and_deviation }
{
if let Err(error) = compare_prices(
provider.instance_id(),
&prices,
comparison_provider_id,
comparison_provider,
max_deviation_exclusive,
)
.await
{
break 'result Err(error);
}
} else {
info!(
r#"Provider "{provider_id}" isn't associated with a comparison provider."#
);
}

print_prices_pretty::print(&provider, &{ prices });

sleep(time_before_feeding).await;

provider_main_loop(
provider,
&provider_id,
worker_task_context,
node_client,
oracle_address,
commit_result_receiver,
)
.await
};

let is_error: bool = result.is_err();

let (error, cause): (String, String) = match result {
Ok(output) => (format!("{output:?}"), output.to_string()),
Err(error) => (format!("{error:?}"), error.to_string()),
};

loop {
if is_error {
error!(%provider_id, %error, "Provider task stopped! Cause: {cause}");
} else {
warn!(%provider_id, %error, "Provider task stopped! Cause: {cause}");
{
return Some(Err(error));
}

sleep(Duration::from_secs(15)).await;
} else {
info!(
r#"Provider "{provider_id}" isn't associated with a comparison provider."#
);
}

print_prices_pretty::print(&provider, &{ prices });

None
}

async fn compare_prices(
Expand Down Expand Up @@ -479,16 +444,61 @@ async fn run_comparison_provider_healthcheck(
.await
}

/// Runs [`provider_main_loop`] for a single provider and, once it stops,
/// keeps the task alive while periodically reporting why it stopped.
///
/// All arguments are forwarded unchanged to [`provider_main_loop`];
/// `provider_id` is passed on by reference and is also attached to every log
/// record emitted here.
///
/// This function never returns (its return type is [`Infallible`]). After the
/// main loop finishes, the stop reason is logged every 15 seconds:
/// at `error` level when the loop returned `Err`, and at `warn` level when it
/// returned `Ok(ChannelClosed)`.
// NOTE(review): presumably the task is parked instead of returning so that the
// owning `JoinSet` never observes it as finished - confirm against `spawn`.
async fn wrap_provider_main_loop<P>(
    provider: P,
    provider_id: Box<str>,
    time_before_feeding: Duration,
    worker_task_context: TaskContext,
    node_client: NodeClient,
    oracle_address: Arc<str>,
    commit_result_receiver: CommitResultReceiver,
) -> Infallible
where
    P: Provider,
{
    let outcome = provider_main_loop(
        provider,
        &provider_id,
        time_before_feeding,
        worker_task_context,
        node_client,
        oracle_address,
        commit_result_receiver,
    )
    .await;

    // Render the outcome exactly once, up front: the `Debug` form is logged as
    // the `error` field and the `Display` form is embedded in the message.
    // The names `error` and `cause` are significant because the logging macros
    // below pick them up as the field name and the format capture.
    let (is_error, error, cause): (bool, String, String) = match outcome {
        Ok(output @ ChannelClosed { .. }) => {
            let debug_form = format!("{output:?}");

            (false, debug_form, output.to_string())
        },
        Err(error) => {
            let debug_form = format!("{error:?}");

            (true, debug_form, error.to_string())
        },
    };

    loop {
        // A closed channel is reported as a warning; everything else is an
        // error.
        if !is_error {
            warn!(%provider_id, %error, "Provider task stopped! Cause: {cause}");
        } else {
            error!(%provider_id, %error, "Provider task stopped! Cause: {cause}");
        }

        sleep(Duration::from_secs(15)).await;
    }
}

async fn provider_main_loop<P>(
mut provider: P,
provider_id: &str,
time_before_feeding: Duration,
TaskContext {
tx_request_sender,
signer_address,
hard_gas_limit,
monotonic_id,
tick_time,
poll_time,
all_checks_passed,
}: TaskContext,
node_client: NodeClient,
oracle_address: Arc<str>,
Expand All @@ -497,6 +507,10 @@ async fn provider_main_loop<P>(
where
P: Provider,
{
all_checks_passed.notified().await;

sleep(time_before_feeding).await;

let send_tx_request =
move |message, fallback_gas_limit, hard_gas_limit, expiration| {
tx_request_sender.send(TxRequest::<NonBlocking>::new(
Expand Down

0 comments on commit 18f193d

Please sign in to comment.