Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
impl From<Fn> for TaskExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
cecton committed Jun 18, 2020
1 parent 563da0a commit 7b0590e
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 39 deletions.
28 changes: 12 additions & 16 deletions client/cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use futures::pin_mut;
use futures::select;
use futures::{future, future::FutureExt, Future};
use log::info;
use sc_service::{
AbstractService, Configuration, Role, ServiceBuilderCommand, TaskType, config::TaskExecutor,
};
use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand, TaskType};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use sp_version::RuntimeVersion;
Expand Down Expand Up @@ -121,23 +119,21 @@ impl<C: SubstrateCli> Runner<C> {
let tokio_runtime = build_runtime()?;
let runtime_handle = tokio_runtime.handle().clone();

let task_executor = TaskExecutor::from_fn(
move |fut, task_type| {
match task_type {
TaskType::Async => { runtime_handle.spawn(fut); }
TaskType::Blocking => {
runtime_handle.spawn( async move {
// `spawn_blocking` is looking for the current runtime, and as such has to be called
// from within `spawn`.
tokio::task::spawn_blocking(move || futures::executor::block_on(fut))
});
}
let task_executor = move |fut, task_type| {
match task_type {
TaskType::Async => { runtime_handle.spawn(fut); }
TaskType::Blocking => {
runtime_handle.spawn( async move {
// `spawn_blocking` is looking for the current runtime, and as such has to
// be called from within `spawn`.
tokio::task::spawn_blocking(move || futures::executor::block_on(fut))
});
}
}
);
};

Ok(Runner {
config: command.create_configuration(cli, task_executor)?,
config: command.create_configuration(cli, task_executor.into())?,
tokio_runtime,
phantom: PhantomData,
})
Expand Down
21 changes: 6 additions & 15 deletions client/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,14 @@ impl std::fmt::Debug for TaskExecutor {
}
}

/*
impl std::convert::From<TaskExecutorInner> for TaskExecutor {
fn from(x: TaskExecutorInner)
-> Self {
Self(x)
impl<F> std::convert::From<F> for TaskExecutor
where
F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync + 'static,
{
fn from(x: F) -> Self {
Self(Arc::new(x))
}
}
*/

impl std::ops::Deref for TaskExecutor {
type Target = TaskExecutorInner;
Expand All @@ -282,12 +282,3 @@ impl std::ops::Deref for TaskExecutor {
&self.0
}
}

impl TaskExecutor {
/// Create a `TaskExecutor` from a function
pub fn from_fn(
f: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>, TaskType) + Send + Sync + 'static,
) -> Self {
Self(Arc::new(f))
}
}
10 changes: 4 additions & 6 deletions client/service/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,11 @@ impl<G, E, F, L, U> TestNet<G, E, F, L, U> where
authorities: impl Iterator<Item = (String, impl FnOnce(Configuration) -> Result<(F, U), Error>)>
) {
let executor = self.runtime.executor();
let task_executor = {
let task_executor: TaskExecutor = {
let executor = executor.clone();
TaskExecutor::from_fn(
move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>, _| {
executor.spawn(fut.unit_error().compat());
},
)
(move |fut: Pin<Box<dyn futures::Future<Output = ()> + Send>>, _| {
executor.spawn(fut.unit_error().compat());
}).into()
};

for (key, authority) in authorities {
Expand Down
4 changes: 2 additions & 2 deletions utils/browser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use log::{debug, info};
use sc_network::config::TransportConfig;
use sc_service::{
AbstractService, RpcSession, Role, Configuration,
config::{DatabaseConfig, KeystoreConfig, NetworkConfiguration, TaskExecutor},
config::{DatabaseConfig, KeystoreConfig, NetworkConfiguration},
GenericChainSpec, RuntimeGenesis
};
use wasm_bindgen::prelude::*;
Expand Down Expand Up @@ -63,7 +63,7 @@ where
network,
telemetry_endpoints: chain_spec.telemetry_endpoints().clone(),
chain_spec: Box::new(chain_spec),
task_executor: TaskExecutor::from_fn(|fut, _| wasm_bindgen_futures::spawn_local(fut)),
task_executor: (|fut, _| wasm_bindgen_futures::spawn_local(fut)).into(),
telemetry_external_transport: Some(transport),
role: Role::Light,
database: {
Expand Down

0 comments on commit 7b0590e

Please sign in to comment.