Make PCT scheduling not linear in max number of tasks (release v0.4.1) (#84)

* Make PCT scheduling not linear in max number of tasks

Currently our PCT implementation does a linear search through the list
of priorities. This list's length is determined by the maximum number of
tasks a test has ever created. For tests that create many short-lived
tasks (like many async tests), this means we'll be doing a very
expensive linear scan just to choose from a small set of runnable tasks.

Instead, maintain priorities as a map, and look up the runnable tasks in
the map. This does a little bit more work in the case where |priorities|
~= |runnable|, because there's no early exit any more, but we expect the
runnable set to generally be fairly small and often significantly
smaller than the total set.
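
To make the cost difference concrete, here is a minimal sketch (illustrative only; it uses plain usize task IDs in place of shuttle's TaskId type) of the two selection strategies:

use std::collections::HashMap;

// Before: priorities are an ordered list (earlier == higher priority), so
// selecting the next task scans every task the test has ever created.
fn next_task_old(priority_queue: &[usize], runnable: &[usize]) -> usize {
    *priority_queue
        .iter()
        .find(|tid| runnable.contains(tid))
        .unwrap()
}

// After: priorities are a map from task ID to priority value (lower value ==
// higher priority), so selecting the next task only touches the runnable set.
fn next_task_new(priorities: &HashMap<usize, usize>, runnable: &[usize]) -> usize {
    *runnable
        .iter()
        .min_by_key(|tid| priorities.get(tid))
        .unwrap()
}

fn main() {
    let priorities = HashMap::from([(0, 2), (1, 0), (2, 1)]);
    let runnable = [0, 2];
    // Task 2 wins: it has the smallest priority value among the runnable tasks.
    assert_eq!(next_task_new(&priorities, &runnable), 2);
}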

* Fix new Clippy lints

* Release v0.4.1
jamesbornholt authored Nov 14, 2022
1 parent 1c87c0e commit 5e0034f
Showing 5 changed files with 57 additions and 28 deletions.
Cargo.toml (1 addition, 1 deletion)
@@ -1,6 +1,6 @@
 [package]
 name = "shuttle"
-version = "0.4.0"
+version = "0.4.1"
 edition = "2021"
 license = "Apache-2.0"
 description = "A library for testing concurrent Rust code"
src/runtime/execution.rs (1 addition, 1 deletion)
@@ -261,7 +261,7 @@ impl ExecutionState {
         if EXECUTION_STATE.is_set() {
             EXECUTION_STATE.with(|cell| {
                 if let Ok(mut state) = cell.try_borrow_mut() {
-                    Some(f(&mut *state))
+                    Some(f(&mut state))
                 } else {
                     None
                 }
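
This one-character change, like the mutex.rs and rwlock.rs changes below, fixes Clippy's explicit_auto_deref lint: when a smart pointer (here a RefMut) is passed where a plain reference is expected, deref coercion inserts the dereferences automatically, so writing them out is redundant. A toy illustration of the same pattern (standard-library types only, not shuttle code):

fn len_of(s: &str) -> usize {
    s.len()
}

fn main() {
    let boxed: Box<String> = Box::new(String::from("shuttle"));
    // Both calls coerce to the same &str; Clippy prefers the second form.
    assert_eq!(len_of(&**boxed), len_of(&boxed));
}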
src/scheduler/pct.rs (49 additions, 20 deletions)
@@ -6,6 +6,7 @@ use rand::rngs::OsRng;
 use rand::seq::{index::sample, SliceRandom};
 use rand::{Rng, RngCore, SeedableRng};
 use rand_pcg::Pcg64Mcg;
+use std::collections::{HashMap, HashSet};
 
 /// A scheduler that implements the Probabilistic Concurrency Testing (PCT) algorithm.
 ///
@@ -20,8 +21,9 @@ pub struct PctScheduler {
     max_iterations: usize,
     max_depth: usize,
    iterations: usize,
-    // invariant: queue is downward closed; contains all elements in range [0, len)
-    priority_queue: Vec<TaskId>,
+    // invariants: every TaskId in [0, len) appears as a key exactly once; all values are distinct
+    priorities: HashMap<TaskId, usize>,
+    next_priority: usize,
     // invariant: length is self.max_depth - 1
     change_points: Vec<usize>,
     max_steps: usize,
@@ -46,7 +48,8 @@ impl PctScheduler {
             max_iterations,
             max_depth,
             iterations: 0,
-            priority_queue: (0..DEFAULT_INLINE_TASKS).map(TaskId::from).collect::<Vec<_>>(),
+            priorities: (0..DEFAULT_INLINE_TASKS).map(|i| (TaskId::from(i), i)).collect(),
+            next_priority: DEFAULT_INLINE_TASKS,
             change_points: vec![],
             max_steps: 0,
             steps: 0,
@@ -69,8 +72,20 @@ impl Scheduler for PctScheduler {
         if self.iterations > 0 {
             assert!(self.max_steps > 0);
 
-            // Initialize priorities by shuffling the task IDs
-            self.priority_queue.shuffle(&mut self.rng);
+            // Priorities are always distinct
+            debug_assert_eq!(
+                self.priorities.iter().collect::<HashSet<_>>().len(),
+                self.priorities.len()
+            );
+
+            // Reinitialize priorities by shuffling the task IDs
+            let mut priorities = (0..self.priorities.len()).collect::<Vec<_>>();
+            priorities.shuffle(&mut self.rng);
+            for (i, priority) in priorities.into_iter().enumerate() {
+                let old = self.priorities.insert(TaskId::from(i), priority);
+                debug_assert!(old.is_some(), "priority queue invariant");
+            }
+            self.next_priority = self.priorities.len();
 
             // Initialize change points by sampling from the current max_steps. We skip step 0
             // because there's no point making a priority change before any tasks have run; the
@@ -90,13 +105,24 @@
     }
 
     fn next_task(&mut self, runnable: &[TaskId], current: Option<TaskId>, is_yielding: bool) -> Option<TaskId> {
-        // If any new tasks were created, assign them priorities at random
-        let known_tasks = self.priority_queue.len();
-        let max_tasks = usize::from(*runnable.iter().max().unwrap());
-
-        for tid in known_tasks..1 + max_tasks {
-            let index = self.rng.gen_range(0, self.priority_queue.len() + 1);
-            self.priority_queue.insert(index, TaskId::from(tid));
+        // If any new tasks were created, assign them priorities by randomly swapping them with an
+        // existing task's priority, so we maintain the invariant that every priority is distinct
+        let max_known_task = self.priorities.len();
+        let max_new_task = usize::from(*runnable.iter().max().unwrap());
+        for new_task_id in max_known_task..1 + max_new_task {
+            let new_task_id = TaskId::from(new_task_id);
+            // Make sure there's a chance to give the new task the lowest priority
+            let target_task_id = TaskId::from(self.rng.gen_range(0, self.priorities.len()) + 1);
+            let new_task_priority = if target_task_id == new_task_id {
+                self.next_priority
+            } else {
+                self.priorities
+                    .insert(target_task_id, self.next_priority)
+                    .expect("priority queue invariant")
+            };
+            let old = self.priorities.insert(new_task_id, new_task_priority);
+            debug_assert!(old.is_none(), "priority queue invariant");
+            self.next_priority += 1;
         }
 
         // No point doing priority changes when there's only one runnable task. This also means that
@@ -108,13 +134,11 @@
         // TODO is this really correct? need to think about it more
         if runnable.len() > 1 {
             if self.change_points.contains(&self.steps) || is_yielding {
-                // Deprioritize `current` by moving it to the end of the list
-                // TODO in the paper, the i'th change point gets priority i, whereas this gives d-i.
-                // TODO I don't think this matters, because the change points are randomized.
+                // Deprioritize the current task by lowering its priority to self.next_priority
                 let current = current.expect("self.steps > 0 should mean a task has run");
-                let idx = self.priority_queue.iter().position(|tid| *tid == current).unwrap();
-                self.priority_queue.remove(idx);
-                self.priority_queue.push(current);
+                let old = self.priorities.insert(current, self.next_priority);
+                debug_assert!(old.is_some(), "priority queue invariant");
+                self.next_priority += 1;
             }
 
             self.steps += 1;
@@ -123,8 +147,13 @@
             }
         }
 
-        // Choose the highest-priority (== earliest in the queue) runnable task
-        Some(*self.priority_queue.iter().find(|tid| runnable.contains(tid)).unwrap())
+        // Choose the highest-priority (== lowest priority value) runnable task
+        Some(
+            *runnable
+                .iter()
+                .min_by_key(|tid| self.priorities.get(tid))
+                .expect("priority queue invariant"),
+        )
     }
 
     fn next_u64(&mut self) -> u64 {
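
As a usage sketch, shuttle's check_pct entry point drives this scheduler; the test body below is illustrative, not taken from this commit:

use shuttle::sync::Mutex;
use shuttle::thread;
use std::sync::Arc;

fn main() {
    // Run up to 1,000 PCT iterations with bug depth 3. Each iteration
    // reshuffles the priority map and resamples change points as above.
    shuttle::check_pct(
        || {
            let counter = Arc::new(Mutex::new(0u64));
            let counter2 = Arc::clone(&counter);
            let handle = thread::spawn(move || {
                *counter2.lock().unwrap() += 1;
            });
            *counter.lock().unwrap() += 1;
            handle.join().unwrap();
            assert_eq!(*counter.lock().unwrap(), 2);
        },
        1000,
        3,
    );
}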
src/sync/mutex.rs (2 additions, 2 deletions)
@@ -288,13 +288,13 @@ impl<T: ?Sized> Deref for MutexGuard<'_, T> {
     type Target = T;
 
     fn deref(&self) -> &Self::Target {
-        &**self.inner.as_ref().unwrap()
+        self.inner.as_ref().unwrap()
     }
 }
 
 impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
     fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut **self.inner.as_mut().unwrap()
+        self.inner.as_mut().unwrap()
     }
 }
 
src/sync/rwlock.rs (4 additions, 4 deletions)
@@ -262,7 +262,7 @@ impl<T: ?Sized> RwLock<T> {
         });
 
         // Block all other waiters, since we won the race to take this lock
-        Self::block_waiters(&*state, me, typ);
+        Self::block_waiters(&state, me, typ);
         drop(state);
 
         // We need to let other threads in here so they may fail a `try_read` or `try_write`. This
@@ -327,7 +327,7 @@ impl<T: ?Sized> RwLock<T> {
         });
 
         // Block all other waiters, since we won the race to take this lock
-        Self::block_waiters(&*state, me, typ);
+        Self::block_waiters(&state, me, typ);
         drop(state);
 
         // We need to let other threads in here so they
@@ -459,7 +459,7 @@ impl<T: ?Sized> Drop for RwLockReadGuard<'_, T> {
 
         // Unblock every thread waiting on this lock. The scheduler will choose one of them to win
         // the race to this lock, and that thread will re-block all the losers.
-        RwLock::<T>::unblock_waiters(&*state, self.me, RwLockType::Read);
+        RwLock::<T>::unblock_waiters(&state, self.me, RwLockType::Read);
 
         drop(state);
 
@@ -529,7 +529,7 @@ impl<T: ?Sized> Drop for RwLockWriteGuard<'_, T> {
 
         // Unblock every thread waiting on this lock. The scheduler will choose one of them to win
         // the race to this lock, and that thread will re-block all the losers.
-        RwLock::<T>::unblock_waiters(&*state, self.me, RwLockType::Write);
+        RwLock::<T>::unblock_waiters(&state, self.me, RwLockType::Write);
         drop(state);
 
         // Releasing a lock is a yield point