Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tokio Util LongRunningTaskDetector. (https://github.com/tokio-rs/console/issues/150) #6256

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1ea1bb3
First "working" POC.
zolyfarkas Dec 28, 2023
03f3bcc
First "working" POC to implement https://github.com/tokio-rs/console/…
zolyfarkas Dec 28, 2023
9dcefab
Merge branch 'TOKIO-LRTD' of https://github.com/zolyfarkas/tokio into…
zolyfarkas Dec 28, 2023
77c61f5
[cleanup] remove unused method.
zolyfarkas Dec 28, 2023
e6c9c6c
Add ability to hook actions on detection.
zolyfarkas Dec 29, 2023
144cfff
[add] document action traits + minor improvement.
zolyfarkas Dec 30, 2023
6f00e2e
[add] More documentation + test.
zolyfarkas Dec 30, 2023
fb4cc02
[add] one more comment.
zolyfarkas Dec 30, 2023
5247795
[cleanup] fix all warnings + code cleanup.
zolyfarkas Dec 30, 2023
73c8bc5
Fix tests + remove unsafe stuff.
zolyfarkas Dec 30, 2023
6e77b5e
FIX formatting
zolyfarkas Dec 30, 2023
df172e8
Cleanup unnecessary drop.
zolyfarkas Dec 30, 2023
86bbf4c
[fix] doctest
zolyfarkas Dec 30, 2023
f5e1ab0
Merge branch 'master' into TOKIO-LRTD
zolyfarkas Dec 30, 2023
f41f470
Fix formatting issue
zolyfarkas Dec 30, 2023
e978a2c
remove nix dependency
zolyfarkas Dec 31, 2023
ab43c31
Separate out blocking detection from thread state dump.
zolyfarkas Dec 31, 2023
97572a9
Move lrtd into tokio (out of tokio-util)
zolyfarkas Dec 31, 2023
78deaf8
Move lrtd into tokio (out of tokio-util)
zolyfarkas Dec 31, 2023
8a66f5b
Merge branch 'TOKIO-LRTD' of https://github.com/zolyfarkas/tokio into…
zolyfarkas Dec 31, 2023
067b0c4
Fix some merge issue
zolyfarkas Dec 31, 2023
a9f009b
Fix lint issues.
zolyfarkas Dec 31, 2023
22b4df3
Cleanup lint issues.
zolyfarkas Jan 1, 2024
3de7422
FIX more clippy lints
zolyfarkas Jan 1, 2024
a9caaa9
undo some unintended changes
zolyfarkas Jan 1, 2024
3561be9
make is_current_threaded public
zolyfarkas Jan 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ futures-util = { version = "0.3.0", optional = true }
pin-project-lite = "0.2.11"
slab = { version = "0.4.4", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true }
libc = "0.2"

[target.'cfg(tokio_unstable)'.dependencies]
hashbrown = { version = "0.14.0", optional = true }
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,17 @@ impl Builder {
}
}

/// Returns true if kind is "CurrentThread" of this [`Builder`]. False otherwise.
pub fn is_current_threaded(&self) -> bool {
match &self.kind {
Kind::CurrentThread => true,
#[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
Kind::MultiThread => false,
#[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
Kind::MultiThreadAlt => false,
}
}

/// Enables both I/O and time drivers.
///
/// Doing this is a shorthand for calling `enable_io` and `enable_time`
Expand Down
233 changes: 233 additions & 0 deletions tokio/src/runtime/lrtd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
//! Utility to help with "really nice to add a warning for tasks that might be blocking"
use libc;
use std::collections::HashSet;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{env, thread};

use crate::runtime::{Builder, Runtime};
use crate::util::rand::FastRand;

const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60);

fn get_panic_worker_block_duration() -> Duration {
let duration_str = env::var("MY_DURATION_ENV").unwrap_or_else(|_| "60".to_string());
duration_str
.parse::<u64>()
.map(Duration::from_secs)
.unwrap_or(PANIC_WORKER_BLOCK_DURATION_DEFAULT)
}

fn get_thread_id() -> libc::pthread_t {
unsafe { libc::pthread_self() }
}

/// A trait for handling actions when blocking is detected.
///
/// This trait provides a method for handling the detection of a blocking action.
pub trait BlockingActionHandler: Send + Sync {
/// Called when a blocking action is detected and prior to thread signaling.
///
/// # Arguments
///
/// * `workers` - The list of thread IDs of the tokio runtime worker threads. /// # Returns
///
fn blocking_detected(&self, workers: &[libc::pthread_t]);
}

struct StdErrBlockingActionHandler;

/// BlockingActionHandler implementation that writes blocker details to standard error.
impl BlockingActionHandler for StdErrBlockingActionHandler {
fn blocking_detected(&self, workers: &[libc::pthread_t]) {
eprintln!("Detected blocking in worker threads: {:?}", workers);
}
}

#[derive(Debug)]
struct WorkerSet {
inner: Mutex<HashSet<libc::pthread_t>>,
}

impl WorkerSet {
fn new() -> Self {
WorkerSet {
inner: Mutex::new(HashSet::new()),
}
}

fn add(&self, pid: libc::pthread_t) {
let mut set = self.inner.lock().unwrap();
set.insert(pid);
}

fn remove(&self, pid: libc::pthread_t) {
let mut set = self.inner.lock().unwrap();
set.remove(&pid);
}

fn get_all(&self) -> Vec<libc::pthread_t> {
let set = self.inner.lock().unwrap();
set.iter().cloned().collect()
}
}

/// Utility to help with "really nice to add a warning for tasks that might be blocking"
#[derive(Debug)]
pub struct LongRunningTaskDetector {
interval: Duration,
detection_time: Duration,
stop_flag: Arc<Mutex<bool>>,
workers: Arc<WorkerSet>,
}

async fn do_nothing(tx: mpsc::Sender<()>) {
// signal I am done
tx.send(()).unwrap();
}

fn probe(
tokio_runtime: &Arc<Runtime>,
detection_time: Duration,
workers: &Arc<WorkerSet>,
action: &Arc<dyn BlockingActionHandler>,
) {
let (tx, rx) = mpsc::channel();
let _nothing_handle = tokio_runtime.spawn(do_nothing(tx));
let is_probe_success = match rx.recv_timeout(detection_time) {
Ok(_result) => true,
Err(_) => false,
};
if !is_probe_success {
let targets = workers.get_all();
action.blocking_detected(&targets);
rx.recv_timeout(get_panic_worker_block_duration()).unwrap();
}
}

/// Utility to help with "really nice to add a warning for tasks that might be blocking"
/// Example use:
/// ```
/// use std::sync::Arc;
/// use tokio::runtime::lrtd::LongRunningTaskDetector;
///
/// let mut builder = tokio::runtime::Builder::new_multi_thread();
/// let mutable_builder = builder.worker_threads(2);
/// let lrtd = LongRunningTaskDetector::new(
/// std::time::Duration::from_millis(10),
/// std::time::Duration::from_millis(100),
/// mutable_builder,
/// );
/// let runtime = builder.enable_all().build().unwrap();
/// let arc_runtime = Arc::new(runtime);
/// let arc_runtime2 = arc_runtime.clone();
/// lrtd.start(arc_runtime);
/// arc_runtime2.block_on(async {
/// print!("my async code")
/// });
///
/// ```
///
/// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms.
/// The detail will look like:
///
/// ```text
/// Detected blocking in worker threads: [123145318232064, 123145320341504]
/// ```
///
/// To get more details(like stack traces) start LongRunningTaskDetector with start_with_custom_action and provide a custom handler.
///
impl LongRunningTaskDetector {
/// Creates a new `LongRunningTaskDetector` instance.
///
/// # Arguments
///
/// * `interval` - The interval between probes. This interval is randomized.
/// * `detection_time` - The maximum time allowed for a probe to succeed.
/// A probe running for longer indicates something is blocking the worker threads.
/// * `runtime_builder` - A mutable reference to a `tokio::runtime::Builder`.
///
/// # Returns
///
/// Returns a new `LongRunningTaskDetector` instance.
pub fn new(
interval: Duration,
detection_time: Duration,
runtime_builder: &mut Builder,
) -> Self {
let workers = Arc::new(WorkerSet::new());
if runtime_builder.is_current_threaded() {
workers.add(get_thread_id());
} else {
let workers_clone = Arc::clone(&workers);
let workers_clone2 = Arc::clone(&workers);
runtime_builder
.on_thread_start(move || {
let pid = get_thread_id();
workers_clone.add(pid);
})
.on_thread_stop(move || {
let pid = get_thread_id();
workers_clone2.remove(pid);
});
}
LongRunningTaskDetector {
interval,
detection_time,
stop_flag: Arc::new(Mutex::new(true)),
workers,
}
}

/// Starts the monitoring thread with default action handlers (that write details to std err).
///
/// # Arguments
///
/// * `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`.
pub fn start(&self, runtime: Arc<Runtime>) {
self.start_with_custom_action(runtime, Arc::new(StdErrBlockingActionHandler))
}

/// Starts the monitoring process with custom action handlers that
/// allow you to customize what happens when blocking is detected.
///
/// # Arguments
///
/// * `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`.
/// * `action` - An `Arc` reference to a custom `BlockingActionHandler`.
/// * `thread_action` - An `Arc` reference to a custom `ThreadStateHandler`.
pub fn start_with_custom_action(
&self,
runtime: Arc<Runtime>,
action: Arc<dyn BlockingActionHandler>,
) {
*self.stop_flag.lock().unwrap() = false;
let stop_flag = Arc::clone(&self.stop_flag);
let detection_time = self.detection_time;
let interval = self.interval;
let workers = Arc::clone(&self.workers);
thread::spawn(move || {
let mut rnd = FastRand::new();
let max: u32 = <u128 as TryInto<u32>>::try_into(interval.as_micros()).unwrap() - 10;
while !*stop_flag.lock().unwrap() {
probe(&runtime, detection_time, &workers, &action);
thread::sleep(Duration::from_micros(rnd.fastrand_n(max).into()));
}
});
}

/// Stops the monitoring thread. Does nothing if LRTD is already stopped.
pub fn stop(&self) {
let mut sf = self.stop_flag.lock().unwrap();
if !(*sf) {
*sf = true;
}
}
}

impl Drop for LongRunningTaskDetector {
fn drop(&mut self) {
self.stop();
}
}
3 changes: 3 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ cfg_rt! {
pub use dump::Dump;
}

#[cfg(unix)]
pub mod lrtd;

mod handle;
pub use handle::{EnterGuard, Handle, TryCurrentError};

Expand Down
Loading
Loading