Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into chore/remove-deposit-…
Browse files Browse the repository at this point in the history
…tracker

* origin/main:
  Rename PrimaryOrBackup (#5139)
  feat: Governance extrinsic to recover Solana Durable Nonces (#5130)
  fix: pallet versions should match release + bump old dylib version to 1.5.1 (#5142)

# Conflicts:
#	state-chain/pallets/cf-ingress-egress/src/migrations.rs
  • Loading branch information
syan095 committed Aug 12, 2024
2 parents 9a83d1d + f3876fa commit 09fdded
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 130 deletions.
4 changes: 2 additions & 2 deletions engine-runner-bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ assets = [
# The old version gets put into target/release by the package github actions workflow.
# It downloads the correct version from the releases page.
[
"target/release/libchainflip_engine_v1_5_0.so",
"target/release/libchainflip_engine_v1_5_1.so",
# This is the path where the engine dylib is searched for on linux.
# As set in the build.rs file.
"usr/lib/chainflip-engine/libchainflip_engine_v1_5_0.so",
"usr/lib/chainflip-engine/libchainflip_engine_v1_5_1.so",
"755",
],
]
Expand Down
2 changes: 1 addition & 1 deletion engine-runner-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use engine_upgrade_utils::{CStrArray, NEW_VERSION, OLD_VERSION};

// Declare the entrypoints into each version of the engine
mod old {
#[engine_proc_macros::link_engine_library_version("1.5.0")]
#[engine_proc_macros::link_engine_library_version("1.5.1")]
extern "C" {
pub fn cfe_entrypoint(
c_args: engine_upgrade_utils::CStrArray,
Expand Down
2 changes: 1 addition & 1 deletion engine-upgrade-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub mod build_helpers;
// rest of the places the version needs changing on build using the build scripts in each of the
// relevant crates.
// Should also check that the compatibility function below `args_compatible_with_old` is correct.
pub const OLD_VERSION: &str = "1.5.0";
pub const OLD_VERSION: &str = "1.5.1";
pub const NEW_VERSION: &str = "1.6.0";

pub const ENGINE_LIB_PREFIX: &str = "chainflip_engine_v";
Expand Down
141 changes: 68 additions & 73 deletions engine/src/retrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,38 +79,33 @@ impl fmt::Display for RequestLog {
}

#[derive(PartialEq, Eq, Debug, Clone)]
pub enum PrimaryOrSecondary {
pub enum PrimaryOrBackup {
Primary,
Secondary,
Backup,
}

impl std::ops::Not for PrimaryOrSecondary {
impl std::ops::Not for PrimaryOrBackup {
type Output = Self;

fn not(self) -> Self::Output {
match self {
PrimaryOrSecondary::Primary => PrimaryOrSecondary::Secondary,
PrimaryOrSecondary::Secondary => PrimaryOrSecondary::Primary,
PrimaryOrBackup::Primary => PrimaryOrBackup::Backup,
PrimaryOrBackup::Backup => PrimaryOrBackup::Primary,
}
}
}

impl Display for PrimaryOrSecondary {
impl Display for PrimaryOrBackup {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
PrimaryOrSecondary::Primary => write!(f, "Primary"),
PrimaryOrSecondary::Secondary => write!(f, "Secondary"),
PrimaryOrBackup::Primary => write!(f, "Primary"),
PrimaryOrBackup::Backup => write!(f, "Backup"),
}
}
}

type SubmissionFutureOutput = (
RequestId,
RequestLog,
RetryLimit,
PrimaryOrSecondary,
Result<BoxAny, (anyhow::Error, Attempt)>,
);
type SubmissionFutureOutput =
(RequestId, RequestLog, RetryLimit, PrimaryOrBackup, Result<BoxAny, (anyhow::Error, Attempt)>);
type SubmissionFuture = Pin<Box<dyn Future<Output = SubmissionFutureOutput> + Send + 'static>>;
type SubmissionFutures = FuturesUnordered<SubmissionFuture>;

Expand Down Expand Up @@ -212,7 +207,7 @@ fn submission_future<Client: Clone + Send + Sync + 'static>(
request_id: RequestId,
initial_request_timeout: Duration,
attempt: Attempt,
primary_or_secondary: PrimaryOrSecondary,
primary_or_backup: PrimaryOrBackup,
) -> SubmissionFuture {
let submission_fut = submission_fn(client);
// Apply exponential backoff to the request.
Expand All @@ -221,7 +216,7 @@ fn submission_future<Client: Clone + Send + Sync + 'static>(
request_id,
request_log.clone(),
retry_limit,
primary_or_secondary,
primary_or_backup,
match tokio::time::timeout(
max_sleep_duration(initial_request_timeout, attempt),
submission_fut,
Expand All @@ -239,15 +234,15 @@ fn submission_future<Client: Clone + Send + Sync + 'static>(

const TRY_PRIMARY_AFTER: Duration = Duration::from_secs(120);

// Pass in two clients, a primary and an optional secondary.
// Pass in two clients, a primary and an optional backup.
// We can then select the client requested if it's ready, otherwise we return the client that's
// ready first.
#[derive(Clone)]
struct ClientSelector<Client: Clone + Send + Sync + 'static> {
primary_signal: Signal<(Client, PrimaryOrSecondary)>,
secondary_signal: Option<Signal<(Client, PrimaryOrSecondary)>>,
primary_signal: Signal<(Client, PrimaryOrBackup)>,
backup_signal: Option<Signal<(Client, PrimaryOrBackup)>>,
// The client to favour for the next request or attempt.
prefer: PrimaryOrSecondary,
prefer: PrimaryOrBackup,
// The time we last tried the primary. If we haven't tried the primary in some time, then we
// should try it as it could be back online.
last_failed_primary: Option<tokio::time::Instant>,
Expand All @@ -259,33 +254,33 @@ impl<Client: Send + Sync + Clone + 'static> ClientSelector<Client> {
pub fn new<ClientFut: Future<Output = Client> + Send + 'static>(
scope: &Scope<'_, anyhow::Error>,
primary_fut: ClientFut,
secondary_fut: Option<ClientFut>,
backup_fut: Option<ClientFut>,
) -> Self {
let (primary_signaller, primary_signal) = Signal::new();

scope.spawn_weak(async move {
let client = primary_fut.await;
primary_signaller.signal((client, PrimaryOrSecondary::Primary));
primary_signaller.signal((client, PrimaryOrBackup::Primary));
Ok(())
});

let secondary_signal = if let Some(secondary_fut) = secondary_fut {
let (secondary_signaller, secondary_signal) = Signal::new();
let backup_signal = if let Some(backup_fut) = backup_fut {
let (backup_signaller, backup_signal) = Signal::new();

scope.spawn_weak(async move {
let client = secondary_fut.await;
secondary_signaller.signal((client, PrimaryOrSecondary::Secondary));
let client = backup_fut.await;
backup_signaller.signal((client, PrimaryOrBackup::Backup));
Ok(())
});
Some(secondary_signal)
Some(backup_signal)
} else {
None
};

Self {
primary_signal,
secondary_signal,
prefer: PrimaryOrSecondary::Primary,
backup_signal,
prefer: PrimaryOrBackup::Primary,
last_failed_primary: None,
}
}
Expand All @@ -297,24 +292,24 @@ impl<Client: Send + Sync + Clone + 'static> ClientSelector<Client> {
// are willing to stop retrying for a request, then we are also willing to exit early when
// not ready - to allow the network to select another participant more quickly.
retry_limit: RetryLimit,
) -> Option<(Client, PrimaryOrSecondary)> {
) -> Option<(Client, PrimaryOrBackup)> {
let client_select_fut = futures::future::select_all(
utilities::conditional::conditional(
&self.secondary_signal,
|secondary_signal| {
&self.backup_signal,
|backup_signal| {
// If we have two clients, then we should bias the requested one, but if it's
// not ready, request from the other one.
match self.prefer {
PrimaryOrSecondary::Secondary => match self.last_failed_primary {
PrimaryOrBackup::Backup => match self.last_failed_primary {
// If we haven't tried the primary in some time, then we should try it
// as it could be back online
Some(last_failed_primary)
if last_failed_primary.elapsed() > TRY_PRIMARY_AFTER =>
[&self.primary_signal, secondary_signal],
[&self.primary_signal, backup_signal],

_ => [secondary_signal, &self.primary_signal],
_ => [backup_signal, &self.primary_signal],
},
PrimaryOrSecondary::Primary => [&self.primary_signal, secondary_signal],
PrimaryOrBackup::Primary => [&self.primary_signal, backup_signal],
}
.into_iter()
},
Expand All @@ -331,14 +326,14 @@ impl<Client: Send + Sync + Clone + 'static> ClientSelector<Client> {
}
}

pub fn request_failed(&mut self, failed_client: PrimaryOrSecondary) {
// If we have a second endpoint, then we should switch to the other one.
if self.secondary_signal.is_some() {
self.prefer = if failed_client == PrimaryOrSecondary::Primary {
pub fn request_failed(&mut self, failed_client: PrimaryOrBackup) {
// If we have a backup endpoint, then we should switch to the other one.
if self.backup_signal.is_some() {
self.prefer = if failed_client == PrimaryOrBackup::Primary {
self.last_failed_primary = Some(tokio::time::Instant::now());
PrimaryOrSecondary::Secondary
PrimaryOrBackup::Backup
} else {
PrimaryOrSecondary::Primary
PrimaryOrBackup::Primary
};
}
}
Expand Down Expand Up @@ -402,7 +397,7 @@ where
// The name of the retrier that appears in the logs.
name: &'static str,
primary_client_fut: ClientFut,
secondary_client_fut: Option<ClientFut>,
backup_client_fut: Option<ClientFut>,
initial_request_timeout: Duration,
maximum_concurrent_submissions: u32,
) -> Self {
Expand All @@ -416,24 +411,24 @@ where
let mut submission_holder = SubmissionHolder::new(maximum_concurrent_submissions);

let mut client_selector: ClientSelector<Client> =
ClientSelector::new(scope, primary_client_fut, secondary_client_fut);
ClientSelector::new(scope, primary_client_fut, backup_client_fut);

scope.spawn(async move {
utilities::loop_select! {
if let Some((response_sender, request_log, closure, retry_limit)) = request_receiver.recv() => {
RPC_RETRIER_REQUESTS.inc(&[name, request_log.rpc_method.as_str()]);
let request_id = request_holder.next_request_id();

if let Some((client, primary_or_secondary)) = client_selector.select_client(retry_limit).await {
tracing::debug!("Retrier {name}: Received request `{request_log}` assigning request_id `{request_id}` and requesting with `{primary_or_secondary:?}`");
submission_holder.push(submission_future(client, request_log, retry_limit, &closure, request_id, initial_request_timeout, 0, primary_or_secondary));
if let Some((client, primary_or_backup)) = client_selector.select_client(retry_limit).await {
tracing::debug!("Retrier {name}: Received request `{request_log}` assigning request_id `{request_id}` and requesting with `{primary_or_backup:?}`");
submission_holder.push(submission_future(client, request_log, retry_limit, &closure, request_id, initial_request_timeout, 0, primary_or_backup));
request_holder.insert(request_id, (response_sender, closure));
} else {
tracing::warn!("Retrier {name}: No clients available for request when received `{request_log}` with id `{request_id}`. Dropping request.");
}
},
let (request_id, request_log, retry_limit, primary_or_secondary, result) = submission_holder.next_or_pending() => {
RPC_RETRIER_TOTAL_REQUESTS.inc(&[name, request_log.rpc_method.as_str(), primary_or_secondary.to_string().as_str()]);
let (request_id, request_log, retry_limit, primary_or_backup, result) = submission_holder.next_or_pending() => {
RPC_RETRIER_TOTAL_REQUESTS.inc(&[name, request_log.rpc_method.as_str(), primary_or_backup.to_string().as_str()]);
match result {
Ok(value) => {
if let Some((response_sender, _)) = request_holder.remove(&request_id) {
Expand All @@ -446,20 +441,20 @@ where
let half_max = max_sleep_duration(initial_request_timeout, attempt) / 2;
let sleep_duration = half_max + rand::thread_rng().gen_range(Duration::default()..half_max);

let error_message = format!("Retrier {name}: Error for request `{request_log}` with id `{request_id}`, attempt `{attempt}`: {e}. Delaying for {:?}", sleep_duration);
let error_message = format!("Retrier {name}: Error for request `{request_log}` with id `{request_id}` requested with `{primary_or_backup:?}`, attempt `{attempt}`: {e}. Delaying for {:?}", sleep_duration);
if attempt == 0 && !matches!(retry_limit, RetryLimit::Limit(1)) {
tracing::warn!(error_message);
} else {
tracing::error!(error_message);
}

client_selector.request_failed(primary_or_secondary);
client_selector.request_failed(primary_or_backup);

// Delay the request before the next retry.
retry_delays.push(Box::pin(
async move {
tokio::time::sleep(sleep_duration).await;
// pass in primary or secondary so we know which client to use.
// pass in primary or backup so we know which client to use.
(request_id, request_log, attempt, retry_limit)
}
));
Expand All @@ -482,9 +477,9 @@ where
}
_ => {
// This await should always return immediately since we must already have a client if we've already made a request.
if let Some((next_client, next_primary_or_secondary)) = client_selector.select_client(retry_limit).await {
tracing::trace!("Retrier {name}: Retrying request `{request_log}` with id `{request_id}` and client `{next_primary_or_secondary:?}`, attempt `{next_attempt}`");
submission_holder.push(submission_future(next_client, request_log, retry_limit, closure, request_id, initial_request_timeout, next_attempt, next_primary_or_secondary));
if let Some((next_client, next_primary_or_backup)) = client_selector.select_client(retry_limit).await {
tracing::trace!("Retrier {name}: Retrying request `{request_log}` with id `{request_id}` and client `{next_primary_or_backup:?}`, attempt `{next_attempt}`");
submission_holder.push(submission_future(next_client, request_log, retry_limit, closure, request_id, initial_request_timeout, next_attempt, next_primary_or_backup));
} else {
tracing::warn!("Retrier {name}: No clients available for request `{request_log}` with id `{request_id}`. Dropping request.");
request_holder.remove(&request_id);
Expand Down Expand Up @@ -914,20 +909,20 @@ mod tests {

#[tokio::test]
async fn backup_used_for_next_request_if_primary_fails() {
async fn get_client_primary_or_secondary(
primary_or_secondary: PrimaryOrSecondary,
) -> PrimaryOrSecondary {
primary_or_secondary
async fn get_client_primary_or_backup(
primary_or_backup: PrimaryOrBackup,
) -> PrimaryOrBackup {
primary_or_backup
}

thread_local! {
pub static ATTEMPTED: std::cell::RefCell<u32> = std::cell::RefCell::new(0);
pub static TRIED_CLIENTS: std::cell::RefCell<Vec<PrimaryOrSecondary>> = std::cell::RefCell::new(Vec::new());
pub static TRIED_CLIENTS: std::cell::RefCell<Vec<PrimaryOrBackup >> = std::cell::RefCell::new(Vec::new());
}

fn specific_fut_closure_err_after_one(
timeout: Duration,
) -> TypedFutureGenerator<(), PrimaryOrSecondary> {
) -> TypedFutureGenerator<(), PrimaryOrBackup> {
Box::pin(move |client| {
Box::pin(async move {
// We need to delay in the tests, else we'll resolve immediately, meaning the
Expand Down Expand Up @@ -969,8 +964,8 @@ mod tests {
let retrier_client = RetrierClient::new(
scope,
"test",
get_client_primary_or_secondary(PrimaryOrSecondary::Primary),
Some(get_client_primary_or_secondary(PrimaryOrSecondary::Secondary)),
get_client_primary_or_backup(PrimaryOrBackup::Primary),
Some(get_client_primary_or_backup(PrimaryOrBackup::Backup)),
INITIAL_TIMEOUT,
100,
);
Expand Down Expand Up @@ -1023,23 +1018,23 @@ mod tests {
TRIED_CLIENTS.with(|cell| cell.borrow().clone()),
vec![
// first request fails
PrimaryOrSecondary::Primary,
PrimaryOrBackup::Primary,
// first request succeeds on second attempt
PrimaryOrSecondary::Secondary,
PrimaryOrBackup::Backup,
// second succeeds
PrimaryOrSecondary::Secondary,
PrimaryOrBackup::Backup,
// third succeeds
PrimaryOrSecondary::Secondary,
PrimaryOrBackup::Backup,
// fourth succeeds
PrimaryOrSecondary::Secondary,
PrimaryOrBackup::Backup,
// try primary again, and succeeds, so no further items in this list
PrimaryOrSecondary::Primary,
PrimaryOrBackup::Primary,
// primary should still be favoured after it succeeds
PrimaryOrSecondary::Primary,
PrimaryOrBackup::Primary,
// primary fails again, so seocondary is used
PrimaryOrSecondary::Secondary,
PrimaryOrBackup::Backup,
// time elapsed, primary again
PrimaryOrSecondary::Primary,
PrimaryOrBackup::Primary,
]
);

Expand Down
Binary file not shown.
Binary file not shown.
Loading

0 comments on commit 09fdded

Please sign in to comment.