Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Graceful shutdown for the task manager #6654

Merged
33 commits merged into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a245e96
Initial commit
cecton Jul 14, 2020
fe1c372
Move task_manager.rs to mod.rs
cecton Jul 14, 2020
163501a
Graceful shutdown for the task manager
cecton Jul 14, 2020
cf228c0
Await all background task JoinHandle at the same time
cecton Jul 15, 2020
551268b
Add tests
cecton Jul 15, 2020
b3c0648
Make future() wait also for exit signal + fix essential task failed
cecton Jul 15, 2020
2855ada
add comments for non-obvious code
cecton Jul 15, 2020
db34d96
Use clean_shutdown() in sc-cli
cecton Jul 15, 2020
2dcd0b3
Adapt code and upgrade tokio in sc-cli
cecton Jul 15, 2020
42333f4
cleanup spacing in doc
cecton Jul 15, 2020
1198324
Add license
cecton Jul 15, 2020
56c8139
I guess actually running the clean shutdown would be a good idea
cecton Jul 15, 2020
ef3a054
fix tests
cecton Jul 15, 2020
6f656cf
Merge remote-tracking branch 'origin/master' into cecton-async-gracef…
gnunicorn Jul 16, 2020
4e6b8d2
Update client/cli/src/runner.rs
cecton Jul 16, 2020
5dbe179
Improve error logging
cecton Jul 16, 2020
c133c59
disable other tests (can't reproduce on my machine)
cecton Jul 16, 2020
1fb6481
Revert "disable other tests (can't reproduce on my machine)"
cecton Jul 16, 2020
502aba4
It is possible that the tasks are ended first
cecton Jul 16, 2020
1b91a8c
Revert "It is possible that the tasks are ended first"
cecton Jul 16, 2020
ee5e13c
Use single threaded scheduler for more predictability
cecton Jul 16, 2020
4e15214
enable_time
cecton Jul 16, 2020
fbcefa4
Revert "enable_time"
cecton Jul 16, 2020
23b642a
Revert "Use single threaded scheduler for more predictability"
cecton Jul 16, 2020
5b5becb
Revert "Revert "It is possible that the tasks are ended first""
cecton Jul 16, 2020
bc431b6
This cannot be verified either with a threaded pool
cecton Jul 16, 2020
aaf038c
Merge commit 1be02953d4eb521ac1d40e55c71b44e2031ac105 (no conflict)
cecton Jul 17, 2020
2a1a2ac
Merge commit 47be8d939148b0cb0d98d9ba132f082829c12e04 (no conflict)
cecton Jul 17, 2020
c079e31
Merge commit 5c43b2bebb331ebeaac5b6e21778203b1c73aa83 (no conflict)
cecton Jul 21, 2020
fbcb752
Merge commit cd67889e08b8f79af00c159b35f126c1cb106dda (no conflict)
cecton Jul 21, 2020
e4ad84a
Merge commit 86f85949329f0b27b56c41cd02ae30413e62fa5e (conflicts)
cecton Jul 21, 2020
b5e5fe2
Apply suggestions from code review
cecton Jul 21, 2020
5fd98a6
Merge commit e3bb2cea3187fca77fd892becd35294a6cd7daeb (no conflict)
cecton Jul 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ impl std::convert::From<PathBuf> for BasePath {
}
}

type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>;
// NOTE: these aliases exist purely for readability; both name the same boxed future type.
pub(crate) type SomeFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
pub(crate) type JoinFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Callable object that executes tasks.
///
Expand All @@ -281,15 +283,16 @@ type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, Ta
/// # impl Runtime {
/// # pub fn new() -> Result<Self, ()> { Ok(Runtime) }
/// # pub fn handle(&self) -> &Self { &self }
/// # pub fn spawn(&self, _: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # pub fn spawn(&self, _: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>)
/// # -> std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>> { Box::pin(async {}) }
/// # }
/// # } }
/// use tokio::runtime::Runtime;
///
/// let runtime = Runtime::new().unwrap();
/// let handle = runtime.handle().clone();
/// let task_executor: TaskExecutor = (move |future, _task_type| {
/// handle.spawn(future);
/// handle.spawn(future)
/// }).into();
/// ```
///
Expand All @@ -298,14 +301,15 @@ type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, Ta
/// ```
/// # use sc_service::TaskExecutor;
/// # mod async_std { pub mod task {
/// # pub fn spawn(_: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # pub fn spawn(_: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>)
/// # -> std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>> { Box::pin(async {}) }
/// # } }
/// let task_executor: TaskExecutor = (|future, _task_type| {
/// async_std::task::spawn(future);
/// async_std::task::spawn(future)
/// }).into();
/// ```
#[derive(Clone)]
pub struct TaskExecutor(TaskExecutorInner);
pub struct TaskExecutor(Arc<dyn Fn(SomeFuture, TaskType) -> JoinFuture + Send + Sync>);

impl std::fmt::Debug for TaskExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
Expand All @@ -315,7 +319,7 @@ impl std::fmt::Debug for TaskExecutor {

impl<F> std::convert::From<F> for TaskExecutor
where
F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync + 'static,
F: Fn(SomeFuture, TaskType) -> JoinFuture + Send + Sync + 'static,
{
fn from(x: F) -> Self {
Self(Arc::new(x))
Expand All @@ -324,7 +328,7 @@ where

impl TaskExecutor {
/// Spawns a new asynchronous task.
pub fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>, task_type: TaskType) {
pub fn spawn(&self, future: SomeFuture, task_type: TaskType) -> JoinFuture {
self.0(future, task_type)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

use std::{panic, result::Result, pin::Pin};
use exit_future::Signal;
use log::debug;
use log::{debug, error};
use futures::{
Future, FutureExt, StreamExt,
future::{select, Either, BoxFuture},
Expand All @@ -30,7 +30,7 @@ use prometheus_endpoint::{
};
use sc_client_api::CloneableSpawn;
use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded};
use crate::{config::{TaskExecutor, TaskType}, Error};
use crate::{config::{TaskExecutor, TaskType, JoinFuture}, Error};

mod prometheus_future;

Expand All @@ -40,6 +40,7 @@ pub struct SpawnTaskHandle {
on_exit: exit_future::Exit,
executor: TaskExecutor,
metrics: Option<Metrics>,
task_notifier: TracingUnboundedSender<JoinFuture>,
}

impl SpawnTaskHandle {
Expand Down Expand Up @@ -67,6 +68,11 @@ impl SpawnTaskHandle {
task: impl Future<Output = ()> + Send + 'static,
task_type: TaskType,
) {
if self.task_notifier.is_closed() {
debug!("Attempt to spawn a new task has been prevented: {}", name);
return;
}

let on_exit = self.on_exit.clone();
let metrics = self.metrics.clone();

Expand Down Expand Up @@ -111,7 +117,15 @@ impl SpawnTaskHandle {
}
};

self.executor.spawn(Box::pin(future), task_type);
let join_handle = self.executor.spawn(Box::pin(future), task_type);
let mut task_notifier = self.task_notifier.clone();
self.executor.spawn(Box::pin(async move {
if let Err(err) = task_notifier.send(join_handle).await {
error!("Could not send spawned task handle to queue: {}", err);
}
}),
TaskType::Async,
);
}
}

Expand Down Expand Up @@ -223,6 +237,8 @@ pub struct TaskManager {
essential_failed_rx: TracingUnboundedReceiver<()>,
/// Things to keep alive until the task manager is dropped.
keep_alive: Box<dyn std::any::Any + Send + Sync>,
task_notifier: TracingUnboundedSender<JoinFuture>,
cecton marked this conversation as resolved.
Show resolved Hide resolved
completion_future: JoinFuture,
}

impl TaskManager {
Expand All @@ -233,11 +249,19 @@ impl TaskManager {
prometheus_registry: Option<&Registry>
) -> Result<Self, PrometheusError> {
let (signal, on_exit) = exit_future::signal();

// A side-channel for essential tasks to communicate shutdown.
let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks");

let metrics = prometheus_registry.map(Metrics::register).transpose()?;

let (task_notifier, mut background_tasks) = tracing_unbounded("mpsc_background_tasks");
let completion_future = executor.spawn(Box::pin(async move {
while let Some(handle) = background_tasks.next().await {
handle.await;
}
}), TaskType::Async);

Ok(Self {
on_exit,
signal: Some(signal),
Expand All @@ -246,6 +270,8 @@ impl TaskManager {
essential_failed_tx,
essential_failed_rx,
keep_alive: Box::new(()),
task_notifier,
completion_future,
})
}

Expand All @@ -256,6 +282,7 @@ impl TaskManager {
on_exit: self.on_exit.clone(),
executor: self.executor.clone(),
metrics: self.metrics.clone(),
task_notifier: self.task_notifier.clone(),
}
}

Expand All @@ -264,6 +291,19 @@ impl TaskManager {
SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle())
}

/// Send the signal for termination, prevent new tasks to be created, await for all the existing
/// tasks to finished.
pub fn clean_shutdown(mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
self.terminate();
let keep_alive = self.keep_alive;
let completion_future = self.completion_future;

Box::pin(async move {
completion_future.await;
drop(keep_alive)
})
}

/// Return a future that will end if an essential task fails.
pub fn future<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>> {
Box::pin(async move {
Expand All @@ -277,6 +317,7 @@ impl TaskManager {
pub fn terminate(&mut self) {
if let Some(signal) = self.signal.take() {
let _ = signal.fire();
self.task_notifier.close_channel();
}
}

Expand All @@ -286,13 +327,6 @@ impl TaskManager {
}
}

// Dropping a `TaskManager` logs a debug message and then calls `terminate()`.
impl Drop for TaskManager {
fn drop(&mut self) {
// Logged under the "service" target.
debug!(target: "service", "Tasks manager shutdown");
// `terminate()` only acts while `self.signal` is still `Some`, so this does nothing
// further if it was already called.
self.terminate();
}
}

#[derive(Clone)]
struct Metrics {
// This list is ordered alphabetically
Expand Down