Skip to content

Commit

Permalink
rt(threaded): cap LIFO slot polls (tokio-rs#5712)
Browse files Browse the repository at this point in the history
As an optimization to improve locality, the multi-threaded scheduler
maintains a single slot (LIFO slot). When a task is scheduled, it goes
into the LIFO slot. The scheduler will run tasks in the LIFO slot first
before checking the local queue.

Ping-ping style workloads where task A notifies task B, which
notifies task A again, can cause starvation as these two tasks 
repeatedly schedule the other in the LIFO slot. tokio-rs#5686, a first
attempt at solving this problem, consumes a unit of budget each time a
task is scheduled from the LIFO slot. However, at the time of this
commit, the scheduler allocates 128 units of budget for each chunk of
work. This is relatively high in situations where tasks do not perform many
async operations yet have meaningful poll times (even 5-10 microsecond
poll times can have an outsized impact on the scheduler).

In an ideal world, the scheduler would adapt to the workload it is
executing. However, as a stopgap, this commit limits the times
the LIFO slot is prioritized per scheduler tick.
  • Loading branch information
carllerche authored May 23, 2023
1 parent 3a94eb0 commit 9eb3f5b
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 30 deletions.
26 changes: 26 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ jobs:
- wasm32-wasi
- check-external-types
- check-fuzzing
- check-unstable-mt-counters
steps:
- run: exit 0

Expand Down Expand Up @@ -233,6 +234,31 @@ jobs:
# the unstable cfg to RustDoc
RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_taskdump

check-unstable-mt-counters:
name: check tokio full --internal-mt-counters
needs: basics
runs-on: ${{ matrix.os }}
strategy:
matrix:
include:
- os: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
# Run `tokio` with "unstable" and "taskdump" cfg flags.
- name: check tokio full --cfg unstable --cfg internal-mt-counters
run: cargo test --all-features
working-directory: tokio
env:
RUSTFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters -Dwarnings
# in order to run doctests for unstable features, we must also pass
# the unstable cfg to RustDoc
RUSTDOCFLAGS: --cfg tokio_unstable --cfg tokio_internal_mt_counters

miri:
name: miri
needs: basics
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
run: cargo test --lib --release --features full -- --nocapture $SCOPE
working-directory: tokio
env:
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings
RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings -C debug-assertions
LOOM_MAX_PREEMPTIONS: ${{ matrix.max_preemptions }}
LOOM_MAX_BRANCHES: 10000
SCOPE: ${{ matrix.scope }}
11 changes: 0 additions & 11 deletions tokio/src/runtime/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,6 @@ cfg_rt_multi_thread! {
pub(crate) fn set(budget: Budget) {
let _ = context::budget(|cell| cell.set(budget));
}

/// Consume one unit of progress from the current task's budget.
pub(crate) fn consume_one() {
let _ = context::budget(|cell| {
let mut budget = cell.get();
if let Some(ref mut counter) = budget.0 {
*counter = counter.saturating_sub(1);
}
cell.set(budget);
});
}
}

cfg_rt! {
Expand Down
16 changes: 16 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ mod imp {
static NUM_MAINTENANCE: AtomicUsize = AtomicUsize::new(0);
static NUM_NOTIFY_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_UNPARKS_LOCAL: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_SCHEDULES: AtomicUsize = AtomicUsize::new(0);
static NUM_LIFO_CAPPED: AtomicUsize = AtomicUsize::new(0);

impl Drop for super::Counters {
fn drop(&mut self) {
let notifies_local = NUM_NOTIFY_LOCAL.load(Relaxed);
let unparks_local = NUM_UNPARKS_LOCAL.load(Relaxed);
let maintenance = NUM_MAINTENANCE.load(Relaxed);
let lifo_scheds = NUM_LIFO_SCHEDULES.load(Relaxed);
let lifo_capped = NUM_LIFO_CAPPED.load(Relaxed);

println!("---");
println!("notifies (local): {}", notifies_local);
println!(" unparks (local): {}", unparks_local);
println!(" maintenance: {}", maintenance);
println!(" LIFO schedules: {}", lifo_scheds);
println!(" LIFO capped: {}", lifo_capped);
}
}

Expand All @@ -31,13 +37,23 @@ mod imp {
pub(crate) fn inc_num_maintenance() {
NUM_MAINTENANCE.fetch_add(1, Relaxed);
}

pub(crate) fn inc_lifo_schedules() {
NUM_LIFO_SCHEDULES.fetch_add(1, Relaxed);
}

pub(crate) fn inc_lifo_capped() {
NUM_LIFO_CAPPED.fetch_add(1, Relaxed);
}
}

#[cfg(not(tokio_internal_mt_counters))]
mod imp {
pub(crate) fn inc_num_inc_notify_local() {}
pub(crate) fn inc_num_unparks_local() {}
pub(crate) fn inc_num_maintenance() {}
pub(crate) fn inc_lifo_schedules() {}
pub(crate) fn inc_lifo_capped() {}
}

#[derive(Debug)]
Expand Down
90 changes: 72 additions & 18 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,14 @@ struct Core {
/// When a task is scheduled from a worker, it is stored in this slot. The
/// worker will check this slot for a task **before** checking the run
/// queue. This effectively results in the **last** scheduled task to be run
/// next (LIFO). This is an optimization for message passing patterns and
/// helps to reduce latency.
/// next (LIFO). This is an optimization for improving locality which
/// benefits message passing patterns and helps to reduce latency.
lifo_slot: Option<Notified>,

/// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
/// they go to the back of the `run_queue`.
lifo_enabled: bool,

/// The worker-local run queue.
run_queue: queue::Local<Arc<Handle>>,

Expand Down Expand Up @@ -191,6 +195,12 @@ type Notified = task::Notified<Arc<Handle>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

/// Value picked out of thin-air. Running the LIFO slot a handful of times
/// seemms sufficient to benefit from locality. More than 3 times probably is
/// overweighing. The value can be tuned in the future with data that shows
/// improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;

pub(super) fn create(
size: usize,
park: Parker,
Expand All @@ -214,6 +224,7 @@ pub(super) fn create(
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
lifo_enabled: !config.disable_lifo_slot,
run_queue,
is_searching: false,
is_shutdown: false,
Expand Down Expand Up @@ -422,7 +433,13 @@ fn run(worker: Arc<Worker>) {

impl Context {
fn run(&self, mut core: Box<Core>) -> RunResult {
// Reset `lifo_enabled` here in case the core was previously stolen from
// a task that had the LIFO slot disabled.
self.reset_lifo_enabled(&mut core);

while !core.is_shutdown {
self.assert_lifo_enabled_is_correct(&core);

// Increment the tick
core.tick();

Expand Down Expand Up @@ -463,13 +480,16 @@ impl Context {
// another idle worker to try to steal work.
core.transition_from_searching(&self.worker);

self.assert_lifo_enabled_is_correct(&core);

// Make the core available to the runtime context
core.metrics.start_poll();
*self.core.borrow_mut() = Some(core);

// Run the task
coop::budget(|| {
task.run();
let mut lifo_polls = 0;

// As long as there is budget remaining and a task exists in the
// `lifo_slot`, then keep running.
Expand All @@ -478,7 +498,12 @@ impl Context {
// by another worker.
let mut core = match self.core.borrow_mut().take() {
Some(core) => core,
None => return Err(()),
None => {
// In this case, we cannot call `reset_lifo_enabled()`
// because the core was stolen. The stealer will handle
// that at the top of `Context::run`
return Err(());
}
};

// If task poll times is enabled, measure the poll time. Note
Expand All @@ -491,35 +516,62 @@ impl Context {
// Check for a task in the LIFO slot
let task = match core.lifo_slot.take() {
Some(task) => task,
None => return Ok(core),
None => {
self.reset_lifo_enabled(&mut core);
return Ok(core);
}
};

// Polling a task doesn't necessarily consume any budget, if it
// doesn't use any Tokio leaf futures. To prevent such tasks
// from using the lifo slot in an infinite loop, we consume an
// extra unit of budget between each iteration of the loop.
coop::consume_one();

if coop::has_budget_remaining() {
// Run the LIFO task, then loop
core.metrics.start_poll();
*self.core.borrow_mut() = Some(core);
let task = self.worker.handle.shared.owned.assert_owner(task);
task.run();
} else {
if !coop::has_budget_remaining() {
// Not enough budget left to run the LIFO task, push it to
// the back of the queue and return.
core.run_queue.push_back_or_overflow(
task,
self.worker.inject(),
&mut core.metrics,
);
// If we hit this point, the LIFO slot should be enabled.
// There is no need to reset it.
debug_assert!(core.lifo_enabled);
return Ok(core);
}

// Track that we are about to run a task from the LIFO slot.
lifo_polls += 1;
super::counters::inc_lifo_schedules();

// Disable the LIFO slot if we reach our limit
//
// In ping-ping style workloads where task A notifies task B,
// which notifies task A again, continuously prioritizing the
// LIFO slot can cause starvation as these two tasks will
// repeatedly schedule the other. To mitigate this, we limit the
// number of times the LIFO slot is prioritized.
if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
core.lifo_enabled = false;
super::counters::inc_lifo_capped();
}

// Run the LIFO task, then loop
core.metrics.start_poll();
*self.core.borrow_mut() = Some(core);
let task = self.worker.handle.shared.owned.assert_owner(task);
task.run();
}
})
}

fn reset_lifo_enabled(&self, core: &mut Core) {
core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
}

fn assert_lifo_enabled_is_correct(&self, core: &Core) {
debug_assert_eq!(
core.lifo_enabled,
!self.worker.handle.shared.config.disable_lifo_slot
);
}

fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % self.worker.handle.shared.config.event_interval == 0 {
super::counters::inc_num_maintenance();
Expand Down Expand Up @@ -573,6 +625,8 @@ impl Context {
}

fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
self.assert_lifo_enabled_is_correct(&core);

// Take the parker out of core
let mut park = core.park.take().expect("park missing");

Expand Down Expand Up @@ -840,7 +894,7 @@ impl Handle {
// task must always be pushed to the back of the queue, enabling other
// tasks to be executed. If **not** a yield, then there is more
// flexibility and the task may go to the front of the queue.
let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
let should_notify = if is_yield || !core.lifo_enabled {
core.run_queue
.push_back_or_overflow(task, &self.shared.inject, &mut core.metrics);
true
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/runtime/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ cfg_loom! {
mod loom_shutdown_join;
mod loom_join_set;
mod loom_yield;

// Make sure debug assertions are enabled
#[cfg(not(debug_assertions))]
compiler_error!("these tests require debug assertions to be enabled");
}

cfg_not_loom! {
Expand Down

0 comments on commit 9eb3f5b

Please sign in to comment.