Skip to content

Commit

Permalink
refactor: Fail-fast during provider spawning on out-of-sync nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
KirilMihaylov committed Mar 29, 2024
1 parent cd5d44d commit 28312c6
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 111 deletions.
29 changes: 16 additions & 13 deletions alarms-dispatcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use semver::{
Prerelease as SemVerPrerelease, Version,
};
use serde::Deserialize;
use tokio::task::block_in_place;
use tracing::{error, info};
use tracing_appender::{
non_blocking::{self, NonBlocking},
Expand Down Expand Up @@ -215,19 +216,21 @@ where

let poll_time = config.broadcast.poll_time;

move |tx_sender| {
generators::spawn(
&node_client,
&{ signer_address },
&{ tx_sender },
&TasksConfig {
time_alarms_config: config.time_alarms,
oracle_alarms_config: config.market_price_oracle,
tick_time,
poll_time,
},
contracts,
)
move |tx_sender| async move {
block_in_place(|| {
generators::spawn(
&node_client,
&{ signer_address },
&{ tx_sender },
&TasksConfig {
time_alarms_config: config.time_alarms,
oracle_alarms_config: config.market_price_oracle,
tick_time,
poll_time,
},
contracts,
)
})
}
};

Expand Down
7 changes: 4 additions & 3 deletions broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub mod mode;
mod preprocess;

#[allow(clippy::future_not_send)]
pub async fn broadcast<Impl, SpawnGeneratorsF, SpawnE>(
pub async fn broadcast<Impl, SpawnGeneratorsF, SpawnGeneratorsFuture, SpawnE>(
signer: Signer,
config: Config,
node_client: NodeClient,
Expand All @@ -66,7 +66,8 @@ pub async fn broadcast<Impl, SpawnGeneratorsF, SpawnE>(
where
Impl: mode::Impl,
SpawnGeneratorsF:
FnOnce(TxRequestSender<Impl>) -> Result<SpawnResult, SpawnE> + Send,
FnOnce(TxRequestSender<Impl>) -> SpawnGeneratorsFuture + Send,
SpawnGeneratorsFuture: Future<Output = Result<SpawnResult, SpawnE>> + Send,
{
let (tx_sender, tx_receiver): (
UnboundedSender<TxRequest<Impl>>,
Expand All @@ -76,7 +77,7 @@ where
let SpawnResult {
mut tx_generators_set,
tx_result_senders,
}: SpawnResult = spawn_generators(tx_sender)?;
}: SpawnResult = spawn_generators(tx_sender).await?;

let mut signal = pin!(tokio::signal::ctrl_c());

Expand Down
26 changes: 12 additions & 14 deletions market-data-feeder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use semver::{
Prerelease as SemVerPrerelease, Version,
};
use serde::Deserialize;
use tokio::task::block_in_place;
use tracing::{error, info};
use tracing_appender::{
non_blocking::{self, NonBlocking},
Expand Down Expand Up @@ -95,22 +94,21 @@ async fn app_main() -> Result<()> {

let signer_address: Arc<str> = Arc::from(signer.signer_address());

move |tx_request_sender| {
move |tx_request_sender| async move {
info!("Starting workers...");

block_in_place(move || {
workers::spawn(SpawnContext {
node_client: node_client.clone(),
providers: config.providers,
price_comparison_providers: config.comparison_providers,
tx_request_sender,
signer_address,
hard_gas_limit: config.hard_gas_limit,
time_before_feeding: config.time_before_feeding,
tick_time: config.broadcast.tick_time,
poll_time: config.broadcast.poll_time,
})
workers::spawn(SpawnContext {
node_client: node_client.clone(),
providers: config.providers,
price_comparison_providers: config.comparison_providers,
tx_request_sender,
signer_address,
hard_gas_limit: config.hard_gas_limit,
time_before_feeding: config.time_before_feeding,
tick_time: config.broadcast.tick_time,
poll_time: config.broadcast.poll_time,
})
.await
.map(|spawn_result| {
info!("Workers started successfully.");

Expand Down
125 changes: 44 additions & 81 deletions market-data-feeder/src/workers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub(crate) struct SpawnContext {
pub(crate) poll_time: Duration,
}

pub fn spawn(
pub async fn spawn(
SpawnContext {
node_client,
providers,
Expand Down Expand Up @@ -95,83 +95,10 @@ pub fn spawn(
let mut tx_result_senders: BTreeMap<usize, CommitResultSender> =
BTreeMap::new();

providers
.into_iter()
.enumerate()
.try_for_each(try_for_each_provider_f(TryForEachProviderContext {
node_client,
tx_generators_set: &mut tx_generators_set,
tx_result_senders: &mut tx_result_senders,
tx_request_sender,
signer_address,
price_comparison_providers,
hard_gas_limit,
time_before_feeding,
tick_time,
poll_time,
}))
.map(|()| SpawnResult::new(tx_generators_set, tx_result_senders))
}

fn construct_comparison_provider_f(
node_client: &NodeClient,
) -> impl Fn(
(Arc<str>, ComparisonProviderConfig),
) -> AppResult<(Arc<str>, Arc<Mutex<dyn ComparisonProvider>>)> {
let node_client: NodeClient = node_client.clone();

move |(id, config): (Arc<str>, ComparisonProviderConfig)| {
if let Some(result) = providers::Providers::visit_comparison_provider(
&config.provider.name().clone(),
PriceComparisonProviderVisitor {
provider_id: id.clone(),
provider_config: config,
node_client: &node_client,
},
) {
result
.map(|comparison_provider| (id, comparison_provider))
.map_err(error_mod::Application::Worker)
} else {
Err(error_mod::Application::UnknownPriceComparisonProviderId(id))
}
}
}

struct TryForEachProviderContext<'r> {
node_client: NodeClient,
tx_generators_set: &'r mut JoinSet<Infallible>,
tx_result_senders: &'r mut BTreeMap<usize, CommitResultSender>,
tx_request_sender: TxRequestSender<NonBlocking>,
signer_address: Arc<str>,
price_comparison_providers:
BTreeMap<Arc<str>, Arc<Mutex<dyn ComparisonProvider>>>,
hard_gas_limit: NonZeroU64,
time_before_feeding: Duration,
tick_time: Duration,
poll_time: Duration,
}

fn try_for_each_provider_f(
TryForEachProviderContext {
node_client,
tx_generators_set,
tx_result_senders,
tx_request_sender,
signer_address,
price_comparison_providers,
hard_gas_limit,
time_before_feeding,
tick_time,
poll_time,
}: TryForEachProviderContext<'_>,
) -> impl FnMut((usize, (Box<str>, ProviderWithComparisonConfig))) -> AppResult<()>
+ '_ {
move |(monotonic_id, (provider_id, config)): (
usize,
(Box<str>, ProviderWithComparisonConfig),
)| {
config
for (monotonic_id, (provider_id, config)) in
providers.into_iter().enumerate()
{
let result = config
.comparison
.map(
|ComparisonProviderIdAndMaxDeviation {
Expand Down Expand Up @@ -205,8 +132,8 @@ fn try_for_each_provider_f(
poll_time,
},
node_client: &node_client,
tx_generators_set,
tx_result_senders,
tx_generators_set: &mut tx_generators_set,
tx_result_senders: &mut tx_result_senders,
provider_id,
provider_config: config.provider,
price_comparison_provider,
Expand All @@ -216,7 +143,43 @@ fn try_for_each_provider_f(
.ok_or(error_mod::Application::UnknownProviderId(provider_name))
.and_then(|result: Result<(), error_mod::Worker>| result.map_err(From::from))
},
)
);

match result {
Ok(()) => {},
Err(error) => {
tx_generators_set.shutdown().await;

return Err(error);
},
}
}

Ok(SpawnResult::new(tx_generators_set, tx_result_senders))
}

fn construct_comparison_provider_f(
node_client: &NodeClient,
) -> impl Fn(
(Arc<str>, ComparisonProviderConfig),
) -> AppResult<(Arc<str>, Arc<Mutex<dyn ComparisonProvider>>)> {
let node_client: NodeClient = node_client.clone();

move |(id, config): (Arc<str>, ComparisonProviderConfig)| {
if let Some(result) = providers::Providers::visit_comparison_provider(
&config.provider.name().clone(),
PriceComparisonProviderVisitor {
provider_id: id.clone(),
provider_config: config,
node_client: &node_client,
},
) {
result
.map(|comparison_provider| (id, comparison_provider))
.map_err(error_mod::Application::Worker)
} else {
Err(error_mod::Application::UnknownPriceComparisonProviderId(id))
}
}
}

Expand Down

0 comments on commit 28312c6

Please sign in to comment.