Return back stale out-of-pool scheduler by timeout #1690

Merged (5 commits) on Jun 13, 2024
4 changes: 3 additions & 1 deletion runtime/src/bank_forks.rs
@@ -232,7 +232,9 @@ impl BankForks {
let bank = if let Some(scheduler_pool) = &self.scheduler_pool {
let context = SchedulingContext::new(bank.clone());
let scheduler = scheduler_pool.take_scheduler(context);
BankWithScheduler::new(bank, Some(scheduler))
let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler));
scheduler_pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener());
bank_with_scheduler
} else {
BankWithScheduler::new_without_scheduler(bank)
};
276 changes: 245 additions & 31 deletions runtime/src/installed_scheduler_pool.rs
@@ -30,22 +30,59 @@ use {
transaction::{Result, SanitizedTransaction, TransactionError},
},
std::{
fmt::Debug,
fmt::{self, Debug},
mem,
ops::Deref,
sync::{Arc, RwLock},
},
};
#[cfg(feature = "dev-context-only-utils")]
use {mockall::automock, qualifier_attr::qualifiers};

pub fn initialized_result_with_timings() -> ResultWithTimings {
(Ok(()), ExecuteTimings::default())
}

pub trait InstalledSchedulerPool: Send + Sync + Debug {
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox;
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
self.take_resumed_scheduler(context, initialized_result_with_timings())
}

fn take_resumed_scheduler(
&self,
context: SchedulingContext,
result_with_timings: ResultWithTimings,
) -> InstalledSchedulerBox;

fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
}
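
With this change, take_scheduler becomes a provided method that simply delegates to the new required take_resumed_scheduler, seeding it with a fresh (Ok(()), ExecuteTimings::default()). The following is a minimal standalone sketch of that delegation pattern, using simplified placeholder types (Context, Scheduler, DummyPool) rather than the real runtime types:

// Placeholder types standing in for SchedulingContext, InstalledSchedulerBox, etc.
struct Context;
struct Scheduler;
type ResultWithTimings = (Result<(), String>, u64);

fn fresh_result_with_timings() -> ResultWithTimings {
    (Ok(()), 0)
}

trait Pool {
    // Provided method: callers with no prior state to resume get a scheduler
    // seeded with a fresh result, analogous to take_scheduler() above.
    fn take_scheduler(&self, context: Context) -> Scheduler {
        self.take_resumed_scheduler(context, fresh_result_with_timings())
    }

    // Required method: implementors only handle the general, resumable case.
    fn take_resumed_scheduler(&self, context: Context, result: ResultWithTimings) -> Scheduler;
}

struct DummyPool;

impl Pool for DummyPool {
    fn take_resumed_scheduler(&self, _context: Context, _result: ResultWithTimings) -> Scheduler {
        Scheduler
    }
}

fn main() {
    let pool = DummyPool;
    let _scheduler = pool.take_scheduler(Context); // uses the provided default method
}

The effect is that existing callers of take_scheduler keep working unchanged while pool implementations only have to support the resumable entry point.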

#[derive(Debug)]
pub struct SchedulerAborted;
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;

pub struct TimeoutListener {
callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}

impl TimeoutListener {
pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
Self {
callback: Box::new(f),
}
}

pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
(self.callback)(pool);
}
}

impl Debug for TimeoutListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TimeoutListener({self:p})")
}
}
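
TimeoutListener is a one-shot callback: the boxed FnOnce is consumed when the listener is triggered, so it can fire at most once. Here is a minimal standalone sketch of that pattern, using a DummyPool placeholder instead of the real InstalledSchedulerPoolArc:

use std::sync::Arc;

#[derive(Debug)]
struct DummyPool; // placeholder for Arc<dyn InstalledSchedulerPool>

struct Listener {
    // Box<dyn FnOnce(...)> lets the callback capture owned state and consume it exactly once.
    callback: Box<dyn FnOnce(Arc<DummyPool>) + Sync + Send>,
}

impl Listener {
    fn new(f: impl FnOnce(Arc<DummyPool>) + Sync + Send + 'static) -> Self {
        Self { callback: Box::new(f) }
    }

    // Takes self by value, so triggering a listener twice is a compile error.
    fn trigger(self, pool: Arc<DummyPool>) {
        (self.callback)(pool);
    }
}

fn main() {
    let listener = Listener::new(|pool| println!("timed out; notifying {pool:?}"));
    listener.trigger(Arc::new(DummyPool));
    // Calling listener.trigger(...) again would not compile: `listener` has been moved.
}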

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under encapsulated implementation
///
@@ -250,6 +287,76 @@ impl WaitReason {
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
/// The unified scheduler is disabled, or the installed scheduler has been consumed by
/// wait_for_termination(). Note that the transition to Unavailable from {Active, Stale} is
/// one-way (i.e. one-time).

Review comment: this doesn't seem strictly true, because it seems to also be used as an intermediate state when transitioning between active <-> stale?

Member Author: wow, nice that you spotted this. I've omitted it for simplicity here; I will do a follow-up for completeness.

Member Author: this will be addressed by: #1797

Unavailable,
/// Scheduler is installed into a bank; it could be running or just idling.
/// This will be transitioned to Stale after a certain amount of time has passed if its bank
/// hasn't been frozen.
Active(InstalledSchedulerBox),
/// Scheduler has been idling for a long time and has been returned to the pool.
/// This will be immediately (i.e. transparently) transitioned back to Active as soon as there
/// is a new transaction to be executed.
Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}

impl SchedulerStatus {
fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
match scheduler {
Some(scheduler) => SchedulerStatus::Active(scheduler),
None => SchedulerStatus::Unavailable,
}
}

fn transition_from_stale_to_active(
&mut self,
f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
) {
let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!("transition to Active failed: {self:?}");
};
*self = Self::Active(f(pool, result_with_timings));
}

fn maybe_transition_from_active_to_stale(
&mut self,
f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
) {
if !matches!(self, Self::Active(_scheduler)) {
return;
}
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
unreachable!("not active: {:?}", self);
};
let (pool, result_with_timings) = f(scheduler);
*self = Self::Stale(pool, result_with_timings);
}

fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
panic!("transition to Unavailable failed: {self:?}");
};
scheduler
}

fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!("transition to Unavailable failed: {self:?}");
};
result_with_timings
}

fn active_scheduler(&self) -> &InstalledSchedulerBox {
let SchedulerStatus::Active(active_scheduler) = self else {
panic!("not active: {self:?}");
};
active_scheduler
}
}
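
All of these transitions rely on the same idiom: mem::replace moves the current variant out (temporarily parking the status at Unavailable) so its payload can be consumed by value when building the next state. The following is a minimal standalone sketch of that idiom on a simplified enum, not the real scheduler types:

use std::mem;

#[derive(Debug)]
enum Status {
    Unavailable,
    Active(String),     // stand-in for Active(InstalledSchedulerBox)
    Stale(String, i32), // stand-in for Stale(InstalledSchedulerPoolArc, ResultWithTimings)
}

impl Status {
    fn transition_from_stale_to_active(&mut self, f: impl FnOnce(String, i32) -> String) {
        // Move the current variant out, leaving Unavailable behind, so the Stale
        // payload can be handed to the closure by value.
        let Self::Stale(pool, result) = mem::replace(self, Self::Unavailable) else {
            panic!("transition to Active failed: {self:?}");
        };
        *self = Self::Active(f(pool, result));
    }
}

fn main() {
    let mut status = Status::Stale("pool".to_string(), 0);
    status.transition_from_stale_to_active(|pool, result| {
        format!("scheduler resumed from {pool} with result {result}")
    });
    println!("{status:?}");
}

The real methods follow the same shape, just with the installed scheduler and (pool, result_with_timings) as the payloads.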

/// Very thin wrapper around Arc<Bank>
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
@@ -277,7 +384,7 @@ pub struct BankWithSchedulerInner {
bank: Arc<Bank>,
scheduler: InstalledSchedulerRwLock,
}
pub type InstalledSchedulerRwLock = RwLock<Option<InstalledSchedulerBox>>;
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;

impl BankWithScheduler {
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
@@ -292,7 +399,7 @@ impl BankWithScheduler {
Self {
inner: Arc::new(BankWithSchedulerInner {
bank,
scheduler: RwLock::new(scheduler),
scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
}),
}
}
@@ -321,13 +428,19 @@ impl BankWithScheduler {
}

pub fn has_installed_scheduler(&self) -> bool {
self.inner.scheduler.read().unwrap().is_some()
!matches!(
&*self.inner.scheduler.read().unwrap(),
SchedulerStatus::Unavailable
)
}

/// Schedule the transaction as long as the scheduler hasn't been aborted.
///
/// If the scheduler has been aborted, this doesn't schedule the transaction; instead, it just
/// returns the error from the previously scheduled transaction.
///
/// Calling this will panic if the installed scheduler is Unavailable (i.e. the bank has been
/// wait_for_termination()-ed or the unified scheduler is disabled in the first place).
// 'a is needed; anonymous_lifetime_in_impl_trait isn't stabilized yet...
pub fn schedule_transaction_executions<'a>(
&self,
@@ -338,31 +451,32 @@
transactions_with_indexes.len()
);

let scheduler_guard = self.inner.scheduler.read().unwrap();
let scheduler = scheduler_guard.as_ref().unwrap();

for (sanitized_transaction, &index) in transactions_with_indexes {
if scheduler
.schedule_execution(&(sanitized_transaction, index))
.is_err()
{
drop(scheduler_guard);
// This write lock isn't atomic with the read lock above. So, another thread
// could have called .recover_error_after_abort() while we're literally stuck in
// the gap between these locks (i.e. right at this comment, source-code wise) under
// extreme race conditions. Thus, .recover_error_after_abort() is made idempotent
// with that consideration in mind.
//
// Lastly, this non-atomic nature is intentional for optimizing the fast code-path
let mut scheduler_guard = self.inner.scheduler.write().unwrap();
let scheduler = scheduler_guard.as_mut().unwrap();
return Err(scheduler.recover_error_after_abort());
let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
for (sanitized_transaction, &index) in transactions_with_indexes {
scheduler.schedule_execution(&(sanitized_transaction, index))?;
}
Ok(())
});

if schedule_result.is_err() {
// This write lock isn't atomic with the read lock above. So, another thread
// could have called .recover_error_after_abort() while we're literally stuck in
// the gap between these locks (i.e. right at this comment, source-code wise) under
// extreme race conditions. Thus, .recover_error_after_abort() is made idempotent
// with that consideration in mind.
//
// Lastly, this non-atomic nature is intentional for optimizing the fast code-path
return Err(self.inner.retrieve_error_after_schedule_failure());
}

Ok(())
}

#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
self.inner.do_create_timeout_listener()
}

// take needless &mut only to communicate its semantic mutability to humans...
#[cfg(feature = "dev-context-only-utils")]
pub fn drop_scheduler(&mut self) {
@@ -391,11 +505,101 @@ impl BankWithScheduler {
}

pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
RwLock::new(None)
RwLock::new(SchedulerStatus::Unavailable)
}
}

impl BankWithSchedulerInner {
fn with_active_scheduler(
self: &Arc<Self>,
f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
) -> ScheduleResult {
let scheduler = self.scheduler.read().unwrap();
match &*scheduler {
SchedulerStatus::Active(scheduler) => {
// This is the fast path, needing only a single read lock most of the time.
f(scheduler)
}
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
trace!(
"with_active_scheduler: bank (slot: {}) has a stale aborted scheduler...",
self.bank.slot(),
);
Err(SchedulerAborted)
}
SchedulerStatus::Stale(pool, _result_with_timings) => {
let pool = pool.clone();

Review comment: why do we need to clone the pool?

Member Author: because this pool is borrowed and we need to re-lock scheduler with write access:

error[E0505]: cannot move out of `scheduler` because it is borrowed
   --> runtime/src/installed_scheduler_pool.rs:532:22
    |
517 |         let scheduler = self.scheduler.read().unwrap();
    |             --------- binding `scheduler` declared here
518 |         match &*scheduler {
    |                 --------- borrow of `scheduler` occurs here
...
532 |                 drop(scheduler);
    |                      ^^^^^^^^^ move out of `scheduler` occurs here
...
552 |                 pool.register_timeout_listener(self.do_create_timeout_listener());
    |                 ---- borrow later used here

For more information about this error, try `rustc --explain E0505`.
error: could not compile `solana-runtime` (lib) due to 1 previous error
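
The same error can be reproduced in isolation: a reference borrowed out of a lock guard keeps the guard alive, so the guard can neither be dropped nor re-acquired for writing until that borrow ends, whereas cloning the Arc first detaches the value from the guard. Here is a simplified sketch with placeholder types, not the real scheduler state:

use std::sync::{Arc, RwLock};

fn reacquire_for_write(lock: &RwLock<Option<Arc<String>>>) {
    let guard = lock.read().unwrap();
    if let Some(pool) = &*guard {
        // Cloning the Arc is cheap and ends the borrow of `guard`; without this
        // clone, dropping `guard` below fails with E0505 exactly as shown above.
        let pool = pool.clone();
        drop(guard);
        let _writer = lock.write().unwrap(); // now legal to re-lock for writing
        println!("re-registered listener on {pool}");
    }
}

fn main() {
    let lock = RwLock::new(Some(Arc::new("pool".to_string())));
    reacquire_for_write(&lock);
}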

drop(scheduler);

let context = SchedulingContext::new(self.bank.clone());
let mut scheduler = self.scheduler.write().unwrap();
trace!("with_active_scheduler: {:?}", scheduler);
scheduler.transition_from_stale_to_active(|pool, result_with_timings| {
let scheduler = pool.take_resumed_scheduler(context, result_with_timings);
info!(
"with_active_scheduler: bank (slot: {}) got active, taking scheduler (id: {})",
self.bank.slot(),
scheduler.id(),
);
scheduler
});
drop(scheduler);

let scheduler = self.scheduler.read().unwrap();
// Re-register a new timeout listener only after acquiring the read lock;
// otherwise, the listener could put the scheduler back into Stale before this read
// lock is taken, under an extremely rare race condition, causing the panic below.
pool.register_timeout_listener(self.do_create_timeout_listener());
f(scheduler.active_scheduler())
}
SchedulerStatus::Unavailable => unreachable!("no installed scheduler"),
}
}

fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
let weak_bank = Arc::downgrade(self);
TimeoutListener::new(move |pool| {
let Some(bank) = weak_bank.upgrade() else {
return;
};

let Ok(mut scheduler) = bank.scheduler.write() else {
return;
};

scheduler.maybe_transition_from_active_to_stale(|scheduler| {
// The scheduler still hasn't been wait_for_termination()-ed after a while...
// Return the installed scheduler back to the scheduler pool as soon as the
// scheduler gets idle after executing all currently-scheduled transactions.

let id = scheduler.id();
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(false);
uninstalled_scheduler.return_to_pool();
info!(
"timeout_listener: bank (slot: {}) got stale, returning scheduler (id: {})",
bank.bank.slot(),
id,
);
(pool, result_with_timings)
});
trace!("timeout_listener: {:?}", scheduler);
})
}
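
do_create_timeout_listener captures only a Weak reference to the bank's inner state, so an outstanding listener never keeps a dropped bank alive; if the bank is gone by the time the timeout fires, upgrade() fails and the callback silently does nothing. Below is a minimal standalone sketch of that downgrade/upgrade pattern with a placeholder Inner type:

use std::sync::{Arc, Weak};

struct Inner {
    slot: u64, // placeholder for BankWithSchedulerInner
}

// The returned callback holds only a Weak pointer, so it does not extend the bank's lifetime.
fn make_listener(inner: &Arc<Inner>) -> impl FnOnce() {
    let weak: Weak<Inner> = Arc::downgrade(inner);
    move || {
        // If the bank has already been dropped, upgrade() returns None and the
        // listener is a no-op, mirroring the early `return` in the code above.
        let Some(inner) = weak.upgrade() else {
            return;
        };
        println!("bank (slot: {}) is still alive; move its scheduler to Stale", inner.slot);
    }
}

fn main() {
    let inner = Arc::new(Inner { slot: 42 });
    let listener = make_listener(&inner);
    drop(inner); // the bank goes away before the timeout fires
    listener();  // no-op: upgrade() failed
}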

/// This must not be called until `Err(SchedulerAborted)` is observed. Violating this will
/// `panic!()`.
fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
let mut scheduler = self.scheduler.write().unwrap();
match &mut *scheduler {
SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
result.clone().unwrap_err()
}
_ => unreachable!("no error in {:?}", self.scheduler),
}
}

#[must_use]
fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
Self::wait_for_scheduler_termination(
@@ -419,18 +623,24 @@
);

let mut scheduler = scheduler.write().unwrap();
let (was_noop, result_with_timings) =
if let Some(scheduler) = scheduler.as_mut().filter(|_| reason.is_paused()) {
let (was_noop, result_with_timings) = match &mut *scheduler {
SchedulerStatus::Active(scheduler) if reason.is_paused() => {
scheduler.pause_for_recent_blockhash();
(false, None)
} else if let Some(scheduler) = scheduler.take() {
}
SchedulerStatus::Active(_scheduler) => {
let scheduler = scheduler.transition_from_active_to_unavailable();
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(reason.is_dropped());
uninstalled_scheduler.return_to_pool();
(false, Some(result_with_timings))
} else {
(true, None)
};
}
SchedulerStatus::Stale(_pool, _result_with_timings) => {
let result_with_timings = scheduler.transition_from_stale_to_unavailable();
(true, Some(result_with_timings))
}
SchedulerStatus::Unavailable => (true, None),
};
debug!(
"wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
bank.slot(),
Expand All @@ -439,6 +649,10 @@ impl BankWithSchedulerInner {
result_with_timings.as_ref().map(|(result, _)| result),
std::thread::current(),
);
trace!(
"wait_for_scheduler_termination(result_with_timings: {:?})",
result_with_timings,
);

result_with_timings
}