From a245e96b5b97da9860256a501431d0282bc16bcd Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 14 Jul 2020 13:50:09 +0200 Subject: [PATCH 01/26] Initial commit Forked at: 60e3a693b29789045614e2ed73126695bc8b0794 Parent branch: origin/master From fe1c372859d7e72b8ebd79bad29aac9c7a47297f Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 14 Jul 2020 13:51:47 +0200 Subject: [PATCH 02/26] Move task_manager.rs to mod.rs --- client/service/src/{task_manager.rs => task_manager/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename client/service/src/{task_manager.rs => task_manager/mod.rs} (100%) diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager/mod.rs similarity index 100% rename from client/service/src/task_manager.rs rename to client/service/src/task_manager/mod.rs From 163501a0ab15689e4bde9b2355c18fbf0f28742e Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 14 Jul 2020 15:40:19 +0200 Subject: [PATCH 03/26] Graceful shutdown for the task manager --- client/service/src/config.rs | 20 ++++++---- client/service/src/task_manager/mod.rs | 54 +++++++++++++++++++++----- 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 397dacd747b14..0b13a90327b2d 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -263,7 +263,9 @@ impl std::convert::From for BasePath { } } -type TaskExecutorInner = Arc + Send>>, TaskType) + Send + Sync>; +// NOTE: here for code readability. +pub(crate) type SomeFuture = Pin + Send>>; +pub(crate) type JoinFuture = Pin + Send>>; /// Callable object that execute tasks. /// @@ -281,7 +283,8 @@ type TaskExecutorInner = Arc + Send>>, Ta /// # impl Runtime { /// # pub fn new() -> Result { Ok(Runtime) } /// # pub fn handle(&self) -> &Self { &self } -/// # pub fn spawn(&self, _: std::pin::Pin + Send>>) {} +/// # pub fn spawn(&self, _: std::pin::Pin + Send>>) +/// # -> std::pin::Pin + Send>> { Box::pin(async {}) } /// # } /// # } } /// use tokio::runtime::Runtime; @@ -289,7 +292,7 @@ type TaskExecutorInner = Arc + Send>>, Ta /// let runtime = Runtime::new().unwrap(); /// let handle = runtime.handle().clone(); /// let task_executor: TaskExecutor = (move |future, _task_type| { -/// handle.spawn(future); +/// handle.spawn(future) /// }).into(); /// ``` /// @@ -298,14 +301,15 @@ type TaskExecutorInner = Arc + Send>>, Ta /// ``` /// # use sc_service::TaskExecutor; /// # mod async_std { pub mod task { -/// # pub fn spawn(_: std::pin::Pin + Send>>) {} +/// # pub fn spawn(_: std::pin::Pin + Send>>) +/// # -> std::pin::Pin + Send>> { Box::pin(async {}) } /// # } } /// let task_executor: TaskExecutor = (|future, _task_type| { -/// async_std::task::spawn(future); +/// async_std::task::spawn(future) /// }).into(); /// ``` #[derive(Clone)] -pub struct TaskExecutor(TaskExecutorInner); +pub struct TaskExecutor(Arc JoinFuture + Send + Sync>); impl std::fmt::Debug for TaskExecutor { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -315,7 +319,7 @@ impl std::fmt::Debug for TaskExecutor { impl std::convert::From for TaskExecutor where - F: Fn(Pin + Send>>, TaskType) + Send + Sync + 'static, + F: Fn(SomeFuture, TaskType) -> JoinFuture + Send + Sync + 'static, { fn from(x: F) -> Self { Self(Arc::new(x)) @@ -324,7 +328,7 @@ where impl TaskExecutor { /// Spawns a new asynchronous task. - pub fn spawn(&self, future: Pin + Send>>, task_type: TaskType) { + pub fn spawn(&self, future: SomeFuture, task_type: TaskType) -> JoinFuture { self.0(future, task_type) } } diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index b6cc26005570a..8abfe88dfe1c5 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -15,7 +15,7 @@ use std::{panic, result::Result, pin::Pin}; use exit_future::Signal; -use log::debug; +use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, future::{select, Either, BoxFuture}, @@ -30,7 +30,7 @@ use prometheus_endpoint::{ }; use sc_client_api::CloneableSpawn; use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded}; -use crate::{config::{TaskExecutor, TaskType}, Error}; +use crate::{config::{TaskExecutor, TaskType, JoinFuture}, Error}; mod prometheus_future; @@ -40,6 +40,7 @@ pub struct SpawnTaskHandle { on_exit: exit_future::Exit, executor: TaskExecutor, metrics: Option, + task_notifier: TracingUnboundedSender, } impl SpawnTaskHandle { @@ -67,6 +68,11 @@ impl SpawnTaskHandle { task: impl Future + Send + 'static, task_type: TaskType, ) { + if self.task_notifier.is_closed() { + debug!("Attempt to spawn a new task has been prevented: {}", name); + return; + } + let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); @@ -111,7 +117,15 @@ impl SpawnTaskHandle { } }; - self.executor.spawn(Box::pin(future), task_type); + let join_handle = self.executor.spawn(Box::pin(future), task_type); + let mut task_notifier = self.task_notifier.clone(); + self.executor.spawn(Box::pin(async move { + if let Err(err) = task_notifier.send(join_handle).await { + error!("Could not send spawned task handle to queue: {}", err); + } + }), + TaskType::Async, + ); } } @@ -223,6 +237,8 @@ pub struct TaskManager { essential_failed_rx: TracingUnboundedReceiver<()>, /// Things to keep alive until the task manager is dropped. keep_alive: Box, + task_notifier: TracingUnboundedSender, + completion_future: JoinFuture, } impl TaskManager { @@ -233,11 +249,19 @@ impl TaskManager { prometheus_registry: Option<&Registry> ) -> Result { let (signal, on_exit) = exit_future::signal(); + // A side-channel for essential tasks to communicate shutdown. let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); let metrics = prometheus_registry.map(Metrics::register).transpose()?; + let (task_notifier, mut background_tasks) = tracing_unbounded("mpsc_background_tasks"); + let completion_future = executor.spawn(Box::pin(async move { + while let Some(handle) = background_tasks.next().await { + handle.await; + } + }), TaskType::Async); + Ok(Self { on_exit, signal: Some(signal), @@ -246,6 +270,8 @@ impl TaskManager { essential_failed_tx, essential_failed_rx, keep_alive: Box::new(()), + task_notifier, + completion_future, }) } @@ -256,6 +282,7 @@ impl TaskManager { on_exit: self.on_exit.clone(), executor: self.executor.clone(), metrics: self.metrics.clone(), + task_notifier: self.task_notifier.clone(), } } @@ -264,6 +291,19 @@ impl TaskManager { SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle()) } + /// Send the signal for termination, prevent new tasks to be created, await for all the existing + /// tasks to finished. + pub fn clean_shutdown(mut self) -> Pin + Send>> { + self.terminate(); + let keep_alive = self.keep_alive; + let completion_future = self.completion_future; + + Box::pin(async move { + completion_future.await; + drop(keep_alive) + }) + } + /// Return a future that will end if an essential task fails. pub fn future<'a>(&'a mut self) -> Pin> + Send + 'a>> { Box::pin(async move { @@ -277,6 +317,7 @@ impl TaskManager { pub fn terminate(&mut self) { if let Some(signal) = self.signal.take() { let _ = signal.fire(); + self.task_notifier.close_channel(); } } @@ -286,13 +327,6 @@ impl TaskManager { } } -impl Drop for TaskManager { - fn drop(&mut self) { - debug!(target: "service", "Tasks manager shutdown"); - self.terminate(); - } -} - #[derive(Clone)] struct Metrics { // This list is ordered alphabetically From cf228c04cd0ca6a7a25f0f89fbb474f03f99d747 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 08:27:31 +0200 Subject: [PATCH 04/26] Await all background task JoinHandle at the same time --- client/service/src/task_manager/mod.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 8abfe88dfe1c5..1294619c647fb 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -255,12 +255,10 @@ impl TaskManager { let metrics = prometheus_registry.map(Metrics::register).transpose()?; - let (task_notifier, mut background_tasks) = tracing_unbounded("mpsc_background_tasks"); - let completion_future = executor.spawn(Box::pin(async move { - while let Some(handle) = background_tasks.next().await { - handle.await; - } - }), TaskType::Async); + let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks"); + let completion_future = executor.spawn(Box::pin( + background_tasks.for_each_concurrent(None, |x| x) + ), TaskType::Async); Ok(Self { on_exit, @@ -300,7 +298,7 @@ impl TaskManager { Box::pin(async move { completion_future.await; - drop(keep_alive) + drop(keep_alive); }) } From 551268b387a8d6c802e538697c4d2b2fe0cee4d4 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 09:59:13 +0200 Subject: [PATCH 05/26] Add tests --- Cargo.lock | 2 + client/service/Cargo.toml | 2 + client/service/src/config.rs | 27 ++--- client/service/src/task_manager/mod.rs | 15 ++- client/service/src/task_manager/tests.rs | 148 +++++++++++++++++++++++ 5 files changed, 169 insertions(+), 25 deletions(-) create mode 100644 client/service/src/task_manager/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 0b24f9ef57218..0b4b954f86c2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6883,6 +6883,7 @@ dependencies = [ name = "sc-service" version = "0.8.0-rc4" dependencies = [ + "async-std", "derive_more", "directories", "exit-future", @@ -6940,6 +6941,7 @@ dependencies = [ "substrate-test-runtime-client", "sysinfo", "tempfile", + "tokio 0.2.18", "tracing", "wasm-timer", ] diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 29f89635f663d..6f60925753982 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -90,3 +90,5 @@ substrate-test-runtime-client = { version = "2.0.0-rc4", path = "../../test-util sp-consensus-babe = { version = "0.8.0-rc4", path = "../../primitives/consensus/babe" } grandpa = { version = "0.8.0-rc4", package = "sc-finality-grandpa", path = "../finality-grandpa" } grandpa-primitives = { version = "2.0.0-rc4", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" } +tokio = { version = "0.2", default-features = false } +async-std = { version = "1.6", default-features = false } diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 0b13a90327b2d..0b9257798d347 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -277,22 +277,13 @@ pub(crate) type JoinFuture = Pin + Send>>; /// /// ``` /// # use sc_service::TaskExecutor; -/// # mod tokio { pub mod runtime { -/// # #[derive(Clone)] -/// # pub struct Runtime; -/// # impl Runtime { -/// # pub fn new() -> Result { Ok(Runtime) } -/// # pub fn handle(&self) -> &Self { &self } -/// # pub fn spawn(&self, _: std::pin::Pin + Send>>) -/// # -> std::pin::Pin + Send>> { Box::pin(async {}) } -/// # } -/// # } } +/// use futures::future::FutureExt; /// use tokio::runtime::Runtime; /// /// let runtime = Runtime::new().unwrap(); /// let handle = runtime.handle().clone(); /// let task_executor: TaskExecutor = (move |future, _task_type| { -/// handle.spawn(future) +/// handle.spawn(future).map(|_| ()) /// }).into(); /// ``` /// @@ -300,11 +291,8 @@ pub(crate) type JoinFuture = Pin + Send>>; /// /// ``` /// # use sc_service::TaskExecutor; -/// # mod async_std { pub mod task { -/// # pub fn spawn(_: std::pin::Pin + Send>>) -/// # -> std::pin::Pin + Send>> { Box::pin(async {}) } -/// # } } /// let task_executor: TaskExecutor = (|future, _task_type| { +/// // NOTE: async-std's JoinHandle is not a Result so we don't need to map the result /// async_std::task::spawn(future) /// }).into(); /// ``` @@ -317,12 +305,13 @@ impl std::fmt::Debug for TaskExecutor { } } -impl std::convert::From for TaskExecutor +impl std::convert::From for TaskExecutor where - F: Fn(SomeFuture, TaskType) -> JoinFuture + Send + Sync + 'static, + F: Fn(SomeFuture, TaskType) -> FUT + Send + Sync + 'static, + FUT: Future + Send + 'static, { - fn from(x: F) -> Self { - Self(Arc::new(x)) + fn from(func: F) -> Self { + Self(Arc::new(move |fut, tt| Box::pin(func(fut, tt)))) } } diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 1294619c647fb..a33db01c259b2 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -33,6 +33,8 @@ use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_u use crate::{config::{TaskExecutor, TaskType, JoinFuture}, Error}; mod prometheus_future; +#[cfg(test)] +mod tests; /// An handle for spawning tasks in the service. #[derive(Clone)] @@ -119,7 +121,8 @@ impl SpawnTaskHandle { let join_handle = self.executor.spawn(Box::pin(future), task_type); let mut task_notifier = self.task_notifier.clone(); - self.executor.spawn(Box::pin(async move { + self.executor.spawn( + Box::pin(async move { if let Err(err) = task_notifier.send(join_handle).await { error!("Could not send spawned task handle to queue: {}", err); } @@ -256,9 +259,10 @@ impl TaskManager { let metrics = prometheus_registry.map(Metrics::register).transpose()?; let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks"); - let completion_future = executor.spawn(Box::pin( - background_tasks.for_each_concurrent(None, |x| x) - ), TaskType::Async); + let completion_future = executor.spawn( + Box::pin(background_tasks.for_each_concurrent(None, |x| x)), + TaskType::Async, + ); Ok(Self { on_exit, @@ -273,7 +277,6 @@ impl TaskManager { }) } - /// Get a handle for spawning tasks. pub fn spawn_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { @@ -290,7 +293,7 @@ impl TaskManager { } /// Send the signal for termination, prevent new tasks to be created, await for all the existing - /// tasks to finished. + /// tasks to be finished and drop the object. You can consider this as an async drop. pub fn clean_shutdown(mut self) -> Pin + Send>> { self.terminate(); let keep_alive = self.keep_alive; diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs new file mode 100644 index 0000000000000..0467654570616 --- /dev/null +++ b/client/service/src/task_manager/tests.rs @@ -0,0 +1,148 @@ +use crate::config::TaskExecutor; +use crate::task_manager::TaskManager; +use futures::future::FutureExt; +use parking_lot::Mutex; +use std::any::Any; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Clone)] +struct DropTester(Arc>); + +struct DropTesterRef(DropTester); + +impl DropTester { + fn new() -> DropTester { + DropTester(Arc::new(Mutex::new(0))) + } + + fn new_ref(&self) -> DropTesterRef { + *self.0.lock() += 1; + DropTesterRef(self.clone()) + } + + fn assert_eq(&self, n: usize) { + assert_eq!(*self.0.lock(), n, "unexpected value for drop tester"); + } +} + +impl Drop for DropTesterRef { + fn drop(&mut self) { + *(self.0).0.lock() -= 1; + } +} + +#[test] +fn ensure_drop_tester_working() { + let drop_tester = DropTester::new(); + drop_tester.assert_eq(0); + let drop_tester_ref_1 = drop_tester.new_ref(); + drop_tester.assert_eq(1); + let drop_tester_ref_2 = drop_tester.new_ref(); + drop_tester.assert_eq(2); + drop(drop_tester_ref_1); + drop_tester.assert_eq(1); + drop(drop_tester_ref_2); + drop_tester.assert_eq(0); +} + +async fn run_background_task(_keep_alive: impl Any) { + loop { + tokio::time::delay_for(Duration::from_secs(1)).await; + } +} + +async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) { + loop { + // block for X sec (not interruptible) + std::thread::sleep(duration); + // await for 1 sec (interruptible) + tokio::time::delay_for(Duration::from_secs(1)).await; + } +} + +#[test] +fn ensure_futures_are_awaited_on_shutdown() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + drop_tester.assert_eq(2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + drop_tester.assert_eq(2); + runtime.block_on(task_manager.clean_shutdown()); + drop_tester.assert_eq(0); +} + +#[test] +fn ensure_keep_alive_during_shutdown() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + task_manager.keep_alive(drop_tester.new_ref()); + spawn_handle.spawn("task1", run_background_task(())); + drop_tester.assert_eq(1); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + drop_tester.assert_eq(1); + runtime.block_on(task_manager.clean_shutdown()); + drop_tester.assert_eq(0); +} + +#[test] +fn ensure_blocking_futures_are_awaited_on_shutdown() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn( + "task1", + run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), + ); + spawn_handle.spawn( + "task2", + run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), + ); + drop_tester.assert_eq(2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + drop_tester.assert_eq(2); + runtime.block_on(task_manager.clean_shutdown()); + drop_tester.assert_eq(0); +} + +#[test] +fn ensure_no_task_can_be_spawn_after_terminate() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + drop_tester.assert_eq(2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + drop_tester.assert_eq(2); + task_manager.terminate(); + spawn_handle.spawn("task3", run_background_task(drop_tester.new_ref())); + // NOTE: task3 will not increase the count because it has been ignored + drop_tester.assert_eq(2); + runtime.block_on(task_manager.clean_shutdown()); + drop_tester.assert_eq(0); +} From b3c064867cd95b86aab14e2b4daaeda27f5f9686 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 11:29:58 +0200 Subject: [PATCH 06/26] Make future() wait also for exit signal + fix essential task failed Probably related to https://github.com/paritytech/cumulus/issues/111 --- client/service/src/task_manager/mod.rs | 20 ++++++++--- client/service/src/task_manager/tests.rs | 45 ++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 5 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index a33db01c259b2..a7e03fe048934 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -210,12 +210,12 @@ impl SpawnEssentialTaskHandle { task: impl Future + Send + 'static, task_type: TaskType, ) { - let mut essential_failed = self.essential_failed_tx.clone(); + let essential_failed = self.essential_failed_tx.clone(); let essential_task = std::panic::AssertUnwindSafe(task) .catch_unwind() .map(move |_| { log::error!("Essential task `{}` failed. Shutting down service.", name); - let _ = essential_failed.send(()); + let _ = essential_failed.close_channel(); }); let _ = self.inner.spawn_inner(name, essential_task, task_type); @@ -305,12 +305,22 @@ impl TaskManager { }) } - /// Return a future that will end if an essential task fails. + /// Return a future that will end with success if the signal to terminate was sent + /// (`self.terminate()`) or with an error if an essential task fails. + /// + /// # Warning + /// + /// This function will not end the remaining task. You must call and await `clean_shutdown()` + /// after this. pub fn future<'a>(&'a mut self) -> Pin> + Send + 'a>> { Box::pin(async move { - self.essential_failed_rx.next().await; + let mut t1 = self.essential_failed_rx.next().fuse(); + let mut t2 = self.on_exit.clone().fuse(); - Err(Error::Other("Essential task failed.".into())) + futures::select! { + _ = t1 => Err(Error::Other("Essential task failed.".into())), + _ = t2 => Ok(()), + } }) } diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 0467654570616..48debf3512027 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -146,3 +146,48 @@ fn ensure_no_task_can_be_spawn_after_terminate() { runtime.block_on(task_manager.clean_shutdown()); drop_tester.assert_eq(0); } + +#[test] +fn ensure_task_manager_future_ends_when_task_manager_terminated() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + drop_tester.assert_eq(2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + drop_tester.assert_eq(2); + task_manager.terminate(); + runtime.block_on(task_manager.future()).expect("future has ended without error"); + drop_tester.assert_eq(2); + runtime.block_on(task_manager.clean_shutdown()); + drop_tester.assert_eq(0); +} + +#[test] +fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let spawn_essential_handle = task_manager.spawn_essential_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + drop_tester.assert_eq(2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + drop_tester.assert_eq(2); + spawn_essential_handle.spawn("task3", async { panic!("task failed") }); + runtime.block_on(task_manager.future()).expect_err("future()'s Result must be Err"); + drop_tester.assert_eq(2); + runtime.block_on(task_manager.clean_shutdown()); + drop_tester.assert_eq(0); +} From 2855ada9869998aa9b53ef17f5c69760e14e79f1 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 14:43:26 +0200 Subject: [PATCH 07/26] add comments for non-obvious code --- client/service/src/task_manager/mod.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index a7e03fe048934..bd4b2cfde73ec 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -259,6 +259,9 @@ impl TaskManager { let metrics = prometheus_registry.map(Metrics::register).transpose()?; let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks"); + // NOTE: for_each_concurrent will await on all the JoinHandle futures at the same time. It + // is possible to limit this but it's actually better for the memory foot print to await + // them all to not accumulate anything on that stream. let completion_future = executor.spawn( Box::pin(background_tasks.for_each_concurrent(None, |x| x)), TaskType::Async, @@ -310,8 +313,8 @@ impl TaskManager { /// /// # Warning /// - /// This function will not end the remaining task. You must call and await `clean_shutdown()` - /// after this. + /// This function will not wait until the end of the remaining task. You must call and await + /// `clean_shutdown()` after this. pub fn future<'a>(&'a mut self) -> Pin> + Send + 'a>> { Box::pin(async move { let mut t1 = self.essential_failed_rx.next().fuse(); @@ -328,6 +331,7 @@ impl TaskManager { pub fn terminate(&mut self) { if let Some(signal) = self.signal.take() { let _ = signal.fire(); + // NOTE: task will prevent new tasks to be spawned self.task_notifier.close_channel(); } } From db34d964c69d076e74a7e4b1c1a436ab8e7c21b9 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 14:56:16 +0200 Subject: [PATCH 08/26] Use clean_shutdown() in sc-cli --- client/cli/src/runner.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 05445c9d85d85..d3d05c6ebc332 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -94,8 +94,8 @@ pub fn build_runtime() -> std::result::Result( - mut tokio_runtime: tokio::runtime::Runtime, - future: FUT, + mut tokio_runtime: tokio::runtime::Runtime, + future: FUT, mut task_manager: TaskManager, ) -> Result<()> where @@ -235,8 +235,7 @@ impl Runner { let mut task_manager = initialise(self.config)?; self.tokio_runtime.block_on(main(task_manager.future().fuse())) .map_err(|e| e.to_string())?; - task_manager.terminate(); - drop(self.tokio_runtime); + task_manager.clean_shutdown(); Ok(()) } From 2dcd0b318ae8b3f52e1b9f0f1dbac883cc987ed3 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 15:20:06 +0200 Subject: [PATCH 09/26] Adapt code and upgrade tokio in sc-cli --- Cargo.lock | 28 ++++++++++++++-------------- client/cli/Cargo.toml | 2 +- client/cli/src/runner.rs | 16 +++++++--------- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b4b954f86c2d..0880208d47c23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2035,7 +2035,7 @@ dependencies = [ "indexmap", "log", "slab", - "tokio 0.2.18", + "tokio 0.2.21", "tokio-util", ] @@ -2257,7 +2257,7 @@ dependencies = [ "net2", "pin-project", "time", - "tokio 0.2.18", + "tokio 0.2.21", "tower-service", "want 0.3.0", ] @@ -2275,7 +2275,7 @@ dependencies = [ "log", "rustls", "rustls-native-certs", - "tokio 0.2.18", + "tokio 0.2.21", "tokio-rustls", "webpki", ] @@ -6129,7 +6129,7 @@ dependencies = [ "substrate-prometheus-endpoint", "tempfile", "time", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -6369,7 +6369,7 @@ dependencies = [ "substrate-test-runtime-client", "substrate-test-runtime-transaction-pool", "tempfile", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -6560,7 +6560,7 @@ dependencies = [ "substrate-prometheus-endpoint", "substrate-test-runtime-client", "tempfile", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -6765,7 +6765,7 @@ dependencies = [ "sp-utils", "substrate-test-runtime-client", "threadpool", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -6941,7 +6941,7 @@ dependencies = [ "substrate-test-runtime-client", "sysinfo", "tempfile", - "tokio 0.2.18", + "tokio 0.2.21", "tracing", "wasm-timer", ] @@ -8362,7 +8362,7 @@ dependencies = [ "sc-rpc-api", "serde", "sp-storage", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -8400,7 +8400,7 @@ dependencies = [ "hyper 0.13.4", "log", "prometheus", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -8875,9 +8875,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.18" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ef16d072d2b6dc8b4a56c70f5c5ced1a37752116f8e7c1e80c659aa7cb6713" +checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" dependencies = [ "bytes 0.5.4", "fnv", @@ -9022,7 +9022,7 @@ checksum = "15cb62a0d2770787abc96e99c1cd98fcf17f94959f3af63ca85bdfb203f051b4" dependencies = [ "futures-core", "rustls", - "tokio 0.2.18", + "tokio 0.2.21", "webpki", ] @@ -9143,7 +9143,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index 3bf480f0b1eee..89b94144659c8 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -20,7 +20,7 @@ regex = "1.3.1" time = "0.1.42" ansi_term = "0.12.1" lazy_static = "1.4.0" -tokio = { version = "0.2.9", features = [ "signal", "rt-core", "rt-threaded" ] } +tokio = { version = "0.2.21", features = [ "signal", "rt-core", "rt-threaded", "blocking" ] } futures = "0.3.4" fdlimit = "0.1.4" serde_json = "1.0.41" diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index d3d05c6ebc332..4bf7640c8d843 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -127,15 +127,13 @@ impl Runner { let runtime_handle = tokio_runtime.handle().clone(); let task_executor = move |fut, task_type| { - match task_type { - TaskType::Async => { runtime_handle.spawn(fut); } - TaskType::Blocking => { - runtime_handle.spawn(async move { - // `spawn_blocking` is looking for the current runtime, and as such has to - // be called from within `spawn`. - tokio::task::spawn_blocking(move || futures::executor::block_on(fut)) - }); - } + let map_result = |_| (); + + match task_type{ + TaskType::Async => runtime_handle.spawn(fut).map(map_result), + TaskType::Blocking => + runtime_handle.spawn_blocking(move || futures::executor::block_on(fut)) + .map(map_result), } }; From 42333f42bed34f14abb6ad6a58e5709c5f59de2b Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 15:23:09 +0200 Subject: [PATCH 10/26] cleanup spacing in doc --- client/service/src/config.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 0b9257798d347..15783a87f9917 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -283,8 +283,8 @@ pub(crate) type JoinFuture = Pin + Send>>; /// let runtime = Runtime::new().unwrap(); /// let handle = runtime.handle().clone(); /// let task_executor: TaskExecutor = (move |future, _task_type| { -/// handle.spawn(future).map(|_| ()) -/// }).into(); +/// handle.spawn(future).map(|_| ()) +/// }).into(); /// ``` /// /// ## Using async-std @@ -292,9 +292,9 @@ pub(crate) type JoinFuture = Pin + Send>>; /// ``` /// # use sc_service::TaskExecutor; /// let task_executor: TaskExecutor = (|future, _task_type| { -/// // NOTE: async-std's JoinHandle is not a Result so we don't need to map the result -/// async_std::task::spawn(future) -/// }).into(); +/// // NOTE: async-std's JoinHandle is not a Result so we don't need to map the result +/// async_std::task::spawn(future) +/// }).into(); /// ``` #[derive(Clone)] pub struct TaskExecutor(Arc JoinFuture + Send + Sync>); From 1198324b3ca61743c8a71df84197c2630d661b77 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 15:26:44 +0200 Subject: [PATCH 11/26] Add license --- client/service/src/task_manager/tests.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 48debf3512027..c00786756baba 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -1,3 +1,21 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + use crate::config::TaskExecutor; use crate::task_manager::TaskManager; use futures::future::FutureExt; From 56c8139a4ff19f9309869862417966bdd50107f2 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 15:32:04 +0200 Subject: [PATCH 12/26] I guess actually running the clean shutdown would be a good idea --- client/cli/src/runner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 4bf7640c8d843..16169c01e435f 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -233,7 +233,7 @@ impl Runner { let mut task_manager = initialise(self.config)?; self.tokio_runtime.block_on(main(task_manager.future().fuse())) .map_err(|e| e.to_string())?; - task_manager.clean_shutdown(); + self.tokio_runtime.block_on(task_manager.clean_shutdown()); Ok(()) } From ef3a054200dc5da188a936b77f067ff5014820d7 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Wed, 15 Jul 2020 15:43:12 +0200 Subject: [PATCH 13/26] fix tests --- client/service/test/src/lib.rs | 1 + utils/browser/src/lib.rs | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index ac95dd11e8b27..5f79e298d5566 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -317,6 +317,7 @@ impl TestNet where let executor = executor.clone(); (move |fut: Pin + Send>>, _| { executor.spawn(fut.unit_error().compat()); + async {} }).into() }; diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs index 9313d41bf5726..718a9b9751154 100644 --- a/utils/browser/src/lib.rs +++ b/utils/browser/src/lib.rs @@ -64,7 +64,10 @@ where network, telemetry_endpoints: chain_spec.telemetry_endpoints().clone(), chain_spec: Box::new(chain_spec), - task_executor: (|fut, _| wasm_bindgen_futures::spawn_local(fut)).into(), + task_executor: (|fut, _| { + wasm_bindgen_futures::spawn_local(fut); + async {} + }).into(), telemetry_external_transport: Some(transport), role: Role::Light, database: { From 4e6b8d21b8e682ee39266b3696bfec193974787d Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 14:02:35 +0200 Subject: [PATCH 14/26] Update client/cli/src/runner.rs Co-authored-by: Benjamin Kampmann --- client/cli/src/runner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 16169c01e435f..3b75a2f46426c 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -129,7 +129,7 @@ impl Runner { let task_executor = move |fut, task_type| { let map_result = |_| (); - match task_type{ + match task_type { TaskType::Async => runtime_handle.spawn(fut).map(map_result), TaskType::Blocking => runtime_handle.spawn_blocking(move || futures::executor::block_on(fut)) From 5dbe179df2100b6d2dfa79eee6130c6a19e999b3 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 14:47:38 +0200 Subject: [PATCH 15/26] Improve error logging --- client/service/src/task_manager/tests.rs | 60 ++++++++++++------------ 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index c00786756baba..9974138b69fcd 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -24,7 +24,7 @@ use std::any::Any; use std::sync::Arc; use std::time::Duration; -#[derive(Clone)] +#[derive(Clone, Debug)] struct DropTester(Arc>); struct DropTesterRef(DropTester); @@ -38,9 +38,11 @@ impl DropTester { *self.0.lock() += 1; DropTesterRef(self.clone()) } +} - fn assert_eq(&self, n: usize) { - assert_eq!(*self.0.lock(), n, "unexpected value for drop tester"); +impl PartialEq for DropTester { + fn eq(&self, other: &usize) -> bool { + &*self.0.lock() == other } } @@ -53,15 +55,15 @@ impl Drop for DropTesterRef { #[test] fn ensure_drop_tester_working() { let drop_tester = DropTester::new(); - drop_tester.assert_eq(0); + assert_eq!(drop_tester, 0); let drop_tester_ref_1 = drop_tester.new_ref(); - drop_tester.assert_eq(1); + assert_eq!(drop_tester, 1); let drop_tester_ref_2 = drop_tester.new_ref(); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); drop(drop_tester_ref_1); - drop_tester.assert_eq(1); + assert_eq!(drop_tester, 1); drop(drop_tester_ref_2); - drop_tester.assert_eq(0); + assert_eq!(drop_tester, 0); } async fn run_background_task(_keep_alive: impl Any) { @@ -90,12 +92,12 @@ fn ensure_futures_are_awaited_on_shutdown() { let drop_tester = DropTester::new(); spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); - drop_tester.assert_eq(0); + assert_eq!(drop_tester, 0); } #[test] @@ -109,12 +111,12 @@ fn ensure_keep_alive_during_shutdown() { let drop_tester = DropTester::new(); task_manager.keep_alive(drop_tester.new_ref()); spawn_handle.spawn("task1", run_background_task(())); - drop_tester.assert_eq(1); + assert_eq!(drop_tester, 1); // allow the tasks to even start runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); - drop_tester.assert_eq(1); + assert_eq!(drop_tester, 1); runtime.block_on(task_manager.clean_shutdown()); - drop_tester.assert_eq(0); + assert_eq!(drop_tester, 0); } #[test] @@ -134,12 +136,12 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() { "task2", run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), ); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); - drop_tester.assert_eq(0); + assert_eq!(drop_tester, 0); } #[test] @@ -153,16 +155,16 @@ fn ensure_no_task_can_be_spawn_after_terminate() { let drop_tester = DropTester::new(); spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); task_manager.terminate(); spawn_handle.spawn("task3", run_background_task(drop_tester.new_ref())); // NOTE: task3 will not increase the count because it has been ignored - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); - drop_tester.assert_eq(0); + assert_eq!(drop_tester, 0); } #[test] @@ -176,15 +178,15 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { let drop_tester = DropTester::new(); spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); task_manager.terminate(); runtime.block_on(task_manager.future()).expect("future has ended without error"); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); - drop_tester.assert_eq(0); + assert_eq!(drop_tester, 0); } #[test] @@ -199,13 +201,13 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { let drop_tester = DropTester::new(); spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); // allow the tasks to even start runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); spawn_essential_handle.spawn("task3", async { panic!("task failed") }); runtime.block_on(task_manager.future()).expect_err("future()'s Result must be Err"); - drop_tester.assert_eq(2); + assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); - drop_tester.assert_eq(0); + assert_eq!(drop_tester, 0); } From c133c590f33c253123ba0555ce719a71ededd60d Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 14:52:29 +0200 Subject: [PATCH 16/26] disable other tests (can't reproduce on my machine) --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 80b96b752ae1b..b26660c204a9e 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -206,7 +206,7 @@ test-linux-stable: &test-linux variables: - $DEPLOY_TAG script: - - WASM_BUILD_NO_COLOR=1 time cargo test --all --release --verbose --locked + - WASM_BUILD_NO_COLOR=1 time cargo test --release --verbose --locked -p sc-service --lib - sccache -s unleash-check: From 1fb6481d9bc20dd06453c4a0344df69814da2426 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 15:10:41 +0200 Subject: [PATCH 17/26] Revert "disable other tests (can't reproduce on my machine)" This reverts commit c133c590f33c253123ba0555ce719a71ededd60d. --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b26660c204a9e..80b96b752ae1b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -206,7 +206,7 @@ test-linux-stable: &test-linux variables: - $DEPLOY_TAG script: - - WASM_BUILD_NO_COLOR=1 time cargo test --release --verbose --locked -p sc-service --lib + - WASM_BUILD_NO_COLOR=1 time cargo test --all --release --verbose --locked - sccache -s unleash-check: From 502aba4a49fb5d892e704c412b8a81768a3f2c71 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 15:20:59 +0200 Subject: [PATCH 18/26] It is possible that the tasks are ended first --- client/service/src/task_manager/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 9974138b69fcd..b9a8362a364b3 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -184,7 +184,6 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { assert_eq!(drop_tester, 2); task_manager.terminate(); runtime.block_on(task_manager.future()).expect("future has ended without error"); - assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); } From 1b91a8ca3eebbdc18be199c8ca188e88669ae649 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 16:25:24 +0200 Subject: [PATCH 19/26] Revert "It is possible that the tasks are ended first" This reverts commit 502aba4a49fb5d892e704c412b8a81768a3f2c71. --- client/service/src/task_manager/tests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index b9a8362a364b3..9974138b69fcd 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -184,6 +184,7 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { assert_eq!(drop_tester, 2); task_manager.terminate(); runtime.block_on(task_manager.future()).expect("future has ended without error"); + assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); } From ee5e13c5f13ff71e012dcda13579dffeb15f8ffc Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 16:27:49 +0200 Subject: [PATCH 20/26] Use single threaded scheduler for more predictability --- client/service/src/task_manager/tests.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 9974138b69fcd..83a36a8f75c2d 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -83,7 +83,7 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) #[test] fn ensure_futures_are_awaited_on_shutdown() { - let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -102,7 +102,7 @@ fn ensure_futures_are_awaited_on_shutdown() { #[test] fn ensure_keep_alive_during_shutdown() { - let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -121,7 +121,7 @@ fn ensure_keep_alive_during_shutdown() { #[test] fn ensure_blocking_futures_are_awaited_on_shutdown() { - let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -146,7 +146,7 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() { #[test] fn ensure_no_task_can_be_spawn_after_terminate() { - let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -169,7 +169,7 @@ fn ensure_no_task_can_be_spawn_after_terminate() { #[test] fn ensure_task_manager_future_ends_when_task_manager_terminated() { - let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -191,7 +191,7 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { #[test] fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { - let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); From 4e152140764a4bddeedff06a4e36ec701909e8c7 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 17:36:23 +0200 Subject: [PATCH 21/26] enable_time --- client/service/src/task_manager/tests.rs | 44 +++++++++++++++++++----- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 83a36a8f75c2d..e89d8031daf7e 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -83,7 +83,11 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) #[test] fn ensure_futures_are_awaited_on_shutdown() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_time() + .build() + .unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -102,7 +106,11 @@ fn ensure_futures_are_awaited_on_shutdown() { #[test] fn ensure_keep_alive_during_shutdown() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_time() + .build() + .unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -121,7 +129,11 @@ fn ensure_keep_alive_during_shutdown() { #[test] fn ensure_blocking_futures_are_awaited_on_shutdown() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_time() + .build() + .unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -146,7 +158,11 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() { #[test] fn ensure_no_task_can_be_spawn_after_terminate() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_time() + .build() + .unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -169,7 +185,11 @@ fn ensure_no_task_can_be_spawn_after_terminate() { #[test] fn ensure_task_manager_future_ends_when_task_manager_terminated() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_time() + .build() + .unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -183,7 +203,9 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); task_manager.terminate(); - runtime.block_on(task_manager.future()).expect("future has ended without error"); + runtime + .block_on(task_manager.future()) + .expect("future has ended without error"); assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); @@ -191,7 +213,11 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { #[test] fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_time() + .build() + .unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -206,7 +232,9 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); spawn_essential_handle.spawn("task3", async { panic!("task failed") }); - runtime.block_on(task_manager.future()).expect_err("future()'s Result must be Err"); + runtime + .block_on(task_manager.future()) + .expect_err("future()'s Result must be Err"); assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); From fbcefa445fb15c660fb7fcfe68e5f1c387ee275e Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 19:36:37 +0200 Subject: [PATCH 22/26] Revert "enable_time" This reverts commit 4e152140764a4bddeedff06a4e36ec701909e8c7. --- client/service/src/task_manager/tests.rs | 44 +++++------------------- 1 file changed, 8 insertions(+), 36 deletions(-) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index e89d8031daf7e..83a36a8f75c2d 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -83,11 +83,7 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) #[test] fn ensure_futures_are_awaited_on_shutdown() { - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_time() - .build() - .unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -106,11 +102,7 @@ fn ensure_futures_are_awaited_on_shutdown() { #[test] fn ensure_keep_alive_during_shutdown() { - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_time() - .build() - .unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -129,11 +121,7 @@ fn ensure_keep_alive_during_shutdown() { #[test] fn ensure_blocking_futures_are_awaited_on_shutdown() { - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_time() - .build() - .unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -158,11 +146,7 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() { #[test] fn ensure_no_task_can_be_spawn_after_terminate() { - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_time() - .build() - .unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -185,11 +169,7 @@ fn ensure_no_task_can_be_spawn_after_terminate() { #[test] fn ensure_task_manager_future_ends_when_task_manager_terminated() { - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_time() - .build() - .unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -203,9 +183,7 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); task_manager.terminate(); - runtime - .block_on(task_manager.future()) - .expect("future has ended without error"); + runtime.block_on(task_manager.future()).expect("future has ended without error"); assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); @@ -213,11 +191,7 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { #[test] fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_time() - .build() - .unwrap(); + let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -232,9 +206,7 @@ fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); assert_eq!(drop_tester, 2); spawn_essential_handle.spawn("task3", async { panic!("task failed") }); - runtime - .block_on(task_manager.future()) - .expect_err("future()'s Result must be Err"); + runtime.block_on(task_manager.future()).expect_err("future()'s Result must be Err"); assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); From 23b642a9e043a967535d13b649bc96e05d383ea0 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 19:36:45 +0200 Subject: [PATCH 23/26] Revert "Use single threaded scheduler for more predictability" This reverts commit ee5e13c5f13ff71e012dcda13579dffeb15f8ffc. --- client/service/src/task_manager/tests.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 83a36a8f75c2d..9974138b69fcd 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -83,7 +83,7 @@ async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) #[test] fn ensure_futures_are_awaited_on_shutdown() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -102,7 +102,7 @@ fn ensure_futures_are_awaited_on_shutdown() { #[test] fn ensure_keep_alive_during_shutdown() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -121,7 +121,7 @@ fn ensure_keep_alive_during_shutdown() { #[test] fn ensure_blocking_futures_are_awaited_on_shutdown() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -146,7 +146,7 @@ fn ensure_blocking_futures_are_awaited_on_shutdown() { #[test] fn ensure_no_task_can_be_spawn_after_terminate() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -169,7 +169,7 @@ fn ensure_no_task_can_be_spawn_after_terminate() { #[test] fn ensure_task_manager_future_ends_when_task_manager_terminated() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); @@ -191,7 +191,7 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { #[test] fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { - let mut runtime = tokio::runtime::Builder::new().basic_scheduler().build().unwrap(); + let mut runtime = tokio::runtime::Runtime::new().unwrap(); let handle = runtime.handle().clone(); let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); From 5b5becb305514d32e28de0abe31bbc2cd6f55741 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 19:36:57 +0200 Subject: [PATCH 24/26] Revert "Revert "It is possible that the tasks are ended first"" This reverts commit 1b91a8ca3eebbdc18be199c8ca188e88669ae649. --- client/service/src/task_manager/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 9974138b69fcd..b9a8362a364b3 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -184,7 +184,6 @@ fn ensure_task_manager_future_ends_when_task_manager_terminated() { assert_eq!(drop_tester, 2); task_manager.terminate(); runtime.block_on(task_manager.future()).expect("future has ended without error"); - assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); } From bc431b6280f25a8795afb58e4904322acd9e2110 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Thu, 16 Jul 2020 19:38:00 +0200 Subject: [PATCH 25/26] This cannot be verified either with a threaded pool --- client/service/src/task_manager/tests.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index b9a8362a364b3..c60d15b3394c3 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -161,8 +161,6 @@ fn ensure_no_task_can_be_spawn_after_terminate() { assert_eq!(drop_tester, 2); task_manager.terminate(); spawn_handle.spawn("task3", run_background_task(drop_tester.new_ref())); - // NOTE: task3 will not increase the count because it has been ignored - assert_eq!(drop_tester, 2); runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); } From b5e5fe24e75b273770074802bfa1a882cbc3e576 Mon Sep 17 00:00:00 2001 From: Cecile Tonglet Date: Tue, 21 Jul 2020 21:07:27 +0200 Subject: [PATCH 26/26] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/cli/src/runner.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 3b75a2f46426c..219613e6bddee 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -127,13 +127,11 @@ impl Runner { let runtime_handle = tokio_runtime.handle().clone(); let task_executor = move |fut, task_type| { - let map_result = |_| (); - match task_type { - TaskType::Async => runtime_handle.spawn(fut).map(map_result), + TaskType::Async => runtime_handle.spawn(fut).map(drop), TaskType::Blocking => runtime_handle.spawn_blocking(move || futures::executor::block_on(fut)) - .map(map_result), + .map(drop), } };