Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Optimize offchain worker api by re-using http-client #6454

Merged
merged 8 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/offchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ hyper-rustls = "0.20"

[dev-dependencies]
env_logger = "0.7.0"
fdlimit = "0.1.4"
sc-client-db = { version = "0.8.0-rc3", default-features = true, path = "../db/" }
sc-transaction-pool = { version = "2.0.0-rc3", path = "../../client/transaction-pool" }
sp-transaction-pool = { version = "2.0.0-rc3", path = "../../primitives/transaction-pool" }
substrate-test-runtime-client = { version = "2.0.0-rc3", path = "../../test-utils/runtime/client" }
tokio = "0.2"
lazy_static = "1.4.0"

[features]
default = []
11 changes: 8 additions & 3 deletions client/offchain/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use sp_core::offchain::{
OpaqueNetworkState, OpaquePeerId, OpaqueMultiaddr, StorageKind,
};
pub use sp_offchain::STORAGE_PREFIX;
pub use http::SharedClient;

#[cfg(not(target_os = "unknown"))]
mod http;
Expand Down Expand Up @@ -260,8 +261,9 @@ impl AsyncApi {
db: S,
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
is_validator: bool,
) -> (Api<S>, AsyncApi) {
let (http_api, http_worker) = http::http();
shared_client: SharedClient,
) -> (Api<S>, Self) {
let (http_api, http_worker) = http::http(shared_client);

let api = Api {
db,
Expand All @@ -270,7 +272,7 @@ impl AsyncApi {
http: http_api,
};

let async_api = AsyncApi {
let async_api = Self {
http: Some(http_worker),
};

Expand Down Expand Up @@ -308,11 +310,14 @@ mod tests {
let _ = env_logger::try_init();
let db = LocalStorage::new_test();
let mock = Arc::new(MockNetworkStateInfo());
let shared_client = SharedClient::new();


AsyncApi::new(
db,
mock,
false,
shared_client,
)
}

Expand Down
37 changes: 26 additions & 11 deletions client/offchain/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,22 @@ use log::error;
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
use std::{convert::TryFrom, fmt, io::Read as _, pin::Pin, task::{Context, Poll}};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
use std::sync::Arc;
use hyper::{Client as HyperClient, Body, client};
use hyper_rustls::HttpsConnector;

/// Wrapper struct used for keeping the hyper_rustls client running.
#[derive(Clone)]
pub struct SharedClient(Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>);

impl SharedClient {
pub fn new() -> Self {
Self(Arc::new(HyperClient::builder().build(HttpsConnector::new())))
}
}

/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
pub fn http() -> (HttpApi, HttpWorker) {
pub fn http(shared_client: SharedClient) -> (HttpApi, HttpWorker) {
let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker");
let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api");

Expand All @@ -51,7 +64,7 @@ pub fn http() -> (HttpApi, HttpWorker) {
let engine = HttpWorker {
to_api,
from_api,
http_client: hyper::Client::builder().build(hyper_rustls::HttpsConnector::new()),
http_client: shared_client.0,
requests: Vec::new(),
};

Expand Down Expand Up @@ -551,7 +564,7 @@ pub struct HttpWorker {
/// Used to receive messages from the `HttpApi`.
from_api: TracingUnboundedReceiver<ApiToWorker>,
/// The engine that runs HTTP requests.
http_client: hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
http_client: Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>,
/// HTTP requests that are being worked on by the engine.
requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
}
Expand Down Expand Up @@ -685,21 +698,23 @@ impl fmt::Debug for HttpWorkerRequest {
mod tests {
use core::convert::Infallible;
use crate::api::timestamp;
use super::http;
use super::{http, SharedClient};
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration};
use futures::future;
use lazy_static::lazy_static;

// Using lazy_static to avoid spawning lots of different SharedClients,
// as spawning a SharedClient is CPU-intensive and opens lots of fds.
lazy_static! {
static ref SHARED_CLIENT: SharedClient = SharedClient::new();
}

// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
// server that runs in the background as well.
macro_rules! build_api_server {
() => {{
// We spawn quite a bit of HTTP servers here due to how async API
// works for offchain workers, so be sure to raise the FD limit
// (particularly useful for macOS where the default soft limit may
// not be enough).
fdlimit::raise_fd_limit();

let (api, worker) = http();
let hyper_client = SHARED_CLIENT.clone();
let (api, worker) = http(hyper_client.clone());

let (addr_tx, addr_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
Expand Down
12 changes: 11 additions & 1 deletion client/offchain/src/api/http_dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,18 @@
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
use std::{future::Future, pin::Pin, task::Context, task::Poll};

/// Wrapper struct (wrapping nothing in case of http_dummy) used for keeping the hyper_rustls client running.
#[derive(Clone)]
pub struct SharedClient;

impl SharedClient {
pub fn new() -> Self {
Self
}
}

/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
pub fn http() -> (HttpApi, HttpWorker) {
pub fn http(_: SharedClient) -> (HttpApi, HttpWorker) {
(HttpApi, HttpWorker)
}

Expand Down
7 changes: 6 additions & 1 deletion client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! The offchain workers is a special function of the runtime that
//! gets executed after block is imported. During execution
//! it's able to asynchronously submit extrinsics that will either
//! be propagated to other nodes added to the next block
//! be propagated to other nodes or added to the next block
//! produced by the node as unsigned transactions.
//!
//! Offchain workers can be used for computation-heavy tasks
Expand All @@ -46,6 +46,7 @@ use sp_runtime::{generic::BlockId, traits::{self, Header}};
use futures::{prelude::*, future::ready};

mod api;
use api::SharedClient;

pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};

Expand All @@ -55,16 +56,19 @@ pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
db: Storage,
_block: PhantomData<Block>,
thread_pool: Mutex<ThreadPool>,
shared_client: SharedClient,
}

impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
/// Creates new `OffchainWorkers`.
pub fn new(client: Arc<Client>, db: Storage) -> Self {
let shared_client = SharedClient::new();
Self {
client,
db,
_block: PhantomData,
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
shared_client,
}
}
}
Expand Down Expand Up @@ -120,6 +124,7 @@ impl<Client, Storage, Block> OffchainWorkers<
self.db.clone(),
network_state.clone(),
is_validator,
self.shared_client.clone(),
);
debug!("Spawning offchain workers at {:?}", at);
let header = header.clone();
Expand Down