Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Graceful shutdown for the task manager #6654

Merged
33 commits merged into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a245e96
Initial commit
cecton Jul 14, 2020
fe1c372
Move task_manager.rs to mod.rs
cecton Jul 14, 2020
163501a
Graceful shutdown for the task manager
cecton Jul 14, 2020
cf228c0
Await all background task JoinHandle at the same time
cecton Jul 15, 2020
551268b
Add tests
cecton Jul 15, 2020
b3c0648
Make future() wait also for exit signal + fix essential task failed
cecton Jul 15, 2020
2855ada
add comments for non-obvious code
cecton Jul 15, 2020
db34d96
Use clean_shutdown() in sc-cli
cecton Jul 15, 2020
2dcd0b3
Adapt code and upgrade tokio in sc-cli
cecton Jul 15, 2020
42333f4
cleanup spacing in doc
cecton Jul 15, 2020
1198324
Add license
cecton Jul 15, 2020
56c8139
I guess actually running the clean shutdown would be a good idea
cecton Jul 15, 2020
ef3a054
fix tests
cecton Jul 15, 2020
6f656cf
Merge remote-tracking branch 'origin/master' into cecton-async-gracef…
gnunicorn Jul 16, 2020
4e6b8d2
Update client/cli/src/runner.rs
cecton Jul 16, 2020
5dbe179
Improve error logging
cecton Jul 16, 2020
c133c59
disable other tests (can't reproduce on my machine)
cecton Jul 16, 2020
1fb6481
Revert "disable other tests (can't reproduce on my machine)"
cecton Jul 16, 2020
502aba4
It is possible that the tasks are ended first
cecton Jul 16, 2020
1b91a8c
Revert "It is possible that the tasks are ended first"
cecton Jul 16, 2020
ee5e13c
Use single threaded scheduler for more predictability
cecton Jul 16, 2020
4e15214
enable_time
cecton Jul 16, 2020
fbcefa4
Revert "enable_time"
cecton Jul 16, 2020
23b642a
Revert "Use single threaded scheduler for more predictability"
cecton Jul 16, 2020
5b5becb
Revert "Revert "It is possible that the tasks are ended first""
cecton Jul 16, 2020
bc431b6
This cannot be verified either with a threaded pool
cecton Jul 16, 2020
aaf038c
Merge commit 1be02953d4eb521ac1d40e55c71b44e2031ac105 (no conflict)
cecton Jul 17, 2020
2a1a2ac
Merge commit 47be8d939148b0cb0d98d9ba132f082829c12e04 (no conflict)
cecton Jul 17, 2020
c079e31
Merge commit 5c43b2bebb331ebeaac5b6e21778203b1c73aa83 (no conflict)
cecton Jul 21, 2020
fbcb752
Merge commit cd67889e08b8f79af00c159b35f126c1cb106dda (no conflict)
cecton Jul 21, 2020
e4ad84a
Merge commit 86f85949329f0b27b56c41cd02ae30413e62fa5e (conflicts)
cecton Jul 21, 2020
b5e5fe2
Apply suggestions from code review
cecton Jul 21, 2020
5fd98a6
Merge commit e3bb2cea3187fca77fd892becd35294a6cd7daeb (no conflict)
cecton Jul 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,5 @@ substrate-test-runtime-client = { version = "2.0.0-rc4", path = "../../test-util
sp-consensus-babe = { version = "0.8.0-rc4", path = "../../primitives/consensus/babe" }
grandpa = { version = "0.8.0-rc4", package = "sc-finality-grandpa", path = "../finality-grandpa" }
grandpa-primitives = { version = "2.0.0-rc4", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" }
tokio = { version = "0.2", default-features = false }
async-std = { version = "1.6", default-features = false }
35 changes: 14 additions & 21 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ impl std::convert::From<PathBuf> for BasePath {
}
}

type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>;
// NOTE: here for code readability.
pub(crate) type SomeFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
pub(crate) type JoinFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Callable object that execute tasks.
///
Expand All @@ -275,56 +277,47 @@ type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, Ta
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod tokio { pub mod runtime {
/// # #[derive(Clone)]
/// # pub struct Runtime;
/// # impl Runtime {
/// # pub fn new() -> Result<Self, ()> { Ok(Runtime) }
/// # pub fn handle(&self) -> &Self { &self }
/// # pub fn spawn(&self, _: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # }
/// # } }
/// use futures::future::FutureExt;
/// use tokio::runtime::Runtime;
///
/// let runtime = Runtime::new().unwrap();
/// let handle = runtime.handle().clone();
/// let task_executor: TaskExecutor = (move |future, _task_type| {
/// handle.spawn(future);
/// handle.spawn(future).map(|_| ())
/// }).into();
/// ```
///
/// ## Using async-std
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod async_std { pub mod task {
/// # pub fn spawn(_: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # } }
/// let task_executor: TaskExecutor = (|future, _task_type| {
/// async_std::task::spawn(future);
/// // NOTE: async-std's JoinHandle is not a Result so we don't need to map the result
/// async_std::task::spawn(future)
/// }).into();
/// ```
#[derive(Clone)]
pub struct TaskExecutor(TaskExecutorInner);
pub struct TaskExecutor(Arc<dyn Fn(SomeFuture, TaskType) -> JoinFuture + Send + Sync>);

impl std::fmt::Debug for TaskExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "TaskExecutor")
}
}

impl<F> std::convert::From<F> for TaskExecutor
impl<F, FUT> std::convert::From<F> for TaskExecutor
where
F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync + 'static,
F: Fn(SomeFuture, TaskType) -> FUT + Send + Sync + 'static,
FUT: Future<Output = ()> + Send + 'static,
{
fn from(x: F) -> Self {
Self(Arc::new(x))
fn from(func: F) -> Self {
Self(Arc::new(move |fut, tt| Box::pin(func(fut, tt))))
}
}

impl TaskExecutor {
/// Spawns a new asynchronous task.
pub fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>, task_type: TaskType) {
pub fn spawn(&self, future: SomeFuture, task_type: TaskType) -> JoinFuture {
self.0(future, task_type)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use std::{panic, result::Result, pin::Pin};
use exit_future::Signal;
use log::debug;
use log::{debug, error};
use futures::{
Future, FutureExt, StreamExt,
future::{select, Either, BoxFuture},
Expand All @@ -30,16 +30,19 @@ use prometheus_endpoint::{
};
use sc_client_api::CloneableSpawn;
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};
use crate::{config::{TaskExecutor, TaskType}, Error};
use crate::{config::{TaskExecutor, TaskType, JoinFuture}, Error};

mod prometheus_future;
#[cfg(test)]
mod tests;

/// An handle for spawning tasks in the service.
#[derive(Clone)]
pub struct SpawnTaskHandle {
on_exit: exit_future::Exit,
executor: TaskExecutor,
metrics: Option<Metrics>,
task_notifier: TracingUnboundedSender<JoinFuture>,
}

impl SpawnTaskHandle {
Expand Down Expand Up @@ -67,6 +70,11 @@ impl SpawnTaskHandle {
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
if self.task_notifier.is_closed() {
debug!("Attempt to spawn a new task has been prevented: {}", name);
return;
}

let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();

Expand Down Expand Up @@ -111,7 +119,16 @@ impl SpawnTaskHandle {
}
};

self.executor.spawn(Box::pin(future), task_type);
let join_handle = self.executor.spawn(Box::pin(future), task_type);
let mut task_notifier = self.task_notifier.clone();
self.executor.spawn(
Box::pin(async move {
if let Err(err) = task_notifier.send(join_handle).await {
error!("Could not send spawned task handle to queue: {}", err);
}
}),
TaskType::Async,
);
}
}

Expand Down Expand Up @@ -193,12 +210,12 @@ impl SpawnEssentialTaskHandle {
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
let mut essential_failed = self.essential_failed_tx.clone();
let essential_failed = self.essential_failed_tx.clone();
let essential_task = std::panic::AssertUnwindSafe(task)
.catch_unwind()
.map(move |_| {
log::error!("Essential task `{}` failed. Shutting down service.", name);
let _ = essential_failed.send(());
let _ = essential_failed.close_channel();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending didn't work for some reason (it doesn't work in the current version of substrate!). I think it is fine as it as it is an internal detail. I also added a test to make sure that if we ever make this feature more sophisticated it will at least ensure that fn future() stops properly.

});

let _ = self.inner.spawn_inner(name, essential_task, task_type);
Expand All @@ -223,6 +240,8 @@ pub struct TaskManager {
essential_failed_rx: TracingUnboundedReceiver<()>,
/// Things to keep alive until the task manager is dropped.
keep_alive: Box<dyn std::any::Any + Send + Sync>,
task_notifier: TracingUnboundedSender<JoinFuture>,
cecton marked this conversation as resolved.
Show resolved Hide resolved
completion_future: JoinFuture,
}

impl TaskManager {
Expand All @@ -233,11 +252,18 @@ impl TaskManager {
prometheus_registry: Option<&Registry>
) -> Result<Self, PrometheusError> {
let (signal, on_exit) = exit_future::signal();

// A side-channel for essential tasks to communicate shutdown.
let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks");

let metrics = prometheus_registry.map(Metrics::register).transpose()?;

let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks");
let completion_future = executor.spawn(
Box::pin(background_tasks.for_each_concurrent(None, |x| x)),
cecton marked this conversation as resolved.
Show resolved Hide resolved
TaskType::Async,
);

Ok(Self {
on_exit,
signal: Some(signal),
Expand All @@ -246,16 +272,18 @@ impl TaskManager {
essential_failed_tx,
essential_failed_rx,
keep_alive: Box::new(()),
task_notifier,
completion_future,
})
}


/// Get a handle for spawning tasks.
pub fn spawn_handle(&self) -> SpawnTaskHandle {
SpawnTaskHandle {
on_exit: self.on_exit.clone(),
executor: self.executor.clone(),
metrics: self.metrics.clone(),
task_notifier: self.task_notifier.clone(),
}
}

Expand All @@ -264,19 +292,43 @@ impl TaskManager {
SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle())
}

/// Return a future that will end if an essential task fails.
/// Send the signal for termination, prevent new tasks to be created, await for all the existing
/// tasks to be finished and drop the object. You can consider this as an async drop.
pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
self.terminate();
let keep_alive = self.keep_alive;
let completion_future = self.completion_future;

Box::pin(async move {
completion_future.await;
drop(keep_alive);
})
}

/// Return a future that will end with success if the signal to terminate was sent
/// (`self.terminate()`) or with an error if an essential task fails.
///
/// # Warning
///
/// This function will not end the remaining task. You must call and await `clean_shutdown()`
/// after this.
pub fn future<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
Box::pin(async move {
self.essential_failed_rx.next().await;
let mut t1 = self.essential_failed_rx.next().fuse();
let mut t2 = self.on_exit.clone().fuse();

Err(Error::Other("Essential task failed.".into()))
futures::select! {
_ = t1 => Err(Error::Other("Essential task failed.".into())),
_ = t2 => Ok(()),
}
})
}

/// Signal to terminate all the running tasks.
pub fn terminate(&mut self) {
if let Some(signal) = self.signal.take() {
let _ = signal.fire();
self.task_notifier.close_channel();
}
}

Expand All @@ -286,13 +338,6 @@ impl TaskManager {
}
}

impl Drop for TaskManager {
fn drop(&mut self) {
debug!(target: "service", "Tasks manager shutdown");
self.terminate();
}
}

#[derive(Clone)]
struct Metrics {
// This list is ordered alphabetically
Expand Down
Loading