Improve SchedulerStatus code and test as follow-up (#1797)
* Improve SchedulerStatus code and test as follow-up

* Don't use wait_timeout_while with magic number
ryoqun authored Jun 21, 2024
1 parent 077a651 commit 114d94a
Showing 3 changed files with 93 additions and 49 deletions.
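The second commit-message bullet refers to std::sync::Condvar: wait_timeout_while needs an arbitrary Duration (the magic number), whereas wait_while blocks on the predicate alone. A minimal sketch of the two shapes, assuming a made-up bool flag behind a Mutex (none of these names are from this repository):

use std::{
    sync::{Condvar, Mutex},
    time::Duration,
};

// The shape being moved away from: a tuned timeout constant that needs
// justification, plus a timed_out() result the caller must then handle.
fn wait_with_magic_number(flag: &Mutex<bool>, condvar: &Condvar) {
    let guard = flag.lock().unwrap();
    let (_guard, timeout_result) = condvar
        .wait_timeout_while(guard, Duration::from_millis(100), |done| !*done)
        .unwrap();
    assert!(!timeout_result.timed_out(), "gave up after the magic 100ms");
}

// The shape being moved toward: sleep until another thread flips the flag
// and calls notify_all(); no arbitrary duration is involved.
fn wait_without_magic_number(flag: &Mutex<bool>, condvar: &Condvar) {
    let guard = flag.lock().unwrap();
    let _guard = condvar.wait_while(guard, |done| !*done).unwrap();
}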
7 changes: 5 additions & 2 deletions runtime/src/installed_scheduler_pool.rs
@@ -292,6 +292,8 @@ impl WaitReason {
 pub enum SchedulerStatus {
     /// Unified scheduler is disabled or installed scheduler is consumed by wait_for_termination().
     /// Note that transition to Unavailable from {Active, Stale} is one-way (i.e. one-time).
+    /// Also, this variant is transiently used as a placeholder internally when transitioning
+    /// scheduler statuses, which isn't observable unless a panic is happening.
     Unavailable,
     /// Scheduler is installed into a bank; could be running or just be idling.
     /// This will be transitioned to Stale after certain time has passed if its bank hasn't been
@@ -329,7 +331,7 @@ impl SchedulerStatus {
             return;
         }
         let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else {
-            unreachable!("not active: {:?}", self);
+            unreachable!("not active: {self:?}");
         };
         let (pool, result_with_timings) = f(scheduler);
         *self = Self::Stale(pool, result_with_timings);
@@ -549,7 +551,8 @@ impl BankWithSchedulerInner {
         let scheduler = self.scheduler.read().unwrap();
         // Re-register a new timeout listener only after acquiring the read lock;
         // Otherwise, the listener would again put scheduler into Stale before the read
-        // lock under an extremely-rare race condition, causing panic below.
+        // lock under an extremely-rare race condition, causing panic below in
+        // active_scheduler().
         pool.register_timeout_listener(self.do_create_timeout_listener());
         f(scheduler.active_scheduler())
     }
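The new doc comment on Unavailable describes the std::mem::replace placeholder idiom visible in the second hunk: to move the scheduler out from behind &mut self, a cheap variant is swapped in first, so the value is transiently Unavailable until the final assignment. A self-contained sketch of the idiom with a made-up Status enum (not the real SchedulerStatus):

use std::mem;

#[derive(Debug)]
enum Status {
    // Placeholder; observable only if a panic unwinds mid-transition.
    Unavailable,
    Active(String), // stand-in for the boxed scheduler
    Stale(String),
}

impl Status {
    fn transition_to_stale(&mut self) {
        // Swap the placeholder in so the payload can be moved out of &mut self.
        let Status::Active(payload) = mem::replace(self, Status::Unavailable) else {
            unreachable!("not active: {self:?}");
        };
        // Right here, *self is Unavailable; nothing can observe that unless
        // the code between replace() and the assignment below panics.
        *self = Status::Stale(payload);
    }
}

fn main() {
    let mut status = Status::Active("scheduler".to_string());
    status.transition_to_stale();
    println!("{status:?}"); // Stale("scheduler")
}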
11 changes: 11 additions & 0 deletions unified-scheduler-pool/src/lib.rs
@@ -340,6 +340,8 @@ where
         context: SchedulingContext,
         result_with_timings: ResultWithTimings,
     ) -> S {
+        assert_matches!(result_with_timings, (Ok(_), _));
+
         // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been
         // returned recently
         if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop()
@@ -1711,6 +1713,10 @@ mod tests {
             &CheckPoint::TimeoutListenerTriggered(0),
             &CheckPoint::TimeoutListenerTriggered(1),
             &TestCheckPoint::AfterTimeoutListenerTriggered,
+            &TestCheckPoint::BeforeTimeoutListenerTriggered,
+            &CheckPoint::TimeoutListenerTriggered(0),
+            &CheckPoint::TimeoutListenerTriggered(1),
+            &TestCheckPoint::AfterTimeoutListenerTriggered,
         ]);
 
         let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
@@ -1778,6 +1784,11 @@
         bank.schedule_transaction_executions([(tx_after_stale, &1)].into_iter())
             .unwrap();
 
+        // Observe second occurrence of TimeoutListenerTriggered(1), which indicates a new timeout
+        // listener is registered correctly again for the reactivated scheduler.
+        sleepless_testing::at(TestCheckPoint::BeforeTimeoutListenerTriggered);
+        sleepless_testing::at(TestCheckPoint::AfterTimeoutListenerTriggered);
+
         let (result, timings) = bank.wait_for_completed_scheduler().unwrap();
         assert_matches!(result, Ok(()));
         // ResultWithTimings should be carried over across active=>stale=>active transitions.
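The assert_matches! added in the first hunk above guards a tuple pattern. ResultWithTimings' definition isn't shown in this diff, so the sketch below uses stand-in types; the invariant itself (a scheduler is never taken from the pool alongside an error result) is inferred from the assert:

use assert_matches::assert_matches; // crate already used by these tests

// Stand-ins only: the real ResultWithTimings pairs a scheduler result with
// execution timings of some kind.
type ResultWithTimings = (Result<(), String>, u64);

fn do_take_scheduler(result_with_timings: ResultWithTimings) {
    // Errors are presumably consumed elsewhere (e.g. wait_for_termination())
    // before a scheduler is (re)taken, so an Err here would be a logic bug.
    assert_matches!(result_with_timings, (Ok(_), _));
    // ... pop a pooled scheduler (FILO) or spawn a fresh one ...
}

fn main() {
    do_take_scheduler((Ok(()), 42)); // fine
    // do_take_scheduler((Err("boom".into()), 0)); // would panic
}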
124 changes: 77 additions & 47 deletions unified-scheduler-pool/src/sleepless_testing.rs
@@ -26,28 +26,29 @@ pub(crate) trait BuilderTracked: Sized {
 }
 
 #[cfg(not(test))]
-pub(crate) use sleepless_testing_dummy::*;
+pub(crate) use dummy::*;
 #[cfg(test)]
-pub(crate) use sleepless_testing_real::*;
+pub(crate) use real::*;
 
 #[cfg(test)]
-mod sleepless_testing_real {
+mod real {
     use {
         lazy_static::lazy_static,
+        log::trace,
         std::{
             cmp::Ordering::{Equal, Greater, Less},
-            collections::{HashMap, HashSet},
+            collections::HashMap,
             fmt::Debug,
             sync::{Arc, Condvar, Mutex},
-            thread::{current, JoinHandle, ThreadId},
+            thread::{current, panicking, JoinHandle, ThreadId},
         },
     };
 
     #[derive(Debug)]
     struct Progress {
         _name: String,
         check_points: Vec<String>,
-        current_check_point: Mutex<String>,
+        current_index: Mutex<usize>,
         condvar: Condvar,
     }
 
@@ -61,61 +62,88 @@ mod sleepless_testing_real {
                 .into_iter()
                 .chain(check_points)
                 .collect::<Vec<_>>();
-            let check_points_set = check_points.iter().collect::<HashSet<_>>();
-            assert_eq!(check_points.len(), check_points_set.len());
 
             Self {
                 _name: name,
                 check_points,
-                current_check_point: Mutex::new(initial_check_point),
+                current_index: Mutex::new(0),
                 condvar: Condvar::new(),
             }
         }
 
         fn change_current_check_point(&self, anchored_check_point: String) {
-            let Some(anchored_index) = self
-                .check_points
-                .iter()
-                .position(|check_point| check_point == &anchored_check_point)
+            let mut current_index = self.current_index.lock().unwrap();
+
+            let Some(anchored_index) = self.anchored_index(*current_index, &anchored_check_point)
             else {
                 // Ignore unrecognizable checkpoints...
+                trace!("Ignore {} at {:?}", anchored_check_point, current());
                 return;
             };
 
-            let mut current_check_point = self.current_check_point.lock().unwrap();
-
-            let should_change =
-                match anchored_index.cmp(&self.expected_next_index(&current_check_point)) {
-                    Equal => true,
-                    Greater => {
-                        // anchor is one of future check points; block the current thread until
-                        // that happens
-                        current_check_point = self
-                            .condvar
-                            .wait_while(current_check_point, |current_check_point| {
-                                anchored_index != self.expected_next_index(current_check_point)
-                            })
-                            .unwrap();
-                        true
-                    }
-                    // anchor is already observed.
-                    Less => false,
-                };
+            let next_index = self.expected_next_index(*current_index);
+            let should_change = match anchored_index.cmp(&next_index) {
+                Equal => true,
+                Greater => {
+                    trace!("Blocked on {} at {:?}", anchored_check_point, current());
+                    // anchor is one of future check points; block the current thread until
+                    // that happens
+                    current_index = self
+                        .condvar
+                        .wait_while(current_index, |&mut current_index| {
+                            let Some(anchored_index) =
+                                self.anchored_index(current_index, &anchored_check_point)
+                            else {
+                                // don't wait. seems the progress is made by other threads
+                                // anchored to the same checkpoint.
+                                return false;
+                            };
+                            let next_index = self.expected_next_index(current_index);
+
+                            // determine we should wait further or not
+                            match anchored_index.cmp(&next_index) {
+                                Equal => false,
+                                Greater => {
+                                    trace!(
+                                        "Re-blocked on {} ({} != {}) at {:?}",
+                                        anchored_check_point,
+                                        anchored_index,
+                                        next_index,
+                                        current()
+                                    );
+                                    true
+                                }
+                                Less => unreachable!(),
+                            }
+                        })
+                        .unwrap();
+                    true
+                }
+                Less => unreachable!(),
+            };
 
             if should_change {
-                *current_check_point = anchored_check_point;
+                if *current_index != anchored_index {
+                    trace!("Progressed to: {} at {:?}", anchored_check_point, current());
+                    *current_index = anchored_index;
+                }
+
                 self.condvar.notify_all();
             }
         }
 
-        fn expected_next_index(&self, current_check_point: &String) -> usize {
-            let current_index = self
-                .check_points
-                .iter()
-                .position(|check_point| check_point == current_check_point)
-                .unwrap();
+        fn expected_next_index(&self, current_index: usize) -> usize {
             current_index.checked_add(1).unwrap()
         }
+
+        fn anchored_index(
+            &self,
+            current_index: usize,
+            anchored_check_point: &String,
+        ) -> Option<usize> {
+            self.check_points[current_index..]
+                .iter()
+                .position(|check_point| check_point == anchored_check_point)
+                .map(|subslice_index| subslice_index.checked_add(current_index).unwrap())
+        }
     }
 
     lazy_static! {
@@ -142,11 +170,13 @@
         }
 
         fn deactivate(&self) {
-            assert_eq!(
-                *self.0.check_points.last().unwrap(),
-                *self.0.current_check_point.lock().unwrap(),
-                "unfinished progress"
-            );
+            if !panicking() {
+                assert_eq!(
+                    self.0.check_points.len().checked_sub(1).unwrap(),
+                    *self.0.current_index.lock().unwrap(),
+                    "unfinished progress"
+                );
+            }
             THREAD_REGISTRY.lock().unwrap().remove(&self.1).unwrap();
         }
     }
@@ -299,7 +329,7 @@ mod sleepless_testing_real {
 }
 
 #[cfg(not(test))]
-mod sleepless_testing_dummy {
+mod dummy {
     use std::fmt::Debug;
 
     #[inline]
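The heart of this rewrite is replacing the name-keyed current_check_point with an index into the expected sequence. That is why the HashSet uniqueness assertion could be dropped: anchored_index searches only from current_index onward, so a checkpoint name may now appear several times, which the doubled TimeoutListenerTriggered entries in the updated test depend on. A standalone model of that lookup (checkpoint names are invented):

// Standalone model of the anchored_index() lookup added in this commit:
// only the not-yet-passed suffix of the expected sequence is searched.
fn anchored_index(check_points: &[&str], current_index: usize, anchored: &str) -> Option<usize> {
    check_points[current_index..]
        .iter()
        .position(|check_point| *check_point == anchored)
        .map(|subslice_index| subslice_index.checked_add(current_index).unwrap())
}

fn main() {
    // Mirrors the doubled checkpoint list in the updated test.
    let check_points = [
        "before", "triggered(0)", "triggered(1)", "after",
        "before", "triggered(0)", "triggered(1)", "after",
    ];
    // From the start, "triggered(0)" anchors to its first occurrence...
    assert_eq!(anchored_index(&check_points, 0, "triggered(0)"), Some(1));
    // ...and once progress has passed index 1, the same name anchors to the
    // second occurrence, which the removed uniqueness assert used to forbid.
    assert_eq!(anchored_index(&check_points, 2, "triggered(0)"), Some(5));
    // Unrecognized checkpoints yield None and are ignored by the caller.
    assert_eq!(anchored_index(&check_points, 0, "unknown"), None);
}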
