From 9e0275977981356fc5618b2061cdac8eb39a9569 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 15 Sep 2022 16:31:55 -0700 Subject: [PATCH] rt: create driver::Handle struct (#5018) Move all individual driver handles into a single `driver::Handle` struct. --- tokio/src/runtime/builder.rs | 14 +-- tokio/src/runtime/context.rs | 22 ++++- tokio/src/runtime/driver.rs | 128 ++++++++++++++------------- tokio/src/runtime/handle.rs | 34 +------ tokio/src/runtime/metrics/runtime.rs | 3 +- tokio/src/runtime/time/entry.rs | 4 +- tokio/src/runtime/time/tests/mod.rs | 24 +++-- 7 files changed, 114 insertions(+), 115 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 45f40033782..5be8e4aeedc 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -835,7 +835,7 @@ impl Builder { use crate::runtime::{Config, CurrentThread, HandleInner, Scheduler}; use std::sync::Arc; - let (driver, resources) = driver::Driver::new(self.get_cfg())?; + let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; // Blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); @@ -861,10 +861,7 @@ impl Builder { let inner = Arc::new(HandleInner { spawner, - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, + driver: driver_handle, blocking_spawner, }); @@ -957,7 +954,7 @@ cfg_rt_multi_thread! { let core_threads = self.worker_threads.unwrap_or_else(num_cpus); - let (driver, resources) = driver::Driver::new(self.get_cfg())?; + let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; // Create the blocking pool let blocking_pool = @@ -981,10 +978,7 @@ cfg_rt_multi_thread! { let inner = Arc::new(HandleInner { spawner, - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, + driver: driver_handle, blocking_spawner, }); diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 23afb2bcbd8..e0f8a384ae6 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -28,7 +28,12 @@ cfg_io_driver! { pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().io_handle.clone() + ctx.as_ref() + .expect(crate::util::error::CONTEXT_MISSING_ERROR) + .as_inner() + .driver + .io + .clone() }) { Ok(io_handle) => io_handle, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), @@ -41,7 +46,12 @@ cfg_signal_internal! { pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().signal_handle.clone() + ctx.as_ref() + .expect(crate::util::error::CONTEXT_MISSING_ERROR) + .as_inner() + .driver + .signal + .clone() }) { Ok(signal_handle) => signal_handle, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), @@ -52,7 +62,13 @@ cfg_signal_internal! { cfg_time! { cfg_test_util! { pub(crate) fn clock() -> Option { - match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.as_inner().clock.clone())) { + match CONTEXT.try_with(|ctx| { + let ctx = ctx.borrow(); + ctx.as_ref() + .map(|ctx| { + ctx.as_inner().driver.clock.clone() + }) + }) { Ok(clock) => clock, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), } diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 7715d70304a..5e739aebe6b 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -9,6 +9,74 @@ use crate::park::thread::{ParkThread, UnparkThread}; use std::io; use std::time::Duration; +#[derive(Debug)] +pub(crate) struct Driver { + inner: TimeDriver, +} + +#[derive(Debug)] +pub(crate) struct Handle { + /// IO driver handle + pub(crate) io: IoHandle, + + /// Signal driver handle + #[cfg_attr(any(not(unix), loom), allow(dead_code))] + pub(crate) signal: SignalHandle, + + /// Time driver handle + pub(crate) time: TimeHandle, + + /// Source of `Instant::now()` + #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))] + pub(crate) clock: Clock, +} + +pub(crate) struct Cfg { + pub(crate) enable_io: bool, + pub(crate) enable_time: bool, + pub(crate) enable_pause_time: bool, + pub(crate) start_paused: bool, +} + +pub(crate) type Unpark = TimerUnpark; + +impl Driver { + pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> { + let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; + + let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); + + let (time_driver, time_handle) = + create_time_driver(cfg.enable_time, io_stack, clock.clone()); + + Ok(( + Self { inner: time_driver }, + Handle { + io: io_handle, + signal: signal_handle, + time: time_handle, + clock, + }, + )) + } + + pub(crate) fn unpark(&self) -> TimerUnpark { + self.inner.unpark() + } + + pub(crate) fn park(&mut self) { + self.inner.park() + } + + pub(crate) fn park_timeout(&mut self, duration: Duration) { + self.inner.park_timeout(duration) + } + + pub(crate) fn shutdown(&mut self) { + self.inner.shutdown() + } +} + // ===== io driver ===== cfg_io_driver! { @@ -266,63 +334,3 @@ cfg_not_time! { (io_stack, ()) } } - -// ===== runtime driver ===== - -#[derive(Debug)] -pub(crate) struct Driver { - inner: TimeDriver, -} - -pub(crate) type Unpark = TimerUnpark; - -pub(crate) struct Resources { - pub(crate) io_handle: IoHandle, - pub(crate) signal_handle: SignalHandle, - pub(crate) time_handle: TimeHandle, - pub(crate) clock: Clock, -} - -pub(crate) struct Cfg { - pub(crate) enable_io: bool, - pub(crate) enable_time: bool, - pub(crate) enable_pause_time: bool, - pub(crate) start_paused: bool, -} - -impl Driver { - pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> { - let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; - - let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); - - let (time_driver, time_handle) = - create_time_driver(cfg.enable_time, io_stack, clock.clone()); - - Ok(( - Self { inner: time_driver }, - Resources { - io_handle, - signal_handle, - time_handle, - clock, - }, - )) - } - - pub(crate) fn unpark(&self) -> TimerUnpark { - self.inner.unpark() - } - - pub(crate) fn park(&mut self) { - self.inner.park() - } - - pub(crate) fn park_timeout(&mut self, duration: Duration) { - self.inner.park_timeout(duration) - } - - pub(crate) fn shutdown(&mut self) { - self.inner.shutdown() - } -} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 0e1712b25cd..e10b07c3d9a 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -25,35 +25,9 @@ pub(crate) struct HandleInner { #[cfg(feature = "rt")] pub(super) spawner: Spawner, - /// Handles to the I/O drivers - #[cfg_attr( - not(any( - feature = "net", - all(unix, feature = "process"), - all(unix, feature = "signal"), - )), - allow(dead_code) - )] - pub(super) io_handle: driver::IoHandle, - - /// Handles to the signal drivers - #[cfg_attr( - any( - loom, - not(all(unix, feature = "signal")), - not(all(unix, feature = "process")), - ), - allow(dead_code) - )] - pub(super) signal_handle: driver::SignalHandle, - - /// Handles to the time drivers - #[cfg_attr(not(feature = "time"), allow(dead_code))] - pub(super) time_handle: driver::TimeHandle, - - /// Source of `Instant::now()` - #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))] - pub(super) clock: driver::Clock, + /// Resource driver handles + #[cfg_attr(not(feature = "full"), allow(dead_code))] + pub(super) driver: driver::Handle, /// Blocking pool spawner #[cfg(feature = "rt")] @@ -414,7 +388,7 @@ cfg_time! { impl Handle { #[track_caller] pub(crate) fn as_time_handle(&self) -> &crate::runtime::time::Handle { - self.inner.time_handle.as_ref() + self.inner.driver.time.as_ref() .expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.") } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 3d3c0cb0d4b..c3ebff369ab 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -535,7 +535,8 @@ cfg_net! { // thus this breaks that guarantee. self.handle .as_inner() - .io_handle + .driver + .io .as_ref() .map(|h| f(h.metrics())) .unwrap_or(0) diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 12ff1202ff2..f4fcd254fc7 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -550,7 +550,7 @@ impl TimerEntry { unsafe { self.driver() - .reregister(&self.driver.inner.io_handle, tick, self.inner().into()); + .reregister(&self.driver.inner.driver.io, tick, self.inner().into()); } } @@ -573,7 +573,7 @@ impl TimerEntry { fn driver(&self) -> &super::Handle { // At this point, we know the time_handle is Some. - self.driver.inner.time_handle.as_ref().unwrap() + self.driver.inner.driver.time.as_ref().unwrap() } } diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index 4ae531b9185..3d62c753a06 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -48,8 +48,10 @@ fn single_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { - let entry = - TimerEntry::new(&handle_, handle_.inner.clock.now() + Duration::from_secs(1)); + let entry = TimerEntry::new( + &handle_, + handle_.inner.driver.clock.now() + Duration::from_secs(1), + ); pin!(entry); block_on(futures::future::poll_fn(|cx| { @@ -79,8 +81,10 @@ fn drop_timer() { let handle_ = handle.clone(); let jh = thread::spawn(move || { - let entry = - TimerEntry::new(&handle_, handle_.inner.clock.now() + Duration::from_secs(1)); + let entry = TimerEntry::new( + &handle_, + handle_.inner.driver.clock.now() + Duration::from_secs(1), + ); pin!(entry); let _ = entry @@ -110,8 +114,10 @@ fn change_waker() { let handle_ = handle.clone(); let jh = thread::spawn(move || { - let entry = - TimerEntry::new(&handle_, handle_.inner.clock.now() + Duration::from_secs(1)); + let entry = TimerEntry::new( + &handle_, + handle_.inner.driver.clock.now() + Duration::from_secs(1), + ); pin!(entry); let _ = entry @@ -145,7 +151,7 @@ fn reset_future() { let handle_ = handle.clone(); let finished_early_ = finished_early.clone(); - let start = handle.inner.clock.now(); + let start = handle.inner.driver.clock.now(); let jh = thread::spawn(move || { let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1)); @@ -211,7 +217,7 @@ fn poll_process_levels() { for i in 0..normal_or_miri(1024, 64) { let mut entry = Box::pin(TimerEntry::new( &handle, - handle.inner.clock.now() + Duration::from_millis(i), + handle.inner.driver.clock.now() + Duration::from_millis(i), )); let _ = entry @@ -245,7 +251,7 @@ fn poll_process_levels_targeted() { let e1 = TimerEntry::new( &handle, - handle.inner.clock.now() + Duration::from_millis(193), + handle.inner.driver.clock.now() + Duration::from_millis(193), ); pin!(e1);