Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Graceful shutdown for the task manager (#6654)
Browse files Browse the repository at this point in the history
* Initial commit

Forked at: 60e3a69
Parent branch: origin/master

* Move task_manager.rs to mod.rs

* Graceful shutdown for the task manager

* Await all background task JoinHandle at the same time

* Add tests

* Make future() wait also for exit signal + fix essential task failed

Probably related to paritytech/cumulus#111

* add comments for non-obvious code

* Use clean_shutdown() in sc-cli

* Adapt code and upgrade tokio in sc-cli

* cleanup spacing in doc

* Add license

* I guess actually running the clean shutdown would be a good idea

* fix tests

* Update client/cli/src/runner.rs

Co-authored-by: Benjamin Kampmann <ben@gnunicorn.org>

* Improve error logging

* disable other tests (can't reproduce on my machine)

* Revert "disable other tests (can't reproduce on my machine)"

This reverts commit c133c59.

* It is possible that the tasks are ended first

* Revert "It is possible that the tasks are ended first"

This reverts commit 502aba4.

* Use single threaded scheduler for more predictability

* enable_time

* Revert "enable_time"

This reverts commit 4e15214.

* Revert "Use single threaded scheduler for more predictability"

This reverts commit ee5e13c.

* Revert "Revert "It is possible that the tasks are ended first""

This reverts commit 1b91a8c.

* This cannot be verified either with a threaded pool

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

Co-authored-by: Benjamin Kampmann <ben@parity.io>
Co-authored-by: Benjamin Kampmann <ben@gnunicorn.org>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
  • Loading branch information
4 people authored Jul 22, 2020
1 parent e3bb2ce commit 64d4a4d
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 66 deletions.
28 changes: 15 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ regex = "1.3.1"
time = "0.1.42"
ansi_term = "0.12.1"
lazy_static = "1.4.0"
tokio = { version = "0.2.9", features = [ "signal", "rt-core", "rt-threaded" ] }
tokio = { version = "0.2.21", features = [ "signal", "rt-core", "rt-threaded", "blocking" ] }
futures = "0.3.4"
fdlimit = "0.1.4"
serde_json = "1.0.41"
Expand Down
19 changes: 7 additions & 12 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::
}

fn run_until_exit<FUT, ERR>(
mut tokio_runtime: tokio::runtime::Runtime,
future: FUT,
mut tokio_runtime: tokio::runtime::Runtime,
future: FUT,
mut task_manager: TaskManager,
) -> Result<()>
where
Expand Down Expand Up @@ -128,14 +128,10 @@ impl<C: SubstrateCli> Runner<C> {

let task_executor = move |fut, task_type| {
match task_type {
TaskType::Async => { runtime_handle.spawn(fut); }
TaskType::Blocking => {
runtime_handle.spawn(async move {
// `spawn_blocking` is looking for the current runtime, and as such has to
// be called from within `spawn`.
tokio::task::spawn_blocking(move || futures::executor::block_on(fut))
});
}
TaskType::Async => runtime_handle.spawn(fut).map(drop),
TaskType::Blocking =>
runtime_handle.spawn_blocking(move || futures::executor::block_on(fut))
.map(drop),
}
};

Expand Down Expand Up @@ -235,8 +231,7 @@ impl<C: SubstrateCli> Runner<C> {
let mut task_manager = initialise(self.config)?;
self.tokio_runtime.block_on(main(task_manager.future().fuse()))
.map_err(|e| e.to_string())?;
task_manager.terminate();
drop(self.tokio_runtime);
self.tokio_runtime.block_on(task_manager.clean_shutdown());
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,5 @@ substrate-test-runtime-client = { version = "2.0.0-rc5", path = "../../test-util
sp-consensus-babe = { version = "0.8.0-rc5", path = "../../primitives/consensus/babe" }
grandpa = { version = "0.8.0-rc5", package = "sc-finality-grandpa", path = "../finality-grandpa" }
grandpa-primitives = { version = "2.0.0-rc5", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" }
tokio = { version = "0.2", default-features = false }
async-std = { version = "1.6", default-features = false }
39 changes: 16 additions & 23 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ impl std::convert::From<PathBuf> for BasePath {
}
}

type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>;
// NOTE: here for code readability.
pub(crate) type SomeFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
pub(crate) type JoinFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Callable object that execute tasks.
///
Expand All @@ -275,56 +277,47 @@ type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, Ta
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod tokio { pub mod runtime {
/// # #[derive(Clone)]
/// # pub struct Runtime;
/// # impl Runtime {
/// # pub fn new() -> Result<Self, ()> { Ok(Runtime) }
/// # pub fn handle(&self) -> &Self { &self }
/// # pub fn spawn(&self, _: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # }
/// # } }
/// use futures::future::FutureExt;
/// use tokio::runtime::Runtime;
///
/// let runtime = Runtime::new().unwrap();
/// let handle = runtime.handle().clone();
/// let task_executor: TaskExecutor = (move |future, _task_type| {
/// handle.spawn(future);
/// }).into();
/// handle.spawn(future).map(|_| ())
/// }).into();
/// ```
///
/// ## Using async-std
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod async_std { pub mod task {
/// # pub fn spawn(_: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # } }
/// let task_executor: TaskExecutor = (|future, _task_type| {
/// async_std::task::spawn(future);
/// }).into();
/// // NOTE: async-std's JoinHandle is not a Result so we don't need to map the result
/// async_std::task::spawn(future)
/// }).into();
/// ```
#[derive(Clone)]
pub struct TaskExecutor(TaskExecutorInner);
pub struct TaskExecutor(Arc<dyn Fn(SomeFuture, TaskType) -> JoinFuture + Send + Sync>);

impl std::fmt::Debug for TaskExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "TaskExecutor")
}
}

impl<F> std::convert::From<F> for TaskExecutor
impl<F, FUT> std::convert::From<F> for TaskExecutor
where
F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync + 'static,
F: Fn(SomeFuture, TaskType) -> FUT + Send + Sync + 'static,
FUT: Future<Output = ()> + Send + 'static,
{
fn from(x: F) -> Self {
Self(Arc::new(x))
fn from(func: F) -> Self {
Self(Arc::new(move |fut, tt| Box::pin(func(fut, tt))))
}
}

impl TaskExecutor {
/// Spawns a new asynchronous task.
pub fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>, task_type: TaskType) {
pub fn spawn(&self, future: SomeFuture, task_type: TaskType) -> JoinFuture {
self.0(future, task_type)
}
}
Loading

0 comments on commit 64d4a4d

Please sign in to comment.