-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rt: unhandled panic config for current thread rt #4518
Changes from 1 commit
798c971
def9bd1
b2cd498
c49365d
1d8f256
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,13 +57,30 @@ struct Core { | |
|
||
/// Metrics batch | ||
metrics: MetricsBatch, | ||
|
||
/// True if a task panicked without being handled and the runtime is | ||
/// configured to shutdown on unhandled panic. | ||
unhandled_panic: bool, | ||
} | ||
|
||
#[derive(Clone)] | ||
pub(crate) struct Spawner { | ||
shared: Arc<Shared>, | ||
} | ||
|
||
/// Configuration settings passed in from the runtime builder. | ||
pub(crate) struct Config { | ||
/// Callback for a worker parking itself | ||
pub(crate) before_park: Option<Callback>, | ||
|
||
/// Callback for a worker unparking itself | ||
pub(crate) after_unpark: Option<Callback>, | ||
|
||
#[cfg(tokio_unstable)] | ||
/// How to respond to unhandled task panics. | ||
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, | ||
} | ||
|
||
/// Scheduler state shared between threads. | ||
struct Shared { | ||
/// Remote run queue. None if the `Runtime` has been dropped. | ||
|
@@ -78,11 +95,8 @@ struct Shared { | |
/// Indicates whether the blocked on thread was woken. | ||
woken: AtomicBool, | ||
|
||
/// Callback for a worker parking itself | ||
before_park: Option<Callback>, | ||
|
||
/// Callback for a worker unparking itself | ||
after_unpark: Option<Callback>, | ||
/// Scheduler configuration options | ||
config: Config, | ||
|
||
/// Keeps track of various runtime metrics. | ||
scheduler_metrics: SchedulerMetrics, | ||
|
@@ -117,11 +131,7 @@ const REMOTE_FIRST_INTERVAL: u8 = 31; | |
scoped_thread_local!(static CURRENT: Context); | ||
|
||
impl BasicScheduler { | ||
pub(crate) fn new( | ||
driver: Driver, | ||
before_park: Option<Callback>, | ||
after_unpark: Option<Callback>, | ||
) -> BasicScheduler { | ||
pub(crate) fn new(driver: Driver, config: Config) -> BasicScheduler { | ||
let unpark = driver.unpark(); | ||
|
||
let spawner = Spawner { | ||
|
@@ -130,8 +140,7 @@ impl BasicScheduler { | |
owned: OwnedTasks::new(), | ||
unpark, | ||
woken: AtomicBool::new(false), | ||
before_park, | ||
after_unpark, | ||
config, | ||
scheduler_metrics: SchedulerMetrics::new(), | ||
worker_metrics: WorkerMetrics::new(), | ||
}), | ||
|
@@ -143,6 +152,7 @@ impl BasicScheduler { | |
tick: 0, | ||
driver: Some(driver), | ||
metrics: MetricsBatch::new(), | ||
unhandled_panic: false, | ||
}))); | ||
|
||
BasicScheduler { | ||
|
@@ -157,6 +167,7 @@ impl BasicScheduler { | |
&self.spawner | ||
} | ||
|
||
#[track_caller] | ||
pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output { | ||
pin!(future); | ||
|
||
|
@@ -296,7 +307,7 @@ impl Context { | |
fn park(&self, mut core: Box<Core>) -> Box<Core> { | ||
let mut driver = core.driver.take().expect("driver missing"); | ||
|
||
if let Some(f) = &self.spawner.shared.before_park { | ||
if let Some(f) = &self.spawner.shared.config.before_park { | ||
// Incorrect lint, the closures are actually different types so `f` | ||
// cannot be passed as an argument to `enter`. | ||
#[allow(clippy::redundant_closure)] | ||
|
@@ -319,7 +330,7 @@ impl Context { | |
core.metrics.returned_from_park(); | ||
} | ||
|
||
if let Some(f) = &self.spawner.shared.after_unpark { | ||
if let Some(f) = &self.spawner.shared.config.after_unpark { | ||
// Incorrect lint, the closures are actually different types so `f` | ||
// cannot be passed as an argument to `enter`. | ||
#[allow(clippy::redundant_closure)] | ||
|
@@ -460,6 +471,35 @@ impl Schedule for Arc<Shared> { | |
} | ||
}); | ||
} | ||
|
||
cfg_unstable! { | ||
fn unhandled_panic(&self) { | ||
use crate::runtime::UnhandledPanic; | ||
|
||
match self.config.unhandled_panic { | ||
UnhandledPanic::Ignore => { | ||
// Do nothing | ||
} | ||
UnhandledPanic::ShutdownRuntime => { | ||
// This hook is only called from within the runtime, so | ||
// `CURRENT` should match with `&self`, i.e. there is no | ||
// opportunity for a nested scheduler to be called. | ||
CURRENT.with(|maybe_cx| match maybe_cx { | ||
Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => { | ||
let mut core = cx.core.borrow_mut(); | ||
|
||
// If `None`, the runtime is shutting down, so there is no need to signal shutdown | ||
if let Some(core) = core.as_mut() { | ||
core.unhandled_panic = true; | ||
self.owned.close_and_shutdown_all(); | ||
} | ||
} | ||
_ => panic!("runtime core not set in CURRENT thread-local"), | ||
}) | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl Wake for Shared { | ||
|
@@ -484,8 +524,9 @@ struct CoreGuard<'a> { | |
} | ||
|
||
impl CoreGuard<'_> { | ||
#[track_caller] | ||
fn block_on<F: Future>(self, future: F) -> F::Output { | ||
self.enter(|mut core, context| { | ||
let ret = self.enter(|mut core, context| { | ||
let _enter = crate::runtime::enter(false); | ||
let waker = context.spawner.waker_ref(); | ||
let mut cx = std::task::Context::from_waker(&waker); | ||
|
@@ -501,11 +542,16 @@ impl CoreGuard<'_> { | |
core = c; | ||
|
||
if let Ready(v) = res { | ||
return (core, v); | ||
return (core, Some(v)); | ||
} | ||
} | ||
|
||
for _ in 0..MAX_TASKS_PER_TICK { | ||
// Make sure we didn't hit an unhandled_panic | ||
if core.unhandled_panic { | ||
return (core, None); | ||
} | ||
|
||
// Get and increment the current tick | ||
let tick = core.tick; | ||
core.tick = core.tick.wrapping_add(1); | ||
|
@@ -539,7 +585,15 @@ impl CoreGuard<'_> { | |
// pending I/O events. | ||
core = context.park_yield(core); | ||
} | ||
}) | ||
}); | ||
|
||
match ret { | ||
Some(ret) => ret, | ||
None => { | ||
// `block_on` panicked. | ||
panic!("a spawned task panicked and the runtime is configured to shutdown on unhandled panic"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's kind of a bummer that we just create a new panic here, rather than continuing the original panic... There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree, but I don't think it's a big deal. The panic is still printed to stderr by the panic handler. |
||
} | ||
} | ||
} | ||
|
||
/// Enters the scheduler context. This sets the queue and other necessary | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,7 +100,7 @@ where | |
let header_ptr = self.header_ptr(); | ||
let waker_ref = waker_ref::<T, S>(&header_ptr); | ||
let cx = Context::from_waker(&*waker_ref); | ||
let res = poll_future(&self.core().stage, cx); | ||
let res = poll_future(&self.core().stage, &self.core().scheduler, cx); | ||
|
||
if res == Poll::Ready(()) { | ||
// The future completed. Move on to complete the task. | ||
|
@@ -450,7 +450,11 @@ fn cancel_task<T: Future>(stage: &CoreStage<T>) { | |
|
||
/// Polls the future. If the future completes, the output is written to the | ||
/// stage field. | ||
fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> { | ||
fn poll_future<T: Future, S: Schedule>( | ||
core: &CoreStage<T>, | ||
scheduler: &S, | ||
cx: Context<'_>, | ||
) -> Poll<()> { | ||
// Poll the future. | ||
let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { | ||
struct Guard<'a, T: Future> { | ||
|
@@ -473,13 +477,20 @@ fn poll_future<T: Future>(core: &CoreStage<T>, cx: Context<'_>) -> Poll<()> { | |
let output = match output { | ||
Ok(Poll::Pending) => return Poll::Pending, | ||
Ok(Poll::Ready(output)) => Ok(output), | ||
Err(panic) => Err(JoinError::panic(panic)), | ||
Err(panic) => { | ||
scheduler.unhandled_panic(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's only unhandled if the `JoinHandle` is dropped. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. "Unhandled" refers to unhandled by the spawned task. As per #4516, the two separate behaviors are to always shut down if the task panics, or to shut down only if the `JoinHandle` is dropped. I implemented the first one here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. FWIW, I think this option will be most useful for tests where the application is expected to gracefully handle all possible user inputs. |
||
Err(JoinError::panic(panic)) | ||
} | ||
}; | ||
|
||
// Catch and ignore panics if the future panics on drop. | ||
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { | ||
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { | ||
core.store_output(output); | ||
})); | ||
|
||
if res.is_err() { | ||
scheduler.unhandled_panic(); | ||
} | ||
|
||
Poll::Ready(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAICT, this will only happen in the event of a Tokio bug, so this should probably be