Return back stale out-of-pool scheduler by timeout
ryoqun committed Jun 11, 2024
1 parent 56d1572 commit 23f8c65
Showing 3 changed files with 305 additions and 60 deletions.
4 changes: 3 additions & 1 deletion runtime/src/bank_forks.rs
@@ -232,7 +232,9 @@ impl BankForks {
let bank = if let Some(scheduler_pool) = &self.scheduler_pool {
let context = SchedulingContext::new(bank.clone());
let scheduler = scheduler_pool.take_scheduler(context);
BankWithScheduler::new(bank, Some(scheduler))
let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler));
scheduler_pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener());
bank_with_scheduler
} else {
BankWithScheduler::new_without_scheduler(bank)
};
224 changes: 193 additions & 31 deletions runtime/src/installed_scheduler_pool.rs
@@ -30,22 +30,59 @@ use {
transaction::{Result, SanitizedTransaction, TransactionError},
},
std::{
fmt::Debug,
fmt::{self, Debug},
mem,
ops::Deref,
sync::{Arc, RwLock},
},
};
#[cfg(feature = "dev-context-only-utils")]
use {mockall::automock, qualifier_attr::qualifiers};

pub fn initialized_result_with_timings() -> ResultWithTimings {
(Ok(()), ExecuteTimings::default())
}

pub trait InstalledSchedulerPool: Send + Sync + Debug {
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox;
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
self.take_resumed_scheduler(context, initialized_result_with_timings())
}

fn take_resumed_scheduler(
&self,
context: SchedulingContext,
result_with_timings: ResultWithTimings,
) -> InstalledSchedulerBox;

fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
}

#[derive(Debug)]
pub struct SchedulerAborted;
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;

pub struct TimeoutListener {
callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}

impl TimeoutListener {
pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
Self {
callback: Box::new(f),
}
}

pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
(self.callback)(pool);
}
}

impl Debug for TimeoutListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TimeoutListener({self:p})")
}
}
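
For intuition about how these listeners are meant to be consumed, a pool implementation could stamp each registered listener with its registration time and periodically trigger the ones that have outlived a timeout. The following is a self-contained sketch rather than code from this commit: Pool, check_timeouts, and the stored Instant are illustrative assumptions; only the register_timeout_listener/trigger contract mirrors the trait above.

use std::{
    sync::{Arc, Mutex},
    time::{Duration, Instant},
};

// Stand-ins for the real types; only the listener mechanics matter here.
struct TimeoutListener {
    callback: Box<dyn FnOnce(Arc<Pool>) + Sync + Send>,
}

impl TimeoutListener {
    fn new(f: impl FnOnce(Arc<Pool>) + Sync + Send + 'static) -> Self {
        Self { callback: Box::new(f) }
    }

    fn trigger(self, pool: Arc<Pool>) {
        (self.callback)(pool);
    }
}

#[derive(Default)]
struct Pool {
    // Listeners wait here, each paired with the instant it was registered.
    listeners: Mutex<Vec<(Instant, TimeoutListener)>>,
}

impl Pool {
    fn register_timeout_listener(self: &Arc<Self>, listener: TimeoutListener) {
        self.listeners.lock().unwrap().push((Instant::now(), listener));
    }

    // Hypothetical housekeeping entry point: fire every listener older than
    // `timeout`, consuming it, and keep the rest waiting.
    fn check_timeouts(self: &Arc<Self>, timeout: Duration) {
        let now = Instant::now();
        let mut expired = Vec::new();
        {
            let mut listeners = self.listeners.lock().unwrap();
            let mut keep = Vec::new();
            for (registered_at, listener) in listeners.drain(..) {
                if now.duration_since(registered_at) >= timeout {
                    expired.push(listener);
                } else {
                    keep.push((registered_at, listener));
                }
            }
            *listeners = keep;
        }
        // Trigger outside the lock: the real callback re-enters bank state and
        // takes other locks, so holding `listeners` across it would be risky.
        for listener in expired {
            listener.trigger(Arc::clone(self));
        }
    }
}

fn main() {
    let pool = Arc::new(Pool::default());
    pool.register_timeout_listener(TimeoutListener::new(|_pool| {
        println!("returning stale scheduler to the pool");
    }));
    pool.check_timeouts(Duration::ZERO); // zero timeout: fires immediately
}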

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under encapsulated implementation
///
@@ -250,6 +287,61 @@ impl WaitReason {
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
Unavailable,
Active(InstalledSchedulerBox),
Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}

impl SchedulerStatus {
fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
match scheduler {
Some(scheduler) => SchedulerStatus::Active(scheduler),
None => SchedulerStatus::Unavailable,
}
}

fn transition_from_stale_to_active(
&mut self,
f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
) {
let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!();
};
*self = Self::Active(f(pool, result_with_timings));
}

pub(crate) fn maybe_transition_from_active_to_stale(
&mut self,
f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
) {
if !matches!(self, Self::Active(_scheduler)) {
return;
}
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
panic!();
};
let (pool, result_with_timings) = f(scheduler);
*self = Self::Stale(pool, result_with_timings);
}

pub(crate) fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
panic!();
};
scheduler
}

pub(crate) fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!();
};
result_with_timings
}
}
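
The three variants above form a small state machine: Active owns a live scheduler, Stale remembers only the pool and the accumulated ResultWithTimings after the scheduler was returned on timeout, and Unavailable is the initial and terminal state. A minimal, hypothetical mirror of the same transitions (String and plain tuples stand in for the scheduler, pool, and result types) shows the mem::replace idiom used to move each owned payload out through &mut self:

use std::mem;

#[derive(Debug)]
enum Status {
    Unavailable,
    Active(String),           // stands in for InstalledSchedulerBox
    Stale(&'static str, u32), // stands in for (pool, result_with_timings)
}

impl Status {
    fn stale_to_active(&mut self, f: impl FnOnce(&'static str, u32) -> String) {
        // Move the payload out, leaving Unavailable behind, so the closure
        // can consume it by value; then write the new state back.
        let Status::Stale(pool, result) = mem::replace(self, Status::Unavailable) else {
            panic!("not stale");
        };
        *self = Status::Active(f(pool, result));
    }

    fn maybe_active_to_stale(&mut self, f: impl FnOnce(String) -> (&'static str, u32)) {
        if !matches!(self, Status::Active(_)) {
            return; // the "maybe_" prefix: a no-op unless currently active
        }
        let Status::Active(scheduler) = mem::replace(self, Status::Unavailable) else {
            unreachable!();
        };
        let (pool, result) = f(scheduler);
        *self = Status::Stale(pool, result);
    }
}

fn main() {
    let mut status = Status::Active("scheduler".to_string());
    // Timeout fires: hand the scheduler back, keep its result around.
    status.maybe_active_to_stale(|s| {
        println!("returning {s} to the pool");
        ("pool", 0)
    });
    // A later transaction arrives: resume with a fresh scheduler.
    status.stale_to_active(|pool, result| format!("resumed from {pool}, result {result}"));
    println!("{status:?}");
}

mem::replace is what makes these transitions expressible at all: each variant owns its payload, so it has to be moved out (briefly parking the status at Unavailable) before a closure can consume it by value.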

/// Very thin wrapper around Arc<Bank>
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
@@ -277,7 +369,7 @@ pub struct BankWithSchedulerInner {
bank: Arc<Bank>,
scheduler: InstalledSchedulerRwLock,
}
pub type InstalledSchedulerRwLock = RwLock<Option<InstalledSchedulerBox>>;
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;

impl BankWithScheduler {
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
Expand All @@ -292,7 +384,7 @@ impl BankWithScheduler {
Self {
inner: Arc::new(BankWithSchedulerInner {
bank,
scheduler: RwLock::new(scheduler),
scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
}),
}
}
@@ -321,7 +413,10 @@ impl BankWithScheduler {
}

pub fn has_installed_scheduler(&self) -> bool {
self.inner.scheduler.read().unwrap().is_some()
!matches!(
&*self.inner.scheduler.read().unwrap(),
SchedulerStatus::Unavailable
)
}

/// Schedule the transaction as long as the scheduler hasn't been aborted.
@@ -338,31 +433,31 @@ impl BankWithScheduler {
transactions_with_indexes.len()
);

let scheduler_guard = self.inner.scheduler.read().unwrap();
let scheduler = scheduler_guard.as_ref().unwrap();

for (sanitized_transaction, &index) in transactions_with_indexes {
if scheduler
.schedule_execution(&(sanitized_transaction, index))
.is_err()
{
drop(scheduler_guard);
// This write lock isn't atomic with the above read lock. So, another thread
// could have called .recover_error_after_abort() while we're literally stuck in
// the gap between these locks (i.e., right at this comment, source-code wise) under
// extreme race conditions. Thus, .recover_error_after_abort() is made idempotent
// with that consideration in mind.
//
// Lastly, this non-atomic nature is intentional for optimizing the fast code-path
let mut scheduler_guard = self.inner.scheduler.write().unwrap();
let scheduler = scheduler_guard.as_mut().unwrap();
return Err(scheduler.recover_error_after_abort());
let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
for (sanitized_transaction, &index) in transactions_with_indexes {
scheduler.schedule_execution(&(sanitized_transaction, index))?;
}
Ok(())
});

if schedule_result.is_err() {
// This write lock isn't atomic with the above read lock. So, another thread
// could have called .recover_error_after_abort() while we're literally stuck in
// the gap between these locks (i.e., right at this comment, source-code wise) under
// extreme race conditions. Thus, .recover_error_after_abort() is made idempotent
// with that consideration in mind.
//
// Lastly, this non-atomic nature is intentional for optimizing the fast code-path
return Err(self.inner.retrieve_error_after_schedule_failure());
}

Ok(())
}
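
The comment above captures a deliberate trade-off: the fast path takes only a read lock, and the error path releases it before acquiring the write lock, so a racing thread can recover the error first inside that gap. A stripped-down, hypothetical sketch of why idempotent recovery makes the gap harmless (Inner and aborted_error are illustrative names, not from this commit):

use std::sync::RwLock;

// Stand-in for the scheduler's abort state; the locking discipline is the point.
struct Inner {
    aborted_error: RwLock<Option<String>>,
}

impl Inner {
    // Fast path: a single read lock, matching the common success case.
    fn try_schedule(&self) -> Result<(), ()> {
        if self.aborted_error.read().unwrap().is_some() {
            return Err(()); // abort detected; recover outside this lock
        }
        Ok(())
    }

    // Recovery path: the read lock above was already released, so another
    // thread may have recovered in the gap. Cloning (instead of take()-ing)
    // the error keeps this idempotent: every racing caller sees the same
    // error, no matter who gets the write lock first.
    fn recover_error_after_abort(&self) -> String {
        self.aborted_error
            .write()
            .unwrap()
            .clone()
            .expect("called only after an abort was observed")
    }
}

fn main() {
    let inner = Inner {
        aborted_error: RwLock::new(Some("blockhash expired".to_string())),
    };
    assert!(inner.try_schedule().is_err());
    // Two "racing" recoveries observe the identical error.
    assert_eq!(inner.recover_error_after_abort(), inner.recover_error_after_abort());
}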

pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
self.inner.do_create_timeout_listener()
}

// take a needless &mut only to communicate its semantic mutability to humans...
#[cfg(feature = "dev-context-only-utils")]
pub fn drop_scheduler(&mut self) {
@@ -391,11 +486,72 @@ impl BankWithScheduler {
}

pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
RwLock::new(None)
RwLock::new(SchedulerStatus::Unavailable)
}
}

impl BankWithSchedulerInner {
fn with_active_scheduler(
self: &Arc<Self>,
f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
) -> ScheduleResult {
let scheduler = self.scheduler.read().unwrap();
match &*scheduler {
SchedulerStatus::Active(scheduler) => {
// This is the fast path, needing only a single read lock most of the time.
f(scheduler)
}
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
Err(SchedulerAborted)
}
SchedulerStatus::Stale(_pool, _result_with_timings) => {
drop(scheduler);

let context = SchedulingContext::new(self.bank.clone());
let mut scheduler = self.scheduler.write().unwrap();
scheduler.transition_from_stale_to_active(|scheduler_pool, result_with_timings| {
scheduler_pool.register_timeout_listener(self.do_create_timeout_listener());
scheduler_pool.take_resumed_scheduler(context, result_with_timings)
});
drop(scheduler);

self.with_active_scheduler(f)
}
SchedulerStatus::Unavailable => panic!(),
}
}

fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
let weak_bank = Arc::downgrade(self);
TimeoutListener::new(move |scheduler_pool| {
let Some(bank) = weak_bank.upgrade() else {
return;
};

let Ok(mut scheduler) = bank.scheduler.write() else {
return;
};

scheduler.maybe_transition_from_active_to_stale(|scheduler| {
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(false);
uninstalled_scheduler.return_to_pool();
(scheduler_pool, result_with_timings)
})
})
}
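
Note that the listener captures Arc::downgrade(self) rather than a strong reference: a pending timeout must not keep a dropped bank (and its scheduler) alive, and if the bank has already gone through the normal drop path by the time the timeout fires, the callback degrades to a silent no-op. A minimal sketch of just that capture pattern, with illustrative names:

use std::sync::{Arc, Weak};

struct Bank {
    slot: u64,
}

// The closure owns only a Weak, so it never extends the bank's lifetime;
// `upgrade()` failing turns the timeout into a no-op.
fn make_listener(bank: &Arc<Bank>) -> impl FnOnce() {
    let weak_bank: Weak<Bank> = Arc::downgrade(bank);
    move || {
        let Some(bank) = weak_bank.upgrade() else {
            return; // bank already dropped; nothing left to transition
        };
        println!("timing out the scheduler of slot {}", bank.slot);
    }
}

fn main() {
    let bank = Arc::new(Bank { slot: 42 });
    let listener = make_listener(&bank);
    drop(bank); // simulate the bank being dropped before the timeout fires
    listener(); // prints nothing: the upgrade fails
}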

fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
let mut scheduler = self.scheduler.write().unwrap();
match &mut *scheduler {
SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
result.clone().unwrap_err()
}
_ => panic!(),
}
}

#[must_use]
fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
Self::wait_for_scheduler_termination(
@@ -419,18 +575,24 @@ impl BankWithSchedulerInner {
);

let mut scheduler = scheduler.write().unwrap();
let (was_noop, result_with_timings) =
if let Some(scheduler) = scheduler.as_mut().filter(|_| reason.is_paused()) {
let (was_noop, result_with_timings) = match &mut *scheduler {
SchedulerStatus::Active(scheduler) if reason.is_paused() => {
scheduler.pause_for_recent_blockhash();
(false, None)
} else if let Some(scheduler) = scheduler.take() {
}
SchedulerStatus::Active(_scheduler) => {
let scheduler = scheduler.transition_from_active_to_unavailable();
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(reason.is_dropped());
uninstalled_scheduler.return_to_pool();
(false, Some(result_with_timings))
} else {
(true, None)
};
}
SchedulerStatus::Stale(_pool, _result_with_timings) => {
let result_with_timings = scheduler.transition_from_stale_to_unavailable();
(true, Some(result_with_timings))
}
SchedulerStatus::Unavailable => (true, None),
};
debug!(
"wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
bank.slot(),