diff --git a/tokio/tests/rt_unstable_metrics.rs b/tokio/tests/rt_unstable_metrics.rs index bfe3d7f2a68..6e3e97f1df4 100644 --- a/tokio/tests/rt_unstable_metrics.rs +++ b/tokio/tests/rt_unstable_metrics.rs @@ -8,7 +8,7 @@ ))] use std::future::Future; -use std::sync::{Arc, Mutex}; +use std::sync::{mpsc, Arc, Mutex}; use std::task::Poll; use std::thread; use tokio::macros::support::poll_fn; @@ -295,42 +295,34 @@ fn worker_noop_count() { } #[test] -#[ignore] // this test is flaky, see https://github.com/tokio-rs/tokio/issues/6470 fn worker_steal_count() { // This metric only applies to the multi-threaded runtime. - // - // We use a blocking channel to backup one worker thread. - use std::sync::mpsc::channel; - - let rt = threaded_no_lifo(); - let metrics = rt.metrics(); - - rt.block_on(async { - let (tx, rx) = channel(); + for _ in 0..10 { + let rt = threaded_no_lifo(); + let metrics = rt.metrics(); - // Move to the runtime. - tokio::spawn(async move { - // Spawn the task that sends to the channel - // - // Since the lifo slot is disabled, this task is stealable. - tokio::spawn(async move { - tx.send(()).unwrap(); - }); + let successfully_spawned_stealable_task = rt.block_on(async { + // The call to `try_spawn_stealable_task` may time out, which means + // that the sending task couldn't be scheduled due to a deadlock in + // the runtime. + // This is expected behaviour, we just retry until we succeed or + // exhaust all tries, the latter causing this test to fail. + try_spawn_stealable_task().await.is_ok() + }); - // Blocking receive on the channel. - rx.recv().unwrap(); - }) - .await - .unwrap(); - }); + drop(rt); - drop(rt); + if successfully_spawned_stealable_task { + let n: u64 = (0..metrics.num_workers()) + .map(|i| metrics.worker_steal_count(i)) + .sum(); - let n: u64 = (0..metrics.num_workers()) - .map(|i| metrics.worker_steal_count(i)) - .sum(); + assert_eq!(1, n); + return; + } + } - assert_eq!(1, n); + panic!("exhausted every try to schedule the stealable task"); } #[test] @@ -835,6 +827,30 @@ fn io_driver_ready_count() { assert_eq!(metrics.io_driver_ready_count(), 1); } +async fn try_spawn_stealable_task() -> Result<(), mpsc::RecvTimeoutError> { + // We use a blocking channel to synchronize the tasks. + let (tx, rx) = mpsc::channel(); + + // Make sure we are in the context of the runtime. + tokio::spawn(async move { + // Spawn the task that sends to the channel. + // + // Note that the runtime needs to have the lifo slot disabled to make + // this task stealable. + tokio::spawn(async move { + tx.send(()).unwrap(); + }); + + // Blocking receive on the channel, timing out if the sending task + // wasn't scheduled in time. + rx.recv_timeout(Duration::from_secs(1)) + }) + .await + .unwrap()?; + + Ok(()) +} + fn current_thread() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all()