Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): bump http, hyper etc. to 1.0; jsonrpsee 0.23 #7018

Merged
merged 16 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
779 changes: 367 additions & 412 deletions Cargo.lock

Large diffs are not rendered by default.

58 changes: 34 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ members = [
"examples/rpc-db/",
"examples/txpool-tracing/",
"testing/ef-tests/",
"testing/testing-utils"
"testing/testing-utils",
]
default-members = ["bin/reth"]

Expand Down Expand Up @@ -330,9 +330,15 @@ reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-types = { path = "crates/trie/types" }

# revm
revm = { version = "9.0.0", features = [ "std", "secp256k1", "blst", ], default-features = false }
revm-primitives = { version = "4.0.0", features = [ "std", ], default-features = false }
revm-inspectors = { git = "https://github.com/paradigmxyz/evm-inspectors", rev = "53aa2b2" }
revm = { version = "9.0.0", features = [
"std",
"secp256k1",
"blst",
], default-features = false }
revm-primitives = { version = "4.0.0", features = [
"std",
], default-features = false }
revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "5e3058a" }

# eth
alloy-chains = "0.1.15"
Expand All @@ -341,21 +347,21 @@ alloy-dyn-abi = "0.7.2"
alloy-sol-types = "0.7.2"
alloy-rlp = "0.3.4"
alloy-trie = "0.4"
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93", default-features = false, features = [
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d", default-features = false, features = [
"reqwest",
] }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", default-features = false, rev = "cc68b93" }
alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-signer-wallet = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", default-features = false, rev = "14ed25d" }
alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-signer-wallet = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }

# misc
auto_impl = "1"
Expand Down Expand Up @@ -415,21 +421,25 @@ async-trait = "0.1.68"
futures = "0.3.26"
pin-project = "1.0.12"
futures-util = "0.3.25"
hyper = "0.14.25"
hyper = "1.3"
hyper-util = "0.1.5"
reqwest = { version = "0.12", default-features = false }
tower = "0.4"
tower-http = "0.4"
http = "0.2.8"
http-body = "0.4.5"
tower-http = "0.5"

# p2p
discv5 = "0.6.0"
igd-next = "0.14.3"

# rpc
jsonrpsee = "0.22"
jsonrpsee-core = "0.22"
jsonrpsee-types = "0.22"
jsonrpsee = "0.23"
jsonrpsee-core = "0.23"
jsonrpsee-types = "0.23"
jsonrpsee-http-client = "0.23"

# http
http = "1.0"
http-body = "1.0"

# crypto
secp256k1 = { version = "0.28", default-features = false, features = [
Expand Down
3 changes: 2 additions & 1 deletion crates/consensus/debug-client/src/providers/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use alloy_eips::BlockNumberOrTag;
use alloy_provider::{Provider, ProviderBuilder};
use futures::StreamExt;
use reth_node_core::rpc::types::RichBlock;
use reth_rpc_types::BlockTransactionsKind;
use tokio::sync::mpsc::Sender;

/// Block provider that fetches new blocks from an RPC endpoint using a websocket connection.
Expand Down Expand Up @@ -32,7 +33,7 @@ impl BlockProvider for RpcBlockProvider {

while let Some(block) = stream.next().await {
let full_block = ws_provider
.get_block_by_hash(block.header.hash.unwrap(), true)
.get_block_by_hash(block.header.hash.unwrap(), BlockTransactionsKind::Full)
.await
.expect("failed to get block")
.expect("block not found");
Expand Down
4 changes: 3 additions & 1 deletion crates/node-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ serde.workspace = true
serde_json.workspace = true

# http/rpc
hyper.workspace = true
http.workspace = true
jsonrpsee.workspace = true
tower.workspace = true

# tracing
tracing.workspace = true
Expand Down
54 changes: 29 additions & 25 deletions crates/node-core/src/metrics/prometheus_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

use crate::metrics::version_metrics::register_version_metrics;
use eyre::WrapErr;
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use http::Response;
use metrics::describe_gauge;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack};
Expand Down Expand Up @@ -64,29 +61,36 @@ async fn start_endpoint<F: Hook + 'static>(
hook: Arc<F>,
task_executor: TaskExecutor,
) -> eyre::Result<()> {
let make_svc = make_service_fn(move |_| {
let handle = handle.clone();
let hook = Arc::clone(&hook);
async move {
Ok::<_, Infallible>(service_fn(move |_: Request<Body>| {
let listener =
tokio::net::TcpListener::bind(listen_addr).await.wrap_err("Could not bind to address")?;

task_executor.spawn_with_graceful_shutdown_signal(|signal| async move {
let mut shutdown = signal.ignore_guard();
loop {
let io = tokio::select! {
res = listener.accept() => match res {
Ok((stream, _remote_addr)) => stream,
Err(err) => {
tracing::error!(%err, "failed to accept connection");
continue;
}
},
_ = &mut shutdown => break,
};

let handle = handle.clone();
let hook = hook.clone();
let service = tower::service_fn(move |_| {
(hook)();
let metrics = handle.render();
async move { Ok::<_, Infallible>(Response::new(Body::from(metrics))) }
}))
}
});

let server =
Server::try_bind(&listen_addr).wrap_err("Could not bind to address")?.serve(make_svc);

task_executor.spawn_with_graceful_shutdown_signal(move |signal| async move {
if let Err(error) = server
.with_graceful_shutdown(async move {
let _ = signal.await;
})
.await
{
tracing::error!(%error, "metrics endpoint crashed")
async move { Ok::<_, Infallible>(Response::new(metrics)) }
});

if let Err(error) =
jsonrpsee::server::serve_with_graceful_shutdown(io, service, &mut shutdown).await
{
tracing::error!(%error, "metrics endpoint crashed")
}
}
});

Expand Down
5 changes: 1 addition & 4 deletions crates/optimism/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ revm-primitives.workspace = true

# async
async-trait.workspace = true
hyper.workspace = true
reqwest = { workspace = true, default-features = false, features = [
"rustls-tls-native-roots",
] }
reqwest = { workspace = true, features = ["rustls-tls-native-roots"] }
tracing.workspace = true

# misc
Expand Down
3 changes: 0 additions & 3 deletions crates/optimism/node/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ use std::sync::{atomic::AtomicUsize, Arc};
/// Error type when interacting with the Sequencer
#[derive(Debug, thiserror::Error)]
pub enum SequencerRpcError {
/// Wrapper around a [`hyper::Error`].
#[error(transparent)]
HyperError(#[from] hyper::Error),
/// Wrapper around an [`reqwest::Error`].
#[error(transparent)]
HttpError(#[from] reqwest::Error),
Expand Down
22 changes: 11 additions & 11 deletions crates/rpc/ipc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
/// async fn run_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let server = Builder::default().build("/tmp/my-uds");
/// let mut module = RpcModule::new(());
/// module.register_method("say_hello", |_, _| "lo")?;
/// module.register_method("say_hello", |_, _, _| "lo")?;
/// let handle = server.start(module).await?;
///
/// // In this example we don't care about doing shutdown so let's it run forever.
Expand Down Expand Up @@ -390,7 +390,7 @@ where
let rpc_service = self.rpc_middleware.service(RpcService::new(
self.inner.methods.clone(),
max_response_body_size,
self.inner.conn_id as usize,
self.inner.conn_id.into(),
cfg,
));
// an ipc connection needs to handle read+write concurrently
Expand Down Expand Up @@ -896,7 +896,7 @@ mod tests {
let endpoint = dummy_endpoint();
let server = Builder::default().max_response_body_size(100).build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "a".repeat(101)).unwrap();
module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand All @@ -911,7 +911,7 @@ mod tests {
let endpoint = dummy_endpoint();
let server = Builder::default().max_request_body_size(100).build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "succeed").unwrap();
module.register_method("anything", |_, _, _| "succeed").unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand Down Expand Up @@ -939,7 +939,7 @@ mod tests {
let endpoint = dummy_endpoint();
let server = Builder::default().max_connections(2).build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "succeed").unwrap();
module.register_method("anything", |_, _, _| "succeed").unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand Down Expand Up @@ -973,7 +973,7 @@ mod tests {
let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(());
let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
module.register_method("eth_chainId", move |_, _| msg).unwrap();
module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand All @@ -987,7 +987,7 @@ mod tests {
let endpoint = dummy_endpoint();
let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "ok").unwrap();
module.register_method("anything", |_, _, _| "ok").unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand All @@ -1013,7 +1013,7 @@ mod tests {
let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(());
let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
module.register_method("rpc_modules", move |_, _| msg).unwrap();
module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand All @@ -1036,7 +1036,7 @@ mod tests {
"subscribe_hello",
"s_hello",
"unsubscribe_hello",
|_, pending, tx| async move {
|_, pending, tx, _| async move {
let rx = tx.subscribe();
let stream = BroadcastStream::new(rx);
pipe_from_stream_with_bounded_buffer(pending, stream).await?;
Expand Down Expand Up @@ -1088,8 +1088,8 @@ mod tests {
let mut module = RpcModule::new(());
let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
module.register_method("say_hello", move |_, _| hello_msg).unwrap();
module.register_method("say_goodbye", move |_, _| goodbye_msg).unwrap();
module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());

Expand Down
37 changes: 12 additions & 25 deletions crates/rpc/ipc/src/server/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use jsonrpsee::{
IdProvider,
},
types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request},
BoundedSubscriptions, ConnectionDetails, MethodCallback, MethodResponse, MethodSink, Methods,
SubscriptionState,
BoundedSubscriptions, ConnectionId, Extensions, MethodCallback, MethodResponse, MethodSink,
Methods, SubscriptionState,
};
use std::sync::Arc;

/// JSON-RPC service middleware.
#[derive(Clone, Debug)]
pub struct RpcService {
conn_id: usize,
conn_id: ConnectionId,
methods: Methods,
max_response_body_size: usize,
cfg: RpcServiceCfg,
Expand All @@ -39,7 +39,7 @@ impl RpcService {
pub(crate) const fn new(
methods: Methods,
max_response_body_size: usize,
conn_id: usize,
conn_id: ConnectionId,
cfg: RpcServiceCfg,
) -> Self {
Self { methods, max_response_body_size, conn_id, cfg }
Expand All @@ -58,38 +58,25 @@ impl<'a> RpcServiceT<'a> for RpcService {
let params = req.params();
let name = req.method_name();
let id = req.id().clone();
let extensions = Extensions::new();

match self.methods.method_with_name(name) {
None => {
let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
ResponseFuture::ready(rp)
}
Some((_name, method)) => match method {
MethodCallback::Async(callback) => {
let params = params.into_owned();
let id = id.into_owned();

let fut = (callback)(id, params, conn_id, max_response_body_size);
ResponseFuture::future(fut)
MethodCallback::Sync(callback) => {
let rp = (callback)(id, params, max_response_body_size, extensions);
ResponseFuture::ready(rp)
}
MethodCallback::AsyncWithDetails(callback) => {
MethodCallback::Async(callback) => {
let params = params.into_owned();
let id = id.into_owned();

// Note: Add the `Request::extensions` to the connection details when available
// here.
let fut = (callback)(
id,
params,
ConnectionDetails::_new(conn_id),
max_response_body_size,
);
let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::future(fut)
}
MethodCallback::Sync(callback) => {
let rp = (callback)(id, params, max_response_body_size);
ResponseFuture::ready(rp)
}
MethodCallback::Subscription(callback) => {
let RpcServiceCfg::CallsAndSubscriptions {
bounded_subscriptions,
Expand All @@ -110,7 +97,7 @@ impl<'a> RpcServiceT<'a> for RpcService {
subscription_permit: p,
};

let fut = callback(id.clone(), params, sink, conn_state);
let fut = callback(id.clone(), params, sink, conn_state, extensions);
ResponseFuture::future(fut)
} else {
let max = bounded_subscriptions.max();
Expand All @@ -129,7 +116,7 @@ impl<'a> RpcServiceT<'a> for RpcService {
return ResponseFuture::ready(rp);
};

let rp = callback(id, params, conn_id, max_response_body_size);
let rp = callback(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::ready(rp)
}
},
Expand Down
Loading
Loading