From 1ea1bb333f637a9455dbe0489009fd7858cba935 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Thu, 28 Dec 2023 11:13:23 +0100 Subject: [PATCH 01/23] First "working" POC. --- tokio-util/Cargo.toml | 3 + tokio-util/src/lib.rs | 2 + tokio-util/src/lrtd/lrtd.rs | 175 ++++++++++++++++++++++++++++++++++++ tokio-util/src/lrtd/mod.rs | 5 ++ tokio-util/tests/lrtd.rs | 41 +++++++++ 5 files changed, 226 insertions(+) create mode 100644 tokio-util/src/lrtd/lrtd.rs create mode 100644 tokio-util/src/lrtd/mod.rs create mode 100644 tokio-util/tests/lrtd.rs diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 12c3c813656..17ae2a0ee94 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -43,6 +43,9 @@ futures-util = { version = "0.3.0", optional = true } pin-project-lite = "0.2.11" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } +nix = "0.21" +rand = "0.8" +libc = "0.2" [target.'cfg(tokio_unstable)'.dependencies] hashbrown = { version = "0.14.0", optional = true } diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 22ad92b8c4b..435a925526a 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -51,6 +51,8 @@ cfg_time! { pub mod time; } +pub mod lrtd; + pub mod sync; pub mod either; diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs new file mode 100644 index 00000000000..c4fa5d37174 --- /dev/null +++ b/tokio-util/src/lrtd/lrtd.rs @@ -0,0 +1,175 @@ + +use nix::sys::signal::{self, SigAction, SaFlags, SigSet, SigHandler, Signal}; +use tokio::runtime::{Runtime, Builder}; +use std::backtrace::Backtrace; +use std::collections::HashSet; +use std::thread; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use std::sync::mpsc; +use rand::Rng; +use libc; + + +fn get_thread_id() -> libc::pthread_t { + unsafe { libc::pthread_self() } +} + + +#[derive(Debug)] +struct WorkerSet { + inner: Mutex>, +} + +impl WorkerSet { + fn new() -> Self { + WorkerSet { + inner: Mutex::new(HashSet::new()), + } + } + + fn add(&self, pid: libc::pthread_t) { + let mut set = self.inner.lock().unwrap(); + set.insert(pid); + } + + fn remove(&self, pid: libc::pthread_t) { + let mut set = self.inner.lock().unwrap(); + set.remove(&pid); + } + + fn get_all(&self) -> Vec { + let set = self.inner.lock().unwrap(); + set.iter().cloned().collect() + } +} + + + +extern "C" fn signal_handler(_: i32) { + let thread = thread::current(); + let name = thread + .name() + .map(|n| format!(" for thread \"{}\"", n)) + .unwrap_or("".to_owned()); + let trace = Backtrace::force_capture(); + eprintln!("Stack trace{}:{}\n{}", name, get_thread_id(), trace); +} + + +pub fn install_thread_stack_stace_handler(signal: Signal) { + + let mut sigset = SigSet::empty(); + sigset.add(signal); + + // Set up a signal action + let sa = SigAction::new( + SigHandler::Handler(signal_handler), + SaFlags::empty(), + sigset, + ); + + // Register the signal action for process + unsafe { + signal::sigaction(signal, &sa).expect("Failed to register signal handler"); + } + +} + +pub fn install_thread_stack_stace_handler_default() { + let default_signal = Signal::SIGUSR1; + install_thread_stack_stace_handler(default_signal); +} + + +fn signal_all_threads(signal: Signal, targets: Vec) { + for thread_id in &targets { + let result = unsafe { libc::pthread_kill(*thread_id, match signal.into() { + Some(s) => s as libc::c_int, + None => 0, + }) }; + if result != 0 { + eprintln!("Error sending signal: {:?}", result); + } + } +} + + +#[derive(Debug)] +pub struct LongRunningTaskDetector { + interval: Duration, + detection_time: Duration, + stop_flag: Arc>, + workers: Arc, + signal: Signal +} + +async fn do_nothing(tx: mpsc::Sender<()>) { + // signal I am done + let _ = tx.send(()).unwrap(); +} + + +fn probe(tokio_runtime: &Arc, detection_time:Duration, signal: Signal, workers: &Arc) { + let (tx, rx) = mpsc::channel(); + let handle = tokio_runtime.handle(); + let _nothing_handle = handle.spawn(do_nothing(tx)); + let is_probe_success = match rx.recv_timeout(detection_time) { + Ok(_result) => true, + Err(_) => false, + }; + if !is_probe_success { + let targets = workers.get_all(); + eprintln!("Detected worker blocking, signaling worker threads: {:?}", targets); + signal_all_threads(signal, targets); + // Wait for our probe to eventually finish, we do not want to have multiple probes running at the same time. + let _ = rx.recv().unwrap(); + } +} + +impl LongRunningTaskDetector { + + pub fn new(interval: Duration, detection_time: Duration, signal: Signal, runtime_builder: &mut Builder) -> Self { + let workers = Arc::new(WorkerSet::new()); + let workers_clone = Arc::clone(&workers); + let workers_clone2 = Arc::clone(&workers); + runtime_builder.on_thread_start( + move || { + let pid = get_thread_id(); + workers_clone.add(pid); + } + ) + .on_thread_stop(move ||{ + let pid = get_thread_id(); + workers_clone2.remove(pid); + }); + LongRunningTaskDetector { + interval, + detection_time, + stop_flag: Arc::new(Mutex::new(false)), + workers, + signal + } + } + + pub fn start(&self, runtime: Arc) + { + let stop_flag = Arc::clone(&self.stop_flag); + let detection_time = self.detection_time.clone(); + let interval = self.interval.clone(); + let signal = self.signal.clone(); + let tokio_runtime = runtime.clone(); + let workers = Arc::clone(&self.workers); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + while !*stop_flag.lock().unwrap() { + probe(&tokio_runtime, detection_time, signal, &workers); + thread::sleep(Duration::from_micros(rng.gen_range(1..=interval.as_micros()).try_into().unwrap())); + } + }); + } + + pub fn stop(&self) { + *self.stop_flag.lock().unwrap() = true; + } +} diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs new file mode 100644 index 00000000000..be4b0832b10 --- /dev/null +++ b/tokio-util/src/lrtd/mod.rs @@ -0,0 +1,5 @@ +mod lrtd; + +pub use self::lrtd::install_thread_stack_stace_handler; +pub use self::lrtd::install_thread_stack_stace_handler_default; +pub use self::lrtd::LongRunningTaskDetector; \ No newline at end of file diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs new file mode 100644 index 00000000000..35a9accba87 --- /dev/null +++ b/tokio-util/tests/lrtd.rs @@ -0,0 +1,41 @@ +use nix::sys::signal::Signal; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use tokio_util::lrtd::{ + install_thread_stack_stace_handler, LongRunningTaskDetector, +}; + +async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(2)); + println!("slow done"); +} + +fn install_thread_stack_stace_handler_default() { + install_thread_stack_stace_handler(Signal::SIGUSR1); +} + +#[test] +fn test_blocking_detection() { + install_thread_stack_stace_handler_default(); + let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mutable_builder = builder.worker_threads(2); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + Signal::SIGUSR1, + mutable_builder, + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + lrtd.start(arc_runtime.clone()); + let runtime_handle = arc_runtime.handle(); + runtime_handle.spawn(run_blocking_stuff()); + runtime_handle.spawn(run_blocking_stuff()); + runtime_handle.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + println!("Hello world"); + }); + lrtd.stop() +} From 03f3bcc57f489a348b3d20f47754925a367a5000 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Thu, 28 Dec 2023 11:13:23 +0100 Subject: [PATCH 02/23] First "working" POC to implement https://github.com/tokio-rs/console/issues/150 --- tokio-util/Cargo.toml | 3 + tokio-util/src/lib.rs | 2 + tokio-util/src/lrtd/lrtd.rs | 194 +++++++++++++++++++++++++++++++++++ tokio-util/src/lrtd/mod.rs | 5 + tokio-util/tests/lrtd.rs | 63 ++++++++++++ tokio/src/runtime/builder.rs | 11 ++ 6 files changed, 278 insertions(+) create mode 100644 tokio-util/src/lrtd/lrtd.rs create mode 100644 tokio-util/src/lrtd/mod.rs create mode 100644 tokio-util/tests/lrtd.rs diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 12c3c813656..17ae2a0ee94 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -43,6 +43,9 @@ futures-util = { version = "0.3.0", optional = true } pin-project-lite = "0.2.11" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } +nix = "0.21" +rand = "0.8" +libc = "0.2" [target.'cfg(tokio_unstable)'.dependencies] hashbrown = { version = "0.14.0", optional = true } diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 22ad92b8c4b..435a925526a 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -51,6 +51,8 @@ cfg_time! { pub mod time; } +pub mod lrtd; + pub mod sync; pub mod either; diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs new file mode 100644 index 00000000000..9159cbd385a --- /dev/null +++ b/tokio-util/src/lrtd/lrtd.rs @@ -0,0 +1,194 @@ +use libc; +use nix::sys::signal::{self, SaFlags, SigAction, SigHandler, SigSet, Signal}; +use rand::Rng; +use std::backtrace::Backtrace; +use std::collections::HashSet; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use std::{env, thread}; +use tokio::runtime::{Builder, Runtime}; + +const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60); + +fn get_panic_worker_block_duration() -> Duration { + let duration_str = env::var("MY_DURATION_ENV").unwrap_or_else(|_| "60".to_string()); + duration_str + .parse::() + .map(Duration::from_secs) + .unwrap_or(PANIC_WORKER_BLOCK_DURATION_DEFAULT) +} + +fn get_thread_id() -> libc::pthread_t { + unsafe { libc::pthread_self() } +} + +#[derive(Debug)] +struct WorkerSet { + inner: Mutex>, +} + +impl WorkerSet { + fn new() -> Self { + WorkerSet { + inner: Mutex::new(HashSet::new()), + } + } + + fn add(&self, pid: libc::pthread_t) { + let mut set = self.inner.lock().unwrap(); + set.insert(pid); + } + + fn remove(&self, pid: libc::pthread_t) { + let mut set = self.inner.lock().unwrap(); + set.remove(&pid); + } + + fn get_all(&self) -> Vec { + let set = self.inner.lock().unwrap(); + set.iter().cloned().collect() + } +} + +extern "C" fn signal_handler(_: i32) { + let thread = thread::current(); + let name = thread + .name() + .map(|n| format!(" for thread \"{}\"", n)) + .unwrap_or("".to_owned()); + let trace = Backtrace::force_capture(); + eprintln!("Stack trace{}:{}\n{}", name, get_thread_id(), trace); +} + +pub fn install_thread_stack_stace_handler(signal: Signal) { + let mut sigset = SigSet::empty(); + sigset.add(signal); + + // Set up a signal action + let sa = SigAction::new( + SigHandler::Handler(signal_handler), + SaFlags::empty(), + sigset, + ); + + // Register the signal action for process + unsafe { + signal::sigaction(signal, &sa).expect("Failed to register signal handler"); + } +} + +pub fn install_thread_stack_stace_handler_default() { + let default_signal = Signal::SIGUSR1; + install_thread_stack_stace_handler(default_signal); +} + +fn signal_all_threads(signal: Signal, targets: Vec) { + for thread_id in &targets { + let result = unsafe { + libc::pthread_kill( + *thread_id, + match signal.into() { + Some(s) => s as libc::c_int, + None => 0, + }, + ) + }; + if result != 0 { + eprintln!("Error sending signal: {:?}", result); + } + } +} + +#[derive(Debug)] +pub struct LongRunningTaskDetector { + interval: Duration, + detection_time: Duration, + stop_flag: Arc>, + workers: Arc, + signal: Signal, +} + +async fn do_nothing(tx: mpsc::Sender<()>) { + // signal I am done + let _ = tx.send(()).unwrap(); +} + +fn probe( + tokio_runtime: &Arc, + detection_time: Duration, + signal: Signal, + workers: &Arc, +) { + let (tx, rx) = mpsc::channel(); + let _nothing_handle = tokio_runtime.spawn(do_nothing(tx)); + let is_probe_success = match rx.recv_timeout(detection_time) { + Ok(_result) => true, + Err(_) => false, + }; + if !is_probe_success { + let targets = workers.get_all(); + eprintln!( + "Detected worker blocking, signaling worker threads: {:?}", + targets + ); + signal_all_threads(signal, targets); + // Wait for our probe to eventually finish, we do not want to have multiple probes running at the same time. + let _ = rx.recv_timeout(get_panic_worker_block_duration()).unwrap(); + } +} + +impl LongRunningTaskDetector { + pub fn new( + interval: Duration, + detection_time: Duration, + signal: Signal, + runtime_builder: &mut Builder, + ) -> Self { + let workers = Arc::new(WorkerSet::new()); + if runtime_builder.is_current_threaded() { + workers.add(get_thread_id()); + } else { + let workers_clone = Arc::clone(&workers); + let workers_clone2 = Arc::clone(&workers); + runtime_builder + .on_thread_start(move || { + let pid = get_thread_id(); + workers_clone.add(pid); + }) + .on_thread_stop(move || { + let pid = get_thread_id(); + workers_clone2.remove(pid); + }); + } + LongRunningTaskDetector { + interval, + detection_time, + stop_flag: Arc::new(Mutex::new(false)), + workers, + signal, + } + } + + pub fn start(&self, runtime: Arc) { + let stop_flag = Arc::clone(&self.stop_flag); + let detection_time = self.detection_time.clone(); + let interval = self.interval.clone(); + let signal = self.signal.clone(); + let tokio_runtime = runtime.clone(); + let workers = Arc::clone(&self.workers); + thread::spawn(move || { + let mut rng = rand::thread_rng(); + while !*stop_flag.lock().unwrap() { + probe(&tokio_runtime, detection_time, signal, &workers); + thread::sleep(Duration::from_micros( + rng.gen_range(1..=interval.as_micros()).try_into().unwrap(), + )); + } + }); + } + + pub fn stop(&self) { + *self.stop_flag.lock().unwrap() = true; + } +} diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs new file mode 100644 index 00000000000..be4b0832b10 --- /dev/null +++ b/tokio-util/src/lrtd/mod.rs @@ -0,0 +1,5 @@ +mod lrtd; + +pub use self::lrtd::install_thread_stack_stace_handler; +pub use self::lrtd::install_thread_stack_stace_handler_default; +pub use self::lrtd::LongRunningTaskDetector; \ No newline at end of file diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs new file mode 100644 index 00000000000..f8c4c8bbf36 --- /dev/null +++ b/tokio-util/tests/lrtd.rs @@ -0,0 +1,63 @@ +use nix::sys::signal::Signal; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use tokio_util::lrtd::{install_thread_stack_stace_handler, LongRunningTaskDetector}; + +async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(2)); + println!("slow done"); +} + +fn install_thread_stack_stace_handler_default() { + install_thread_stack_stace_handler(Signal::SIGUSR1); +} + +#[test] +fn test_blocking_detection_multi() { + install_thread_stack_stace_handler_default(); + let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mutable_builder = builder.worker_threads(2); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + Signal::SIGUSR1, + mutable_builder, + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime.clone()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + println!("Hello world"); + }); + lrtd.stop() +} + +#[test] +fn test_blocking_detection_current() { + install_thread_stack_stace_handler_default(); + let mut builder = tokio::runtime::Builder::new_current_thread(); + let mutable_builder = builder.enable_all(); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + Signal::SIGUSR1, + mutable_builder, + ); + let runtime = mutable_builder.build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime); + arc_runtime2.block_on(async { + run_blocking_stuff().await; + println!("Sleeping"); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + println!("Hello world"); + }); + lrtd.stop() +} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 78e6bf50d62..f4eeb775984 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -318,6 +318,17 @@ impl Builder { } } + /// Returns true if kind is "CurrentThread" of this [`Builder`]. False otherwise. + pub fn is_current_threaded(&self) -> bool { + match &self.kind { + Kind::CurrentThread => true, + #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] + Kind::MultiThread => false, + #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] + Kind::MultiThreadAlt => false, + } + } + /// Enables both I/O and time drivers. /// /// Doing this is a shorthand for calling `enable_io` and `enable_time` From 77c61f54ac200392e2cb3921009e92201808d214 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Thu, 28 Dec 2023 16:58:30 +0100 Subject: [PATCH 03/23] [cleanup] remove unused method. --- tokio-util/src/lrtd/lrtd.rs | 5 ----- tokio-util/src/lrtd/mod.rs | 1 - 2 files changed, 6 deletions(-) diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index 9159cbd385a..62209c10634 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -78,11 +78,6 @@ pub fn install_thread_stack_stace_handler(signal: Signal) { } } -pub fn install_thread_stack_stace_handler_default() { - let default_signal = Signal::SIGUSR1; - install_thread_stack_stace_handler(default_signal); -} - fn signal_all_threads(signal: Signal, targets: Vec) { for thread_id in &targets { let result = unsafe { diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs index be4b0832b10..eb79da7b280 100644 --- a/tokio-util/src/lrtd/mod.rs +++ b/tokio-util/src/lrtd/mod.rs @@ -1,5 +1,4 @@ mod lrtd; pub use self::lrtd::install_thread_stack_stace_handler; -pub use self::lrtd::install_thread_stack_stace_handler_default; pub use self::lrtd::LongRunningTaskDetector; \ No newline at end of file From e6c9c6cb8cb062c4b7e72a6ef9f97ce0e5d70353 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Fri, 29 Dec 2023 14:21:40 +0100 Subject: [PATCH 04/23] Add ability to hook actions on detection. --- tokio-util/src/lrtd/lrtd.rs | 191 +++++++++++++++++++++++++++++++++--- tokio-util/src/lrtd/mod.rs | 5 +- tokio-util/tests/lrtd.rs | 8 +- 3 files changed, 180 insertions(+), 24 deletions(-) diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index 62209c10634..89845d25aff 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -3,12 +3,111 @@ use nix::sys::signal::{self, SaFlags, SigAction, SigHandler, SigSet, Signal}; use rand::Rng; use std::backtrace::Backtrace; use std::collections::HashSet; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Once}; use std::time::Duration; use std::{env, thread}; use tokio::runtime::{Builder, Runtime}; +// A static AtomicUsize variable +static mut GLOBAL_COUNTER: Option = None; +static INIT: Once = Once::new(); + +// Function to initialize the global counter if not already initialized +fn initialize_global_counter() { + INIT.call_once(|| unsafe { + GLOBAL_COUNTER = Some(AtomicUsize::new(0)); + }); +} + +// Function to get the next value in the global counter +fn get_next_global_value() -> usize { + initialize_global_counter(); + + unsafe { + GLOBAL_COUNTER + .as_ref() + .expect("Global counter not initialized") + .fetch_add(1, Ordering::Relaxed) + } +} + +struct OwnedThreadStateHandler { + owners: Vec, + action: Arc, +} + +impl OwnedThreadStateHandler { + fn add(&mut self, action: Arc, owner: usize) { + if !Arc::ptr_eq(&action, &(self.action)) { + panic!("Cannot overwrite action with something different!") + } + let mut found = false; + for x_owner in &(self.owners) { + if *x_owner == owner { + panic!("Cannot set thread state handler twice by the same owner") + } + } + if !found { + self.owners.push(owner); + } + } + + fn remove(&mut self, owner: usize) { + if let Some(index) = self.owners.iter().position(|&x| x == owner) { + // Use remove() to remove the value at the found index + let removed_value = self.owners.remove(index); + } else { + panic!("Cannot find owner") + } + } +} + +static mut GLOBAL_TH_STATE_HANDLER: Mutex> = Mutex::new(None); + +fn set_thread_state_handler(action: Arc, owner: usize) { + unsafe { + if let Ok(mut value) = GLOBAL_TH_STATE_HANDLER.lock() { + match value.as_mut() { + Some(handler) => { + (*handler).add(action, owner); + } + None => { + let mut owners = Vec::new(); + owners.push(owner); + *value = Some(OwnedThreadStateHandler { owners, action }); + } + } + } + } +} + +fn reset_thread_state_handler(owner: usize) { + unsafe { + if let Ok(mut value) = GLOBAL_TH_STATE_HANDLER.lock() { + match value.as_mut() { + Some(handler) => { + (*handler).remove(owner); + } + None => { + panic!("Cannot find handler"); + } + } + } + } +} + +fn get_thread_state_handler() -> Arc { + unsafe { + let option = GLOBAL_TH_STATE_HANDLER.lock().unwrap(); + match option.as_ref() { + Some(value) => value.action.clone(), + None => Arc::new(StdErrThreadStateHandler), + } + } +} + const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60); fn get_panic_worker_block_duration() -> Duration { @@ -23,6 +122,46 @@ fn get_thread_id() -> libc::pthread_t { unsafe { libc::pthread_self() } } +pub trait ThreadStateHandler: Send + Sync { + fn blocking_thread_details( + &self, + thread_id: libc::pthread_t, + thread_name: Option<&str>, + backtrace: Backtrace, + ); +} + +pub trait BlockingActionHandler: Send + Sync { + fn blocking_detected(&self, signal: Signal, targets: &Vec); +} + +struct StdErrBlockingActionHandler; + +impl BlockingActionHandler for StdErrBlockingActionHandler { + fn blocking_detected(&self, signal: Signal, targets: &Vec) { + eprintln!( + "Detected worker blocking, signaling worker threads: {:?}", + targets + ); + } +} + +struct StdErrThreadStateHandler; + +impl ThreadStateHandler for StdErrThreadStateHandler { + fn blocking_thread_details( + &self, + thread_id: libc::pthread_t, + thread_name: Option<&str>, + backtrace: Backtrace, + ) { + let name = thread_name + .map(|n| format!(" for thread \"{}\"", n)) + .unwrap_or("".to_owned()); + eprintln!("Stack trace{}:{}\n{}", name, thread_id, backtrace); + } +} + #[derive(Debug)] struct WorkerSet { inner: Mutex>, @@ -52,16 +191,21 @@ impl WorkerSet { } extern "C" fn signal_handler(_: i32) { - let thread = thread::current(); - let name = thread - .name() - .map(|n| format!(" for thread \"{}\"", n)) - .unwrap_or("".to_owned()); - let trace = Backtrace::force_capture(); - eprintln!("Stack trace{}:{}\n{}", name, get_thread_id(), trace); + let backtrace = Backtrace::force_capture(); + get_thread_state_handler().blocking_thread_details( + get_thread_id(), + thread::current().name(), + backtrace, + ); +} + +static INIT_SIGNAL_HANDLER: Once = Once::new(); + +fn install_thread_stack_stace_handler(signal: Signal) { + INIT_SIGNAL_HANDLER.call_once(|| _install_thread_stack_stace_handler(signal)) } -pub fn install_thread_stack_stace_handler(signal: Signal) { +fn _install_thread_stack_stace_handler(signal: Signal) { let mut sigset = SigSet::empty(); sigset.add(signal); @@ -102,6 +246,7 @@ pub struct LongRunningTaskDetector { stop_flag: Arc>, workers: Arc, signal: Signal, + identity: usize, } async fn do_nothing(tx: mpsc::Sender<()>) { @@ -114,6 +259,7 @@ fn probe( detection_time: Duration, signal: Signal, workers: &Arc, + action: &Arc, ) { let (tx, rx) = mpsc::channel(); let _nothing_handle = tokio_runtime.spawn(do_nothing(tx)); @@ -123,10 +269,8 @@ fn probe( }; if !is_probe_success { let targets = workers.get_all(); - eprintln!( - "Detected worker blocking, signaling worker threads: {:?}", - targets - ); + action.blocking_detected(signal, &targets); + signal_all_threads(signal, targets); // Wait for our probe to eventually finish, we do not want to have multiple probes running at the same time. let _ = rx.recv_timeout(get_panic_worker_block_duration()).unwrap(); @@ -140,6 +284,7 @@ impl LongRunningTaskDetector { signal: Signal, runtime_builder: &mut Builder, ) -> Self { + install_thread_stack_stace_handler(signal); let workers = Arc::new(WorkerSet::new()); if runtime_builder.is_current_threaded() { workers.add(get_thread_id()); @@ -156,26 +301,41 @@ impl LongRunningTaskDetector { workers_clone2.remove(pid); }); } + let identity = get_next_global_value(); LongRunningTaskDetector { interval, detection_time, stop_flag: Arc::new(Mutex::new(false)), workers, signal, + identity, } } pub fn start(&self, runtime: Arc) { + self.start_with_custom_action( + runtime, + Arc::new(StdErrBlockingActionHandler), + Arc::new(StdErrThreadStateHandler), + ) + } + + pub fn start_with_custom_action( + &self, + runtime: Arc, + action: Arc, + thread_action: Arc, + ) { + set_thread_state_handler(thread_action, self.identity); let stop_flag = Arc::clone(&self.stop_flag); let detection_time = self.detection_time.clone(); let interval = self.interval.clone(); let signal = self.signal.clone(); - let tokio_runtime = runtime.clone(); let workers = Arc::clone(&self.workers); thread::spawn(move || { let mut rng = rand::thread_rng(); while !*stop_flag.lock().unwrap() { - probe(&tokio_runtime, detection_time, signal, &workers); + probe(&runtime, detection_time, signal, &workers, &action); thread::sleep(Duration::from_micros( rng.gen_range(1..=interval.as_micros()).try_into().unwrap(), )); @@ -185,5 +345,6 @@ impl LongRunningTaskDetector { pub fn stop(&self) { *self.stop_flag.lock().unwrap() = true; + reset_thread_state_handler(self.identity); } } diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs index eb79da7b280..478571cc47b 100644 --- a/tokio-util/src/lrtd/mod.rs +++ b/tokio-util/src/lrtd/mod.rs @@ -1,4 +1,5 @@ mod lrtd; -pub use self::lrtd::install_thread_stack_stace_handler; -pub use self::lrtd::LongRunningTaskDetector; \ No newline at end of file +pub use self::lrtd::LongRunningTaskDetector; +pub use self::lrtd::ThreadStateHandler; +pub use self::lrtd::BlockingActionHandler; \ No newline at end of file diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index f8c4c8bbf36..44f3390660b 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -2,7 +2,7 @@ use nix::sys::signal::Signal; use std::sync::Arc; use std::thread; use std::time::Duration; -use tokio_util::lrtd::{install_thread_stack_stace_handler, LongRunningTaskDetector}; +use tokio_util::lrtd::LongRunningTaskDetector; async fn run_blocking_stuff() { println!("slow start"); @@ -10,13 +10,8 @@ async fn run_blocking_stuff() { println!("slow done"); } -fn install_thread_stack_stace_handler_default() { - install_thread_stack_stace_handler(Signal::SIGUSR1); -} - #[test] fn test_blocking_detection_multi() { - install_thread_stack_stace_handler_default(); let mut builder = tokio::runtime::Builder::new_multi_thread(); let mutable_builder = builder.worker_threads(2); let lrtd = LongRunningTaskDetector::new( @@ -40,7 +35,6 @@ fn test_blocking_detection_multi() { #[test] fn test_blocking_detection_current() { - install_thread_stack_stace_handler_default(); let mut builder = tokio::runtime::Builder::new_current_thread(); let mutable_builder = builder.enable_all(); let lrtd = LongRunningTaskDetector::new( From 144cfffa96c46f92cc4097eb8ffaf734361d57da Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 30 Dec 2023 09:25:24 +0100 Subject: [PATCH 05/23] [add] document action traits + minor improvement. --- tokio-util/src/lrtd/lrtd.rs | 40 ++++++++++++++++++++++++++++++------- tokio-util/tests/lrtd.rs | 2 ++ 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index 89845d25aff..c0ec28b34db 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -122,7 +122,17 @@ fn get_thread_id() -> libc::pthread_t { unsafe { libc::pthread_self() } } +/// A trait for handling thread state details. +/// +/// This trait provides a method for capturing details of a blocking thread. pub trait ThreadStateHandler: Send + Sync { + /// Invoked with details of a blocking thread. + /// + /// # Arguments + /// + /// * `thread_id` - The ID of the blocking thread. + /// * `thread_name` - An optional name of the blocking thread. + /// * `backtrace` - The backtrace of the blocking thread. fn blocking_thread_details( &self, thread_id: libc::pthread_t, @@ -131,18 +141,34 @@ pub trait ThreadStateHandler: Send + Sync { ); } +/// A trait for handling actions when blocking is detected. +/// +/// This trait provides a method for handling the detection of a blocking action. pub trait BlockingActionHandler: Send + Sync { - fn blocking_detected(&self, signal: Signal, targets: &Vec); + /// Called when a blocking action is detected and prior to thread signaling. + /// + /// # Arguments + /// + /// * `signal` - The signal used to signal tokio worker threads for state details. + /// * `targets` - The list of thread IDs of the tokio runtime worker threads. /// # Returns + /// + /// # Returns + /// + /// Returns `true` if the signaling of the threads is to be executed, false otherwise. + /// + fn blocking_detected(&self, signal: Signal, targets: &Vec) -> bool; } struct StdErrBlockingActionHandler; impl BlockingActionHandler for StdErrBlockingActionHandler { - fn blocking_detected(&self, signal: Signal, targets: &Vec) { + fn blocking_detected(&self, signal: Signal, targets: &Vec) -> bool { eprintln!( - "Detected worker blocking, signaling worker threads: {:?}", + "Detected worker blocking, signaling {} worker threads: {:?}", + signal, targets ); + true } } @@ -269,10 +295,10 @@ fn probe( }; if !is_probe_success { let targets = workers.get_all(); - action.blocking_detected(signal, &targets); - - signal_all_threads(signal, targets); - // Wait for our probe to eventually finish, we do not want to have multiple probes running at the same time. + if action.blocking_detected(signal, &targets) { + signal_all_threads(signal, targets); + // Wait for our probe to eventually finish, we do not want to have multiple probes running at the same time. + } let _ = rx.recv_timeout(get_panic_worker_block_duration()).unwrap(); } } diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index 44f3390660b..4640e936b7d 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -55,3 +55,5 @@ fn test_blocking_detection_current() { }); lrtd.stop() } + + From 6f00e2e3efec0705f85507c2b80af500e3864e5b Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 30 Dec 2023 11:16:46 +0100 Subject: [PATCH 06/23] [add] More documentation + test. --- tokio-util/src/lrtd/lrtd.rs | 80 ++++++++++++++++++++++++++- tokio-util/tests/lrtd.rs | 106 +++++++++++++++++++++++++++++++++++- 2 files changed, 180 insertions(+), 6 deletions(-) diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index c0ec28b34db..a51572393d8 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -10,18 +10,15 @@ use std::time::Duration; use std::{env, thread}; use tokio::runtime::{Builder, Runtime}; -// A static AtomicUsize variable static mut GLOBAL_COUNTER: Option = None; static INIT: Once = Once::new(); -// Function to initialize the global counter if not already initialized fn initialize_global_counter() { INIT.call_once(|| unsafe { GLOBAL_COUNTER = Some(AtomicUsize::new(0)); }); } -// Function to get the next value in the global counter fn get_next_global_value() -> usize { initialize_global_counter(); @@ -303,7 +300,70 @@ fn probe( } } +/// Utility to help with "really nice to add a warning for tasks that might be blocking" +/// Example use: +/// +/// let mut builder = tokio::runtime::Builder::new_multi_thread(); +/// let mutable_builder = builder.worker_threads(2); +/// let lrtd = LongRunningTaskDetector::new( +/// Duration::from_millis(10), +/// Duration::from_millis(100), +/// Signal::SIGUSR1, +/// mutable_builder, +/// ); +/// let runtime = builder.enable_all().build().unwrap(); +/// let arc_runtime = Arc::new(runtime); +/// let arc_runtime2 = arc_runtime.clone(); +/// lrtd.start(arc_runtime.clone()); +/// arc_runtime2.block_on(async { +/// ... +/// }); +/// lrtd.stop() +/// +/// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. +/// The detail will look like: +/// +/// Detected worker blocking, signaling SIGUSR1 worker threads: [123145318232064, 123145320341504] +/// Stack trace for thread "tokio-runtime-worker":123145318232064 +/// 0: std::backtrace_rs::backtrace::libunwind::trace +/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5 +/// 1: std::backtrace_rs::backtrace::trace_unsynchronized +/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5 +/// 2: std::backtrace::Backtrace::create +/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/backtrace.rs:331:13 +/// 3: std::backtrace::Backtrace::force_capture +/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/backtrace.rs:313:9 +/// 4: tokio_util::lrtd::lrtd::signal_handler +/// at ./src/lrtd/lrtd.rs:217:21 +/// 5: __sigtramp +/// 6: ___semwait_signal +/// 7: +/// 8: std::sys::unix::thread::Thread::sleep +/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/sys/unix/thread.rs:241:20 +/// 9: std::thread::sleep +/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/thread/mod.rs:872:5 +/// 10: lrtd::run_blocking_stuff::{{closure}} +/// at ./tests/lrtd.rs:11:5 +/// 11: tokio::runtime::task::core::Core::poll::{{closure}} +/// at /Users/zoly/NetBeansProjects/tokio/tokio/src/runtime/task/core.rs:328:17 +/// 12: tokio::loom::std::unsafe_cell::UnsafeCell::with_mut +/// at /Users/zoly/NetBeansProjects/tokio/tokio/src/loom/std/unsafe_cell.rs:16:9 +/// 13: tokio::runtime::task::core::Core::poll +/// impl LongRunningTaskDetector { + /// Creates a new `LongRunningTaskDetector` instance. + /// + /// # Arguments + /// + /// * `interval` - The interval between probes. This interval is randomized. + /// * `detection_time` - The maximum time allowed for a probe to succeed. + /// A probe running for longer indicates something is blocking the worker threads. + /// * `signal` - The signal to use for signaling worker threads when blocking is detected. + /// * `runtime_builder` - A mutable reference to a `tokio::runtime::Builder`. + /// + /// # Returns + /// + /// Returns a new `LongRunningTaskDetector` instance. pub fn new( interval: Duration, detection_time: Duration, @@ -338,6 +398,11 @@ impl LongRunningTaskDetector { } } + /// Starts the monitoring thread with default action handlers (that write details to std err). + /// + /// # Arguments + /// + /// * `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. pub fn start(&self, runtime: Arc) { self.start_with_custom_action( runtime, @@ -346,6 +411,14 @@ impl LongRunningTaskDetector { ) } + /// Starts the monitoring process with custom action handlers that + /// allow you to customize what happens when blocking is detected. + /// + /// # Arguments + /// + /// * `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. + /// * `action` - An `Arc` reference to a custom `BlockingActionHandler`. + /// * `thread_action` - An `Arc` reference to a custom `ThreadStateHandler`. pub fn start_with_custom_action( &self, runtime: Arc, @@ -369,6 +442,7 @@ impl LongRunningTaskDetector { }); } + /// Stops the monitoring thread. pub fn stop(&self) { *self.stop_flag.lock().unwrap() = true; reset_thread_state_handler(self.identity); diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index 4640e936b7d..37771b7e02a 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -1,8 +1,10 @@ use nix::sys::signal::Signal; -use std::sync::Arc; +use std::backtrace::Backtrace; +use std::collections::HashSet; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use tokio_util::lrtd::LongRunningTaskDetector; +use tokio_util::lrtd::{LongRunningTaskDetector, BlockingActionHandler, ThreadStateHandler}; async fn run_blocking_stuff() { println!("slow start"); @@ -23,7 +25,7 @@ fn test_blocking_detection_multi() { let runtime = builder.enable_all().build().unwrap(); let arc_runtime = Arc::new(runtime); let arc_runtime2 = arc_runtime.clone(); - lrtd.start(arc_runtime.clone()); + lrtd.start(arc_runtime); arc_runtime2.spawn(run_blocking_stuff()); arc_runtime2.spawn(run_blocking_stuff()); arc_runtime2.block_on(async { @@ -57,3 +59,101 @@ fn test_blocking_detection_current() { } + +struct CaptureBlockingActionHandler { + inner: Mutex> +} + +impl CaptureBlockingActionHandler { + fn new() -> Self { + CaptureBlockingActionHandler { + inner: Mutex::new(HashSet::new()), + } + } + + fn get_all(&self) -> Vec { + let set = self.inner.lock().unwrap(); + set.iter().cloned().collect() + } +} + +impl BlockingActionHandler for CaptureBlockingActionHandler { + fn blocking_detected(&self, _signal: Signal, targets: &Vec) -> bool { + let mut set = self.inner.lock().unwrap(); + set.extend(targets); + true + } +} + +struct CaptureThreadStateHandler { + inner: Mutex> +} + +impl CaptureThreadStateHandler { + fn new() -> Self { + CaptureThreadStateHandler { + inner: Mutex::new(Vec::new()), + } + } + + fn get_all(&self) -> Vec { + let vec = self.inner.lock().unwrap(); + vec.clone() + } + + fn contains_symbol(&self, symbol_name: &str) -> bool { + // Iterate over the frames in the backtrace + let traces = self.get_all(); + if traces.is_empty() { + return false + } + let bt_str = traces.get(0).unwrap(); + bt_str.contains(symbol_name) + } + +} + + +impl ThreadStateHandler for CaptureThreadStateHandler { + fn blocking_thread_details( + &self, + _thread_id: libc::pthread_t, + _thread_name: Option<&str>, + backtrace: Backtrace, + ) { + let mut vec = self.inner.lock().unwrap(); + vec.push( format!("{}", backtrace)); + } +} + + + + +#[test] +fn test_blocking_detection_multi_capture() { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mutable_builder = builder.worker_threads(2); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + Signal::SIGUSR1, + mutable_builder, + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + let blocking_action = Arc::new(CaptureBlockingActionHandler::new()); + let thread_state_action = Arc::new(CaptureThreadStateHandler::new()); + let to_assert_blocking = blocking_action.clone(); + let to_assert_thread = thread_state_action.clone(); + lrtd.start_with_custom_action(arc_runtime, blocking_action, thread_state_action); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + println!("Hello world"); + }); + assert_eq!(2, to_assert_blocking.get_all().len()); + assert!(to_assert_thread.contains_symbol("std::thread::sleep")); + lrtd.stop() +} \ No newline at end of file From fb4cc0258ee271c0ea960085df4cfb2e7db83839 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 30 Dec 2023 11:20:02 +0100 Subject: [PATCH 07/23] [add] one more comment. --- tokio-util/src/lrtd/lrtd.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index a51572393d8..6afe446b564 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -262,6 +262,7 @@ fn signal_all_threads(signal: Signal, targets: Vec) { } } +/// Utility to help with "really nice to add a warning for tasks that might be blocking" #[derive(Debug)] pub struct LongRunningTaskDetector { interval: Duration, From 5247795f4b00c524912f5de2db695289613df26f Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 30 Dec 2023 14:10:50 +0100 Subject: [PATCH 08/23] [cleanup] fix all warnings + code cleanup. --- tokio-util/src/lrtd/lrtd.rs | 33 ++++++++++++++++++++------------- tokio-util/src/lrtd/mod.rs | 1 + tokio-util/tests/lrtd.rs | 13 +++++++++++++ 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index 6afe446b564..7e592abdd3b 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -1,3 +1,4 @@ +/// Utility to help with "really nice to add a warning for tasks that might be blocking" use libc; use nix::sys::signal::{self, SaFlags, SigAction, SigHandler, SigSet, Signal}; use rand::Rng; @@ -19,6 +20,7 @@ fn initialize_global_counter() { }); } +/// The purpose of this method is to generate unique IDs for LRTD instances. fn get_next_global_value() -> usize { initialize_global_counter(); @@ -40,23 +42,22 @@ impl OwnedThreadStateHandler { if !Arc::ptr_eq(&action, &(self.action)) { panic!("Cannot overwrite action with something different!") } - let mut found = false; for x_owner in &(self.owners) { if *x_owner == owner { - panic!("Cannot set thread state handler twice by the same owner") + panic!("Cannot set thread state handler twice by the same owner: {}", owner); } } - if !found { - self.owners.push(owner); - } + self.owners.push(owner); } fn remove(&mut self, owner: usize) { if let Some(index) = self.owners.iter().position(|&x| x == owner) { - // Use remove() to remove the value at the found index let removed_value = self.owners.remove(index); + if removed_value != owner { + panic!("Wrong value, whould have been {}", owner); + } } else { - panic!("Cannot find owner") + panic!("Cannot find owner: {}", owner) } } } @@ -88,7 +89,7 @@ fn reset_thread_state_handler(owner: usize) { (*handler).remove(owner); } None => { - panic!("Cannot find handler"); + panic!("No action handler found for: {}, likely LongRunningTaskDetector not started", owner); } } } @@ -158,6 +159,7 @@ pub trait BlockingActionHandler: Send + Sync { struct StdErrBlockingActionHandler; +/// BlockingActionHandler implementation that writes blocker details to standard error. impl BlockingActionHandler for StdErrBlockingActionHandler { fn blocking_detected(&self, signal: Signal, targets: &Vec) -> bool { eprintln!( @@ -171,6 +173,7 @@ impl BlockingActionHandler for StdErrBlockingActionHandler { struct StdErrThreadStateHandler; +/// ThreadStateHandler implementation that writes details to standard error. impl ThreadStateHandler for StdErrThreadStateHandler { fn blocking_thread_details( &self, @@ -392,14 +395,14 @@ impl LongRunningTaskDetector { LongRunningTaskDetector { interval, detection_time, - stop_flag: Arc::new(Mutex::new(false)), + stop_flag: Arc::new(Mutex::new(true)), workers, signal, identity, } } - /// Starts the monitoring thread with default action handlers (that write details to std err). + /// Starts the monitoring thread with default action handlers (that write details to std err). /// /// # Arguments /// @@ -427,6 +430,7 @@ impl LongRunningTaskDetector { thread_action: Arc, ) { set_thread_state_handler(thread_action, self.identity); + *self.stop_flag.lock().unwrap() = false; let stop_flag = Arc::clone(&self.stop_flag); let detection_time = self.detection_time.clone(); let interval = self.interval.clone(); @@ -443,9 +447,12 @@ impl LongRunningTaskDetector { }); } - /// Stops the monitoring thread. + /// Stops the monitoring thread. Does nothing if LRTD is already stopped. pub fn stop(&self) { - *self.stop_flag.lock().unwrap() = true; - reset_thread_state_handler(self.identity); + let mut sf = self.stop_flag.lock().unwrap(); + if *sf != true { + *sf = true; + reset_thread_state_handler(self.identity); + } } } diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs index 478571cc47b..2975b36cda4 100644 --- a/tokio-util/src/lrtd/mod.rs +++ b/tokio-util/src/lrtd/mod.rs @@ -1,3 +1,4 @@ +//! Utility to help with "really nice to add a warning for tasks that might be blocking" mod lrtd; pub use self::lrtd::LongRunningTaskDetector; diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index 37771b7e02a..5c46de98397 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -156,4 +156,17 @@ fn test_blocking_detection_multi_capture() { assert_eq!(2, to_assert_blocking.get_all().len()); assert!(to_assert_thread.contains_symbol("std::thread::sleep")); lrtd.stop() +} + +#[test] +fn test_blocking_detection_stop_unstarted() { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mutable_builder = builder.worker_threads(2); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + Signal::SIGUSR1, + mutable_builder, + ); + lrtd.stop() } \ No newline at end of file From 73c8bc523feb3a339889405238b93d36cd4d49ca Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 30 Dec 2023 16:16:09 +0100 Subject: [PATCH 09/23] Fix tests + remove unsafe stuff. --- tokio-util/Cargo.toml | 3 +- tokio-util/src/cfg.rs | 10 ++++++ tokio-util/src/lib.rs | 4 ++- tokio-util/src/lrtd/lrtd.rs | 63 +++++++++++++++++++------------------ tokio-util/tests/lrtd.rs | 6 ++++ 5 files changed, 54 insertions(+), 32 deletions(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 17ae2a0ee94..e609b2d6ff2 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -21,7 +21,7 @@ categories = ["asynchronous"] default = [] # Shorthand for enabling everything -full = ["codec", "compat", "io-util", "time", "net", "rt"] +full = ["codec", "compat", "io-util", "time", "net", "rt", "lrtd"] net = ["tokio/net"] compat = ["futures-io",] @@ -30,6 +30,7 @@ time = ["tokio/time","slab"] io = [] io-util = ["io", "tokio/rt", "tokio/io-util"] rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"] +lrtd = [] __docs_rs = ["futures-util"] diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index 4035255aff0..8b24feeb866 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -69,3 +69,13 @@ macro_rules! cfg_time { )* } } + +macro_rules! cfg_lrtd { + ($($item:item)*) => { + $( + #[cfg(feature = "lrtd")] + #[cfg_attr(docsrs, doc(cfg(feature = "lrtd")))] + $item + )* + } +} \ No newline at end of file diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 435a925526a..92176346de6 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -51,7 +51,9 @@ cfg_time! { pub mod time; } -pub mod lrtd; +cfg_lrtd! { + pub mod lrtd; +} pub mod sync; diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index 7e592abdd3b..35fcdb5710f 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -40,7 +40,7 @@ struct OwnedThreadStateHandler { impl OwnedThreadStateHandler { fn add(&mut self, action: Arc, owner: usize) { if !Arc::ptr_eq(&action, &(self.action)) { - panic!("Cannot overwrite action with something different!") + panic!("Cannot overwrite action with something different!"); } for x_owner in &(self.owners) { if *x_owner == owner { @@ -62,47 +62,43 @@ impl OwnedThreadStateHandler { } } -static mut GLOBAL_TH_STATE_HANDLER: Mutex> = Mutex::new(None); +static GLOBAL_TH_STATE_HANDLER: Mutex> = Mutex::new(None); fn set_thread_state_handler(action: Arc, owner: usize) { - unsafe { - if let Ok(mut value) = GLOBAL_TH_STATE_HANDLER.lock() { - match value.as_mut() { - Some(handler) => { - (*handler).add(action, owner); - } - None => { - let mut owners = Vec::new(); - owners.push(owner); - *value = Some(OwnedThreadStateHandler { owners, action }); - } - } + let mut value = GLOBAL_TH_STATE_HANDLER.lock().unwrap(); + match value.as_mut() { + Some(handler) => { + (*handler).add(action, owner); + } + None => { + let mut owners = Vec::new(); + owners.push(owner); + *value = Some(OwnedThreadStateHandler { owners, action }); } } } fn reset_thread_state_handler(owner: usize) { - unsafe { - if let Ok(mut value) = GLOBAL_TH_STATE_HANDLER.lock() { - match value.as_mut() { - Some(handler) => { - (*handler).remove(owner); - } - None => { - panic!("No action handler found for: {}, likely LongRunningTaskDetector not started", owner); - } - } + let mut value = GLOBAL_TH_STATE_HANDLER.lock().unwrap(); + let h = match value.as_mut() { + Some(handler) => { + (*handler).remove(owner); + handler + } + None => { + panic!("No action handler found for: {}, likely LongRunningTaskDetector not started", owner) } + }; + if h.owners.is_empty() { + *value = None } } fn get_thread_state_handler() -> Arc { - unsafe { - let option = GLOBAL_TH_STATE_HANDLER.lock().unwrap(); - match option.as_ref() { - Some(value) => value.action.clone(), - None => Arc::new(StdErrThreadStateHandler), - } + let option = GLOBAL_TH_STATE_HANDLER.lock().unwrap(); + match option.as_ref() { + Some(value) => value.action.clone(), + None => Arc::new(StdErrThreadStateHandler), } } @@ -455,4 +451,11 @@ impl LongRunningTaskDetector { reset_thread_state_handler(self.identity); } } + +} + +impl Drop for LongRunningTaskDetector { + fn drop(&mut self) { + self.stop(); + } } diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index 5c46de98397..c7372e810f6 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -1,3 +1,4 @@ +#![cfg(all(feature = "lrtd"))] use nix::sys::signal::Signal; use std::backtrace::Backtrace; use std::collections::HashSet; @@ -12,8 +13,11 @@ async fn run_blocking_stuff() { println!("slow done"); } +static GLOBAL_MUTEX: Mutex<()> = Mutex::new(()); + #[test] fn test_blocking_detection_multi() { + let _guard = GLOBAL_MUTEX.lock().unwrap(); let mut builder = tokio::runtime::Builder::new_multi_thread(); let mutable_builder = builder.worker_threads(2); let lrtd = LongRunningTaskDetector::new( @@ -37,6 +41,7 @@ fn test_blocking_detection_multi() { #[test] fn test_blocking_detection_current() { + let _guard = GLOBAL_MUTEX.lock().unwrap(); let mut builder = tokio::runtime::Builder::new_current_thread(); let mutable_builder = builder.enable_all(); let lrtd = LongRunningTaskDetector::new( @@ -131,6 +136,7 @@ impl ThreadStateHandler for CaptureThreadStateHandler { #[test] fn test_blocking_detection_multi_capture() { + let _guard = GLOBAL_MUTEX.lock().unwrap(); let mut builder = tokio::runtime::Builder::new_multi_thread(); let mutable_builder = builder.worker_threads(2); let lrtd = LongRunningTaskDetector::new( From 6e77b5eec2d2b1305c6f5c66ca1f6863d7053362 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 30 Dec 2023 16:41:32 +0100 Subject: [PATCH 10/23] FIX formatting --- tokio-util/src/cfg.rs | 2 +- tokio-util/src/lrtd/lrtd.rs | 30 +++++++++++++++++------------- tokio-util/src/lrtd/mod.rs | 2 +- tokio-util/tests/lrtd.rs | 34 +++++++++++++--------------------- 4 files changed, 32 insertions(+), 36 deletions(-) diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index 8b24feeb866..250c52e121a 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -78,4 +78,4 @@ macro_rules! cfg_lrtd { $item )* } -} \ No newline at end of file +} diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index 35fcdb5710f..762e57af890 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -44,10 +44,13 @@ impl OwnedThreadStateHandler { } for x_owner in &(self.owners) { if *x_owner == owner { - panic!("Cannot set thread state handler twice by the same owner: {}", owner); + panic!( + "Cannot set thread state handler twice by the same owner: {}", + owner + ); } } - self.owners.push(owner); + self.owners.push(owner); } fn remove(&mut self, owner: usize) { @@ -86,7 +89,10 @@ fn reset_thread_state_handler(owner: usize) { handler } None => { - panic!("No action handler found for: {}, likely LongRunningTaskDetector not started", owner) + panic!( + "No action handler found for: {}, likely LongRunningTaskDetector not started", + owner + ) } }; if h.owners.is_empty() { @@ -147,9 +153,9 @@ pub trait BlockingActionHandler: Send + Sync { /// * `targets` - The list of thread IDs of the tokio runtime worker threads. /// # Returns /// /// # Returns - /// + /// /// Returns `true` if the signaling of the threads is to be executed, false otherwise. - /// + /// fn blocking_detected(&self, signal: Signal, targets: &Vec) -> bool; } @@ -160,8 +166,7 @@ impl BlockingActionHandler for StdErrBlockingActionHandler { fn blocking_detected(&self, signal: Signal, targets: &Vec) -> bool { eprintln!( "Detected worker blocking, signaling {} worker threads: {:?}", - signal, - targets + signal, targets ); true } @@ -302,7 +307,7 @@ fn probe( /// Utility to help with "really nice to add a warning for tasks that might be blocking" /// Example use: -/// +/// /// let mut builder = tokio::runtime::Builder::new_multi_thread(); /// let mutable_builder = builder.worker_threads(2); /// let lrtd = LongRunningTaskDetector::new( @@ -319,7 +324,7 @@ fn probe( /// ... /// }); /// lrtd.stop() -/// +/// /// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. /// The detail will look like: /// @@ -349,14 +354,14 @@ fn probe( /// 12: tokio::loom::std::unsafe_cell::UnsafeCell::with_mut /// at /Users/zoly/NetBeansProjects/tokio/tokio/src/loom/std/unsafe_cell.rs:16:9 /// 13: tokio::runtime::task::core::Core::poll -/// +/// impl LongRunningTaskDetector { /// Creates a new `LongRunningTaskDetector` instance. /// /// # Arguments /// /// * `interval` - The interval between probes. This interval is randomized. - /// * `detection_time` - The maximum time allowed for a probe to succeed. + /// * `detection_time` - The maximum time allowed for a probe to succeed. /// A probe running for longer indicates something is blocking the worker threads. /// * `signal` - The signal to use for signaling worker threads when blocking is detected. /// * `runtime_builder` - A mutable reference to a `tokio::runtime::Builder`. @@ -446,12 +451,11 @@ impl LongRunningTaskDetector { /// Stops the monitoring thread. Does nothing if LRTD is already stopped. pub fn stop(&self) { let mut sf = self.stop_flag.lock().unwrap(); - if *sf != true { + if *sf != true { *sf = true; reset_thread_state_handler(self.identity); } } - } impl Drop for LongRunningTaskDetector { diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs index 2975b36cda4..67d1b463f1e 100644 --- a/tokio-util/src/lrtd/mod.rs +++ b/tokio-util/src/lrtd/mod.rs @@ -1,6 +1,6 @@ //! Utility to help with "really nice to add a warning for tasks that might be blocking" mod lrtd; +pub use self::lrtd::BlockingActionHandler; pub use self::lrtd::LongRunningTaskDetector; pub use self::lrtd::ThreadStateHandler; -pub use self::lrtd::BlockingActionHandler; \ No newline at end of file diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index c7372e810f6..180289cafcd 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -5,11 +5,11 @@ use std::collections::HashSet; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use tokio_util::lrtd::{LongRunningTaskDetector, BlockingActionHandler, ThreadStateHandler}; +use tokio_util::lrtd::{BlockingActionHandler, LongRunningTaskDetector, ThreadStateHandler}; async fn run_blocking_stuff() { println!("slow start"); - thread::sleep(Duration::from_secs(2)); + thread::sleep(Duration::from_secs(1)); println!("slow done"); } @@ -34,14 +34,14 @@ fn test_blocking_detection_multi() { arc_runtime2.spawn(run_blocking_stuff()); arc_runtime2.block_on(async { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - println!("Hello world"); + println!("Done"); }); lrtd.stop() } #[test] fn test_blocking_detection_current() { - let _guard = GLOBAL_MUTEX.lock().unwrap(); + let _guard = GLOBAL_MUTEX.lock().unwrap(); let mut builder = tokio::runtime::Builder::new_current_thread(); let mutable_builder = builder.enable_all(); let lrtd = LongRunningTaskDetector::new( @@ -56,17 +56,14 @@ fn test_blocking_detection_current() { lrtd.start(arc_runtime); arc_runtime2.block_on(async { run_blocking_stuff().await; - println!("Sleeping"); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - println!("Hello world"); + println!("Done"); }); lrtd.stop() } - - struct CaptureBlockingActionHandler { - inner: Mutex> + inner: Mutex>, } impl CaptureBlockingActionHandler { @@ -79,7 +76,7 @@ impl CaptureBlockingActionHandler { fn get_all(&self) -> Vec { let set = self.inner.lock().unwrap(); set.iter().cloned().collect() - } + } } impl BlockingActionHandler for CaptureBlockingActionHandler { @@ -91,7 +88,7 @@ impl BlockingActionHandler for CaptureBlockingActionHandler { } struct CaptureThreadStateHandler { - inner: Mutex> + inner: Mutex>, } impl CaptureThreadStateHandler { @@ -110,15 +107,13 @@ impl CaptureThreadStateHandler { // Iterate over the frames in the backtrace let traces = self.get_all(); if traces.is_empty() { - return false + return false; } let bt_str = traces.get(0).unwrap(); bt_str.contains(symbol_name) - } - + } } - impl ThreadStateHandler for CaptureThreadStateHandler { fn blocking_thread_details( &self, @@ -127,16 +122,13 @@ impl ThreadStateHandler for CaptureThreadStateHandler { backtrace: Backtrace, ) { let mut vec = self.inner.lock().unwrap(); - vec.push( format!("{}", backtrace)); + vec.push(format!("{}", backtrace)); } } - - - #[test] fn test_blocking_detection_multi_capture() { - let _guard = GLOBAL_MUTEX.lock().unwrap(); + let _guard = GLOBAL_MUTEX.lock().unwrap(); let mut builder = tokio::runtime::Builder::new_multi_thread(); let mutable_builder = builder.worker_threads(2); let lrtd = LongRunningTaskDetector::new( @@ -175,4 +167,4 @@ fn test_blocking_detection_stop_unstarted() { mutable_builder, ); lrtd.stop() -} \ No newline at end of file +} From df172e852ddf91d122e1617d422eebc102aa17c9 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 30 Dec 2023 16:44:57 +0100 Subject: [PATCH 11/23] Cleanup unnecessary drop. --- tokio-util/tests/lrtd.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index 180289cafcd..e6604bed045 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -36,7 +36,6 @@ fn test_blocking_detection_multi() { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("Done"); }); - lrtd.stop() } #[test] @@ -59,7 +58,6 @@ fn test_blocking_detection_current() { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; println!("Done"); }); - lrtd.stop() } struct CaptureBlockingActionHandler { @@ -160,11 +158,10 @@ fn test_blocking_detection_multi_capture() { fn test_blocking_detection_stop_unstarted() { let mut builder = tokio::runtime::Builder::new_multi_thread(); let mutable_builder = builder.worker_threads(2); - let lrtd = LongRunningTaskDetector::new( + let _lrtd = LongRunningTaskDetector::new( Duration::from_millis(10), Duration::from_millis(100), Signal::SIGUSR1, mutable_builder, ); - lrtd.stop() } From 86bbf4c7077f439fa9dc415bb18dcc068902ee7d Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 30 Dec 2023 17:10:24 +0100 Subject: [PATCH 12/23] [fix] doctest --- tokio-util/src/lrtd/lrtd.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index 762e57af890..97453676404 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -307,27 +307,32 @@ fn probe( /// Utility to help with "really nice to add a warning for tasks that might be blocking" /// Example use: -/// +/// ``` +/// use std::sync::Arc; +/// use tokio_util::lrtd::LongRunningTaskDetector; +/// /// let mut builder = tokio::runtime::Builder::new_multi_thread(); /// let mutable_builder = builder.worker_threads(2); /// let lrtd = LongRunningTaskDetector::new( -/// Duration::from_millis(10), -/// Duration::from_millis(100), -/// Signal::SIGUSR1, +/// std::time::Duration::from_millis(10), +/// std::time::Duration::from_millis(100), +/// nix::sys::signal::Signal::SIGUSR1, /// mutable_builder, /// ); /// let runtime = builder.enable_all().build().unwrap(); /// let arc_runtime = Arc::new(runtime); /// let arc_runtime2 = arc_runtime.clone(); -/// lrtd.start(arc_runtime.clone()); +/// lrtd.start(arc_runtime); /// arc_runtime2.block_on(async { -/// ... +/// print!("my async code") /// }); -/// lrtd.stop() +/// +/// ``` /// /// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. /// The detail will look like: /// +/// ```text /// Detected worker blocking, signaling SIGUSR1 worker threads: [123145318232064, 123145320341504] /// Stack trace for thread "tokio-runtime-worker":123145318232064 /// 0: std::backtrace_rs::backtrace::libunwind::trace @@ -354,6 +359,7 @@ fn probe( /// 12: tokio::loom::std::unsafe_cell::UnsafeCell::with_mut /// at /Users/zoly/NetBeansProjects/tokio/tokio/src/loom/std/unsafe_cell.rs:16:9 /// 13: tokio::runtime::task::core::Core::poll +/// ``` /// impl LongRunningTaskDetector { /// Creates a new `LongRunningTaskDetector` instance. From f41f470f3fa750b8476f827d6b3727a1e05268af Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 30 Dec 2023 19:25:40 +0100 Subject: [PATCH 13/23] Fix formatting issue --- tokio-util/src/lrtd/lrtd.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index 97453676404..8b6972ee895 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -310,7 +310,7 @@ fn probe( /// ``` /// use std::sync::Arc; /// use tokio_util::lrtd::LongRunningTaskDetector; -/// +/// /// let mut builder = tokio::runtime::Builder::new_multi_thread(); /// let mutable_builder = builder.worker_threads(2); /// let lrtd = LongRunningTaskDetector::new( @@ -326,7 +326,7 @@ fn probe( /// arc_runtime2.block_on(async { /// print!("my async code") /// }); -/// +/// /// ``` /// /// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. From e978a2cb1eb1ea0a4719e783aa91e1eeb38b78b6 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sun, 31 Dec 2023 05:05:21 -0500 Subject: [PATCH 14/23] remove nix dependency --- tokio-util/Cargo.toml | 1 - tokio-util/src/lrtd/lrtd.rs | 44 ++++++++++--------------------------- tokio-util/tests/lrtd.rs | 11 +++++----- 3 files changed, 17 insertions(+), 39 deletions(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index e609b2d6ff2..e52babe049e 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -44,7 +44,6 @@ futures-util = { version = "0.3.0", optional = true } pin-project-lite = "0.2.11" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } -nix = "0.21" rand = "0.8" libc = "0.2" diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index 8b6972ee895..aca9fdd9a4f 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -1,6 +1,5 @@ /// Utility to help with "really nice to add a warning for tasks that might be blocking" use libc; -use nix::sys::signal::{self, SaFlags, SigAction, SigHandler, SigSet, Signal}; use rand::Rng; use std::backtrace::Backtrace; use std::collections::HashSet; @@ -156,14 +155,14 @@ pub trait BlockingActionHandler: Send + Sync { /// /// Returns `true` if the signaling of the threads is to be executed, false otherwise. /// - fn blocking_detected(&self, signal: Signal, targets: &Vec) -> bool; + fn blocking_detected(&self, signal: libc::c_int, targets: &Vec) -> bool; } struct StdErrBlockingActionHandler; /// BlockingActionHandler implementation that writes blocker details to standard error. impl BlockingActionHandler for StdErrBlockingActionHandler { - fn blocking_detected(&self, signal: Signal, targets: &Vec) -> bool { + fn blocking_detected(&self, signal: libc::c_int, targets: &Vec) -> bool { eprintln!( "Detected worker blocking, signaling {} worker threads: {:?}", signal, targets @@ -228,38 +227,19 @@ extern "C" fn signal_handler(_: i32) { static INIT_SIGNAL_HANDLER: Once = Once::new(); -fn install_thread_stack_stace_handler(signal: Signal) { +fn install_thread_stack_stace_handler(signal: libc::c_int) { INIT_SIGNAL_HANDLER.call_once(|| _install_thread_stack_stace_handler(signal)) } -fn _install_thread_stack_stace_handler(signal: Signal) { - let mut sigset = SigSet::empty(); - sigset.add(signal); - - // Set up a signal action - let sa = SigAction::new( - SigHandler::Handler(signal_handler), - SaFlags::empty(), - sigset, - ); - - // Register the signal action for process +fn _install_thread_stack_stace_handler(signal: libc::c_int) { unsafe { - signal::sigaction(signal, &sa).expect("Failed to register signal handler"); + libc::signal(signal, signal_handler as libc::sighandler_t); } } -fn signal_all_threads(signal: Signal, targets: Vec) { +fn signal_all_threads(signal: libc::c_int, targets: Vec) { for thread_id in &targets { - let result = unsafe { - libc::pthread_kill( - *thread_id, - match signal.into() { - Some(s) => s as libc::c_int, - None => 0, - }, - ) - }; + let result = unsafe { libc::pthread_kill(*thread_id, signal) }; if result != 0 { eprintln!("Error sending signal: {:?}", result); } @@ -273,7 +253,7 @@ pub struct LongRunningTaskDetector { detection_time: Duration, stop_flag: Arc>, workers: Arc, - signal: Signal, + signal: libc::c_int, identity: usize, } @@ -285,7 +265,7 @@ async fn do_nothing(tx: mpsc::Sender<()>) { fn probe( tokio_runtime: &Arc, detection_time: Duration, - signal: Signal, + signal: libc::c_int, workers: &Arc, action: &Arc, ) { @@ -316,7 +296,7 @@ fn probe( /// let lrtd = LongRunningTaskDetector::new( /// std::time::Duration::from_millis(10), /// std::time::Duration::from_millis(100), -/// nix::sys::signal::Signal::SIGUSR1, +/// libc::SIGUSR1, /// mutable_builder, /// ); /// let runtime = builder.enable_all().build().unwrap(); @@ -378,7 +358,7 @@ impl LongRunningTaskDetector { pub fn new( interval: Duration, detection_time: Duration, - signal: Signal, + signal: libc::c_int, runtime_builder: &mut Builder, ) -> Self { install_thread_stack_stace_handler(signal); @@ -441,7 +421,7 @@ impl LongRunningTaskDetector { let stop_flag = Arc::clone(&self.stop_flag); let detection_time = self.detection_time.clone(); let interval = self.interval.clone(); - let signal = self.signal.clone(); + let signal = self.signal; let workers = Arc::clone(&self.workers); thread::spawn(move || { let mut rng = rand::thread_rng(); diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index e6604bed045..2d4c50b4b21 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -1,5 +1,4 @@ #![cfg(all(feature = "lrtd"))] -use nix::sys::signal::Signal; use std::backtrace::Backtrace; use std::collections::HashSet; use std::sync::{Arc, Mutex}; @@ -23,7 +22,7 @@ fn test_blocking_detection_multi() { let lrtd = LongRunningTaskDetector::new( Duration::from_millis(10), Duration::from_millis(100), - Signal::SIGUSR1, + libc::SIGUSR1, mutable_builder, ); let runtime = builder.enable_all().build().unwrap(); @@ -46,7 +45,7 @@ fn test_blocking_detection_current() { let lrtd = LongRunningTaskDetector::new( Duration::from_millis(10), Duration::from_millis(100), - Signal::SIGUSR1, + libc::SIGUSR1, mutable_builder, ); let runtime = mutable_builder.build().unwrap(); @@ -78,7 +77,7 @@ impl CaptureBlockingActionHandler { } impl BlockingActionHandler for CaptureBlockingActionHandler { - fn blocking_detected(&self, _signal: Signal, targets: &Vec) -> bool { + fn blocking_detected(&self, _signal: libc::c_int, targets: &Vec) -> bool { let mut set = self.inner.lock().unwrap(); set.extend(targets); true @@ -132,7 +131,7 @@ fn test_blocking_detection_multi_capture() { let lrtd = LongRunningTaskDetector::new( Duration::from_millis(10), Duration::from_millis(100), - Signal::SIGUSR1, + libc::SIGUSR1, mutable_builder, ); let runtime = builder.enable_all().build().unwrap(); @@ -161,7 +160,7 @@ fn test_blocking_detection_stop_unstarted() { let _lrtd = LongRunningTaskDetector::new( Duration::from_millis(10), Duration::from_millis(100), - Signal::SIGUSR1, + libc::SIGUSR1, mutable_builder, ); } From ab43c31cf34cfa5287df80582425fbcf6fe74e47 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sun, 31 Dec 2023 07:57:27 -0500 Subject: [PATCH 15/23] Separate out blocking detection from thread state dump. --- tokio-util/src/lrtd/lrtd.rs | 240 ++---------------------------------- tokio-util/src/lrtd/mod.rs | 1 - tokio-util/tests/lrtd.rs | 168 ++++++++++++++----------- 3 files changed, 110 insertions(+), 299 deletions(-) diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio-util/src/lrtd/lrtd.rs index aca9fdd9a4f..aa007b39c86 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio-util/src/lrtd/lrtd.rs @@ -1,112 +1,13 @@ /// Utility to help with "really nice to add a warning for tasks that might be blocking" use libc; use rand::Rng; -use std::backtrace::Backtrace; use std::collections::HashSet; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc; -use std::sync::{Arc, Mutex, Once}; +use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{env, thread}; use tokio::runtime::{Builder, Runtime}; -static mut GLOBAL_COUNTER: Option = None; -static INIT: Once = Once::new(); - -fn initialize_global_counter() { - INIT.call_once(|| unsafe { - GLOBAL_COUNTER = Some(AtomicUsize::new(0)); - }); -} - -/// The purpose of this method is to generate unique IDs for LRTD instances. -fn get_next_global_value() -> usize { - initialize_global_counter(); - - unsafe { - GLOBAL_COUNTER - .as_ref() - .expect("Global counter not initialized") - .fetch_add(1, Ordering::Relaxed) - } -} - -struct OwnedThreadStateHandler { - owners: Vec, - action: Arc, -} - -impl OwnedThreadStateHandler { - fn add(&mut self, action: Arc, owner: usize) { - if !Arc::ptr_eq(&action, &(self.action)) { - panic!("Cannot overwrite action with something different!"); - } - for x_owner in &(self.owners) { - if *x_owner == owner { - panic!( - "Cannot set thread state handler twice by the same owner: {}", - owner - ); - } - } - self.owners.push(owner); - } - - fn remove(&mut self, owner: usize) { - if let Some(index) = self.owners.iter().position(|&x| x == owner) { - let removed_value = self.owners.remove(index); - if removed_value != owner { - panic!("Wrong value, whould have been {}", owner); - } - } else { - panic!("Cannot find owner: {}", owner) - } - } -} - -static GLOBAL_TH_STATE_HANDLER: Mutex> = Mutex::new(None); - -fn set_thread_state_handler(action: Arc, owner: usize) { - let mut value = GLOBAL_TH_STATE_HANDLER.lock().unwrap(); - match value.as_mut() { - Some(handler) => { - (*handler).add(action, owner); - } - None => { - let mut owners = Vec::new(); - owners.push(owner); - *value = Some(OwnedThreadStateHandler { owners, action }); - } - } -} - -fn reset_thread_state_handler(owner: usize) { - let mut value = GLOBAL_TH_STATE_HANDLER.lock().unwrap(); - let h = match value.as_mut() { - Some(handler) => { - (*handler).remove(owner); - handler - } - None => { - panic!( - "No action handler found for: {}, likely LongRunningTaskDetector not started", - owner - ) - } - }; - if h.owners.is_empty() { - *value = None - } -} - -fn get_thread_state_handler() -> Arc { - let option = GLOBAL_TH_STATE_HANDLER.lock().unwrap(); - match option.as_ref() { - Some(value) => value.action.clone(), - None => Arc::new(StdErrThreadStateHandler), - } -} - const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60); fn get_panic_worker_block_duration() -> Duration { @@ -121,25 +22,6 @@ fn get_thread_id() -> libc::pthread_t { unsafe { libc::pthread_self() } } -/// A trait for handling thread state details. -/// -/// This trait provides a method for capturing details of a blocking thread. -pub trait ThreadStateHandler: Send + Sync { - /// Invoked with details of a blocking thread. - /// - /// # Arguments - /// - /// * `thread_id` - The ID of the blocking thread. - /// * `thread_name` - An optional name of the blocking thread. - /// * `backtrace` - The backtrace of the blocking thread. - fn blocking_thread_details( - &self, - thread_id: libc::pthread_t, - thread_name: Option<&str>, - backtrace: Backtrace, - ); -} - /// A trait for handling actions when blocking is detected. /// /// This trait provides a method for handling the detection of a blocking action. @@ -148,43 +30,17 @@ pub trait BlockingActionHandler: Send + Sync { /// /// # Arguments /// - /// * `signal` - The signal used to signal tokio worker threads for state details. - /// * `targets` - The list of thread IDs of the tokio runtime worker threads. /// # Returns - /// - /// # Returns - /// - /// Returns `true` if the signaling of the threads is to be executed, false otherwise. + /// * `workers` - The list of thread IDs of the tokio runtime worker threads. /// # Returns /// - fn blocking_detected(&self, signal: libc::c_int, targets: &Vec) -> bool; + fn blocking_detected(&self, workers: &Vec); } struct StdErrBlockingActionHandler; /// BlockingActionHandler implementation that writes blocker details to standard error. impl BlockingActionHandler for StdErrBlockingActionHandler { - fn blocking_detected(&self, signal: libc::c_int, targets: &Vec) -> bool { - eprintln!( - "Detected worker blocking, signaling {} worker threads: {:?}", - signal, targets - ); - true - } -} - -struct StdErrThreadStateHandler; - -/// ThreadStateHandler implementation that writes details to standard error. -impl ThreadStateHandler for StdErrThreadStateHandler { - fn blocking_thread_details( - &self, - thread_id: libc::pthread_t, - thread_name: Option<&str>, - backtrace: Backtrace, - ) { - let name = thread_name - .map(|n| format!(" for thread \"{}\"", n)) - .unwrap_or("".to_owned()); - eprintln!("Stack trace{}:{}\n{}", name, thread_id, backtrace); + fn blocking_detected(&self, workers: &Vec) { + eprintln!("Detected blocking in worker threads: {:?}", workers); } } @@ -216,36 +72,6 @@ impl WorkerSet { } } -extern "C" fn signal_handler(_: i32) { - let backtrace = Backtrace::force_capture(); - get_thread_state_handler().blocking_thread_details( - get_thread_id(), - thread::current().name(), - backtrace, - ); -} - -static INIT_SIGNAL_HANDLER: Once = Once::new(); - -fn install_thread_stack_stace_handler(signal: libc::c_int) { - INIT_SIGNAL_HANDLER.call_once(|| _install_thread_stack_stace_handler(signal)) -} - -fn _install_thread_stack_stace_handler(signal: libc::c_int) { - unsafe { - libc::signal(signal, signal_handler as libc::sighandler_t); - } -} - -fn signal_all_threads(signal: libc::c_int, targets: Vec) { - for thread_id in &targets { - let result = unsafe { libc::pthread_kill(*thread_id, signal) }; - if result != 0 { - eprintln!("Error sending signal: {:?}", result); - } - } -} - /// Utility to help with "really nice to add a warning for tasks that might be blocking" #[derive(Debug)] pub struct LongRunningTaskDetector { @@ -253,8 +79,6 @@ pub struct LongRunningTaskDetector { detection_time: Duration, stop_flag: Arc>, workers: Arc, - signal: libc::c_int, - identity: usize, } async fn do_nothing(tx: mpsc::Sender<()>) { @@ -265,7 +89,6 @@ async fn do_nothing(tx: mpsc::Sender<()>) { fn probe( tokio_runtime: &Arc, detection_time: Duration, - signal: libc::c_int, workers: &Arc, action: &Arc, ) { @@ -277,10 +100,7 @@ fn probe( }; if !is_probe_success { let targets = workers.get_all(); - if action.blocking_detected(signal, &targets) { - signal_all_threads(signal, targets); - // Wait for our probe to eventually finish, we do not want to have multiple probes running at the same time. - } + action.blocking_detected(&targets); let _ = rx.recv_timeout(get_panic_worker_block_duration()).unwrap(); } } @@ -296,7 +116,6 @@ fn probe( /// let lrtd = LongRunningTaskDetector::new( /// std::time::Duration::from_millis(10), /// std::time::Duration::from_millis(100), -/// libc::SIGUSR1, /// mutable_builder, /// ); /// let runtime = builder.enable_all().build().unwrap(); @@ -313,33 +132,10 @@ fn probe( /// The detail will look like: /// /// ```text -/// Detected worker blocking, signaling SIGUSR1 worker threads: [123145318232064, 123145320341504] -/// Stack trace for thread "tokio-runtime-worker":123145318232064 -/// 0: std::backtrace_rs::backtrace::libunwind::trace -/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5 -/// 1: std::backtrace_rs::backtrace::trace_unsynchronized -/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5 -/// 2: std::backtrace::Backtrace::create -/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/backtrace.rs:331:13 -/// 3: std::backtrace::Backtrace::force_capture -/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/backtrace.rs:313:9 -/// 4: tokio_util::lrtd::lrtd::signal_handler -/// at ./src/lrtd/lrtd.rs:217:21 -/// 5: __sigtramp -/// 6: ___semwait_signal -/// 7: -/// 8: std::sys::unix::thread::Thread::sleep -/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/sys/unix/thread.rs:241:20 -/// 9: std::thread::sleep -/// at /rustc/a28077b28a02b92985b3a3faecf92813155f1ea1/library/std/src/thread/mod.rs:872:5 -/// 10: lrtd::run_blocking_stuff::{{closure}} -/// at ./tests/lrtd.rs:11:5 -/// 11: tokio::runtime::task::core::Core::poll::{{closure}} -/// at /Users/zoly/NetBeansProjects/tokio/tokio/src/runtime/task/core.rs:328:17 -/// 12: tokio::loom::std::unsafe_cell::UnsafeCell::with_mut -/// at /Users/zoly/NetBeansProjects/tokio/tokio/src/loom/std/unsafe_cell.rs:16:9 -/// 13: tokio::runtime::task::core::Core::poll +/// Detected blocking in worker threads: [123145318232064, 123145320341504] /// ``` +/// +/// To get more details(like stack traces) start LongRunningTaskDetector with start_with_custom_action and provide a custom handler. /// impl LongRunningTaskDetector { /// Creates a new `LongRunningTaskDetector` instance. @@ -349,7 +145,6 @@ impl LongRunningTaskDetector { /// * `interval` - The interval between probes. This interval is randomized. /// * `detection_time` - The maximum time allowed for a probe to succeed. /// A probe running for longer indicates something is blocking the worker threads. - /// * `signal` - The signal to use for signaling worker threads when blocking is detected. /// * `runtime_builder` - A mutable reference to a `tokio::runtime::Builder`. /// /// # Returns @@ -358,10 +153,8 @@ impl LongRunningTaskDetector { pub fn new( interval: Duration, detection_time: Duration, - signal: libc::c_int, runtime_builder: &mut Builder, ) -> Self { - install_thread_stack_stace_handler(signal); let workers = Arc::new(WorkerSet::new()); if runtime_builder.is_current_threaded() { workers.add(get_thread_id()); @@ -378,14 +171,11 @@ impl LongRunningTaskDetector { workers_clone2.remove(pid); }); } - let identity = get_next_global_value(); LongRunningTaskDetector { interval, detection_time, stop_flag: Arc::new(Mutex::new(true)), workers, - signal, - identity, } } @@ -395,11 +185,7 @@ impl LongRunningTaskDetector { /// /// * `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. pub fn start(&self, runtime: Arc) { - self.start_with_custom_action( - runtime, - Arc::new(StdErrBlockingActionHandler), - Arc::new(StdErrThreadStateHandler), - ) + self.start_with_custom_action(runtime, Arc::new(StdErrBlockingActionHandler)) } /// Starts the monitoring process with custom action handlers that @@ -414,19 +200,16 @@ impl LongRunningTaskDetector { &self, runtime: Arc, action: Arc, - thread_action: Arc, ) { - set_thread_state_handler(thread_action, self.identity); *self.stop_flag.lock().unwrap() = false; let stop_flag = Arc::clone(&self.stop_flag); let detection_time = self.detection_time.clone(); let interval = self.interval.clone(); - let signal = self.signal; let workers = Arc::clone(&self.workers); thread::spawn(move || { let mut rng = rand::thread_rng(); while !*stop_flag.lock().unwrap() { - probe(&runtime, detection_time, signal, &workers, &action); + probe(&runtime, detection_time, &workers, &action); thread::sleep(Duration::from_micros( rng.gen_range(1..=interval.as_micros()).try_into().unwrap(), )); @@ -439,7 +222,6 @@ impl LongRunningTaskDetector { let mut sf = self.stop_flag.lock().unwrap(); if *sf != true { *sf = true; - reset_thread_state_handler(self.identity); } } } diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs index 67d1b463f1e..caee6bd73b1 100644 --- a/tokio-util/src/lrtd/mod.rs +++ b/tokio-util/src/lrtd/mod.rs @@ -3,4 +3,3 @@ mod lrtd; pub use self::lrtd::BlockingActionHandler; pub use self::lrtd::LongRunningTaskDetector; -pub use self::lrtd::ThreadStateHandler; diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index 2d4c50b4b21..2d7eaff3b38 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -1,10 +1,11 @@ #![cfg(all(feature = "lrtd"))] use std::backtrace::Backtrace; -use std::collections::HashSet; +use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; -use std::time::Duration; -use tokio_util::lrtd::{BlockingActionHandler, LongRunningTaskDetector, ThreadStateHandler}; +use std::time::{Duration, Instant}; +use tokio_util::lrtd::{BlockingActionHandler, LongRunningTaskDetector}; async fn run_blocking_stuff() { println!("slow start"); @@ -12,17 +13,13 @@ async fn run_blocking_stuff() { println!("slow done"); } -static GLOBAL_MUTEX: Mutex<()> = Mutex::new(()); - #[test] fn test_blocking_detection_multi() { - let _guard = GLOBAL_MUTEX.lock().unwrap(); let mut builder = tokio::runtime::Builder::new_multi_thread(); let mutable_builder = builder.worker_threads(2); let lrtd = LongRunningTaskDetector::new( Duration::from_millis(10), Duration::from_millis(100), - libc::SIGUSR1, mutable_builder, ); let runtime = builder.enable_all().build().unwrap(); @@ -39,13 +36,11 @@ fn test_blocking_detection_multi() { #[test] fn test_blocking_detection_current() { - let _guard = GLOBAL_MUTEX.lock().unwrap(); let mut builder = tokio::runtime::Builder::new_current_thread(); let mutable_builder = builder.enable_all(); let lrtd = LongRunningTaskDetector::new( Duration::from_millis(10), Duration::from_millis(100), - libc::SIGUSR1, mutable_builder, ); let runtime = mutable_builder.build().unwrap(); @@ -59,108 +54,143 @@ fn test_blocking_detection_current() { }); } -struct CaptureBlockingActionHandler { - inner: Mutex>, +#[test] +fn test_blocking_detection_stop_unstarted() { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mutable_builder = builder.worker_threads(2); + let _lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + mutable_builder, + ); } -impl CaptureBlockingActionHandler { - fn new() -> Self { - CaptureBlockingActionHandler { - inner: Mutex::new(HashSet::new()), - } - } +fn get_thread_id() -> libc::pthread_t { + unsafe { libc::pthread_self() } +} + +static SIGNAL_COUNTER: AtomicUsize = AtomicUsize::new(0); - fn get_all(&self) -> Vec { - let set = self.inner.lock().unwrap(); - set.iter().cloned().collect() +static THREAD_DUMPS: Mutex>> = Mutex::new(None); + +extern "C" fn signal_handler(_: i32) { + // not signal safe, this needs to be rewritten to avoid mem allocations and use a pre-allocated buffer. + let backtrace = Backtrace::force_capture(); + let name = thread::current() + .name() + .map(|n| format!(" for thread \"{}\"", n)) + .unwrap_or("".to_owned()); + let tid = get_thread_id(); + let detail = format!("Stack trace{}:{}\n{}", name, tid, backtrace); + let mut omap = THREAD_DUMPS.lock().unwrap(); + let map = omap.as_mut().unwrap(); + (*map).insert(tid, detail); + SIGNAL_COUNTER.fetch_sub(1, Ordering::SeqCst); +} + +fn install_thread_stack_stace_handler(signal: libc::c_int) { + unsafe { + libc::signal(signal, signal_handler as libc::sighandler_t); } } -impl BlockingActionHandler for CaptureBlockingActionHandler { - fn blocking_detected(&self, _signal: libc::c_int, targets: &Vec) -> bool { - let mut set = self.inner.lock().unwrap(); - set.extend(targets); - true +static GTI_MUTEX: Mutex<()> = Mutex::new(()); + +/// A naive stack trace capture implementation for threads for DEMO/TEST only purposes. +fn get_thread_info( + signal: libc::c_int, + targets: &Vec, +) -> HashMap { + let _lock = GTI_MUTEX.lock(); + { + let mut omap = THREAD_DUMPS.lock().unwrap(); + *omap = Some(HashMap::new()); + SIGNAL_COUNTER.store(targets.len(), Ordering::SeqCst); + } + for thread_id in targets { + let result = unsafe { libc::pthread_kill(*thread_id, signal) }; + if result != 0 { + eprintln!("Error sending signal: {:?}", result); + } + } + let time_limit = Duration::from_secs(1); + let start_time = Instant::now(); + loop { + let signal_count = SIGNAL_COUNTER.load(Ordering::SeqCst); + if signal_count <= 0 { + SIGNAL_COUNTER.store(0, Ordering::SeqCst); + break; + } + if Instant::now() - start_time >= time_limit { + break; + } + std::thread::sleep(std::time::Duration::from_micros(10)); + } + { + let omap = THREAD_DUMPS.lock().unwrap(); + omap.clone().unwrap().clone() } } -struct CaptureThreadStateHandler { - inner: Mutex>, +struct DetailedCaptureBlockingActionHandler { + inner: Mutex>>, } -impl CaptureThreadStateHandler { +impl DetailedCaptureBlockingActionHandler { fn new() -> Self { - CaptureThreadStateHandler { - inner: Mutex::new(Vec::new()), + DetailedCaptureBlockingActionHandler { + inner: Mutex::new(None), } } - fn get_all(&self) -> Vec { - let vec = self.inner.lock().unwrap(); - vec.clone() - } - fn contains_symbol(&self, symbol_name: &str) -> bool { // Iterate over the frames in the backtrace - let traces = self.get_all(); - if traces.is_empty() { - return false; + let omap = self.inner.lock().unwrap(); + match omap.as_ref() { + Some(map) => { + if map.is_empty() { + false + } else { + let bt_str = map.values().next().unwrap(); + bt_str.contains(symbol_name) + } + } + None => false, } - let bt_str = traces.get(0).unwrap(); - bt_str.contains(symbol_name) } } -impl ThreadStateHandler for CaptureThreadStateHandler { - fn blocking_thread_details( - &self, - _thread_id: libc::pthread_t, - _thread_name: Option<&str>, - backtrace: Backtrace, - ) { - let mut vec = self.inner.lock().unwrap(); - vec.push(format!("{}", backtrace)); +impl BlockingActionHandler for DetailedCaptureBlockingActionHandler { + fn blocking_detected(&self, workers: &Vec) { + let mut map = self.inner.lock().unwrap(); + let tinfo = get_thread_info(libc::SIGUSR1, workers); + eprint!("Blocking detected with details: {:?}\n", tinfo); + *map = Some(tinfo); } } #[test] fn test_blocking_detection_multi_capture() { - let _guard = GLOBAL_MUTEX.lock().unwrap(); + install_thread_stack_stace_handler(libc::SIGUSR1); let mut builder = tokio::runtime::Builder::new_multi_thread(); let mutable_builder = builder.worker_threads(2); let lrtd = LongRunningTaskDetector::new( Duration::from_millis(10), Duration::from_millis(100), - libc::SIGUSR1, mutable_builder, ); let runtime = builder.enable_all().build().unwrap(); let arc_runtime = Arc::new(runtime); let arc_runtime2 = arc_runtime.clone(); - let blocking_action = Arc::new(CaptureBlockingActionHandler::new()); - let thread_state_action = Arc::new(CaptureThreadStateHandler::new()); + let blocking_action = Arc::new(DetailedCaptureBlockingActionHandler::new()); let to_assert_blocking = blocking_action.clone(); - let to_assert_thread = thread_state_action.clone(); - lrtd.start_with_custom_action(arc_runtime, blocking_action, thread_state_action); + lrtd.start_with_custom_action(arc_runtime, blocking_action); arc_runtime2.spawn(run_blocking_stuff()); arc_runtime2.spawn(run_blocking_stuff()); arc_runtime2.block_on(async { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; println!("Hello world"); }); - assert_eq!(2, to_assert_blocking.get_all().len()); - assert!(to_assert_thread.contains_symbol("std::thread::sleep")); + assert!(to_assert_blocking.contains_symbol("std::thread::sleep")); lrtd.stop() } - -#[test] -fn test_blocking_detection_stop_unstarted() { - let mut builder = tokio::runtime::Builder::new_multi_thread(); - let mutable_builder = builder.worker_threads(2); - let _lrtd = LongRunningTaskDetector::new( - Duration::from_millis(10), - Duration::from_millis(100), - libc::SIGUSR1, - mutable_builder, - ); -} From 97572a9a03e7f51a74e99fc22738d86216ac482b Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sun, 31 Dec 2023 15:28:44 -0500 Subject: [PATCH 16/23] Move lrtd into tokio (out of tokio-util) --- tokio-util/Cargo.toml | 4 +--- tokio-util/src/cfg.rs | 10 ---------- tokio-util/src/lib.rs | 4 ---- tokio-util/src/lrtd/mod.rs | 5 ----- tokio-util/tests/lrtd.rs | 2 +- tokio/src/runtime/builder.rs | 2 +- .../src/lrtd => tokio/src/runtime}/lrtd.rs | 16 ++++++++-------- tokio/src/runtime/mod.rs | 2 ++ 8 files changed, 13 insertions(+), 32 deletions(-) delete mode 100644 tokio-util/src/lrtd/mod.rs rename {tokio-util/src/lrtd => tokio/src/runtime}/lrtd.rs (94%) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index e52babe049e..86db89167af 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -21,7 +21,7 @@ categories = ["asynchronous"] default = [] # Shorthand for enabling everything -full = ["codec", "compat", "io-util", "time", "net", "rt", "lrtd"] +full = ["codec", "compat", "io-util", "time", "net", "rt"] net = ["tokio/net"] compat = ["futures-io",] @@ -30,7 +30,6 @@ time = ["tokio/time","slab"] io = [] io-util = ["io", "tokio/rt", "tokio/io-util"] rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"] -lrtd = [] __docs_rs = ["futures-util"] @@ -44,7 +43,6 @@ futures-util = { version = "0.3.0", optional = true } pin-project-lite = "0.2.11" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } -rand = "0.8" libc = "0.2" [target.'cfg(tokio_unstable)'.dependencies] diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index 250c52e121a..4035255aff0 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -69,13 +69,3 @@ macro_rules! cfg_time { )* } } - -macro_rules! cfg_lrtd { - ($($item:item)*) => { - $( - #[cfg(feature = "lrtd")] - #[cfg_attr(docsrs, doc(cfg(feature = "lrtd")))] - $item - )* - } -} diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 92176346de6..22ad92b8c4b 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -51,10 +51,6 @@ cfg_time! { pub mod time; } -cfg_lrtd! { - pub mod lrtd; -} - pub mod sync; pub mod either; diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs deleted file mode 100644 index caee6bd73b1..00000000000 --- a/tokio-util/src/lrtd/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Utility to help with "really nice to add a warning for tasks that might be blocking" -mod lrtd; - -pub use self::lrtd::BlockingActionHandler; -pub use self::lrtd::LongRunningTaskDetector; diff --git a/tokio-util/tests/lrtd.rs b/tokio-util/tests/lrtd.rs index 2d7eaff3b38..209469f7864 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio-util/tests/lrtd.rs @@ -5,7 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; -use tokio_util::lrtd::{BlockingActionHandler, LongRunningTaskDetector}; +use tokio::runtime::lrtd::{BlockingActionHandler, LongRunningTaskDetector}; async fn run_blocking_stuff() { println!("slow start"); diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index f4eeb775984..91211b877de 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -319,7 +319,7 @@ impl Builder { } /// Returns true if kind is "CurrentThread" of this [`Builder`]. False otherwise. - pub fn is_current_threaded(&self) -> bool { + pub(crate) fn is_current_threaded(&self) -> bool { match &self.kind { Kind::CurrentThread => true, #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio/src/runtime/lrtd.rs similarity index 94% rename from tokio-util/src/lrtd/lrtd.rs rename to tokio/src/runtime/lrtd.rs index aa007b39c86..4ef949ef5d6 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio/src/runtime/lrtd.rs @@ -1,12 +1,13 @@ -/// Utility to help with "really nice to add a warning for tasks that might be blocking" +//! Utility to help with "really nice to add a warning for tasks that might be blocking" use libc; -use rand::Rng; use std::collections::HashSet; use std::sync::mpsc; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{env, thread}; -use tokio::runtime::{Builder, Runtime}; + +use crate::runtime::{Builder, Runtime}; +use crate::util::rand::FastRand; const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60); @@ -109,7 +110,7 @@ fn probe( /// Example use: /// ``` /// use std::sync::Arc; -/// use tokio_util::lrtd::LongRunningTaskDetector; +/// use tokio::runtime::lrtd::LongRunningTaskDetector; /// /// let mut builder = tokio::runtime::Builder::new_multi_thread(); /// let mutable_builder = builder.worker_threads(2); @@ -207,12 +208,11 @@ impl LongRunningTaskDetector { let interval = self.interval.clone(); let workers = Arc::clone(&self.workers); thread::spawn(move || { - let mut rng = rand::thread_rng(); + let mut rnd = FastRand::new(); + let max: u32 = >::try_into(interval.as_micros()).unwrap() - 10; while !*stop_flag.lock().unwrap() { probe(&runtime, detection_time, &workers, &action); - thread::sleep(Duration::from_micros( - rng.gen_range(1..=interval.as_micros()).try_into().unwrap(), - )); + thread::sleep(Duration::from_micros(rnd.fastrand_n(max).into())); } }); } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 3d333960f3d..f5b924a3d90 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -379,6 +379,8 @@ cfg_rt! { pub use dump::Dump; } + pub mod lrtd; + mod handle; pub use handle::{EnterGuard, Handle, TryCurrentError}; From 78deaf81d53bdb53a9adbbe320ddc6bed16a16a5 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sun, 31 Dec 2023 15:28:44 -0500 Subject: [PATCH 17/23] Move lrtd into tokio (out of tokio-util) --- tokio-util/Cargo.toml | 4 +--- tokio-util/src/cfg.rs | 10 ---------- tokio-util/src/lib.rs | 4 ---- tokio-util/src/lrtd/mod.rs | 5 ----- tokio/src/runtime/builder.rs | 2 +- .../src/lrtd => tokio/src/runtime}/lrtd.rs | 17 +++++++++-------- tokio/src/runtime/mod.rs | 3 +++ {tokio-util => tokio}/tests/lrtd.rs | 3 +-- 8 files changed, 15 insertions(+), 33 deletions(-) delete mode 100644 tokio-util/src/lrtd/mod.rs rename {tokio-util/src/lrtd => tokio/src/runtime}/lrtd.rs (94%) rename {tokio-util => tokio}/tests/lrtd.rs (98%) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index e52babe049e..86db89167af 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -21,7 +21,7 @@ categories = ["asynchronous"] default = [] # Shorthand for enabling everything -full = ["codec", "compat", "io-util", "time", "net", "rt", "lrtd"] +full = ["codec", "compat", "io-util", "time", "net", "rt"] net = ["tokio/net"] compat = ["futures-io",] @@ -30,7 +30,6 @@ time = ["tokio/time","slab"] io = [] io-util = ["io", "tokio/rt", "tokio/io-util"] rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"] -lrtd = [] __docs_rs = ["futures-util"] @@ -44,7 +43,6 @@ futures-util = { version = "0.3.0", optional = true } pin-project-lite = "0.2.11" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } -rand = "0.8" libc = "0.2" [target.'cfg(tokio_unstable)'.dependencies] diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index 250c52e121a..4035255aff0 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -69,13 +69,3 @@ macro_rules! cfg_time { )* } } - -macro_rules! cfg_lrtd { - ($($item:item)*) => { - $( - #[cfg(feature = "lrtd")] - #[cfg_attr(docsrs, doc(cfg(feature = "lrtd")))] - $item - )* - } -} diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 92176346de6..22ad92b8c4b 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -51,10 +51,6 @@ cfg_time! { pub mod time; } -cfg_lrtd! { - pub mod lrtd; -} - pub mod sync; pub mod either; diff --git a/tokio-util/src/lrtd/mod.rs b/tokio-util/src/lrtd/mod.rs deleted file mode 100644 index caee6bd73b1..00000000000 --- a/tokio-util/src/lrtd/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Utility to help with "really nice to add a warning for tasks that might be blocking" -mod lrtd; - -pub use self::lrtd::BlockingActionHandler; -pub use self::lrtd::LongRunningTaskDetector; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index f4eeb775984..91211b877de 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -319,7 +319,7 @@ impl Builder { } /// Returns true if kind is "CurrentThread" of this [`Builder`]. False otherwise. - pub fn is_current_threaded(&self) -> bool { + pub(crate) fn is_current_threaded(&self) -> bool { match &self.kind { Kind::CurrentThread => true, #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] diff --git a/tokio-util/src/lrtd/lrtd.rs b/tokio/src/runtime/lrtd.rs similarity index 94% rename from tokio-util/src/lrtd/lrtd.rs rename to tokio/src/runtime/lrtd.rs index aa007b39c86..7cbcb7b2fa9 100644 --- a/tokio-util/src/lrtd/lrtd.rs +++ b/tokio/src/runtime/lrtd.rs @@ -1,12 +1,14 @@ -/// Utility to help with "really nice to add a warning for tasks that might be blocking" +//! Utility to help with "really nice to add a warning for tasks that might be blocking" +#![cfg(unix)] use libc; -use rand::Rng; use std::collections::HashSet; use std::sync::mpsc; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{env, thread}; -use tokio::runtime::{Builder, Runtime}; + +use crate::runtime::{Builder, Runtime}; +use crate::util::rand::FastRand; const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60); @@ -109,7 +111,7 @@ fn probe( /// Example use: /// ``` /// use std::sync::Arc; -/// use tokio_util::lrtd::LongRunningTaskDetector; +/// use tokio::runtime::lrtd::LongRunningTaskDetector; /// /// let mut builder = tokio::runtime::Builder::new_multi_thread(); /// let mutable_builder = builder.worker_threads(2); @@ -207,12 +209,11 @@ impl LongRunningTaskDetector { let interval = self.interval.clone(); let workers = Arc::clone(&self.workers); thread::spawn(move || { - let mut rng = rand::thread_rng(); + let mut rnd = FastRand::new(); + let max: u32 = >::try_into(interval.as_micros()).unwrap() - 10; while !*stop_flag.lock().unwrap() { probe(&runtime, detection_time, &workers, &action); - thread::sleep(Duration::from_micros( - rng.gen_range(1..=interval.as_micros()).try_into().unwrap(), - )); + thread::sleep(Duration::from_micros(rnd.fastrand_n(max).into())); } }); } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 3d333960f3d..2cdaae5bb8d 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -379,6 +379,9 @@ cfg_rt! { pub use dump::Dump; } + #[cfg(unix)] + pub mod lrtd; + mod handle; pub use handle::{EnterGuard, Handle, TryCurrentError}; diff --git a/tokio-util/tests/lrtd.rs b/tokio/tests/lrtd.rs similarity index 98% rename from tokio-util/tests/lrtd.rs rename to tokio/tests/lrtd.rs index 2d7eaff3b38..16bce604be8 100644 --- a/tokio-util/tests/lrtd.rs +++ b/tokio/tests/lrtd.rs @@ -1,11 +1,10 @@ -#![cfg(all(feature = "lrtd"))] use std::backtrace::Backtrace; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; -use tokio_util::lrtd::{BlockingActionHandler, LongRunningTaskDetector}; +use tokio::runtime::lrtd::{BlockingActionHandler, LongRunningTaskDetector}; async fn run_blocking_stuff() { println!("slow start"); From 067b0c409a21b3e48745408db07c269129e55a69 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sun, 31 Dec 2023 17:26:37 -0500 Subject: [PATCH 18/23] Fix some merge issue --- tokio/src/runtime/lrtd.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/tokio/src/runtime/lrtd.rs b/tokio/src/runtime/lrtd.rs index 98127dd2547..7cbcb7b2fa9 100644 --- a/tokio/src/runtime/lrtd.rs +++ b/tokio/src/runtime/lrtd.rs @@ -10,9 +10,6 @@ use std::{env, thread}; use crate::runtime::{Builder, Runtime}; use crate::util::rand::FastRand; -use crate::runtime::{Builder, Runtime}; -use crate::util::rand::FastRand; - const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60); fn get_panic_worker_block_duration() -> Duration { From a9f009bfab1cb2ae21877a852d09d48da5a7d433 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sun, 31 Dec 2023 17:38:12 -0500 Subject: [PATCH 19/23] Fix lint issues. --- tokio/src/runtime/lrtd.rs | 14 +++++++------- tokio/tests/lrtd.rs | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tokio/src/runtime/lrtd.rs b/tokio/src/runtime/lrtd.rs index 7cbcb7b2fa9..8a9d57db090 100644 --- a/tokio/src/runtime/lrtd.rs +++ b/tokio/src/runtime/lrtd.rs @@ -34,14 +34,14 @@ pub trait BlockingActionHandler: Send + Sync { /// /// * `workers` - The list of thread IDs of the tokio runtime worker threads. /// # Returns /// - fn blocking_detected(&self, workers: &Vec); + fn blocking_detected(&self, workers: &[libc::pthread_t]); } struct StdErrBlockingActionHandler; /// BlockingActionHandler implementation that writes blocker details to standard error. impl BlockingActionHandler for StdErrBlockingActionHandler { - fn blocking_detected(&self, workers: &Vec) { + fn blocking_detected(&self, workers: &[libc::pthread_t]) { eprintln!("Detected blocking in worker threads: {:?}", workers); } } @@ -85,7 +85,7 @@ pub struct LongRunningTaskDetector { async fn do_nothing(tx: mpsc::Sender<()>) { // signal I am done - let _ = tx.send(()).unwrap(); + tx.send(()).unwrap(); } fn probe( @@ -103,7 +103,7 @@ fn probe( if !is_probe_success { let targets = workers.get_all(); action.blocking_detected(&targets); - let _ = rx.recv_timeout(get_panic_worker_block_duration()).unwrap(); + rx.recv_timeout(get_panic_worker_block_duration()).unwrap(); } } @@ -205,8 +205,8 @@ impl LongRunningTaskDetector { ) { *self.stop_flag.lock().unwrap() = false; let stop_flag = Arc::clone(&self.stop_flag); - let detection_time = self.detection_time.clone(); - let interval = self.interval.clone(); + let detection_time = self.detection_time; + let interval = self.interval; let workers = Arc::clone(&self.workers); thread::spawn(move || { let mut rnd = FastRand::new(); @@ -221,7 +221,7 @@ impl LongRunningTaskDetector { /// Stops the monitoring thread. Does nothing if LRTD is already stopped. pub fn stop(&self) { let mut sf = self.stop_flag.lock().unwrap(); - if *sf != true { + if !(*sf) { *sf = true; } } diff --git a/tokio/tests/lrtd.rs b/tokio/tests/lrtd.rs index 16bce604be8..1e04d0b1b33 100644 --- a/tokio/tests/lrtd.rs +++ b/tokio/tests/lrtd.rs @@ -98,7 +98,7 @@ static GTI_MUTEX: Mutex<()> = Mutex::new(()); /// A naive stack trace capture implementation for threads for DEMO/TEST only purposes. fn get_thread_info( signal: libc::c_int, - targets: &Vec, + targets: &[libc::pthread_t], ) -> HashMap { let _lock = GTI_MUTEX.lock(); { @@ -160,7 +160,7 @@ impl DetailedCaptureBlockingActionHandler { } impl BlockingActionHandler for DetailedCaptureBlockingActionHandler { - fn blocking_detected(&self, workers: &Vec) { + fn blocking_detected(&self, workers: &[libc::pthread_t]) { let mut map = self.inner.lock().unwrap(); let tinfo = get_thread_info(libc::SIGUSR1, workers); eprint!("Blocking detected with details: {:?}\n", tinfo); From 22b4df37880e46eae3f24dcf7f63a3b261eee4d7 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Mon, 1 Jan 2024 09:31:25 -0500 Subject: [PATCH 20/23] Cleanup lint issues. --- .../tests/fail/macros_type_mismatch.stderr | 22 +- tokio/src/runtime/lrtd.rs | 1 - tokio/tests/lrtd.rs | 340 +++++++++--------- 3 files changed, 176 insertions(+), 187 deletions(-) diff --git a/tests-build/tests/fail/macros_type_mismatch.stderr b/tests-build/tests/fail/macros_type_mismatch.stderr index 579c241559b..d900ac2330c 100644 --- a/tests-build/tests/fail/macros_type_mismatch.stderr +++ b/tests-build/tests/fail/macros_type_mismatch.stderr @@ -1,36 +1,24 @@ error[E0308]: mismatched types --> tests/fail/macros_type_mismatch.rs:5:5 | +4 | async fn missing_semicolon_or_return_type() { + | - help: a return type might be missing here: `-> _` 5 | Ok(()) | ^^^^^^ expected `()`, found `Result<(), _>` | = note: expected unit type `()` found enum `Result<(), _>` -help: a return type might be missing here - | -4 | async fn missing_semicolon_or_return_type() -> _ { - | ++++ -help: consider using `Result::expect` to unwrap the `Result<(), _>` value, panicking if the value is a `Result::Err` - | -5 | Ok(()).expect("REASON") - | +++++++++++++++++ error[E0308]: mismatched types --> tests/fail/macros_type_mismatch.rs:10:5 | +9 | async fn missing_return_type() { + | - help: a return type might be missing here: `-> _` 10 | return Ok(()); | ^^^^^^^^^^^^^^ expected `()`, found `Result<(), _>` | = note: expected unit type `()` found enum `Result<(), _>` -help: a return type might be missing here - | -9 | async fn missing_return_type() -> _ { - | ++++ -help: consider using `Result::expect` to unwrap the `Result<(), _>` value, panicking if the value is a `Result::Err` - | -10 | return Ok(());.expect("REASON") - | +++++++++++++++++ error[E0308]: mismatched types --> tests/fail/macros_type_mismatch.rs:23:5 @@ -53,7 +41,7 @@ error[E0308]: mismatched types --> tests/fail/macros_type_mismatch.rs:32:5 | 30 | async fn issue_4635() { - | - help: try adding a return type: `-> i32` + | - help: try adding a return type: `-> i32` 31 | return 1; 32 | ; | ^ expected `()`, found integer diff --git a/tokio/src/runtime/lrtd.rs b/tokio/src/runtime/lrtd.rs index 8a9d57db090..b8e2fb55304 100644 --- a/tokio/src/runtime/lrtd.rs +++ b/tokio/src/runtime/lrtd.rs @@ -1,5 +1,4 @@ //! Utility to help with "really nice to add a warning for tasks that might be blocking" -#![cfg(unix)] use libc; use std::collections::HashSet; use std::sync::mpsc; diff --git a/tokio/tests/lrtd.rs b/tokio/tests/lrtd.rs index 1e04d0b1b33..501d63802a7 100644 --- a/tokio/tests/lrtd.rs +++ b/tokio/tests/lrtd.rs @@ -1,195 +1,197 @@ -use std::backtrace::Backtrace; -use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::{Duration, Instant}; -use tokio::runtime::lrtd::{BlockingActionHandler, LongRunningTaskDetector}; - -async fn run_blocking_stuff() { - println!("slow start"); - thread::sleep(Duration::from_secs(1)); - println!("slow done"); -} - -#[test] -fn test_blocking_detection_multi() { - let mut builder = tokio::runtime::Builder::new_multi_thread(); - let mutable_builder = builder.worker_threads(2); - let lrtd = LongRunningTaskDetector::new( - Duration::from_millis(10), - Duration::from_millis(100), - mutable_builder, - ); - let runtime = builder.enable_all().build().unwrap(); - let arc_runtime = Arc::new(runtime); - let arc_runtime2 = arc_runtime.clone(); - lrtd.start(arc_runtime); - arc_runtime2.spawn(run_blocking_stuff()); - arc_runtime2.spawn(run_blocking_stuff()); - arc_runtime2.block_on(async { - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - println!("Done"); - }); -} - -#[test] -fn test_blocking_detection_current() { - let mut builder = tokio::runtime::Builder::new_current_thread(); - let mutable_builder = builder.enable_all(); - let lrtd = LongRunningTaskDetector::new( - Duration::from_millis(10), - Duration::from_millis(100), - mutable_builder, - ); - let runtime = mutable_builder.build().unwrap(); - let arc_runtime = Arc::new(runtime); - let arc_runtime2 = arc_runtime.clone(); - lrtd.start(arc_runtime); - arc_runtime2.block_on(async { - run_blocking_stuff().await; - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - println!("Done"); - }); -} +#![cfg(unix)] +mod lrtd_tests { + use std::backtrace::Backtrace; + use std::collections::HashMap; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Mutex}; + use std::thread; + use std::time::{Duration, Instant}; + use tokio::runtime::lrtd::{BlockingActionHandler, LongRunningTaskDetector}; + + async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(1)); + println!("slow done"); + } -#[test] -fn test_blocking_detection_stop_unstarted() { - let mut builder = tokio::runtime::Builder::new_multi_thread(); - let mutable_builder = builder.worker_threads(2); - let _lrtd = LongRunningTaskDetector::new( - Duration::from_millis(10), - Duration::from_millis(100), - mutable_builder, - ); -} + #[test] + fn test_blocking_detection_multi() { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mutable_builder = builder.worker_threads(2); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + mutable_builder, + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + println!("Done"); + }); + } -fn get_thread_id() -> libc::pthread_t { - unsafe { libc::pthread_self() } -} + #[test] + fn test_blocking_detection_current() { + let mut builder = tokio::runtime::Builder::new_current_thread(); + let mutable_builder = builder.enable_all(); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + mutable_builder, + ); + let runtime = mutable_builder.build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime); + arc_runtime2.block_on(async { + run_blocking_stuff().await; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + println!("Done"); + }); + } -static SIGNAL_COUNTER: AtomicUsize = AtomicUsize::new(0); - -static THREAD_DUMPS: Mutex>> = Mutex::new(None); - -extern "C" fn signal_handler(_: i32) { - // not signal safe, this needs to be rewritten to avoid mem allocations and use a pre-allocated buffer. - let backtrace = Backtrace::force_capture(); - let name = thread::current() - .name() - .map(|n| format!(" for thread \"{}\"", n)) - .unwrap_or("".to_owned()); - let tid = get_thread_id(); - let detail = format!("Stack trace{}:{}\n{}", name, tid, backtrace); - let mut omap = THREAD_DUMPS.lock().unwrap(); - let map = omap.as_mut().unwrap(); - (*map).insert(tid, detail); - SIGNAL_COUNTER.fetch_sub(1, Ordering::SeqCst); -} + #[test] + fn test_blocking_detection_stop_unstarted() { + let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mutable_builder = builder.worker_threads(2); + let _lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + mutable_builder, + ); + } -fn install_thread_stack_stace_handler(signal: libc::c_int) { - unsafe { - libc::signal(signal, signal_handler as libc::sighandler_t); + fn get_thread_id() -> libc::pthread_t { + unsafe { libc::pthread_self() } } -} -static GTI_MUTEX: Mutex<()> = Mutex::new(()); + static SIGNAL_COUNTER: AtomicUsize = AtomicUsize::new(0); + + static THREAD_DUMPS: Mutex>> = Mutex::new(None); -/// A naive stack trace capture implementation for threads for DEMO/TEST only purposes. -fn get_thread_info( - signal: libc::c_int, - targets: &[libc::pthread_t], -) -> HashMap { - let _lock = GTI_MUTEX.lock(); - { + extern "C" fn signal_handler(_: i32) { + // not signal safe, this needs to be rewritten to avoid mem allocations and use a pre-allocated buffer. + let backtrace = Backtrace::force_capture(); + let name = thread::current() + .name() + .map(|n| format!(" for thread \"{}\"", n)) + .unwrap_or("".to_owned()); + let tid = get_thread_id(); + let detail = format!("Stack trace{}:{}\n{}", name, tid, backtrace); let mut omap = THREAD_DUMPS.lock().unwrap(); - *omap = Some(HashMap::new()); - SIGNAL_COUNTER.store(targets.len(), Ordering::SeqCst); + let map = omap.as_mut().unwrap(); + (*map).insert(tid, detail); + SIGNAL_COUNTER.fetch_sub(1, Ordering::SeqCst); } - for thread_id in targets { - let result = unsafe { libc::pthread_kill(*thread_id, signal) }; - if result != 0 { - eprintln!("Error sending signal: {:?}", result); + + fn install_thread_stack_stace_handler(signal: libc::c_int) { + unsafe { + libc::signal(signal, signal_handler as libc::sighandler_t); } } - let time_limit = Duration::from_secs(1); - let start_time = Instant::now(); - loop { - let signal_count = SIGNAL_COUNTER.load(Ordering::SeqCst); - if signal_count <= 0 { - SIGNAL_COUNTER.store(0, Ordering::SeqCst); - break; + + static GTI_MUTEX: Mutex<()> = Mutex::new(()); + + /// A naive stack trace capture implementation for threads for DEMO/TEST only purposes. + fn get_thread_info( + signal: libc::c_int, + targets: &[libc::pthread_t], + ) -> HashMap { + let _lock = GTI_MUTEX.lock(); + { + let mut omap = THREAD_DUMPS.lock().unwrap(); + *omap = Some(HashMap::new()); + SIGNAL_COUNTER.store(targets.len(), Ordering::SeqCst); } - if Instant::now() - start_time >= time_limit { - break; + for thread_id in targets { + let result = unsafe { libc::pthread_kill(*thread_id, signal) }; + if result != 0 { + eprintln!("Error sending signal: {:?}", result); + } + } + let time_limit = Duration::from_secs(1); + let start_time = Instant::now(); + loop { + let signal_count = SIGNAL_COUNTER.load(Ordering::SeqCst); + if signal_count == 0 { + break; + } + if Instant::now() - start_time >= time_limit { + break; + } + std::thread::sleep(std::time::Duration::from_micros(10)); + } + { + let omap = THREAD_DUMPS.lock().unwrap(); + omap.clone().unwrap().clone() } - std::thread::sleep(std::time::Duration::from_micros(10)); - } - { - let omap = THREAD_DUMPS.lock().unwrap(); - omap.clone().unwrap().clone() } -} -struct DetailedCaptureBlockingActionHandler { - inner: Mutex>>, -} + struct DetailedCaptureBlockingActionHandler { + inner: Mutex>>, + } -impl DetailedCaptureBlockingActionHandler { - fn new() -> Self { - DetailedCaptureBlockingActionHandler { - inner: Mutex::new(None), + impl DetailedCaptureBlockingActionHandler { + fn new() -> Self { + DetailedCaptureBlockingActionHandler { + inner: Mutex::new(None), + } } - } - fn contains_symbol(&self, symbol_name: &str) -> bool { - // Iterate over the frames in the backtrace - let omap = self.inner.lock().unwrap(); - match omap.as_ref() { - Some(map) => { - if map.is_empty() { - false - } else { - let bt_str = map.values().next().unwrap(); - bt_str.contains(symbol_name) + fn contains_symbol(&self, symbol_name: &str) -> bool { + // Iterate over the frames in the backtrace + let omap = self.inner.lock().unwrap(); + match omap.as_ref() { + Some(map) => { + if map.is_empty() { + false + } else { + let bt_str = map.values().next().unwrap(); + bt_str.contains(symbol_name) + } } + None => false, } - None => false, } } -} -impl BlockingActionHandler for DetailedCaptureBlockingActionHandler { - fn blocking_detected(&self, workers: &[libc::pthread_t]) { - let mut map = self.inner.lock().unwrap(); - let tinfo = get_thread_info(libc::SIGUSR1, workers); - eprint!("Blocking detected with details: {:?}\n", tinfo); - *map = Some(tinfo); + impl BlockingActionHandler for DetailedCaptureBlockingActionHandler { + fn blocking_detected(&self, workers: &[libc::pthread_t]) { + let mut map = self.inner.lock().unwrap(); + let tinfo = get_thread_info(libc::SIGUSR1, workers); + eprintln!("Blocking detected with details: {:?}", tinfo); + *map = Some(tinfo); + } } -} -#[test] -fn test_blocking_detection_multi_capture() { - install_thread_stack_stace_handler(libc::SIGUSR1); - let mut builder = tokio::runtime::Builder::new_multi_thread(); - let mutable_builder = builder.worker_threads(2); - let lrtd = LongRunningTaskDetector::new( - Duration::from_millis(10), - Duration::from_millis(100), - mutable_builder, - ); - let runtime = builder.enable_all().build().unwrap(); - let arc_runtime = Arc::new(runtime); - let arc_runtime2 = arc_runtime.clone(); - let blocking_action = Arc::new(DetailedCaptureBlockingActionHandler::new()); - let to_assert_blocking = blocking_action.clone(); - lrtd.start_with_custom_action(arc_runtime, blocking_action); - arc_runtime2.spawn(run_blocking_stuff()); - arc_runtime2.spawn(run_blocking_stuff()); - arc_runtime2.block_on(async { - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - println!("Hello world"); - }); - assert!(to_assert_blocking.contains_symbol("std::thread::sleep")); - lrtd.stop() + #[test] + fn test_blocking_detection_multi_capture() { + install_thread_stack_stace_handler(libc::SIGUSR1); + let mut builder = tokio::runtime::Builder::new_multi_thread(); + let mutable_builder = builder.worker_threads(2); + let lrtd = LongRunningTaskDetector::new( + Duration::from_millis(10), + Duration::from_millis(100), + mutable_builder, + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + let blocking_action = Arc::new(DetailedCaptureBlockingActionHandler::new()); + let to_assert_blocking = blocking_action.clone(); + lrtd.start_with_custom_action(arc_runtime, blocking_action); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + println!("Hello world"); + }); + assert!(to_assert_blocking.contains_symbol("std::thread::sleep")); + lrtd.stop() + } } From 3de7422e6d17cb32bd483e0782cd0cde01fdee85 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Mon, 1 Jan 2024 09:44:59 -0500 Subject: [PATCH 21/23] FIX more clippy lints --- tokio/tests/lrtd.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/tests/lrtd.rs b/tokio/tests/lrtd.rs index 501d63802a7..7b765409956 100644 --- a/tokio/tests/lrtd.rs +++ b/tokio/tests/lrtd.rs @@ -80,7 +80,7 @@ mod lrtd_tests { let name = thread::current() .name() .map(|n| format!(" for thread \"{}\"", n)) - .unwrap_or("".to_owned()); + .unwrap_or_else(|| "".to_owned()); let tid = get_thread_id(); let detail = format!("Stack trace{}:{}\n{}", name, tid, backtrace); let mut omap = THREAD_DUMPS.lock().unwrap(); @@ -128,7 +128,7 @@ mod lrtd_tests { } { let omap = THREAD_DUMPS.lock().unwrap(); - omap.clone().unwrap().clone() + omap.clone().unwrap() } } From a9caaa9fa547f63a923cb9bd8498f3cff618f2a8 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Mon, 1 Jan 2024 09:50:11 -0500 Subject: [PATCH 22/23] undo some unintended changes --- .../tests/fail/macros_type_mismatch.stderr | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/tests-build/tests/fail/macros_type_mismatch.stderr b/tests-build/tests/fail/macros_type_mismatch.stderr index d900ac2330c..579c241559b 100644 --- a/tests-build/tests/fail/macros_type_mismatch.stderr +++ b/tests-build/tests/fail/macros_type_mismatch.stderr @@ -1,24 +1,36 @@ error[E0308]: mismatched types --> tests/fail/macros_type_mismatch.rs:5:5 | -4 | async fn missing_semicolon_or_return_type() { - | - help: a return type might be missing here: `-> _` 5 | Ok(()) | ^^^^^^ expected `()`, found `Result<(), _>` | = note: expected unit type `()` found enum `Result<(), _>` +help: a return type might be missing here + | +4 | async fn missing_semicolon_or_return_type() -> _ { + | ++++ +help: consider using `Result::expect` to unwrap the `Result<(), _>` value, panicking if the value is a `Result::Err` + | +5 | Ok(()).expect("REASON") + | +++++++++++++++++ error[E0308]: mismatched types --> tests/fail/macros_type_mismatch.rs:10:5 | -9 | async fn missing_return_type() { - | - help: a return type might be missing here: `-> _` 10 | return Ok(()); | ^^^^^^^^^^^^^^ expected `()`, found `Result<(), _>` | = note: expected unit type `()` found enum `Result<(), _>` +help: a return type might be missing here + | +9 | async fn missing_return_type() -> _ { + | ++++ +help: consider using `Result::expect` to unwrap the `Result<(), _>` value, panicking if the value is a `Result::Err` + | +10 | return Ok(());.expect("REASON") + | +++++++++++++++++ error[E0308]: mismatched types --> tests/fail/macros_type_mismatch.rs:23:5 @@ -41,7 +53,7 @@ error[E0308]: mismatched types --> tests/fail/macros_type_mismatch.rs:32:5 | 30 | async fn issue_4635() { - | - help: try adding a return type: `-> i32` + | - help: try adding a return type: `-> i32` 31 | return 1; 32 | ; | ^ expected `()`, found integer From 3561be9a166cef64945198c115171e5bff3b8f6b Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Mon, 1 Jan 2024 13:39:32 -0500 Subject: [PATCH 23/23] make is_current_threaded public --- tokio/src/runtime/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 91211b877de..f4eeb775984 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -319,7 +319,7 @@ impl Builder { } /// Returns true if kind is "CurrentThread" of this [`Builder`]. False otherwise. - pub(crate) fn is_current_threaded(&self) -> bool { + pub fn is_current_threaded(&self) -> bool { match &self.kind { Kind::CurrentThread => true, #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]