Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Graceful shutdown for the task manager #6654

Merged
33 commits merged into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a245e96
Initial commit
cecton Jul 14, 2020
fe1c372
Move task_manager.rs to mod.rs
cecton Jul 14, 2020
163501a
Graceful shutdown for the task manager
cecton Jul 14, 2020
cf228c0
Await all background task JoinHandle at the same time
cecton Jul 15, 2020
551268b
Add tests
cecton Jul 15, 2020
b3c0648
Make future() wait also for exit signal + fix essential task failed
cecton Jul 15, 2020
2855ada
add comments for non-obvious code
cecton Jul 15, 2020
db34d96
Use clean_shutdown() in sc-cli
cecton Jul 15, 2020
2dcd0b3
Adapt code and upgrade tokio in sc-cli
cecton Jul 15, 2020
42333f4
cleanup spacing in doc
cecton Jul 15, 2020
1198324
Add license
cecton Jul 15, 2020
56c8139
I guess actually running the clean shutdown would be a good idea
cecton Jul 15, 2020
ef3a054
fix tests
cecton Jul 15, 2020
6f656cf
Merge remote-tracking branch 'origin/master' into cecton-async-gracef…
gnunicorn Jul 16, 2020
4e6b8d2
Update client/cli/src/runner.rs
cecton Jul 16, 2020
5dbe179
Improve error logging
cecton Jul 16, 2020
c133c59
disable other tests (can't reproduce on my machine)
cecton Jul 16, 2020
1fb6481
Revert "disable other tests (can't reproduce on my machine)"
cecton Jul 16, 2020
502aba4
It is possible that the tasks are ended first
cecton Jul 16, 2020
1b91a8c
Revert "It is possible that the tasks are ended first"
cecton Jul 16, 2020
ee5e13c
Use single threaded scheduler for more predictability
cecton Jul 16, 2020
4e15214
enable_time
cecton Jul 16, 2020
fbcefa4
Revert "enable_time"
cecton Jul 16, 2020
23b642a
Revert "Use single threaded scheduler for more predictability"
cecton Jul 16, 2020
5b5becb
Revert "Revert "It is possible that the tasks are ended first""
cecton Jul 16, 2020
bc431b6
This cannot be verified either with a threaded pool
cecton Jul 16, 2020
aaf038c
Merge commit 1be02953d4eb521ac1d40e55c71b44e2031ac105 (no conflict)
cecton Jul 17, 2020
2a1a2ac
Merge commit 47be8d939148b0cb0d98d9ba132f082829c12e04 (no conflict)
cecton Jul 17, 2020
c079e31
Merge commit 5c43b2bebb331ebeaac5b6e21778203b1c73aa83 (no conflict)
cecton Jul 21, 2020
fbcb752
Merge commit cd67889e08b8f79af00c159b35f126c1cb106dda (no conflict)
cecton Jul 21, 2020
e4ad84a
Merge commit 86f85949329f0b27b56c41cd02ae30413e62fa5e (conflicts)
cecton Jul 21, 2020
b5e5fe2
Apply suggestions from code review
cecton Jul 21, 2020
5fd98a6
Merge commit e3bb2cea3187fca77fd892becd35294a6cd7daeb (no conflict)
cecton Jul 22, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ regex = "1.3.1"
time = "0.1.42"
ansi_term = "0.12.1"
lazy_static = "1.4.0"
tokio = { version = "0.2.9", features = [ "signal", "rt-core", "rt-threaded" ] }
tokio = { version = "0.2.21", features = [ "signal", "rt-core", "rt-threaded", "blocking" ] }
futures = "0.3.4"
fdlimit = "0.1.4"
serde_json = "1.0.41"
Expand Down
23 changes: 10 additions & 13 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::
}

fn run_until_exit<FUT, ERR>(
mut tokio_runtime: tokio::runtime::Runtime,
future: FUT,
mut tokio_runtime: tokio::runtime::Runtime,
future: FUT,
mut task_manager: TaskManager,
) -> Result<()>
where
Expand Down Expand Up @@ -127,15 +127,13 @@ impl<C: SubstrateCli> Runner<C> {
let runtime_handle = tokio_runtime.handle().clone();

let task_executor = move |fut, task_type| {
match task_type {
TaskType::Async => { runtime_handle.spawn(fut); }
TaskType::Blocking => {
runtime_handle.spawn(async move {
// `spawn_blocking` is looking for the current runtime, and as such has to
// be called from within `spawn`.
tokio::task::spawn_blocking(move || futures::executor::block_on(fut))
});
}
let map_result = |_| ();

cecton marked this conversation as resolved.
Show resolved Hide resolved
match task_type{
cecton marked this conversation as resolved.
Show resolved Hide resolved
TaskType::Async => runtime_handle.spawn(fut).map(map_result),
cecton marked this conversation as resolved.
Show resolved Hide resolved
TaskType::Blocking =>
runtime_handle.spawn_blocking(move || futures::executor::block_on(fut))
.map(map_result),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The upgrade to 0.2.21 allows easier writing

cecton marked this conversation as resolved.
Show resolved Hide resolved
}
};

Expand Down Expand Up @@ -235,8 +233,7 @@ impl<C: SubstrateCli> Runner<C> {
let mut task_manager = initialise(self.config)?;
self.tokio_runtime.block_on(main(task_manager.future().fuse()))
.map_err(|e| e.to_string())?;
task_manager.terminate();
drop(self.tokio_runtime);
self.tokio_runtime.block_on(task_manager.clean_shutdown());
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,5 @@ substrate-test-runtime-client = { version = "2.0.0-rc4", path = "../../test-util
sp-consensus-babe = { version = "0.8.0-rc4", path = "../../primitives/consensus/babe" }
grandpa = { version = "0.8.0-rc4", package = "sc-finality-grandpa", path = "../finality-grandpa" }
grandpa-primitives = { version = "2.0.0-rc4", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" }
tokio = { version = "0.2", default-features = false }
async-std = { version = "1.6", default-features = false }
39 changes: 16 additions & 23 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ impl std::convert::From<PathBuf> for BasePath {
}
}

type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync>;
// NOTE: here for code readability.
pub(crate) type SomeFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
pub(crate) type JoinFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Callable object that execute tasks.
///
Expand All @@ -275,56 +277,47 @@ type TaskExecutorInner = Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>, Ta
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod tokio { pub mod runtime {
/// # #[derive(Clone)]
/// # pub struct Runtime;
/// # impl Runtime {
/// # pub fn new() -> Result<Self, ()> { Ok(Runtime) }
/// # pub fn handle(&self) -> &Self { &self }
/// # pub fn spawn(&self, _: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # }
/// # } }
/// use futures::future::FutureExt;
/// use tokio::runtime::Runtime;
///
/// let runtime = Runtime::new().unwrap();
/// let handle = runtime.handle().clone();
/// let task_executor: TaskExecutor = (move |future, _task_type| {
/// handle.spawn(future);
/// }).into();
/// handle.spawn(future).map(|_| ())
/// }).into();
/// ```
///
/// ## Using async-std
///
/// ```
/// # use sc_service::TaskExecutor;
/// # mod async_std { pub mod task {
/// # pub fn spawn(_: std::pin::Pin<Box<dyn futures::future::Future<Output = ()> + Send>>) {}
/// # } }
/// let task_executor: TaskExecutor = (|future, _task_type| {
/// async_std::task::spawn(future);
/// }).into();
/// // NOTE: async-std's JoinHandle is not a Result so we don't need to map the result
/// async_std::task::spawn(future)
/// }).into();
/// ```
#[derive(Clone)]
pub struct TaskExecutor(TaskExecutorInner);
pub struct TaskExecutor(Arc<dyn Fn(SomeFuture, TaskType) -> JoinFuture + Send + Sync>);

impl std::fmt::Debug for TaskExecutor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "TaskExecutor")
}
}

impl<F> std::convert::From<F> for TaskExecutor
impl<F, FUT> std::convert::From<F> for TaskExecutor
where
F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync + 'static,
F: Fn(SomeFuture, TaskType) -> FUT + Send + Sync + 'static,
FUT: Future<Output = ()> + Send + 'static,
{
fn from(x: F) -> Self {
Self(Arc::new(x))
fn from(func: F) -> Self {
Self(Arc::new(move |fut, tt| Box::pin(func(fut, tt))))
}
}

impl TaskExecutor {
/// Spawns a new asynchronous task.
pub fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>, task_type: TaskType) {
pub fn spawn(&self, future: SomeFuture, task_type: TaskType) -> JoinFuture {
self.0(future, task_type)
}
}
Loading