Skip to content

Commit

Permalink
task: surface spawn_blocking errors through the Builder
Browse files Browse the repository at this point in the history
Using `tokio::task::spawn_blocking` continues to exhibit the previous
behavior (panic if there aren't any worker threads available to accept
the task, but return a dummy handle if the runtime is shutting down)
  • Loading branch information
ipetkov committed Jul 11, 2022
1 parent 5e65b7c commit b259dd7
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 15 deletions.
2 changes: 1 addition & 1 deletion tokio/src/runtime/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! compilation.

mod pool;
pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, Spawner, Task};
pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, SpawnError, Spawner, Task};

cfg_fs! {
pub(crate) use pool::spawn_mandatory_blocking;
Expand Down
27 changes: 24 additions & 3 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, ToHandle};

use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io;
use std::time::Duration;

pub(crate) struct BlockingPool {
Expand Down Expand Up @@ -82,6 +83,26 @@ pub(crate) enum Mandatory {
NonMandatory,
}

pub(crate) enum SpawnError {
/// Pool is shutting down and the task was not scheduled
ShuttingDown,
/// There are no worker threads available to take the task
/// and the OS failed to spawn a new one
NoThreads(io::Error),
}

#[allow(clippy::from_over_into)] // Orphan rules
impl Into<io::Error> for SpawnError {
fn into(self) -> io::Error {
match self {
Self::ShuttingDown => {
io::Error::new(io::ErrorKind::Other, "blocking pool shutting down")
}
Self::NoThreads(e) => e,
}
}
}

impl Task {
pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task {
Task { task, mandatory }
Expand Down Expand Up @@ -220,7 +241,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====

impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), ()> {
pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), SpawnError> {
let mut shared = self.inner.shared.lock();

if shared.shutdown {
Expand All @@ -230,7 +251,7 @@ impl Spawner {
task.task.shutdown();

// no need to even push this task; it would never get picked up
return Err(());
return Err(SpawnError::ShuttingDown);
}

shared.queue.push_back(task);
Expand Down Expand Up @@ -261,7 +282,7 @@ impl Spawner {
Err(e) => {
// The OS refused to spawn the thread and there is no thread
// to pick up the task that has just been pushed to the queue.
panic!("OS can't spawn worker thread: {}", e)
return Err(SpawnError::NoThreads(e));
}
}
}
Expand Down
19 changes: 13 additions & 6 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,22 @@ impl HandleInner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, _was_spawned) = if cfg!(debug_assertions)
let (join_handle, spawn_result) = if cfg!(debug_assertions)
&& std::mem::size_of::<F>() > 2048
{
self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None, rt)
} else {
self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None, rt)
};

join_handle
match spawn_result {
Ok(()) => join_handle,
// Compat: do not panic here, return the join_handle even though it will never resolve
Err(blocking::SpawnError::ShuttingDown) => join_handle,
Err(blocking::SpawnError::NoThreads(e)) => {
panic!("OS can't spawn worker thread: {}", e)
}
}
}

cfg_fs! {
Expand All @@ -363,7 +370,7 @@ impl HandleInner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
let (join_handle, spawn_result) = if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
self.spawn_blocking_inner(
Box::new(func),
blocking::Mandatory::Mandatory,
Expand All @@ -379,7 +386,7 @@ impl HandleInner {
)
};

if was_spawned {
if spawn_result.is_ok() {
Some(join_handle)
} else {
None
Expand All @@ -394,7 +401,7 @@ impl HandleInner {
is_mandatory: blocking::Mandatory,
name: Option<&str>,
rt: &dyn ToHandle,
) -> (JoinHandle<R>, bool)
) -> (JoinHandle<R>, Result<(), blocking::SpawnError>)
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
Expand Down Expand Up @@ -424,7 +431,7 @@ impl HandleInner {
let spawned = self
.blocking_spawner
.spawn(blocking::Task::new(task, is_mandatory), rt);
(handle, spawned.is_ok())
(handle, spawned)
}
}

Expand Down
21 changes: 16 additions & 5 deletions tokio/src/task/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ impl<'a> Builder<'a> {
/// [runtime handle]: crate::runtime::Handle
/// [`Handle::spawn`]: crate::runtime::Handle::spawn
#[track_caller]
pub fn spawn_on<Fut>(&mut self, future: Fut, handle: &Handle) -> io::Result<JoinHandle<Fut::Output>>
pub fn spawn_on<Fut>(
&mut self,
future: Fut,
handle: &Handle,
) -> io::Result<JoinHandle<Fut::Output>>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
Expand Down Expand Up @@ -138,7 +142,11 @@ impl<'a> Builder<'a> {
/// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local
/// [`LocalSet`]: crate::task::LocalSet
#[track_caller]
pub fn spawn_local_on<Fut>(self, future: Fut, local_set: &LocalSet) -> io::Result<JoinHandle<Fut::Output>>
pub fn spawn_local_on<Fut>(
self,
future: Fut,
local_set: &LocalSet,
) -> io::Result<JoinHandle<Fut::Output>>
where
Fut: Future + 'static,
Fut::Output: 'static,
Expand All @@ -155,7 +163,10 @@ impl<'a> Builder<'a> {
/// See [`task::spawn_blocking`](crate::task::spawn_blocking)
/// for more details.
#[track_caller]
pub fn spawn_blocking<Function, Output>(self, function: Function) -> io::Result<JoinHandle<Output>>
pub fn spawn_blocking<Function, Output>(
self,
function: Function,
) -> io::Result<JoinHandle<Output>>
where
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
Expand All @@ -180,13 +191,13 @@ impl<'a> Builder<'a> {
Output: Send + 'static,
{
use crate::runtime::Mandatory;
let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner(
let (join_handle, spawn_result) = handle.as_inner().spawn_blocking_inner(
function,
Mandatory::NonMandatory,
self.name,
handle,
);

Ok(join_handle)
spawn_result.map(|()| join_handle).map_err(Into::into)
}
}

0 comments on commit b259dd7

Please sign in to comment.