Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Optimize offchain worker api by re-using http-client #6454

Merged
merged 8 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions client/offchain/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,9 @@ impl AsyncApi {
db: S,
network_state: Arc<dyn NetworkStateInfo + Send + Sync>,
is_validator: bool,
) -> (Api<S>, AsyncApi) {
let (http_api, http_worker) = http::http();
hyper_client: Arc<hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>>,
) -> (Api<S>, Self) {
let (http_api, http_worker) = http::http(hyper_client);

let api = Api {
db,
Expand All @@ -270,7 +271,7 @@ impl AsyncApi {
http: http_api,
};

let async_api = AsyncApi {
let async_api = Self {
http: Some(http_worker),
};

Expand All @@ -291,6 +292,8 @@ mod tests {
use std::{convert::{TryFrom, TryInto}, time::SystemTime};
use sc_client_db::offchain::LocalStorage;
use sc_network::PeerId;
use hyper_rustls::HttpsConnector;
use hyper::{Client as HyperClient};

struct MockNetworkStateInfo();

Expand All @@ -308,11 +311,14 @@ mod tests {
let _ = env_logger::try_init();
let db = LocalStorage::new_test();
let mock = Arc::new(MockNetworkStateInfo());
let hyper_client = Arc::new(HyperClient::builder().build(HttpsConnector::new()));


AsyncApi::new(
db,
mock,
false,
hyper_client,
)
}

Expand Down
15 changes: 11 additions & 4 deletions client/offchain/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ use log::error;
use sp_core::offchain::{HttpRequestId, Timestamp, HttpRequestStatus, HttpError};
use std::{convert::TryFrom, fmt, io::Read as _, pin::Pin, task::{Context, Poll}};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
use std::sync::Arc;
use hyper::{Client as HyperClient, Body, client};
use hyper_rustls::HttpsConnector;

/// Creates a pair of [`HttpApi`] and [`HttpWorker`].
pub fn http() -> (HttpApi, HttpWorker) {
pub fn http(hyper_client: Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>) -> (HttpApi, HttpWorker) {
let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker");
let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api");

Expand All @@ -51,7 +54,7 @@ pub fn http() -> (HttpApi, HttpWorker) {
let engine = HttpWorker {
to_api,
from_api,
http_client: hyper::Client::builder().build(hyper_rustls::HttpsConnector::new()),
http_client: hyper_client,
requests: Vec::new(),
};

Expand Down Expand Up @@ -551,7 +554,7 @@ pub struct HttpWorker {
/// Used to receive messages from the `HttpApi`.
from_api: TracingUnboundedReceiver<ApiToWorker>,
/// The engine that runs HTTP requests.
http_client: hyper::Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>, hyper::Body>,
http_client: Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>,
/// HTTP requests that are being worked on by the engine.
requests: Vec<(HttpRequestId, HttpWorkerRequest)>,
}
Expand Down Expand Up @@ -688,6 +691,9 @@ mod tests {
use super::http;
use sp_core::offchain::{HttpError, HttpRequestId, HttpRequestStatus, Duration};
use futures::future;
use std::sync::Arc;
use hyper::{Client as HyperClient};
use hyper_rustls::HttpsConnector;

// Returns an `HttpApi` whose worker is ran in the background, and a `SocketAddr` to an HTTP
// server that runs in the background as well.
Expand All @@ -699,7 +705,8 @@ mod tests {
// not be enough).
fdlimit::raise_fd_limit();

let (api, worker) = http();
let hyper_client = Arc::new(HyperClient::builder().build(HttpsConnector::new()));
pscott marked this conversation as resolved.
Show resolved Hide resolved
let (api, worker) = http(hyper_client.clone());

let (addr_tx, addr_rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
Expand Down
8 changes: 7 additions & 1 deletion client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! The offchain workers is a special function of the runtime that
//! gets executed after block is imported. During execution
//! it's able to asynchronously submit extrinsics that will either
//! be propagated to other nodes added to the next block
//! be propagated to other nodes or added to the next block
//! produced by the node as unsigned transactions.
//!
//! Offchain workers can be used for computation-heavy tasks
Expand All @@ -44,6 +44,8 @@ use sc_network::NetworkStateInfo;
use sp_core::{offchain::{self, OffchainStorage}, ExecutionContext, traits::SpawnNamed};
use sp_runtime::{generic::BlockId, traits::{self, Header}};
use futures::{prelude::*, future::ready};
use hyper_rustls::HttpsConnector;
use hyper::{Client as HyperClient, Body, client};

mod api;

Expand All @@ -55,16 +57,19 @@ pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
db: Storage,
_block: PhantomData<Block>,
thread_pool: Mutex<ThreadPool>,
hyper_client: Arc<HyperClient<HttpsConnector<client::HttpConnector>, Body>>,
}

impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
/// Creates new `OffchainWorkers`.
pub fn new(client: Arc<Client>, db: Storage) -> Self {
let hyper_client = Arc::new(HyperClient::builder().build(HttpsConnector::new()));
Self {
client,
db,
_block: PhantomData,
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
hyper_client,
}
}
}
Expand Down Expand Up @@ -120,6 +125,7 @@ impl<Client, Storage, Block> OffchainWorkers<
self.db.clone(),
network_state.clone(),
is_validator,
self.hyper_client.clone(),
);
debug!("Spawning offchain workers at {:?}", at);
let header = header.clone();
Expand Down