Skip to content

Commit

Permalink
rt: rm internal Unpark types for Handle types (#5027)
Browse files Browse the repository at this point in the history
This patch removes all internal `Unpark` runtime types. The runtime now
uses the `Handle` types (`runtime::io::Handle` and
`runtime::time::Handle`) to signal threads to unpark.

Without separate `Unpark` types, future patches will be able to remove
more `Arc`s used inside various drivers.
  • Loading branch information
carllerche authored Sep 18, 2022
1 parent cba5c10 commit 0b92f80
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 152 deletions.
5 changes: 0 additions & 5 deletions tokio/src/process/unix/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//! Process driver.

use crate::process::unix::GlobalOrphanQueue;
use crate::runtime::io::Handle;
use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};

use std::time::Duration;
Expand All @@ -28,10 +27,6 @@ impl Driver {
}
}

pub(crate) fn unpark(&self) -> Handle {
self.park.unpark()
}

pub(crate) fn park(&mut self) {
self.park.park();
GlobalOrphanQueue::reap_orphans(&self.signal_handle);
Expand Down
73 changes: 28 additions & 45 deletions tokio/src/runtime/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ pub(crate) struct Cfg {
pub(crate) start_paused: bool,
}

pub(crate) type Unpark = TimerUnpark;

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
Expand All @@ -60,10 +58,6 @@ impl Driver {
))
}

pub(crate) fn unpark(&self) -> TimerUnpark {
self.inner.unpark()
}

pub(crate) fn park(&mut self) {
self.inner.park()
}
Expand All @@ -77,11 +71,21 @@ impl Driver {
}
}

impl Handle {
pub(crate) fn unpark(&self) {
#[cfg(feature = "time")]
if let Some(handle) = &self.time {
handle.unpark();
}

self.io.unpark();
}
}

// ===== io driver =====

cfg_io_driver! {
pub(crate) type IoDriver = crate::runtime::io::Driver;
pub(crate) type IoHandle = IoUnpark;

#[derive(Debug)]
pub(crate) enum IoStack {
Expand All @@ -90,7 +94,7 @@ cfg_io_driver! {
}

#[derive(Debug, Clone)]
pub(crate) enum IoUnpark {
pub(crate) enum IoHandle {
Enabled(crate::runtime::io::Handle),
Disabled(UnparkThread),
}
Expand All @@ -106,23 +110,25 @@ cfg_io_driver! {
let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
let process_driver = create_process_driver(signal_driver);

(IoStack::Enabled(process_driver), IoUnpark::Enabled(io_handle), signal_handle)
(IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle)
} else {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
(IoStack::Disabled(park_thread), IoUnpark::Disabled(unpark_thread), Default::default())
(IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default())
};

Ok(ret)
}

impl IoStack {
pub(crate) fn unpark(&self) -> IoUnpark {
/*
pub(crate) fn handle(&self) -> IoHandle {
match self {
IoStack::Enabled(v) => IoUnpark::Enabled(v.unpark()),
IoStack::Disabled(v) => IoUnpark::Disabled(v.unpark()),
IoStack::Enabled(v) => IoHandle::Enabled(v.handle()),
IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()),
}
}
}]
*/

pub(crate) fn park(&mut self) {
match self {
Expand All @@ -146,37 +152,36 @@ cfg_io_driver! {
}
}

impl IoUnpark {
impl IoHandle {
pub(crate) fn unpark(&self) {
match self {
IoUnpark::Enabled(v) => v.unpark(),
IoUnpark::Disabled(v) => v.unpark(),
IoHandle::Enabled(handle) => handle.unpark(),
IoHandle::Disabled(handle) => handle.unpark(),
}
}

#[track_caller]
pub(crate) fn expect(self, msg: &'static str) -> crate::runtime::io::Handle {
match self {
IoUnpark::Enabled(v) => v,
IoUnpark::Disabled(..) => panic!("{}", msg),
IoHandle::Enabled(v) => v,
IoHandle::Disabled(..) => panic!("{}", msg),
}
}

cfg_unstable! {
pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
match self {
IoUnpark::Enabled(v) => Some(v),
IoUnpark::Disabled(..) => None,
IoHandle::Enabled(v) => Some(v),
IoHandle::Disabled(..) => None,
}
}
}
}
}

cfg_not_io_driver! {
pub(crate) type IoHandle = IoUnpark;
pub(crate) type IoHandle = UnparkThread;
pub(crate) type IoStack = ParkThread;
pub(crate) type IoUnpark = UnparkThread;

fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
Expand Down Expand Up @@ -249,11 +254,6 @@ cfg_time! {
Disabled(IoStack),
}

pub(crate) enum TimerUnpark {
Enabled(crate::runtime::time::TimerUnpark),
Disabled(IoUnpark),
}

pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::runtime::time::Handle>;

Expand All @@ -276,13 +276,6 @@ cfg_time! {
}

impl TimeDriver {
pub(crate) fn unpark(&self) -> TimerUnpark {
match self {
TimeDriver::Enabled { driver, .. } => TimerUnpark::Enabled(driver.unpark()),
TimeDriver::Disabled(v) => TimerUnpark::Disabled(v.unpark()),
}
}

pub(crate) fn park(&mut self) {
match self {
TimeDriver::Enabled { driver, handle } => driver.park(handle),
Expand All @@ -304,20 +297,10 @@ cfg_time! {
}
}
}

impl TimerUnpark {
pub(crate) fn unpark(&self) {
match self {
TimerUnpark::Enabled(v) => v.unpark(),
TimerUnpark::Disabled(v) => v.unpark(),
}
}
}
}

cfg_not_time! {
type TimeDriver = IoStack;
type TimerUnpark = IoUnpark;

pub(crate) type Clock = ();
pub(crate) type TimeHandle = ();
Expand Down
5 changes: 0 additions & 5 deletions tokio/src/runtime/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,6 @@ impl Driver {
}
}

// TODO: remove this in a later refactor
pub(crate) fn unpark(&self) -> Handle {
self.handle()
}

pub(crate) fn park(&mut self) {
self.turn(None);
}
Expand Down
12 changes: 3 additions & 9 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::{self, Driver, Unpark};
use crate::runtime::driver::{self, Driver};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{blocking, Config};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
Expand Down Expand Up @@ -82,9 +82,6 @@ struct Shared {
/// Collection of all active tasks spawned onto this executor.
owned: OwnedTasks<Arc<Handle>>,

/// Unpark the blocked thread.
unpark: Unpark,

/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

Expand Down Expand Up @@ -122,13 +119,10 @@ impl CurrentThread {
seed_generator: RngSeedGenerator,
config: Config,
) -> CurrentThread {
let unpark = driver.unpark();

let handle = Arc::new(Handle {
shared: Shared {
queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
owned: OwnedTasks::new(),
unpark,
woken: AtomicBool::new(false),
config,
scheduler_metrics: SchedulerMetrics::new(),
Expand Down Expand Up @@ -467,7 +461,7 @@ impl Schedule for Arc<Handle> {
if let Some(queue) = guard.as_mut() {
queue.push_back(task);
drop(guard);
self.shared.unpark.unpark();
self.driver.unpark();
}
}
});
Expand Down Expand Up @@ -511,7 +505,7 @@ impl Wake for Handle {
/// Wake by reference
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.shared.woken.store(true, Release);
arc_self.shared.unpark.unpark();
arc_self.driver.unpark();
}
}

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Handle {
}

pub(crate) fn shutdown(&self) {
self.shared.close();
self.close();
}

pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
Expand All @@ -46,7 +46,7 @@ impl Handle {
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

if let Some(notified) = notified {
me.shared.schedule(notified, false);
me.schedule_task(notified, false);
}

handle
Expand Down
2 changes: 0 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@ impl MultiThread {
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, Launch) {
let driver_unpark = driver.unpark();
let parker = Parker::new(driver);
let (handle, launch) = worker::create(
size,
parker,
driver_handle,
driver_unpark,
blocking_spawner,
seed_generator,
config,
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/multi_thread/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl Clone for Parker {
}

impl Unparker {
pub(crate) fn unpark(&self, driver: &driver::Unpark) {
pub(crate) fn unpark(&self, driver: &driver::Handle) {
self.inner.unpark(driver);
}
}
Expand Down Expand Up @@ -195,7 +195,7 @@ impl Inner {
}
}

fn unpark(&self, driver: &driver::Unpark) {
fn unpark(&self, driver: &driver::Handle) {
// To ensure the unparked thread will observe any writes we made before
// this call, we must perform a release operation that `park` can
// synchronize with. To do that we must write `NOTIFIED` even if `state`
Expand Down
Loading

0 comments on commit 0b92f80

Please sign in to comment.