Return back stale out-of-pool scheduler by timeout #1690

Merged · 5 commits · Jun 13, 2024 · Showing changes from 1 commit
4 changes: 3 additions & 1 deletion runtime/src/bank_forks.rs
@@ -232,7 +232,9 @@ impl BankForks {
let bank = if let Some(scheduler_pool) = &self.scheduler_pool {
let context = SchedulingContext::new(bank.clone());
let scheduler = scheduler_pool.take_scheduler(context);
BankWithScheduler::new(bank, Some(scheduler))
let bank_with_scheduler = BankWithScheduler::new(bank, Some(scheduler));
scheduler_pool.register_timeout_listener(bank_with_scheduler.create_timeout_listener());
bank_with_scheduler
} else {
BankWithScheduler::new_without_scheduler(bank)
};
224 changes: 193 additions & 31 deletions runtime/src/installed_scheduler_pool.rs
@@ -30,22 +30,59 @@ use {
transaction::{Result, SanitizedTransaction, TransactionError},
},
std::{
fmt::Debug,
fmt::{self, Debug},
mem,
ops::Deref,
sync::{Arc, RwLock},
},
};
#[cfg(feature = "dev-context-only-utils")]
use {mockall::automock, qualifier_attr::qualifiers};

pub fn initialized_result_with_timings() -> ResultWithTimings {
(Ok(()), ExecuteTimings::default())
}

pub trait InstalledSchedulerPool: Send + Sync + Debug {
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox;
fn take_scheduler(&self, context: SchedulingContext) -> InstalledSchedulerBox {
self.take_resumed_scheduler(context, initialized_result_with_timings())
}

fn take_resumed_scheduler(
&self,
context: SchedulingContext,
result_with_timings: ResultWithTimings,
) -> InstalledSchedulerBox;

fn register_timeout_listener(&self, timeout_listener: TimeoutListener);
}

#[derive(Debug)]
pub struct SchedulerAborted;
pub type ScheduleResult = std::result::Result<(), SchedulerAborted>;

pub struct TimeoutListener {
callback: Box<dyn FnOnce(InstalledSchedulerPoolArc) + Sync + Send>,
}

impl TimeoutListener {
pub(crate) fn new(f: impl FnOnce(InstalledSchedulerPoolArc) + Sync + Send + 'static) -> Self {
Self {
callback: Box::new(f),
}
}

pub fn trigger(self, pool: InstalledSchedulerPoolArc) {
(self.callback)(pool);
}
}

impl Debug for TimeoutListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TimeoutListener({self:p})")
}
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Schedules, executes, and commits transactions under encapsulated implementation
///
@@ -250,6 +287,61 @@ impl WaitReason {
}
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum SchedulerStatus {
Unavailable,
Active(InstalledSchedulerBox),
Stale(InstalledSchedulerPoolArc, ResultWithTimings),
}

impl SchedulerStatus {
fn new(scheduler: Option<InstalledSchedulerBox>) -> Self {
match scheduler {
Some(scheduler) => SchedulerStatus::Active(scheduler),
None => SchedulerStatus::Unavailable,
}
}

fn transition_from_stale_to_active(
&mut self,
f: impl FnOnce(InstalledSchedulerPoolArc, ResultWithTimings) -> InstalledSchedulerBox,
) {
let Self::Stale(pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!();
};
*self = Self::Active(f(pool, result_with_timings));
}

pub(crate) fn maybe_transition_from_active_to_stale(
&mut self,
f: impl FnOnce(InstalledSchedulerBox) -> (InstalledSchedulerPoolArc, ResultWithTimings),
) {
if !matches!(self, Self::Active(_scheduler)) {
return;
}
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
panic!();
};
let (pool, result_with_timings) = f(scheduler);
*self = Self::Stale(pool, result_with_timings);
}

pub(crate) fn transition_from_active_to_unavailable(&mut self) -> InstalledSchedulerBox {
let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
panic!();
};
scheduler
}

pub(crate) fn transition_from_stale_to_unavailable(&mut self) -> ResultWithTimings {
let Self::Stale(_pool, result_with_timings) = mem::replace(self, Self::Unavailable) else {
panic!();
};
result_with_timings
}
}

/// Very thin wrapper around Arc<Bank>
///
/// It brings type-safety against accidental mixing of bank and scheduler with different slots,
@@ -277,7 +369,7 @@ pub struct BankWithSchedulerInner {
bank: Arc<Bank>,
scheduler: InstalledSchedulerRwLock,
}
pub type InstalledSchedulerRwLock = RwLock<Option<InstalledSchedulerBox>>;
pub type InstalledSchedulerRwLock = RwLock<SchedulerStatus>;

impl BankWithScheduler {
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
@@ -292,7 +384,7 @@ impl BankWithScheduler {
Self {
inner: Arc::new(BankWithSchedulerInner {
bank,
scheduler: RwLock::new(scheduler),
scheduler: RwLock::new(SchedulerStatus::new(scheduler)),
}),
}
}
@@ -321,7 +413,10 @@ impl BankWithScheduler {
}

pub fn has_installed_scheduler(&self) -> bool {
self.inner.scheduler.read().unwrap().is_some()
!matches!(
&*self.inner.scheduler.read().unwrap(),
SchedulerStatus::Unavailable
)
}

/// Schedule the transaction as long as the scheduler hasn't been aborted.
@@ -338,31 +433,31 @@ impl BankWithScheduler {
transactions_with_indexes.len()
);

let scheduler_guard = self.inner.scheduler.read().unwrap();
let scheduler = scheduler_guard.as_ref().unwrap();

for (sanitized_transaction, &index) in transactions_with_indexes {
if scheduler
.schedule_execution(&(sanitized_transaction, index))
.is_err()
{
drop(scheduler_guard);
// This write lock isn't atomic with the read lock above. So, another thread
// could have called .recover_error_after_abort() while we're literally stuck in
// the gap between these locks (i.e., right at this comment in the source) under
// extreme race conditions. Thus, .recover_error_after_abort() is made idempotent
// with that consideration in mind.
//
// Lastly, this non-atomic nature is intentional for optimizing the fast code-path
let mut scheduler_guard = self.inner.scheduler.write().unwrap();
let scheduler = scheduler_guard.as_mut().unwrap();
return Err(scheduler.recover_error_after_abort());
let schedule_result: ScheduleResult = self.inner.with_active_scheduler(|scheduler| {
for (sanitized_transaction, &index) in transactions_with_indexes {
scheduler.schedule_execution(&(sanitized_transaction, index))?;
}
Ok(())
});

if schedule_result.is_err() {
// This write lock isn't atomic with the read lock above. So, another thread
// could have called .recover_error_after_abort() while we're literally stuck in
// the gap between these locks (i.e., right at this comment in the source) under
// extreme race conditions. Thus, .recover_error_after_abort() is made idempotent
// with that consideration in mind.
//
// Lastly, this non-atomic nature is intentional for optimizing the fast code-path
return Err(self.inner.retrieve_error_after_schedule_failure());
}

Ok(())
}

pub(crate) fn create_timeout_listener(&self) -> TimeoutListener {
self.inner.do_create_timeout_listener()
}

// take needless &mut only to communicate its semantic mutability to humans...
#[cfg(feature = "dev-context-only-utils")]
pub fn drop_scheduler(&mut self) {
@@ -391,11 +486,72 @@ impl BankWithScheduler {
}

pub const fn no_scheduler_available() -> InstalledSchedulerRwLock {
RwLock::new(None)
RwLock::new(SchedulerStatus::Unavailable)
}
}

impl BankWithSchedulerInner {
fn with_active_scheduler(
self: &Arc<Self>,
f: impl FnOnce(&InstalledSchedulerBox) -> ScheduleResult,
) -> ScheduleResult {
let scheduler = self.scheduler.read().unwrap();
match &*scheduler {
SchedulerStatus::Active(scheduler) => {
// This is the fast path, needing only a single read lock most of the time.
f(scheduler)
}
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
Err(SchedulerAborted)
}
SchedulerStatus::Stale(_pool, _result_with_timings) => {
drop(scheduler);

let context = SchedulingContext::new(self.bank.clone());
let mut scheduler = self.scheduler.write().unwrap();
scheduler.transition_from_stale_to_active(|scheduler_pool, result_with_timings| {
scheduler_pool.register_timeout_listener(self.do_create_timeout_listener());
scheduler_pool.take_resumed_scheduler(context, result_with_timings)
});
drop(scheduler);

self.with_active_scheduler(f)
}
SchedulerStatus::Unavailable => panic!(),

Review comment: Again, this should have an error message here. Unavailable seems to be used for more than just transitions (at first glance), so I am still trying to convince myself this cannot happen.

Author reply: done: 06b7bff
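
For illustration only, here is a small self-contained sketch of the kind of descriptive panic being asked for above. The wording and names are hypothetical; the actual message was added in 06b7bff, which is not part of the commit shown in this diff.

    // Hypothetical sketch, not agave code: panicking on an impossible state with
    // enough context (slot and status) to make the crash log actionable, instead
    // of a bare panic!().
    #[allow(dead_code)]
    #[derive(Debug)]
    enum SchedulerStatus {
        Unavailable,
        Active, // holds an InstalledSchedulerBox in the real code
        Stale,  // holds (InstalledSchedulerPoolArc, ResultWithTimings) in the real code
    }

    fn check_scheduler(status: &SchedulerStatus, slot: u64) {
        match status {
            SchedulerStatus::Active | SchedulerStatus::Stale => { /* proceed as usual */ }
            SchedulerStatus::Unavailable => {
                panic!("no installed scheduler for slot {slot}: {status:?}")
            }
        }
    }

    fn main() {
        check_scheduler(&SchedulerStatus::Active, 42);
    }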

}
}

fn do_create_timeout_listener(self: &Arc<Self>) -> TimeoutListener {
let weak_bank = Arc::downgrade(self);
TimeoutListener::new(move |scheduler_pool| {
let Some(bank) = weak_bank.upgrade() else {
return;
};

let Ok(mut scheduler) = bank.scheduler.write() else {
return;
};

scheduler.maybe_transition_from_active_to_stale(|scheduler| {
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(false);
uninstalled_scheduler.return_to_pool();
(scheduler_pool, result_with_timings)
})

Review comment: I want to make sure I understand this correctly. The closure here is called before we transition to Stale, so we wait for termination while the scheduler is still in the active state.

I guess I'm not sure what happens here; it seems this is intended to kill an extremely long-running scheduler. But the description of wait_for_termination says:

    /// This function blocks the current thread while waiting for the scheduler to complete all of
    /// the executions for the scheduled transactions and to return the finalized
    /// `ResultWithTimings`. This function still blocks for short period of time even in the case
    /// of aborted schedulers to gracefully shutdown the scheduler (like thread joining).

So if it's extremely long-running, aren't we still blocked here? I assume I am missing something.

Author reply:

> The closure here is called before we transition to Stale. So we wait for termination while the scheduler is still in the active state.

Yes, this is true.

> it seems this is intended to kill an extremely long-running scheduler.

Well, this isn't true. The intention here is to gracefully terminate an extremely long-idling scheduler, in order to reclaim its idling OS-native threads and return them to the scheduler pool.

> So if it's extremely long-running, aren't we still blocked here? I assume I am missing something.

This is correct. The situation is the same for both the blockstore processor and the unified scheduler, and has been from the very beginning.

Author reply (@ryoqun, Jun 12, 2024): Ah, also note that even if the scheduler is constantly running, with new transactions still arriving when the TimeoutListener triggers, this wait_for_termination() will eventually return. That's because we're taking the write lock, which stops newer transactions from entering the scheduler. So the amount of work remaining before wait_for_termination() completes at this point is bounded, assuming there's no infinite-loop bug in the SVM.

Author reply: Also added some explanation from this conversation: 06b7bff
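
To make the timeout flow above concrete, here is a minimal, self-contained sketch of the idea (this is not the scheduler pool implementation in this PR; the type names, the housekeeping call, and the 12-second value are assumptions): listeners are registered together with a timestamp, and a periodic pass triggers the ones that have waited too long. In the real code, the triggered callback takes the bank's scheduler write lock, waits for termination, and parks the scheduler as Stale so its threads return to the pool.

    // Standalone sketch of "register a callback, fire it after a timeout".
    use std::time::{Duration, Instant};

    struct Listener {
        callback: Box<dyn FnOnce() + Send>,
        registered_at: Instant,
    }

    struct Pool {
        listeners: Vec<Listener>,
        timeout: Duration,
    }

    impl Pool {
        fn register(&mut self, callback: impl FnOnce() + Send + 'static) {
            self.listeners.push(Listener {
                callback: Box::new(callback),
                registered_at: Instant::now(),
            });
        }

        // Fire every listener older than `timeout`, keep the rest registered.
        fn trigger_expired(&mut self) {
            let now = Instant::now();
            let timeout = self.timeout;
            let (expired, kept): (Vec<Listener>, Vec<Listener>) = self
                .listeners
                .drain(..)
                .partition(|l| now.duration_since(l.registered_at) >= timeout);
            self.listeners = kept;
            for listener in expired {
                (listener.callback)();
            }
        }
    }

    fn main() {
        // The 12-second timeout is only a placeholder for this sketch.
        let mut pool = Pool { listeners: Vec::new(), timeout: Duration::from_secs(12) };
        pool.register(|| println!("stale scheduler returned to the pool"));
        pool.trigger_expired(); // would be called periodically from a housekeeping thread
    }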

})
}

fn retrieve_error_after_schedule_failure(&self) -> TransactionError {
let mut scheduler = self.scheduler.write().unwrap();
match &mut *scheduler {
SchedulerStatus::Active(scheduler) => scheduler.recover_error_after_abort(),
SchedulerStatus::Stale(_pool, (result, _timings)) if result.is_err() => {
result.clone().unwrap_err()
}
_ => panic!(),
}
}

#[must_use]
fn wait_for_completed_scheduler_from_drop(&self) -> Option<ResultWithTimings> {
Self::wait_for_scheduler_termination(
@@ -419,18 +575,24 @@ impl BankWithSchedulerInner {
);

let mut scheduler = scheduler.write().unwrap();
let (was_noop, result_with_timings) =
if let Some(scheduler) = scheduler.as_mut().filter(|_| reason.is_paused()) {
let (was_noop, result_with_timings) = match &mut *scheduler {
SchedulerStatus::Active(scheduler) if reason.is_paused() => {
scheduler.pause_for_recent_blockhash();
(false, None)
} else if let Some(scheduler) = scheduler.take() {
}
SchedulerStatus::Active(_scheduler) => {
let scheduler = scheduler.transition_from_active_to_unavailable();
let (result_with_timings, uninstalled_scheduler) =
scheduler.wait_for_termination(reason.is_dropped());
uninstalled_scheduler.return_to_pool();
(false, Some(result_with_timings))
} else {
(true, None)
};
}
SchedulerStatus::Stale(_pool, _result_with_timings) => {
let result_with_timings = scheduler.transition_from_stale_to_unavailable();
(true, Some(result_with_timings))
}
SchedulerStatus::Unavailable => (true, None),
};
debug!(
"wait_for_scheduler_termination(slot: {}, reason: {:?}): noop: {:?}, result: {:?} at {:?}...",
bank.slot(),