
Commit 93bde08
rt: use task::Inject with current_thread scheduler (#5702)
Previously, the current_thread scheduler used its own injection queue
instead of sharing the same one as the multi-threaded scheduler. This
patch updates the current_thread scheduler to use the same injection
queue as the multi-threaded one (`task::Inject`).

`task::Inject` includes an optimization where it does not need to
acquire the mutex if the queue is empty.
carllerche authored May 21, 2023
1 parent ddd7250 commit 93bde08
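
The empty-check optimization mentioned in the commit message is the main win for the current-thread runtime: a remote pop can return early without taking the mutex. Below is a minimal sketch of the idea, with invented names (`SketchInject`); tokio's real `task::Inject` is an intrusive linked list of task headers, not a `VecDeque`.

    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    /// Toy injection queue with a lock-free empty check.
    struct SketchInject<T> {
        /// Length mirror; lets `pop` skip the mutex when the queue is empty.
        len: AtomicUsize,
        queue: Mutex<VecDeque<T>>,
    }

    impl<T> SketchInject<T> {
        fn new() -> Self {
            SketchInject {
                len: AtomicUsize::new(0),
                queue: Mutex::new(VecDeque::new()),
            }
        }

        fn push(&self, value: T) {
            let mut queue = self.queue.lock().unwrap();
            queue.push_back(value);
            // Publish the new length while still holding the lock.
            self.len.store(queue.len(), Ordering::Release);
        }

        fn pop(&self) -> Option<T> {
            // Fast path: an idle scheduler polls this constantly, so avoid
            // the mutex entirely when nothing has been injected.
            if self.len.load(Ordering::Acquire) == 0 {
                return None;
            }
            let mut queue = self.queue.lock().unwrap();
            let value = queue.pop_front();
            self.len.store(queue.len(), Ordering::Release);
            value
        }
    }

    fn main() {
        let inject = SketchInject::new();
        assert_eq!(inject.pop(), None); // empty: no lock acquired
        inject.push(1);
        assert_eq!(inject.pop(), Some(1));
    }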
Showing 8 changed files with 246 additions and 139 deletions.
5 changes: 5 additions & 0 deletions benches/Cargo.toml
@@ -40,6 +40,11 @@ name = "sync_watch"
 path = "sync_watch.rs"
 harness = false

+[[bench]]
+name = "rt_current_thread"
+path = "rt_current_thread.rs"
+harness = false
+
 [[bench]]
 name = "rt_multi_threaded"
 path = "rt_multi_threaded.rs"
83 changes: 83 additions & 0 deletions benches/rt_current_thread.rs
@@ -0,0 +1,83 @@
+//! Benchmark implementation details of the current-thread scheduler. These
+//! benches are intended to be used as a form of regression testing and not as
+//! a general purpose benchmark demonstrating real-world performance.
+
+use tokio::runtime::{self, Runtime};
+
+use bencher::{benchmark_group, benchmark_main, Bencher};
+
+const NUM_SPAWN: usize = 1_000;
+
+fn spawn_many_local(b: &mut Bencher) {
+    let rt = rt();
+    let mut handles = Vec::with_capacity(NUM_SPAWN);
+
+    b.iter(|| {
+        rt.block_on(async {
+            for _ in 0..NUM_SPAWN {
+                handles.push(tokio::spawn(async move {}));
+            }
+
+            for handle in handles.drain(..) {
+                handle.await.unwrap();
+            }
+        });
+    });
+}
+
+fn spawn_many_remote_idle(b: &mut Bencher) {
+    let rt = rt();
+    let rt_handle = rt.handle();
+    let mut handles = Vec::with_capacity(NUM_SPAWN);
+
+    b.iter(|| {
+        for _ in 0..NUM_SPAWN {
+            handles.push(rt_handle.spawn(async {}));
+        }
+
+        rt.block_on(async {
+            for handle in handles.drain(..) {
+                handle.await.unwrap();
+            }
+        });
+    });
+}
+
+fn spawn_many_remote_busy(b: &mut Bencher) {
+    let rt = rt();
+    let rt_handle = rt.handle();
+    let mut handles = Vec::with_capacity(NUM_SPAWN);
+
+    rt.spawn(async {
+        fn iter() {
+            tokio::spawn(async { iter() });
+        }
+
+        iter()
+    });
+
+    b.iter(|| {
+        for _ in 0..NUM_SPAWN {
+            handles.push(rt_handle.spawn(async {}));
+        }
+
+        rt.block_on(async {
+            for handle in handles.drain(..) {
+                handle.await.unwrap();
+            }
+        });
+    });
+}
+
+fn rt() -> Runtime {
+    runtime::Builder::new_current_thread().build().unwrap()
+}
+
+benchmark_group!(
+    scheduler,
+    spawn_many_local,
+    spawn_many_remote_idle,
+    spawn_many_remote_busy
+);
+
+benchmark_main!(scheduler);
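
Because the `[[bench]]` entry above sets `harness = false`, the `benchmark_main!` macro supplies `main`, and the suite should be runnable on its own, e.g. with `cargo bench --bench rt_current_thread` from the benches crate (assuming the workspace layout above). Note the design of `spawn_many_remote_busy`: the self-respawning `iter()` task keeps the scheduler saturated, so the measured remote spawns always contend with running work, in contrast to `spawn_many_remote_idle`.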
66 changes: 24 additions & 42 deletions tokio/src/runtime/scheduler/current_thread.rs
@@ -1,8 +1,8 @@
 use crate::future::poll_fn;
 use crate::loom::sync::atomic::AtomicBool;
-use crate::loom::sync::{Arc, Mutex};
+use crate::loom::sync::Arc;
 use crate::runtime::driver::{self, Driver};
-use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
+use crate::runtime::task::{self, Inject, JoinHandle, OwnedTasks, Schedule, Task};
 use crate::runtime::{blocking, context, scheduler, Config};
 use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
 use crate::sync::notify::Notify;
@@ -66,8 +66,8 @@ struct Core {

 /// Scheduler state shared between threads.
 struct Shared {
-    /// Remote run queue. None if the `Runtime` has been dropped.
-    queue: Mutex<Option<VecDeque<Notified>>>,
+    /// Remote run queue
+    inject: Inject<Arc<Handle>>,

     /// Collection of all active tasks spawned onto this executor.
     owned: OwnedTasks<Arc<Handle>>,
@@ -115,7 +115,7 @@ impl CurrentThread {

         let handle = Arc::new(Handle {
             shared: Shared {
-                queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
+                inject: Inject::new(),
                 owned: OwnedTasks::new(),
                 woken: AtomicBool::new(false),
                 config,
@@ -217,15 +217,12 @@ impl CurrentThread {
             drop(task);
         }

-        // Drain remote queue and set it to None
-        let remote_queue = handle.shared.queue.lock().take();
+        // Close the injection queue
+        handle.shared.inject.close();

-        // Using `Option::take` to replace the shared queue with `None`.
-        // We already shut down every task, so we just need to drop the task.
-        if let Some(remote_queue) = remote_queue {
-            for task in remote_queue {
-                drop(task);
-            }
+        // Drain remote queue
+        while let Some(task) = handle.shared.inject.pop() {
+            drop(task);
         }

         assert!(handle.shared.owned.is_empty());
@@ -259,9 +256,12 @@ impl Core {

     fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
         if self.tick % handle.shared.config.global_queue_interval == 0 {
-            handle.pop().or_else(|| self.next_local_task(handle))
+            handle
+                .next_remote_task()
+                .or_else(|| self.next_local_task(handle))
         } else {
-            self.next_local_task(handle).or_else(|| handle.pop())
+            self.next_local_task(handle)
+                .or_else(|| handle.next_remote_task())
         }
     }

@@ -440,14 +440,11 @@ impl Handle {
         };
         let local = &mut core.tasks;

-        let mut injection = self.shared.queue.lock();
-        let injection = if let Some(injection) = injection.as_mut() {
-            injection
-        } else {
+        if self.shared.inject.is_closed() {
             return;
-        };
+        }

-        traces = trace_current_thread(&self.shared.owned, local, injection)
+        traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
             .into_iter()
             .map(dump::Task::new)
             .collect();
@@ -461,11 +458,8 @@

         dump::Dump::new(traces)
     }

-    fn pop(&self) -> Option<Notified> {
-        match self.shared.queue.lock().as_mut() {
-            Some(queue) => queue.pop_front(),
-            None => None,
-        }
+    fn next_remote_task(&self) -> Option<Notified> {
+        self.shared.inject.pop()
     }

     fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
@@ -488,14 +482,7 @@ cfg_metrics! {
         }

         pub(crate) fn injection_queue_depth(&self) -> usize {
-            // TODO: avoid having to lock. The multi-threaded injection queue
-            // could probably be used here.
-            self.shared
-                .queue
-                .lock()
-                .as_ref()
-                .map(|queue| queue.len())
-                .unwrap_or(0)
+            self.shared.inject.len()
         }

         pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
@@ -549,14 +536,9 @@ impl Schedule for Arc<Handle> {
             // Track that a task was scheduled from **outside** of the runtime.
             self.shared.scheduler_metrics.inc_remote_schedule_count();

-            // If the queue is None, then the runtime has shut down. We
-            // don't need to do anything with the notification in that case.
-            let mut guard = self.shared.queue.lock();
-            if let Some(queue) = guard.as_mut() {
-                queue.push_back(task);
-                drop(guard);
-                self.driver.unpark();
-            }
+            // Schedule the task
+            self.shared.inject.push(task);
+            self.driver.unpark();
         }
     });
 }
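
The tick check in `next_task` above carries the fairness policy the current-thread scheduler now shares with the multi-threaded one: most ticks poll the local queue first, but every `global_queue_interval`-th tick polls the injection queue first so remotely spawned tasks cannot be starved indefinitely. Here is a standalone sketch of that policy (toy types and tick bookkeeping, not tokio's):

    use std::collections::VecDeque;

    struct SketchCore {
        tick: u32,
        global_queue_interval: u32,
        local: VecDeque<u32>,
        remote: VecDeque<u32>,
    }

    impl SketchCore {
        fn next_task(&mut self) -> Option<u32> {
            self.tick = self.tick.wrapping_add(1);
            if self.tick % self.global_queue_interval == 0 {
                // Periodically prefer the remote (injection) queue.
                self.remote.pop_front().or_else(|| self.local.pop_front())
            } else {
                // Common case: local tasks first, remote as fallback.
                self.local.pop_front().or_else(|| self.remote.pop_front())
            }
        }
    }

    fn main() {
        let mut core = SketchCore {
            tick: 0,
            global_queue_interval: 31,
            local: VecDeque::from([1, 2]),
            remote: VecDeque::from([100]),
        };
        // Tick 1 is not a multiple of 31, so the local task runs first.
        assert_eq!(core.next_task(), Some(1));
    }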
10 changes: 3 additions & 7 deletions tokio/src/runtime/task/core.rs
@@ -278,15 +278,11 @@ impl<T: Future, S: Schedule> Core<T, S> {
     }
 }

-cfg_rt_multi_thread! {
-    impl Header {
-        pub(super) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
-            self.queue_next.with_mut(|ptr| *ptr = next);
-        }
-    }
-}
-
-impl Header {
+impl Header {
+    pub(super) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
+        self.queue_next.with_mut(|ptr| *ptr = next);
+    }
+
     // safety: The caller must guarantee exclusive access to this field, and
     // must ensure that the id is either 0 or the id of the OwnedTasks
     // containing this task.
