diff --git a/Cargo.lock b/Cargo.lock index 1dda817059975..a79f6134a88d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2120,7 +2120,7 @@ dependencies = [ "indexmap", "log", "slab", - "tokio 0.2.18", + "tokio 0.2.21", "tokio-util", ] @@ -2342,7 +2342,7 @@ dependencies = [ "net2", "pin-project", "time", - "tokio 0.2.18", + "tokio 0.2.21", "tower-service", "want 0.3.0", ] @@ -2360,7 +2360,7 @@ dependencies = [ "log", "rustls", "rustls-native-certs", - "tokio 0.2.18", + "tokio 0.2.21", "tokio-rustls", "webpki", ] @@ -6230,7 +6230,7 @@ dependencies = [ "substrate-prometheus-endpoint", "tempfile", "time", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -6470,7 +6470,7 @@ dependencies = [ "substrate-test-runtime-client", "substrate-test-runtime-transaction-pool", "tempfile", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -6661,7 +6661,7 @@ dependencies = [ "substrate-prometheus-endpoint", "substrate-test-runtime-client", "tempfile", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -6866,7 +6866,7 @@ dependencies = [ "sp-utils", "substrate-test-runtime-client", "threadpool", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -6984,6 +6984,7 @@ dependencies = [ name = "sc-service" version = "0.8.0-rc5" dependencies = [ + "async-std", "derive_more", "directories", "exit-future", @@ -7041,6 +7042,7 @@ dependencies = [ "substrate-test-runtime-client", "sysinfo", "tempfile", + "tokio 0.2.21", "tracing", "wasm-timer", ] @@ -8489,7 +8491,7 @@ dependencies = [ "sc-rpc-api", "serde", "sp-storage", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -8527,7 +8529,7 @@ dependencies = [ "hyper 0.13.4", "log", "prometheus", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] @@ -9002,9 +9004,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.18" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ef16d072d2b6dc8b4a56c70f5c5ced1a37752116f8e7c1e80c659aa7cb6713" +checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" dependencies = [ "bytes 0.5.4", "fnv", @@ -9149,7 +9151,7 @@ checksum = "228139ddd4fea3fa345a29233009635235833e52807af7ea6448ead03890d6a9" dependencies = [ "futures-core", "rustls", - "tokio 0.2.18", + "tokio 0.2.21", "webpki", ] @@ -9270,7 +9272,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite", - "tokio 0.2.18", + "tokio 0.2.21", ] [[package]] diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index a63b371b70ac6..cde64ad6738b5 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -20,7 +20,7 @@ regex = "1.3.1" time = "0.1.42" ansi_term = "0.12.1" lazy_static = "1.4.0" -tokio = { version = "0.2.9", features = [ "signal", "rt-core", "rt-threaded" ] } +tokio = { version = "0.2.21", features = [ "signal", "rt-core", "rt-threaded", "blocking" ] } futures = "0.3.4" fdlimit = "0.1.4" serde_json = "1.0.41" diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 05445c9d85d85..219613e6bddee 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -94,8 +94,8 @@ pub fn build_runtime() -> std::result::Result( - mut tokio_runtime: tokio::runtime::Runtime, - future: FUT, + mut tokio_runtime: tokio::runtime::Runtime, + future: FUT, mut task_manager: TaskManager, ) -> Result<()> where @@ -128,14 +128,10 @@ impl Runner { let task_executor = move |fut, task_type| { match task_type { - TaskType::Async => { runtime_handle.spawn(fut); } - TaskType::Blocking => { - runtime_handle.spawn(async move { - // `spawn_blocking` is looking for the current runtime, and as such has to - // be called from within `spawn`. - tokio::task::spawn_blocking(move || futures::executor::block_on(fut)) - }); - } + TaskType::Async => runtime_handle.spawn(fut).map(drop), + TaskType::Blocking => + runtime_handle.spawn_blocking(move || futures::executor::block_on(fut)) + .map(drop), } }; @@ -235,8 +231,7 @@ impl Runner { let mut task_manager = initialise(self.config)?; self.tokio_runtime.block_on(main(task_manager.future().fuse())) .map_err(|e| e.to_string())?; - task_manager.terminate(); - drop(self.tokio_runtime); + self.tokio_runtime.block_on(task_manager.clean_shutdown()); Ok(()) } diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 77b6bb2d71828..7d321d535fa54 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -90,3 +90,5 @@ substrate-test-runtime-client = { version = "2.0.0-rc5", path = "../../test-util sp-consensus-babe = { version = "0.8.0-rc5", path = "../../primitives/consensus/babe" } grandpa = { version = "0.8.0-rc5", package = "sc-finality-grandpa", path = "../finality-grandpa" } grandpa-primitives = { version = "2.0.0-rc5", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" } +tokio = { version = "0.2", default-features = false } +async-std = { version = "1.6", default-features = false } diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 397dacd747b14..15783a87f9917 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -263,7 +263,9 @@ impl std::convert::From for BasePath { } } -type TaskExecutorInner = Arc + Send>>, TaskType) + Send + Sync>; +// NOTE: here for code readability. +pub(crate) type SomeFuture = Pin + Send>>; +pub(crate) type JoinFuture = Pin + Send>>; /// Callable object that execute tasks. /// @@ -275,37 +277,27 @@ type TaskExecutorInner = Arc + Send>>, Ta /// /// ``` /// # use sc_service::TaskExecutor; -/// # mod tokio { pub mod runtime { -/// # #[derive(Clone)] -/// # pub struct Runtime; -/// # impl Runtime { -/// # pub fn new() -> Result { Ok(Runtime) } -/// # pub fn handle(&self) -> &Self { &self } -/// # pub fn spawn(&self, _: std::pin::Pin + Send>>) {} -/// # } -/// # } } +/// use futures::future::FutureExt; /// use tokio::runtime::Runtime; /// /// let runtime = Runtime::new().unwrap(); /// let handle = runtime.handle().clone(); /// let task_executor: TaskExecutor = (move |future, _task_type| { -/// handle.spawn(future); -/// }).into(); +/// handle.spawn(future).map(|_| ()) +/// }).into(); /// ``` /// /// ## Using async-std /// /// ``` /// # use sc_service::TaskExecutor; -/// # mod async_std { pub mod task { -/// # pub fn spawn(_: std::pin::Pin + Send>>) {} -/// # } } /// let task_executor: TaskExecutor = (|future, _task_type| { -/// async_std::task::spawn(future); -/// }).into(); +/// // NOTE: async-std's JoinHandle is not a Result so we don't need to map the result +/// async_std::task::spawn(future) +/// }).into(); /// ``` #[derive(Clone)] -pub struct TaskExecutor(TaskExecutorInner); +pub struct TaskExecutor(Arc JoinFuture + Send + Sync>); impl std::fmt::Debug for TaskExecutor { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -313,18 +305,19 @@ impl std::fmt::Debug for TaskExecutor { } } -impl std::convert::From for TaskExecutor +impl std::convert::From for TaskExecutor where - F: Fn(Pin + Send>>, TaskType) + Send + Sync + 'static, + F: Fn(SomeFuture, TaskType) -> FUT + Send + Sync + 'static, + FUT: Future + Send + 'static, { - fn from(x: F) -> Self { - Self(Arc::new(x)) + fn from(func: F) -> Self { + Self(Arc::new(move |fut, tt| Box::pin(func(fut, tt)))) } } impl TaskExecutor { /// Spawns a new asynchronous task. - pub fn spawn(&self, future: Pin + Send>>, task_type: TaskType) { + pub fn spawn(&self, future: SomeFuture, task_type: TaskType) -> JoinFuture { self.0(future, task_type) } } diff --git a/client/service/src/task_manager.rs b/client/service/src/task_manager/mod.rs similarity index 81% rename from client/service/src/task_manager.rs rename to client/service/src/task_manager/mod.rs index b6cc26005570a..bd4b2cfde73ec 100644 --- a/client/service/src/task_manager.rs +++ b/client/service/src/task_manager/mod.rs @@ -15,7 +15,7 @@ use std::{panic, result::Result, pin::Pin}; use exit_future::Signal; -use log::debug; +use log::{debug, error}; use futures::{ Future, FutureExt, StreamExt, future::{select, Either, BoxFuture}, @@ -30,9 +30,11 @@ use prometheus_endpoint::{ }; use sc_client_api::CloneableSpawn; use sp_utils::mpsc::{TracingUnboundedSender, TracingUnboundedReceiver, tracing_unbounded}; -use crate::{config::{TaskExecutor, TaskType}, Error}; +use crate::{config::{TaskExecutor, TaskType, JoinFuture}, Error}; mod prometheus_future; +#[cfg(test)] +mod tests; /// An handle for spawning tasks in the service. #[derive(Clone)] @@ -40,6 +42,7 @@ pub struct SpawnTaskHandle { on_exit: exit_future::Exit, executor: TaskExecutor, metrics: Option, + task_notifier: TracingUnboundedSender, } impl SpawnTaskHandle { @@ -67,6 +70,11 @@ impl SpawnTaskHandle { task: impl Future + Send + 'static, task_type: TaskType, ) { + if self.task_notifier.is_closed() { + debug!("Attempt to spawn a new task has been prevented: {}", name); + return; + } + let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); @@ -111,7 +119,16 @@ impl SpawnTaskHandle { } }; - self.executor.spawn(Box::pin(future), task_type); + let join_handle = self.executor.spawn(Box::pin(future), task_type); + let mut task_notifier = self.task_notifier.clone(); + self.executor.spawn( + Box::pin(async move { + if let Err(err) = task_notifier.send(join_handle).await { + error!("Could not send spawned task handle to queue: {}", err); + } + }), + TaskType::Async, + ); } } @@ -193,12 +210,12 @@ impl SpawnEssentialTaskHandle { task: impl Future + Send + 'static, task_type: TaskType, ) { - let mut essential_failed = self.essential_failed_tx.clone(); + let essential_failed = self.essential_failed_tx.clone(); let essential_task = std::panic::AssertUnwindSafe(task) .catch_unwind() .map(move |_| { log::error!("Essential task `{}` failed. Shutting down service.", name); - let _ = essential_failed.send(()); + let _ = essential_failed.close_channel(); }); let _ = self.inner.spawn_inner(name, essential_task, task_type); @@ -223,6 +240,8 @@ pub struct TaskManager { essential_failed_rx: TracingUnboundedReceiver<()>, /// Things to keep alive until the task manager is dropped. keep_alive: Box, + task_notifier: TracingUnboundedSender, + completion_future: JoinFuture, } impl TaskManager { @@ -233,11 +252,21 @@ impl TaskManager { prometheus_registry: Option<&Registry> ) -> Result { let (signal, on_exit) = exit_future::signal(); + // A side-channel for essential tasks to communicate shutdown. let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); let metrics = prometheus_registry.map(Metrics::register).transpose()?; + let (task_notifier, background_tasks) = tracing_unbounded("mpsc_background_tasks"); + // NOTE: for_each_concurrent will await on all the JoinHandle futures at the same time. It + // is possible to limit this but it's actually better for the memory foot print to await + // them all to not accumulate anything on that stream. + let completion_future = executor.spawn( + Box::pin(background_tasks.for_each_concurrent(None, |x| x)), + TaskType::Async, + ); + Ok(Self { on_exit, signal: Some(signal), @@ -246,16 +275,18 @@ impl TaskManager { essential_failed_tx, essential_failed_rx, keep_alive: Box::new(()), + task_notifier, + completion_future, }) } - /// Get a handle for spawning tasks. pub fn spawn_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { on_exit: self.on_exit.clone(), executor: self.executor.clone(), metrics: self.metrics.clone(), + task_notifier: self.task_notifier.clone(), } } @@ -264,12 +295,35 @@ impl TaskManager { SpawnEssentialTaskHandle::new(self.essential_failed_tx.clone(), self.spawn_handle()) } - /// Return a future that will end if an essential task fails. + /// Send the signal for termination, prevent new tasks to be created, await for all the existing + /// tasks to be finished and drop the object. You can consider this as an async drop. + pub fn clean_shutdown(mut self) -> Pin + Send>> { + self.terminate(); + let keep_alive = self.keep_alive; + let completion_future = self.completion_future; + + Box::pin(async move { + completion_future.await; + drop(keep_alive); + }) + } + + /// Return a future that will end with success if the signal to terminate was sent + /// (`self.terminate()`) or with an error if an essential task fails. + /// + /// # Warning + /// + /// This function will not wait until the end of the remaining task. You must call and await + /// `clean_shutdown()` after this. pub fn future<'a>(&'a mut self) -> Pin> + Send + 'a>> { Box::pin(async move { - self.essential_failed_rx.next().await; + let mut t1 = self.essential_failed_rx.next().fuse(); + let mut t2 = self.on_exit.clone().fuse(); - Err(Error::Other("Essential task failed.".into())) + futures::select! { + _ = t1 => Err(Error::Other("Essential task failed.".into())), + _ = t2 => Ok(()), + } }) } @@ -277,6 +331,8 @@ impl TaskManager { pub fn terminate(&mut self) { if let Some(signal) = self.signal.take() { let _ = signal.fire(); + // NOTE: task will prevent new tasks to be spawned + self.task_notifier.close_channel(); } } @@ -286,13 +342,6 @@ impl TaskManager { } } -impl Drop for TaskManager { - fn drop(&mut self) { - debug!(target: "service", "Tasks manager shutdown"); - self.terminate(); - } -} - #[derive(Clone)] struct Metrics { // This list is ordered alphabetically diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs new file mode 100644 index 0000000000000..c60d15b3394c3 --- /dev/null +++ b/client/service/src/task_manager/tests.rs @@ -0,0 +1,210 @@ +// This file is part of Substrate. + +// Copyright (C) 2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::config::TaskExecutor; +use crate::task_manager::TaskManager; +use futures::future::FutureExt; +use parking_lot::Mutex; +use std::any::Any; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Clone, Debug)] +struct DropTester(Arc>); + +struct DropTesterRef(DropTester); + +impl DropTester { + fn new() -> DropTester { + DropTester(Arc::new(Mutex::new(0))) + } + + fn new_ref(&self) -> DropTesterRef { + *self.0.lock() += 1; + DropTesterRef(self.clone()) + } +} + +impl PartialEq for DropTester { + fn eq(&self, other: &usize) -> bool { + &*self.0.lock() == other + } +} + +impl Drop for DropTesterRef { + fn drop(&mut self) { + *(self.0).0.lock() -= 1; + } +} + +#[test] +fn ensure_drop_tester_working() { + let drop_tester = DropTester::new(); + assert_eq!(drop_tester, 0); + let drop_tester_ref_1 = drop_tester.new_ref(); + assert_eq!(drop_tester, 1); + let drop_tester_ref_2 = drop_tester.new_ref(); + assert_eq!(drop_tester, 2); + drop(drop_tester_ref_1); + assert_eq!(drop_tester, 1); + drop(drop_tester_ref_2); + assert_eq!(drop_tester, 0); +} + +async fn run_background_task(_keep_alive: impl Any) { + loop { + tokio::time::delay_for(Duration::from_secs(1)).await; + } +} + +async fn run_background_task_blocking(duration: Duration, _keep_alive: impl Any) { + loop { + // block for X sec (not interruptible) + std::thread::sleep(duration); + // await for 1 sec (interruptible) + tokio::time::delay_for(Duration::from_secs(1)).await; + } +} + +#[test] +fn ensure_futures_are_awaited_on_shutdown() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 2); + runtime.block_on(task_manager.clean_shutdown()); + assert_eq!(drop_tester, 0); +} + +#[test] +fn ensure_keep_alive_during_shutdown() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + task_manager.keep_alive(drop_tester.new_ref()); + spawn_handle.spawn("task1", run_background_task(())); + assert_eq!(drop_tester, 1); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 1); + runtime.block_on(task_manager.clean_shutdown()); + assert_eq!(drop_tester, 0); +} + +#[test] +fn ensure_blocking_futures_are_awaited_on_shutdown() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn( + "task1", + run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), + ); + spawn_handle.spawn( + "task2", + run_background_task_blocking(Duration::from_secs(3), drop_tester.new_ref()), + ); + assert_eq!(drop_tester, 2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 2); + runtime.block_on(task_manager.clean_shutdown()); + assert_eq!(drop_tester, 0); +} + +#[test] +fn ensure_no_task_can_be_spawn_after_terminate() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 2); + task_manager.terminate(); + spawn_handle.spawn("task3", run_background_task(drop_tester.new_ref())); + runtime.block_on(task_manager.clean_shutdown()); + assert_eq!(drop_tester, 0); +} + +#[test] +fn ensure_task_manager_future_ends_when_task_manager_terminated() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 2); + task_manager.terminate(); + runtime.block_on(task_manager.future()).expect("future has ended without error"); + runtime.block_on(task_manager.clean_shutdown()); + assert_eq!(drop_tester, 0); +} + +#[test] +fn ensure_task_manager_future_ends_with_error_when_essential_task_ends() { + let mut runtime = tokio::runtime::Runtime::new().unwrap(); + let handle = runtime.handle().clone(); + let task_executor: TaskExecutor = (move |future, _| handle.spawn(future).map(|_| ())).into(); + + let mut task_manager = TaskManager::new(task_executor, None).unwrap(); + let spawn_handle = task_manager.spawn_handle(); + let spawn_essential_handle = task_manager.spawn_essential_handle(); + let drop_tester = DropTester::new(); + spawn_handle.spawn("task1", run_background_task(drop_tester.new_ref())); + spawn_handle.spawn("task2", run_background_task(drop_tester.new_ref())); + assert_eq!(drop_tester, 2); + // allow the tasks to even start + runtime.block_on(async { tokio::time::delay_for(Duration::from_secs(1)).await }); + assert_eq!(drop_tester, 2); + spawn_essential_handle.spawn("task3", async { panic!("task failed") }); + runtime.block_on(task_manager.future()).expect_err("future()'s Result must be Err"); + assert_eq!(drop_tester, 2); + runtime.block_on(task_manager.clean_shutdown()); + assert_eq!(drop_tester, 0); +} diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index b0dd2c0e257df..0d589cee7e12d 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -317,6 +317,7 @@ impl TestNet where let executor = executor.clone(); (move |fut: Pin + Send>>, _| { executor.spawn(fut.unit_error().compat()); + async {} }).into() }; diff --git a/utils/browser/src/lib.rs b/utils/browser/src/lib.rs index 9313d41bf5726..718a9b9751154 100644 --- a/utils/browser/src/lib.rs +++ b/utils/browser/src/lib.rs @@ -64,7 +64,10 @@ where network, telemetry_endpoints: chain_spec.telemetry_endpoints().clone(), chain_spec: Box::new(chain_spec), - task_executor: (|fut, _| wasm_bindgen_futures::spawn_local(fut)).into(), + task_executor: (|fut, _| { + wasm_bindgen_futures::spawn_local(fut); + async {} + }).into(), telemetry_external_transport: Some(transport), role: Role::Light, database: {