Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Optimize offchain worker api by re-using http-client (#6454)
Browse files Browse the repository at this point in the history
* Fix typo in offchain's docs

* Use Self keyword in AsyncApi::new()

* Move httpclient to be part of OffchainWorkers to optimize block import

* Fix compilation errors for tests

* Add wrapper struct for HyperClient

* Use lazy_static share SharedClient amongst OffchainWorkers. Remove the need to raise the fd limit

* Revert "Use lazy_static share SharedClient amongst OffchainWorkers. Remove the need to raise the fd limit"

This reverts commit 7af9749.

* Add lazy_static for tests
  • Loading branch information
pscott committed Jun 23, 2020
1 parent 5a102f7 commit fed834c
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/offchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ hyper-rustls = "0.20"

[dev-dependencies]
env_logger = "0.7.0"
fdlimit = "0.1.4"
sc-client-db = { version = "0.8.0-rc3", default-features = true, path = "../db/" }
sc-transaction-pool = { version = "2.0.0-rc3", path = "../../client/transaction-pool" }
sp-transaction-pool = { version = "2.0.0-rc3", path = "../../primitives/transaction-pool" }
substrate-test-runtime-client = { version = "2.0.0-rc3", path = "../../test-utils/runtime/client" }
tokio = "0.2"
lazy_static = "1.4.0"

[features]
default = []
11 changes: 8 additions & 3 deletions client/offchain/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use sp_core::offchain::{
OpaqueNetworkState, OpaquePeerId, OpaqueMultiaddr, StorageKind,
};
pub use sp_offchain::STORAGE_PREFIX;
pub use http::SharedClient;

#[cfg(not(target_os = "unknown"))]
mod http;
Expand Down Expand Up @@ -260,8 +261,9 @@ impl AsyncApi {
db: S,
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
is_validator: bool,
) -> (Api<S>, AsyncApi) {
let (http_api, http_worker) = http::http();
shared_client: SharedClient,
) -> (Api<S>, Self) {
let (http_api, http_worker) = http::http(shared_client);

let api = Api {
db,
Expand All @@ -270,7 +272,7 @@ impl AsyncApi {
http: http_api,
};

let async_api = AsyncApi {
let async_api = Self {
http: Some(http_worker),
};

Expand Down Expand Up @@ -308,11 +310,14 @@ mod tests {
let _ = env_logger::try_init();
let db = LocalStorage::new_test();
let mock = Arc::new(MockNetworkStateInfo());
let shared_client = SharedClient::new();


AsyncApi::new(
db,
mock,
false,
shared_client,
)
}

Expand Down
37 changes: 26 additions & 11 deletions client/offchain/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,22 @@ use log::error;
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
use std::{convert::TryFrom, fmt, io::Read as _, pin::Pin, task::{Context, Poll}};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
use std::sync::Arc;
use hyper::{Client as HyperClient, Body, client};
use hyper_rustls::HttpsConnector;

/// Wrapper struct used for keeping the hyper_rustls client running.
#[derive(Clone)]
pub struct SharedClient(Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>);

impl SharedClient {
pub fn new() -> Self {
Self(Arc::new(HyperClient::builder().build(HttpsConnector::new())))
}
}

/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
pub fn http() -> (HttpApi, HttpWorker) {
pub fn http(shared_client: SharedClient) -> (HttpApi, HttpWorker) {
let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker");
let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api");

Expand All @@ -51,7 +64,7 @@ pub fn http() -> (HttpApi, HttpWorker) {
let engine = HttpWorker {
to_api,
from_api,
http_client: hyper::Client::builder().build(hyper_rustls::HttpsConnector::new()),
http_client: shared_client.0,
requests: Vec::new(),
};

Expand Down Expand Up @@ -551,7 +564,7 @@ pub struct HttpWorker {
/// Used to receive messages from the `HttpApi`.
from_api: TracingUnboundedReceiver<ApiToWorker>,
/// The engine that runs HTTP requests.
http_client: hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
http_client: Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>,
/// HTTP requests that are being worked on by the engine.
requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
}
Expand Down Expand Up @@ -685,21 +698,23 @@ impl fmt::Debug for HttpWorkerRequest {
mod tests {
use core::convert::Infallible;
use crate::api::timestamp;
use super::http;
use super::{http, SharedClient};
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration};
use futures::future;
use lazy_static::lazy_static;

// Using lazy_static to avoid spawning lots of different SharedClients,
// as spawning a SharedClient is CPU-intensive and opens lots of fds.
lazy_static! {
static ref SHARED_CLIENT: SharedClient = SharedClient::new();
}

// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
// server that runs in the background as well.
macro_rules! build_api_server {
() => {{
// We spawn quite a bit of HTTP servers here due to how async API
// works for offchain workers, so be sure to raise the FD limit
// (particularly useful for macOS where the default soft limit may
// not be enough).
fdlimit::raise_fd_limit();

let (api, worker) = http();
let hyper_client = SHARED_CLIENT.clone();
let (api, worker) = http(hyper_client.clone());

let (addr_tx, addr_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
Expand Down
12 changes: 11 additions & 1 deletion client/offchain/src/api/http_dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,18 @@
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
use std::{future::Future, pin::Pin, task::Context, task::Poll};

/// Wrapper struct (wrapping nothing in case of http_dummy) used for keeping the hyper_rustls client running.
#[derive(Clone)]
pub struct SharedClient;

impl SharedClient {
pub fn new() -> Self {
Self
}
}

/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
pub fn http() -> (HttpApi, HttpWorker) {
pub fn http(_: SharedClient) -> (HttpApi, HttpWorker) {
(HttpApi, HttpWorker)
}

Expand Down
7 changes: 6 additions & 1 deletion client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! The offchain workers is a special function of the runtime that
//! gets executed after block is imported. During execution
//! it's able to asynchronously submit extrinsics that will either
//! be propagated to other nodes added to the next block
//! be propagated to other nodes or added to the next block
//! produced by the node as unsigned transactions.
//!
//! Offchain workers can be used for computation-heavy tasks
Expand All @@ -46,6 +46,7 @@ use sp_runtime::{generic::BlockId, traits::{self, Header}};
use futures::{prelude::*, future::ready};

mod api;
use api::SharedClient;

pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};

Expand All @@ -55,16 +56,19 @@ pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
db: Storage,
_block: PhantomData<Block>,
thread_pool: Mutex<ThreadPool>,
shared_client: SharedClient,
}

impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
/// Creates new `OffchainWorkers`.
pub fn new(client: Arc<Client>, db: Storage) -> Self {
let shared_client = SharedClient::new();
Self {
client,
db,
_block: PhantomData,
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
shared_client,
}
}
}
Expand Down Expand Up @@ -120,6 +124,7 @@ impl<Client, Storage, Block> OffchainWorkers<
self.db.clone(),
network_state.clone(),
is_validator,
self.shared_client.clone(),
);
debug!("Spawning offchain workers at {:?}", at);
let header = header.clone();
Expand Down

0 comments on commit fed834c

Please sign in to comment.