From 68952c00372b4c388c7effc53cd562fd321c0aa3 Mon Sep 17 00:00:00 2001 From: Yash Atreya <44857776+yash-atreya@users.noreply.github.com> Date: Tue, 26 Mar 2024 16:25:29 -0400 Subject: [PATCH] feat(providers): connect_boxed api (#342) * feat(providers): connect_boxed api * parse error handling and nits * enhance parse str to transport util * extract auth info for ws in connect_boxed * ci nits * fix: dependencies, refactor provider utils * return conn_str in enum * fix: parsing urls tests and resolve conflicts * udpate utils and remove unwraps * fix: cfg for ws, ipc features * nits and fix test * more nits * smol nits and url in enum * fix ci: make ipc and ws optional * ci nits * refactor: bump down and bubble up * feature: connect_builtin * feature: connect_builtin * feature: on_ws and on_ipc * fix: http feature handling * fix: parsing * fix: test * fix: feature check * fix: import * fix: unreachable match * fix: matches are such a pain * fix: imports some more * fix: activate url for ws --------- Co-authored-by: James --- crates/alloy/Cargo.toml | 4 +- crates/provider/Cargo.toml | 8 +- crates/provider/src/builder.rs | 76 ++++++++- crates/provider/src/provider.rs | 47 +++++- crates/rpc-client/Cargo.toml | 2 +- crates/rpc-client/src/builder.rs | 15 ++ crates/rpc-client/src/builtin.rs | 279 +++++++++++++++++++++++++++++++ crates/rpc-client/src/client.rs | 15 +- crates/rpc-client/src/lib.rs | 3 + crates/transport/src/common.rs | 30 +++- 10 files changed, 455 insertions(+), 24 deletions(-) create mode 100644 crates/rpc-client/src/builtin.rs diff --git a/crates/alloy/Cargo.toml b/crates/alloy/Cargo.toml index 8aee2be2649..3d59c08071a 100644 --- a/crates/alloy/Cargo.toml +++ b/crates/alloy/Cargo.toml @@ -21,11 +21,11 @@ alloy-genesis = { workspace = true, default-features = false, optional = true } alloy-network = { workspace = true, default-features = false, optional = true } alloy-node-bindings = { workspace = true, default-features = false, optional = true } alloy-provider = { workspace = true, default-features = false, optional = true } -alloy-pubsub = { workspace = true, default-features = false, optional = true } +alloy-pubsub = { workspace = true, optional = true } # rpc alloy-json-rpc = { workspace = true, default-features = false, optional = true } -alloy-rpc-client = { workspace = true, default-features = false, optional = true } +alloy-rpc-client = { workspace = true, optional = true } alloy-rpc-engine-types = { workspace = true, default-features = false, optional = true } alloy-rpc-trace-types = { workspace = true, default-features = false, optional = true } alloy-rpc-types = { workspace = true, default-features = false, optional = true } diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml index 226b9d20a49..4b481fd8b49 100644 --- a/crates/provider/Cargo.toml +++ b/crates/provider/Cargo.toml @@ -18,8 +18,9 @@ alloy-rpc-client = { workspace = true, features = ["reqwest"] } alloy-rpc-trace-types.workspace = true alloy-rpc-types.workspace = true alloy-transport-http = { workspace = true, features = ["reqwest"] } +alloy-transport-ws = { workspace = true, optional = true } +alloy-transport-ipc = { workspace = true, optional = true } alloy-transport.workspace = true - alloy-primitives.workspace = true async-stream = "0.3" @@ -43,8 +44,9 @@ alloy-signer.workspace = true alloy-signer-wallet.workspace = true tokio = { workspace = true, features = ["macros"] } tracing-subscriber = { workspace = true, features = ["fmt"] } +tempfile.workspace = true [features] pubsub = ["alloy-rpc-client/pubsub", "dep:alloy-pubsub"] -ws = ["pubsub", "alloy-rpc-client/ws"] -ipc = ["pubsub", "alloy-rpc-client/ipc"] +ws = ["pubsub", "alloy-rpc-client/ws", "alloy-transport-ws"] +ipc = ["pubsub", "alloy-rpc-client/ipc", "alloy-transport-ipc"] diff --git a/crates/provider/src/builder.rs b/crates/provider/src/builder.rs index 38bc0da1470..eb5e2402ea6 100644 --- a/crates/provider/src/builder.rs +++ b/crates/provider/src/builder.rs @@ -3,8 +3,8 @@ use crate::{ Provider, RootProvider, }; use alloy_network::{Ethereum, Network}; -use alloy_rpc_client::RpcClient; -use alloy_transport::Transport; +use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, RpcClient}; +use alloy_transport::{BoxTransport, Transport, TransportError}; use std::marker::PhantomData; /// A layering abstraction in the vein of [`tower::Layer`] @@ -172,6 +172,78 @@ impl ProviderBuilder { { self.provider(RootProvider::new(client)) } + + /// Finish the layer stack by providing a connection string for a built-in + /// transport type, outputting the final [`Provider`] type with all stack + /// components. + /// + /// This is a convenience function for + pub async fn on_builtin(self, s: &str) -> Result + where + L: ProviderLayer, N, BoxTransport>, + N: Network, + { + let connect: BuiltInConnectionString = s.parse()?; + let client = ClientBuilder::default().connect_boxed(connect).await?; + Ok(self.on_client(client)) + } + + /// Build this provider with a websocket connection. + #[cfg(feature = "ws")] + pub async fn on_ws( + self, + connect: alloy_transport_ws::WsConnect, + ) -> Result + where + L: ProviderLayer< + RootProvider, + N, + alloy_pubsub::PubSubFrontend, + >, + N: Network, + { + let client = ClientBuilder::default().ws(connect).await?; + Ok(self.on_client(client)) + } + + /// Build this provider with an IPC connection. + #[cfg(feature = "ipc")] + pub async fn on_ipc( + self, + connect: alloy_transport_ipc::IpcConnect, + ) -> Result + where + alloy_transport_ipc::IpcConnect: alloy_pubsub::PubSubConnect, + L: ProviderLayer< + RootProvider, + N, + alloy_pubsub::PubSubFrontend, + >, + N: Network, + { + let client = ClientBuilder::default().ipc(connect).await?; + Ok(self.on_client(client)) + } + + #[cfg(feature = "reqwest")] + pub fn on_reqwest_http(self, url: url::Url) -> Result + where + L: ProviderLayer, N, BoxTransport>, + N: Network, + { + let client = ClientBuilder::default().reqwest_http(url); + Ok(self.on_client(client)) + } + + #[cfg(feature = "hyper")] + pub fn on_hyper_http(self, url: url::Url) -> Result + where + L: ProviderLayer, N, BoxTransport>, + N: Network, + { + let client = ClientBuilder::default().hyper_http(url); + Ok(self.on_client(client)) + } } // Copyright (c) 2019 Tower Contributors diff --git a/crates/provider/src/provider.rs b/crates/provider/src/provider.rs index ef4fe3f5705..acc8c788690 100644 --- a/crates/provider/src/provider.rs +++ b/crates/provider/src/provider.rs @@ -11,7 +11,9 @@ use alloy_network::{Network, TransactionBuilder}; use alloy_primitives::{ hex, Address, BlockHash, BlockNumber, Bytes, StorageKey, StorageValue, TxHash, B256, U256, U64, }; -use alloy_rpc_client::{ClientRef, PollerBuilder, RpcClient, WeakClient}; +use alloy_rpc_client::{ + BuiltInConnectionString, ClientBuilder, ClientRef, PollerBuilder, RpcClient, WeakClient, +}; use alloy_rpc_trace_types::{ geth::{GethDebugTracingOptions, GethTrace}, parity::{LocalizedTransactionTrace, TraceResults, TraceType}, @@ -20,7 +22,10 @@ use alloy_rpc_types::{ state::StateOverride, AccessListWithGasUsed, Block, BlockId, BlockNumberOrTag, EIP1186AccountProofResponse, FeeHistory, Filter, FilterChanges, Log, SyncStatus, }; -use alloy_transport::{BoxTransport, Transport, TransportErrorKind, TransportResult}; +use alloy_transport::{ + BoxTransport, BoxTransportConnect, Transport, TransportError, TransportErrorKind, + TransportResult, +}; use alloy_transport_http::Http; use serde_json::value::RawValue; use std::{ @@ -71,6 +76,21 @@ impl RootProvider { } } +impl RootProvider { + /// Connects to a boxed transport with the given connector. + pub async fn connect_boxed(conn: C) -> Result { + let client = ClientBuilder::default().connect_boxed(conn).await?; + Ok(Self::new(client)) + } + + /// Creates a new root provider from the provided connection details. + pub async fn connect_builtin(s: &str) -> Result { + let conn: BuiltInConnectionString = s.parse()?; + let client = ClientBuilder::default().connect_boxed(conn).await?; + Ok(Self::new(client)) + } +} + impl RootProvider { /// Boxes the inner client. /// @@ -1377,4 +1397,27 @@ mod tests { "0x9dae5cf33694a02e8a7d5de3fe31e9d05ca0ba6e9180efac4ab20a06c9e598a3" ); } + + #[tokio::test] + async fn connect_boxed() { + init_tracing(); + let (_provider, anvil) = spawn_anvil(); + + let provider = + RootProvider::::connect_builtin(anvil.endpoint().as_str()) + .await; + + match provider { + Ok(provider) => { + let num = provider.get_block_number().await.unwrap(); + assert_eq!(0, num); + } + Err(e) => { + assert_eq!( + format!("{}",e), + "hyper not supported by BuiltinConnectionString. Please instantiate a hyper client manually" + ); + } + } + } } diff --git a/crates/rpc-client/Cargo.toml b/crates/rpc-client/Cargo.toml index 406e500848d..540259748bf 100644 --- a/crates/rpc-client/Cargo.toml +++ b/crates/rpc-client/Cargo.toml @@ -51,5 +51,5 @@ default = ["reqwest"] reqwest = ["dep:url", "dep:reqwest", "alloy-transport-http/reqwest"] hyper = ["dep:url", "dep:hyper-util", "alloy-transport-http/hyper"] pubsub = ["dep:alloy-pubsub", "dep:alloy-primitives"] -ws = ["pubsub", "dep:alloy-transport-ws"] +ws = ["pubsub", "dep:alloy-transport-ws", "dep:url"] ipc = ["pubsub", "dep:alloy-transport-ipc"] diff --git a/crates/rpc-client/src/builder.rs b/crates/rpc-client/src/builder.rs index 2c5a3e1371b..945aba07470 100644 --- a/crates/rpc-client/src/builder.rs +++ b/crates/rpc-client/src/builder.rs @@ -103,6 +103,21 @@ impl ClientBuilder { self.pubsub(ws_connect).await } + /// Connect an IPC transport, producing an [`RpcClient`] with the provided + /// connection. + #[cfg(feature = "ipc")] + pub async fn ipc( + self, + ipc_connect: alloy_transport_ipc::IpcConnect, + ) -> TransportResult> + where + alloy_transport_ipc::IpcConnect: alloy_pubsub::PubSubConnect, + L: Layer, + L::Service: Transport, + { + self.pubsub(ipc_connect).await + } + /// Connect a transport, producing an [`RpcClient`] with the provided /// connection. pub async fn connect(self, connect: C) -> TransportResult> diff --git a/crates/rpc-client/src/builtin.rs b/crates/rpc-client/src/builtin.rs new file mode 100644 index 00000000000..a06c7a38c53 --- /dev/null +++ b/crates/rpc-client/src/builtin.rs @@ -0,0 +1,279 @@ +use std::str::FromStr; + +use alloy_json_rpc::RpcError; +use alloy_transport::{BoxTransport, BoxTransportConnect, TransportError, TransportErrorKind}; + +#[cfg(feature = "pubsub")] +use alloy_pubsub::PubSubConnect; + +/// Connection string for built-in transports. +#[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum BuiltInConnectionString { + #[cfg(any(feature = "reqwest", feature = "hyper"))] + /// HTTP transport. + Http(url::Url), + #[cfg(feature = "ws")] + /// WebSocket transport. + Ws(url::Url, Option), + #[cfg(feature = "ipc")] + /// IPC transport. + Ipc(String), +} + +impl BoxTransportConnect for BuiltInConnectionString { + fn is_local(&self) -> bool { + match self { + #[cfg(any(feature = "reqwest", feature = "hyper"))] + Self::Http(url) => alloy_transport::utils::guess_local_url(url), + #[cfg(feature = "ws")] + Self::Ws(url, _) => alloy_transport::utils::guess_local_url(url), + #[cfg(feature = "ipc")] + Self::Ipc(_) => true, + #[cfg(not(any( + feature = "reqwest", + feature = "hyper", + feature = "ws", + feature = "ipc" + )))] + _ => false, + } + } + + fn get_boxed_transport<'a: 'b, 'b>( + &'a self, + ) -> alloy_transport::Pbf<'b, BoxTransport, TransportError> { + Box::pin(async move { self.connect_boxed().await }) + } +} + +impl BuiltInConnectionString { + /// Connect with the given connection string. + /// + /// # Notes + /// + /// - If `hyper` feature is enabled + /// - WS will extract auth, however, auth is disabled for wasm. + pub async fn connect_boxed(&self) -> Result { + // NB: + // HTTP match will always produce hyper if the feature is enabled. + // WS match arms are fall-through. Auth arm is disabled for wasm. + match self { + // reqwest is enabled, hyper is not + #[cfg(all(not(feature = "hyper"), feature = "reqwest"))] + Self::Http(url) => { + Ok( + alloy_transport::Transport::boxed( + alloy_transport_http::Http::::new(url.clone()) + ) + ) + }, + + // hyper is enabled, reqwest is not + #[cfg(feature = "hyper")] + Self::Http(_) => Err(TransportErrorKind::custom_str( + "hyper not supported by BuiltinConnectionString. Please instantiate a hyper client manually", + )), + + #[cfg(all(not(target = "wasm"), feature = "ws"))] + Self::Ws(url, Some(auth)) => { + alloy_transport_ws::WsConnect::with_auth(url.clone(), Some(auth.clone())) + .into_service() + .await + .map(alloy_transport::Transport::boxed) + } + + #[cfg(feature = "ws")] + Self::Ws(url, _) => alloy_transport_ws::WsConnect::new(url.clone()) + .into_service() + .await + .map(alloy_transport::Transport::boxed), + + #[cfg(feature = "ipc")] + Self::Ipc(path) => alloy_transport_ipc::IpcConnect::new(path.to_owned()) + .into_service() + .await + .map(alloy_transport::Transport::boxed), + + #[cfg(not(any(feature = "reqwest", feature = "hyper", feature = "ws", feature = "ipc")))] + _ => Err(TransportErrorKind::custom_str( + "No transports enabled. Enable one of: reqwest, hyper, ws, ipc", + )), + } + } + + /// Tries to parse the given string as an HTTP URL. + #[cfg(any(feature = "reqwest", feature = "hyper"))] + pub fn try_as_http(s: &str) -> Result { + let url = if s.starts_with("localhost:") || s.parse::().is_ok() { + let s = format!("http://{}", s); + url::Url::parse(&s) + } else { + url::Url::parse(s) + } + .map_err(TransportErrorKind::custom)?; + + if url.scheme() != "http" && url.scheme() != "https" { + Err(TransportErrorKind::custom_str("Invalid scheme. Expected http or https"))?; + } + + Ok(Self::Http(url)) + } + + /// Tries to parse the given string as a WebSocket URL. + #[cfg(feature = "ws")] + pub fn try_as_ws(s: &str) -> Result { + let url = if s.starts_with("localhost:") || s.parse::().is_ok() { + let s = format!("ws://{}", s); + url::Url::parse(&s) + } else { + url::Url::parse(s) + } + .map_err(TransportErrorKind::custom)?; + + if url.scheme() != "ws" && url.scheme() != "wss" { + Err(TransportErrorKind::custom_str("Invalid scheme. Expected ws or wss"))?; + } + + let auth = alloy_transport::Authorization::extract_from_url(&url); + + Ok(Self::Ws(url, auth)) + } + + /// Tries to parse the given string as an IPC path, returning an error if + /// the path does not exist. + #[cfg(feature = "ipc")] + pub fn try_as_ipc(s: &str) -> Result { + let s = s.strip_prefix("file://").unwrap_or(s); + let s = s.strip_prefix("ipc://").unwrap_or(s); + + // Check if s is a path and it exists + let path = std::path::Path::new(&s); + + path.is_file().then_some(Self::Ipc(s.to_string())).ok_or_else(|| { + TransportErrorKind::custom_str(&format!( + "Invalid IPC path. File does not exist: {}", + path.display() + )) + }) + } +} + +impl FromStr for BuiltInConnectionString { + type Err = RpcError; + + fn from_str(s: &str) -> Result { + let res = Err(TransportErrorKind::custom_str(&format!( + "No transports enabled. Enable one of: reqwest, hyper, ws, ipc. Connection info: '{}'", + s + ))); + #[cfg(any(feature = "reqwest", feature = "hyper"))] + let res = res.or_else(|_| Self::try_as_http(s)); + #[cfg(feature = "ws")] + let res = res.or_else(|_| Self::try_as_ws(s)); + #[cfg(feature = "ipc")] + let res = res.or_else(|_| Self::try_as_ipc(s)); + res + } +} + +#[cfg(test)] +mod test { + use super::*; + use url::Url; + + #[test] + fn test_parsing_urls() { + assert_eq!( + BuiltInConnectionString::from_str("http://localhost:8545").unwrap(), + BuiltInConnectionString::Http("http://localhost:8545".parse::().unwrap()) + ); + assert_eq!( + BuiltInConnectionString::from_str("localhost:8545").unwrap(), + BuiltInConnectionString::Http("http://localhost:8545".parse::().unwrap()) + ); + assert_eq!( + BuiltInConnectionString::from_str("https://localhost:8545").unwrap(), + BuiltInConnectionString::Http("https://localhost:8545".parse::().unwrap()) + ); + assert_eq!( + BuiltInConnectionString::from_str("localhost:8545").unwrap(), + BuiltInConnectionString::Http("http://localhost:8545".parse::().unwrap()) + ); + assert_eq!( + BuiltInConnectionString::from_str("http://127.0.0.1:8545").unwrap(), + BuiltInConnectionString::Http("http://127.0.0.1:8545".parse::().unwrap()) + ); + + assert_eq!( + BuiltInConnectionString::from_str("http://localhost").unwrap(), + BuiltInConnectionString::Http("http://localhost".parse::().unwrap()) + ); + assert_eq!( + BuiltInConnectionString::from_str("127.0.0.1:8545").unwrap(), + BuiltInConnectionString::Http("http://127.0.0.1:8545".parse::().unwrap()) + ); + } + + #[test] + #[cfg(feature = "ws")] + fn test_parsing_ws() { + use alloy_transport::Authorization; + + assert_eq!( + BuiltInConnectionString::from_str("ws://localhost:8545").unwrap(), + BuiltInConnectionString::Ws("ws://localhost:8545".parse::().unwrap(), None) + ); + assert_eq!( + BuiltInConnectionString::from_str("wss://localhost:8545").unwrap(), + BuiltInConnectionString::Ws("wss://localhost:8545".parse::().unwrap(), None) + ); + assert_eq!( + BuiltInConnectionString::from_str("ws://127.0.0.1:8545").unwrap(), + BuiltInConnectionString::Ws("ws://127.0.0.1:8545".parse::().unwrap(), None) + ); + + assert_eq!( + BuiltInConnectionString::from_str("ws://alice:pass@127.0.0.1:8545").unwrap(), + BuiltInConnectionString::Ws( + "ws://alice:pass@127.0.0.1:8545".parse::().unwrap(), + Some(Authorization::basic("alice", "pass")) + ) + ); + } + + #[test] + #[cfg(feature = "ipc")] + fn test_parsing_ipc() { + // Create a temp file and save it. + let temp_dir = tempfile::tempdir().unwrap(); + let temp_file = temp_dir.path().join("reth.ipc"); + + // Save it + std::fs::write(&temp_file, "reth ipc").unwrap(); + assert!(temp_file.is_file()); + let temp_file_str = temp_file.to_str().unwrap().to_string(); + + assert_eq!( + BuiltInConnectionString::from_str(&format!("ipc://{}", temp_file_str)).unwrap(), + BuiltInConnectionString::Ipc(temp_file_str.clone()) + ); + + assert_eq!( + BuiltInConnectionString::from_str(&format!("file://{}", temp_file_str)).unwrap(), + BuiltInConnectionString::Ipc(temp_file_str.clone()) + ); + + assert_eq!( + BuiltInConnectionString::from_str(temp_file.to_str().unwrap()).unwrap(), + BuiltInConnectionString::Ipc(temp_file_str.clone()) + ); + + // Delete the written file after test + std::fs::remove_file(temp_file).unwrap(); + assert_eq!( + BuiltInConnectionString::from_str("http://user:pass@example.com").unwrap(), + BuiltInConnectionString::Http("http://user:pass@example.com".parse::().unwrap()) + ); + } +} diff --git a/crates/rpc-client/src/client.rs b/crates/rpc-client/src/client.rs index 7960e8deae4..a3c7bd6d91a 100644 --- a/crates/rpc-client/src/client.rs +++ b/crates/rpc-client/src/client.rs @@ -1,6 +1,6 @@ use crate::{poller::PollerBuilder, BatchRequest, ClientBuilder, RpcCall}; use alloy_json_rpc::{Id, Request, RpcParam, RpcReturn}; -use alloy_transport::{BoxTransport, Transport, TransportConnect, TransportError}; +use alloy_transport::{BoxTransport, Transport}; use alloy_transport_http::Http; use std::{ borrow::Cow, @@ -20,6 +20,11 @@ pub type WeakClient = Weak>; pub type ClientRef<'a, T> = &'a RpcClientInner; /// A JSON-RPC client. +/// +/// [`RpcClient`] should never be instantiated directly. Instead, use +/// [`ClientBuilder`]. +/// +/// [`ClientBuilder`]: crate::ClientBuilder #[derive(Debug)] pub struct RpcClient(Arc>); @@ -79,14 +84,6 @@ impl RpcClient { } impl RpcClient { - /// Connect to a transport via a [`TransportConnect`] implementor. - pub async fn connect(connect: C) -> Result - where - C: TransportConnect, - { - ClientBuilder::default().connect(connect).await - } - /// Build a poller that polls a method with the given parameters. /// /// See [`PollerBuilder`] for examples and more details. diff --git a/crates/rpc-client/src/lib.rs b/crates/rpc-client/src/lib.rs index fac949473d5..807d1c75efb 100644 --- a/crates/rpc-client/src/lib.rs +++ b/crates/rpc-client/src/lib.rs @@ -24,6 +24,9 @@ pub use batch::BatchRequest; mod builder; pub use builder::ClientBuilder; +mod builtin; +pub use builtin::BuiltInConnectionString; + mod call; pub use call::RpcCall; diff --git a/crates/transport/src/common.rs b/crates/transport/src/common.rs index 4ccae413a70..5018a696a33 100644 --- a/crates/transport/src/common.rs +++ b/crates/transport/src/common.rs @@ -1,10 +1,10 @@ use base64::{engine::general_purpose, Engine}; -use std::fmt; +use std::{fmt, net::SocketAddr}; /// Basic or bearer authentication in http or websocket transport /// /// Use to inject username and password or an auth token into requests -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Authorization { /// HTTP Basic Auth Basic(String), @@ -13,12 +13,32 @@ pub enum Authorization { } impl Authorization { - /// Instantiate a new basic auth. + /// Extract the auth info from a URL. + pub fn extract_from_url(url: &url::Url) -> Option { + let username = url.username(); + let password = url.password().unwrap_or_default(); + + // eliminates false positives on the authority + if username.contains("localhost") || username.parse::().is_ok() { + return None; + } + + dbg!(username, password); + + (!username.is_empty() || !password.is_empty()).then(|| Self::basic(username, password)) + } + + /// Instantiate a new basic auth from an authority string. + pub fn authority(auth: impl AsRef) -> Self { + let auth_secret = general_purpose::STANDARD.encode(auth.as_ref()); + Self::Basic(auth_secret) + } + + /// Instantiate a new basic auth from a username and password. pub fn basic(username: impl AsRef, password: impl AsRef) -> Self { let username = username.as_ref(); let password = password.as_ref(); - let auth_secret = general_purpose::STANDARD.encode(format!("{username}:{password}")); - Self::Basic(auth_secret) + Self::authority(format!("{username}:{password}")) } /// Instantiate a new bearer auth.